RocketMQ 顺序消息收发实践

目录

  • 概述
  • 局部有序
    • 创建 Topic
    • 配置
    • 代码
    • 测试
  • 结束

概述

  • 顺序消息
    • 全局有序:适用于性能不是特别高的场景,但是又要求消息又严格一致的概念。
    • 局部有序:适用于性能要求高的场景,想办法通过在设计层面处理有序的消息尽量发送至同一个 Topic 中的同一个队列。
  • 两种有序创建方法
    • 全局有序:
      • perm:2:只写,4:只读;6:读写
      • 创建一个 Topic 只有一个队列。
    • 局部有序:
      • Partly-Orderly-Topic

注意:本文只会针对 局部有序进行实践。

官方文档

局部有序

常见做法就是将 order id 进行处理,将 order id 相同的消息发送到 topicB 的同一个 queue,假设我们 topicB 有 2 个 queue,那么我们可以简单的对 id 取余,奇数的发往 queue0,偶数的发往 queue1,消费者按照 queue 去消费时,就能保证 queue0 里面的消息有序消费,queue1 里面的消息有序消费。

程序局部有序如下图设计进行测试。
在这里插入图片描述

创建 Topic

sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly

[root@hadoop02 rocketmq-all-5.1.4-bin-release]# sh bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderly
create topic to 10.32.36.143:10911 success.
TopicConfig [topicName=orderly, readQueueNums=8, writeQueueNums=8, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false, attributes={}]
[root@hadoop02 rocketmq-all-5.1.4-bin-release]# 

在这里插入图片描述

配置

注意:spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true

spring:
  cloud:
    stream:
      function:
        definition: consumer
      rocketmq:
        binder:
          name-server: 10.32.36.143:9876
        bindings:
          producer-out-0:
            producer:
              group: output_1
              # 定义messageSelector
              messageQueueSelector: orderlyMessageQueueSelector
          consumer-in-0:
            consumer:
              # tag: {@code tag1||tag2||tag3 }; sql: {@code 'color'='blue' AND 'price'>100 } .
#              subscription: 'TagA || TagC || TagD'
              orderly: true
      bindings:
        producer-out-0:
          destination: orderly
        consumer-in-0:
          destination: orderly
          group: orderly-consumer

logging:
  level:
    org.springframework.context.support: debug

上面配置进行补充如下:

  • spring.cloud.stream.bindings.通道名字.group 是针对具体通道的配置,用于设置该通道的消费组名。如果在这里设置了消费组名,那么就会覆盖全局配置。
  • spring.cloud.stream.rocketmq.binder.group 是全局配置,用于设置默认的消费组名。如果没有在具体的通道中设置消费组名,那么就会使用这个全局配置。

spring.cloud.stream.bindings和spring.cloud.stream.rocketmq.bindings 区别

  • 1.spring.cloud.stream.bindings是Spring Cloud Stream的核心配置属性,用于定义消息通道的绑定和配置。
  • spring.cloud.stream.rocketmq.bindings是Spring Cloud Stream与RocketMQ集成时的配置属性,用于定义RocketMQ消息通道的绑定和配置。

代码

public class SimpleMsg {
    private String msg;

    public SimpleMsg(String msg) {
        this.msg = msg;
    }

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }
}
@Component
public class OrderlyMessageQueueSelector implements MessageQueueSelector {
    private static final Logger log = LoggerFactory
            .getLogger(OrderlyMessageQueueSelector.class);
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) ((MessageHeaders) arg).get(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID);
        String tag = (String) ((MessageHeaders) arg).get(MessageConst.PROPERTY_TAGS);
        int index = id % MqApplication.tags.length % mqs.size();
        return mqs.get(index);
    }
}
@EnableDiscoveryClient
@SpringBootApplication
public class MqApplication {
    private static final Logger log = LoggerFactory
            .getLogger(MqApplication.class);

