【Spring Boot】Spring Boot 集成 RocketMQ 实现简单的消息发送和消费

文章目录

  • 前言
  • 基本概念
    • 消息和主题相关
    • 发送普通消息
  • 发送顺序消息
  • RocketMQTemplate的API介绍
  • 参考资料:

前言

本文主要有以下内容:

  • 简单消息的发送
  • 顺序消息的发送
  • RocketMQTemplate的API介绍

环境搭建:
RocketMQ的安装教程:在官网上下载bin文件,解压到本地,并配置环境变量,如下图所示:
在这里插入图片描述

在 Spring boot 项目中引入 RocketMQ 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

在application.yml增加相关配置:

server:
  port: 10001
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: springboot_produce_group # 必须指定group
    send-message-timeout: 3000 # 消息发送超时时长,默认3s
    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2
  consumer:
    group: springboot_consumer_group

在 Spring Boot 中使用RocketMQ很简单直接注入RocketMQTemplate对象即可:

@Resource
private RocketMQTemplate rocketMQTemplate;

基本概念

消息和主题相关

消息 message:通信交互的载体,分为事务消息,半事务消息,延迟消息,顺序消息等。
主题 topic:一类消息的集合,逻辑概念。
队列 queue:主题由一个队列或者多个队列构成,当消息发送到某一个主题时,需要选择某一个队列。
偏移量 offset:消息追加到主题的队列后会分配一个数值,表示该队列的几条消息。
消费者相关:
消费组 consume group:消费组用于订阅主题消费消息,可以订阅多个主题,一个消费组可以有多个消费者。
广播模式:同一个消费组内的所有消费者都会消费订阅主题的所有消息。即一条消息会被该消费者组的所有消费者消费。
集群模式:同一个消费组内的所有消费者只消费订阅主题的一部分消息,即一条消息只会被改消费组的一个消费者消费。
并发消费:同一个队列的消息由多线程消费且不保证消息的顺序。
顺序消费:保证同一队列的消息按顺序消费。

发送普通消息

创建MsgController,代码如下:

@RestController
@RequestMapping("send/")
@CrossOrigin(allowedHeaders = "*", origins = "*")
@Slf4j
public class MsgController {
    @Resource
    private RocketMQTemplate rocketMQTemplate;
    @GetMapping("normal")
    public void sendNormalMsg() {
        Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ Normal_msg").build();
        rocketMQTemplate.send("normal_msg", msg);
    }
}

创建消息的消费者,只需要实现RocketMQListener接口中的方法即可,代码如下:

@Component
@RocketMQMessageListener(topic = "normal_msg", consumerGroup = "consumer_normal")
@Slf4j
public class NormalMsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("Receive Normal Msg: {}",message);
    }
}

@RocketMQMessageListener注解用在消费者类上,指定当前类消费的主题。

topic:指定消费者的主题 comsumerGroup:指定消费者组(Consumer Group)名称,用于区分不同的消费者。

启动项目,运行结果如下图所示:
在这里插入图片描述

发送顺序消息

顺序消息:保证同一队列的消息按顺序消费。
在MsgController 中添加如下代码:

@GetMapping("order")
public void sendOrderMsg(){
​
    log.info("开始发送顺序消息");
    for (int j = 0; j < 10; j++) {
        Message<String> sendOrderMsg = MessageBuilder.withPayload("Send Order Msg = " + j + " time: "+ LocalDateTime.now()).build();
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
    }
    log.info("顺序消息发送结束");
}

创建对应topic消息的消费者,代码如下所示:

@Component
@RocketMQMessageListener(topic = "msg",
        consumerGroup = "consumer_order_group",
        selectorExpression = "order",
        messageModel = MessageModel.CLUSTERING,
        selectorType = SelectorType.TAG)
@Slf4j
public class OrderMsgConsumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        log.info("Receive Order Msg: {}",message);
    }
}

@RocketMQMessageListener其他属性介绍:

  • selectorExpression: 消息选择表达式,用于过滤消息,只有满足表达式条件的消息才会被消费。默认值为 *,表示订阅所有消息。

全匹配:*,默认值。
属性匹配:指定tag = ‘tagName’,上面的代码就可以改写为"tag = ‘order’"
表达式匹配:需要指定selectType = SelectorType.SQL92,见下面。

  • selectorType:指明了消息选择通过tag的方式,默认值SelectorType.TAG。可选值有SelectorType.SQL92

TAG:支持"tagName"的方式配置,如果有多个标签则用||进行连接
SQL92:关键字有AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL。支持的数据类型有Boolean, String, Decimal, Float number等。使用方式如(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)

  • messageModel:消息模式,可选值为 MessageModel.CLUSTERING(默认)或 MessageModel.BROADCASTING,分别表示集群模式和广播模式。

