Java零基础——RocketMQ篇

1.RocketMQ简介

官网: http://rocketmq.apache.org/
在这里插入图片描述

RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  1. l能够保证严格的消息顺序

  2. l提供丰富的消息拉取模式

  3. l高效的订阅者水平扩展能力

  4. l实时的消息订阅机制

  5. l亿级消息堆积能力

2.为什么要使用MQ

1,要做到系统解耦,当新的模块进来时,可以做到代码改动最小; 能够解耦

2,设置流程缓冲池,可以让后端系统按自身吞吐能力进行消费,不被冲垮; 能够削峰,限流

3,强弱依赖梳理能把非关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

Mq的作用 削峰限流 异步 解耦合

2.1 定义
中间件(缓存中间件 redis memcache 数据库中间件 mycat canal 消息中间件mq )

面向消息的中间件(message-oriented middleware0) MOM能够很好的解决以上的问题。

是指利用高效可靠的消息传递机制进行与平台无关(跨平台)的数据交流,并基于数据通信来进行分布式系统的集成。

通过提供消息传递和消息排队模型在分布式环境下提供应用解耦,弹性伸缩,冗余存储,流量削峰,异步通信,数据同步等

大致流程

发送者把消息发给消息服务器,消息服务器把消息存放在若干队列/主题中,在合适的时候,消息服务器会把消息转发给接受者。在这个过程中,发送和接受是异步的,也就是发送无需等待,发送者和接受者的生命周期也没有必然关系在发布pub/订阅sub模式下,也可以完成一对多的通信,可以让一个消息有多个接受者[微信订阅号就是这样的]
在这里插入图片描述

2.2 特点
2.2.1 异步处理模式
消息发送者可以发送一个消息而无需等待响应。消息发送者把消息发送到一条虚拟的通道(主题或队列)上;

消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出回应。整个过程都是异步的。

案例:

也就是说,一个系统和另一个系统间进行通信的时候,假如系统A希望发送一个消息给系统B,让它去处理,但是系统A不关注系统B到底怎么处理或者有没有处理好,所以系统A把消息发送给MQ,然后就不管这条消息的“死活” 了,接着系统B从MQ里面消费出来处理即可。至于怎么处理,是否处理完毕,什么时候处理,都是系统B的事,与系统A无关。
在这里插入图片描述

这样的一种通信方式,就是所谓的“异步”通信方式,对于系统A来说,只要把消息发给MQ,然后系统B就会异步处去进行处理了,系统A不能“同步”的等待系统B处理完。这样的好处是什么呢?解耦

2.2.2 应用系统的解耦
发送者和接收者不必了解对方,只需要确认消息

发送者和接收者不必同时在线

2.2.3 现实中的业务
在这里插入图片描述

3.各个MQ产品的比较

在这里插入图片描述

4.RocketMQ重要概念【重点】

Producer:消息的发送者,生产者;举例:发件人

Consumer:消息接收者,消费者;举例:收件人

Broker:暂存和传输消息的通道;举例:快递

NameServer:管理Broker;举例:各个快递公司的管理机构 相当于broker的注册中心,保留了broker的信息

Queue:队列,消息存放的位置,一个Broker中可以有多个队列

Topic:主题,消息的分类

ProducerGroup:生产者组

ConsumerGroup:消费者组,多个消费者组可以同时消费一个主题的消息

消息发送的流程是,Producer询问NameServer,NameServer分配一个broker 然后Consumer也要询问NameServer,得到一个具体的broker,然后消费消息
在这里插入图片描述

5.生产和消费理解【重点】

在这里插入图片描述

6.RocketMQ安装

了解了mq的基本概念和角色以后,我们开始安装rocketmq,建议在linux上

6.1 下载RocketMQ
下载地址:https://rocketmq.apache.org/dowloading/releases/

注意选择版本,这里我们选择4.9.2的版本,后面使用alibaba时对应
在这里插入图片描述

下载地址:

