0. 引言
rocketmq支持两种消费模式:pull和push,在实际开发中这两种模式分别是如何实现的呢,在spring框架和springboot框架中集成有什么差异?今天我们一起来探究这两个问题。
1. java client实现消息消费
1、添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
1.1 Push消息消费
rocketmq的push消费是通过pull模式为基础来进行模拟的,就是通过监听器,不断的pull来实现,因此其实现重点就是实现监听器
rocektmq的监听器支持2种:
- MessageListenerConcurrently 拉取到新消息之后就提交到线程池去消费
- MessageListenerOrderly 通过加分布式锁和本地锁保证同时只有一条线程去消费一个队列上的数据,以此保证顺序消费
这里虽然还有MessageListener
类型,实际上是上述两种的父类,该方法也被弃用了
所以push模式的的重点就是实现MessageListenerConcurrently监听器,其内部只有一个consumeMessage
方法
那么实现的重点就是consumeMessage方法,这里我们睡眠了10s,用于模拟该监听器运行10s
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置topic
consumer.subscribe("topic_test", "*");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
for (MessageExt msg : list) {
String topic = msg.getTopic();
try {
String messageBody = new String(msg.getBody(), "utf-8");
System.out.println(topic+":"+messageBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
Thread.sleep(10000);
}
当然,如上的形式只能用于我们单元测试使用,集成在生产中时肯定不能这样用,我们需要将其注册为bean形式,并在项目启动时进行调用,让其注册为监听器
@Component
public class Consumer1PushListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list) {
String topic = msg.getTopic();
try {
String messageBody = new String(msg.getBody(), "utf-8");
System.out.println(topic+":"+messageBody);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
@PostConstruct
public void init(){
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 注册监听器
consumer.registerMessageListener(this);
try{
// 设置topic
consumer.subscribe("topic_test", "*");
// 启动示例
consumer.start();
}catch (Exception e){
e.printStackTrace();
System.out.println("rocketmq 消费者启动失败");
}
}
}
我们启动项目,发送一条消息,会发现消费者可以实时消费
消息模式如何调整?
rocektmq 有集群模式和广播模式两种消息模式,如果需要调整的话,通过消费者的setMessageModel方法即可调整
// 集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
1.2 Pull消息消费
pull模式的实现更加简单,直接查看pull消费者类DefaultMQPullConsumer
,其下有pull方法
官方给出的示例代码如下:
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
try {
MessageQueue mq = new MessageQueue();
mq.setQueueId(0);
mq.setTopic("topic_test");
mq.setBrokerName("Broker");
long offset = 26;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
if (pullResult.getPullStatus().equals(PullStatus.FOUND)) {
System.out.printf("%s%n", pullResult.getMsgFoundList());
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
但是截止目前,该类已经被弃用了
更加推荐的是用DefaultLitePullConsumer
类实现,其下的poll方法可以帮助我们更加方便的实现消息消费,这里需要注意,两个类,一个是pull,一个是poll,pull实际上是需要指定偏移量的,而poll则自动帮我们更新了偏移量
public static void main(String[] args) throws MQClientException {
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("group2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("topic_test", "*");
consumer.start();
try {
List<MessageExt> messageList = consumer.poll(3000);
for (MessageExt message : messageList) {
System.out.println("pull消费:"+new String(message.getBody()));
}
} catch (Exception e) {
e.printStackTrace();
}
consumer.shutdown();
}
发送几条消息,运行测试
生产中使用时,大家可以把DefaultLitePullConsumer定义为bean, 以此减少每次资源创建的消耗,具体方式可参考上述push模式的实现代码
1.3 顺序消息消费
rocketmq中提供了两种消费处理形式:并发消费(MessageListenerConcurrently
)和顺序消费(MessageListenerOrderly
)
并发消费消费者会创建多个线程同时消费队列消息,而顺序消费流程跟并发消费最大的区别在于,顺序消费对要处理的队列加锁,确保同一队列,同一时间,只允许一个消费线程处理
我们在之前消息发送的章节已经提前体验过顺序消费代码实现了,通过上述对监听器类型的描述,我们也能知道顺序消费的实现,就是实现MessageListenerOrderly
监听器
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_test");
consumer.setNamesrvAddr("127.0.0.1:9876");
// 集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置topic
consumer.subscribe("topic_order", "*");
// 注册回调函数,处理消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
byte[] body = list.get(0).getBody();
System.out.println("接收消息:"+new String(body, StandardCharsets.UTF_8));
return ConsumeOrderlyStatus.SUCCESS;
}
});
// 启动消费者实例
consumer.start();
Thread.sleep(10000);
}
2. springboot实现消息消费
1、添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
2、修改配置项
rocketmq:
name-server: localhost:9876
producer:
group: group_test # 生产者分组,事务消息会使用
send-message-timeout: 3000 # 消息发送超时时长,默认3s
retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
2.1 push消息消费
通过实现RocketMQListener<T>
接口,其中T是泛型,及消息内容的数据类型,可以是String, JSONObject,也可以是自定义数据结构类型
将监听器声明为bean,并实现onMessage方法即可
@Component
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "group_test")
public class MessageListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("消费消息:" + s);
}
}
注解中的messageModel属性可以用来设置消息模式,默认为集群模式
2.2 pull消息消费
添加消费者配置
rocketmq:
name-server: localhost:9876
consumer:
group: "group_test"
topic: "topic_test"
通过receive方法实现消费
@GetMapping(value = "/poll")
public void poll() {
List<String> list = rocketMQTemplate.receive(String.class);
for (String message : list) {
System.out.println("poll消费:"+message);
}
}
2.3 顺序消息消费
与普通消息不同的是,要声明消费模式为顺序消费consumeMode= ConsumeMode.ORDERLY
@Component
@RocketMQMessageListener(topic = "topic_order", consumerGroup = "group_order", consumeMode= ConsumeMode.ORDERLY)
public class MessageOrderListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("顺序消费消息:" + s);
}
}
3. 总结
消息消费相对更加简单,实际上掌握一种之后,其他类型的也就能够举一反三了,本文也只是针对最常用的类型进行列举,还有更多参数的支持,需要大家在实际应用中探索。
本文演示源码见:https://gitee.com/wuhanxue/wu_study/tree/master/demo/rocketmq_demo