今天学学消息队列RocketMQ:消息类型

        RocketMQ支持的消息类型有三种:普通消息、顺序消息、延时消息、事务消息。以下内容的代码部分都是基于rocketmq-spring-boot-starter做的。

普通消息

        普通消息是一种无序消息,消息分布在各个MessageQueue当中,以保证效率为第一使命。这种消息适用于对顺序没有要求的基础消费需求。这里的Topic和MessageQueue是多对多关系。

// 生产者
public static void main(String[] args) throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println();
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

顺序消息

        当我开始对消息的时序性有要求的时候,普通消息就无法满足我们的需求了。当我们要求顺序消费的时候,我们的Topic就不能采用多对多的形式,存放在多个MessageQueue中,而是需要牺牲一部分性能,存放在一个MessageQueue中。

// 生产者
public static void main(String[] args) throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> mqs.get(0), null);

            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println("RocketMQ test msg " + i);
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 跟普通消费不同的地方,这里采用了顺序消费方法。
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(messageExt.getBody()));
            }

            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

 延时消息

        通过指定的通知时间间隔,让消息不会立刻被消费者收到。

// 生产者
public static void main(String[] args) throws MQClientException {
    DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            // 指定延时等级
            // DelayTimeLevel对应的延时时间在服务端定义:rocketmq/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
            // private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            msg.setDelayTimeLevel(4);
            // 按毫秒延时
            msg.setDelayTimeMs(1L);
            // 按秒延时
            msg.setDelayTimeSec(1);
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println();
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

30秒的延迟

事务消息

        事务消息是RocketMQ提供的一种高级的消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息的生命周期

        (1)初始化:半事务消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态。

        (2)事务待提交:半事务消息被发送到服务端,和普通消息不同,半事务消息并不会直接被服务端持久化,而是会被单独存储到事务存储系统中,等待第二阶段本地事务返回执行结果后再提交。此时消息对下游消费者不可见。

        (3)消息回滚:第二阶段如果事务执行结果明确为回滚,服务端会将半事务消息回滚,该事务消息流程终止。

        (4)提交事务待消费:第二阶段如果事务执行结果明确为提交,服务端会将半事务消息重新存储到普通存储系统中,此时消息对下游消费者可见。

        (5)消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。此时服务端会等待消费者完成消费并提交消费结果,如果一段时间后没有收到消费者的响应,服务端会对消息进行重试处理。

        (6)消费完成:消费者完成了消费动作,并向服务端提交了消费结果,服务端标记当前消息已经被处理。

        (7)消息删除:服务端按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

// 生产者
public static void main(String[] args) throws MQClientException {
    TransactionMQProducer producer = new TransactionMQProducer("rmq-group");
    TransactionListener listener = new TransactionListenerImpl();
    producer.setNamesrvAddr("172.16.200.38:9876");
    producer.setInstanceName("producer");
    producer.setTransactionListener(listener);
    producer.start();
    try {
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            Message msg = new Message("Topic-test", "testTag"
                    , (new Date() + " RocketMQ test msg " + i).getBytes());
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.println(new Date());
            System.out.println(sendResult.getMsgId());
            System.out.println(sendResult.getMessageQueue());
            System.out.println(sendResult.getSendStatus());
            System.out.println(sendResult.getOffsetMsgId());
            System.out.println(sendResult.getQueueOffset());
            System.out.println();
            System.out.println("============================");
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    producer.shutdown();
}

// 消费者
public static void main(String[] args) throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
    consumer.setNamesrvAddr("172.16.200.38:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("Topic-test", "*");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt messageExt : msgs) {
                System.out.printf("Receive New Messages: %s At %s %n", new String(messageExt.getBody()), new Date());
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
}

 控制台打印

// 生产者控制台打印
Wed Jul 26 17:55:45 CST 2023 RocketMQ test msg 0 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:45 CST 2023
7F00000154F073D16E938497DE420000
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
93

============================
Wed Jul 26 17:55:46 CST 2023 RocketMQ test msg 1 : executeLocalTransaction:未知状态
Wed Jul 26 17:55:46 CST 2023
7F00000154F073D16E938497E2340001
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
94

============================
Wed Jul 26 17:55:47 CST 2023 RocketMQ test msg 2 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:47 CST 2023
7F00000154F073D16E938497E6230002
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
95

============================
Wed Jul 26 17:55:48 CST 2023 RocketMQ test msg 3 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:48 CST 2023
7F00000154F073D16E938497EA180003
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
96

============================
Wed Jul 26 17:55:49 CST 2023 RocketMQ test msg 4 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:49 CST 2023
7F00000154F073D16E938497EE0B0004
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
97

============================
Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:50 CST 2023
7F00000154F073D16E938497F1F70005
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
98

============================
Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:51 CST 2023
7F00000154F073D16E938497F5EC0006
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=3]
SEND_OK
null
99

============================
Wed Jul 26 17:55:52 CST 2023 RocketMQ test msg 7 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:52 CST 2023
7F00000154F073D16E938497F9E50007
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=0]
SEND_OK
null
100