https://archive.apache.org/dist/rocketmq/4.9.2/rocketmq-all-4.9.2-bin-release.zip

6.2 上传服务器
在root目录下创建文件夹

mkdir rocketmq

将下载后的压缩包上传到阿里云服务器或者虚拟机中去
在这里插入图片描述

6.3 解压
unzip rocketmq-all-4.9.2-bin-release.zip

如果你的服务器没有unzip命令,则下载安装一个

yum install unzip

目录分析
在这里插入图片描述

Benchmark:包含一些性能测试的脚本;

Bin:可执行文件目录;

Conf:配置文件目录;

Lib:第三方依赖;

LICENSE:授权信息;

NOTICE:版本公告;

6.4 配置环境变量
vi /etc/profile

在文件末尾添加

export NAMESRV_ADDR=阿里云公网IP:9876

刷新环境变量

source /etc/profile

6.5 修改nameServer的运行脚本
进入bin目录下,修改runserver.sh文件,将71行和76行的Xms和Xmx等改小一点

vi runserver.sh
在这里插入图片描述

保存退出

6.6 修改broker的运行脚本
进入bin目录下,修改runbroker.sh文件,修改67行
在这里插入图片描述

保存退出

6.7 修改broker的配置文件
进入conf目录下,修改broker.conf文件

brokerClusterName = DefaultCluster

brokerName = broker-a

brokerId = 0

deleteWhen = 04

fileReservedTime = 48

brokerRole = ASYNC_MASTER

flushDiskType = ASYNC_FLUSH

namesrvAddr=localhost:9876

autoCreateTopicEnable=true

brokerIP1=阿里云公网IP

添加参数解释

namesrvAddr:nameSrv地址 可以写localhost因为nameSrv和broker在一个服务器

autoCreateTopicEnable:自动创建主题,不然需要手动创建出来

brokerIP1:broker也需要一个公网ip,如果不指定,那么是阿里云的内网地址,我们再本地无法连接使用

6.8 启动
首先在安装目录下创建一个logs文件夹,用于存放日志

mkdir logs
在这里插入图片描述

一次运行两条命令

启动nameSrv

nohup sh bin/mqnamesrv > ./logs/namesrv.log &

启动broker 这里的-c是指定使用的配置文件

nohup sh bin/mqbroker -c conf/broker.conf > ./logs/broker.log &

查看启动结果
在这里插入图片描述

6.9 RocketMQ控制台的安装RocketMQ-Console
Rocketmq 控制台可以可视化MQ的消息发送!

旧版本源码是在rocketmq-external里的rocketmq-console,新版本已经单独拆分成dashboard

网址: https://github.com/apache/rocketmq-dashboard

下载地址:

https://github.com/apache/rocketmq-dashboard/archive/refs/tags/rocketmq-dashboard-1.0.0.zip

下载后解压出来,在跟目录下执行

mvn clean package -Dmaven.test.skip=true
在这里插入图片描述在这里插入图片描述

将jar包上传到服务器上去
在这里插入图片描述

然后运行

nohup java -jar ./rocketmq-dashboard-1.0.0.jar > ./rocketmq-4.9.2/logs/dashboard.log &

命令拓展:–server.port指定运行的端口

–rocketmq.config.namesrvAddr=127.0.0.1:9876 指定namesrv地址

访问: http://localhost:8001

运行访问端口是8001,如果从官网拉下来打包的话,默认端口是8080

nohup java -jar rocketmq-dashboard-1.0.0.jar --server.port=8081 --rocketmq.config.namesrvAddr=47.100.238.122:9876 > rocketmq-dashboard.log &
在这里插入图片描述

7.RocketMQ安装之docker

7.1 下载RockerMQ需要的镜像
docker pull rocketmqinc/rocketmq

docker pull styletang/rocketmq-console-ng

7.2 启动NameServer服务
7.2.1 创建NameServer数据存储路径
mkdir -p /home/rocketmq/data/namesrv/logs /home/rocketmq/data/namesrv/store

