SpringBoot整合【RocketMQ】

目录

1.POM文件添加依赖及yml配置

2.RocketmqUtil

3.生产者(异步发送示例)

4.消费者

5.测试


1.POM文件添加依赖及yml配置

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: My_Group
    send-message-timeout: 3000
    retry-times-when-send-failed: 3
    retry-times-when-send-async-failed: 3

2.RocketmqUtil

package com.kaying.marketing.platform.common.util.rocketMq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * @Description: RocketMQ消息的生产者
 * @Author: hwk
 */

@Component
@Slf4j
public class RocketMqUtil {
    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    public void sendMsg(String topic,String data) {
        rocketMqTemplate.convertAndSend(topic,data);
        log.info("【RocketMQ】发送同步消息:{}", data);
    }

    public void asyncSend(String topic, String tag, String data,Integer messageDelayLevel) {
        rocketMqTemplate.asyncSend(topic + ":" + tag, MessageBuilder.withPayload(data).build(), new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        // 消息发送成功
                        log.error("消息发送成功"+sendResult);
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        // 消息发送异常
                        log.error("异步发送消息异常。topic:" + topic + ";tag:" + tag + ";mqMsg" + data, throwable);
                    }
                },
                3000L,
                // messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
                // messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19
                messageDelayLevel);
    }

    /**
     * 发送同步消息:消息响应后发送下一条消息
     *
     * @param topic 消息主题
     * @param tag   消息tag
     * @param key   业务号
     * @param data  消息内容
     */
    public void sendSyncMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        SendResult sendResult = rocketMqTemplate.syncSend(destination, message);

        log.info("【RocketMQ】发送同步消息:{}", sendResult);
    }

    /**
     * 发送异步消息:异步回调通知消息发送的状况
     *
     * @param topic 消息主题
     * @param tag   消息tag
     * @param key   业务号
     * @param data  消息内容
     */
    public void sendAsyncMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("【RocketMQ】发送异步消息:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.info("【RocketMQ】发送异步消息异常:{}", e.getMessage());
            }
        });
    }


    /**
     * 发送单向消息:消息发送后无响应,可靠性差,效率高
     *
     * @param topic 消息主题
     * @param tag   消息tag
     * @param key   业务号
     * @param data  消息内容
     */
    public void sendOneWayMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.sendOneWay(destination, message);
    }


    /**
     * 同步延迟消息
     *
     * @param topic      主题
     * @param tag        标签
     * @param key        业务号
     * @param data       消息体
     * @param timeout    发送消息的过期时间
     * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     *
     */
    public void sendSyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
        // messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 1d
        // messageDelayLevel = 1  2  3   4   5  6  7  8  9  10 11 12 13 14  15  16  17 18 19
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        SendResult sendResult = rocketMqTemplate.syncSend(destination, message, timeout, delayLevel);
        log.info("【RocketMQ】发送同步延迟消息:{}", sendResult);
    }


    /**
     * 异步延迟消息
     *
     * @param topic      主题
     * @param tag        标签
     * @param key        业务号
     * @param data       消息体
     * @param timeout    发送消息的过期时间
     * @param delayLevel 延迟等级-----固定等级:1到18分别对应1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
     */
    public void sendAsyncDelayMsg(String topic, String tag, String key, String data, long timeout, int delayLevel) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.asyncSend(destination, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("【RocketMQ】发送异步延迟消息:{}", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                log.info("【RocketMQ】发送异步延迟消息异常:{}", e.getMessage());
            }
        }, timeout, delayLevel);
    }


    /**
     * 同步顺序消息
     *
     * @param topic 主题
     * @param tag   标签
     * @param key   业务号
     * @param data  消息体
     */
    public void sendSyncOrderlyMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        SendResult sendResult = rocketMqTemplate.syncSendOrderly(destination, message, key);
        log.info("【RocketMQ】发送同步顺序消息:{}", sendResult);
    }


    /**
     * 异步顺序消息
     *
     * @param topic 主题
     * @param tag   标签
     * @param key   业务号
     * @param data  消息体
     */
    public void sendAsyncOrderlyMsg(String topic, String tag, String key, String data) {
        //消息
        Message message = MessageBuilder.withPayload(data).setHeader(RocketMQHeaders.KEYS, key).build();
        //主题
        String destination = topic + ":" + tag;
        rocketMqTemplate.asyncSendOrderly(destination, message, key, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("【RocketMQ】发送异步顺序消息:{}", sendResult);
            }
            @Override
            public void onException(Throwable e) {
                log.info("【RocketMQ】发送异步顺序消息异常:{}", e.getMessage());
            }
        });
    }
}

3.生产者(异步发送示例)

//异步发送消息代码示例
rocketMqUtil.sendAsyncMsg(RocketConstant.TEST_TOPIC1, RocketConstant.TEST_TAG1, UUID.randomUUID().toString(), "测试消息一");

