Springboot整合RocketMQ 基本消息处理

目录

1. 同步消息

2. 异步消息

3. 单向消息

4. 延迟消息

5. 批量消息

6. 顺序消息

 7. Tag过滤


导入依赖

       <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>

YAML配置

rocketmq:
  name-server: localhost:9876     # rocketMq的nameServer地址

1. 同步消息

同步消息是发送消息后等待Broker的响应,确保消息被成功接收。

生产者:

   @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
        SendResult result = rocketMQTemplate.syncSend("test", MessageBuilder.withPayload("同步消息").build());
//        SendResult result = rocketMQTemplate.syncSend("test", "同步消息");
        System.out.println("发送状态:" + result.getSendStatus() + " 消息id:" + result.getMsgId());
    }

2. 异步消息

异步消息是发送消息后不等待Broker响应,通过回调函数处理发送结果。

@Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
        rocketMQTemplate.asyncSend("test", MessageBuilder.withPayload("异步消息").build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功"+sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败"+throwable);
            }
        });
    }

3. 单向消息

单向消息是发送消息后不等待Broker响应,也没有回调函数。

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
      rocketMQTemplate.sendOneWay("test","单向消息");
    }

4. 延迟消息

延迟消息是设置消息的延迟时间,确保消息在指定时间后才被消费。

 @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
        //在RocketMQ中,timeout(超时时间)是指消息发送的最大等待时间。当你发送一个消息时,系统会等待一定的时间来获取发送结果,这个等待的时间就是超时时间。单位ms
        Message<String> message = MessageBuilder.withPayload("延迟消息").build();
         //延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 2对应5s
        SendResult result = rocketMQTemplate.syncSend("test", message, 2000, 2);
    }

5. 批量消息

批量消息是将多个消息打包成一个消息批次发送,提高发送效率。

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
        List<String> list = Arrays.asList("blue", "red", "pink", "yello");
        rocketMQTemplate.syncSend("test",list);
    }

上面所有生产者对应的消费者代码为:

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"消息内容:"+msg);
    }
}

6. 顺序消息

顺序消息是保证同一个消息队列中的消息按顺序消费。

生产者代码:

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
        for(int i=0;i<10;i++)
        {
            rocketMQTemplate.syncSendOrderly("test","顺序消息"+i,"1");
        }
    }

消费者代码更改:

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer",consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"消息内容:"+msg);
    }
}

 7. Tag过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

生产者

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @Test
    void contextLoads() {
       rocketMQTemplate.syncSend("test:test","hello");
    }

消费者

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer",selectorType = SelectorType.TAG,selectorExpression = "test")
public class MQMsgListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt message) {
        String msgId = message.getMsgId();
        String msg = new String(message.getBody());
        System.out.println("消息id:"+msgId+"消息内容:"+msg);
    }
}

 @RocketMQMessageListener 注解参数如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/291800.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CNN——AlexNet

1.AlexNet概述 论文原文&#xff1a;ImageNet Classification with Deep Convolutional Neural Networks 在LeNet提出后&#xff0c;卷积神经网络在计算机视觉和机器学习领域中很有名气。但卷积神经网络并没有主导这些领域。这是因为虽然LeNet在小数据集上取得了很好的效果&am…

保姆级教程:从0到1搭建web自动化测试环境

之前都是在linux上安装&#xff0c;第一次在windows上配置环境&#xff0c;加上距离上次配置环境有点久了&#xff0c;竟也花了点时间。特此记录下保姆级教程&#xff0c;给初学者一个有效的参考&#xff01; 一. 环境搭建 工具清单 工具工具名版本Java开发工具包JDK1.8浏览…

AI边缘计算智能分析网关V4如何配置周界入侵检测算法

旭帆科技的智能分析网关V4内含近40种智能分析算法&#xff0c;包括人体、车辆、消防、环境卫生、异常检测等等&#xff0c;在消防安全、生产安全、行为检测等场景应用十分广泛&#xff0c;如常见的智慧工地、智慧校园、智慧景区、智慧城管等等&#xff0c;还支持抓拍、记录、告…

C++第四天

定义一个Person类&#xff0c;私有成员int age&#xff0c;string &name&#xff0c;定义一个Stu类&#xff0c;包含私有成员double *score&#xff0c;写出两个类的构造函数、析构函数、拷贝构造和拷贝赋值函数&#xff0c;完成对Person的运算符重载(算术运算符、条件运算…

【DevOps-03】Build阶段-Maven安装配置

一、简要说明 下载安装JDK8下载安装Maven二、复制准备一台虚拟机 1、VM虚拟复制克隆一台机器 2、启动刚克隆的虚拟机,修改IP地址 刚刚克隆的虚拟机 ,IP地址和原虚拟的IP地址是一样的,需要修改克隆后的虚拟机IP地址,以免IP地址冲突。 # 编辑修改IP地址 $ vi /etc/sysconfig…

感觉软件测试很简单,但为何这么多劝退的?

上一个说软件测试简单的&#xff0c;已经被面试官问死了。。。 现在已经过了 ”不会但我会学“ 就能感动面试官的时代&#xff0c;随着供需关系的变化&#xff0c;不论是对于面试官还是面试者&#xff0c;面试的成本越来越高。为了筛选到更优秀的程序员&#xff0c;面试官们可谓…