7.2.2 启动NameServer容器
docker run -d --name rmqnamesrv -p 9876:9876 -v /home/rocketmq/data/namesrv/logs:/root/logs -v /home/rocketmq/data/namesrv/store:/root/store -e “MAX_POSSIBLE_HEAP=100000000” rocketmqinc/rocketmq sh mqnamesrv

7.3 启动Broker服务
7.3.1 创建Broker数据存储路径
mkdir -p /home/rocketmq/data/broker/logs /home/rocketmq/data/broker/store

7.3.2 创建conf配置文件目录
mkdir /home/rocketmq/conf

7.3.3 在配置文件目录下创建broker.conf配置文件

# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
#broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
#0表示Master,大于0表示不同的slave
brokerId = 0
#表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
#在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
#有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机制;
brokerRole = ASYNC_MASTER
#刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 你服务器外网ip

7.3.4 启动Broker容器

docker run -d  --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 -v  /home/rocketmq/data/broker/logs:/root/logs -v /home/rocketmq/data/broker/store:/root/store -v /home/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf --privileged=true -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf

7.4 启动控制台

docker run -d --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=你的外网地址:9876 \

-Dcom.rocketmq.sendMessageWithVIPChannel=false \

-Duser.timezone='Asia/Shanghai'" -v /etc/localtime:/etc/localtime -p 9999:8080 styletang/rocketmq-console-ng

7.5 正常启动后的docker ps
在这里插入图片描述

7.6 访问控制台
http://你的服务器外网ip:9999/
在这里插入图片描述

8.RocketMQ快速入门

RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等,我们一一学习

8.1 消息发送和监听的流程
我们先搞清楚消息发送和监听的流程,然后我们在开始敲代码

8.1.1 消息生产者
1.创建消息生产者producer,并制定生产者组名

2.指定Nameserver地址

3.启动producer

4.创建消息对象,指定主题Topic、Tag和消息体等

5.发送消息

6.关闭生产者producer

8.1.2 消息消费者
1.创建消费者consumer,制定消费者组名

2.指定Nameserver地址

3.创建监听订阅主题Topic和Tag等

4.处理消息

5.启动消费者consumer

8.2 搭建Rocketmq-demo
8.2.1 加入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.2</version>

        <!--docker的用下面这个版本-->

<version>4.4.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>

8.2.2 编写生产者

/**
 * 测试生产者
 *
 * @throws Exception
 */
@Test
public void testProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        // 创建消息
        // 第一个参数:主题的名字
        // 第二个参数:消息内容
        Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes());
        SendResult send = producer.send(msg);
        System.out.println(send);
    }
    // 关闭实例
    producer.shutdown();
}

8.2.3 编写消费者

/**
     * 测试消费者
     *
     * @throws Exception
     */
    @Test
    public void testConsumer() throws Exception {
        // 创建默认消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
        // 设置nameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
        consumer.subscribe("TopicTest", "*");
        // 注册一个消费监听 MessageListenerConcurrently 是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax()
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + "----" + msgs);
                // 返回消费的状态 如果是CONSUME_SUCCESS 则成功,若为RECONSUME_LATER则该条消息会被重回队列,重新被投递
                // 重试的时间为messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                // 也就是第一次1s 第二次5s 第三次10s  ....  如果重试了18次 那么这个消息就会被终止发送给消费者
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        // 这个start一定要写在registerMessageListener下面
        consumer.start();
        System.in.read();
    }

8.2.4 测试

启动生产者和消费者进行测试

9.RocketMQ发送同步消息*

上面的快速入门就是发送同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式
在这里插入图片描述

10.RocketMQ发送异步消息*

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知

10.1 异步消息生产者

@Test
public void testAsyncProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("异步消息").getBytes());
    producer.send(msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }
        @Override
        public void onException(Throwable e) {
            System.out.println("发送失败");
        }
    });
    System.out.println("看看谁先执行");
    // 挂起jvm 因为回调是异步的不然测试不出来
    System.in.read();
    // 关闭实例
    producer.shutdown();
}