4.消费者

简单的负载均衡消费的示例(指定topic和tag,相同的组即为负载均衡消费)

也可以指定不同的topic和不同的tag进行消息区分

注意线上和本地连接同一个MQ也会导致负载均衡,导致线上消息丢失

    @RocketMQMessageListener(consumerGroup = "1",
            topic = RocketConstant.TEST_TOPIC1,
            selectorExpression = RocketConstant.TEST_TAG1)
    @Service
    public class RocketConsumerTag1 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            String orderNo = message;
            log.info("tag1,接收:{}", orderNo);

        }
    }

    @RocketMQMessageListener(consumerGroup ="1",
            topic = RocketConstant.TEST_TOPIC1,
            selectorExpression = RocketConstant.TEST_TAG1)
    @Service
    public class RocketConsumerTag2 implements RocketMQListener<String> {
        @Override
        public void onMessage(String message) {
            String orderNo = message;
            log.info("tag2,接收:{}", orderNo);

        }
    }

5.测试

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/435949.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Android使用Sensor.TYPE_STEP_COUNTER计步器传感器进行步数统计

1、首先&#xff0c;申请权限 必须声明 ACTIVITY_RECOGNITION 权限&#xff0c;以便您的应用在运行 Android 10 (API 级别 29) 或更高版本的设备上使用此传感器。 Manifest.xml也记得声明 if (Build.VERSION.SDK_INT > Build.VERSION_CODES.P) {Log.d(TAG, "[权限]&quo…

Elasticsearch:从 ES|QL 到 Python 数据帧

在我之前的文章 “Elasticsearch&#xff1a;ES|QL 查询展示”&#xff0c;我展示了如何在 Kibana 中使用 ES|QL 对索引来进行查询及统计。在很多的情况下&#xff0c;我们需要在客户端中来对数据进行查询&#xff0c;那么我们该怎么办呢&#xff1f;我们需要使用到 Elasticsea…

挖掘NCDA设计大赛获奖作品的成功之道:探讨表情包设计竞争力的关键因素

第12届大赛简介 - 未来设计师全国高校数字艺术设计大赛&#xff08;NCDA&#xff09;开始啦&#xff01;今天我们就特地来说说它的虚拟IP及表情包设计的命题之一的表情包设计选项&#xff0c;为了使大家更好地参加本次比赛&#xff0c;本文特别整理了往届NCDA的表情包设计获奖作…

