Springboot整合RabbitMq,详细使用步骤
- 1 添加springboot-starter依赖
- 2 添加连接配置
- 3 在启动类上添加开启注解`@EnableRabbit`
- 4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
- 5 生产者推送消息
- 6 消费者接收消息
- 7 生产者的消息回调机制
- 8 消费者的确认机制
消息队列(Message Queue)是一种应用间的通信方式。顾名思义,将消息放到队列中,排队发出。消息发布者只管把消息发布到MQ中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。
而且消息队列一般有完整的接收确认,发布消息回调等一系列机制,可以确保接收方一定能接受。
用到的场景如:异步处理,应用解耦,流量削锋和消息通讯等。
以下先详细介绍下springboot项目怎么使用RabbitMq
1 添加springboot-starter依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2 添加连接配置
以下几项是最基础的配置,其他配置下面用到时额外添加
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest #默认用户名和密码
password: guest
virtual-host: / # 虚拟主机
3 在启动类上添加开启注解@EnableRabbit
4 创建RabbitMq的配置类,用于创建交换机,队列,绑定关系等基础信息。
可以直接在java代码中通过注入实体类的方式创建交换机及队列等设备。但此方式添加的’设备‘是懒加载的形式,只要当使用到识别到监听注解或调用发送消息的方法时,才会真在rabbitmq中创建。
可以定位到amqp依赖的源码看到在程序启动的时候并不创建连接,只有在添加了监听注解启动程序或要发送消息时,才会走创建连接的方法。
配置类的示例代码如下:
@Configuration
public class RabbitConfig {
/**
* 队列
*/
@Bean
Queue createDirectQueue(){
/**
* durable:是否持久化,默认是false。true为持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在;false为暂存队列:当前连接有效。
* exclusive:默认也是false。true是只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* 一般设置一下队列的持久化就好,其余两个就是默认false
*/
//两种创建方式
//QueueBuilder.durable("queue.test1").build();
return new Queue("queue.test1",true,false,false);
}
/**
* 交换机
*/
@Bean
DirectExchange createDirectExchange(){
/**
* durable、autoDelete参数性质和上面队列的一致
*/
return new DirectExchange("direct.test1",true,false);
}
/**
* 将队列和交换机绑定, 并设置用于匹配键
*/
@Bean
Binding binding(){
return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with("testRoute");
}
}
以上是以直连交换机为例,创建其他交换机写法一样,具体对应哪个实体类可以在Exchange
接口 —>AbstractExchange
实现类下看到。
可以通过客户端看到队列、交换机、路由关系已经创建成功
5 生产者推送消息
@Autowired
RabbitTemplate rabbitTemplate;
@PostMapping("/sendMessage")
public AjaxResult sendMessage(@RequestBody Map params) {
String id = UUID.randomUUID().toString();
String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
params.put("messageId",id);
params.put("createTime",createTime);
/**
* 发给交换机,在发给路由绑定的队列
*/
rabbitTemplate.convertAndSend("direct.test1","testRoute",params);
return AjaxResult.success("成功");
}
可以看到,rabbitmq成功接收到消息。
6 消费者接收消息
@Component
@RabbitListener(queues = "queue.test1")
public class Receiver {
@RabbitHandler
public void process(Map message){
System.out.printf("消费者接收到消息:" + message.toString());
}
}
可以看到消息成功被消费,监听处理方法也成功被执行
如果多个监听器监听同一个队列,是轮询的方式进行消费,不会出现重复消费的情况;如果多个队列同时以相同的路由绑定同一个交换机,消息会以复制的形式发送至每个队列。
7 生产者的消息回调机制
在实际运用中,作为消息的生产者,很多时候我们需要确认消息是否成功发送到了mq中。同时我们还需要知道,假如消息出现异常时的异常情况。为了满足这个业务场景,我们就需要配置消息回调。
-
增加配置项
spring: rabbitmq: publisher-confirm-type: correlated #消息发送成功交互 publisher-returns: true
可能之前老的版本是
publisher-confirm:true
,但现在写的话会发现变红了,说明过时了。因为在springboot的自动配置依赖里该配置级别已经为error
了
-
目前回调包含发送成功回调
ConfirmCallback
和失败回调ReturnsCallback
。一些老版本的可能有ReturnCallback
。下面先自定义两个回调的回调方法ConfirmCallback的回调
/** * 消息发送成功回调 */ public class RabbitConfirmCallBack implements RabbitTemplate.ConfirmCallback { /** * 消息成功到达exchange,ack=true * @param correlationData * @param ack * @param s */ @Override public void confirm(CorrelationData correlationData, boolean ack, String s) { System.out.println("相关数据:" + correlationData); System.out.println("确认状态:" + ack); System.out.println("造成原因:" + s); } }
ReturnsCallback的回调
/** * 发生异常时的消息返回提醒 */ public class RabbitReturnsCallback implements RabbitTemplate.ReturnsCallback { @Override public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("失败回调:" + returnedMessage); } }
将自定义回调配置到模板中
在Rabbit配置类中添加
RabbitTemplate
并配置两个回调@Configuration public class RabbitConfig { @Bean public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate = new RabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数 rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(new RabbitConfirmCallBack()); rabbitTemplate.setReturnsCallback(new RabbitReturnsCallback()); return rabbitTemplate; } }
那以上两种回调函数什么时候回执行呢?
- 消息发送到exchange,且传播到队列,则只有ConfirmCallback回调,ack=true
- 消息发送不到exchange,则只有ConfirmCallback回调,ack=false
- 消息发送到exchange,没传播到队列(或找不到路由),则ConfirmCallback回调,ack=true、ReturnsCallback回调
由此可见ConfirmCallback
回调是exchange的一种反馈,是发生在生产者和交换机之间的,无论能不能发到都会回调。消息发送出去如果收到交换机的确认反馈则回调为成功,如果没有收到确认反馈,则回调为失败。
ReturnsCallback
回调是队列的一种反馈,是发生在交换机和队列之间的。只有消息先到达交换机,且发送到队列失败才会执行此回调。
下面是对以上三种情况的测试
-
消息完全成功发送到队列
模拟:交换机和路由都存在
@PostMapping("/sendMessage") public AjaxResult sendMessage(@RequestBody Map params) { String id = UUID.randomUUID().toString(); String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); params.put("messageId",id); params.put("createTime",createTime); //direct.test1和testRoute都存在 rabbitTemplate.convertAndSend("direct.test1","testRoute",params); return AjaxResult.success("成功"); }
消费者监听且
ConfirmCallback
回调为true
-
消息没有发送到exchange
模拟:交换机不存在
@PostMapping("/sendMessageFailByNoExchange") public AjaxResult sendMessageFailByNoExchange(@RequestBody Map params) { String id = UUID.randomUUID().toString(); String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); params.put("messageId",id); params.put("createTime",createTime); //该交换机不存在 rabbitTemplate.convertAndSend("direct.exchange不存在","testRoute",params); return AjaxResult.success("成功"); }
ConfirmCallback
回调为false
-
消息发送到exchange,但没发送到队列
模拟:该交换机存在但该路由不存在
@PostMapping("/sendMessageFailByNoRoute") public AjaxResult sendMessageFailByNoRoute(@RequestBody Map params) { String id = UUID.randomUUID().toString(); String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); params.put("messageId",id); params.put("createTime",createTime); //交换机存在但该路由不存在 rabbitTemplate.convertAndSend("direct.test1","failRoute",params); return AjaxResult.success("成功"); }
ConfirmCallback
回调为true,ReturnsCallback
失败回调执行
可以通过两个回调确定哪些消息没有成功发送到队列,记录下来再次发送,保证消息不丢失。
8 消费者的确认机制
消费者和生产者不同,消费者本身就是凭自己喜好,符合条件才会消费。
所有消费者的确认机制有三种模式:
-
自动确认
是默认的消息确认模式,即mq成功将消息发出,消费者成功接收到,就反馈确认。不管消费者是不是已经成功处理。
所以如果处理逻辑抛出异常,就相当于丢失了消息。
一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
-
手动确认
-
自动确认
自动确认没什么好说的,消费者确认机制的默认模式就是auto,自动反馈确认,所以可以看到只要消息被消费了队列中就不存在了。
-
手动确认
消费者收到消息后,手动调用
basic.ack/basic.nack/basic.reject
后,RabbitMQ收到这些消息后,才认为本次投递成功。- basic.ack:确认正确
- basic.nack:拒绝确认,可以选择是否重新发回队列、是否批处理
- basic.reject:拒绝确认,可以选择是否重新发回队列
后两者对应的方法为
channel.basicNack
和channel.basicReject
两者都表示消息没有被正常处理。其中有个参数requeue
,选择是否重新入队,开启此项可以避免消息丢失。但开启要慎重,如果使用不当会导致一些每次都被你重入列的消息一直消费-入列-消费-入列这样循环,导致消息积压。
两者有略微的区别
channel.basicNack
可以拒绝多个消息,channel.basicReject
只能拒绝一个下面看下代码怎么实现
如果使用的是
RabbitListener
注解,需要将ackMode设置为手动模式ackMode="MANUAL"
三个种情况分别对应下面 【1、2、3】三个方法
@RabbitHandler @RabbitListener(queues = "queue.test1",ackMode = "MANUAL") public void processQueueTest1(Map param, Message message, Channel channel) throws IOException { /** * 【1 确认】 * deliveryTag:消息的标识符 * multiple: * false:仅确认当前消息 * true:确认所有消息 */ channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); /** * 【2 拒绝】 * 第一个参数是消息的唯一ID * 第二个参数表示是否批量处理 * 第三个参数表示是否将消息重发回队列 */ //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); /** * 【3 拒绝】 * 第一个参数deliveryTag表示消息ID * 第二个参数为true表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里 */ //channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); System.out.println("queue.test1消费者接收到消息:" + param.toString()); System.out.println("message:" + message); System.out.println("channel:" + channel); }
channel.basicAck确认
以上代码中
channel.basicAck
是消费者向rabbitmq发送确认消息。向queue.test1
队列发送消息,此时开启了手动确认,如果不写此行,队列中会一直存在一条Unacked(未确认)
的消息。
执行了channel.basicAck
消息才会被消费,如下图已经无滞留消息。
channel.basicNack、channel.basicReject否认
可以看到拒绝消息之后,因为requeue
参数为true,消息会被重新入队,入队后再次等待被消费者消费。如果requeue
设为false的话则队列中该消息就是已经被消费。一般情况可以单独记录下,在轮询发送到队列。