============================
Wed Jul 26 17:55:53 CST 2023 RocketMQ test msg 8 : executeLocalTransaction:事务执行失败,回滚
Wed Jul 26 17:55:53 CST 2023
7F00000154F073D16E938497FDD30008
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=1]
SEND_OK
null
101

============================
Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 : executeLocalTransaction:事务执行成功,提交
Wed Jul 26 17:55:54 CST 2023
7F00000154F073D16E93849801C90009
MessageQueue [topic=Topic-test, brokerName=localhost.localdomain, queueId=2]
SEND_OK
null
102

============================


// 消费者控制台打印
Receive New Messages: Wed Jul 26 17:55:50 CST 2023 RocketMQ test msg 5 At Wed Jul 26 17:55:50 CST 2023 
Receive New Messages: Wed Jul 26 17:55:51 CST 2023 RocketMQ test msg 6 At Wed Jul 26 17:55:51 CST 2023 
Receive New Messages: Wed Jul 26 17:55:54 CST 2023 RocketMQ test msg 9 At Wed Jul 26 17:55:54 CST 2023 

这里消费者只收到了执行事务成功的5,6,9。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/52375.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

AI绘画Stable Diffusion原理之Autoencoder-Latent

前言 传送门&#xff1a; stable diffusion&#xff1a;Git&#xff5c;论文 stable-diffusion-webui&#xff1a;Git Google Colab Notebook&#xff1a;Git kaggle Notebook&#xff1a;Git 今年AIGC实在是太火了&#xff0c;让人大呼许多职业即将消失&#xff0c;比如既能帮…

【Vscode | R | Win】R Markdown转html记录-Win

Rmd文件转html R语言环境Vscode扩展安装及配置配置radian R依赖包pandoc安装配置pandoc环境变量验证是否有效转rmd为html 注意本文代码块均为R语言代码&#xff0c;在R语言环境下执行即可 R语言环境 官网中去下载R语言安装包以及R-tool 可自行搜寻教程 无需下载Rstudio Vscod…

Linux:ELK:日志分析系统(使用elasticsearch集群)

原理 1. 将日志进行集中化管理&#xff08;beats&#xff09; 2. 将日志格式化&#xff08;logstash&#xff09; 将其安装在那个上面就对那个进行监控 3. 对格式化后的数据进行索引和存储&#xff08;elasticsearch&#xff09; 4. 前端数据的展示&#xff08;kibana&…

python多进程编程(模式与锁)

multiprocessing的三种模式 fork&#xff0c;【拷贝几乎所有资源】【支持文件对象/线程锁等传参】【unix】【任意位置开始】【快】spawn&#xff0c;【run参数传参必备资源】【不支持文件对象/线程锁等传参】【unix、win】【main代码块开始】【慢】forkserver&#xff0c;【ru…

C++ 类和对象

面向过程/面向对象 C语言是面向过程&#xff0c;关注过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题 C是基于面对对象的&#xff0c;关注的是对象——将一件事拆分成不同的对象&#xff0c;依靠对象之间的交互完成 引入 C语言中结构体只能定义…

41. linux通过yum安装postgresql

文章目录 1.下载安装包2.关闭内置PostgreSQL模块:3.安装postgresql服务:4.初始化postgresql数据库:5.设置开机自启动:6.启动postgresql数据库7.查看postgresql进程8.通过netstat命令或者lsof 监听默认端口54329.使用find命令查找了一下postgresql.conf的配置位置10.修改postgre…

ARM将常数加载到寄存器方法之LDR伪指令

一、是什么&#xff1f; LDR Rd,const伪指令可在单个指令中构造任何32位数字常数,使用伪指令可以生成超过MOV和MVN指令 允许范围的常数. 实现原理: (1)如果可以用MOV或MVN指令构造该常数,则汇编程序会生成适当的指令 (2)如果不能用MOV或MVN指令构造该常数,则汇编程序会执行下列…

QEMU源码全解析19 —— QOM介绍(8)

接前一篇文章&#xff1a;QEMU源码全解析18 —— QOM介绍&#xff08;7&#xff09; 本文内容参考&#xff1a; 《趣谈Linux操作系统》 —— 刘超&#xff0c;极客时间 《QEMU/KVM》源码解析与应用 —— 李强&#xff0c;机械工业出版社 特此致谢&#xff01; 上一回讲到了Q…

用C语言实现堆排序算法