10.2 异步消息消费者

@Test
public void testAsyncConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

11.RocketMQ发送单向消息*

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送

11.1 单向消息生产者

@Test
public void testOnewayProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("单向消息").getBytes());
    // 发送单向消息
    producer.sendOneway(msg);
    // 关闭实例
    producer.shutdown();
}

11.2 单向消息消费者

消费者和上面一样

12.RocketMQ发送延迟消息*

消息放入mq后,过一段时间,才会被监听到,然后消费

比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

12.1 延迟消息生产者

@Test
public void testDelayProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest", ("延迟消息").getBytes());
    // 给这个消息设定一个延迟等级
    // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    msg.setDelayTimeLevel(3);
    // 发送单向消息
    producer.send(msg);
    // 打印时间
    System.out.println(new Date());
    // 关闭实例
    producer.shutdown();
}

12.2 延迟消息消费者

消费者和上面一样

这里注意的是RocketMQ不支持任意时间的延时

只支持以下几个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟

private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

13.RocketMQ发送顺序消息**

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。

可能大家会有疑问,mq不就是FIFO吗?

rocketMq的broker的机制,导致了rocketMq会有这个问题

因为一个broker中对应了四个queue
在这里插入图片描述

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。

13.1 场景分析
模拟一个订单的发送流程,创建两个订单,发送的消息分别是

订单号111 消息流程 下订单->物流->签收

订单号112 消息流程 下订单->物流->拒收

13.2 创建一个订单对象

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
    /**
     * 订单id
     */
    private Integer orderId;

    /**
     * 订单编号
     */
    private Integer orderNumber;

    /**
     * 订单价格
     */
    private Double price;

    /**
     * 订单号创建时间
     */
    private Date createTime;

    /**
     * 订单描述
     */
    private String desc;

}

13.3 顺序消息生产者

@Test
public void testOrderlyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    List<Order> orderList = Arrays.asList(
            new Order(1, 111, 59D, new Date(), "下订单"),
            new Order(2, 111, 59D, new Date(), "物流"),
            new Order(3, 111, 59D, new Date(), "签收"),
            new Order(4, 112, 89D, new Date(), "下订单"),
            new Order(5, 112, 89D, new Date(), "物流"),
            new Order(6, 112, 89D, new Date(), "拒收")
    );
    // 循环集合开始发送
    orderList.forEach(order -> {
        Message message = new Message("TopicTest", order.toString().getBytes());
        try {
            // 发送的时候 相同的订单号选择同一个队列
            producer.send(message, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 当前主题有多少个队列
                    int queueNumber = mqs.size();
                    // 这个arg就是后面传入的 order.getOrderNumber()
                    Integer i = (Integer) arg;
                    // 用这个值去%队列的个数得到一个队列
                    int index = i % queueNumber;
                    // 返回选择的这个队列即可 ,那么相同的订单号 就会被放在相同的队列里 实现FIFO了
                    return mqs.get(index);
                }
            }, order.getOrderNumber());
        } catch (Exception e) {
            System.out.println("发送异常");
        }
    });
    // 关闭实例
    producer.shutdown();
}

13.4 顺序消息消费者,测试时等一会即可有延迟

