Spring Cloud Stream如何屏蔽不同MQ带来的差异性?

引言

在当前的微服务架构下,使用消息队列(MQ)技术是实现服务解耦和削峰填谷的重要策略。为了保证系统的灵活性和可替换性,我们需要避免对单一开源技术的依赖

市面上有多种消息队列技术,如 Kafka、RocketMQ、RabbitMQ 等。关键在于如何在微服务体系中实现这些MQ组件的无缝切换,以减少代码修改需求。

Spring Cloud Stream 通过其与主流消息中间件的灵活集成,实现了通过仅修改配置文件的方式来切换不同的MQ实现,从而提高了系统的适应性和可维护性。

什么是 Spring Cloud Stream

Spring Cloud Stream 是一个用于构建消息驱动的微服务应用程序的框架。

基于 Spring Boot 构建,用于创建独立的生产级 Spring 应用程序,并使用 Spring Integration 提供与消息代理的连接。它提供了来自多个供应商的中间件的固定配置,引入了持久发布-订阅语义、消费者组和分区的概念。

简单来说 Spring Cloud Stream 是对 Spring Integration 和 Spring Boot 的合并。

图一

图一

主要概念:

1. application model(应用模型)

图二.Spring Cloud Stream 应用程序

图二.Spring Cloud Stream 应用程序

由中间件提供的 Binder 来处理绑定。 应用程序通过绑定这个 Binder 与其建立联系,发送消息时应用程序通过 outputs 通道将消息传递给 BinderBinder 再把消息给消息中间件。接收消息时消息中间件将消息传递给 BinderBinder 再把消息通过 inputs 通道传递给应用程序。

比如 Kafka Binder 依赖如下图:

图三 spring cloud stream kafka依赖

图三 spring cloud stream kafka依赖

2. The Binder Abstraction(Binder抽象)

Binder 抽象使 Spring Cloud Stream 应用程序能够灵活地连接中间件。

Spring Cloud Stream 为 Kafka 和 RabbitMQ 提供了 Binder 实现。 RocketMQ Binder 已由 Spring Cloud Alibaba 实现。

Binder 抽象也是该框架的扩展点之一,我们可以在 Spring Cloud Stream 之上实现自定义 Binder。

3. Programming Model(编程模型)

