一文弄懂SpringCloud Stream

目录

    • SpringCloud Stream
    • SpringCloud Stream相关概念
    • SpringCloud Stream使用

SpringCloud Stream

Spring Cloud Stream 是一个构建消息驱动微服务的框架,Spring Cloud Stream 提供了一个抽象层,屏蔽了不同消息中间件之间的差异,使得开发人员可以不再关注具体的消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务。

程序模型
在这里插入图片描述
Spring Cloud Stream的核心与中间件实现无关。Stream应用通过输入输出通道(channel)来与外界交互。通道(channel)通过与外部中间件对应的绑定器(Binder)具体实现,来与外部的中间件产品进行通信。

Spring Cloud Stream提供了对 Kafka, RabbitMQ等中间件的绑定实现。

SpringCloud Stream相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  1. Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息。在应用程序中,可以使用@StreamListener注解将方法标记为Input通道的监听器,并在方法参数中指定接收到的消息类型。

  2. Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息。在应用程序中,可以使用@Output注解定义一个Output通道,然后在需要发送消息的方法上使用MessageChannel或OutputStream参数来将消息发送到Output通道。

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

消费者组:在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。

注:对于一个消息来说,每一个消费者组只会有一个消费者消费消息

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

SpringCloud Stream使用

1、添加依赖

根据我们使用的中间件来选择我们的依赖,因为我使用的是kafka,所以使用的是spring-cloud-stream-binder-kafka依赖,该依赖会帮我们引入Spring Cloud Stream 和kafka的相关依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>${binder.version}</version>
</dependency>

如果你使用的是RabbitMQ,那么请使用以下依赖

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
   <version>${binder.version}</version>
</dependency>

2、配置文件配置

下面是kafka的简单配置,关于配置的详细介绍,我会在下一篇文章进行介绍

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: <kafka_broker_address>  # Kafka broker地址
      bindings:
        myInput:
          destination: <input_topic>  # 输入通道对应的Kafka主题名称
        myOutput:
          destination: <output_topic>  # 输出通道对应的Kafka主题名称

3、创建绑定接口类,定义输入和输出通道

/**
 * 定义输出和输入通道
 */
public interface MyProcessor {

    String INPUT = "myInputChannel";
    String OUTPUT = "myOutputChannel";

    /**
     *监听一个通道,通道名为TNPUT的值
     */
    @Input(INPUT)
    SubscribableChannel myInputChannel();

    /**
     * 发送消息到输出通道,通道门为OUTPUT的值
     * @return
     */
    @Output(OUTPUT)
    MessageChannel myOutputChannel();

}

在上述示例中,MyProcessor是一个绑定接口,定义了一个名为myInputChannel的输入通道和一个名为myOutputChannel的输出通道。通过@Input和@Output注解来标识通道的名称。

输入和输出通道的定义我们可以定义在一个接口,也可以输入通道一个接口,输出通道一个接口

4、创建绑定接类

通过@EnableBinding注解将绑定接口绑定到应用程序的逻辑处理器

使用@EnableBinding时,需要指定一个或多个接口类作为参数,这些接口类包含表示可绑定组件(通常是消息通道)的方法。Spring Cloud Stream会自动扫描这些接口类,并根据配置创建相应的消息代理中间件和应用程序之间的连接。

然后我们可以使用@StreamListener注解,指定一个方法作为消息的监听器,当消息到达时会自动调用该方法进行处理。

/**
 * 指定接口来绑定消息通道,多个接口使用逗号分隔
 */
@EnableBinding(MyProcessor.class)
public class MyProcessorHandler {

    /**
     * 处理输入通道的消息
     * @param message
     */
    @StreamListener(INPUT)
    public void handleInputMessage(String message) {
        System.out.println(message);
    }

    /**
     * 处理输出通道的消息
     * @param message
     */
    @StreamListener(OUTPUT)
    public void handleOutputMessage(String message) {
        System.out.println(message);
    }
}

在上述示例中,MyProcessor是一个绑定接口,INPUT和OUTPUT分别是输入和输出通道的名称。handleInputMessage和handleOutputMessage方法用于处理从输入通道和输出通道接收到的消息。