@Test
public void testOrderlyConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费
    consumer.registerMessageListener(new MessageListenerOrderly() {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            MessageExt messageExt = msgs.get(0);
            System.out.println(new String(messageExt.getBody()));
            return ConsumeOrderlyStatus.SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

14.RocketMQ发送批量消息

Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费

14.1 批量消息生产者

@Test
public void testBatchProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    List<Message> msgs = Arrays.asList(
            new Message("TopicTest", "我是一组消息的A消息".getBytes()),
            new Message("TopicTest", "我是一组消息的B消息".getBytes()),
            new Message("TopicTest", "我是一组消息的C消息".getBytes())

    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

14.2 批量消息消费者

@Test
public void testBatchConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

15.RocketMQ发送事务消息****

15.1 事务消息的发送流程
它可以被认为是一个两阶段的提交消息实现,以确保分布式系统的最终一致性。事务性消息确保本地事务的执行和消息的发送可以原子地执行。
在这里插入图片描述在这里插入图片描述

上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。

事务消息发送及提交

  1. 发送消息(half消息)。

  2. 服务端响应消息写入结果。

  3. 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

  4. 根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消息对消费者可见)

事务补偿

  1. 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

  2. Producer收到回查消息,检查回查消息对应的本地事务的状态

  3. 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息UNKNOW或者Rollback发生超时或者失败的情况。

事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

l TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。

l TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。

l TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

15.2 事务消息生产者

/**
 * TransactionalMessageCheckService的检测频率默认1分钟,可通过在broker.conf文件中设置transactionCheckInterval的值来改变默认值,单位为毫秒。
 * 从broker配置文件中获取transactionTimeOut参数值。
 * 从broker配置文件中获取transactionCheckMax参数值,表示事务的最大检测次数,如果超过检测次数,消息会默认为丢弃,即回滚消息。
 *
 * @throws Exception
 */
@Test
public void testTransactionProducer() throws Exception {
    // 创建一个事务消息生产者
    TransactionMQProducer producer = new TransactionMQProducer("test-group");
    producer.setNamesrvAddr("localhost:9876");
    // 设置事务消息监听器
    producer.setTransactionListener(new TransactionListener() {
        // 这个是执行本地业务方法
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            System.out.println(new Date());
            System.out.println(new String(msg.getBody()));
            // 这个可以使用try catch对业务代码进行性包裹
            // COMMIT_MESSAGE 表示允许消费者消费该消息
            // ROLLBACK_MESSAGE 表示该消息将被删除,不允许消费
            // UNKNOW表示需要MQ回查才能确定状态 那么过一会 代码会走下面的checkLocalTransaction(msg)方法
            return LocalTransactionState.UNKNOW;
        }

        // 这里是回查方法 回查不是再次执行业务操作,而是确认上面的操作是否有结果
        // 默认是1min回查 默认回查15次 超过次数则丢弃打印日志 可以通过参数设置
        // transactionTimeOut 超时时间
        // transactionCheckMax 最大回查次数
        // transactionCheckInterval 回查间隔时间单位毫秒
        // 触发条件
        // 1.当上面执行本地事务返回结果UNKNOW时,或者下面的回查方法也返回UNKNOW时 会触发回查
        // 2.当上面操作超过20s没有做出一个结果,也就是超时或者卡主了,也会进行回查
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            System.err.println(new Date());
            System.err.println(new String(msg.getBody()));
            // 这里
            return LocalTransactionState.UNKNOW;
        }
    });
    producer.start();
    Message message = new Message("TopicTest2", "我是一个事务消息".getBytes());
    // 发送消息
    producer.sendMessageInTransaction(message, null);
    System.out.println(new Date());
    System.in.read();
}

15.3 事务消息消费者

@Test
public void testTransactionConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest2", "*");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

15.4 测试结果
在这里插入图片描述

16.RocketMQ发送带标签的消息,消息过滤

Rocketmq提供消息过滤功能,通过tag或者key进行区分

我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待

16.1 标签消息生产者

@Test
public void testTagProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest","tagA", "我是一个带标记的消息".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

16.2 标签消息消费者

@Test
public void testTagConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
    consumer.subscribe("TopicTest", "tagA || tagB || tagC");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            System.out.println(msgs.get(0).getTags());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

17.RocketMQ中消息的Key(业务相关)

在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询
在这里插入图片描述

17.1 带key消息生产者

