1. 引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置yml
2.1 配置生产者yml
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: / # 虚拟主机
username: guest
password: guest
publisher-returns: true #开启发送失败退回
# simple:同步等待confirm结果,直到超时
#correlated:异步回调,次你故意ConfirmCallback,MQ返回结果时会回调这个ComfirmCallback
publisher-confirm-type: correlated
2.2 配置消费者yml
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: / # 虚拟主机
username: guest
password: guest
cloud:
stream:
bindings:
delayed-topic-input:
destination: delayed-topic-demo #将消费者队列绑定到指定交换机
group: group-1
#消费默认分组,消息到达时同一个分组下多个实例情况,只会有一个实例消费这条消息
consumer:
delayed-exchange: true #开启延时,生产者和消费者端都需要开启这个配置
3.生产者生产消息
3.1 direct 直连
把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中
3.1.1 直连队列消息发送
/***直接交换机 **/
public static final String directExchange = "directExchangeOne";
public static final String routingKey1 = "directKey1";
public static final String routingKey2 = "directKey2";
public static final String directQueue1 = "directQueueOne";
public static final String directQueue2 = "directQueueTwo";
/**
* 直接交换机 一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者
* 通过指定路由键directRouting发送给交换机directExchange
* 交互机directExchange通过指定的路由键把消息msg投递到对应的队列上面去
* @param map
*/
public void directToQueue(Map<String, String> map) {
map.put("direct-路由key:",RabbitConstants.routingKey1);
rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey1, map);
map.put("direct-路由key:",RabbitConstants.routingKey2);
rabbitTemplate.convertAndSend(RabbitConstants.directExchange, RabbitConstants.routingKey2, map);
}
3.1.2 直连队列消息绑定
package rabbit.config;
import config.RabbitConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 配置类 : 创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系
* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
*
* */
@Configuration
public class DirectConfig {
/**
* Direct Exchange 是 RabbitMQ 默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列
* @return
*/
@Bean
public DirectExchange directExchangeOne(){
return new DirectExchange(RabbitConstants.directExchange);
}
@Bean
public Queue directQueueOne(){
return new Queue(RabbitConstants.directQueue1);
}
@Bean
public Queue directQueueTwo(){
return new Queue(RabbitConstants.directQueue2);
}
/**
* 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息
* 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
* @param directQueueOne
* @param directExchangeOne
* @return
*/
@Bean
public Binding directBindingOne(Queue directQueueOne, DirectExchange directExchangeOne){
return BindingBuilder.bind(directQueueOne).to(directExchangeOne).with(RabbitConstants.routingKey1);
}
/**
* 交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息
* 路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
* direct 类型的行为是”先匹配, 再投送”. 即在绑定时设定一个 routing_key, 消息的routing_key 匹配时, 才会被交换器投送到绑定的队列中去
* @param directQueueTwo
* @param directExchangeOne
* @return
*/
@Bean
public Binding directBindingTwo(Queue directQueueTwo, DirectExchange directExchangeOne) {
return BindingBuilder.bind(directQueueTwo).to(directExchangeOne).with(RabbitConstants.routingKey2);
}
}
3.1.3 直连队列消息接收
@RabbitListener(queues = RabbitConstants.directQueue1)
@RabbitHandler // 指定对消息的处理
public void directClientOne(HashMap<String,String> mes){
System.out.println("直连队列消息1:" + mes);
}
/** @RabbitListener(queues = {"directQueue1","directQueue2"}):这样就可以一次消费两条消息 **/
@RabbitListener(queues = RabbitConstants.directQueue2)
@RabbitHandler
public void directClientTwo(HashMap<String,String> mes){
System.out.println("直连队列消息2: " + mes);
}
3.1.4 结果:
3.2 fanout 扇形
把消息发送到所有与它绑定的Queue中,没有路由概念
3.2.1 扇形消息发送
@Autowired
public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
/***
* 扇形交换机
* 这个交换机没有路由键概念,就算你绑了路由键也是无视的
* 消息会发送到所有绑定的队列上。
* @param fanoutMap1
*/
public void fanoutToQueue(Map<String, String> fanoutMap1) {
fanoutMap1.put("fanout-交换机:",RabbitConstants.fanoutExchange1);
rabbitTemplate.convertAndSend(RabbitConstants.fanoutExchange1,null,fanoutMap1);
}
3.2.2 扇形消息绑定
/**
* 扇形交换机
* Fanout:转发消息到所有绑定队列,没有路由key
* */
@Configuration
public class FanoutConfig {
/**
* 不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了 routing_key 会被忽略。
* @return
*/
@Bean
public FanoutExchange fanoutExchange1(){
return new FanoutExchange(RabbitConstants.fanoutExchange1);
}
@Bean
public Queue fanoutQueue1(){
return new Queue(RabbitConstants.fanoutQueue1);
}
@Bean
public Queue fanoutQueue2(){
return new Queue(RabbitConstants.fanoutQueue2);
}
/** 扇形交换机没有路由key */
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange1){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange1);
}
/** 扇形交换机没有路由key */
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange1) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange1);
}
}
3.2.3 扇形消息接收
/** 扇形交换机 */
public static final String fanoutExchange1 = "fanout_exchange1";
public static final String fanoutQueue1 = "fanout_queue1";
public static final String fanoutQueue2 = "fanout_queue2";
@RabbitListener(queues = RabbitConstants.fanoutQueue1)
@RabbitHandler
public void fanoutQueue1(HashMap<String,String> fanoutMes){
System.out.println("扇形队列消息1: " + fanoutMes);
}
@RabbitListener(queues = RabbitConstants.fanoutQueue2)
@RabbitHandler
public void fanoutQueue2(HashMap<String,String> fanoutMes){
System.out.println("扇形队列消息2: " + fanoutMes);
}
3.2.4 扇形--结果
3.3 topic 主题
将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中--多了匹配的概念
3.3.1 主题队列消息发送
@Autowired
public RabbitMqProducer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
/***主题交换机:模糊匹配队列
* *:星号表示任意一个字符
* #:表示任意一个或者多个字符
*/
// topic 的 routingKey
public static final String topicA = "helloTopic.world";
public static final String topicB = "helloTopic.#";
public static final String topicAll = "#";
public static final String topicExchange = "topic_exchange";
/** 绑定 topicA = "helloTopic.world"*/
public static final String topicQueue1 = "topic_queue1";
/** 绑定 topicB="helloTopic.#"*/
public static final String topicQueue2 = "topic_queue2";
/** 绑定 #,匹配所有 */
public static final String topicQueue3 = "topic_queue3";
/**
* 主题交换机:模糊匹配队列
* topic.# 可匹配topic topic.add topic.add.add
* topic.* 可匹配topic.add topic.delete
* @param map
*/
public void topicToQueue(Map<String, String> map) {
// 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
map.put("Topic-路由key:",RabbitConstants.topicA);
rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicA, map);
map.put("Topic-路由key:",RabbitConstants.topicB);
rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicB, map);
map.put("Topic-路由key:",RabbitConstants.topicAll);
rabbitTemplate.convertAndSend(RabbitConstants.topicExchange, RabbitConstants.topicAll, map);
}
3.3.2 主题队列消息绑定
/***
* 按规则转发消息
*/
@Configuration
public class TopicConfig {
/**
* Topic Exchange 转发消息主要是根据通配符
* 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中
* @return
*/
@Bean
public TopicExchange topicExchange1(){
return new TopicExchange(RabbitConstants.topicExchange);
}
@Bean
public Queue topicQueue1(){
return new Queue(RabbitConstants.topicQueue1);
}
@Bean
public Queue topicQueue2(){
return new Queue(RabbitConstants.topicQueue2);
}
@Bean
public Queue topicQueue3(){
return new Queue(RabbitConstants.topicQueue3);
}
/**
* 消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,
* Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中
* @param topicQueue1
* @param topicExchange1
* @return
*/
@Bean
public Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange1){
return BindingBuilder.bind(topicQueue1).to(topicExchange1).with(RabbitConstants.topicA);
}
@Bean
public Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange1){
return BindingBuilder.bind(topicQueue2).to(topicExchange1).with(RabbitConstants.topicB);
}
@Bean
public Binding topicBinding3(Queue topicQueue3, TopicExchange topicExchange1){
return BindingBuilder.bind(topicQueue3).to(topicExchange1).with(RabbitConstants.topicAll);
}
}
3.3.3 主题队列消息接收
@RabbitListener(queues = RabbitConstants.topicQueue1)
@RabbitHandler
public void topicQueue1(HashMap<String,String> topicMes){
System.out.println("主题消息队列1: " + topicMes);
}
@RabbitListener(queues = RabbitConstants.topicQueue2)
@RabbitHandler
public void topicQueue2(HashMap<String,String> topicMes){
System.out.println("主题消息队列2: " + topicMes);
}
@RabbitListener(queues = RabbitConstants.topicQueue3)
@RabbitHandler
public void topicQueue3(HashMap<String,String> topicMes){
System.out.println("主题消息队列匹配所有: " + topicMes);
}
3.3.4 主题--结果
3.4 Delayed 延时(需要延时插件,参考我另一篇插件安装)
3.4.1 延时队列消息发送
/** 延迟队列 */
public static final String DELAYED_EXCHANGE_NAME = "myDelayedExchange";
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_ROUTING_KEY = "delayed.routing.key";
/**
* 死信延迟队列
* @param message
*/
public void sendDelayedMessage(String message) {
System.out.println("Send time 开始: " + LocalDateTime.now());
rabbitTemplate.convertAndSend(RabbitConstants.DELAYED_EXCHANGE_NAME,
RabbitConstants.DELAYED_ROUTING_KEY,
message,
messagePostProcessor -> {
messagePostProcessor.getMessageProperties().setDelay(10000); // 设置消息的延长时间延,单位毫秒
return messagePostProcessor;
});
System.out.println("Send time 结束: " + LocalDateTime.now() );
}
3.4.2 延时队列消息绑定
public class DelayedConfig {
/** 定义一个延迟交换机 **/
@Bean
public CustomExchange delayedExchange() {
/*Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");*/
return new CustomExchange(RabbitConstants.DELAYED_EXCHANGE_NAME,
"x-delayed-message", // 消息类型 x-delayed-message
true, // 是否持久化
false); // 是否自动删除
}
/** 延时队列 **/
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(RabbitConstants.DELAYED_QUEUE_NAME)
.withArgument("x-delayed-type", "direct")
.build();
}
/** 绑定队列到这个延迟交换机 */
@Bean
public Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {
return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(RabbitConstants.DELAYED_ROUTING_KEY).noargs();
}
}
3.4.3 延时队列消息接收
@RabbitListener(queues = RabbitConstants.DELAYED_QUEUE_NAME)
public void receiveDelayedMessage(String message, Channel channel) {
System.out.println("Received delayed message: " + message);
log.info("当前时间:{},接收时长信息给延迟队列:{}", LocalTime.now(),message);
System.out.println("Received time: " + LocalDateTime.now() + " Received: " + message);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}