RocketMQ的简单使用

这里需要创建2.x版本的springboot项目 

导入依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.7.6</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

定义配置文件

server:
  port: 3000

rocketmq:
  name-server: xxx.xxx.xxx.xxx:9876  # NameServer 地址
  producer:
    group: rocketmq-4x-service_common-message-execute_pg # 全局发送者组定义

生产者定义

这里的生产者有两个,一个是普通的,一个是延时。

import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.messaging.support.MessageBuilder;


@Component
@Slf4j
public class GeneralMessageDemoProduce {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent) {
        SendResult sendResult;
        try{
            StringBuilder destinationBuilder = StrUtil.builder().append(topic);
            if(StrUtil.isNotBlank(tag)){
                destinationBuilder.append(":").append(tag);
            }
            Message<?> message = MessageBuilder
                    .withPayload(messageSendEvent)
                    .setHeader(MessageConst.PROPERTY_KEYS,keys)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag)
                    .build();
            // 设置消息的延时级别
            sendResult=rocketMQTemplate.syncSend(
                    destinationBuilder.toString(),
                    message,
                    2000L
            );
            log.info("[普通消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
        }catch(Throwable ex){
            log.error("[普通消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);
            throw ex;
        }
        return sendResult;
    }
}

延时的

@Component
@Slf4j
public class ScheduleProducer {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult sendMessage(String topic, String tag, String keys, MessageEvent messageSendEvent ) {
        SendResult sendResult;
        try {
            StringBuilder destinationBuilder = StrUtil.builder().append(topic);
            if(StrUtil.isNotBlank(tag)){
                destinationBuilder.append(":").append(tag);
            }
            Message<?> message = MessageBuilder
                    .withPayload(messageSendEvent)
                    .setHeader(MessageConst.PROPERTY_KEYS,keys)
                    .setHeader(MessageConst.PROPERTY_TAGS, tag)
                    .build();
            // 设置消息的延时级别
            sendResult=rocketMQTemplate.syncSend(
                    destinationBuilder.toString(),
                    message,
                    2000L,
                    6
            );
            log.info("[延时消息] 消息发送结果:{},消息ID:{},消息Keys:{}", sendResult.getSendStatus(), sendResult.getMsgId(), keys);
        }catch(Throwable ex){
            log.error("[延时消息] 消息发送失败,消息体:{}", JSON.toJSONString(messageSendEvent), ex);
            throw ex;
        }
        return sendResult;
    }
}

消费者定义

这里也是两个消费者,普通的和延时的不在同一个主题的内

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "rocketmq-yhy_topic",
        selectorExpression = "general",
        consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume implements RocketMQListener<MessageEvent> {
    @Override
    public void onMessage(MessageEvent message) {
        log.info("接到RocketMQ消息,消息体:{}", JSON.toJSONString(message));
    }
}
import com.alibaba.fastjson.JSON;
import com.yhy.MessageEvent;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(
        topic = "Delay",
        selectorExpression = "general",
        consumerGroup = "rocketmq-demo_general-message_cg"
)
public class GeneralMessageDemoConsume_Delay implements RocketMQListener<MessageEvent> {
    @Override
    public void onMessage(MessageEvent message) {
        log.info("接到RocketMQ的延时消息,消息体:{}", JSON.toJSONString(message));
    }
}

发送消息

这里直接在启动类发送。

@SpringBootApplication
@RestController
public class RocketMQDemoApplication {
    @Autowired
    private GeneralMessageDemoProduce generalMessageDemoProduce;
    @Autowired
    private ScheduleProducer scheduleProducer;

    @PostMapping("/test/send/general-message")
    public String sendGeneralMessage() {
        String keys= UUID.randomUUID().toString();

        MessageEvent messageEvent=new MessageEvent("消息具体内容——yhy",keys);

        SendResult sendResult=generalMessageDemoProduce.sendMessage(
                "rocketmq-yhy_topic",
                "general",
                keys,
                messageEvent
        );

        SendResult sendResult2=scheduleProducer.sendMessage(
                "Delay",
                "general",
                keys,
                messageEvent
        );
        System.out.println(sendResult.getSendStatus().name() );
        System.out.println(sendResult2.getSendStatus().name());
        return sendResult.getSendStatus().name();
    }
        public static void main(String[] args) {
        SpringApplication.run(RocketMQDemoApplication.class, args);
    }
}