(每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理 第13章 项目资源管理(五)

项目建议与立项申请、初步可行性研究、详细可行性研究、评估与决策是项目投资前使其的四个阶段。在实际工作中&#xff0c;初步可行性研究和详细可行性研究可以依据项目的规模和繁简程度合二为一&#xff0c;但详细可行性研究是不可缺少的。升级改造项目制作初步和详细研究&…

基于Python实现银行卡识别

在本文中将介绍如何使用Python和深度学习技术来实现银行卡识别功能。银行卡识别是一个在金融、安全等领域具有重要应用的问题&#xff0c;将使用深度学习模型来实现银行卡图像的识别和分类。 目录 引言数据集准备预处理和特征提取模型选择与训练模型评估与性能优化部署与应用 引…

【Leetcode 438】找到字符串中所有字母异位词 —— 滑动窗口

438. 找到字符串中所有字母异位词 给定两个字符串s和p&#xff0c;找到s中所有p的 异位词 的子串&#xff0c;返回这些子串的起始索引。不考虑答案输出的顺序。 异位词 指由相同字母重排列形成的字符串&#xff08;包括相同的字符串&#xff09;。 示例 1: 输入: s “cbaeb…

DRAM 是什么?一文看懂DRAM 产业完整介绍!

全球经济前景不乐观&#xff0c;导致 DRAM 需求下滑&#xff0c;随着 DRAM 价格的连续下跌&#xff0c;三星、海力士等相关大厂的业绩前景都不被看好。那究竟 DRAM 到底是什么产品&#xff1f; DRAM 是什么&#xff1f; DRAM 其实就是我们一般生活中常常在讲的“存储”&#x…

回溯算法05-分割回文子串(Java)

5.分割回文子串 题目描述 给你一个字符串 s&#xff0c;请你将 s 分割成一些子串&#xff0c;使每个子串都是 回文串 。返回 s 所有可能的分割方案。 回文串 是正着读和反着读都一样的字符串。 示例 1&#xff1a; 输入&#xff1a;s "aab" 输出&#xff1a;[[…

云服务器99元一年阿里云和腾讯云对比,明智选择!

腾讯云服务器99元一年是真的吗&#xff1f;真的&#xff0c;只是又降价了&#xff0c;现在只要61元一年&#xff0c;配置为2核2G3M轻量应用服务器&#xff0c;40GB SSD盘&#xff0c;腾讯云百科txybk.com分享腾讯云官方活动购买链接 https://curl.qcloud.com/oRMoSucP 活动打开…

Clion调试QT程序qDebug()、cout控制台无输出的可能解决方法

qDebug()不输出 在当前项目配置中添加一个环境变量 方法一、单独为配置 QT_ASSUME_STDERR_HAS_CONSOLE1 方法二、全局配置&#xff08;系统变量&#xff09; 一劳永逸 效果 cout不输出 Clion在debug调试C/C的时候&#xff0c;printf/cout不会实时输出情况 结果同上~ 谢阅…

【视觉AIGC识别】误差特征、人脸伪造检测、其他类型假图检测

视觉AIGC识别——人脸伪造检测、误差特征 不可见水印 前言视觉AIGC识别【误差特征】DIRE for Diffusion-Generated Image Detection方法扩散模型的角色DIRE作为检测指标 实验结果泛化能力和抗扰动 人脸伪造监测&#xff08;Face Forgery Detection&#xff09;人脸伪造图生成 …

进程间通信之信号灯 || 网络协议UDP/TCP || 三次握手四次挥手

在线程通信中由于数据段等内存空间的共用性&#xff0c;导致同时访问时资源竞争的问题&#xff0c;在线程中我们使用信号量的申请和释放&#xff0c;在防止资源竞争的产生。在进程间的通信中&#xff0c;有信号灯的概念。搭配共享内存实现进程同步。 有名信号量: 1.创建 …

请你简单说一下 Mysql 的事务隔离级别

什么情况&#xff0c;写了 5 年的 CRUD&#xff0c;还搞不清楚 Mysql 的事务隔离级别&#xff0c;难怪第一面就被刷下来。 一个 5 年经验的粉丝&#xff0c;在一个公司干了 5 年&#xff0c;觉得自己特厉害&#xff0c;什么都能搞定&#xff0c;结果每次一到技术面就被刷。问我…

滴滴基于 Clickhouse 构建新一代日志存储系统

ClickHouse 是2016年开源的用于实时数据分析的一款高性能列式分布式数据库&#xff0c;支持向量化计算引擎、多核并行计算、高压缩比等功能&#xff0c;在分析型数据库中单表查询速度是最快的。2020年开始在滴滴内部大规模地推广和应用&#xff0c;服务网约车和日志检索等核心平…

Unity UGUI之InputField(TMP)基本了解

Unity的InputField组件是用于在Unity中创建可供用户输入文本的输入框的UI组件。通过InputField组件&#xff0c;可以让用户在运行时输入文本&#xff0c;比如用户名、密码、搜索关键字等。其中TMP版本的InputField是基于TextMeshPro的InputField组件&#xff0c;提供了更多的文…

【Java JVM】Class 文件的加载

Java 虚拟机把描述类的数据从 Class 文件加载到内存, 并对数据进行校验, 转换解析和初始化, 最终形成可以被虚拟机直接使用的 Java 类型, 这个过程被称作虚拟机的类加载机制。 与那些在编译时需要进行连接的语言不同, 在 Java 语言里面, 类的加载, 连接和初始化过程都是在程序…

java IO 01 输入和输出,File在磁盘上的创建,File的函数,目录

输入和输出&#xff1a; 输入和输出都是从内存的角度出发的&#xff0c;也可以说是java程序角度。 输入到内存的&#xff08;java程序的&#xff09;都是输入 从内存的&#xff08;java程序的&#xff09;都是输出 02. import java.io.File; import java.io.IOException;pu…

vue2源码分析-vue入口文件global-api分析

文章背景 vue项目开发过程中,首先会有一个初始化的流程,以及我们会使用到很多全局的api,如 this.$set this.$delete this.$nextTick,以及初始化方法extend,initUse, initMixin , initExtend, initAssetRegisters 等等那它们是怎么实现,让我们一起来探究下吧 源码目录 global-…

Nodejs web服务器之GET、POST请求初次体验

一、认识http请求 步骤 1.DNS解析域名&#xff0c;找到ip地址&#xff0c;建立TCP连接&#xff0c;发起http请求 2.服务器接收到http请求&#xff0c;进行处理&#xff0c;返回数据 3.客户端接收到返回的数据&#xff0c;处理数据&#xff08;比如渲染页面&#xff09; 二、no…

外贸公司老板最喜欢的wordpress模板

电池wordpress外贸企业主题 电池wordpress外贸企业主题&#xff0c;做新能源外贸公司的企业官方网站模板。 https://www.jianzhanpress.com/?p3602 西联设备wordpress外贸企业模板 西联设备wordpress外贸企业模板&#xff0c;做外贸自建站就用西联wordpress企业主题。 htt…