Spring Cloud Stream整合RocketMQ

Spring Cloud Stream整合RocketMQ

这里书接上回,默认你已经搭建好了RocketMQ主从异步集群,前面文章已经介绍过搭建方法。

1、Spring Cloud Stream介绍

Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。

官网:https://spring.io/projects/spring-cloud-stream

image-20240601120416055

该框架提供了一个灵活的编程模型,该模型基于已经建立和熟悉的Spring习惯用法和最佳实践,包括对持久pub/sub语义、消费者组和有状态分区的支持。

image-20240601120020593

Spring Cloud Stream的核心构建块是:

  • Destination Binders:负责提供与外部消息传递系统集成的组件。
  • Destination Bindings:外部消息系统和最终用户提供的应用程序代码(生产者/消费者)之间的桥梁。
  • Message:生产者和消费者用来与目标绑定器(以及通过外部消息系统的其他应用程序)进行通信的规范数据结构。

2、生产者

2.1 引入依赖

<dependencies>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
            <version>2.2.2.RELEASE</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-client</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.rocketmq</groupId>
                    <artifactId>rocketmq-acl</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-acl</artifactId>
            <version>4.7.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

注意,RocketMQ官方维护的Spring-Cloud-Stream依赖中的rocketmq版本为4.4,需要排出后加入4.7.1的依。

2.2 编写配置文件

spring:
  application:
    name: my-spring-cloud-rocketmq-producer
  cloud:
    stream:
      bindings:
        output:
          destination: TopicTest
      rocketmq:
        binder:
          name-server: 192.168.159.34:9876
server:
  port: 8080

2.3 启动类打上注解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class)
@SpringBootApplication
public class MySpringCloudRocketmqProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MySpringCloudRocketmqProducerApplication.class, args);
    }
}

其中,@EnableBinding(Source.class)指向配置文件的output参数。

2.4 编写生产者程序

import org.apache.rocketmq.common.message.MessageConst;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;

@Component
public class MyProducer {

    @Resource
    private Source source;

    public void sendMessage(String msg){
        //封装消息头
        Map<String,Object> headers=new HashMap<>();
        headers.put(MessageConst.PROPERTY_TAGS,"TagA");
        MessageHeaders messageHeaders=new MessageHeaders(headers);
        //创建消息对象
        Message<String> message = MessageBuilder.createMessage(msg, messageHeaders);
        //发送消息
        source.output().send(message);
    }
}

2.5 编写单元测试发送消息

@SpringBootTest
class MySpringCloudRocketmqProducerApplicationTests {

    @Autowired
    private MyProducer producer;

    @Test
    void contextLoads() {
        producer.sendMessage("hello,spring cloud stream message");
    }

}

3、消费者

3.1 引入依赖

与生产者相同。

3.2 编写配置文件

spring:
  application:
    name: my-spring-cloud-rocketmq-consumer
  cloud:
    stream:
      bindings:
      	# input消费者
        input:
          destination: TopicTest
          group: spring-cloud-stream-consumer-group
      # 配置RocketMQ
      rocketmq:
        binder:
          name-server: 192.168.159.34:9876
server:
  port: 8081

3.3 启动类打上注解

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@SpringBootApplication
public class MySpringCloudRocketmqConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MySpringCloudRocketmqConsumerApplication.class, args);
    }

}

其中@EnableBinding(Sink.class)指向配置文件的input参数。

3.4 编写消费者程序

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    @StreamListener(Sink.INPUT)
    public void processMessage(String message){
        System.out.println("收到的消息:"+message);
    }
}

先启动消费者,使用单元测试发送消息。

image-20240603111535498

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/703402.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

11. 利用MS为Lammps ReaxFF建模(PE/聚乙烯)基础-2

来源&#xff1a; “码农不会写诗”公众号 链接&#xff1a;利用MS为Lammps ReaxFF建模(PE/聚乙烯)基础-2 文章目录 01 msi2lmp工具简介1. 编译生成msi2lmp可执行文件2. 使用方式 02 赋予模型CVFF力场03 导出car/mdf文件04 生成data文件05 data文件进一步处理 文接上篇 上篇利用…