当使用了@EnableBinding注解,Spring Cloud Stream会自动创建与绑定接口中定义的通道相关的Bean,并将其添加到应用程序的上下文中。这些Bean就可以通过自动装配(@Autowired)来进行访问和使用了,下面我们就使用通道来发送消息

5、发送消息

@Component
public class SendUtil {

    @Autowired
    private MyProcessor myProcessor;

    /**
     * 发送消息
     * @param msg
     */
    public void sendMsg(String msg){
        myProcessor.myOutputChannel().send(MessageBuilder.withPayload(msg).build());
    }


}

上述例子是通过myOutputChannel通道来发送消息,因为@EnableBinding注解帮我们生成了MyProcessor 的bean,所以我们可以直接注入使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/304932.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python 日志模块 logging 的最佳实践,内容干练简洁

文章目录 1. 引言2. 定义日志类3. 引用日志4. 参考 1. 引言 每次写 python 代码&#xff0c;想找一个日志模块 logging 的最佳实践&#xff0c;都要找一大圈&#xff0c;确不一定可以找到合适的最佳实践。 痛定思痛&#xff0c;我决定下笔记录目前觉得合适的 python 日志的用…

全志R128 SDK架构与目录结构

R128 S2 是全志提供的一款 M33(ARM)C906(RISCV-64)HIFI5(Xtensa) 三核异构 SoC&#xff0c;同时芯片内部 SIP 有 1M SRAM、8M LSPSRAM、8M HSPSRAM 以及 16M NORFLASH。本文档作为 R128 FreeRTOS SDK 开发指南&#xff0c;旨在帮助软件开发工程师、技术支持工程师快速上手&…

基于uniapp封装的card容器 带左右侧两侧标题内容区域

代码 <template><view class"card"><div class"x_flex_header"><div><title v-if"title ! " class"title" :title"title" :num"num"></title></div><div><s…

x-cmd pkg | magick - 开源图像处理工具

目录 简介首次用户功能特点类似工具与竞品进一步探索 简介 magick 是由 ImageMagick 提供的一个功能强大且多功能的开源图像处理工具&#xff0c;可以灵活高效地处理图像文件&#xff0c;例如格式转换、图像大小调整、图像裁减、图像拼接、图像色彩校正和图像合成等常见的图像…

云化XR技术于农业领域中的表现

随着科技的不断发展和应用的深入&#xff0c;农业领域也在逐渐引入新技术来优化生产效率和成本、改进管理和监控等。云化XR&#xff08;CloudXR&#xff09;作为一种融合了云计算、虚拟现实&#xff08;VR&#xff09;和增强现实&#xff08;AR&#xff09;等技术的解决方案&am…

【教程】代码混淆详解

【教程】代码混淆详解 本文将对代码混淆进行详细解释&#xff0c;并介绍ProGuard代码混淆器以及Ipa Guard工具的使用方法。首先&#xff0c;我们将了解代码混淆的概念和作用&#xff0c;然后深入讨论ProGuard混淆文件的参数设置以及代码混淆的方法。接着&#xff0c;我们将介绍…

【node link】Node命令中的node link命令的使用,还有CLI全局命令的使用,开发命令行工具必不可少的部分

&#x1f601; 作者简介&#xff1a;一名大四的学生&#xff0c;致力学习前端开发技术 ⭐️个人主页&#xff1a;夜宵饽饽的主页 ❔ 系列专栏&#xff1a;NodeJs &#x1f450;学习格言&#xff1a;成功不是终点&#xff0c;失败也并非末日&#xff0c;最重要的是继续前进的勇气…

【RabbitMQ】3 RabbitMQ使用及交换机

目录 代码示例交换机概述无名交换机绑定&#xff08;binding&#xff09;交换机的类型FanoutDirectTopic 官网地址&#xff1a;https://www.rabbitmq.com/getstarted.htm 代码示例 先来看下如何使用rabbitmq&#xff1a; 使用 Java 编写两个程序&#xff0c;发送单个消息的生…

10年工作经验老程序员推荐的7个开发类工具

做.NET软件工作已经10年了&#xff0c;从程序员做到高级程序员&#xff0c;再到技术主管&#xff0c;技术总监。见证了Visual Studio .NET 2003,Visul Studio 2005, Visual Studio Team System 2008, Visual Studio 2010 Ultimate,Visual Studio 2013一系列近5个版本的变化与亲…

