已经不需要为RabbitMQ交换机的离去而感到伤心了,接下来登场的是RabbitMQ-核心特性!!!
文章目录
- 核心特性
- 消息过期机制
- 消息确认机制
- 死信队列
核心特性
消息过期机制
官方文档:https://www.rabbitmq.com/ttl.html
可以给每条消息指定一个有效期,一段时间内未被消费者处理,就过期了
适用场景:清理过期数据
1)给队列中的所有消息指定过期时间
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
如果在过期时间内,还没有消费者取消息,消息才会过期
注意,如果消息已经接收到,但是没确认,是不会过期的
消费者中给队列中所有消息设置过期时间:
public class TtlConsumer {
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] argv) throws Exception {
// 创建连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 创建队列,指定消息过期参数
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
// args 指定参数
channel.queueDeclare(QUEUE_NAME, false, false, false, args);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 定义了如何处理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
// 消费消息,会持续阻塞
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
2)给某条消息指定过期时间
// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("1000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));
生产者给某条消息指定过期时间
public class TtlProducer {
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// factory.setUsername();
// factory.setPassword();
// factory.setPort();
// 建立连接、创建频道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 发送消息
String message = "Hello World!";
// 给消息指定过期时间
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("1000")
.build();
channel.basicPublish("my-exchange", "routing-key", properties, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消息确认机制
官方文档:https://www.rabbitmq.com/confirms.html
为保证消息成功被消费,rabbitmq提供了消息确认机制,当消费者收到消息后要给一个成功反馈:
●ack:消费成功
●nack:消费失败
●reject:拒绝
如果告诉 rabbitmq 服务器消费成功,服务器才会放心地移除消息。
支持配置 autoack,会自动执行 ack 命令,接收到消息立刻就成功了。
channel.basicConsume(queueName, true, xiaoyuDeliverCallback, consumerTag -> {
});
一般情况,建议 autoack 改为 false,根据实际情况,去手动确认。
指定确认某条消息:
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
第二个参数 multiple 批量确认:是指是否要一次性确认所有的历史消息直到当前这条消息
指定拒绝某条消息:
channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
第 3 个参数表示是否重新入队,可用于重试
死信队列
官方文档:https://www.rabbitmq.com/dlx.html
为了保证消息的可靠性,比如每条消息都成功消费,需要提供一个容错机制,即:失败的消息怎么处理?
死信:过期的消息,拒收的消息,消息队列满了,处理失败的消息的统称
死信队列:专门处理死信的队列
死信交换机:专门给死信队列转发消息的交换机
示例场景:
实现:
1)创建死信交换机和死信队列,并且绑定关系
2)给失败之后需要容错处理的队列绑定死信交换机
3)可以给要容错的队列指定死信之后的转发规则,死信应该再转发到哪个死信队列
4)可以通过程序来读取死信队列中的消息,从而进行处理
生产者代码:
package com.yupi.springbootinit.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.Scanner;
public class DlxDirectProducer {
//死信交换机
private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";
//工作交换机
private static final String WORK_EXCHANGE_NAME = "direct2-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE_NAME, "direct");
// 创建laoban死信队列
String queueName = "laoban_dlx_queue";
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, DEAD_EXCHANGE_NAME, "laoban");
//创建waibao死信队列
String queueName2 = "waibao_dlx_queue";
channel.queueDeclare(queueName2, true, false, false, null);
channel.queueBind(queueName2, DEAD_EXCHANGE_NAME, "waibao");
DeliverCallback laobanDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 拒绝消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
System.out.println(" [laoban] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback waibaoDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 拒绝消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
System.out.println(" [waibao] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, false, laobanDeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName2, false, waibaoDeliverCallback, consumerTag -> {
});
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String userInput = scanner.nextLine();
String[] strings = userInput.split(" ");
if (strings.length < 1) {
continue;
}
String message = strings[0];
String routingKey = strings[1];
channel.basicPublish(WORK_EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + " with routing:" + routingKey + "'");
}
}
}
}
消费者代码:
在这里插入代码片package com.yupi.springbootinit.mq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.util.HashMap;
import java.util.Map;
public class DlxDirectConsumer {
private static final String DEAD_EXCHANGE_NAME = "dlx-direct-exchange";
private static final String WORK_EXCHANGE_NAME = "direct2-exchange";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(WORK_EXCHANGE_NAME, "direct");
//小狗的死信要转发到waibao这个死信队列
// 指定死信队列参数
Map<String, Object> args = new HashMap<>();
// 要绑定到哪个交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
// 指定死信要转发到哪个死信队列
args.put("x-dead-letter-routing-key", "waibao");
// 创建队列,随机分配一个队列名称
String queueName = "xiaodog_queue";
channel.queueDeclare(queueName, true, false, false, args);
channel.queueBind(queueName, WORK_EXCHANGE_NAME, "xiaodog");
//小猫的死信要转发到laoban这个死信队列
Map<String, Object> args2 = new HashMap<>();
args2.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
args2.put("x-dead-letter-routing-key", "laoban");
// 创建队列,随机分配一个队列名称
String queueName2 = "xiaocat_queue";
channel.queueDeclare(queueName2, true, false, false, args2);
channel.queueBind(queueName2, WORK_EXCHANGE_NAME, "xiaocat");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback xiaoyuDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 拒绝消息,并且不要重新将消息放回队列,只拒绝当前消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
System.out.println(" [xiaodog] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback xiaopiDeliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
// 拒绝消息
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
System.out.println(" [xiaocat] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, false, xiaoyuDeliverCallback, consumerTag -> {
});
channel.basicConsume(queueName2, false, xiaopiDeliverCallback, consumerTag -> {
});
}
}