SpringBoot整合RocketMQ发送延时消息
springboot rocketmq 延迟消息
Windows下RocketMQ安装及可视化界面搭建
Java 客户端
RocketMQ延迟消息
项目背景
项目中有延时消息的需求,综合考量RocketMQ比较适合。
-
RocketMQ支持多维度的延迟级别
-
支持多种消息类型
-
基于分布式架构时间高可用
-
有重试机制
-
高吞吐量,流处理
基本使用
依赖
<!--rocketMq依赖-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>自己选择版本</version>
</dependency>
配置
#这两个是必须配置项
rocketmq:
name-server: localhost:9876
producer:
group: xxx
生产者
@Data
@Component
@ConfigurationProperties(prefix = "mq")
public class MqConfig {
/**
* 是否开启mq延迟消息
*/
private Boolean enabled;
/**
* 缺货mq主题
*/
private String topic = "topic";
/**
* 发送超时时间,单位:毫秒(ms)
*/
private Long timeout = 3000L;
/**
* 延时级别为(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)其下标,从1开始
*/
private Integer delayLevel = 3;
}
@Component
@Slf4j
public class Producer {
@Autowired
private MqConfig mqConfig;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送延迟消息
* @param msg
*/
public void sendScheduleMsg(String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
String topic = mqConfig.getTopic();
SendResult sendResult = rocketMQTemplate.syncSend(topic, message, mqConfig.getTimeout(), mqConfig.getDelayLevel());
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus())) {
log.info("【sendScheduleMsg】 发送延迟[{}]消息[{}]成功", topic, msg);
} else {
log.warn("【sendScheduleMsg】 发送延迟[{}]消息[{}]失败", topic, msg);
}
}
}
消费者
@Component
@RocketMQMessageListener(consumerGroup = "group", topic = "topic")
@Slf4j
public class Consumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt messageExt) {
String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
log.info("收到延迟消息:{}", msg);
}
}
可以看到broker是收到数据的