重新启动项目,运行结果如下图所示:
在这里插入图片描述

RocketMQTemplate的API介绍

在上面的api使用中,都没有去关注是否消息发送的状态,如是否成功,发送到了哪一个队列等。接下来就介绍一下相关API的使用

带返回值的发送普通消息SendResult syncSend(String destination, Message<?> message);

在MsgController添加如下代码:

@GetMapping("normal_result")
public void sendNormalResultMsg() {
    Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();
    SendResult normalMsg = rocketMQTemplate.syncSend("normal_msg", msg);
    log.info("normalMsg = {}",normalMsg);
}

在这里插入图片描述

如log所示,可以看到发送状态等信息。

发送异步消息,在MsgController中添加如下代码:

@GetMapping("callback")
public void sendNormalResultMsgWithCallback(){
    Message<String> msg = MessageBuilder.withPayload("normal_return_result").build();
    rocketMQTemplate.asyncSend("normal_msg", msg, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("success");
        }
        @Override
        public void onException(Throwable throwable) {
            log.info("error");
        }
    });
}

运行结果如下所示:
在这里插入图片描述

发送顺序消息:在第二部分以及展示过了也可以用如下代码替换

rocketMQTemplate.convertAndSend("msg:order", sendOrderMsg);
// 替换为
rocketMQTemplate.syncSendOrderly("msg:order", sendOrderMsg,String.valueOf(j));

发送单向消息

@GetMapping("oneway")
public void  sendOneWay(){
    Message<String> oneWay = MessageBuilder.withPayload("Send Order Msg = " + " time: "+ LocalDateTime.now()).build();
    rocketMQTemplate.sendOneWay("normal_msg",oneWay);
}

运行结果如下图所示:
在这里插入图片描述

发送事务消息:暂不举例,后续补充
发送事务消息带回调:和syncSend()类似,后续补充相关用法。

参考资料:

  • 《RocketMQ 实战》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/62301.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

js-3:DOM常见的操作有哪些?

1、DOM 文档对象模型&#xff08;DOM&#xff09;是HTML和XML文档的编程接口。 它提供了对文档的结构化的表述&#xff0c;并定义了一种方式&#xff0c;可以使从程序中对该结构进行访问&#xff0c;从而改变文档的结构&#xff0c;样式跟内容。 任何HTML和XML文档都可以用DOM表…

解决object转换Date问题