    public static final String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};

    public static void main(String[] args) {
        SpringApplication.run(MqApplication.class);
    }

    @Autowired
    private StreamBridge streamBridge;


    @Bean
    public ApplicationRunner producer() throws InterruptedException {
        Thread.sleep(6000);
        log.info("开始...");
        return args -> {
            for (int i = 0; i < 50; i++) {
                String key = "KEY" + i;
                Map<String, Object> headers = new HashMap<>();
                headers.put(MessageConst.PROPERTY_KEYS, key);
                headers.put(MessageConst.PROPERTY_TAGS, tags[i % tags.length]);
                headers.put(MessageConst.PROPERTY_ORIGIN_MESSAGE_ID, i%2);
                Message<SimpleMsg> msg = new GenericMessage(new SimpleMsg("Hello RocketMQ " + i%2), headers);
                streamBridge.send("producer-out-0", msg);
            }
        };
    }

    @Bean
    public Consumer<Message<SimpleMsg>> consumer() {
        return msg -> {
            String tagHeaderKey = RocketMQMessageConverterSupport.toRocketHeaderKey(
                    MessageConst.PROPERTY_TAGS).toString();
            log.info(Thread.currentThread().getName() + " Receive New Messages: " + msg.getPayload().getMsg() + " TAG:" +
                    msg.getHeaders().get(tagHeaderKey).toString());
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
        };
    }
}

测试

在这里插入图片描述

在这里插入图片描述

结束

至此,RocketMQ 顺序消息收发实践 就结束了,如有疑问,欢迎评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/254775.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python自动化操作:简单、有趣、高效!解放你的工作流程!

今天跟大家分享一套自动化操作流程解决方案&#xff0c;基于Python语言&#xff0c;涉及pyautogui、pyperclip、pythoncom、win32com依赖包。安装命令为&#xff1a; pip install pyautoguipip install pyperclippip install pythoncompip install win32compyautogui 是一个自…

二级教师属于什么职称

教师的职称评定对于他们的职业发展具有重要意义。教育体系中&#xff0c;教师的职称分为多个等级&#xff0c;其中二级教师是其中的一个重要级别。那么&#xff0c;二级教师属于什么职称呢&#xff1f; 职称的定义。职称是指根据工作性质、职责、难度、能力等因素&#xff0c;对…

I Doc View 多个高危漏洞复现

I Doc View在线文档预览是一款在线文档预览系统。近期出现了多个高危漏洞&#xff0c;因此集中复现一下&#xff0c;有兴趣的童鞋可以收藏一下。#头条首发挑战赛# 1.Upload接口任意文件读取漏洞 1.1 漏洞级别 高危 1.2 漏洞描述 I Doc View存在代码执行漏洞&#xff0c;使…

研究前沿| scNanoCOOL-seq:单分子测序平台的单细胞多组学测序技术

scNanoCOOL-seq 2023年9月12日&#xff0c;汤富酬课题组在Cell Research上发表了题为“scNanoCOOL-seq: a long-read single-cell sequencing method for multi-omics profiling within individual cells”的论文&#xff0c;首次报道了scNanoCOOL-seq单细胞多组学测序技术。该…

深度学习中常见的激活函数

前文介绍 我们在前面了解到了线性回归模型&#xff0c;其实我们可以把线性回归看成一个单个的神经元&#xff0c;它实际上就完成了两个步骤 1.对输入的特征的加权求和 2.将结果通过传递函数&#xff08;或者激活函数&#xff09;输出 这里我们提到了传递函数&#xff08;或者…

如何本地搭建Zblog网站并通过内网穿透将个人博客发布到公网

文章目录 1. 前言2. Z-blog网站搭建2.1 XAMPP环境设置2.2 Z-blog安装2.3 Z-blog网页测试2.4 Cpolar安装和注册 3. 本地网页发布3.1. Cpolar云端设置3.2 Cpolar本地设置 4. 公网访问测试5. 结语 正文开始前给大家推荐个网站&#xff0c;前些天发现了一个巨牛的 人工智能学习网站…

简单搭建一个Python自动化测试框架

1. 安装Python 首先需要安装Python&#xff0c;可以从官网下载对应的版本。安装完成后&#xff0c;可以在终端中输入python来检查是否安装成功。 2. 安装pip pip是Python的包管理工具&#xff0c;用于安装和管理Python模块。可以在终端中输入以下命令来安装pip&#xff1a; …

法大大邀业内大咖剖析汽车名企数智化实战路径

法大大发布中国首部《汽车行业合同数智化白皮书》&#xff0c;聚焦趋势&#xff0c;解读行业数字化转型攻坚战的破局之道&#xff1b;深入内部&#xff0c;剖析名企数字化的探索实践。 长安汽车、蔚来汽车、上汽大通、 东风汽车集团、奥托立夫、长城滨银汽金… 一众名企高层…

LiteClient工具箱:降低成本,减少监管风险

​​发表时间&#xff1a;2023年9月14日 BSV区块链协会的工程团队一直在为即将推出的LiteClient而努力工作&#xff0c;这是一套模块化的组件&#xff0c;可使简易支付验证&#xff08;SPV&#xff09;变得更加便利。 借助LiteClient工具箱&#xff0c;交易所可以通过区块头中…

