需求
消息实体,根据实体中的一个字段,决定推给多个业务系统。
例:一个点位信息Bean,这个点位信息,设备、能源、安全都有用,那么点位信息表中有适用模块标识。
点位新增 需要通知所有勾选业务系统 tag - add
点位编辑 需要新增勾选业务系统标识 tag - add ,移除勾选 tag - delete ,不变 tag - update
点位删除 通知所有勾选的系统 tag - delete
分析
1、MQ不支持同个消息,一下子发送到不同的topic。发多次也可以实现,但是多少有点抵触
2、那么发送到同一个topic下,让各个业务系统来取,那么必定需要去过滤,不然拿到不属于本业务系统的点位信息了,仅仅靠tag明显是不够的,服务端过滤可以采用SQL92方式
3、那么我随之就想到也可以在各个业务系统中过滤了,不是本业务系统的标识,直接返回。不执行相关逻辑。
本人更倾向于第二种实现
编码
尝试一:SQL92过滤实现
再broker.conf 新增配置 enablePropertyFilter=true
生产者
public class AddProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("a-group");
producer.setNamesrvAddr("192.168.0.211:9876");
producer.start();
Point point0 = new Point(0, "point0");
Point point1 = new Point(1, "point1");
Point point2 = new Point(2, "point2");
Point point3 = new Point(3, "point3");
Point point4 = new Point(4, "point4");
Point point5 = new Point(5, "point5");
Point point6 = new Point(6, "point6");
Point point7 = new Point(7, "point7");
Point point8 = new Point(8, "point8");
Point point9 = new Point(9, "point9");
ArrayList<Point> list = new ArrayList<>();
list.add(point0);
list.add(point1);
list.add(point2);
list.add(point3);
list.add(point4);
list.add(point5);
list.add(point6);
list.add(point7);
list.add(point8);
list.add(point9);
try {
for (Point bean : list) {
Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("testValue",String.valueOf(bean.getId()));
producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
System.out.println(bean.getId() + " Continue execution ");
}
Thread.sleep(20000);
for (Point bean : list) {
Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.putUserProperty("testValue",String.valueOf(bean.getId()));
producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
System.out.println(bean.getId() + " Continue execution ");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1 - 模拟业务系统1
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
// 指定Namesrv地址
consumer.setNamesrvAddr("192.168.0.211:9876");
// 订阅主题和标签
consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 0 and 3)"));
// 设置Consumer第一次启动是从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setSuspendCurrentQueueTimeMillis(2000);
// 注册消息监听器
consumer.registerMessageListener(new CustomMessageListenerOrderly());
// 启动消费者
consumer.start();
System.out.printf("Group1ConsumerTagA Started.%n");
}
消费者2 - 业务系统2
public class consumer2 {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
// 指定Namesrv地址
consumer.setNamesrvAddr("192.168.0.211:9876");
// 订阅主题和标签
consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue between 4 and 6)"));
// 设置Consumer第一次启动是从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setSuspendCurrentQueueTimeMillis(2000);
// 注册消息监听器
consumer.registerMessageListener(new CustomMessageListenerOrderly());
// 启动消费者
consumer.start();
System.out.printf("Group1ConsumerTagA Started.%n");
}
}
测试结果
消费者1
消费者2
sql92 确实可以实现我的需求。
那么我把testValue换成各业务系统唯一标识,逗号拼接
把生产者调整一下
msg.putUserProperty("testValue","aaaa,bbbb,cccc");
客户端试了 in 不行
consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and 'aaaa' in (testValue))"));
like
consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("(testValue is not null and testValue LIKE '%aaaa%')"));
不行,只支持这些关键字
in 关键字 只能变量in 特定字符集合这样表达。
思路转变一下,也不是说不能实现
msg.putUserProperty("aaaa","aaaa"); msg.putUserProperty("bbbb","bbbb"); msg.putUserProperty("cccc","cccc");
生产者 setProperty 每个业务系统一个区分开来。
消费者即可实现
consumer.subscribe("UNIFIED_POINT", MessageSelector.bySql("aaaa is not null"));
尝试二:消费端逻辑处理
修改生产者
测试的点位信息加上适用系统标识(逗号分隔)
public class AddProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("a-group");
producer.setNamesrvAddr("192.168.0.211:9876");
producer.start();
Point point1 = new Point(1, "point1","1,2");
Point point2 = new Point(2, "point2","1,2");
Point point3 = new Point(3, "point3","1,4");
Point point4 = new Point(4, "point4","1,4");
Point point5 = new Point(5, "point5","1,5");
ArrayList<Point> list = new ArrayList<>();
list.add(point1);
list.add(point2);
list.add(point3);
list.add(point4);
list.add(point5);
try {
for (Point bean : list) {
Message msg = new Message("UNIFIED_POINT", "add", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
System.out.println(bean.getId() + " Continue execution ");
}
Thread.sleep(20000);
for (Point bean : list) {
Message msg = new Message("UNIFIED_POINT", "update", JSONUtil.toJsonStr(bean).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg,new CustomMessageQueueSelector(),bean.getId(),new CustomSendCallback());
System.out.println(bean.getId() + " Continue execution ");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
模拟消费者一:业务系统标识为1
public class consumer {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("calc_rules_point");
// 指定Namesrv地址
consumer.setNamesrvAddr("192.168.0.211:9876");
// 订阅主题和标签
consumer.subscribe("UNIFIED_POINT", "*");
// 设置Consumer第一次启动是从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setSuspendCurrentQueueTimeMillis(2000);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
try {
for (MessageExt msg : list) {
String data = new String(msg.getBody());
Point p = JSONUtil.toBean(data, Point.class);
List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
.map(String::trim)
.collect(Collectors.toList());
if(!ids.contains("1")){
//非本业务系统 直接返回
return ConsumeOrderlyStatus.SUCCESS;
}
if(msg.getTags().equals("add")){
System.out.println("新增消费:" + p + msg.getQueueId());
}else if(msg.getTags().equals("update")){
System.out.println("修改消费:" + p + msg.getQueueId());
}
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
MessageExt msg = list.get(0);
log.error("consumer news error " + new String(msg.getBody()));
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
// 启动消费者
consumer.start();
System.out.printf("Group1ConsumerTagA Started.%n");
}
}
模拟消费者二:业务系统标识2
public class consumer2 {
public static void main(String[] args) throws Exception {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("a_point");
// 指定Namesrv地址
consumer.setNamesrvAddr("192.168.0.211:9876");
// 订阅主题和标签
consumer.subscribe("UNIFIED_POINT", "*");
// 设置Consumer第一次启动是从哪个位置开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setSuspendCurrentQueueTimeMillis(2000);
// 注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
try {
for (MessageExt msg : list) {
String data = new String(msg.getBody());
Point p = JSONUtil.toBean(data, Point.class);
List<String> ids = Stream.of(p.getApplyModuleIds().split(","))
.map(String::trim)
.collect(Collectors.toList());
if(!ids.contains("2")){
//非本业务系统 直接返回
return ConsumeOrderlyStatus.SUCCESS;
}
if(msg.getTags().equals("add")){
System.out.println("新增消费:" + p + msg.getQueueId());
}else if(msg.getTags().equals("update")){
System.out.println("修改消费:" + p + msg.getQueueId());
}
}
return ConsumeOrderlyStatus.SUCCESS;
} catch (Exception e) {
MessageExt msg = list.get(0);
log.error("consumer news error " + new String(msg.getBody()));
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
// 启动消费者
consumer.start();
System.out.printf("Group1ConsumerTagA Started.%n");
}
}
测试结果 - 消费者1
测试结果 - 消费者2
结论
SQL92方式:过滤在服务端,但是功能还是局限的。如果服务器性能好压力不大,且过滤方式能满足。个人任务还是可用的
消费端逻辑处理方式:略费带宽,服务器压力小。过滤在消费端