RocketMq详解:三、RocketMq通用生产和消费方法改造

文章目录

  • 1.背景
  • 2.通用方法改造
    • 2.1添加maven依赖
    • 2.2 RocketMq基础配置
    • 2.3 配置类
    • 2.5 消息传输的对象和结果
    • 2.4 消息生产者
    • 2.5 消息消费者
    • 2.6 功能测试

1.背景

在第二章:《RocketMq详解:二、SpringBoot集成RocketMq》中我们已经实现了消费基本生产和消费的实现,但是在真实的开发环境中如果按照这种方式去实现,冗余代码较多,且通过实现RocketMQListeneronMessage的方法去完成消息消费无返回结果,在后期的流程中不易维护,因此,本章将对这些问题进行二次改造和优化。

为了防止新同学从头开始学,本章将如何配置和实现简单在复述一下,至于具体怎么安装RocketMq,本文提供两种安装方法:

  • 《MacOS环境下RocketMQ安装及部署 RocketMQ Dashboard 可视化》
  • 《docker安装rocketMq》

2.通用方法改造

2.1添加maven依赖

		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-spring-boot-starter</artifactId>
			<version>2.2.3</version>
		</dependency>

2.2 RocketMq基础配置

接着,在application.yml或application.properties文件中配置RocketMQ的相关参数,如NameServer地址、生产者组、消费者组等:

rocketmq:
  name-server: 127.0.0.1:9876
  # 生产者
  producer:
    group: boot_group_1
    # 消息发送超时时间
    send-message-timeout: 3000
    # 消息最大长度4M
    max-message-size: 4096
    # 消息发送失败重试次数
    retry-times-when-send-failed: 3
    # 异步消息发送失败重试次数
    retry-times-when-send-async-failed: 2
  # 消费者
  consumer:
    group: boot_group_1
    # 每次提取的最大消息数
    pull-batch-size: 5

上面的配置如果是在分布式环境下也可以配置在Apollonacos等配置中心里进行动态配置

2.3 配置类

在配置类中主要定义两个Bean的加载,即RocketMQTemplateDefaultMQProducer,主要是提供消息发送的能力,即生产消息;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author ninesun
 * @ClassName RocketMqConfig
 * @description: 消息中间件配置类
 * @date 2024年05月19日
 * @version: 1.0
 */
@Configuration
public class RocketMqConfig {

    @Value("${rocketmq.name-server}")
    private String nameServer;

    @Value("${rocketmq.producer.group}")
    private String producerGroup;

    @Value("${rocketmq.producer.send-message-timeout}")
    private Integer sendMsgTimeout;

    @Value("${rocketmq.producer.max-message-size}")
    private Integer maxMessageSize;

    @Value("${rocketmq.producer.retry-times-when-send-failed}")
    private Integer retryTimesWhenSendFailed;

    @Value("${rocketmq.producer.retry-times-when-send-async-failed}")
    private Integer retryTimesWhenSendAsyncFailed;

    @Bean
    public RocketMQTemplate rocketMqTemplate() {
        RocketMQTemplate rocketMqTemplate = new RocketMQTemplate();
        rocketMqTemplate.setProducer(defaultMqProducer());
        return rocketMqTemplate;
    }

    @Bean
    public DefaultMQProducer defaultMqProducer() {
        DefaultMQProducer producer = new DefaultMQProducer();
        producer.setNamesrvAddr(this.nameServer);
        producer.setProducerGroup(this.producerGroup);
        producer.setSendMsgTimeout(this.sendMsgTimeout);
        producer.setMaxMessageSize(this.maxMessageSize);
        producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed);
        producer.setRetryTimesWhenSendAsyncFailed(this.retryTimesWhenSendAsyncFailed);
        return producer;
    }
}

在编写消费者和生产者之前,我们先统一一下消息传输的对象,以及消费的结果

2.5 消息传输的对象和结果

  • 基本传输对象
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class MessageDTO<T> {
    private T data;
    private Integer delayTime;
    @NotBlank(message = "Topic must not be blank")
    private String topic;
    @NotBlank(message = "Key must not be blank")
    private String key;
    private List<String> tags;
    private String messageType;
}
  • 消费结果枚举
public enum MsgRetryStatus {