postman触发

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/522250.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ChromeOS 中自启动 Fcitx5 和托盘 stalonetray

ChromeOS 更新的飞快&#xff0c;旧文章的方法也老是不好用&#xff0c;找遍了也没找到很好的可以开机自启动 Linux VM 和输入法、托盘的方法。 研究了一下&#xff08;不&#xff0c;是很久&#xff09;&#xff0c;终于找到个丑陋的实现。 方法基于 ChromeOS 123.0.6312.94…

带刻度透明PFA量筒耐强酸碱耐高温全氟烷氧基树脂量具

PFA量筒为上下等粗的直筒状&#xff0c;特氟龙量杯是上大下小的圆台形&#xff0c;底座均有宽台设计&#xff0c;保证稳定性&#xff0c;两者均可在实验室中作为定量量取液体的量具&#xff0c;上沿一侧有弧嘴设计&#xff0c;便于流畅地倾倒液体。 规格参考&#xff1a;5ml、…

蓝桥杯每日一题:公约数(gcd)

题目描述&#xff1a; 给定两个正整数 a 和 b。 你需要回答 q 个询问。 每个询问给定两个整数 l,r&#xff0c;你需要找到最大的整数 x&#xff0c;满足&#xff1a; x 是 a和 b 的公约数。l≤x≤r。 输入格式 第一行包含两个整数 a,b。 第二行包含一个整数 q。 接下来…

理解Go语言中的并发和并行

即使有多年的并发编程经验,有些开发人员也可能无法清楚地理解并发(concurrency)和并行(parallelism)之间的区别。下面我们以一个真实的例子来说明:一家咖啡店。 在这家咖啡店中,一名服务员负责接收订单并使用一台咖啡机进行准备。顾客下订单,然后等待他们的咖啡。 …

Gateway是什么?(SpringCloudAlibaba组件)

1、网关介绍 **网关(Gateway)又称网间连接器、协议转换器。网关在传输层上以实现网络互连&#xff0c;是最复杂的网络互连设备&#xff0c;仅用于两个高层协议不同的网络互连。**网关的结构也和路由器类似&#xff0c;不同的是互连层。网关既可以用于广域网互连&#xff0c;也可…

IP证书申请

目录 申请IP证书的基本条件&#xff1a; 申请和使用公网IP证书的过程&#xff1a; 为什么需要申请IP地址证书&#xff1f; 申请IP证书&#xff1a; IP证书又称公网IP地址证书&#xff0c;是一种特殊的SSL/TLS证书&#xff0c;其作用原理和普通的域名证书很像&#xff0c;域…

使用 Cloudflare 和全栈框架实现快速开发

去年 Cloudflare 发布了一系列新功能&#xff0c;使在 Cloudflare 上部署 Web 应用程序变得更加容易&#xff0c;我们看到 Astro、Next.js、Nuxt、Qwik、Remix、SolidStart、SvelteKit 和其他托管 Web 应用程序的大幅增长。 近日 Cloudflare 对这些 Web 框架的集成模块进行了重…

思诺流体邀您探索科技前沿2024年第13届生物发酵展

参展企业介绍 保定思诺流体科技有限公司是一家集研发、生产、销售于一体的高新技术企业。从事蠕动泵、蠕动泵软管、蠕动泵OEM产品、蠕动泵灌装系统等的研发、生产与销售。产品在科研实验室、化工、印刷、环保、水处理等领域得到了广泛应用。 “思诺”取自“Signal”的英…

Linux学习记录20——文件的隐藏权限

一.学习的内容 Linux系统中的文件除了具备一般权限和特殊权限之外&#xff0c;还有一种隐藏权限&#xff0c;即被隐藏起来的权限&#xff0c;默认情况下不能直接被用户发觉。既然叫隐藏权限&#xff0c;那么使用常规的ls命令肯定不能看到它的真面目。隐藏权限的专用设置命令是 …

认识JAVA语言(一)扩充

Java语言的程序控制结构 (2.5) 在Java语言中&#xff0c;程序的流程控制对于代码执行的逻辑有着至关重要的作用。通过条件控制和循环控制&#xff0c;程序可以做出决策、重复执行任务&#xff0c;并在合适的时间退出。本章将详细介绍这些结构&#xff0c;并通过代码示例和表格来…

