Spring Boot 整合 RocketMQ 之消息消费手动提交 ACK 实战【案例分享】

前言:

上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

Spring Boot 整合 RocketMQ 之顺序消息

Spring Boot 整合 RocketMQ 之事务消息

RocketMQ 之消息重试机制

同步消息手动提交 ACK 案例分享

同步消息 Producer 消息发送代码案例

同步消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下同步消息 Producer 消息发送代码,具体如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName: OneWayMessageProducer
 * @Author: zhangyong
 * @Date: 2024/9/27 17:27
 * @Description: 同步消息发送者
 */
@Slf4j
@Component
public class SyncMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 同步消息发送
     */
    public void sendSyncMessage(String message) {
        rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());
    }

}

同步消息手动 ACK Consumer 端代码案例分享

RocketMQ 消息手动 ACK 就不能再使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式来实现了,RocketMQListener 的源码如下:

package org.apache.rocketmq.spring.core;

public interface RocketMQListener<T> {
    void onMessage(T var1);
}

我们可以看到 RocketMQListener 中只提供了一个 onMessage 方法,且返回值为 void,不接受返回值,因此没办法进行手动 ACK。

这里我们使用 DefaultMQPushConsumer 来实现消息的手动 ACK,DefaultMQPushConsumer 实现了 MQPushConsumer 接口,具体实现代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: ManualCommitSyncCounsumer
 * @Author: Author
 * @Date: 2024/10/16 16:23
 * @Description:
 */
@Slf4j
@Component
public class ManualCommitSyncCounsumer {


    /**
     * @date 2024/10/16 17:19
     * @description 同步消息消费成功 手动提交
     */
    @PostConstruct
    public void onSyncMessage() throws MQClientException {
        //消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-group");
        //设置最大消息重试次数
        //consumer.setMaxReconsumeTimes(2);
        //RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("sync-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //存储消息id 和消费次数的关系
            final Map<String, Integer> map = new HashMap<>();
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(new Date());
                //消息处理
                MessageExt messageExt = list.get(0);
                String message = new String(messageExt.getBody());
                String msgId = messageExt.getMsgId();
                //获取消息消费次数
                Integer count = map.get(msgId);
                if (count == null) {
                    count = 0;
                }
                //次数+1
                count = count + 1;
                //覆盖map
                map.put(msgId, count);
                if (count > 2) {
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回消费消费成功,消息内容:{}", dateStr, msgId, count, message);
                    //消息消费成功后移除
                    map.remove(msgId);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息内容:{}", dateStr, msgId, count, message);
                //模拟除 0 异常
                //int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息消费成功,消息内容:{}", dateStr, msgId, count, message);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //return null;
            }
        });
        consumer.start();
    }

}

RocketMQ 同步消费端手动 ACK 结果验证:

正常消费返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS

2024-10-19 10:23:07.575  INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:23:07.575  INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了

没有模拟除 0 异常,正常消费返回,一次就消费成功,结果符合预期。

模拟除 0 异常

2024-10-19 10:19:59.026  INFO 34052 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:19:59,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:09.029  INFO 34052 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:09,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:39.032  INFO 34052 --- [MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:39,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

正常消费但是返回 NULL

2024-10-19 10:25:47.338  INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:47.338  INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:26:27.347  INFO 27256 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:26:27,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到返回 NULL 和模拟除 0 异常是一样的效果,消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

上面的案例我们没有控制消息消费重试次数,我们可以设置一个消息消费重试次数,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式消费消息的时候消费失败也会自动进行重试,因此我们一定要控制好重试次数,可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 来控制重试次数(高版本的 starter 才有该属性)。

不管使用哪种方式进行消费,对于达到重试次数还是消费失败的消息一般有两种处理方式,分別是:

  • 直接返回消息消费成功,使用本地库记录消息进行重新推送或者人工介入处理。
  • 不进行处理,让消息进入死信队列,然后去监听死信队列进行处理。

顺序消息 Producer 消息发送代码案例

顺序消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下顺序消息 Producer 消息发送代码,具体如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


/**
 * @ClassName: OneWayMessageProducer
 * @Author: Author
 * @Date: 2024/9/27 17:27
 * @Description: 顺序消息发送者
 */
@Slf4j
@Component
public class OrderlyMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @date 2024/10/11 15:45
     * @description 同步顺序消息
     */
    public void syncSendOrderly() {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(), "888888");
    }

}