    SUCCEED(-1), FAILURE(0), RETRY(0), RETRY_1S(1), RETRY_5S(2),
    RETRY_10S(3), RETRY_30S(4), RETRY_1M(5), RETRY_2M(6), RETRY_3M(7),
    RETRY_4M(8), RETRY_5M(9), RETRY_6M(10), RETRY_7M(11), RETRY_8M(12),
    RETRY_9M(13), RETRY_10M(14), RETRY_20M(15), RETRY_30M(16), RETRY_1H(17), RETRY_2H(18),
    RETRY_1D(19), RETRY_3D(20),  RETRY_7D(21), RETRY_14D(22), RETRY_21D(23),  RETRY_28D(24),
    RETRY_35D(25);

    int level;

    MsgRetryStatus(int level) {
        this.level = level;
    }

    public int getLevel() {
        return level;
    }
}

2.4 消息生产者

@Component
public class MessageProduct {
    @Resource
    private RocketMQTemplate rocketMqTemplate;

    public SendResult SendMessage(MessageDTO data) {
        // 创建一个Message对象
        org.springframework.messaging.Message<?> message = MessageBuilder.withPayload(JSON.toJSONString(data))
                .setHeader(RocketMQHeaders.KEYS,data.getKey())
                .setHeader(RocketMQHeaders.TAGS,data.getTags())
                .build();
        return rocketMqTemplate.syncSendDelayTimeSeconds(data.getTopic(), message, data.getDelayTime());
    }
}

2.5 消息消费者

在原始的实现中,我们通过实现RocketMQListener接口中的onMessage方法来完成消息的消费。

然而,这种方式存在一个问题:如果消息消费失败,我们无法获取到返回结果,也不便于进行错误处理和重试。

为了优化这个问题,我们可以使用@RocketMQMessageListener注解的returnTopic属性来指定一个返回主题,当消息消费失败时,将消息发送到这个返回主题中。

同时,我们可以创建一个专门的处理失败消息的消费者来处理这些返回的消息。

另外,我们还可以在onMessage方法中添加异常处理逻辑,以便在消费失败时进行错误处理和记录日志。

新增一个抽象类CommonConsumer去实现RocketMQListener,并提供一个有返回值的doConsumerProcess方法,去实现具体的消费逻辑,具体实现如下:

@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
    public void onMessage(MessageDTO message) {
        log.info("收到延迟消息成功,消息体:{}", message);
        doConsumerProcess(message);
    }

    public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);
}

以boot-mq-topic为例,我们实现具体的消费,新增一个对象BootMqConsumer继承我们的CommonConsumer,来实现具体的消费逻辑

@Slf4j
@Component
public abstract class CommonConsumer implements RocketMQListener<MessageDTO> {
    public void onMessage(MessageDTO message) {

        try {
            // 处理消息的逻辑
            log.info("收到延迟消息成功,消息体:{}", message);
            MsgRetryStatus msgRetryStatus = doConsumerProcess(message);
            if (MsgRetryStatus.RETRY.equals(msgRetryStatus)
                    || MsgRetryStatus.FAILURE.equals(msgRetryStatus)) {
                //TODO 消费失败或重试 则发送重试Topic
            }
        } catch (Exception e) {
            // 记录错误日志
            e.printStackTrace();
            // 可以选择将失败消息发送到指定Topic
            this.sendReturnMessgae();
        }
    }

    public abstract MsgRetryStatus doConsumerProcess(MessageDTO messageDTO);

    private void doRetrytConsumerProcess() {
        //TODO:待实现,后面补上
    }
}

2.6 功能测试

package com.example.demo.controller;

import com.alibaba.fastjson.JSON;
import com.example.demo.annoation.Idempotent;
import com.example.demo.mq.producer.MessageProduct;
import com.example.demo.po.MessageDTO;
import com.example.demo.po.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.UUID;

@RestController
@Slf4j
@Validated
public class TestController01 {
    @Resource
    private RocketMQTemplate rocketMqTemplate;

    @Resource
    private DefaultMQProducer defaultMqProducer;

    @Resource
    private MessageProduct messageProduct;

