SpringCloudStream整合MQ

目录

概念

快速搭建SCS环境

一秒切换MQ

组件

1. Binder

2. Binding

3. Message

分组消费


概念

        Spring Cloud Stream(SCS) 的主要目标是一套代码,兼容所有MQ, 降低MQ的学习成本,提供一致性的编程模型,让开发者能更容易地集成/切换消息组件(如 Apache Kafka、RabbitMQ、RocketMQ)

官网地址:Spring Cloud Stream

快速搭建SCS环境

1. 引入pom依赖

    <dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
			<version>2021.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-acl</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-client</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.9.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-acl</artifactId>
			<version>4.9.1</version>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-dependencies</artifactId>
				<version>2.3.4.RELEASE</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Hoxton.SR6</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

2. 配置文件application.properties

# mq地址
spring.cloud.stream.rocketmq.binder.name-server=192.168.6.128:9876

spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain

3. 生产者和消费者代码

@RestController
public class SendMessageController {

    @Autowired
    private Source source;

    @GetMapping("/send")
    public Object send(String message) {
        MessageBuilder<String> messageBuilder =
                MessageBuilder.withPayload(message);
        source.output().send(messageBuilder.build());
        return "message sended : "+message;
    }
}
@Component
public class MessageConsumer {

    @StreamListener(Sink.INPUT)
    public void process(Object message) {
        System.out.println("received message : " + message);
    }
}

4. 验证生产消息,消费消息

 

一秒切换MQ

修改pom文件, 改成目标MQ依赖

        <!--Kafka依赖-->
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		
		<!--RocketMq依赖-->
		<!--<dependency>
			<groupId>com.alibaba.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
			<version>2021.1</version>
			<exclusions>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-acl</artifactId>
				</exclusion>
				<exclusion>
					<groupId>org.apache.rocketmq</groupId>
					<artifactId>rocketmq-client</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-client</artifactId>
			<version>4.9.1</version>
		</dependency>
		<dependency>
			<groupId>org.apache.rocketmq</groupId>
			<artifactId>rocketmq-acl</artifactId>
			<version>4.9.1</version>
		</dependency>-->
		
		<!--RabbitMQ依赖-->
		<!--<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
		</dependency>-->

组件

1. Binder

        SCS通过Binder定义一个外部消息服务器。默认情况下,SCS会使用对应的 SpringBoot插件来构建Binder。

例如RabbitMQ默认值配置

spring.rabbitmq.host=local
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

        在SCS中,支持配置多个Binder访问不同的外部消息服务器。这些配置是通过spring.cloud.stream.binders. [bindername].environment.[props]=[value]的方式进行配置。另外,如果配置了多个Binder,也可以通过spring.cloud.stream.default-binder属性指定默认的 Binder。

spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.host=loca
lhost
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.username=
guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.password=
guest
spring.cloud.stream.binders.testbinder.environment.spring.rabbitmq.virtualhost=/
# 指定默认binder
spring.cloud.stream.default-binder=testbinder

 

2. Binding

        Binding是SCS中实际进行消息交互的桥梁。在SCS中,通过Binding和 Binder建立绑定关系,客户端就通过Binding实现的消息收发。 在SCS框架中,配置Binding首先对他进行声明。声明Binding的方式,是在启动类中引入@EnableBinding注解。应用会向Spring容器中注入一个Binding接口的实例对象。在SCS中,默认提供了 Source、Sink、Process三个接口对象,分别代表消息的生产者、消费者和中间处理者。

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

public interface Processor extends Source, Sink {
}

 binding配置项

# 队列名
spring.cloud.stream.bindings.output.destination=scstreamExchange
# 指定binder。
spring.cloud.stream.bindings.output.binder=testbinder

spring.cloud.stream.bindings.input.destination=scstreamExchange
# 消费群组
spring.cloud.stream.bindings.input.group=stream
spring.cloud.stream.bindings.input.content-type=text/plain
# 指定binder。
spring.cloud.stream.bindings.input.binder=testbinder

# 最大重试次数
spring.cloud.stream.bindings.input.consumer.max-attempts=3

3. Message

 在不同的MQ产品中,对于消息的定义其实也是不相同,SCS框架就需要对这些消息类型进行统一。消息结构包括请求头和消息体。

public interface Message<T> {
    T getPayload();

    MessageHeaders getHeaders();
}

         Payload就是消息体,在SCS中定义成了一个泛型,可以直接传递对象。MessageHeaders是消息的头部属性,也可以说是消息的补充属性。不同的MQ产品下,就可以通过不同的MessageHeaders属性来代表各自的消息差异,消息内容可以通过Payload统一。

        例如,RabbitMQ中有一个非常重要的概念routingKey。通过routingKey可以定制Exchange与Queue之间的路由关系。这个routingKey就可以通过在Headers当中指定一个routingkey属性来实现。

MessageBuilder<String> messageBuilder =
MessageBuilder.withPayload(message).setHeader("routingkey","info");

分组消费

分组消费机制:是在生产者实例和消费者实例之间建立一种对应关系,生产者实例发出的消息只会被对应的消费者消费

