文章目录
- 一、起因
- 二、代码
- 1. 定义exchange和queue
- 2. RabbitTemplate
- 3. EnhancedCorrelationData
- 4. 发送消息
环境如下
Version | |
---|---|
SpringBoot | 3.2.1 |
spring-amqp | 3.1.1 |
RabbitMq | 3-management |
一、起因
老版本的spring-amqp
在CorrelationData
上设置ConfirmCallback
。但是今天却突然发现correlationData.getFuture()
没有addCallback
函数了。
查询文档和帖子后,发现ConfirmCallback
和ReturnsCallback
都需要在RabbitTemplate
中设置,同时ConfirmCallback
中默认无法得到消息内容,如果想在ConfirmCallback
中把消息内容存到数据库等地方进行记录,怎么办呢?
参考手册
- Spring AMQP 3.1.1
- Spring AMQP 3.1.1 API
二、代码
1. 定义exchange和queue
@Slf4j
@Configuration
public class PayNotifyConfig{
//交换机
public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
//支付通知队列
public static final String PAYNOTIFY_QUEUE = "paynotify_queue";
//支付结果通知消息类型
public static final String MESSAGE_TYPE = "payresult_notify";
//声明交换机,且持久化
@Bean(PAYNOTIFY_EXCHANGE_FANOUT)
public FanoutExchange paynotify_exchange_fanout() {
// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
}
//支付通知队列,且持久化
@Bean(PAYNOTIFY_QUEUE)
public Queue paynotify_queue() {
return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
}
//交换机和支付通知队列绑定
@Bean
public Binding binding_paynotify_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
return BindingBuilder.bind(queue).to(exchange);
}
}
2. RabbitTemplate
在上面的类中继续添加RabbitTemplate
,并设置ConfirmCallback
和ReturnsCallback
。
@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置confirm callback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
String body = "1";
if (correlationData instanceof EnhancedCorrelationData) {
body = ((EnhancedCorrelationData) correlationData).getBody();
}
if (ack) {
//消息投递到exchange
log.debug("消息发送到exchange成功:correlationData={},message_id={} ", correlationData, body);
System.out.println("消息发送到exchange成功:correlationData={},message_id={}"+correlationData+body);
} else {
log.debug("消息发送到exchange失败:cause={},message_id={}",cause, body);
System.out.println("消息发送到exchange失败:cause={},message_id={}"+cause+body);
}
});
//设置return callback
rabbitTemplate.setReturnsCallback(returned -> {
Message message = returned.getMessage();
int replyCode = returned.getReplyCode();
String replyText = returned.getReplyText();
String exchange = returned.getExchange();
String routingKey = returned.getRoutingKey();
// 投递失败,记录日志
log.error("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
replyCode, replyText, exchange, routingKey, message.toString());
});
return rabbitTemplate;
}
3. EnhancedCorrelationData
原始的CorrelationData
,目前已经无法从中获取消息内容,也就是说现在的ConfirmCallback
无法获取到消息的内容,因为设计上只关注是否投递到exchange
成功。如果需要在ConfirmCallback
中获取消息的内容,需要扩展这个类,并在发消息的时候,放入自定义数据。
public class EnhancedCorrelationData extends CorrelationData {
private final String body;
public EnhancedCorrelationData(String id, String body) {
super(id);
this.body = body;
}
public String getBody() {
return body;
}
}
4. 发送消息
在EnhancedCorrelationData
把消息本身放进去,或者如果你有表记录消息,你可以只放入其id
。这样触发ConfirmCallback的时候,就可以获取消息内容。
public void notifyPayResult() {
String message = "TEST Message";
Message message1 = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
CorrelationData correlationData = new EnhancedCorrelationData(UUID.randomUUID().toString(), message.toString());
rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"", message1, correlationData);
}