Springboot整合阿里云ONS RocketMq(4.0 http)

1. 引入依赖

<!--阿里云ons,方便的接入到云服务-->
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.4.Final</version>
</dependency>

2. 配置

配置注意事项:

  1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
    image.png
  2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
  3. 订阅关系参考官方文档: 订阅关系一致
  4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
aliyun:
  rocketmq:
    accessKey: LTAI5txxxxxxx
    secretKey: Afq06tBxrdBxxxxxxxx
    nameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80
    orderGroupId: GID_xxxxxx_test
    orderTag: 'order'
    orderTopic: vehicle-order-test
    marketGroupId: GID_xxxxxx2_test
    marketTag: 'market'
    marketTopic: vehicle-market-test
    vehicleGroupId: GID_xxxxxx3_test
    vehicleTag: 'vehicle'
    vehicleTopic: vehicle-order-test

3. 配置类

3.1 基本配置类

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Rocket MQ 配置类
 * @author zr 2024/3/1
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String marketGroupId;
    private String marketTopic;
    private String marketTag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;
    private String vehicleTopic;
    private String vehicleGroupId;
    private String vehicleTag;

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
}

3.2 生产者配置

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zr 2024/3/1
 */
@Configuration
public class ProducerConfig {
    @Autowired
    private RocketMqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}

3.3 消费者配置

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * RocketMq消费者
 * @author zr 2024/3/1
 */
@Configuration
public class VehicleConsumerConfig {
    @Autowired
    private RocketMqConfig mqConfig;

    @Autowired
    private VehicleListener vehicleListener;


    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildVehicleBuyerConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getVehicleTopic());
        subscription.setExpression(mqConfig.getVehicleTag());
        subscriptionTable.put(subscription, vehicleListener);
        //订阅多个topic如上面设置

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

4. 生产者工具类

  • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
  • 参数说明:
    • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
    • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
    • key – 业务主键
    • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;

/**
 * RocketMessageProducer rocketMQ消息生产者
 * @author zr 2024/3/1
 */
@Component
@Slf4j
public class RocketMessageProducer {
    private static ProducerBean producer;
    private static RocketMqConfig mqConfig;

    private  static MessageRecordMapper messageRecordMapper;

    @Autowired
    private  MessageRecordMapper messageRecordMapperInstance;


    @PostConstruct
    public void init() {
        RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;
    }

    public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {
        this.producer = producer;
        this.mqConfig = mqConfig;
    }

    /**
     * 生产车辆服务普通消息
     * @param tag
     * @param key
     * @param body
     */
    public  static void producerVehicleMsg(String tag, String key, String body) {
        Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            log.info(time
                    + " Send mq message success.Topic is:" + msg.getTopic()
                    + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())
                    + " msgId is:" + sendResult.getMessageId());

            MessageRecord messageRecord = new MessageRecord();
            messageRecord.setPlatformType("mq");
            messageRecord.setMessageType("order");
            messageRecord.setMqMessageTopic(msg.getTopic());
            messageRecord.setMqMessageTag(msg.getTag());
            messageRecord.setMqMessageKey(msg.getKey());
            messageRecord.setMqMessageId(sendResult.getMessageId());
            messageRecord.setCreatedTime(LocalDateTime.now());
            messageRecord.setMessageContent(new String(msg.getBody()));
            messageRecordMapper.insert(messageRecord);
        } catch (ONSClientException e) {
            e.printStackTrace();
            log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }

    /**
     * 生产车辆服务延时普通消息
     * @param tag  order:订单服务   vehicle:主要用于本服务的超时回应
     * @param key
     * @param body
     * @param delay 延迟秒
     */
    public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {
        Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        msg.setStartDeliverTime(time+ delay*1000);
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            log.info(time
                    + " 发送消息成功.Topic is:" + msg.getTopic()
                    + " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())
                    + " msgId 为:" + sendResult.getMessageId());
        } catch (ONSClientException e) {
            e.printStackTrace();
            log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }
}

5. 消费者监听

package com.vehicle.manager.core.listener;

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/**
 * @author zr 2024/3/1
 */
@Component
@Slf4j
public class VehicleListener implements MessageListener {
    @Autowired
    private HlCarService hlCarService;