@Test
public void testKeyProducer() throws Exception {
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("test-group");
    // 设置nameServer地址
    producer.setNamesrvAddr("localhost:9876");
    // 启动实例
    producer.start();
    Message msg = new Message("TopicTest","tagA","key", "我是一个带标记和key的消息".getBytes());
    SendResult send = producer.send(msg);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

17.2 带key消息消费者

@Test
public void testKeyConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   表达式,默认是*,支持"tagA || tagB || tagC" 这样或者的写法 只要是符合任何一个标签都可以消费
    consumer.subscribe("TopicTest", "tagA || tagB || tagC");
    // 注册一个消费监听 MessageListenerConcurrently是并发消费
    // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax()
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            // 这里执行消费的代码 默认是多线程消费
            System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody()));
            System.out.println(msgs.get(0).getTags());
            System.out.println(msgs.get(0).getKeys());
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}
 

在这里插入图片描述

18.RocketMQ重试机制

18.1 生产者重试

// 失败的情况重发3次

producer.setRetryTimesWhenSendFailed(3);

// 消息在1S内没有发送成功,就会重试

producer.send(msg, 1000);

18.2 消费者重试
在消费者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会执行重试

上图代码中说明了,我们再实际生产过程中,一般重试5-7次,如果还没有消费成功,则可以把消息签收了,通知人工等处理

messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

/**
 * 测试消费者
 *
 * @throws Exception
 */
@Test
public void testConsumer() throws Exception {
    // 创建默认消费者组
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    // 设置nameServer地址
    consumer.setNamesrvAddr("localhost:9876");
    // 订阅一个主题来消费   *表示没有过滤参数 表示这个主题的任何消息
    consumer.subscribe("TopicTest", "*");
    // 注册一个消费监听
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                        ConsumeConcurrentlyContext context) {
            try {
                // 这里执行消费的代码
                System.out.println(Thread.currentThread().getName() + "----" + msgs);
                // 这里制造一个错误
                int i = 10 / 0;
            } catch (Exception e) {
                // 出现问题 判断重试的次数
                MessageExt messageExt = msgs.get(0);
                // 获取重试的次数 失败一次消息中的失败次数会累加一次
                int reconsumeTimes = messageExt.getReconsumeTimes();
                if (reconsumeTimes >= 3) {
                    // 则把消息确认了,可以将这条消息记录到日志或者数据库 通知人工处理
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

19.RocketMQ死信消息

当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列

19.1 消息生产者

@Test
public void testDeadMsgProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("dead-group");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    Message message = new Message("dead-topic", "我是一个死信消息".getBytes());
    producer.send(message);
    producer.shutdown();
}

19.2 消息消费者

@Test
public void testDeadMsgConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
    consumer.setNamesrvAddr("localhost:9876");
    consumer.subscribe("dead-topic", "*");
    // 设置最大消费重试次数 2 次
    consumer.setMaxReconsumeTimes(2);
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(msgs);
            // 测试消费失败
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    });
    consumer.start();
    System.in.read();
}

19.3 信消费者

注意权限问题 perm 2读 4写 6读写
在这里插入图片描述

