RocketMQ(三):集成SpringBoot

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot


目录

  • 一、搭建环境
  • 二、不同类型消息
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、延迟消息
    • 5、顺序消息
    • 6、带tag消息
    • 7、带key消息

一、搭建环境

  • 需要创建两个服务,消息生产服务和消息消费者服务
  • 生产消息存在多个服务,消费则统一由一个服务处理
  • 这样可以做到解耦

pom.xml

  • 生产者和消费者都需要
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.2</version>
</dependency>

生产者配置文件

  • 设置统一的生产者组,这样发送消息时就不用指定了
rocketmq:
    name-server: 127.0.0.1:9876     # rocketMq的nameServer地址
    producer:
        group: boot-producer-group        # 生产者组别
        send-message-timeout: 3000  # 消息发送的超时时间
        retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数
        max-message-size: 4194304       # 消息的最大长度

生产者配置文件

  • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq:
    name-server: localhost:9876

二、不同类型消息

直接引入即可

@Autowired
private RocketMQTemplate rocketMQTemplate;

1、同步消息

生产消息

  • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
  • 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");

消费消息

  • RocketMQListener的泛型类型即消息类型
    • MessageExt类型是消息的所有内容
    • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
  • onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component
@RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        System.out.println(new String(message.getBody()));
    }
}

2、异步消息

  • 发送异步消息,发送完以后会有一个异步通知
  • 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        System.out.println("成功");
    }
    @Override
    public void onException(Throwable throwable) {
        System.out.println("失败" + throwable.getMessage());
    }
});

3、单向消息

  • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
  • 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");

4、延迟消息

  • RocketMQ不支持任意时间的延时
  • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
  • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
  • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build();
rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);

5、顺序消息

生产消息

  • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
List<MsgModel> msgModels = Arrays.asList(
        new MsgModel("qwer", 1, "下单"),
        new MsgModel("qwer", 1, "短信"),
        new MsgModel("qwer", 1, "物流"),

        new MsgModel("zxcv", 2, "下单"),
        new MsgModel("zxcv", 2, "短信"),
        new MsgModel("zxcv", 2, "物流")
);
msgModels.forEach(msgModel -> {
    // 发送  一般都是以json的方式进行处理
    // 根据第三个参数计算hash值决定消息放入哪个队列
    rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
});

消费消息

  • 默认是并发消费模式,可以设置为单线程顺序模式
  • 设置消费重试次数
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",
        consumerGroup = "boot-orderly-consumer-group",
        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
        maxReconsumeTimes = 5 // 消费重试的次数
)
public class BOrderlyMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
        System.out.println(msgModel);
    }
}

6、带tag消息

  • tag带在主题后面用:来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");

7、带key消息

Message<String> message = MessageBuilder
        .withPayload("我是一个带key的消息")
        .setHeader(RocketMQHeaders.KEYS, "10086")
        .build();
rocketMQTemplate.syncSend("bootKeyTopic", message);

获取带key和tag的消费者

  • 过滤模式有两种:正则表达式和sql92方式
  • keys从MessageExt对象中获取
@Component
@RocketMQMessageListener(topic = "bootTagTopic",
        consumerGroup = "boot-tag-consumer-group",
        selectorType = SelectorType.TAG,// tag过滤模式
        selectorExpression = "tagA || tagB"
//        selectorType = SelectorType.SQL92,// sql92过滤模式
//        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
)
public class CTagMsgListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt message) {
        System.out.println("获取keys: " + message.getKeys());
        System.out.println("消息内容: " + new String(message.getBody()));
    }
}

查看源码

  • destination目标 = 主题 : 标签
  • keys从消息头里面获取

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/158933.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

关于python中内存分配的问题,运行一些操作可能会导致为新结果分配内存,用Python的id()函数演示

一、考虑背景&#xff1a; 一般在python中不会考虑像C中的内存问题&#xff0c;但是在一些高级应用中会考虑&#xff0c;例如有一个特别特别大的矩阵&#xff0c;最好不要不断的赋值&#xff0c;导致内存问题产生。 二、python中的id&#xff1a; 在python中有个id&#xff…

内网渗透之信息收集

目录 本机信息收集 查看系统配置信息 查看系统服务信息 查看系统登录信息 自动信息收集 域内信息收集 判断是否存在域 本机信息收集 查看系统配置信息 查看系统服务信息 查看系统登录信息 自动信息收集 域内信息收集 查看机器相关信息 查看用户相关信息 powershel…

【Java 进阶篇】深入浅出:JQuery 事件绑定的奇妙世界

在前端的世界里&#xff0c;事件是不可或缺的一部分。用户的点击、输入、滚动等行为都触发着各种事件&#xff0c;而如何在代码中捕捉并处理这些事件是每位前端开发者必须掌握的技能之一。本文将带你深入浅出&#xff0c;探索 JQuery 中的事件绑定&#xff0c;为你揭开这个奇妙…

ke11..--2其他界面也要提取我的locatStarage

获取浏览器里面的本地缓存 localStorage就是我们的浏览器缓存在哪都可以用 下面代码是获取打印到我们的页面上 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> …

【杂谈】-蓝牙低功耗数据传输模式比较

蓝牙低功耗数据传输模式比较 文章目录 蓝牙低功耗数据传输模式比较1、无连接数据传输2、无连接数据传输的优点3、无连接数据传输的局限性 3、面向连接的数据传输4、面向连接模式的优点5、面向连接模式的局限性6、家庭自动化项目的性能观察 物联网&#xff08;IoT&#xff09;设…

端口映射软件