顺序消息 Consumer 消息消费代码案例

同步消息手动 ACK 我们是注册了一个 MessageListenerConcurrently 消息监听器,顺序消息的手动 ACK 我们需要注册一个 MessageListenerOrderly 的消息监听器,具体代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: ManualCommitOrderlyMessageConsumer
 * @Author: Author
 * @Date: 2024/10/10 17:35
 * @Description: 顺序消息消费
 */
@Slf4j
@Component
public class ManualCommitOrderlyMessageConsumer {

    /**
     * @date 2024/10/16 17:19
     * @description 顺序消息消费成功 手动提交
     */
    @PostConstruct
    public void onOrderlyMessage() throws MQClientException {
        //消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-group");
        //设置最大消息重试次数
        //consumer.setMaxReconsumeTimes(2);
        //RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("orderly-topic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //存储消息id 和消费次数的关系
            final Map<String, Integer> map = new HashMap<>();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(new Date());
                //消息处理
                MessageExt messageExt = list.get(0);
                String message = new String(messageExt.getBody());
                String msgId = messageExt.getMsgId();
                //获取消息消费次数
                Integer count = map.get(msgId);
                if (count == null) {
                    count = 0;
                }
                //次数+1
                count = count + 1;
                //覆盖map
                map.put(msgId, count);
                if (count > 2) {
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回顺序消费消费成功,消息内容:{}", dateStr, msgId, count, message);
                    //消息消费成功后移除
                    map.remove(msgId);
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息内容:{}", dateStr, msgId, count, message);
                //模拟除 0 异常
                //int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息消费成功,消息内容:{}", dateStr, msgId, count, message);
                return ConsumeOrderlyStatus.SUCCESS;
                //return null;
            }
        });
        consumer.start();
    }

}

RocketMQ 顺序消费端手动 ACK 结果验证:

正常消费返回 ConsumeOrderlyStatus.SUCCESS

2024-10-19 13:46:44.293  INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.294  INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.298  INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.298  INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.300  INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.300  INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.302  INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:46:44.303  INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,结果符合预期。

模拟除 0 异常