@Test
public void testDeadMq() throws  Exception{
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("dead-group");
    consumer.setNamesrvAddr("localhost:9876");
    // 消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
    // 队列名称 默认是 %DLQ% + 消费者组名
    consumer.subscribe("%DLQ%dead-group", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            System.out.println(msgs);
            // 处理消息 签收了
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

19.4 控制台显示
在这里插入图片描述在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/218881.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Cyanine7-NHS ester荧光染料的化学结构、光谱性质和荧光特性

Cyanine7-NHS ester的结构包括一个靛菁环结构和一个NHS ester活性基团。NHS ester官能团是一种活化基团&#xff0c;用于将染料共价结合到含有游离氨基官能团的生物分子上。 **光谱性质&#xff1a;**Cyanine7-NHS ester的光谱性质通常包括&#xff1a; **激发波长&#xff08…

如何利用MES系统加强对仓库的管理

相比于ERP对库存数量的统计查看&#xff0c;MES系统对于仓库的管理则更加具体。在这个快速变革的时代&#xff0c;仓库管理对于企业的运营效率和客户满意度至关重要&#xff0c;单靠ERP系统已经很难应对新的挑战&#xff0c;所以为了提高仓库管理的效率和准确性&#xff0c;许多…

Twincat功能块使用经验总结

控制全局变量&#xff1a; //轴控制指令 bi_Power: BOOL; //使能 bi_Reset: BOOL; //复位 bi_Stop: BOOL; //停止 bi_JogForward: BOOL; //正向点动 bi_JogBackwards: BOOL; //反向点动 bi_MoveAdditive: BOOL; //增量位…

Java 数据结构篇-二叉树的深度优先遍历(实现:递归方式、非递归方式)

&#x1f525;博客主页&#xff1a; 【小扳_-CSDN博客】 ❤感谢大家点赞&#x1f44d;收藏⭐评论✍ 文章目录 1.0 二叉树的说明 1.1 二叉树的实现 2.0 二叉树的优先遍历说明 3.0 用递归方式实现二叉树遍历 3.1 用递归方式实现遍历 - 前序遍历 3.2 用递归方式实现遍历 - 中序遍…

RabbitMq整合Springboot超全实战案例+图文演示+源码自取

目录 介绍 简单整合 简单模式 定义 代码示例 work模式 定义 代码示例 pubsub模式 定义 代码示例 routing模式 定义 代码示例 top模式 定义 代码 下单付款加积分示例 介绍 代码 可靠性投递示例 介绍 代码 交换机投递确认回调 队列投递确认回调 ​延迟消…

创作涌动·CSDN·21天创作挑战赛·第三期,正式开启报名!

​ 文章目录 ⭐️ 活动介绍⭐️ 活动详情⭐️ 活动奖品⭐️ 活动流程⭐️ 评审规则⭐️ 报名&投稿事项⭐️ 关于活动组织 活动报名地址&#xff08;点击跳转&#xff09; 本次活动与官方活动及其他博主的创作型活动并不不冲突&#xff01; ⭐️ 活动介绍 亲爱的小伙伴们&a…

树莓派Python程序开机自启动(Linux下Python程序开机自启动)

前一阵子用python编写了一个驱动I2C程序读写屏幕&#xff0c;输出IP的小程序&#xff0c;程序编好后需要树莓派使能程序开机自启动。其实这些方法对任何Linux系统都适用。 方法一&#xff1a;此方法的缺点是不进入默认pi的账号&#xff0c;甚至不开hdmi开启桌面的话&#xff0…

连夜整理的6个开源项目,都很实用

偶然找到的这个宝藏网站&#xff0c;站内集齐了大量的开源项目。 推荐实用的项目 1、vueNextAdmin 基于 vue3.x CompositionAPI setup 语法糖 typescript vite element plus vue-router-next pinia 技术&#xff0c;适配手机、平板、pc 的后台开源免费模板&#xff0c;…

使用K-means把人群分类

1.前言 K-mean 是无监督的聚类算法 算法分类&#xff1a; 2.实现步骤 1.数据加工&#xff1a;把数据转为全数字&#xff08;比如性别男女&#xff0c;转换为0 和 1&#xff09; 2.模型训练 fit 3.预测 3.代码 原数据类似这样(source&#xff1a;http:img-blog.csdnimg.cn…

06 数仓平台MaxWell

Maxwell简介 Maxwell是由Zendesk公司开源&#xff0c;用 Java 编写的MySQL变更数据抓取软件&#xff0c;能实时监控 MySQL数据库的CRUD操作将变更数据以 json 格式发送给 Kafka等平台。 Maxwell输出数据格式 Maxwell 原理 Maxwell工作原理是实时读取MySQL数据库的二进制日志…

Kubernetes(K8s)数据存储-09

数据存储 在前面已经提到&#xff0c;容器的生命周期可能很短&#xff0c;会被频繁地创建和销毁。那么容器在销毁时&#xff0c;保存在容器中的数据也会被清除。这种结果对用户来说&#xff0c;在某些情况下是不乐意看到的。为了持久化保存容器的数据&#xff0c;kubernetes引…

idea利用spring框架整合thymeleaf展现数据库数据

idea初步利用thymeleaf展现列表 上一篇文章简单展现自己写的列表&#xff1b; 这篇文章连接mysql数据库实现数据库数据展现 主要三个文件 controller指定html界面 package com.example.appledemo.controller;import com.example.appledemo.mapper.UserMapper; import com.exam…

“名创优品小动物保护公益基金”项目成立,捐赠1000万助力美好生活

近日&#xff0c;名创优品宣布捐赠1000万元成立“名创优品小动物保护公益基金”项目&#xff0c;将通过专业、关爱、共生的公益理念和行动&#xff0c;助力构建良好的社区生态和美好生活方式&#xff0c;与年轻一代探索中国公益创新发展。 名创优品捐赠1000万元成立“名创优品小…

操作系统概论:揭秘计算机背后的神秘力量

操作系统概论 & 功能 概述定义操作系统功能作为系统资源的管理者向上层提供方便易用的服务作为最接近硬件的层次 主页传送门&#xff1a;&#x1f4c0; 传送 概述 概念&#xff1a; 定义 控制和管理计算机硬件和软件资源的程序一种系统软件为上层用户、应用程序提供简单易…

钉钉员工组织资料实时同步至飞书的应用解析

如何实现应用之间的同步&#xff1f; 随着企业应用的日益增多&#xff0c;在帮助企业提供办公效率的同时&#xff0c;也增加了对这些应用的运维成本。有没有一种好的办法&#xff0c;实现saas应用之间的桥梁搭建&#xff0c;自动化地完成不同应用之间的数据流转呢&#xff1f;…

C语言枚举详解,typedef简介(能看懂文字就能明白系列)

系列文章目录 C语言基础专栏 笔记详解 &#x1f31f; 个人主页&#xff1a;古德猫宁- &#x1f308; 信念如阳光&#xff0c;照亮前行的每一步 文章目录 系列文章目录&#x1f308; *信念如阳光&#xff0c;照亮前行的每一步* 前言一、枚举类型的声明枚举常量三、枚举类型的优…

智能指针及强相关知识经验总结 --- 移动语义、引用计数、循环引用、move()、自定义删除器等

目录 前言 一、shared_ptr 1. 基本用法和构造方法 2. 引用计数机制 3. weak_ptr 解决循环引用 二、unique_ptr 1. 基本用法和构造方法 2. 独占性 3. 所有权转移 1&#xff09;unique_ptr :: release() 2&#xff09;移动语义 和 move() 三、 对比 shared_ptr 和 un…

Dropwizard-metric的使用

背景 近期在开发中用到了dropwizard-metric作为监控metric的埋点框架&#xff0c;由于是分布式的系统&#xff0c;前期曾经对比过hadoop-metric的实现和dropwizard-metric的实现&#xff0c;因为开发的项目后续会和hadoop的项目有一定的上下游关系&#xff0c;所以考虑排除掉h…

Re 花指令学习

概念 花指令又名垃圾代码、脏字节&#xff0c;英文名是junk code。花指令就是在不影响程序运行的情况下&#xff0c;往真实代码中插入一些垃圾代码&#xff0c;从而影响反汇编器的正常运行&#xff1b;或是起到干扰逆向分析人员的静态分析&#xff0c;增加分析难度和分析时间。…

开发的客户收到样品表示质量不如原供应商如何应对

有小伙伴问&#xff0c;在开发客户的过程当中&#xff0c;给客户寄了样品&#xff0c;客户说他的样品没有原来供应商的好怎么办&#xff1f; 这个问题我们来想一下&#xff0c;客户既然愿意把地址给我们&#xff0c;愿意去接你的样品&#xff0c;说明什么&#xff1f;说明客户…