一、前言
在前面我们通过以下章节对RocketMQ
有了基础的了解:
docker-compose 搭建RocketMQ 5.1.0 集群(双主双从模式) | Spring Cloud 28
docker-compose 搭建RocketMQ 5.1.0 集群开启ACL权限控制 | Spring Cloud 29
现在开始我们正式学习Spring Boot
中集成RocketMQ
使用,,在本章节主要进行对以下部分讲解说明:
- 普通消息的发送接收
- 延时消息的发送接收
- 事务消息的发送接收
- 发送端和接收端开启
ACL
PULL
模式消费及@ExtRocketMQConsumerConfiguration
使用
二、项目集成RocketMQ
2.1 项目总体结构
2.2 引入依赖
rocketmq/pom.xml
:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
2.3 配置文件
rocketmq/src/main/resources/application.yml
:
server:
port: 8888
spring:
application:
name: @artifactId@
rocketmq:
name-server: 192.168.0.30:9876
producer:
group: @artifactId@-group
send-message-timeout: 60000 # 发送消息超时时间,单位:毫秒。默认为 3000
retry-times-when-send-failed: 3 # 同步发送消息时,失败重试次数。默认为 2 次
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 默认为 false
access-key: RocketMQAdmin # Access Key
secret-key: 1qaz@WSX # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能,默认为 true 开启
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic,默认为 RMQ_SYS_TRACE_TOPIC
consumer:
access-key: RocketMQAdmin # Access Key
secret-key: 1qaz@WSX # Secret Key
logging:
level:
org:
springframework:
boot:
autoconfigure:
logging: info
2.4主题及消费组常量
com/gm/rocketmq/component/rocketmq/TopicConstants.java
:
package com.gm.rocketmq.component.rocketmq;
/**
* 主题常量
*/
public interface TopicConstants {
String NORMAL_ROCKETMQ_TOPIC_TEST= "NORMAL_ROCKETMQ_TOPIC_TEST";
String ORDERLY_ROCKETMQ_TOPIC_TEST= "ORDERLY_ROCKETMQ_TOPIC_TEST";
String SCHEDULE_ROCKETMQ_TOPIC_TEST= "SCHEDULE_ROCKETMQ_TOPIC_TEST";
String TRANSACTION_ROCKETMQ_TOPIC_TEST= "TRANSACTION_ROCKETMQ_TOPIC_TEST";
String PULL_ROCKETMQ_TOPIC_TEST= "PULL_ROCKETMQ_TOPIC_TEST";
String EXT_ROCKETMQ_TOPIC_TEST= "EXT_ROCKETMQ_TOPIC_TEST";
String CONSUMER_GROUP = "_CONSUMER_GROUP";
}
三、各类型消息收发
3.1 普通消息
3.1.1 普通消息发送
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 向rocketmq发送同步和异步消息
*/
@RequestMapping(value = "sendNormal", method = RequestMethod.GET)
public String sendNormal() {
rocketMQTemplate.send(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":sync", MessageBuilder.withPayload("同步发送消息").build());
rocketMQTemplate.asyncSend(TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + ":async", MessageBuilder.withPayload("异步发送消息").build(), new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("异步发送成功:{}", sendResult.getSendStatus().name());
}
@Override
public void onException(Throwable throwable) {
log.info("异步发送失败:{}", throwable.getMessage());
}
});
return "OK";
}
3.1.2 普通消息接收
com/gm/rocketmq/component/rocketmq/NormalRocketMqListener.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST, consumerGroup = TopicConstants.NORMAL_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP, accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class NormalRocketMqListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("普通订阅-接收到的信息:{}", s);
}
}
@RocketMQMessageListener
注解参数说明:
consumerGroup
:消费者订阅组,它是必需的,并且必须是唯一的。topic
:主题名字,生产发送的主题名。consumeMode
:消费模式,可选择并发或有序接收消息;默认CONCURRENTLY
同时接收异步传递的消息。messageModel
:消息模式,默认CLUSTERING
集群消费;如果希望所有订阅者都接收消息,可以设置广播BROADCASTING
。consumeThreadMax
:消费者最大线程数,默认64
。consumeTimeout
:消息阻塞最长时间,默认15
分钟。nameServer
:服务器地址,默认读取配置文件地址,可以单独为消费者设置指定位置。selectorExpression
:消费指定的Tag标签的业务消息。Consumer
端ACL
功能需要在@RocketMQMessageListener
中进行配置Producer
端ACL
功能需要在配置文件中进行配置- 更多查看官方解释
3.2 顺序消息
3.2.1 顺序消息发送
/**
* 向rockertmq发送顺序消息,同步方式
*
* @return
*/
@RequestMapping(value = "sendOrderlySync", method = RequestMethod.GET)
public String sendOrderlySync() {
// 订单列表
List<OrderStep> orderList = buildOrders();
for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();
String orderId = String.valueOf(orderList.get(i).getOrderId());
rocketMQTemplate.sendOneWayOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":sync", msg, orderId);
}
return "OK";
}
/**
* rockertmq发送顺序消息,异步方式
*
* @return
*/
@RequestMapping(value = "sendOrderlyAsync", method = RequestMethod.GET)
public String sendOrderlyAsync() {
// 订单列表
List<OrderStep> orderList = buildOrders();
for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder.withPayload(orderList.get(i).toString()).build();
String orderId = String.valueOf(orderList.get(i).getOrderId());
rocketMQTemplate.syncSendOrderly(TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + ":async", msg, orderId);
}
return "OK";
}
/**
* 订单的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
@Override
public String toString() {
return "OrderStep{" + "orderId=" + orderId + ", desc='" + desc + '\'' + '}';
}
}
/**
* 生成模拟订单数据
*/
private List<OrderStep> buildOrders() {
List<OrderStep> orderList = new ArrayList<OrderStep>();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("创建");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成");
orderList.add(orderDemo);
return orderList;
}
3.2.1 顺序消息接收
com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerA.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,
consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",
consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerA implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("顺序订阅-接收到的信息:{}", s);
}
}
com/gm/rocketmq/component/rocketmq/OrderlyRocketMqListenerB.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST,
consumerGroup = TopicConstants.ORDERLY_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}",
consumeMode = ConsumeMode.ORDERLY)
public class OrderlyRocketMqListenerB implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("顺序订阅-接收到的信息:{}", s);
}
}
3.3 延时消息
3.3.1 延时消息发送
/**
* rockertmq发送延时消息
*
* @return
*/
@RequestMapping(value = "sendSchedule", method = RequestMethod.GET)
public String sendSchedule() {
Message msg = MessageBuilder.withPayload("延时消息")
.build();
rocketMQTemplate.syncSendDelayTimeSeconds(TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + ":", msg, 20);
log.info("延时消息-发布时间:{}", System.currentTimeMillis());
return "OK";
}
3.3.2 延时消息接收
com/gm/rocketmq/component/rocketmq/ScheduleRocketMqListener.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RocketMQMessageListener(topic = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST,
consumerGroup = TopicConstants.SCHEDULE_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
accessKey = "${rocketmq.consumer.access-key}", secretKey = "${rocketmq.consumer.secret-key}")
public class ScheduleRocketMqListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt message) {
String msg = "内容:" + new String(message.getBody()) + ",时间:" + (System.currentTimeMillis() - message.getBornTimestamp()) + "ms later";
log.info("延时订阅-接收到的信息:{}", msg);
log.info("延时消息-接受时间:{}", System.currentTimeMillis());
}
}
3.4 发送端事务消息
3.4.1 事务消息发送
/**
* rockertmq发送生产端事务消息
*
* @return
*/
@RequestMapping(value = "sendTransaction", method = RequestMethod.GET)
public String sendTransaction() {
Message msg = MessageBuilder.withPayload("事务消息")
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(TopicConstants.TRANSACTION_ROCKETMQ_TOPIC_TEST + ":", msg, "自定义参数");
log.info("事务消息-发布结果:{} = {}", result.getTransactionId(), result.getSendStatus());
return "OK";
}
com/gm/rocketmq/component/rocketmq/TransactionListenerImpl.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.messaging.Message;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@RocketMQTransactionListener
class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
// 事务消息共有三种状态,提交状态、回滚状态、中间状态:
// RocketMQLocalTransactionState.COMMIT: 提交事务,它允许消费者消费此消息。
// RocketMQLocalTransactionState.ROLLBACK: 回滚事务,它代表该消息将被删除,不允许被消费。
// RocketMQLocalTransactionState.UNKNOWN: 中间状态,它代表需要检查消息队列来确定状态。
// executeLocalTransaction 方法来执行本地事务
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transactionId = msg.getHeaders().get("__transactionId__").toString();
log.info("执行本地事务,transactionId:{}", transactionId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transactionId, status);
log.info("获取自定义参数:{}", arg);
return RocketMQLocalTransactionState.UNKNOWN;
}
// checkLocalTransaction 方法用于检查本地事务状态,并回应消息队列的检查请求
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transactionId = msg.getHeaders().get("__transactionId__").toString();
log.info("检查本地事务状态,transactionId:{}", transactionId);
Integer status = localTrans.get(transactionId);
if (null != status) {
switch (status) {
case 0:
return RocketMQLocalTransactionState.UNKNOWN;
case 1:
return RocketMQLocalTransactionState.COMMIT;
case 2:
return RocketMQLocalTransactionState.ROLLBACK;
}
}
return RocketMQLocalTransactionState.COMMIT;
}
}
3.5 Pull模式消费
3.5.1 源码分析
在rocketmq-spring-boot-starter
中关于Pull
模式消费的自动配置,
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration
:
其中rocketmq.name-server
、rocketmq.pull-consumer.group
、rocketmq.pull-consumer.topic
三项配置为必填项。
剩余其他配置,org.apache.rocketmq.spring.autoconfigure.RocketMQProperties.PullConsumer
:
由上可知,利用
rocketmq-spring-boot-starter
实现PULL
模式,只支持单Topic
的PULL
消费,要想对多个Topic
进行PULL
模式消费需要用到@ExtRocketMQConsumerConfiguration
。
3.5.2 PULL消费所需配置文件
配置文件新增pull-consumer
相关配置,完整rocketmq/src/main/resources/application.yml
:
rocketmq:
name-server: 192.168.0.30:9876
producer:
group: @artifactId@-group
send-message-timeout: 60000 # 发送消息超时时间,单位:毫秒。默认为 3000
retry-times-when-send-failed: 3 # 同步发送消息时,失败重试次数。默认为 2 次
retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次
retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 默认为 false
access-key: RocketMQAdmin # Access Key
secret-key: 1qaz@WSX # Secret Key
enable-msg-trace: true # 是否开启消息轨迹功能,默认为 true 开启
customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic,默认为 RMQ_SYS_TRACE_TOPIC
consumer:
access-key: RocketMQAdmin # Access Key
secret-key: 1qaz@WSX # Secret Key
pull-consumer:
access-key: RocketMQAdmin # Access Key
secret-key: 1qaz@WSX # Secret Key
topic: PULL_ROCKETMQ_TOPIC_TEST
group: PULL_ROCKETMQ_TOPIC_TEST_CONSUMER_GROUP
3.5.3 消息发送
/**
* 向ockertmq 消费端pull模式发生消息
*
* @return
*/
@RequestMapping(value = "sendPull", method = RequestMethod.GET)
public String pull() {
for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder.withPayload("pull 消息" + i).build();
rocketMQTemplate.syncSend(TopicConstants.PULL_ROCKETMQ_TOPIC_TEST + ":", msg);
}
for (int i = 0; i < 10; i++) {
Message msg = MessageBuilder.withPayload("pull ext 消息" + i).build();
rocketMQTemplate.syncSend(TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + ":", msg);
}
return "OK";
}
3.5.4 @ExtRocketMQConsumerConfiguration使用
此示例利用
@ExtRocketMQConsumerConfiguration
定义消费,声明消费的Topic
和消费组,或声明不同name-server
。
利用
@ExtRocketMQTemplateConfiguration
定义生产者,声明不同name-server
或者其他特定的属性来定义非标的RocketMQTemplate
。
com/gm/rocketmq/component/rocketmq/ExtRocketMQTemplate.java
:
import org.apache.rocketmq.spring.annotation.ExtRocketMQConsumerConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
/**
* 可用于不同name-server或者其他特定的属性来定义非标的RocketMQTemplate,此示例定义消息Topic和消费者
*/
@ExtRocketMQConsumerConfiguration(group = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST + TopicConstants.CONSUMER_GROUP,
topic = TopicConstants.EXT_ROCKETMQ_TOPIC_TEST)
public class ExtRocketMQTemplate extends RocketMQTemplate {
}
3.5.5 PULL模式消息接收
com/gm/rocketmq/component/rocketmq/PullConsumer.java
:
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class PullConsumer implements CommandLineRunner {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private ExtRocketMQTemplate extRocketMQTemplate;
@Override
public void run(String... args) {
while (true) {
List<String> messages = rocketMQTemplate.receive(String.class, 5000);
log.info("receive from rocketMQTemplate, messages={}", messages);
messages = extRocketMQTemplate.receive(String.class, 5000);
log.info("receive from extRocketMQTemplate, messages={}", messages);
}
}
}
3.6 源码
源码地址:https://gitee.com/gm19900510/springboot-cloud-example.git 中模块rocketmq
。