    @GetMapping("/send/msg4")
    public String sendMsg4() {
        try {
            User user = User.builder()
                    .id(1)
                    .name("ninesun")
                    .build();
            MessageDTO<User> messageDTO = MessageDTO.<User>builder()
                    .data(user)
                    .delayTime(3)
                    .topic("boot-mq-topic")
                    .key(String.valueOf(UUID.randomUUID()))
                    .build();
            SendResult sendResult = messageProduct.SendMessage(messageDTO);
            log.info("msgId:{},sendStatus:{},data:{}", sendResult.getMsgId(), sendResult.getSendStatus(), JSON.toJSONString(messageDTO));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "OK";
    }
}

附:User对象

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class User {
    private Integer id;
    private String name;
    private String desc;
}

访问:/send/msg4接口,结果如下:
在这里插入图片描述

至此,我们便可以在真实环境中很容易的去集成消息队列实现功能的解耦,流量的削峰。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/890526.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

动态规划-多状态问题——740.删除获得点数

1.题目解析 题目来源&#xff1a;740.删除并获得点数——力扣 测试用例 2.算法原理 首先将原数组根据每个数映射为下标&#xff0c;相加后存储在以该数本身为下标的新数组中 1.状态表示 这里与路径问题不同的是每个位置都不止一个状态&#xff0c;因此开辟两个dp表&#xff0…

Unity URP shader ———魔系符文宝石是如何练成的

各位同学大家好 我已经很久没有没有写教程了&#xff0c;最近项目比较忙。各种加班各种带小孩儿&#xff0c;不过&#xff0c;老师一有机会也在给尽可能服务大家&#xff0c;今天来一个硬菜&#xff1a;移动端高效魔系符文如何制作&#xff0c;国庆起来&#xff0c;老师抽了点…

六西格玛设计DFSS方法论在消费级无人机设计中的应用——张驰咨询

本文基于六西格玛设计方法论&#xff0c;对消费级无人机的设计流程进行系统化研究&#xff0c;探讨如何通过六西格玛设计的理念、工具和方法提升无人机产品的设计质量和市场竞争力。文章从市场定位、客户需求分析出发&#xff0c;深入到关键KPI指标的制定&#xff0c;并逐步阐述…

vulnhub-Web Developer 1靶机

vulnhub&#xff1a;Web Developer: 1 ~ VulnHub 导入靶机&#xff0c;放在kali同网段&#xff0c;扫描 靶机在192.168.114.129&#xff0c;扫描端口 有网站服务&#xff0c;访问 没什么东西&#xff0c;扫目录 真不少&#xff0c;访问一下&#xff0c;也只是一些普通的Wordpr…

滑雪——记忆化搜索

题目 代码 //#pragma GCC optimize(3)#include <bits/stdc.h> const int N 310; using namespace std; int dx[4] {-1, 0, 1, 0}, dy[4] {0, 1, 0, -1}; int ans; int g[N][N]; int r, c; int f[N][N]; int dfs(int x, int y) {if(~f[x][y]) return f[x][y];f[x][y] …

【JavaSE基础】Java 变量

为什么需要变量 变量是程序的基本组成单位 class Test{public static void main(String[] args){int a 1; //定义一个变量&#xff0c;类型为int&#xff0c;变量名为a&#xff0c;并赋值为1int b 3; //定义另一个变量&#xff0c;类型为int&#xff0c;变量名为b&#xff0…

2-120 基于matlab的滑动平均滤波下通过幅度谱最大值方法估计太阳黑子的周期

基于matlab的滑动平均滤波下通过幅度谱最大值方法估计太阳黑子的周期。具体步骤为&#xff1a;1&#xff09;在Matlab 环境下读取太阳黑子数目序列&#xff0c;并绘制其时域波形&#xff1b; 2&#xff09;采用离散时间卷积计算方法对太阳黑子数据序列进行滑动平均滤波&#xf…

vue后台管理系统从0到1搭建(4)各组件的搭建

文章目录 vue后台管理系统从0到1搭建&#xff08;4&#xff09;各组件的搭建Main.vue 组件的初构 vue后台管理系统从0到1搭建&#xff08;4&#xff09;各组件的搭建 Main.vue 组件的初构 根据我们的效果来看&#xff0c;分析一下&#xff0c;我们把左边的区域分为一个组件&am…

前端的全栈之路:基于 Vue3 + Nest.js 全栈开发的后台应用

☘️ 项目简介 Vue3 Admin 是一个前端基于 Soybean Admin 二次开发&#xff0c;后端基于 Nest.js 的全栈后台应用&#xff0c;适合学习全栈开发的同学参考学习。 &#x1f341; 前端技术栈&#xff1a; Vue3.5、Ant Design Vue、UnoCSS、Pinia &#x1f341; 后端技术栈&…

【浏览器】如何正确使用Microsoft Edge

1、清理主页广告 如今的Microsoft Edge 浏览器 主页太乱了&#xff0c;各种广告推送&#xff0c;点右上角⚙️设置&#xff0c;把快速链接、网站导航、信息提要、背景等全部关闭。这样你就能得到一个超级清爽的主页。 网站导航       关闭 …

线程基础学习

线程的实现 通过实现Runnable接口的方式&#xff0c;实现其中的run方法。继承Thread类&#xff0c;然后重写其中的run方法。通过线程池创建线程&#xff0c;默认采用DefaultThreadFactory。有返回值的callable&#xff0c;实现callable接口&#xff0c;实行call方法。 本质上…

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-13

计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-13 目录 文章目录 计算机前沿技术-人工智能算法-大语言模型-最新研究进展-2024-10-13目录1. The Cognitive Capabilities of Generative AI: A Comparative Analysis with Human Benchmarks2. WALL-E: World Alig…

动态规划的优化与高级应用

姊妹篇&#xff1a; 动态规划基础与经典问题-CSDN博客 贪心算法&#xff1a;原理、应用与优化_最优解-CSDN博客​​​​​​贪心算法&#xff1a;原理、应用与优化_最优解-CSDN博客 一、动态规划的优化策 动态规划在提高时间效率的同时&#xff0c;往往会占用较多的空间。因…

Unity3d折叠Inspector中的变量

InspectorFoldoutGroup插件 [Pixeye.Unity.Foldout("【曲线图】")] public BrokenLineUpDownGraph aimStabilityGraph;[Pixeye.Unity.Foldout("【曲线图】")] public BrokenLineUpGraph aimDensityGraph;[Pixeye.Unity.Foldout("【曲线图】")] p…

Xilinx远程固件升级(二)——STARTUPE2原语的使用

通过&#xff08;一&#xff09;可以看出&#xff0c;对于远程固件升级实际上是通过调用flash不同区域的bit实现&#xff0c;通过golden image和update image共同保障了系统的稳定性。在项目中如果将flash的时钟直接绑定FPGA后进行约束&#xff0c;在综合编译时是无法通过的。这…

优先算法1--双指针

“一念既出&#xff0c;万山无阻。”加油陌生人&#xff01; 目录 1.双指针--移动零 2.双指针-复写零 ok&#xff0c;首先在学习之前&#xff0c;为了方便大家后面的学习&#xff0c;我们这里需要补充一个知识点&#xff0c;我这里所谓的指针&#xff0c;不是之前学习的带有…

【原创】Android Studio 中安装大模型辅助编码插件:通义灵码

在 Android Studio 中内置了 Ginimi 预览版&#xff0c;但需要“加速器”才可使用。 在国内有平替的软件同样可以使用&#xff0c;比如 阿里的通义灵码&#xff0c;智谱的CodeGeeX等&#xff0c;从功能和使用上来说都是大同小异。 这里我们以通义灵码为例来讲解其安装和使用 通…

《机器学习与数据挖掘综合实践》实训课程教学解决方案

一、引言 随着信息技术的飞速发展&#xff0c;人工智能已成为推动社会进步的重要力量。作为人工智能的核心技术之一&#xff0c;机器学习与数据挖掘在各行各业的应用日益广泛。本方案旨在通过系统的理论教学、丰富的实践案例和先进的实训平台&#xff0c;帮助学生掌握机器学习…

selenium-Alert类用于操作提示框/确认弹框(4)

之前文章我们提到&#xff0c;在webdriver.WebDriver类有一个switch_to方法&#xff0c;通过switch_to.alert()可以返回Alert对象&#xff0c;而Alert对象主要用于网页中弹出的提示框/确认框/文本输入框的确认或者取消等动作。 Alert介绍 当在页面定位到提示框/确认框/文本录入…

Vulnhub靶场案例渗透[7]- DC7

文章目录 1. 靶场搭建2. 信息收集2.1 确定靶机ip2.2 服务信息收集2.3 社工信息收集 3. 提权 1. 靶场搭建 靶场源地址 检验下载文件的检验码&#xff0c;对比没问题使用vmware打开 # windwos 命令 Get-FileHash <filePath> -Algorithm MD5 # linux md5sum filepath2. 信…