数字化医疗新篇章:构建智能医保支付购药系统

在迎接数字化医疗时代的挑战和机遇中&#xff0c;智能医保支付购药系统的建设显得尤为重要。本文将深入介绍如何通过先进的技术实现&#xff0c;构建一套智能、高效的医保支付购药系统&#xff0c;为全面建设健康中国贡献力量。 1. 引言 随着医疗科技的飞速发展&#xff0c;…

node加密集合(前端加密、后台解密)

文章目录 一、crypto 加解密生成私密钥公钥加密&#xff08;也可私钥加密&#xff09;私钥解密&#xff08;也可公钥解密&#xff09; 二、node-rsa加解密生成公私秘钥使用公钥加密&#xff08;也可私钥加密&#xff09;使用私钥解密&#xff08;也可公钥解密&#xff09; 三、…

KSP音频抓包

1. 按照网上其他教程&#xff0c;安装KSP抓音频 Biu~笔记&#xff1a;高通蓝牙ADK&#xff08;38&#xff09;-- KSP in MDE - 大大通(简体站) Biu~笔记&#xff1a;高通蓝牙ADK&#xff08;22&#xff09;--DSP音频链路监听 - 大大通(简体站) <<Biu~笔记&#xff1a;高…

使用java调用python批处理将pdf转为图片

你可以使用Java中的ProcessBuilder来调用Python脚本&#xff0c;并将PDF转换为图片。以下是一个简单的Java代码示例&#xff0c;假设你的Python脚本名为pdf2img.py&#xff1a; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader…

虚拟展会展览如何搭建,虚拟展会展览有哪些优势

引言&#xff1a; 随着科技的不断进步&#xff0c;虚拟展会展览正逐渐成为企业推广和交流的新方式。那么虚拟展会展览应该如何搭建&#xff0c;虚拟展会展览又能带来哪些好处呢&#xff1f; 一.什么是虚拟展会展览 虚拟展会展览是一种通过网络平台进行的展览&#xff0c;与传…

做PPT必须知道这5个PPT模板网站

做PPT千万不能错过这5个网站&#xff0c;免费下载&#xff0c;各种类型风格很齐全&#xff0c;建议收藏起来。 1、菜鸟图库 https://www.sucai999.com/search/ppt/0_0_0_1.html?vNTYxMjky 菜鸟图库素材非常齐全&#xff0c;设计、办公、图片、视频等素材这里都能找到&#xf…

面试算法58:日程表

题目 请实现一个类型MyCalendar用来记录自己的日程安排&#xff0c;该类型用方法book&#xff08;int start&#xff0c;int end&#xff09;在日程表中添加一个时间区域为[start&#xff0c;end&#xff09;的事项&#xff08;这是一个半开半闭区间&#xff09;。如果[start&…

云原生之深入解析Kubernetes本地持久化存储方案OpenEBS LocalPV的最佳实践

一、K8s 本地存储 K8s 支持多达 20 种类型的持久化存储&#xff0c;如常见的 CephFS 、Glusterfs 等&#xff0c;不过这些大都是分布式存储&#xff0c;随着社区的发展&#xff0c;越来越多的用户期望将 K8s 集群中工作节点上挂载的数据盘利用起来&#xff0c;于是就有了 loca…

Prometheus全面学习教程

一、Prometheus概述 1、Prometheus介绍 Prometheus 是一个开源的服务监控系统和时序数据库&#xff0c;其提供了通用的数据模型和快捷数据采集、存储和查询接口。它的核心组件Prometheus server会定期从静态配置的监控目标或者基于服务发现自动配置的自标中进行拉取数据&…

基于单片机的视力保护及身姿矫正器设计(论文+源码)

1. 系统设计 在本次设计中&#xff0c;其系统整个框图如图2-1所示。其主要的核心控制模块由超声波模块&#xff0c;光敏电阻&#xff0c;按键模块&#xff0c;复位电路&#xff0c;红外模块&#xff0c;LCD显示等组成。其包括自动模式&#xff0c;手动模式。自动模式&#xff…

代码随想录第三十四天(一刷C语言)|不同路径不同路径II

创作目的&#xff1a;为了方便自己后续复习重点&#xff0c;以及养成写博客的习惯。 一、不同路径 思路&#xff1a;参考carl文档 机器人每次只能向下或者向右移动一步&#xff0c;机器人走过的路径可以抽象为一棵二叉树&#xff0c;叶子节点就是终点。 1、确定dp数组&#…