1. 设置分区规则, 提前设置好分区ID, 用ID匹配

# 生产者配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
# 指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2
# 只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

# 消费者配置
spring.cloud.stream.bindings.input.destination=scstreamExchange
spring.cloud.stream.bindings.input.group=myinput
# 启动消费分区 新版本这个属性已经取消,改为由分区表达式自动判断
spring.cloud.stream.bindings.input.consumer.partitioned=true
# 参与分区的消费端节点个数
spring.cloud.stream.bindings.input.consumer.instance-count=2
# 设置该实例的消费端分区ID
spring.cloud.stream.bindings.input.consumer.instance-index=1

 2. 设置分区规则2, 根据请求头属性匹配

分区提取器

// 增加分区提取器-提取匹配键值
public class MyPartitionKeyExtractor implements PartitionKeyExtractorStrategy {
    public static final String PARTITION_PROP="partition";
    @Override
    public Object extractKey(Message<?> message) {
        return message.getHeaders().get(MyPartitionKeyExtractor.PARTITION_PROP);
    }
}

分区匹配器

public class MyPartitionSelectorStrategy implements PartitionSelectorStrategy {
    @Override
    public int selectPartition(Object key, int partitionCount) {
        return Integer.parseInt(key.toString()) % partitionCount;
    }
}

分区配置文件

# 添加生产者的分区配置
spring.cloud.stream.bindings.output.destination=scstreamExchange
spring.cloud.stream.bindings.output.binder=testbinder

# 指定参与消息分区的消费端节点数量
spring.cloud.stream.bindings.output.producer.partition-count=2

#只有消费端分区ID为1的消费端能接收到消息
spring.cloud.stream.bindings.output.producer.partition-key-expression=1

# 动态生成分区键
spring.cloud.stream.bindings.output.producer.partition-key-extractorname=myPartitionKeyExtractor
spring.cloud.stream.bindings.output.producer.partition-selectorname=myPartitionSelector