D1084是一款具有5A输出能力、低压差为1.5V的三端稳压器。采用TO-220、TO-263和TO-252封装形式

1、 概述&#xff1a; D1084是一款具有5A输出能力、低压差为1.5V的三端稳压器。输出电压可通过电位器调节或1.5V, 1.8V, 3.3V三个固定电压版。内含电流限制和热保护功能&#xff0c;防止任何过载时产生过高的结温。D1084系列电路有标准TO-220、TO-263和TO-252封装形式。 2、 典…

短剧APP开发:探索剧情新领域,畅享精彩短剧时光

随着移动互联网的快速发展&#xff0c;短剧作为一种新兴的内容形式&#xff0c;以其短小精悍、情节紧凑的特点&#xff0c;逐渐受到广大用户的喜爱。为了满足用户对短剧内容的日益增长需求&#xff0c;我们决定开发一款全新的短剧APP&#xff0c;为用户带来前所未有的观剧体验。…

鼠标灵敏度怎么调,鼠标灵敏度怎么调最稳

鼠标和键盘是操作计算机过程中使用最频繁的设备之一,用电脑的时,我敢说你一定离不开鼠标。有些用户发现鼠标不太好用,尤其是在游戏时,总觉得鼠标移动太慢了。另外,如果你感觉鼠标按键失灵、鼠标单击变双击以及反应迟钝等等,出现这样的问题,应该是鼠标灵敏度没有调整好。…

先锋阀门带您领略2024第13届生物发酵装备展

参展企业介绍 温州先锋阀门有限公司坐落于【中国阀门城】---温州市龙湾&#xff0c;是一家集研发、设计、制造、销售和服务为一体的科技创新型企业。拥有10多项国家专利&#xff0c;三个产品荣获中国通用机械工业协会颁发的(中国国际阀门博览会)银奖称号&#xff0c;部分产品还…

干货 | 探索CUTTag:从样本到文库,实验步步为营!

CUT&Tag&#xff08;Cleavage Under Targets and Tagmentation&#xff09;是一种新型DNA-蛋白互作研究技术&#xff0c;主要用于研究转录因子或组蛋白修饰在全基因组上的结合或分布位点。相比于传统的ChIP-seq技术&#xff0c;CUT&Tag反应在细胞内进行&#xff0c;创新…

win10鼠标无限转圈圈是什么原因,win10系统鼠标无限转圈圈

win10鼠标无限转圈圈是什么原因?一般后台有程序在运行,鼠标出现圆圈转动则代表正在加载中,等待一会就好了。若如果转了好久的圈圈,程序没有响应,点击桌面也没有反应,则尝试打开任务管理器,将未响应或异常的程序强制结束掉。其实,出现这种情况,有可能是win10系统中的一…

HarmonyOS4.0 ArkUI常用组件

一、Image 语法&#xff1a; Image(src:string|PixelMap|Resource)使用方式&#xff1a; string格式&#xff1a;用来加载网络图片&#xff0c;需要在module.json5中申请网络访问权限&#xff1a;ohos.permission.INTERNET Image("http://xxx.png")PixelMap格式&am…

医保是如何报销的

《医保是如何报销的》 这是罗师兄的原创文章 预计5-6分钟读完 作者&#xff1a;罗师兄 地球号&#xff1a;luoyun515 很多时候大家听到医保报销比例80%&#xff0c;85%&#xff0c;90%等&#xff0c; 但真正报销后&#xff0c; 实际花费跟报销额度根本达不到这么高&#…

LeetCode-78. 子集【位运算 数组 回溯】

LeetCode-78. 子集【位运算 数组 回溯】 题目描述&#xff1a;解题思路一&#xff1a;回溯&#xff0c;回溯三部曲解题思路二&#xff1a;0解题思路三&#xff1a;0 题目描述&#xff1a; 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的 子…

【OTA】STM32-OTA升级——持续更新

【OTA】STM32-OTA升级——持续更新 文章目录 前言一、ymodem串口协议1、Ymodem 协议2、PC3、蓝牙4、WIFI云平台 二、UDS车载协议1.UDS协议 总结 前言 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、ymodem串口协议 1、Ymodem 协议 STM32 Ymodem …