文章目录 解决object转换Date问题源代码List<Object>转换List<Date>问题 解决object转换Date问题 源代码 /*** 解决object与Date转换问题*/Testpublic void t4() {Object o new Date();String formatDate DateFormatUtils.format((Date) o, com.alibaba.excel.u…

uniapp-疫情应急管理系统学生端

1 疫情资讯展示 <template><view class"container"><uni-section title"自定义卡片内容" type"line"><uni-card title"基础卡片" class"card-box" v-for"(item,index) in epidemicNewsList"…

CDN安全面临的问题及防御架构

CDN安全 SQL注入攻击&#xff08;各开发小组针对密码和权限的管理&#xff0c;和云安全部门的漏洞扫描和渗透测试&#xff09; Web Server的安全&#xff08;运营商和云安全部门或者漏洞纰漏第三方定期发布漏洞报告修复&#xff0c;例如&#xff1a;nginx版本号和nginx resol…

MONGODB ---- Austindatabases 历年文章合集

开头还是介绍一下群&#xff0c;如果感兴趣polardb ,mongodb ,mysql ,postgresql ,redis 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请联系 liuaustin3 &#xff0c;在新加的朋友会分到2群&#xff08;共…

react中hooks的理解与使用

一、作用 我们知道react组件有两种写法一种是类组件&#xff0c;另一种是函数组件。而函数组件是无状态组件&#xff0c;如果我们要想改变组件中的状态就无法实现了。为此&#xff0c;在react16.8版本后官方推出hooks&#xff0c;用于函数组件更改状态。 二、常用API 1、use…

websocket+node实现直播(弱鸡版)

心血历程 这部分主要是写在写这些的时候遇到的问题以及换思路的过程&#xff0c;可以之间看正文 在之前我也写过直播功能&#xff0c;并且与websocket相结合实现了直播弹幕。只不过直播是使用的腾讯云的&#xff0c;而不是手写的直播推流拉流&#xff0c;这次又有一个新的项目…

【C# 基础精讲】C# 开发环境搭建(Visual Studio等)

安装C#开发环境是开始学习和使用C#编程的第一步。目前&#xff0c;最常用的C#开发环境是Microsoft Visual Studio&#xff0c;它是一套强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;提供了丰富的工具和功能&#xff0c;使开发C#应用程序变得更加便捷。以下是安装…

Redis 和 Mysql 如何保证数据一致性

项目场景&#xff1a; 一般情况下&#xff0c;Redis 用来实现应用和数据库之间读操作的缓存层&#xff0c;主要目的是减少数据库 IO&#xff0c;还可以提升数据的 IO 性能。 如下图所示&#xff0c;这是它的整体架构。 当应用程序需要去读取某个数据的时候&#xff0c;首先会先…

Linux性能学习(4.4):网络_TCP三次握手内核参数优化

文章目录 1 三次握手2 参数优化2.1 tcp_syn_retries--->SYN重传次数2.2 tcp_synack_retries--->ACK重传次数2.3 tcp_retries2--->发送数据失败重传次数2.4 TCP keepalive--->保活机制2.5 tcp_max_syn_backlog/somaxconn--->半/全连接队列长度2.6 tcp_syncookies…

离散化的两种实现方式【sort或者map】

离散化 定义 把无限空间中有限的个体映射到有限的空间中去&#xff0c;以此提高算法的时空效率。通俗的说&#xff0c;离散化是在不改变数据相对大小的条件下&#xff0c;对数据进行相应的缩小。 适用范围&#xff1a;数组中元素值域很大&#xff0c;但个数不是很多。 比如将…

PHP8的表达式-PHP8知识详解

表达式是 PHP 最重要的基石。在 PHP8中&#xff0c;几乎所写的任何东西都是一个表达式。简单但却最精确的定义一个表达式的方式就是"任何有值的东西"。 最基本的表达式形式是常量和变量。当键入"$a 5"&#xff0c;即将值"5"分配给变量 $a。&quo…

C++初阶——拷贝构造和运算符重载(const成员)

目录 1. 拷贝构造函数 1.2 拷贝构造函数特征&#xff1a; 2. 默认拷贝构造函数 2.1 未显式定义&#xff0c;编译器会生成默认的拷贝构造函数。 默认的拷贝构造函数对象按内存存储按字节序完成拷贝&#xff0c;这种拷贝叫做浅拷贝&#xff0c;或者值拷贝 3. 运算符重载 3.1…

Flink CEP(三)pattern动态更新

线上运行的CEP中肯定经常遇到规则变更的情况&#xff0c;如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景&#xff0c;如果规则窗口过长&#xff08;一两个星期&#xff09;&#xff0c;状态过大&#xff0c;就会导致重启…

vue-virtual-scroller的使用,展示巨量数据,长列表优化,虚拟列表

一、原理 计算显示区域的高度&#xff08;或宽度&#xff09; 和显示区域的起始位置&#xff08;scrollTop或scrollLeft&#xff09;根据每个元素的尺寸和总数目&#xff0c;计算出整个列表的高度&#xff08;或宽度&#xff09;显示区域的高度&#xff08;或宽度&#xff09…

基于Orangepi 3 lts 的云台相机

利用orangepi 3 lts 和arduino nano 制作了一个云台相机&#xff0c;可用于室内监控。 硬件&#xff1a; orangepi 3 ,arduino nano ,usb相机&#xff0c;180度舵机两个 WeChat_20230806213004 软件&#xff1a; 整体采用mqtt进行消息的中转。 相机采用python 利用opencv…

LeetCode每日一题Day5——21. 合并两个有序链表

✨博主&#xff1a;命运之光 &#x1f984;专栏&#xff1a;算法修炼之练气篇&#xff08;C\C版&#xff09; &#x1f353;专栏&#xff1a;算法修炼之筑基篇&#xff08;C\C版&#xff09; &#x1f433;专栏&#xff1a;算法修炼之练气篇&#xff08;Python版&#xff09; …

第三章 图论 No.2单源最短路之虚拟源点,状压最短路与最短路次短路条数

文章目录 1137. 选择最佳线路1131. 拯救大兵瑞恩1134. 最短路计数383. 观光 dp是特殊的最短路&#xff0c;是无环图&#xff08;拓扑图&#xff09;上的最短路问题 1137. 选择最佳线路 1137. 选择最佳线路 - AcWing题库 // 反向建图就行 #include <iostream> #include…

C++ 类型兼容规则

类型兼容规则是指在需要基类对象的任何地方&#xff0c;都可以使用公有派生类的对象来替代。 通过公有继承&#xff0c;派生类得到了基类中除构造函数和析构函数之外的所有成员。这样&#xff0c;公有派生类实际就具备了基类的所有功能&#xff0c;凡是基类能解决的问题&#x…

vcomp100.dll丢失怎样修复?总结三个修复方法

在使用Windows系统的电脑的过程中&#xff0c;我们有时会遇到一些错误提示&#xff0c;其中之一就是关于vcomp100.dll文件缺失或损坏的问题。我第一次看到这个错误提示时&#xff0c;我并不知道vcomp100.dll是什么文件&#xff0c;也不了解它在电脑中的作用。我猜测它可能是一个…