1.RabbitMQ安装
RabbitMQ Windows 安装、配置、使用 - 小白教程-腾讯云开发者社区-腾讯云下载erlang:http://www.erlang.org/downloads/https://cloud.tencent.com/developer/article/2192340
Windows 10安装RabbitMQ及延时消息插件rabbitmq_delayed_message_exchange - 民工黑猫 - 博客园安装RabbitMQ服务器 第一步:下载erlang 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。下载地址:http://www.erlang.org/downloads 第二步:下载RabbitMQ 下载地址:https://https://www.cnblogs.com/yyee/p/14281111.html
2.生产端确保消息发送到交换机和路由键
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
com.atguigu.mq.config: info
RabbitConfig
@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
// 发送到交换机-成功或失败
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm() 回调函数打印 correlationData:{}, ack:{}, cause:{}", correlationData, ack, cause);
}
// 发送到队列-失败
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("returnedMessage() 回调函数 msg:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}",
new String(returned.getMessage().getBody()), returned.getReplyCode(), returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
}
}
RabbitMQTest
public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
@Autowired
private RabbitTemplate rabbitTemplate;
//发消息-生产端确保消息发到交换机和路由键
rabbitTemplate.convertAndSend(EXCHANGE_DIRECT + "~", ROUTING_KEY + "~", "Message Test Confirm~~~ ~~~");
3.消费端手动ack
application.yml
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认
prefetch: 1 # 每次从队列中取回消息的数量
MyMessageListener
public static final String QUEUE_NAME = "queue.order";
//接消息-消费端手动ack
//@RabbitListener(queues = {QUEUE_NAME})
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
//获取消息的deliveryTag
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//核心操作
log.info("消费端 消息内容:" + dataString);
System.out.println(10 / 0);
//核心操作成功返ACK
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
//获取消息是否重复投递
Boolean redelivered = message.getMessageProperties().getRedelivered();
//核心操作失败返NACK
if (!redelivered) {
//第一次投递,重新放回队列 (requeue:是否重新放回队列)
channel.basicNack(deliveryTag, false, true);
//channel.basicReject(deliveryTag, true);
} else {
//重复投递,不重新放回队列 (requeue:是否重新放回队列)
channel.basicNack(deliveryTag, false, false);
//channel.basicReject(deliveryTag, false);
}
}
}
4.消息设置超时时间(通过消息后置处理器)
rabbitTemplate.convertAndSend(EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout", message -> {
message.getMessageProperties().setExpiration("4000");
return message;
});
5.延迟消息(延迟插件)
Windows 10安装RabbitMQ及延时消息插件rabbitmq_delayed_message_exchange - 民工黑猫 - 博客园安装RabbitMQ服务器 第一步:下载erlang 原因:RabbitMQ服务端代码是使用并发式语言Erlang编写的,安装Rabbit MQ的前提是安装Erlang。下载地址:http://www.erlang.org/downloads 第二步:下载RabbitMQ 下载地址:https://https://www.cnblogs.com/yyee/p/14281111.htmlRabbitMQ延迟插件下载地址
Community Plugins | RabbitMQ RabbitMQ设置延迟消息的交换机
生产端发送延时消息(通过消息后置处理器)
//延迟消息-延时插件(最多俩天内)
String msg = "Test delay message by plugin " + new SimpleDateFormat("HH:mm:ss").format(new Date());
rabbitTemplate.convertAndSend(EXCHANGE_DELAY, ROUTING_KEY_DELAY, msg, message -> {
//x-delay参数必须基于x-delayed-message-exchange插件才能生效
message.getMessageProperties().setHeader("x-delay", "10000");
return message;
});
消费端消费延时消息
@RabbitListener(queues = {QUEUE_DELAY})
public void processMessageDelay(String dataString, Message message, Channel channel) throws IOException {
log.info("[delay message][消息本身]" + dataString);
log.info("[delay message][当前时间]" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}