1.设计思路 排序的思想将一个数组按递增的顺序进行排序&#xff0c;将数组的第一个位置空下&#xff08;下标为0&#xff09;&#xff0c;因为会导致子节点和本身同一个结点&#xff08;i和2i一致&#xff09;&#xff0c;每次堆排序在下标1的位置放上了最大值&#xff0c;然后…

我对排序算法的理解

排序算法一直是一个很困惑我的问题&#xff0c;早在刚开始接触 数据结构的时候&#xff0c;这个地方就很让我不解。就是那种&#xff0c;总是感觉少了些什么的感觉。一开始&#xff0c;重新来过&#xff0c;认真来学习这一部分&#xff0c;也总是学着学着就把概念记住了。过了一…

版本适配好帮手 Android SDK Upgrade Assistant / Android Studio Giraffe新功能

首先是新版本一顿下载↓&#xff1a; Download Android Studio & App Tools - Android Developers 在Tools中找到Android SDK Upgrade Assistant 可以在此直接查看SDK升级相关信息&#xff0c;不用跑到WEB端去查看了。 例如看一下之前经常要对老项目维护的android 12蓝牙…

RAID相关知识

简介 RAID &#xff08; Redundant Array of Independent Disks &#xff09;即独立磁盘冗余阵列&#xff0c;通常简称为磁盘阵列。RAID技术将多个单独的物理硬盘以不同的方式组合成一个逻辑磁盘&#xff0c;从而提高硬盘的读写性能和数据安全性。 数据组织形式 分块&#x…

给定长度值length,把列表切分成每段长度为length的N段列表,Kotlin

给定长度值length&#xff0c;把列表切分成每段长度为length的N段列表&#xff0c;Kotlin import kotlin.random.Randomfun main(args: Array<String>) {var source mutableListOf<String>()val end Random.nextInt(30) 1for (i in 0 until end) {source.add(i.…

ubuntu22.04 DNSSEC(加密DNS服务) configuration

/etx/systemd/resolved.conf是ubuntu下DNS解析服务配置文件&#xff0c;systemd为ubuntu下system and service配置目录 step 1——修改resolved.conf参数 管理员权限打开 /systemd/resolved.conf sudo nano /etc/systemd/resolved.conf修改如下&#xff1a; # This file i…

DAY14_FilterListenerAjaxAxiosJsonfastjson综合案例-axios和html交互

目录 1 Filter1.1 Filter概述1.2 Filter快速入门1.2.1 开发步骤1.2.2 代码演示 1.3 Filter执行流程1.4 Filter拦截路径配置1.5 过滤器链1.5.1 概述1.5.2 代码演示1.5.3 问题 1.6 案例1.6.1 需求1.6.2 分析1.6.3 代码实现1.6.3.1 创建Filter1.6.3.2 编写逻辑代码1.6.3.3 测试并抛…

51单片机定时器/计数器

目录 1、定时器/计数器0/1介绍 1.1 定时器介绍 1.2 单片机定时/计数器原理 2、定时器/计数器0和1的相关寄存器 2.1 定时器/计数器控制寄存器TCON 2.2 定时器/计数器工作模式寄存器TMOD 2.3 定时器/计数器工作模式 2.3.1 模式0(13位定时器/计数器) 2.3.2 模式1(16位定…

SpringBoot运维

能够掌握SpringBoot程序多环境开发 能够基于Linux系统发布SpringBoot工程 能够解决线上灵活配置SpringBoot工程的需求 Windows打包运行 你的电脑不可能一直开着机联网作为服务器&#xff1a; 我们将我们项目打包放到外部的服务器上&#xff0c;这样其他用户才能正常访问&#x…

设计模式之四:工厂模式

引言&#xff1a;除了使用new操作符之外&#xff0c;还有更多制造对象的方法。同时&#xff0c;实例化这个活动不应该总是公开地进行。 1.简单工厂模式 这里有一些相关的具体类&#xff0c;要在运行时有一些具体条件来决定究竟实例化哪个类。这样的代码&#xff08;if..elseif…

MFC自定义控件使用

用VS2005新建一个MFC项目,添加一个Custom Control控件在窗体 我们需要为自定义控件添加一个类。项目,添加类,MFC类 设置类名字,基类为CWnd,你也可以选择CDialog作为基类 类创建完成后,在它的构造函数中注册一个新的自定义窗体,取名为"MyWindowClass" WNDCL…

深入了解HTTP代理在网络爬虫与SEO实践中的角色

随着互联网的不断发展&#xff0c;搜索引擎优化&#xff08;SEO&#xff09;成为各大企业和网站重要的推广手段。然而&#xff0c;传统的SEO方法已经难以应对日益复杂和智能化的搜索引擎算法。在这样的背景下&#xff0c;HTTP代理爬虫作为一种重要的工具&#xff0c;正在逐渐被…