发送消息

    @GetMapping("/send2")
    public Object send2(String message) {
        // 发送4条消息, 设置请求头0 1 2 3
        for (int i = 0; i < 4; i++) {
            MessageBuilder<String> messageBuilder = MessageBuilder.withPayload(message)
                    .setHeader(MyPartitionKeyExtractor.PARTITION_PROP, i);
            source.output().send(messageBuilder.build());
        }
        return "message sended : "+message;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/359072.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

数据可视化练习

文章目录 试题示例 试题示例 绘制下图所示的表格 根据下表的数据&#xff0c;将班级名称一列作为x轴的刻度标签&#xff0c;将男生和女生两列的数据作为刻度标签对应的数值&#xff0c;使用bar()函数绘制下图所示的柱形图。 方式一 import numpy as np import matplotlib.p…

web自动化搞定文件上传

&#x1f525; 交流讨论&#xff1a;欢迎加入我们一起学习&#xff01; &#x1f525; 资源分享&#xff1a;耗时200小时精选的「软件测试」资料包 &#x1f525; 教程推荐&#xff1a;火遍全网的《软件测试》教程 &#x1f4e2;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1…

Spring 学习1

1、什么是Spring Spring 是一款主流的 Java EE 轻量级开源框架 &#xff0c;Spring 由“Spring 之父”Rod Johnson 提出并创立&#xff0c;其目的是用于简化 Java 企业级应用的开发难度和开发周期。Spring的用途不仅限于服务器端的开发。从简单性、可测试性和松耦合的角度而言…

AI算力专题:算力系列之四-各省算力规划建设梳理-绿色低碳高质量发展-部署算力建设AI产业研究

今天分享的是AI算力系列深度研究报告&#xff1a;《AI算力专题&#xff1a;算力系列之四-各省算力规划建设梳理-绿色低碳高质量发展-部署算力建设AI产业研究》。 &#xff08;报告出品方&#xff1a;中泰证券&#xff09; 报告共计&#xff1a;40页 数据中心能耗情况 随着越…

java的面向对象编程(oop)——认识接口

前言&#xff1a; 打好基础&#xff0c;daydayup! 接口 接口概述 java提供一个关键字interface&#xff0c;用这个关键字可以定义出特殊结构&#xff1a;接口 接口格式&#xff1a; public interface 接口名{//成员变量&#xff08;常量&#xff09;//成员方法&#xff08;抽…

Dragons

题目链接&#xff1a; Problem - 230A - Codeforces 解题思路&#xff1a; 用结构体排序就好&#xff0c;从最小的开始比较&#xff0c;大于就加上奖励&#xff0c;小于输出NO 下面是c代码&#xff1a; #include<iostream> #include<algorithm> using namespac…

【2024.1.30练习】李白打酒加强版(25分)

题目描述 题目思路 在最多数据的情况下&#xff0c;有100个店100朵花&#xff0c;总情况为的天文数字&#xff0c;暴力枚举已经不可能实现&#xff0c;考虑使用动态规划解决问题。最后遇到的一定是花&#xff0c;所以思路更倾向于倒推。 建立二维数组&#xff0c;容易联想到为…

软件个性化选型:制造企业如何选择适合自身的工单管理系统-亿发

企业制造业是实体经济中非常重要和基础的组成部分&#xff0c;直接关系到国家经济的血脉。然而&#xff0c;传统制造业在生产与管理上所采用的老一套方法和经验已不再适应当下的发展需求。信息化、数字化和智能化被视为制造企业的必然趋势。要想在竞争激烈的市场中永立潮头&…

都2024年了,谁还在逛良品铺子?

作者 | 辰纹 来源 | 洞见新研社 2019年年初&#xff0c;良品铺子举办了一场高端零食战略发布会&#xff0c;当时还花重金请来顶流明星为品牌代言&#xff0c;在强化“高端零食”定位的同时&#xff0c;良品铺子坚定的表示&#xff0c;要“抛弃价格战”。 时任良品铺子董事长杨…

24小时涨粉10w+的AI小游戏-哄哄模拟器

近年来&#xff0c;随着chatGPT的爆火&#xff0c;一系列的AI应用应运而生。比如&#xff1a;AI绘画&#xff0c;AI写作等。今天我们来看看最近很火的一个AI小游戏-哄哄模拟器。 1. 试玩体验 这款游戏名叫“哄哄模拟器”&#xff0c;体验地址为&#xff1a;https://hong.grea…

RGMII接口介绍

RGMII接口概述 RGMII全称为Reduced Gigabit Media Independent Interface&#xff0c;是一种网络接口标准&#xff0c;用于千兆以太网芯片与PHY芯片之间的接口标准。RGMII接口的设计目的是为了减少I/O的数量&#xff0c;尽可能减小网卡PCB占用面积&#xff0c;同时提高数据传输…

Nacos服务注册源码:客户端

入口 我们就拿nacos自己example下的NamingExample来做测试 public class NamingExample {public static void main(String[] args) throws NacosException, InterruptedException {Properties properties new Properties();properties.setProperty("serverAddr", …

如何在DBeaver中重命名数据库

前言 DBeaver是一款强大的开源通用数据库管理和开发工具&#xff0c;支持多种数据库类型。在某些数据库系统中&#xff0c;你可以直接通过DBeaver的图形界面来重命名数据库名称。本文将详细介绍如何在DBeaver中进行数据库重命名操作。 重要提示&#xff1a; 对于不同的数据库…

Leetcode—2396. 严格回文的数字【中等】

2024每日刷题&#xff08;一零六&#xff09; Leetcode—2396. 严格回文的数字 算法思想 实现代码 class Solution { public:bool isStrictlyPalindromic(int n) {return false;} };运行结果 之后我会持续更新&#xff0c;如果喜欢我的文章&#xff0c;请记得一键三连哦&…

vue2 国际化的使用,自动翻译文件,自动生成国际化文件

vue2 国际化的使用&#xff0c;自动翻译文件&#xff0c;自动生成国际化文件 npm i vue-i18n6 文件代码 // zh.js 用来写全局通用的国际化 export default {home:"首页" }//en.js 用来写全局通用的国际化 export default {home:"home page" }//kor.js …

窗口函数rows between 、range between的区分

【移动窗口】 移动窗口&#xff0c;顾名思义&#xff0c;“窗口”&#xff08;也就是操作数据的范围&#xff09;不是固定的&#xff0c;而是随着设定条件逐行移动的。 在over后面的子句中&#xff0c;使用rows加“范围关键字”可以设置移动窗口&#xff0c;语法如下&#xf…

ESP32 看门狗:保障系统稳定运行的重要机制

ESP32 看门狗&#xff1a;保障系统稳定运行的重要机制 导言&#xff1a; 在嵌入式系统开发中&#xff0c;系统稳定性是至关重要的。为了应对系统出现异常情况或者死锁等问题&#xff0c;ESP32提供了看门狗&#xff08;Watchdog&#xff09;机制。本文将深入探讨ESP32看门狗的工…

内网穿透的应用-如何搭建FastDFS文件服务器并实现无公网ip访问本地文件服务

文章目录 前言1. 本地搭建FastDFS文件系统1.1 环境安装1.2 安装libfastcommon1.3 安装FastDFS1.4 配置Tracker1.5 配置Storage1.6 测试上传下载1.7 与Nginx整合1.8 安装Nginx1.9 配置Nginx 2. 局域网测试访问FastDFS3. 安装cpolar内网穿透4. 配置公网访问地址5. 固定公网地址5.…

Docker的优化和私有容器的部署管理

1 Docker的优化和配置调整 1.1 如何缩小镜像的体积大小 1&#xff09;尽可能使用小体积的基础镜像&#xff08;一般推荐使用alpine阿尔卑斯镜像&#xff09; 2&#xff09;尽可能的减少dockfile指令的数量从而来减少镜像的层数 3&#xff09;在RUN指令末尾添加安装软件后清…

【Python笔记-设计模式】抽象工厂模式

一、说明 (一) 解决问题 抽象工厂是一种创建型设计模式&#xff0c;主要解决接口选择的问题。能够创建一系列相关的对象&#xff0c;而无需指定其具体类。 (二) 使用场景 系统中有多于一个的产品族&#xff0c;且这些产品族类的产品需实现同样的接口。 例如&#xff1a;有…