Direct消息模型
* 路由模型:
* 一个交换机可以绑定多个队列
* 生产者给交换机发送消息时,需要指定消息的路由键
* 消费者将队列绑定到交换机时,需要指定所要消费的消息的路由键
* 交换机会根据消息的路由键将消息转发到对应的队列
* 缺点:
* 当消息种类很多的时候,需要指定的路由键也会很多,维护起来非常复杂。
生产者
package com. example. demo02. mq. direct ;
import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. BuiltinExchangeType ;
import com. rabbitmq. client. Channel ;
import com. rabbitmq. client. Connection ;
import java. io. IOException ;
/**
 * Producer for the direct-exchange routing demo.
 *
 * <p>Declares the direct exchange {@code direct.exchange} and publishes two messages to it:
 * one with routing key {@code order.save} and one with routing key {@code order.update}.
 */
public class DirectSender {

    /** Name of the direct exchange the messages are published to. */
    private static final String EXCHANGE_NAME = "direct.exchange";

    /**
     * Opens a connection, publishes the two demo messages and releases the connection.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring the exchange, publishing or closing fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel first and then the connection, and does so
        // even when declaring or publishing throws, so a failure does not leak the connection.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Third argument is the "durable" flag of exchangeDeclare.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);

            String msg1 = "{To DirectReceiver1: orderId:1001}";
            String msg2 = "{To DirectReceiver2: orderId:1002}";

            // Encode explicitly as UTF-8 so the bytes on the wire do not depend on the
            // default charset of the machine running the producer.
            channel.basicPublish(EXCHANGE_NAME, "order.save", null,
                    msg1.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            channel.basicPublish(EXCHANGE_NAME, "order.update", null,
                    msg2.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者1
package com. example. demo02. mq. direct ;
import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ;
import java. io. IOException ;
/**
 * Consumer for the direct-exchange routing demo that handles "order save" messages.
 *
 * <p>Binds queue {@code direct.queue1} to exchange {@code direct.exchange} with routing key
 * {@code order.save}, prints every delivery and acknowledges it manually.
 */
public class DirectReceiver1 {

    /** Direct exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "direct.exchange";

    /** Queue this receiver consumes from. */
    private static final String QUEUE_NAME = "direct.queue1";

    /** Routing key used for the queue binding. */
    private static final String ROUTING_KEY = "order.save";

    /**
     * Declares the exchange, queue and binding, then registers the consumer.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        // The connection and channel are not closed here; the registered consumer keeps
        // using the channel for deliveries after main returns.
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset so the
                // text is not garbled when producer and consumer hosts use different defaults.
                String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("DirectReceiver1接收到的新增订单消息是:" + message);
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument is autoAck = false: deliveries are acknowledged in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
消费者2
package com. example. demo02. mq. direct ;
import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ;
import java. io. IOException ;
/**
 * Consumer for the direct-exchange routing demo that handles "order update" messages.
 *
 * <p>Binds queue {@code direct.queue2} to exchange {@code direct.exchange} with routing key
 * {@code order.update}, prints every delivery and acknowledges it manually.
 */
public class DirectReceiver2 {

    /** Direct exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "direct.exchange";

    /** Queue this receiver consumes from. */
    private static final String QUEUE_NAME = "direct.queue2";

    /** Routing key used for the queue binding. */
    private static final String ROUTING_KEY = "order.update";

    /**
     * Declares the exchange, queue and binding, then registers the consumer.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting, declaring, binding or subscribing fails
     */
    public static void main(String[] args) throws Exception {
        // The connection and channel are not closed here; the registered consumer keeps
        // using the channel for deliveries after main returns.
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset so the
                // text is not garbled when producer and consumer hosts use different defaults.
                String message = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("DirectReceiver2接收到的修改订单消息:" + message);
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // Second argument is autoAck = false: deliveries are acknowledged in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
结果