核心概念

  • Destination Binders(目标绑定器):负责提供与外部消息传递系统集成的组件。

  • Bindings(绑定):外部消息系统和应用程序之间的桥梁,提供消息的生产者和消费者(由目标绑定器创建)。

  • Message(消息):生产者和消费者用于与目标绑定器(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

图四

图四

环境搭建

本文环境:

  • Java:17

  • Spring Boot:3.0.2

  • Spring Cloud:2022.0.2

  • Spring Cloud Alibaba:2022.0.0.0

maven依赖配置

pom.xml依赖如下:

消息驱动jar,用哪个mq引入哪个即可。

<dependencies>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <version>${spring-boot.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-alibaba-dependencies</artifactId>
            <version>${spring-cloud-alibaba.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>${spring-cloud.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
  </dependencies>
</dependencyManagement>

配置文件

application.yml RocketMq 配置信息:

spring:
  cloud:
    stream:
      stream:
        rocketmq:
          binder:
            name-server: 127.0.0.1:9876;127.0.0.1:9877
        function:
          # 组装和绑定
          definition: myTopicC
        binders:
          default:
            type: rocketmq
        bindings:
          ## 生产者 新版本固定格式  函数名-{out/in}-{index}
          demoChannel-out-0:
            destination: boot-mq-topic
          ## 消费者 新版本固定格式  函数名字-{out/in}-{index}
          demoChannel-in-0:
            destination: boot-mq-topic

application.yml Kafka 配置信息:

spring:
  cloud:
    stream:
      stream:
        kafka:
          binder:
            brokers: 127.0.0.1:9092
        function:
          # 组装和绑定
          definition: myTopicC
        binders:
          default:
            type: kafka
        bindings:
          ## 生产者 新版本固定格式  函数名-{out/in}-{index}
          demoChannel-out-0:
            destination: boot-mq-topic
          ## 消费者 新版本固定格式  函数名字-{out/in}-{index}
          demoChannel-in-0:
            destination: boot-mq-topic

消息生产者

创建一个简单的消息生产者:

@RestController
@Slf4j
public class ProducerStream {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/test-stream")
    public String testStream() {
        streamBridge.send("demoChannel-out-0",
                MessageBuilder
                        .withPayload("消息体")
                        .build()
        );
        return "success";
    }
}

消息消费者

创建一个消息消费者来接收消息:

@Slf4j
@Configuration
public class TestStreamConsumer {

    @Bean
    public Consumer<String> demoChannel() {
        return message -> {
             log.info("demoChannel接到消息:{}", message);
        };
    }
}

假如需要从 Kafka 替换成 RocketMq ,只需要修改pom文件和配置文件即可。

在之前的 Spring Cloud Stream 版本中是采用注解的方式来实现绑定,在新版本中是通过函数式编程模型来绑定名称。采用约定大于配置的思想,简化了应用程序配置。

具体可见官方文档:https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_functional_binding_names

Spring Cloud Stream 发送消息流程

图五 spring cloud stream消息流程图

图五 spring cloud stream消息流程图

消息模型

通过图三可以看到 Sping Cloud Stream 的依赖关系。

Sping Cloud Stream -> Spring Integration -> Spring Messaging

可以看出来 Sping Cloud Stream 是基于 Spring Integration 做了一层封装,是依赖于 Spring Integration 这个组件的,而 Spring Integration 则依赖于 Spring Messaging 组件来实现消息处理机制的基础设施。

Spring Integration 是对 Spring Messaging 的扩展,设计目标是系统集成,因此内部提供了大量的集成化端点方便应用程序直接使用。

各个异构系统相互集成时,Spring Integration 通过通道之间的消息传递,让我们可以在消息的入口和出口使用通道适配器和消息网关这两种典型的端点对消息进行同构化处理。

Spring MessagingSpring 框架中的一个底层模块,用于提供统一的消息编程模型。

消息 Message 接口定义:

public interface Message<T> {
  //消息体
  T getPayload();

  //消息头
  MessageHeaders getHeaders();
}

消息通道 MessageChannel 接口定义:

@FunctionalInterface
public interface MessageChannel {
 long INDEFINITE_TIMEOUT = -1;
    
    //发送消息,无限期阻塞
 default boolean send(Message<?> message) {
  return send(message, INDEFINITE_TIMEOUT);
 }

    //发送消息,阻塞直到到达指定超时时间
 boolean send(Message<?> message, long timeout);
}

消息通道 MessageChannel 接收消息,调用send()方法将消息发送至该消息通道。

消息通道可简单理解为对队列的一种抽象。通道的名称对应队列的名称。

Spring message 把通道抽象成两种基本表现形式

  • 支持轮询的 PollableChannel

  • 实现发布-订阅模式的 SubscribableChannel

这两个通道都继承自具有消息发送功能的 MessageChannel

public interface SubscribableChannel extends MessageChannel {
 //通过注册回调函数MessageHandler来实现事件响应
    //注册消息处理器
 boolean subscribe(MessageHandler handler);

 //取消注册消息处理器
 boolean unsubscribe(MessageHandler handler);
}
public interface PollableChannel extends MessageChannel {
 //通过轮询操作主动获取消息
 //从通道中接收消息
 @Nullable
 Message<?> receive();

    //指定超时时间,从通道中接收消息
 @Nullable
 Message<?> receive(long timeout);
}

MessageHandler接口定义:

@FunctionalInterface
public interface MessageHandler {
 //处理消息方法
 void handleMessage(Message<?> message) throws MessagingException;
}

再回到图五流程图中,我们最终可以看到 KafkaRocketMQ 通过继承 AbstractMessageHandler 抽象类( AbstractMessageHandler 抽象类是实现了 MessageHandler 接口)来实现不同中间件的消息发送操作。而这些都是封装在各自中间件对应的 Binder 代码中来实现。

结论

回到我们的主题,Spring Cloud Stream 如何屏蔽不同 MQ 带来的差异性?

  • 统一的编程模型:发送和接收代码一致,开发者专注于业务逻辑即可。不用管底层消息中间件的实现。

  • Binder 抽象:封装与消息队列的交互逻辑,每种队列有自己的 Binder 实现。

  • 自动配置和约定优于配置:采用约定大于配置的思想,极少的改动配置文件实现消息队列的切换,而代码不用变动。

  • 高级特性的抽象:如分区、消息分组、持久性订阅等高级特性,Spring Cloud Stream 提供了抽象层,由不同的消息队列去实现。

参考资料

  • 官方文档:Spring Cloud Stream Reference Guide

  • 《Spring核心技术和案例实战》

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/202751.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

从零构建属于自己的GPT系列1:文本数据预处理、文本数据tokenizer、逐行代码解读

&#x1f6a9;&#x1f6a9;&#x1f6a9;Hugging Face 实战系列 总目录 有任何问题欢迎在下面留言 本篇文章的代码运行界面均在PyCharm中进行 本篇文章配套的代码资源已经上传 从零构建属于自己的GPT系列1&#xff1a;文本数据预处理 从零构建属于自己的GPT系列2&#xff1a;语…

【精选】Spring整合MyBatis,Junit 及Spring 事务Spring AOP面向切面详解

Spring整合MyBatis 搭建环境 我们知道使用MyBatis时需要写大量创建SqlSessionFactoryBuilder、SqlSessionFactory、SqlSession等对象的代码&#xff0c;而Spring的作用是帮助我们创建和管理对象&#xff0c;所以我们可以使用Spring整合MyBatis&#xff0c;简化MyBatis开发。 …

【Web端CAD/CAE文字标注】webgl+canvas 2d实现文字标注功能

一、需求背景 在CAD/CAE领域经常会遇到显示节点编号这种需求&#xff0c;效果如下图&#xff1a; 本文介绍如何在WebGL中实现文字的显示&#xff0c;对于如何在OpenGL中实现请绕路。 二、实现原理 Canvas是HTML5提供的元素&#xff0c;用于在网页上绘制图形&#xff0c;其支…

elasticsearch DSL语句

目录 一、DSL查询文档1.1 DSL查询分类1.2 全文检索查询1.3 精确查询1.4 地理坐标查询1.5 复合查询1.5.1 相关性算分1.5.2 算分函数查询1.5.3 布尔查询 二、搜索结果处理2.1 排序2.2 分页2.3 高亮2.4 总结 三、RestClient查询文档3.1 查询所有3.2 match查询3.3 精确查询3.4 布尔…

PyMuPDF---Python处理PDF的宝藏库详解

1、PyMuPDF简介 1.1 介绍 在介绍PyMuPDF之前&#xff0c;先来了解一下MuPDF&#xff0c;从命名形式中就可以看出&#xff0c;PyMuPDF是MuPDF的Python接口形式。 MuPDF MuPDF 是一个轻量级的 PDF、XPS和电子书查看器。MuPDF 由软件库、命令行工具和各种平台的查看器组成。 …

C语言进阶之笔试题详解(2)

前言 这里的内容包括二维数组笔试题和指针笔试题&#xff0c;供给读者对这部分知识进行加深和巩固。 ✨ 猪巴戒&#xff1a;个人主页✨ 所属专栏&#xff1a;《C语言进阶》 &#x1f388;跟着猪巴戒&#xff0c;一起学习C语言&#x1f388; 目录 前言 笔试题 二维数组 题目…

借助文档控件Aspose.Words,在 C# 中比较两个 PDF 文件

在当今的数字世界中&#xff0c;管理和比较文档是一项至关重要的任务&#xff0c;尤其是在商业和法律领域。在 C# 中处理 PDF 文档时&#xff0c;Aspose.Words for .NET 提供了用于比较 PDF 文档的强大解决方案。在这篇博文中&#xff0c;我们将探讨如何在 C# 应用程序中比较 P…

MySQL进阶-读写分离

✨作者&#xff1a;猫十二懿 ❤️‍&#x1f525;账号&#xff1a;CSDN 、掘金 、语雀 、Github &#x1f389;公众号&#xff1a;猫十二懿 一、MySQL 读写分离介绍 读写分离,简单地说是把对数据库的读和写操作分开&#xff0c;以对应不同的数据库服务器。主数据库提供写操作&…

从零开始的c语言日记day38——数组参数,指针参数

一维数组传参 要把数组或者指针传给函数&#xff0c;那函数参数如何设计&#xff1f; 上面各写法有问题嘛&#xff1f; 第一个没问题 第二个没问题 第三个没问题 第四个没问题 第五个解析&#xff1a;定义int*arr2[20]为20个int*类型的数组&#xff0c;test2之后用的是ar…

Kubernetes(K8s)资源管理-03

资源管理 资源管理介绍 在kubernetes中&#xff0c;所有的内容都抽象为资源&#xff0c;用户需要通过操作资源来管理kubernetes。 kubernetes的本质上就是一个集群系统&#xff0c;用户可以在集群中部署各种服务&#xff0c;所谓的部署服务&#xff0c;其实就是在kubernetes集…

NoSQL大数据存储技术思考题及参考答案

思考题及参考答案 第1章 绪论 1. NoSQL和关系型数据库在设计目标上有何主要区别&#xff1f; (1)关系数据库 优势&#xff1a;以完善的关系代数理论作为基础&#xff0c;具有数据模型、完整性约束和事务的强一致性等特点&#xff0c;借助索引机制可以实现高效的查询&#xf…

Clickhouse Join

ClickHouse中的Hash Join, Parallel Hash Join, Grace Hash Join https://www.cnblogs.com/abclife/p/17579883.html 总结 本文描述并比较了ClickHouse中基于内存哈希表的3种连接算法。 哈希连接算法速度快&#xff0c;是最通用的算法&#xff0c;支持所有连接类型和严格性设…

TCP/IP封装

数据如何通过网络发送&#xff1f;为什么 OSI 模型需要这么多层&#xff1f; 下图显示了数据在网络传输时如何封装和解封装。 步骤1&#xff1a;当设备A通过HTTP协议通过网络向设备B发送数据时&#xff0c;首先在应用层添加HTTP头。 步骤2&#xff1a;然后将TCP或UDP标头添加…

Hadoop入门学习笔记

视频课程地址&#xff1a;https://www.bilibili.com/video/BV1WY4y197g7 课程资料链接&#xff1a;https://pan.baidu.com/s/15KpnWeKpvExpKmOC8xjmtQ?pwd5ay8 这里写目录标题 一、VMware准备Linux虚拟机1.1. VMware安装Linux虚拟机1.1.1. 修改虚拟机子网IP和网关1.1.2. 安装…

Modbus TCP工业RFID读写器的选型要点

Modbus TCP工业RFID读写器是一种采用Modbus TCP通信协议的RFID读写器。它可以通过TCP/IP网络与计算机或其它设备进行通信&#xff0c;实现远程读取和写入RFID标签数据的目的。 与传统的RFID读写器相比&#xff0c;Modbus TCP工业RFID读写器具有更远的读写距离、更高的读写灵敏度…

uniapp如何与原生应用进行混合开发?

目录 前言 1.集成Uniapp 2.与原生应用进行通信 3.实现原生功能 4.使用原生UI组件 结论: 前言 随着移动应用市场的不断发展&#xff0c;使用原生开发的应用已经不能满足用户的需求&#xff0c;而混合开发成为了越来越流行的选择。其中&#xff0c;Uniapp作为一种跨平台的开…

系统设计概念:生产 Web 应用的架构

在你使用的每个完美应用程序背后&#xff0c;都有一整套的架构、测试、监控和安全措施。今天&#xff0c;让我们来看看一个生产就绪应用程序的非常高层次的架构。 CI/CD 管道 我们的第一个关键领域是持续集成和持续部署——CI/CD 管道。 这确保我们的代码从存储库经过一系列测试…

开发知识点-Maven包管理工具

Maven包管理工具 SpringBootSpringSecuritydubbo图书电商后台实战-环境设置&#xff08;JDK8, STS, Maven, Spring IO, Springboot&#xff09;点餐小程序Java版本的选择和maven仓库的配置视频管理系统&&使用maven-tomcat7插件运行web工程SpringTool suite——maven项目…

promis.all的异步使用

基础 参考 https://blog.csdn.net/qq_52855464/article/details/125376557 简单来说 Promise.all是处理接口返回方法异步的&#xff0c;能够使得接口的获取顺序得到控制&#xff0c;不会出现数据为空的情况 使用 先执行jianshigetGroups->groupIds-> const promises2 …

RNN-T Training,RNN-T模型训练详解——语音信号处理学习(三)(选修三)

参考文献&#xff1a; Speech Recognition (option) - RNN-T Training哔哩哔哩bilibili 2020 年 3月 新番 李宏毅 人类语言处理 独家笔记 Alignment Train - 8 - 知乎 (zhihu.com) 本次省略所有引用论文 目录 一、如何将 Alignment 概率加和 对齐方式概率如何计算 概率加和计…