    @Override
    public Action consume(Message message, ConsumeContext context) {

        log.info("VehicleReceive 消息: " + message);

        try {
            byte[] body = message.getBody();
            String s = new String(body);
            log.info(s);
            // VehicleMQMessageDTO需要自行根据业务封装
            VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);
            log.info(vehicleMQMessageDTO.toString());

            // 以下做你的业务处理
            // .........
            
            return Action.CommitMessage;//进行消息的确认
        } catch (Exception e) {
            log.info(e.getMessage());
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

6. 测试

6.1 发送消息

package com.vehicle.manager.core;

import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


/**
 * @author zr 2024/3/1
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {
    
    @Test
    public void producerMsg() {
        RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));
    }
}

6.2 接收消息

image.png

7. 延时消息

如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/738996.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MyBatis系列之分页插件及问题

概述 无论是C端产品页面&#xff0c;还是后台系统页面&#xff0c;不可能一次性将全部数据加载出来。后台系统一般都是PC端登录&#xff0c;用Table组件&#xff08;如Ant Design Table&#xff09;渲染展示数据&#xff0c;可点击列表的下一页&#xff08;或指定某一页&#…

Spring Boot集成Redisson

文章目录 Spring Boot集成Redisson1. Redisson概述2. Redission作用3. 集成Redission前提&#xff1a;步骤 1: 添加依赖步骤 2: 配置Redisson 4. 结论 Spring Boot集成Redisson 1. Redisson概述 Redisson是一个在Redis基础上实现的Java驻内存数据网格&#xff08;In-Memory D…

C语言入门课程学习笔记9:指针

C语言入门课程学习笔记9 第41课 - 指针&#xff1a;一种特殊的变量实验-指针的使用小结 第42课 - 深入理解指针与地址实验-指针的类型实验实验小结 第43课 - 指针与数组&#xff08;上&#xff09;实验小结 第44课 - 指针与数组&#xff08;下&#xff09;实验实验小结 第45课 …

替代LTC4449高速同步N道沟MOSFET驱动器|具有轨对轨栅极驱动

1. 产品特性 ➢ 15ns 典型传播延迟 ➢ 5ns 高侧/低侧匹配 ➢ 轨至轨栅极驱动 ➢ 自适应死区和直通保护 ➢ 3A 峰值拉电流和 4.5A 峰值灌电流 ➢ 驱动 2 颗 NMOS 组成的半桥 ➢ 欠压保护 ➢ 过热保护 2. 功能描述 PC4449 产品手册 PC4449是一款专为高频率、高效率的应…

教育护眼灯品牌排行有哪些上榜?中国十大教育照明品牌分享

在当前的时代背景下&#xff0c;孩子们的课业负担依然沉重。随着他们年龄的增长&#xff0c;作业量不断增加&#xff0c;对视力的需求也随之上升。加之&#xff0c;现今许多作业需借助电子屏幕完成&#xff0c;孩子们面临视力问题的风险因而愈加提早。家长们逐渐认识到&#xf…

轻量级在线服装3D定制引擎Myway简介

我写的面向web元宇宙轻量级系列引擎中的另外一个&#xff0c;在线3D定制引擎Myway 3D。 用于在线商品定制&#xff0c;比如个性化服装的定制、日常用品&#xff08;如杯子&#xff09;、家装&#xff08;被套&#xff09;等物品的在线定制。 特性列表&#xff1a; 可更换衣服…

什么是中间件?常见中间件有哪些?

中间件是什么 中间件是一种独立的系统软件或服务程序&#xff0c;分布式应用软件借助这种软件在不同的技术之间共享资源。中间件位于客户机/ 服务器的操作系统之上&#xff0c;管理计算机资源和网络通讯。是连接两个独立应用程序或独立系统的软件。相连接的系统&#xff0c;即…

STM32 - LED灯 蜂鸣器

&#x1f6a9; WRITE IN FRONT &#x1f6a9; &#x1f50e; 介绍&#xff1a;"謓泽"正在路上朝着"攻城狮"方向"前进四" &#x1f50e;&#x1f3c5; 荣誉&#xff1a;2021|2022年度博客之星物联网与嵌入式开发TOP5|TOP4、2021|2222年获评…

adb 查看哪些应用是双开的

adb shell pm list users 得到 这 里有 user 0 ,11,999 其中0是系统默认的&#xff0c;11是平行空间的&#xff0c;999是双开用户 pm list packages --user 999 -3 得到了999用户安装第三方应用的包名 pm list packages --user 11 -3 得到了隐私空间用户安装第三方应用的…

智能客服到个人助理,国内AI大模型如何改变我们的生活?

引言 随着人工智能&#xff08;AI&#xff09;技术的高速发展&#xff0c;AI大模型越来越多地出现在我们的日常生活和工作中。国内的AI大模型在过去几年里取得了显著的进展&#xff0c;不少独创的技术点和实际应用令人瞩目。 那么&#xff0c;国内的AI大模型有哪些独创的技术…

[图解]建模相关的基础知识-17

1 00:00:00,190 --> 00:00:09,650 那么1、2、5这个地方&#xff0c;这几个它都需要修改 2 00:00:09,660 --> 00:00:11,410 都要改成资金管理部 3 00:00:13,340 --> 00:00:15,020 那么违反第三范式 4 00:00:15,030 --> 00:00:19,650 是一个比较严重的问题 5 00:…

苹果Mac系统安装adobe软件“无法打开install因为无法验证开发者”解决方法

对于大部分小伙伴&#xff0c;特别是从事视频后期、设计等专业的人来说&#xff0c;Adobe全家桶系列软件&#xff0c;相信都或多或少用过&#xff0c;比如Photoshop、Premiere、illustrator、Lightroom等等。这些软件不仅支持Windows系统&#xff0c;也完美适配于苹果Mac系统&a…

展讯-GPIO操作

1.修改IO配置 以GPIO92为例 IO配置文件在&#xff1a; bsp/kernel/kernel4.14/arch/arm64/boot/dts/sprd/uis8581e5h10.dts bsp/bootloader/u-boot15/board/spreadtrum/uis8581e5h10/pinmap-sp9863a.c 配置 按上述内容&#xff0c;配置IO口 编译之前查看硬件GPIO&#xff0c…

建议收藏!100款宝藏级AIGC工具分享,70款ChatGPT插件惊艳的开发过程与宏大的商业化愿景

建议收藏&#xff01;100款宝藏级AIGC工具分享&#xff0c;70款ChatGPT插件惊艳的开发过程与宏大的商业化愿景。 不输ChatGPT&#xff1f;整理了100款AIGC神器&#xff0c;打工人速进。 说到AIGC工具&#xff0c;你还是只知道ChatGPT&#xff1f; 实际上&#xff0c;越来越多…

校园设施物联网信息化改造

随着物联网技术的发展越来越成熟&#xff0c;它不断地与人们的日常生活和工作深入融合&#xff0c;推动着社会的进步。其中物联网系统集成在高校实践课程中可以应用到许多项目&#xff0c;如环境气象检测、花卉种植信息化监管、水质信息化监管、校园设施物联网信息化改造、停车…

基于YOLOv8m的水族馆动物识别(附数据集和Coovally操作步骤)

本文主要内容:详细介绍了水族馆动物识别的整个过程&#xff0c;从创建数据集到训练模型再到预测结果全部可视化操作与分析。 文末有数据集获取方式&#xff0c;请先看检测效果 现状 随着水族馆行业的快速发展&#xff0c;对动物识别的需求日益增加。水族馆需要准确识别动物种…

爬虫阶段思考

内容&#xff1a;写这篇文章是因为最近帮同学改了很多的爬虫代码&#xff0c;感触良多。 我用豆瓣为例&#xff0c;并不是不会用别的&#xff0c;而是这个我个人感觉最经典。然后还会写我遇到的一些问题以及解决方法。 首先&#xff0c;我们得先知道怎样爬取。我用的scrapy框…

揭秘shopee、Lazada爆单秘诀:自养号补单策略大公开

在东南亚的电商跨境领域&#xff0c;Shopee和Lazada无疑占据了举足轻重的地位&#xff0c;为印地、马来、台湾、菲律宾、新加坡、泰国和越南等地的消费者提供了丰富的在线购物选择。随着电商竞争的日益激烈&#xff0c;许多商家开始探索各种有效的推广策略&#xff0c;其中&…

Python应用开发——30天学习Streamlit Python包进行APP的构建(8)

st.table 显示静态表格。 这与 st.dataframe 的不同之处在于,这里的表格是静态的:其全部内容直接显示在页面上。 Function signature[source]st.table(data=None) Parametersdata (pandas.DataFrame, pandas.Styler, pyarrow.Table, numpy.ndarray, pyspark.sql.DataFrame,…

哪个牌子充电宝好?好用充电宝排行榜!精选充电宝排行榜

在如今这个科技飞速发展的时代&#xff0c;充电宝已然成为我们日常生活中不可或缺的伴侣。无论是出差旅行&#xff0c;还是日常通勤&#xff0c;我们都离不开它为我们的电子设备保驾护航。然而&#xff0c;面对市场上琳琅满目的充电宝品牌&#xff0c;您是否感到眼花缭乱&#…