2024-10-19 13:50:45.587  INFO 27604 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:45,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:46.588  INFO 27604 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:46,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:48.593  INFO 27604 --- [orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:48,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:50.610  INFO 27604 --- [orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:50,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:52.617  INFO 27604 --- [rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:52,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:54.620  INFO 27604 --- [rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:54,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:56.629  INFO 27604 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:56,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:57.631  INFO 27604 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:57,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,结果符合预期。

这里需要注意一点的时候上一条消息没有消费成功时,后面一条消息永远不会被消费,这个从我们的日志中也能够体现出来,因此我们在使用顺序消息的时候一定要注意消息的重试次数,消息重试次数我们可以通过自己的业务来判断消费几次,也可以使用 RocketMQ 来实现,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

模拟返回 NULL

2024-10-19 14:00:51.484  INFO 22728 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:51,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:52.485  INFO 22728 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:52,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:54.492  INFO 22728 --- [rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:54,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:56.495  INFO 22728 --- [rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:56,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:58.510  INFO 22728 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:58,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:00.521  INFO 22728 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:00,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:02.525  INFO 22728 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:02,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:03.527  INFO 22728 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:03,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,和模拟除 0异常 一样的结果,结果符合预期。

总结:RocketMQ 消息重试次数可以通过 @RocketMQMessageListener 注解的 maxReconsumeTimes 属性来设置,也可以通过编码来设置,不管采用何种方式来设置,我们都要在业务编码中做好处理,过多重视造成的性能问题,以及没有合理处理消费失败的消息造成的消息丢失问题,对于顺序消息我们更要慎重对待,顺序消息回到导致消息阻塞。

如有不正确的地方欢迎各位指出纠正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/899238.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阿里云项目启动OOM问题解决

#1024程序员节&#xff5c;征文# 问题描述 随着项目业务的增长&#xff0c;系统启动时内存紧张&#xff0c;每次第一次启动的时候就会出现oom第二次或者第n的时候&#xff0c;就启动成功了。 带着这个疑问&#xff0c;我就在阿里云上提交了工单&#xff0c;咨询为什么第一次…

WIFI、NBIOT、4G模块调试AT指令连接华为云物联网服务器(MQTT协议)

一、前言 随着物联网&#xff08;IoT&#xff09;技术的飞速发展&#xff0c;越来越多的设备开始连接到互联网&#xff0c;形成了一个万物互联的世界。在这个背景下&#xff0c;设备与云端之间的通讯变得尤为重要。 本文将探讨几种常见的无线通信模块——EC20-4G、Air724ug-4…

CTFHUB技能树之文件上传——MIME绕过

开启靶场&#xff0c;打开链接&#xff1a; 直接指明是MIME验证 新建04MIME.php文件&#xff0c;内容如下&#xff1a; <?php echo "Ciallo&#xff5e;(∠・ω< )⌒★";eval($_POST[pass]);?> &#xff08;这里加了点表情&#xff0c;加带点私货&#x…

传感器驱动系列之PAW3212DB鼠标光电传感器

目录 一、PAW3212DB鼠标光电传感器简介 1.1 主要特点 1.2 引脚定义 1.3 传感器组装 1.4 应用场景 1.5 传感器使用注意 1.5.1 供电选择 1.5.2 SPI读写设置 1.5.3 MOTION引脚 1.6 寄存器说明 1.6.1 Product_ID1寄存器 1.6.2 MOTION_Status寄存器 1.6.3 Delta_X寄存器…

GRU神经网络理解

全文参考以下B站视频及《神经网络与深度学习》邱锡鹏&#xff0c;侧重对GPU模型的理解&#xff0c;初学者入门自用记录&#xff0c;有问题请指正【重温经典】GRU循环神经网络 —— LSTM的轻量级版本&#xff0c;大白话讲解_哔哩哔哩_bilibili 更新门、重置门、学习与输出 注&a…

Go通过gorm连接sqlserver报错TLS Handshake failed

Go通过gorm连接sqlserver报错TLS Handshake failed [error] failed to initialize database, got error TLS Handshake failed: tls: server selected unsupported protocol version 301 panic: TLS Handshake failed: tls: server selected unsupported protocol version 301 …

综合小程序的设计

熟悉python可视化的设计 完成综合小程序的设计。 登录系统设计 from tkinter import * import tkinter.messagebox def onClick(): namebname.get() pwdbpwd.getO() if (namezhou and pwd123): tkinter.messagebox.showinfo(title提示,message登陆成功&a…

Linux 中 .bash_history、.bash_logout 等用户配置文件

目录 前言.bash_history.bash_logout.bash_profile.bashrc.cshrc.tcshrc.viminfo 总结 前言 在 Linux 中我们经常会看见用户家目录下存在 .bash_history、.bash_logout、.bash_profile、.bashrc、.cshrc、.tcshrc、.viminfo 这写文件&#xff0c;那它们区别是什么呢&#xff1…

2024软考网络工程师笔记 - 第8章.网络安全

文章目录 网络安全基础1️⃣网络安全威胁类型2️⃣网络攻击类型3️⃣安全目标与技术 &#x1f551;现代加密技术1️⃣私钥密码/对称密码体制2️⃣对称加密算法总结3️⃣公钥密码/非对称密码4️⃣混合密码5️⃣国产加密算法 - SM 系列6️⃣认证7️⃣基于公钥的认证 &#x1f552…

Unity CRP学习笔记(一)

Unity CRP学习笔记&#xff08;一&#xff09; 主要参考&#xff1a; https://catlikecoding.com/unity/tutorials/custom-srp/ https://docs.unity.cn/cn/2022.3/ScriptReference/index.html 中文教程部分参考&#xff08;可选&#xff09;&#xff1a; https://tuncle.blog/c…

算力的定义、单位、影响因素、提升方法、分类、应用等。附超算排名

文章目录 算力的定义算力的单位FLOPS&#xff08;Floating Point Operations Per Second&#xff0c;浮点运算次数/秒&#xff09;IPS&#xff08;Instructions Per Second&#xff0c;指令/秒&#xff09;TOPS&#xff08;Trillion Operations Per Second&#xff0c;万亿次/秒…

Win10系统安装docker操作步骤

Docker下载 docker下载地址&#xff1a;Docker: Accelerated Container Application Development 打开网页后&#xff0c;点击图下所示&#xff0c;下载windows版本的docker 启用Hyper-V 和容器特性 右键左下角windows图标&#xff0c;选择应用和功能 然后在下面的界面中&am…

【Nuvoton干货分享】开发应用篇 4 -- 8bit MCU Flash 操作

我们在进行实际开发设计中&#xff0c;难免需要进行数据存储&#xff0c;早期很多都是外接EEPROM来进行设计&#xff0c;但是需要增加成本。其实芯片内部的Flash也是可以当成数据存储空间的。本章节主要介绍新唐的8位机如何进行常量数据的存储操作。 一、存储空间划分 我这边…

w~自动驾驶合集6

我自己的原文哦~ https://blog.51cto.com/whaosoft/12286744 #自动驾驶的技术发展路线 端到端自动驾驶 Recent Advancements in End-to-End Autonomous Driving using Deep Learning: A SurveyEnd-to-end Autonomous Driving: Challenges and Frontiers 在线高精地图 HDMa…

小程序无法获取头像昵称以及手机号码

用户在使用小程序的时候&#xff0c;登录弹出获取昵称头像或者个人中心点击默认头像弹窗获取头像昵称的时候&#xff0c;点击弹窗中的头像昵称均无反应&#xff0c; 这个是因为你的小程序隐私政策没有更新&#xff0c;或者老版本没有弹窗让用户同意导致的 解决办法&#xff1…

利用彩色相机给激光点云染色

文章目录 概述核心代码效果概述 在激光SLAM(Simultaneous Localization and Mapping)中,使用彩色相机为激光点云染色是一个常见的做法。这种技术结合了激光雷达的高精度距离测量和相机的丰富色彩信息,使得生成的点云不仅包含空间位置信息,还包含颜色信息,从而更直观和细…

【OpenAI】第六节(语音生成与语音识别技术)从 ChatGPT 到 Whisper 的全方位指南

前言 在人工智能的浪潮中&#xff0c;语音识别技术正逐渐成为我们日常生活中不可或缺的一部分。随着 OpenAI 的 Whisper 模型的推出&#xff0c;语音转文本的过程变得前所未有的简单和高效。无论是从 YouTube 视频中提取信息&#xff0c;还是将播客内容转化为文本&#xff0c;…

[实时计算flink]数据摄入YAML作业快速入门

实时计算Flink版基于Flink CDC&#xff0c;通过开发YAML作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文介绍如何快速构建一个YAML作业将MySQL库中的所有数据同步到StarRocks中。 前提条件 已创建Flink工作空间&#xff0c;详情请参见开通实时计算Flink版…

Jenkins配置CI/CD开发环境(理论到实践的完整流程)

目录 一、对于CI/CD的理解1.1、什么是CI&#xff08;持续集成&#xff09;1.2、CI 的主要特点&#xff1a;1.3、CI 的优势&#xff1a;**实际开发中的场景举例:** 1.4、什么是 CD&#xff08;持续交付和持续部署&#xff09;1.5、持续交付&#xff08;Continuous Delivery&…

鸿蒙HarmonyOS NEXT 5.0开发(2)—— ArkUI布局组件

文章目录 布局Column&#xff1a;从上往下的布局Row&#xff1a;从左往右的布局Stack&#xff1a;堆叠布局Flex&#xff1a;自动换行或列 组件Swiper各种选择组件 华为官方教程B站视频教程 布局 主轴和交叉轴的概念&#xff1a; 对于Column布局而言&#xff0c;主轴是垂直方…