iptables

iptables有多种功能&#xff0c;每一种功能都用一张表来实现 最常用的功能是防火墙和NAT 从RHEL7开始&#xff0c;默认的防火墙为firewalld&#xff0c;但是它的底层仍然调用iptables 安装iptables服务 # 关闭firewalld [rootnode1 ~]# systemctl stop firewalld [rootnode…

在Gradle工程中使用checkstyle来规范你的项目

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 系列专栏目录 [Java项…

深度学习 | 多模态算法

AIGC也就是AI内容生成已经成为新一轮人工智能发展的热点和必然趋势&#xff0c;它使得大规模高质量的创作变得更加容易。 一 、InstructGPT模型 1、GPT系列回顾 chatGPT和InstructGPT都使用了指示学习和基于人工反馈的强化学习来指导模型的训练&#xff0c;不同点仅仅是在采集数…

ECharts配置个性化图表:圆环、立体柱状图

ECharts配置个性化图表&#xff1a;圆环、立体柱状图 圆环图双纵轴多数据面积图折柱混合图3D立体圆环饼图3D立体饼图参考文章 官网调试地址&#xff1a;点击跳转调试 圆环图 效果图&#xff1a; 配置&#xff1a; option {color: [#29BEFF, #A2DC00, #FFC400, #FF7F5C, #C…

c jpeg 编码解码验证数据

1. yuv420p 1616 像素点 384字节全部数据 把上面的384个char从左到右&#xff0c;从上到下的顺序输入文件&#xff0c;就能显示红绿蓝白4个水平条

SwiftUI之深入解析ContentUnavailableView的实战应用

一、基本用法 SwiftUI 引入了新的 ContentUnavailableView 类型&#xff0c;允许在应用程序中展示空状态、错误状态或任何其他内容不可用的状态。那么&#xff0c;如何使用 ContentUnavailableView 引导用户浏览应用程序中的空状态呢&#xff1f;首先看看 ContentUnavailableV…

python中的selenium安装的步骤(浏览器自动化测试框架)

一、前言 我们今天要安装的selenium 就是浏览器自动化测试框架&#xff0c;是一个用于Web应用程序的测试工具&#xff0c;就是模拟用户操作。支持的浏览器包括Chrome&#xff0c;IE&#xff0c;Mozilla Firefox&#xff0c;Safari&#xff0c;Opera等。今天我们以Chrome为例讲…

msvcr120.dll丢失怎样修复,教你msvcr120.dll丢失的解决办法

在使用电脑的过程中出现关于msvcr120.dll丢失的问题&#xff0c;那么出现这样的问题应该怎么解决呢&#xff1f;其实解决的办法也很简单&#xff0c;今天就和大家说说msvcr120.dll丢失怎样修复&#xff0c;同时给大家介绍一些关于msvcr120.dll文件的相关内容&#xff0c;了解ms…

CMake入门教程【核心篇】添加库(add_library)

&#x1f608;「CSDN主页」&#xff1a;传送门 &#x1f608;「Bilibil首页」&#xff1a;传送门 &#x1f608;「本文的内容」&#xff1a;CMake入门教程 &#x1f608;「动动你的小手」&#xff1a;点赞&#x1f44d;收藏⭐️评论&#x1f4dd; 文章目录 1. 基本用法2.STATIC…

python爬虫实现获取招聘信息

使用的python版本&#xff1a; 3.12.1 selenium版本&#xff1a;4.8.0 urllib版本&#xff1a;1.26.18 from selenium import webdriver from selenium.webdriver import ActionChains import timeimport re import xlwt import urllib.parsedef get_html(url):chrome_drive…

将linux的代码上传至gitte,从创建到linux命令详解

目录 1&#xff1a;创建gitte的代码仓库 1&#xff1a;登录gitte网页 https://gitee.com/ 2&#xff1a;点击导航栏的&#xff0b;号 3&#xff1a;点击新建仓库​编辑4&#xff1a;仓库配置 ​编辑5&#xff1a;复制仓库的路径 linux操作系统命令行 1&#xff1a; linux…

Hadoop集群三节点搭建(一)

一、第一台虚拟机准备 确认是可以上网&#xff0c;方便下载文件和工具&#xff0c;使用ping命令测试下 安装工具 net-tool&#xff1a;工具包集合&#xff0c;包含ifconfig等命令&#xff0c;大家可以根据自己需要按需下载 创建普通用户attest&#xff0c;并修改attest用户的密…

MO 2023 年度回顾

PART-ONE 行业态势 随着供需关系的变化&#xff0c;数据库的竞争在经历了 3 年 “百花齐放” 般的发展后&#xff0c;终于在 2023 年进入到了一个相对收拢的阶段。 2023 年&#xff0c;各个数据库厂商间很有默契地在两个方面达成了一致&#xff1a; HTAP 已经成为新一代数据…

YOLO算法入门指南:了解门槛、学习路径及其易学性

博主猫头虎的技术世界 &#x1f31f; 欢迎来到猫头虎的博客 — 探索技术的无限可能&#xff01; 专栏链接&#xff1a; &#x1f517; 精选专栏&#xff1a; 《面试题大全》 — 面试准备的宝典&#xff01;《IDEA开发秘籍》 — 提升你的IDEA技能&#xff01;《100天精通Golang》…