Fanout消息模型
* 广播模型:
* 一个交换机绑定多个队列
* 每个队列都有一个消费者
* 每个消费者只消费自己队列中的消息;交换机会把每条消息复制并投递到所有绑定的队列,因此每个队列收到的消息是一样的
生产者
package com. example. demo02. mq. fanout ;
import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. BuiltinExchangeType ;
import com. rabbitmq. client. Channel ;
import com. rabbitmq. client. Connection ;
import java. io. IOException ;
/**
 * Producer for the fanout demo: declares the {@code fanout.exchange} fanout
 * exchange and publishes a single text message to it.
 */
public class FanoutSender {

    /** Name of the fanout exchange the message is published to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /**
     * Opens a connection and channel, declares the exchange, publishes one
     * message, and closes both resources.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if obtaining the connection, declaring the exchange,
     *                   publishing, or closing a resource fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel first and then the connection,
        // even when declaring or publishing throws.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Third argument is the "durable" flag: the exchange is declared non-durable.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);

            String msg = "fanout message";
            // Published with an empty routing key and no message properties.
            // Encode explicitly as UTF-8 so the bytes do not depend on the
            // platform default charset.
            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者1
package com. example. demo02. mq. fanout ;
import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ;
import java. io. IOException ;
/**
 * First consumer for the fanout demo: binds {@code fanout.queue1} to
 * {@code fanout.exchange} and prints every message delivered to that queue.
 */
public class FanoutReceiver1 {

    /** Name of the fanout exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /** Name of the queue this receiver consumes from. */
    private static final String QUEUE_NAME = "fanout.queue1";

    /**
     * Declares the exchange and queue, binds them, and registers a consumer
     * that prints and manually acknowledges each delivery.
     *
     * <p>The connection and channel are not closed here; closing them would
     * stop the consumer.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if obtaining the connection or any declare, bind, or
     *                   consume call fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // Same declaration as the sender (non-durable fanout exchange).
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
        // Flags: durable=false, exclusive=false, autoDelete=false; no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bound with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Fanout1接收到的消息是:" + text);
                // Acknowledge only this delivery (multiple=false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // autoAck=false: deliveries are acknowledged manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
消费者2
package com. example. demo02. mq. fanout ;
import com. example. demo02. mq. util. ConnectionUtils ;
import com. rabbitmq. client. * ;
import java. io. IOException ;
/**
 * Second consumer for the fanout demo: binds {@code fanout.queue2} to
 * {@code fanout.exchange} and prints every message delivered to that queue.
 */
public class FanoutReceiver2 {

    /** Name of the fanout exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /** Name of the queue this receiver consumes from. */
    private static final String QUEUE_NAME = "fanout.queue2";

    /**
     * Declares the exchange and queue, binds them, and registers a consumer
     * that prints and manually acknowledges each delivery.
     *
     * <p>The connection and channel are not closed here; closing them would
     * stop the consumer.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if obtaining the connection or any declare, bind, or
     *                   consume call fails
     */
    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // Same declaration as the sender (non-durable fanout exchange).
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
        // Flags: durable=false, exclusive=false, autoDelete=false; no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bound with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                // Decode explicitly as UTF-8 instead of the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Fanout2接收到的消息是:" + text);
                // Acknowledge only this delivery (multiple=false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // autoAck=false: deliveries are acknowledged manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
结果