一、发送 & 接收带标签的消息
1.1、概述
消息的种类纷繁复杂,不同的业务场景需要不同的消息,基于此RocketMQ提供了消息过滤功能,通过Tag或者Key进行区分,本章介绍Tag,我们再往一个Topic里面发送消息的时候,根据业务逻辑可能需要区分,例如带有tagA的消息被A消费,带有TagB的消息被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,这时我们也需要通过过滤才能区别对待。
其实这种场景在生活中也很常见,例如大家每天都使用的微信公众号,当关注的博主在公众号发布完消息后,你只会收到自己自己感兴趣的那部分。
1.2、订阅关系一致性
订阅关系一致性是消息过滤中对【消费者组名-Topic-Tag】的一些要求,如果不能正确的配置,将会出现消费消息紊乱,甚至消息丢失的问题。关于订阅关系一致性问题,请参考
订阅关系一致文档,这里不再赘述。
1.3、Demo07MQTestApp
/**
* @Author : 一叶浮萍归大海
* @Date: 2023/12/25 13:03
* @Description: 发送 & 接收带标签的消息
*/
@Slf4j
public class Demo07MQTestApp {
/**
* 发送带标签的消息
*/
@Test
public void demo7Producer() throws Exception {
// 1、创建一个生产者
DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group");
// 2、连接NameServer
producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
// 3、启动
producer.start();
// 4、创建消息
String[] tags = new String[]{"NBA", "run", "star","car","mobile","tourism"};
for (int i = 1; i <= 6; i++) {
String tag = tags[i % tags.length];
String content = "";
switch (tag) {
case "NBA":
content = "this is a message about NBA,消息编号[" + i + "]";
break;
case "run":
content = "this is a message about run,消息编号[" + i + "]";
break;
case "star":
content = "this is a message about star,消息编号[" + i + "]";
break;
case "mobile":
content = "this is a message about mobile,消息编号[" + i + "]";
break;
case "tourism":
content = "this is a message about tourism,消息编号[" + i + "]";
break;
default:
content = "this is a message about foods,消息编号[" + i + "]";
break;
}
Message message = new Message("tag-topic",tag,content.getBytes(StandardCharsets.UTF_8));
// 5、发送消息
producer.send(message);
log.info("【demo7Producer】发送消息成功,消息内容:{}",content);
}
// 关闭producer
producer.shutdown();
}
/**
* 接收带标签的消息(Push方式)
*/
@Test
public void demo7PushConsumer1() throws Exception {
// 1、创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupA");
// 2、连接NameServer
consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3、订阅消息,*表示订阅该主题所有的消息
consumer.subscribe("tag-topic", "NBA");
// 4、设置监听器(采用异步回调方式,一直监听)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
log.info("我是消费者【demo7PushConsumer1】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));
}
/**
* 返回值:消费消息成功与否
* CONSUME_SUCCESS:表明消费成功,消息会从MQ出队
* RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5、启动
consumer.start();
log.info("【demo7PushConsumer1】启动成功,正在等待接收消息...");
// 6、挂起当前JVM
System.in.read();
}
@Test
public void demo7PushConsumer2() throws Exception {
// 1、创建一个消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-groupB");
// 2、连接NameServer
consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 3、订阅消息,*表示订阅该主题所有的消息
consumer.subscribe("tag-topic", "NBA || star || mobile");
// 4、设置监听器(采用异步回调方式,一直监听)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
log.info("我是消费者【demo7PushConsumer2】,我收到的消息是:{}",StrUtil.utf8Str(message.getBody()));
}
/**
* 返回值:消费消息成功与否
* CONSUME_SUCCESS:表明消费成功,消息会从MQ出队
* RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者
*/
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5、启动
consumer.start();
log.info("【demo7PushConsumer2】启动成功,正在等待接收消息...");
// 6、挂起当前JVM
System.in.read();
}
}
1.4、测试
先后运行demo7PushConsumer1、demo7PushConsumer1和demo7Producer,观察控制台日志输出信息。
1.5、Topic和Tag如何选择
不同的业务应该使用不同的Topic,如果仅仅是相同的业务里边有不同的表现形式,那么我们要使用Tag进行区分。至于说具体怎么选择,可以从以下几个方面进行区分:
(1)消息类型是否一致:如普通消息、事务消息、延时消息、顺序消息、不同的消息类型使用不同的Topic,无法通过Tag进行区分;
(2)业务是否相关联:没有直接关联的消息,如淘宝交易信息、京东物流消息使用不同的Topic进行区分;而同样是淘宝交易消息,电器类订单、女装类订单、化妆品类订单的消息可以用Tag进行区分;
(3)消息优先级是否一致:如同样是物流消息,盒马必须2小时内送达,天猫超市24小时内送达,淘宝物流则相对会慢一些,不同优先级的消息用不同的Topic进行区分;
(4)消息量级是否相当:有些业务消息虽然量小但是实时性要求高,如果跟某些万亿量级别的消息使用同一个Topic,则有可能会因为过长的等待时间而"饿死",此时需要将不同量级的消息进行区分,使用不同的Topic;
总的来说,针对消息分类、可以选择创建多个Topic或者在同一个Topic下创建多个Tag。但是通常情况下,不同Topic之间的消息没有必然的联系。而Tag则用来区分同一个Topic下相互关联的消息,例如:全集和子集的关系,流程先后的关系。