当前位置: 首页 > >

RabbitMQ??路由模式

发布时间:

下面我们将要实现这个模型,所有的代码将都是以这个模型为基础:


direct:

首先,我们设置的routingKey是 error,那么按照路由规则,我们最终将向这两个队列发送消息:


生产者:


public class Send {

private static final String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
//send a msg
String msg = "hello direct";
String routingKey = "info"; //这里是定义的routingKey
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes());
System.out.println("send:"+msg);
channel.close();
connection.close();
}
}

消费者1:(他只能接受error


public class ReceiveOne {
private static final String QUEUE_NAME = "receive1_queue";
private static final String EXCHANGE_NAME = "test_exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.basicQos(1);
String routingKey = "error"; //表示只能接收error
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, routingKey);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [ConsumerOne is] Received + message + ");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); //这里是手动应答
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
}

消费者2:(能接收error,info 和 warning


public class ReceiveTwo {

private static final String QUEUE_NAME = "receive2_queue";
private static final String EXCHANGE_NAME = "test_exchange_direct";


public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = RabbitConnection.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.basicQos(1);
// 设置了三种key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); //注意这里,我们设置了三个路由key
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [ConsumerTwo is] Received + message + ");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); //这里是手动应答
}
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });}
}

Topics:(模式匹配)



“#” 匹配一个或多个
“ * ” 匹配一个



用法和上面的差不多,具体可以看 官方例子 。



下面再补充一点例子,关于message属性的例子:
Send.java


public class Send {

private static final String QUEUE_NAME = "simple_mq";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = RabbitConnection.getConnection();

Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello simple_mq";
Map headpro = new HashMap<>();
headpro.put("name","king");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("utf-8")
.expiration("10000")
.headers(headpro)
.build();

try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());
channel.txCommit();
} catch (IOException e) {
channel.txRollback();
System.out.println("回滚~~");
}
System.out.println(" [x] Sent.......... + message + ");
channel.close();
connection.close();
}
}

这里我们主要看一下这段代码:


Map headpro = new HashMap<>();
headpro.put("name","king");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("utf-8")
.expiration("10000")
.headers(headpro)
.build();

try {
channel.txSelect();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes());

我们以流式的方式给message添加各种属性



友情链接: