无序消息的重试只针对集群消费模式生效;广播消费模式不提供失败重试特性
Producer
发了100个对象消息
public class AddProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("a-group");
producer.setNamesrvAddr("192.168.0.211:9876");
producer.start();
ArrayList<Alarm> list = new ArrayList<>();
for (int i = 1; i < 101; i++) {
Alarm bean = new Alarm(i, "add_"+i, new SimpleDateFormat("yyyy-MM-dd").parse("2024-01-01"));
list.add(bean);
}
try {
for (Alarm alarm : list) {
Message msg = new Message("ALARM_RECORD", "add",String.valueOf(alarm.getId()), JSONUtil.toJsonStr(alarm).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg,new CustomSendCallback());
System.out.println(alarm.getId() + " Continue execution ");
}
} catch (Exception e) {
e.printStackTrace();
}
// 6. 关闭生产者
// producer.shutdown();
}
}
Consumer
public class ConsumerAdd {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("record_add_group");
consumer.setNamesrvAddr("192.168.0.211:9876");
consumer.subscribe("ALARM_RECORD", "add");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 默认允许每条消息最多重试16次
consumer.setMaxReconsumeTimes(7);
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
String body = new String(msg.getBody());
Alarm bean = JSONUtil.toBean(body, Alarm.class);
if(bean.getId() == 77){
throw new Exception();
}
System.out.println("正常消费 bean = " + bean);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
System.out.println("重试 "+msgs.get(0).getReconsumeTimes() +" "+ new String(msgs.get(0).getBody()));
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 启动消费者
consumer.start();
System.out.printf("Consumer Tag add_1 Started.%n");
}
}
多开测试
失败后进入重试队列
16次的时间间隔
10s、30s、1min、2min、3min、4min、5min、6min、7min、8min、9min、10min、20min、30min、1h、2h
最后一次失败直接进入死信队列、人工处理