RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及顺序消费实战

顺序消费实战

顺序消费分为两种:

  • 全局有序:适用于并发度不大,并且对消息要求严格一致性的场景下

    通过创建一个 topic,并且该 topic 下只有一个队列,那么生产者向着一个队列中发消息,消费者也在这一个队列中消费消息,来保证消息的有序性

  • 局部有序:适用于对性能要求比较高的场景,在设计层面将需要保证有序的消息放在 Topic 下的同一个队列即可保证有序

全局有序

要保证全局有序的话,我们先通过上边启动的 Dashboard 项目,创建一个只有一个队列的 Topic

写队列和读队列 都设置为 1 个,perm 设置为6(perm,2:只写; 4-只读; 6-读写;)

在这里插入图片描述

全局有序流程图如下:

在这里插入图片描述

首先消费者主启动类如下:

@SpringBootApplication
@EnableBinding({CustomSink.class })
public class OrderlyConsumerApplication {

    @Value("${server.port}")
    private int port;

    public static void main(String[] args) {
        SpringApplication.run(OrderlyConsumerApplication.class, args);
        System.out.println("【【【【【  OrderlyConsumerApplication 启动成功!!!   】】】】】");
    }

    // 定义两个通道,input 接收全局有序消息,input2 接收局部有序消息
    @StreamListener("input")
    public void receiveInput(String receiveMsg) {
        System.out.println(port + " port, input receive: " + receiveMsg);
    }

    @StreamListener("input2")
    public void receiveInput2(String receiveMsg) {
        System.out.println(port + " port, input2 receive: " + receiveMsg);
    }
}

自定义 CustomSink 如下:

public interface CustomSink extends Sink {

    /**
     * Input channel name.
     */
    String INPUT2 = "input2";

    /**
     * @return input channel.
     */
    @Input(CustomSink.INPUT2)
    SubscribableChannel input2();
}

配置类 application.properties 如下:

spring.application.name=mq_orderly_consumer
server.port=9530

# configure the nameserver of rocketmq
spring.cloud.stream.rocketmq.binder.name-server=127.0.0.1:9876
spring.cloud.stream.rocketmq.binder.group=mq_producer_group

# configure the input binding named input
spring.cloud.stream.bindings.input.destination=Global-Orderly-Topic
spring.cloud.stream.bindings.input.content-type=application/json
spring.cloud.stream.bindings.input.group=Global-Orderly-Topic-group
spring.cloud.stream.rocketmq.bindings.input.consumer.orderly=true

# configure the input binding named input
spring.cloud.stream.bindings.input2.destination=Partly-Orderly-Topic
spring.cloud.stream.bindings.input2.content-type=application/json
spring.cloud.stream.bindings.input2.group=Partly-Orderly-Topic-group
spring.cloud.stream.rocketmq.bindings.input2.consumer.orderly=true

全局有序生产者代码如下:

public class GlobalProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("218.95.37.160:9876");
        producer.start();

        for (int i = 0; i < 12; i++) {
            Message msg = new Message(
                    "Global-Orderly-Topic",
                    "Global_Orderly_Tag",
                    ("( " + i + " )message from GlobalProducer").getBytes());
            msg.setKeys("Global_Orderly_Tag");
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }
}

先启动消费者,再启动生产者,即可看到在消费者端,消息被有序消费

局部有序

局部有序的话,我们将需要保证有序的消息放在同一个 Topic 下的队列即可保证有序,这里设计的让 OrderId 相同的消息放在同一个队列中发送,流程图如下:

在这里插入图片描述

在局部有序中,消费者依然使用全局有序中的消费者,局部生产者代码如下:

public class PartlyProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        /**
         * orderId = 1 的消息,需要按照 step 的顺序进行消费
         * orderId = 2 的消息,需要按照 step 的顺序进行消费
         */
        List<Order> list = new ArrayList<>();
        for (int i = 1; i <= 3; i ++) {
            Order order = new Order();
            order.orderId = 1;
            order.step = i;
            list.add(order);
        }
        for (int i = 5; i <= 8; i ++) {
            Order order = new Order();
            order.orderId = 2;
            order.step = i;
            list.add(order);
        }

        System.out.println(list);

        int size = list.size();
        for (int i = 0; i < size; i++) {
            Order order = list.get(i);
            Message msg = new Message(
                    "Partly-Orderly-Topic",
                    "Partly_Orderly_Tag",
                    (order.toString()).getBytes());
            msg.setKeys("Partly_Orderly_Tag");
            /**
             * 这里发送消息的时候,根据 orderId 来选择对应发送的队列
             */
            producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    int orderId = (int)arg;
                    int idx = orderId % mqs.size();
                    return mqs.get(idx);
                }
            }, order.orderId);
        }
        System.out.println("Send Finished.");
    }



    public static class Order {
        int orderId;
        int step;
        @Override
        public String toString() {
            return "Order{" +
                    "orderId=" + orderId +
                    ", step=" + step +
                    '}';
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/250575.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Redis-数据结构

参考资料 极客时间Redis&#xff08;亚风&#xff09; Redis数据结构 SDS sds(Simple Dynamic String) 字符串接结构体: struct --attribute_- ((-_packed__)) sdshdr8{uint8_t len&#xff1b;/* buf已保祥的字符串字节数&#xff0c;不包含结束标示*/uint8_t alloc&#…

正式开通运营!“一应黔行”服务网点进驻贵阳地铁3号线

今天下午14:00&#xff0c;贵阳地铁3号线正式进入初期运营。为进一步实现一卡通在贵阳市全区域覆盖&#xff0c;不断提升“一应黔行一卡通”业务办理效率&#xff0c;贵阳市信捷科技有限公司在贵阳地铁3号线明珠大道站、顺海站、皂角井站3个站点设立“一应黔行”服务网点&#…

[Spring ~松耦合的设计神器]`SPI`

Java SPI&#xff08;Service Provider Interface&#xff09;是一种Java的服务提供者接口机制。它允许在运行时动态加载实现服务接口的类。 文章目录 基本概念最简单的实例使用 jar 包通过 spi动态实现接口功能 基本概念 SPI 机制的基本思想是&#xff0c;定义一个服务接口&a…

基于ssm线上学习网站论文

线上学习网站 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;作为学校以及一些培训机构&#xff0c;都在用信息化战术来部署线上学习以及线上考试&#xff0c;可以与线下的考试有机的结合在一起&#xff0c;实现线上学习网站在技术上已成熟。本文介绍了线上学…

【Android开发-25】Android中多线程编程用法介绍

1&#xff0c;线程基本用法 在Android中&#xff0c;线程的使用主要有两种方法&#xff1a;一种是扩展java.lang.Thread类&#xff0c;另一种是实现Runnable接口。 1.1以下是一个简单的Android线程继承Thread的用法示例&#xff1a; public class MyThread extends Thread {…

maven+spock

pom配置 话说JunitMockito的组合用起来是真难用&#xff0c;还是Spock的简单&#xff0c;尤其是参数化的测试。junit的Parameter是鸡肋&#xff0c;杂恶心&#xff1b;Theories用来也不爽。 <?xml version"1.0" encoding"UTF-8"?><project xm…

SoloLinker第一次使用记录,解决新手拿到板子的无所适从

本文目录 一、简介二、进群获取资料2.1 需要下载资料2.2 SDK 包解压 三、SDK 编译3.1 依赖安装3.2 编译配置3.3 启动编译3.4 编译后的固件目录 四、固件烧录4.1 RV1106 驱动安装4.2 打开烧录工具4.3 进入boot 模式&#xff08;烧录模式&#xff09;4.4 烧录启动固件4.5 烧录升级…

Spring入门

学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰。各位小伙伴&#xff0c;如果您&#xff1a; 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持&#xff0c;想组团高效学习… 想写博客但无从下手&#xff0c;急需…

设计模式——组合模式(结构型)

引言 组合模式是一种结构型设计模式&#xff0c; 你可以使用它将对象组合成树状结构&#xff0c; 并且能像使用独立对象一样使用它们。 问题 如果应用的核心模型能用树状结构表示&#xff0c; 在应用中使用组合模式才有价值。 例如&#xff0c; 你有两类对象&#xff1a; ​…

Graphics Profiler 使用教程

GraphicsProfiler 使用教程 1.工具简介&#xff1a;2.Navigation介绍2.1.打开安装好的Graphics Profiler。2.2.将手机连接到计算机&#xff0c;软件会在手机中安装一个GraphicsProfiler应用(该应用是无界面的&#xff09;。2.3.Show files list2.4.Record new trace2.4.1.Appli…

java --- 集合进阶

目录 一、单列集合顶层接口 Collection 1.1 基本方法 1.2 Collection 的遍历方式 二、list集合 1.2 ArrayList Vector 底层结构 1.3 LinkedList ArrayList 和 LinkedList 比较 三、set接口 3.1、Set 接口和常用方法 3.2 HashSet HashSet 底层机制&#xff08;HashMap…

Python 直观理解基尼系数

基尼系数最开始就是衡量人群财富收入是否均衡&#xff0c;大家收入平平&#xff0c;那就是很平均&#xff0c;如果大家收入不平等&#xff0c;那基尼系数就很高。 还是给老干部们讲的言简意赅。 什么是基尼系数 我们接下来直接直观地看吧&#xff0c;程序说话 # -*- coding:…

万兆网络之屏蔽线序接法(上)

可以经常听到用RJ45指代网线&#xff0c;用RJ11指代电话线的&#xff0c;RJ&#xff08;Registered Jack&#xff09;即已注册插口&#xff0c;可以简单理解为一种约定就行&#xff08;参见参考链接&#xff09; 前一篇已经讲到&#xff0c;网线线对互相缠绕是为了电流方向相反…

CSRF(跨站脚本请求)

一、漏洞原理 CSRF&#xff08;Cross-Site Request Forgery&#xff09;是一种网络安全攻击&#xff0c;攻击者通过欺骗用户在不知情的情况下发送请求&#xff0c;从而实现对目标网站的操作。 网站管理员(已经登录网站后台)——黑客构造的恶意服务器(是网站的创建用户请求)——…

(第6天)RHEL 8 安装单机 Oracle 19C CDB 数据库

RHEL 8 安装单机 Oracle 19C 数据库(第6天) 随着 Oracle 版本的升级,硬件也在不断更新迭代,为了迎合这种趋势,Linux 系统也在不断升级,目前已经更新至 8 代版本。相信不久的将来,Linux 8 和 Oracle 19C 将成为主流版本,因此不得不讲 Linux 8 如何安装 Oracle 19C 数据…

K8s投射数据卷

目录 一.Secret 1.secret介绍 2.secret的类型 3.创建secret 4.使用secret 环境变量的形式 volume数据卷挂载 二ConfigMap 1.创建ConfigMap的方式 2.使用ConfigMap 2.1作为volume挂载使用 2.2.作为环境变量 三.Downward API 1.以环境变量的方式实现 2.Volume挂载 一.S…

C++相关闲碎记录(16)

1、正则表达式 &#xff08;1&#xff09;regex的匹配和查找接口 #include <regex> #include <iostream> using namespace std;void out (bool b) {cout << ( b ? "found" : "not found") << endl; }int main() {// find XML/H…

0/1背包问题

实验要求 随机生成500个0/1背包问题&#xff08;问题规模可以相对较小&#xff09;&#xff0c;分别使用贪心算法和动态规划进行求解&#xff0c; 要求&#xff1a;1&#xff09;统计贪心算法求得最优值的概率&#xff0c; 2&#xff09;计算比值 3&#xff09;应用贪心算法…

Postman中参数填写方式

Postman中参数填写和请求方法有关&#xff0c;一般接口用例请求方法GET与POST常用&#xff0c;所以主要是这两种请求方法请求参数填写 一、GET请求方法参数填写 1、直接在URL中填写请求参数,如直接在URL中填写&#xff1a; http://www.example.com:8089/userapi?unamelisi&…

直播原理,直播CDN及相关协议

一、直播原理 直播是一对多的完整的视频解编码原理&#xff1a; 那么直播的原理无疑也是要基于视频的解编码原理的 参考视频 二、直播CDN 什么是CDN就不多说了&#xff0c;可参考亚马逊的文章 三、相关协议 RTMP及HTTP-FLV&#xff08;都是在FLV封装格式基础上的&#xf…