今天给大家介绍一个自己制作的工具&#xff0c;本工具可以把本地自己的项目映射到外网可以访问,自己有域名可以使用自己的,没有可以用软件自带的三级域名! Token获取 地址&#xff1a;传送 打开上面网址注册账号&#xff0c;然后点击验证&#xff0c;复制里面的值即可。 软件…

报错:HikariPool-1 - Exception during pool initialization.

问题发现&#xff1a; 原本可以运行的springboot2项目突然无法运行且报错&#xff0c;HikariPool-1 - Exception during pool initialization。 问题分析&#xff1a; 观察报错信息发现是JDBC连接失败&#xff0c;进而搜索HikariPool-1&#xff0c;搜索得知应该是applicatio…

为什么鸿蒙调用弹窗组件(CommonDialog )却不展示或闪退?

鸿蒙OS开发问题 1.效果展示2.问题代码3.问题分析4.完整代码 1.效果展示 1.为什么调用弹窗不展示会闪退? 2.问题代码 1.前端代码: <?xml version"1.0" encoding"utf-8"?> <DirectionalLayoutxmlns:ohos"http://schemas.huawei.com/res/…

01线性回归

目录 常规求解&#xff1a; 矩阵求解 sklean算法求解 # 二元一次方程 # x y 14 # 2x - y 10 常规求解&#xff1a; x np.array([[1,1],[2,-1]])print(x) # [[ 1 1] # [ 2 -1]]y np.array([14, 10])w np.linalg.solve(x, y)print(正常求救&#xff1a;)print(w) …

契约锁助力货物进出口全程无纸化,加速通关、降低贸易成本

我国作为全球最大的制造业国家和最大的货物贸易国家&#xff0c;政府始终注重引入数字化技术&#xff0c;创新管理和服务模式&#xff0c;帮助降低企业进出口成本&#xff0c;加速货物流通。 近年国家海关总署、商务部、税务总局及各地政府在进出口“报关”、“提货”、“收货备…

Redis持久化策略之RDB与AOF

文章目录 1.RDB1)基本介绍2)自动触发3)手动触发4)RDB文件5)优点缺点 2.AOF1)基本介绍2)使用方式3)工作流程4)重写机制5)AOF文件6)优点缺点 3.RDB AOF 我们都知道&#xff0c;redis 是一个基于内存的数据库。基于内存的好处是访问速度快&#xff0c;缺点是“不持久”——当数据…

golang标准库-crc32的使用

1.概述 crc32实现了32位循环冗余检测算法的实现。目前crc32内部提供 了三种常用的多项式,采用查表法来提高计算checksum的效率。通过crc32.MakeTable()可以获取对应的表&#xff0c;crc32提供了一个IEETABLE可以直接使用&#xff0c;官方链接如下&#xff1a;crc32 package - h…

录屏软件无水印免费的有哪些?我来告诉你!

在日常生活和工作中&#xff0c;我们经常需要使用录屏软件来记录屏幕活动。然而&#xff0c;许多免费录屏软件在录制视频时会添加水印&#xff0c;给用户带来不便。那录屏软件无水印免费的都有哪些呢&#xff1f;在本文中&#xff0c;我们将介绍三款无水印的免费录屏软件&#…

如果面试也能这样说HashMap,那么就不会有那么多遗憾!

&#x1f44f;作者简介&#xff1a;大家好&#xff0c;我是爱吃芝士的土豆倪&#xff0c;24届校招生Java选手&#xff0c;很高兴认识大家&#x1f4d5;系列专栏&#xff1a;Spring源码、JUC源码&#x1f525;如果感觉博主的文章还不错的话&#xff0c;请&#x1f44d;三连支持&…

jenkins清理缓存命令

def jobName "yi-cloud-operation" //删除的项目名称 def maxNumber 300 // 保留的最小编号&#xff0c;意味着小于该编号的构建都将被删除 Jenkins.instance.getItemByFullName(jobName).builds.findAll { it.number < maxNumber }.each { it.delet…

SourceTree修改Git密码

SourceTree用的好好的&#xff0c;无奈公司隔段时间强制更改电脑密码。更改完成后SourceTree无法使用&#xff0c;重新输入密码。VS的nuget也是。查资料虽然也能比较快的解决&#xff0c;但是。。。。在此转载记录下。 1. 找到 SourceTree 配置文件所在目录 ‘userhosts’ 目录…

锐捷OSPF认证

一、知识补充 1、基本概述 OSPF区域认证和端口认证是两种不同的认证机制&#xff0c;用于增强OSPF协议的安全性。 OSPF区域认证&#xff08;OSPF Area Authentication&#xff09;&#xff1a;这种认证机制是基于区域的。在OSPF网络中&#xff0c;每个区域都可以配置一个区域…

HCL设备启动失败——已经解决

摸索了一个多小时&#xff0c;终于搞定了&#xff0c;首先HCL这款软件是需要安装Oracle VM Visual Box的&#xff0c;小伙伴们安装的时候记得点击安装Visual Box&#xff1b; 安装完后显示设备不能启动&#xff0c;然后我根据这个 HCL模拟器中Server设备启动失败的解决办法_hc…

Linux:常见指令

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》 文章目录 前言一、常见指令ls指令pwd指令cd指令touch指令mkdir指令rmdir指令rm指令man指令cp指令mv指令cat指令tac指令echo指令more指令less指令head指令tail指令date显示Cal指令find指令gr…

黑马React18: 基础Part 1

黑马React: 基础1 Date: November 15, 2023 Sum: React介绍、JSX、事件绑定、组件、useState、B站评论 React介绍 概念: React由Meta公司研发&#xff0c;是一个用于 构建Web和原生交互界面的库 优势: 1-组件化的开发方式 2-优秀的性能 3-丰富的生态 4-跨平台开发 开发环境搭…