了解vcruntime140.dll文件,有效解决vcruntime140.dll的方法丢失

vcruntime140.dll丢失是一个常见的问题&#xff0c;一旦出现关于vcruntime140.dll丢失的错误弹窗就会导致各种应用程序无法正常启动或运行。本篇文章小编将带大家了解vcruntime140.dll文件&#xff0c;从vcruntime140.dll文件的来源到属性&#xff0c;一一给大家介绍&#xff0…

选择智能酒精壁炉,拥抱环保与未来生活

保护环境一直是我们共同的责任和目标&#xff0c;而在这场争取保护环境的斗争中&#xff0c;选择使用智能酒精壁炉而非传统壁炉成为了一种积极的行动。这不仅仅是对环境负责&#xff0c;更是对我们自身生活质量的关照。 传统壁炉与智能酒精壁炉的对比 传统壁炉常常以木柴、煤炭…

SpringBoot从数据库读取数据数据源配置信息,动态切换数据源

准备多个数据库 首先准备多个数据库&#xff0c;主库smiling-datasource&#xff0c;其它库test1、test2、test3 接下来&#xff0c;我们在主库smiling-datasource中&#xff0c;创建表databasesource&#xff0c;用于存储多数据源相关信息。表结构设计如下 创建好表之后&#…

高德地图vue-amap实现区域掩膜卫星图且背景为灰色

vue-amap高德1.4.4&#xff0c;区域掩膜效果 区域掩膜区域内展示卫星图&#xff0c;区域外背景灰色–>实现原理&#xff0c;先用灰色样式&#xff0c;当区域掩膜实现之后再添加卫星图层 效果如下&#xff1a; 代码如下&#xff1a; <template><div><div c…

使用kubesphere的devops部署SpringCloud项目

devops部署SpringCloud项目 环境说明部署流程创建DevOps工程填写流水线信息创建流水线jenkinsfileDockerfiledeploy.yaml 环境说明 已经安装kubesphere的devops组件安装教程可参考官方文档:https://v3-1.docs.kubesphere.io/zh/docs/pluggable-components/devops/ 部署流程 创…

【服务器数据恢复】Raid5热备盘同步失败导致lvm结构损坏的数据恢复案例

服务器数据恢复环境&#xff1a; 两组由4块磁盘组建的raid5磁盘阵列&#xff0c;两组raid5阵列划分为lun并组成了lvm结构&#xff0c;ext3文件系统。 服务器故障&#xff1a; 一组raid5阵列中的一块硬盘离线&#xff0c;热备盘自动上线并开始同步数据。在热备盘完成同步之前&am…

Python(32):字符串转换成列表或元组,列表转换成字典小例子

1、python 两个列表转换成字典 字符串转换成列表 列表转换成字典 column "ID,aes,sm4,sm4_a,email,phone,ssn,military,passport,intelssn,intelpassport,intelmilitary,intelganghui,inteltaitonei,credit_card_short,credit_card_long,job,sm4_cbc,sm4_a_cbc" …

IDEA的lombok失效导致工程代码编译build失败的问题处理

今天也是奇了怪了&#xff0c;打包工程&#xff0c;编译始终失败&#xff0c;明明代码符号存在的 解决办法就是&#xff1a;-Djps.track.ap.dependenciesfalse

three.js实现信号波效果

three.js实现信号波效果 图例 步骤 创建平面&#xff0c;添加贴图&#xff0c;平移几何体缩放 代码 <template><div class"app"><div ref"canvesRef" class"canvas-wrap"></div></div> </template><…

pmp考试费用要多少?

参加PMP项目管理考试费用主要是分为三部分&#xff0c;报名费用、培训费用、续证费用&#xff08;这部分的费用是获取证书后的后续费用&#xff0c;可以不认为是参加PMP认证费用来看&#xff09;&#xff0c;下面会一一介绍。 一、报名费用&#xff1a; 在中国大陆参加PMP笔试…

Python print 高阶玩法

Python print 高阶玩法 当涉及到在Python中使用print函数时&#xff0c;有许多方式可以玩转文本样式、字体和颜色。在此将深入探讨这些主题&#xff0c;并介绍一些print函数的高级用法。 1. 基本的文本样式与颜色设置 使用ANSI转义码 ANSI转义码是一种用于在终端&#xff0…