RabbitMQ
RabbitMQ
- channel:操作MQ的工具
- exchange:交换机,路由消息到队列中
- queue:队列,缓存消息
- virtual host:虚拟主机,对queue,exchange等资源的逻辑分组
MQ模型
- 基本消息队列
- 工作消息队列
- 发布订阅,根据交换机类型不同分为三种:
- 广播
- 路由
- 主题
HelloWord案例
基本消息队列的消息发送流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 利用channel向队列发送消息
基本消息队列的消息接收流程:
- 建立connection
- 创建channel
- 利用channel声明队列
- 定义consumer的消费行为handleDelivery()
- 利用channel将消费者与队列绑定
发送消息:
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.163.129");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!222";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
接收消息:
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.163.129");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
SpringAMQP
HelloWord案例
发送消息
- 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置文件
spring:
rabbitmq:
host: 192.168.163.129 #RabbitMQ的地址
username: itcast # 用户名
password: 123456 # 密码
port: 5672 # RabbitMQ的端口
virtual-host: / # 虚拟主机
- 发送消息
@Test
public void testAMQP() {
String queueName = "simple.queue";
String msg = "hello,this is amqp";
rabbitTemplate.convertAndSend(queueName,msg);
}
接收消息
- 引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置文件
spring:
rabbitmq:
host: 192.168.163.129 #RabbitMQ的地址
username: itcast # 用户名
password: 123456 # 密码
port: 5672 # RabbitMQ的端口
virtual-host: / # 虚拟主机
- 编写一个类
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue") //指定队列名称
public void listen(String msg){
System.out.println("接收到了消息:-->" + msg);
}
}
WorkQueue案例
消费预取限制
当有多个消费者向队列中预取消息的时候,每个消费者性能不同导致一些消费者很快处理完消息,一些消费者很慢处理导致消息堆积,可以对性能差的消费者进行消费预取限制,不让他拿太多消息。如果不限制的话,比如有50条消息,两个消费者每个25,快的很快处理完25,慢的堆积处理
spring:
rabbitmq:
host: 192.168.163.129 #RabbitMQ的地址
username: itcast # 用户名
password: 123456 # 密码
port: 5672 # RabbitMQ的端口
virtual-host: / # 虚拟主机
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完才能获取下一条
生产者:
@Test
public void testAMQPWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String msg = "hello,this is amqp";
for (int i =1 ;i <=50 ;i++){
rabbitTemplate.convertAndSend(queueName,msg + i);
Thread.sleep(20);
}
}
消费者:
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1 接收到了消息:-->" + msg + "====" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2 接收到了消息:-->" + msg + "====" + LocalTime.now());
Thread.sleep(200);
}
Publish/Subscribe
发布订阅模式与之前案例的区别就是允许将同一消息发送给多个消费者。实现方式是加入了exchange(交换机)。
exchange负责消息路由,而不负责存储,如果路由失败则消息丢失
发布订阅-Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue(广播)
- 编写配置类
@Configuration
public class FanoutConfig {
/**
* 声明一个交换机,名字叫:itcast.fanout
* @return
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 声明一个队列,名字叫:fanout.queue1
* @return
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 将交换机跟队列绑定
* @param fanoutExchange
* @param fanoutQueue1
* @return 绑定关系对象binding
*/
@Bean
public Binding bindingFanoutExchange(FanoutExchange fanoutExchange,Queue fanoutQueue1){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding bindingFanoutExchange2(FanoutExchange fanoutExchange,Queue fanoutQueue2){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
- 编写生产者
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息内容
String msg = "hello,everyone";
rabbitTemplate.convertAndSend(exchangeName,"",msg);
}
- 编写消费者
/**
* 广播模式
* @param msg
* @throws InterruptedException
*/
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) throws InterruptedException {
System.err.println("消费者1监听:fanout.queue1 接收到了消息:-->" + msg + "====" + LocalTime.now());
Thread.sleep(200);
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) throws InterruptedException {
System.err.println("消费者2监听:fanout.queue2 接收到了消息:-->" + msg + "====" + LocalTime.now());
Thread.sleep(200);
}
发布订阅-DirectExchange
Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式 (routes)。
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
- 消费者不选择用声明bean的形式了,采用注解形式
/**
* 路由模式
* @param msg
* @throws InterruptedException
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct"),
key = {"blue","red"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
System.err.println("消费者1监听:direct.queue1 接收到了消息:-->" + msg + "====" + LocalTime.now());
Thread.sleep(200);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct"),
key = {"red","yellow"}
))
public void listenDirectQueue2(String msg) throws InterruptedException {
System.err.println("消费者2监听:direct.queue2 接收到了消息:-->" + msg + "====" + LocalTime.now());
Thread.sleep(20);
}
- 生成者
@Test
public void testSendDirectExchangeRed() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息内容
String redMsg = "hello,everyone,this is redMsg";
rabbitTemplate.convertAndSend(exchangeName,"red",redMsg);
String yellowMsg = "hello,everyone,this is yellowMsg";
rabbitTemplate.convertAndSend(exchangeName,"yellow",yellowMsg);
String blueMsg = "hello,everyone,this is blueMsg";
rabbitTemplate.convertAndSend(exchangeName,"blue",blueMsg);
}
消费者1监听:direct.queue1 接收到了消息:-->hello,everyone,this is redMsg====20:55:30.302807
消费者2监听:direct.queue2 接收到了消息:-->hello,everyone,this is redMsg====20:55:30.302807
消费者2监听:direct.queue2 接收到了消息:-->hello,everyone,this is yellowMsg====20:55:30.336717400
消费者1监听:direct.queue1 接收到了消息:-->hello,everyone,this is blueMsg====20:55:30.505301300
路由模式跟广播模式有什么区别?
- 广播模式会将消息推送给于交换机绑定的所有队列
- 路由模式只会将消息推送给与交换机绑定的,且带有指定key的队列
- 如果与交换机绑定的所有队列都带有同一个key,那么指定这个key推送的时候就跟广播模式一样了
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"), //消费者所在的队列
exchange = @Exchange(name = "itcast.direct"), //消费者所在队列绑定的交换机
key = {"red","yellow"} //消费者所在队列与交换机绑定的key,后续生产者发送消息时,交换机根据key推送消息
))
发布订阅-TopicExchange
- 消费者
/**
* 主题模式
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue1"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.err.println("消费者1监听:topic.queue1 接收到了消息:-->" + msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topic.queue2"),
exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.err.println("消费者2监听:topic.queue2 接收到了消息:-->" + msg);
}
- 生产者
@Test
public void testSendTopicExchangeRed() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息内容
String redMsg = "这个应该输出两边";
rabbitTemplate.convertAndSend(exchangeName,"china.news",redMsg);
String yellowMsg = "这个输出在消费者1";
rabbitTemplate.convertAndSend(exchangeName,"china.foods",yellowMsg);
String blueMsg = "这个输出在消费者2";
rabbitTemplate.convertAndSend(exchangeName,"usa.news",blueMsg);
}
消费者2监听:topic.queue2 接收到了消息:-->这个应该输出两边
消费者1监听:topic.queue1 接收到了消息:-->这个应该输出两边
消费者2监听:topic.queue2 接收到了消息:-->这个输出在消费者2
消费者1监听:topic.queue1 接收到了消息:-->这个输出在消费者1
路由模式跟主题模式有什么区别?
- 路由模式是一个单词的,例如:blue
- 主题模式是由多个单词拼接起来的,例如:china.gaungzhou.news
- 主题模式在消费者队列绑定交换机的时候是支持通配符*#的,
消息转化器
MessageConverter,因为在发布和订阅的时候,接收内容的参数一直都是object,说明是可以发对象的,而底层都是通过字节码发送的,所以需要将java对象转化成字节码(序列化)发布出去,且订阅的时候需要将字节码转换成对象接收(反序列化),而spring是通过jdk自带的字节码转化器ObjectOutputStream
,不好用,可以使用jack提供的MessageConverter
生产者跟消费者要用一样的消息转换器
- 生产者
<!-- jack依赖,用来覆盖spring自带的jdk的序列化-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
@Bean //配置类,配置序列化方式,类似的在redis中用到过,同样是对象转字节
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Test
public void testSendObjectMsg() {
List<String> msg = new ArrayList<>();
msg.add("1");
msg.add("2");
msg.add("3");
rabbitTemplate.convertAndSend("ObjectQueue",msg);
}
- 消费者
<!-- jack依赖,用来接收消息,反序列化-->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@RabbitListener(queues = "ObjectQueue")
public void listener(List<String> msg){ //消费者接收方式要跟生产者发送方式一样,比如都是List
System.err.println("消费者 接收到了消息:-->" + msg);
}
服务异步通讯
MQ的常见问题:
- 消息的可靠性问题:如何确保发送的消息至少被消费一次
- 延迟消息问题:如何实现消息的延迟投递
- 消息堆积问题:如何解决数百万消息堆积,无法及时消费的问题
- 高可用问题:如何避免单点的MQ故障而导致的不可用问题
消息可靠性
什么时候有可能会出现消息丢失的问题呢?
- 发送消息时丢失:
- 生产者发送的消息未到达交换机
- 消息到交换机后未到达队列
- MQ宕机,消息在队列中丢失
- 消费者宕机,消费者收到消息后未消费就宕机
生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ack
- 消息未投递到交换机,返回nack
publisher-return,发送者回执
- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
确认消息发送的时候,需要给每个消息都设置一个全局唯一id,用于区分不同的消息,避免ack冲突
实现
- 引入依赖
spring:
rabbitmq:
#生产者确认消息
publisher-confirm-type: correlated #publisher-confirm-type:开启publisher-confirm,
#simple:同步等待confirm结果,直到超时。correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-returns: true #开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback。只有开启了这个才有返回结果
template:
mandatory: true #定义消息路由失败时的策略。true,则调用ReturnCallback;false: 则直接丢弃消息
#到底返不返回结果,由这个决定,如果为false,返回失败就之久丢弃了
- 编写ReturnCallBack,这个是全局唯一的类,只有一种情况会调用这个ReturnCallBack,就是经过交换机但是没有到达队列
/**
* ApplicationContextAware spring的通知接口,spring的bean工厂准备好了之后就会调用这个接口
*/
@Configuration
@Slf4j
public class CommonConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 从bean工厂里获取RabbitTemplate对象
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 给RabbitTemplate对象配置ReturnCallBack,生产者发送消息的返回结果,生产者消息回执
// 什么时候会调用这个方法?消息经过交换机后,没有路由到队列
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.error("消息发送到队列失败,响应码:{},失败原因:{},交换机:{},路由key:{},消息:{}",
replyCode,replyText,exchange,routingKey, message);
//可以选择重发消息
}
});
}
}
- 编写CallBack,这个每次发消息都可以是不同的,针对不同的消息,检测ACK消息到达队列,和NACK消息没到交换机
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String message = "hello, spring amqp!";
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() {
@Override
public void onSuccess(CorrelationData.Confirm confirm) {
if (confirm.isAck()){
//ACK
log.debug("消息成功投递到交换机,消息id是:" + correlationData.getId());
}else {
//NACK
log.error("消息没到交换机,消息id是:" + correlationData.getId());
}
}
}, new FailureCallback() {
@Override
public void onFailure(Throwable throwable) {
log.error("消息发送失败-->" + throwable);
}
});
rabbitTemplate.convertAndSend("amq.topic", "simple.queue", message,correlationData);
}
返回结果:
-
没有错误,消息发送到了队列
-
消息成功投递到交换机,消息id是:9db3541c-7c35-4db6-8c68-39522e48d3dc
-
-
消息没有到达交换机
-
消息没到交换机,消息id是:884d0de4-31c3-49be-834a-4e17aaf73a04
-
-
消息到达了交换机,但是没到消息队列
-
消息发送到队列失败,响应码:312,失败原因:NO_ROUTE,交换机:amq.topic,路由key:4simple.queue,消息:(Body:'hello, spring amqp!' MessageProperties [headers={spring_returned_message_correlation=dedbc4da-2a6f-4bd3-b073-1a6a5e0ac900}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]
-
消息持久化
默认情况下,通过Spring创建的交换机,队列,消息都是默认持久化的。new创建的时候会有两个参数传进去。
第二个是持久化开启,第三个是独占队列,第四个是是否自动删除,即没有使用这个队列的时候自动删除。
public Queue(String name) {
this(name, true, false, false);
}
交换机同理,消息是经过了convert将消息处理了(持久化)
消费者消息确认
RabbitMQ支持消费者确认机制,即: 消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
SpringAMQP则允许配置三种确认模式:
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack; 抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
spring:
rabbitmq:
listener:
simple:
prefetch: 1
acknowledge-mode: auto #none关闭,auto自动ack,manual手动ack
失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue无限循环,导致mq的消息处理飙升,带来不必要的压力。
我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true #开启消费者失败重试机制
initial-interval: 1000 #初次的失败等待时长为1000ms
multiplier: 3 # 下次失败的等待时长倍数,下次等待时长=multiplier * last-interval,比如0 1 3
max-attempts: 3 # 最大重试次数
stateless: true # true无状态,false有状态。如果业务中包含事务,这里改为false
消费者失败消息处理策略
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
@Configuration
public class ErrorMessageConfig {
@Bean
public DirectExchange directExchange(){
return new DirectExchange("error.direct");
}
@Bean
public Queue errorQueue(){
return new Queue("error.queue");
}
@Bean
public Binding errorBinding(){
return BindingBuilder.bind(errorQueue()).to(directExchange()).with("error");
}
/**
* 用来接收错误的交换机消息,并且通过交换机名称,路由key来重新发送消息。
* @param rabbitTemplate
* @return
*/
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
如何确保RabbitMQ消息的可靠性
从三个方面回答:生产者方面,MQ方面,消费者方面
- 生产者方面:
- 开启生产者确认机制,确保生产者的消息能够到达队列
- MQ方面:
- 开启持久化机制,确保消息在未消费前在队列中不会丢失
- 消费者方面:
- 开启消费者消息确认机制为auto,由Spring确认消息处理成功后返回ack,失败后返回nack
- 开启消费者失败重试机制,并且设置MessageRecoverer,多次失败后将消息投递到异常交换机由人工处理
延迟消息问题
死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信 (dead letter) :
- 消费者使用basicreject或 basic.nack声明消费失败,并且消息的requeue参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机 (Dead Letter Exchange,简称DLX)。
什么样的消息会称为死信消息?
- nack了或者reject,且requeue=false
- 消息超时未消费
- 队列满了,早期的消息就会成为死信消息
如何给队列绑定死信交换机
- 给队列设置dead-letter-exchange属性,指定一个交换机
- 给队列设置dead-letter-routing-key属性,设置死信交换机与死信队列的RoutingKey
死信消息默认是被丢弃的
其实死信交换机跟RepublishMessageRecoverer有点类似
- 死信交换机是由队列将消息投递到死信交换机
- RepublishMessageRecoverer是由消费者将消息投递到错误异常交换机
TTL
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
意味着如果一个消息设置了ttl=5000ms,那这个消息从进入队列的那一刻就开始计时5s,一旦到了5s,就成为死信消息进入死信交换机,由死信交换机通过死信队列投递到监听的消费者。实现了延迟队列的效果
实现
有两种实现办法:
- 消息队列本来就有TTL,超时这个消息就自动变成死信消息了
- 消息本身就有TTL,消息超时也变成了死信消息
队列TTL
//给ttl队列绑定ttl交换机和dl死信交换机
@Configuration
public class TTLMessageConfig {
@Bean
public Queue ttlQueue(){
return QueueBuilder.durable("ttl.queue") //指定队列名称,且持久化
.ttl(10000) //队列的超时时间
.deadLetterExchange("dl.direct") //绑定死信交换机
.deadLetterRoutingKey("dl") //死信交换机的RoutingKey
.build();
}
@Bean
public DirectExchange TTLDirectExchange(){
return new DirectExchange("ttl.direct");
}
@Bean
public Binding DLBinding(DirectExchange TTLDirectExchange, Queue ttlQueue){
return BindingBuilder.bind(ttlQueue).to(TTLDirectExchange).with("ttl");
}
@Bean
public Queue DLQueue(){
return new Queue("dl.queue");
}
@Bean
public DirectExchange DLDirectExchange(){
return new DirectExchange("dl.direct");
}
@Bean
public Binding binding(Queue DLQueue,DirectExchange DLDirectExchange){
return BindingBuilder.bind(DLQueue).to(DLDirectExchange).with("dl");
}
}
消息TTL
@Test
public void testDDLMessage() {
String msg = "通过死信交换机实现延迟队列,消息延迟5s";
Message message = MessageBuilder
.withBody(msg.getBytes(StandardCharsets.UTF_8))
.setExpiration("5000")
.build();
String key = "ttl";
// 生产者消息确认机制
// 消息的唯一id
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// lambada表达式
correlationData.getFuture().addCallback(confirm -> {
if (confirm.isAck()){
log.debug("消息投递到了交换机,消息id: " + correlationData.getId());
}else {
log.debug("消息投递失败,返回了nack,消息id: " + correlationData.getId());
}
}, throwable -> {
log.debug("消息投递异常,消息id: " + correlationData.getId());
});
rabbitTemplate.convertAndSend("ttl.direct", key, message, correlationData);
log.debug("消息成功发送");
}
延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(DelayQueue)模式。
延迟队列的使用场景包括:
- 延迟发送短信
- 用户下单,如果用户在15 分钟内未支付,则自动取消
- 预约工作会议,20分钟后自动通知所有参会人员
下载插件DelayExchange
相比于之前的声明,不同就是需要多一个属性delayed,其他一样
@Bean
public Queue DelayedQueue(){
return new Queue("delayed.queue");
}
/**
* 声明具有延迟功能的交换机
* @return
*/
@Bean
public DirectExchange DelayedDirectExchange(){
return ExchangeBuilder
.directExchange("delayed.direct")
.delayed()
.durable(true) //持久化
.build();
}
@Bean
public Binding DelayedBinding(Queue DelayedQueue,DirectExchange DelayedDirectExchange){
return BindingBuilder.bind(DelayedQueue).to(DelayedDirectExchange).with("delayed");
}
与之前的发送消息相比,不同的是多了一个头信息setHeader(“x-delay”,3000)
@Test
public void testDelayedMessage() {
String msg = "通过插件实现延迟队列,消息延迟3s";
Message message = MessageBuilder
.withBody(msg.getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",3000) //延迟3000ms
.build();
String key = "delayed";
rabbitTemplate.convertAndSend("delayed.direct", key, message, correlationData);
log.debug("消息成功发送");
}
消息堆积
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
惰性队列
从RabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
@Bean
public Queue LazyQueue(){
return QueueBuilder.durable("lazy.queue")
.lazy()
.build();
}
非持久化的数据也会写入磁盘,只是有条件的:等内存不足的情况下才会被写入到磁盘中。
持久化消息在到达队列时写入磁盘,同时会在内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。非持久化消息一般只存于内存中,当内存压力大时,数据刷盘处理,以节省内存空间。
MQ集群
RabbitMO的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMO的集群有两种模式:
- 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。
- 如果其中一个节点宕机了,那那个节点上的消息和队列就丢失了
- 镜像集群:是一种主从集群,普通集群的基础上,添加了主从备份功能,提高集群的数据可用性
- 仲裁集群
普通集群
在一个节点的创建的消息可以同步给集群中其他节点,其他节点可以看到并且接收这个消息。
因为其他节点都有这个消息的原信息,知道这个消息是在哪个节点上,如果有消费者在其他节点上消费这个消息,那这些节点会到消息的原节点上取消息给消费者。