大模型高级 RAG 检索策略之流程与模块化

我们介绍了很多关于高级 RAG&#xff08;Retrieval Augmented Generation&#xff09;的检索策略&#xff0c;每一种策略就像是机器中的零部件&#xff0c;我们可以通过对这些零部件进行不同的组合&#xff0c;来实现不同的 RAG 功能&#xff0c;从而满足不同的需求。 今天我们…

Android RTSP/RTMP多路播放时动态切换输出View类型(SurfaceView和TextureView 动态切换)

SurfaceView和TextureView的区别和优缺点等, 相关的资料很多. 从Android低延时播放器实现角度来看, 总结了下主要区别有: 1. MediaCodec输出到SurfaceView延时一般比到TextureView更低. 2. MediaCodec用SurfaceView比TextureView占用的资源一般更少些(CPU和内存都小一些, 不过还…

算法专题总结链接地址

刷力扣的时候会遇到一些总结类型的题解&#xff0c;在此记录&#xff0c;方便自己以后找 前缀和 前缀和https://leetcode.cn/problems/unique-substrings-in-wraparound-string/solutions/432752/xi-fa-dai-ni-xue-suan-fa-yi-ci-gao-ding-qian-zhui-/ 单调栈 单调栈https:…

论文解读——《I2EDL: Interactive Instruction Error Detection and Localization》

一、研究背景 视觉与语言导航&#xff08;VLN&#xff09;是一个AI领域的研究任务&#xff0c;旨在开发能够按照自然语言指令在三维空间中导航到指定位置的智能体。这项任务与人类的日常活动——如按照口头指示到达某个地点——十分相似&#xff0c;对于推动人机交互的自然性和…

k8s学习--kubernetes服务自动伸缩之水平伸缩(pod副本伸缩)HPA详细解释与案例应用

文章目录 前言HPA简介简单理解详细解释HPA 的工作原理监控系统负载模式HPA 的优势使用 HPA 的注意事项应用类型 应用环境1.metircs-server部署2.HPA演示示例&#xff08;1&#xff09;部署一个服务&#xff08;2&#xff09;创建HPA对象&#xff08;3&#xff09;执行压测 前言…

汇聚荣科技有限公司实力强吗?

汇聚荣科技有限公司实力强吗?在当今快速发展的科技行业中&#xff0c;公司的实力往往决定了其市场竞争力和发展前景。对于汇聚荣科技有限公司而言&#xff0c;其是否具备强大的实力&#xff0c;不仅关系到自身的发展&#xff0c;也影响着投资者和合作伙伴的选择。因此&#xf…

集成算法实验(Bagging策略)

Bagging模型(随机森林) Bagging&#xff1a;训练多个分类器取平均 f ( x ) 1 / M ∑ m 1 M f m ( x ) f(x)1/M\sum^M_{m1}{f_m(x)} f(x)1/M∑m1M​fm​(x) 全称&#xff1a; bootstrap aggregation&#xff08;说白了就是并行训练一堆分类器&#xff09; 最典型的代表就是随…

[ue5]建模场景学习笔记(6)——必修内容可交互的地形,交互沙(4)

1.需求分析&#xff1a; 现在我们已经有了可以在世界内近于无限的跑动痕迹&#xff0c;现在需要对痕迹进行细化&#xff0c;包括例如当人物跳起时便不再绘制痕迹&#xff0c;以及痕迹应该存在深浅&#xff0c;应该由两只脚分别绘制&#xff0c;同时也应该对地面材质进行进一步处…

国内核心期刊基本情况

对于广大师生来说&#xff0c;发表核心期刊论文是当前阶段绕不开的任务&#xff0c;有的高校晋升副高需要发表核心论文5篇以上&#xff0c;有的学校硕博研究生毕业条件必须是一作发核心。很多人对核心的理解仅停留在“北核、南核”&#xff0c;其他的一概不知。但是我国的核心期…

CG-85C 振弦式土压力计厂家 结构物内部土压力变化量如何测量?

产品概述 振弦式土压力计由背板、感应板、信号传输电缆、振弦及激振电磁线圈等组成&#xff0c;是了解被测结构物内部土压力变化量、并可同步测量埋设点温度的监测设备。 功能特点 ◆精度高&#xff0c;能够提供准确的测量结果。 ◆稳定性好&#xff0c;不易受到外界因素的…

端点物联开发教程之(一)什么是端点物联

目录 一、手机端演示 二、开发套件 三、嵌入式端 四、平台端 五、手机端 本项目的交流QQ群:701889554 物联网实战--入门篇https://blog.csdn.net/ypp240124016/category_12609773.html 物联网实战--驱动篇https://blog.csdn.net/ypp240124016/category_12631333.html 物…

centos7 安装 mysql5.7 LTS

centos7 安装 mysql5.7 LTS 参考&#xff1a; https://blog.csdn.net/EB_NUM/article/details/105425622 可以在运行安装程序之前导入密钥&#xff1a; sudo rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022第一步、下载MySQL 安装包&#xff1a; sudo wget h…

【QT5】<总览二> QT信号槽、对象树及常用函数

文章目录 前言 一、QT信号与槽 1. 信号槽连接模型 2. 信号槽介绍 3. 自定义信号槽 二、QT的对象树 三、添加资源文件 四、样式表的使用 五、QSS文件的使用 六、常用函数与宏 前言 承接【QT5】&#xff1c;总览一&#xff1e; QT环境搭建、快捷键及编程规范。若存在版…

vs2015+win10编译LAStools

文章目录 下载LasTool安装包编译laslib测试 下载LasTool安装包 不要再GitHub上下载&#xff0c;在官网下载&#xff1a;link 编译laslib 将压缩包解压到对应路径下&#xff0c;注意路径下不要有空格和汉字。用vs打开目录下的 “lastools.dsw” 文件 下面注意几点&#xff1a…

代码随想录算法训练营第36天(py)| 贪心 | 452. 用最少数量的箭引爆气球、435. 无重叠区间、763.划分字母区间

452. 用最少数量的箭引爆气球 力扣链接 有一些球形气球贴在一堵用 XY 平面表示的墙面上。墙面上的气球记录在整数数组 points &#xff0c;其中points[i] [xstart, xend] 表示水平直径在 xstart 和 xend之间的气球。你不知道气球的确切 y 坐标。 一支弓箭可以沿着 x 轴从不同…

python 10个高频率的自动化脚本(干货,速度收藏)

1. 文件操作&#xff1a;自动备份文件 场景&#xff1a;每日自动备份重要文件到指定目录。 import shutilimport datetimedef backup_file(src, dst_folder): now datetime.datetime.now().strftime(%Y%m%d%H%M%S) dst_path f"{dst_folder}/backup_{now}_{src.s…

Qt 实战(4)信号与槽 | 4.1、信号与槽机制

文章目录 一、信号与槽机制1、基本概念2、信号与槽函数连接2.1、connect宏实现信号与槽连接2.2、Qt5新connect函数2.3、使用函数指针2.4、使用lambda表达式2.5、使用Qt Creator添加信号的槽函数 3、结论 前言&#xff1a; Qt信号与槽机制是一种用于处理对象间通信的强大机制&am…

精品KEITHELY6517B参数资料/静电计/高阻计

Keithley 5 位 6517B 静电计/高阻计提供最先进的精度和灵敏度规格&#xff0c;并具有各种功能&#xff0c;可简化高阻和绝缘材料电阻率的测量。Keithley 6517B 的读数速率高达 425 次/秒&#xff0c;可快速、轻松地测量低电平电流。 Keithley 6517B 是更新版本&#xff0c;取代…

Day 18:881. 救生艇

Leetcode 881. 救生艇 给定数组 people 。people[i]表示第 i 个人的体重 &#xff0c;船的数量不限&#xff0c;每艘船可以承载的最大重量为 limit。 每艘船最多可同时载两人&#xff0c;但条件是这些人的重量之和最多为 limit。 返回 承载所有人所需的最小船数 。 这里有一个条…