SpringCloudStream原理和深入使用

简单概述

Spring Cloud Stream是一个用于构建与共享消息传递系统连接的高度可扩展的事件驱动型微服务的框架。

应用程序通过inputs或outputs来与Spring Cloud Stream中binder对象交互,binder对象负责与消息中间件交互。也就是说:Spring Cloud Stream能够屏蔽底层消息中间件【RabbitMQ,kafka等】的差异,降低切换成本,统一消息的编程模型

相关概念

Channel(通道):Channel是消息的传输管道,用于在生产者和消费者之间传递消息。生产者通过输出通道将消息发送到Destination,消费者通过输入通道从Destination接收消息。

在Spring Cloud Stream中,有两种类型的通道:输入(input)和输出(output)。这两种通道分别用于消费者接收消息和生产者发送消息。

  • Input(输入):Input通道用于消费者从消息代理接收消息。消费者可以通过监听Input通道来实时接收传入的消息

  • Output(输出):Output通道用于生产者向消息代理发送消息。生产者可以通过向Output通道发送消息来发布新的消息

Destination(目标):Destination是消息的目的地,通常对应于消息代理中的Topic或Queue。生产者将消息发送到特定的Destination,消费者从其中接收消息。

Binder(绑定器):Binder是Spring Cloud Stream的核心组件之一。它作为消息代理与外部消息中间件进行交互,并负责将消息发送到消息总线或从消息总线接收消息。Binder负责处理消息传递、序列化、反序列化、消息路由等底层细节,使得开发者能够以统一的方式与不同的消息中间件进行交互。Spring Cloud Stream提供了多个可用的Binder实现,包括RabbitMQ、Kafka等。

**消费者组:**在Spring Cloud Stream中,消费组(Consumer Group)是一组具有相同功能的消费者实例。当多个消费者实例属于同一个消费组时,消息代理会将消息均匀地分发给消费者实例,以实现负载均衡。如果其中一个消费者实例失效,消息代理会自动将消息重新分配给其他可用的消费者实例,以实现高可用性。(对于一个消息来说,每个消费者组只会有一个消费者消费消息)

分区:Spring Cloud Stream支持在多个消费者实例之间创建分区,这样我们通过某些特征量做消息分发,保证相同标识的消息总是能被同一个消费者处理

Spring Message

Spring Message是Spring Framework的一个模块,其作用就是统一消息的编程模型。

package org.springframework.messaging;

public interface Message<T> {
    T getPayload();

    MessageHeaders getHeaders();
}

消息通道 MessageChannel 用于接收消息,调用send方法可以将消息发送至该消息通道中:

@FunctionalInterface
public interface MessageChannel {

	long INDEFINITE_TIMEOUT = -1;

	default boolean send(Message<?> message) {
		return send(message, INDEFINITE_TIMEOUT);
	}

	boolean send(Message<?> message, long timeout);

}

消息通道里的消息由消息通道的子接口可订阅的消息通道SubscribableChannel实现,被MessageHandler消息处理器所订阅

public interface SubscribableChannel extends MessageChannel {

	boolean subscribe(MessageHandler handler);

	boolean unsubscribe(MessageHandler handler);

}

MessageHandler真正地消费/处理消息

@FunctionalInterface
public interface MessageHandler {
    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Integration

Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。

它提出了不少新的概念,包括消息路由MessageRoute、消息分发MessageDispatcher、消息过滤Filter、消息转换Transformer、消息聚合Aggregator、消息分割Splitter等等。同时还提供了MessageChannel和MessageHandler的实现,分别包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。

Spring-Cloud-Stream的架构

img

快速入门

引入依赖

        <!--stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

增加配置文件

spring:
    cloud:
        stream:
            # 定义消息中间件
            binders:
              MyRabbit:
                  type: rabbit
                  environment:
                    spring:
                        rabbitmq:
                            host: localhost
                            port: 5672
                            username: root
                            password: root
                            vhost: /
            bindings:
            # 生产者中定义,定义发布对象
              myInput:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
            # 消费者中定义,定义订阅的对象
              myOutput-in-0:
                destination: myStreamExchange
                group: myStreamGroup
                binder: MyRabbit
        # 消费者中定义,定义输出的函数
        function:
            definition: myOutput

生产者

	@Resource
	private StreamBridge streamBridge;

	public void sendNormal() {
		streamBridge.send("myInput", "hello world");
	}

消费者

	@Bean("myOutput")
	public Consumer<Message<String>> myOutput() {
		return (message) -> {
			MessageHeaders headers = message.getHeaders();
			System.out.println("myOutput head is : " + headers);
			String payload = message.getPayload();
			System.out.println("myOutput payload is : " + payload);
		};
	}

如何自定义Binder

  1. 添加spring-cloud-stream依赖
  2. 提供ProvisioningProvider的实现
  3. 提供MessageProducer的实现
  4. 提供MessageHandler的实现
  5. 提供Binder的实现
  6. 创建Binder的配置
  7. META-INF/spring.binders中定义绑定器

添加spring-cloud-stream依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
    <version>${spring.cloud.stream.version}</version>
</dependency>

提供ProvisioningProvider的实现

ProvisioningProvider负责提供消费者和生产者目的地,并需要将 application.yml 或 application.properties 文件中包含的逻辑目的地转换为物理目的地引用。

public class FileProvisioningProvider implements ProvisioningProvider<
	ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>> {

	public FileProvisioningProvider() {
		super();
	}

	@Override
	public ProducerDestination provisionProducerDestination(String name, ExtendedProducerProperties<FileProducerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}


	@Override
	public ConsumerDestination provisionConsumerDestination(String name, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws ProvisioningException {
		return new FileMessageDestination(name);
	}


	private static class FileMessageDestination implements ProducerDestination, ConsumerDestination {

		private final String destination;

		private FileMessageDestination(final String destination) {
			this.destination = destination;
		}

		@Override
		public String getName() {
			return destination.trim();
		}

		@Override
		public String getNameForPartition(int partition) {
			throw new UnsupportedOperationException("Partitioning is not implemented for file messaging.");
		}

	}

}

提供MessageProducer的实现

MessageProducer负责使用事件并将其作为消息处理,发送给配置为使用此类事件的客户端应用程序。

		super.onInit();
		executorService = Executors.newScheduledThreadPool(1);
	}

	@Override
	public void doStart() {
		executorService.scheduleWithFixedDelay(() -> {
			String payload = getPayload();
			if (payload != null) {
				Message<String> receivedMessage = MessageBuilder.withPayload(payload).build();
				sendMessage(receivedMessage);
			}

		}, 0, 50, TimeUnit.MILLISECONDS);
	}

	@Override
	protected void doStop() {
		executorService.shutdownNow();
	}

	private String getPayload() {
		try {
			List<String> allLines = Files.readAllLines(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
			String currentPayload = allLines.get(allLines.size() - 1);
			if (!currentPayload.equals(previousPayload)) {
				previousPayload = currentPayload;
				return currentPayload;
			}
		} catch (IOException e) {
			FileUtil.touch(new File(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"));
		}

		return null;
	}
}

提供MessageHandler的实现

MessageHandler提供产生事件所需的逻辑。

public class FileMessageHandler extends AbstractMessageHandler {
	FileExtendedBindingProperties fileExtendedBindingProperties;
	ProducerDestination destination;

	public FileMessageHandler(ProducerDestination destination, FileExtendedBindingProperties fileExtendedBindingProperties) {
		this.destination = destination;
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}

	@Override
	protected void handleMessageInternal(Message<?> message) {
		try {
			if (message.getPayload() instanceof byte[]) {
				Files.write(Paths.get(fileExtendedBindingProperties.getPath() + File.separator + destination.getName() + ".txt"), (byte[]) message.getPayload());
			} else {
				throw new RuntimeException("处理消息失败");
			}
		} catch (IOException e) {
			throw new RuntimeException(e);
		}
	}
}

提供Binder的实现

提供自己的Binder抽象实现:

  • 扩展AbstractMessageChannelBinder
  • 将自定义的 ProvisioningProvider 指定为 AbstractMessageChannelBinder 的通用参数
  • 重写createProducerMessageHandlercreateConsumerEndpoint方法
public class FileMessageChannelBinder extends AbstractMessageChannelBinder
	<ExtendedConsumerProperties<FileConsumerProperties>, ExtendedProducerProperties<FileProducerProperties>, FileProvisioningProvider>
	implements ExtendedPropertiesBinder<MessageChannel, FileConsumerProperties, FileProducerProperties> {

	FileExtendedBindingProperties fileExtendedBindingProperties;

	public FileMessageChannelBinder(String[] headersToEmbed, FileProvisioningProvider provisioningProvider, FileExtendedBindingProperties fileExtendedBindingProperties) {
		super(headersToEmbed, provisioningProvider);
		this.fileExtendedBindingProperties = fileExtendedBindingProperties;
	}


	@Override
	protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<FileProducerProperties> producerProperties, MessageChannel errorChannel) throws Exception {
		FileMessageHandler fileMessageHandler = new FileMessageHandler(destination, fileExtendedBindingProperties);
		return fileMessageHandler;
	}


	@Override
	protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<FileConsumerProperties> properties) throws Exception {
		FileMessageProducerAdapter fileMessageProducerAdapter = new FileMessageProducerAdapter(destination, fileExtendedBindingProperties);

		return fileMessageProducerAdapter;
	}


	@Override
	public FileConsumerProperties getExtendedConsumerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedConsumerProperties(channelName);
	}

	@Override
	public FileProducerProperties getExtendedProducerProperties(String channelName) {
		return fileExtendedBindingProperties.getExtendedProducerProperties(channelName);
	}


	@Override
	public String getDefaultsPrefix() {
		return fileExtendedBindingProperties.getDefaultsPrefix();
	}

	@Override
	public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
		return fileExtendedBindingProperties.getExtendedPropertiesEntryClass();
	}
}

创建Binder的配置

严格要求创建一个 Spring 配置来初始化你的绑定器实现的 bean

@EnableConfigurationProperties(FileExtendedBindingProperties.class)
@Configuration
public class FileMessageBinderConfiguration {
	@Bean
	@ConditionalOnMissingBean
	public FileProvisioningProvider fileMessageBinderProvisioner() {
		return new FileProvisioningProvider();
	}

	@Bean
	@ConditionalOnMissingBean
	public FileMessageChannelBinder fileMessageBinder(FileProvisioningProvider fileMessageBinderProvisioner, FileExtendedBindingProperties fileExtendedBindingProperties) {
		return new FileMessageChannelBinder(null, fileMessageBinderProvisioner, fileExtendedBindingProperties);
	}


	@Bean
	public FileProducerProperties fileConsumerProperties() {
		return new FileProducerProperties();
	}
}

详细的代码见https://gitee.com/xiaovcloud/spring-cloud-stream

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/719027.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Sunny v1.3.0 官方版 (简洁且漂亮截图应用)

前言 Sunny是一款漂亮又实用的“截图&钉图”的软件&#xff0c;亦支持“屏幕识图”和“OCR”的软件。 一、下载地址 下载链接&#xff1a;http://dygod/source 点击搜索&#xff1a;Sunny 二、安装步骤 1、解压后将Sunny.exe发送到桌面快捷方式 2、启动桌面图标 3、正…

下载lombok.jar包,简化类的代码

Download (projectlombok.org) 去这个网站下载lombok.jar包 打开这个包文件的位置,拖到项目lib文件夹: 在这里右键添加为库(Add as library)。 添加这三个注解即可&#xff0c;类里面不需要其他东西了

手写操作系统

对喜欢操作系统的伙伴强推一门课程 从0开始实现了支持文件系统、任务切换和网络协议栈的操作系统。 具体见 &#xff1a;http://www.ziyuanwang.online/977.html

012.指纹浏览器编译-修改canvas指纹(高级)

指纹浏览器编译-修改canvas指纹(高级) 一、canvas指纹是什么 之前介绍过canvas指纹和常见网站绕过canvas指纹&#xff0c;插眼&#xff1a; https://blog.csdn.net/w1101662433/article/details/137959179 二、为啥有更高级的canvas指纹 众所周知&#xff0c;creepjs和brow…

Java Lambda表达式:简洁代码的艺术与实战技巧

引言 Java Lambda表达式是Java SE8中引入的一项重要的语言特性&#xff0c;它允许我们以简洁的方式去编写代码&#xff0c;同时也能大大提高代码的可读性和编写的灵活性。结合Java8及以后版本中引入的Stream API&#xff0c;Lambda表达式使得集合操作变得更为直观和强大。本文将…

Codeforces Round 953 (Div. 2 ABCDEF题) 视频讲解

A. Alice and Books Problem Statement Alice has n n n books. The 1 1 1-st book contains a 1 a_1 a1​ pages, the 2 2 2-nd book contains a 2 a_2 a2​ pages, … \ldots …, the n n n-th book contains a n a_n an​ pages. Alice does the following: She …

Centos7如何扩容未做lvm的GPT硬盘

背景&#xff1a;一台根分区为2.5T(已转换GPT格式)的虚拟机使用率达到97%&#xff0c;需要扩容&#xff0c;但是又没做lvm 通过平台新增容量1.5T&#xff0c;如下可看到 安装growpart准备扩容&#xff1a; yum install cloud-utils-growpart -y 执行命令growpart报错&#xff…

11 数制介绍及转换

数制介绍 一、数制介绍 &#xff08;一&#xff09;计算机的数制 ​ 二进制这个词的意思是基于两个数字 ​ 二进制数或二进制位表示为0 和1 ​ 示例&#xff1a;10001011 ​ 十进制数制系统包括10 个数字&#xff1a;十进制数0、1、2、3、4、5、6、7、8、9 ​ 示例&…

性能测试项目实战

项目介绍和部署 项目背景 轻商城项目是一个现在流行的电商项目。我们需要综合评估该项目中各个关键接口的性能&#xff0c;并给出优化建议&#xff0c;以满足项目上线后的性能需要。 项目功能架构 前台商城&#xff1a;购物车、订单、支付、优惠券等 后台管理系统&#xf…

【挑战100天首通《谷粒商城》】-【第一天】06、环境-使用vagrant快速创建linux虚拟机

文章目录 课程介绍1、安装 linux 虚拟机2、安装 VirtualBoxStage 1&#xff1a;开启CPU虚拟化Stage 2&#xff1a;下载 VirtualBoxStage 2&#xff1a;安装 VirtualBoxStage 4&#xff1a;安装 VagrantStage 4-1&#xff1a;Vagrant 下载Stage 4-2&#xff1a;Vagrant 安装Stag…

如何确保pcdn的稳定性?(壹)

确保PCDN的稳定性是一个重要任务&#xff0c;涉及多个方面的操作和考虑。以下是一些建议&#xff0c;帮助你确保PCDN的稳定性&#xff1a; 一&#xff0e;选择合适的服务器与硬件&#xff1a; 选择稳定可靠的服务器供应商和硬件设备&#xff0c;确保服务器具有高可用性和容错…

图说设计模式:单例模式

更多C学习笔记&#xff0c;关注 wx公众号&#xff1a;cpp读书笔记 5. 单例模式 单例模式 模式动机模式定义模式结构时序图代码分析模式分析实例优点缺点适用环境模式应用模式扩展总结 5.1. 模式动机 对于系统中的某些类来说&#xff0c;只有一个实例很重要&#xff0c;例如…

我在高职教STM32——GPIO入门之蜂鸣器

大家好&#xff0c;我是老耿&#xff0c;高职青椒一枚&#xff0c;一直从事单片机、嵌入式、物联网等课程的教学。对于高职的学生层次&#xff0c;同行应该都懂的&#xff0c;老师在课堂上教学几乎是没什么成就感的。正因如此&#xff0c;才有了借助 CSDN 平台寻求认同感和成就…

聚四氟乙烯离心管 四氟反应管 消解管 PTFE螺口带盖管 特氟龙试管

一、产品介绍 样品悬浮液盛放在管状试样容器中&#xff0c;在离心机的高速旋转下&#xff0c;由于巨大的离心力作用&#xff0c;使悬浮的微小颗粒 以一定的速度沉降&#xff0c;从而与溶液得以分离。这种带密封盖或压盖的管状试样容器&#xff0c;就是离心管。 PTFE离心管&…

《STM32 HAL库》RCC 相关系列函数详尽解析—— HAL_RCC_OscConfig()

观前提示&#xff1a;函数完整代码在文末&#xff0c;本文梳理了函数HAL_RCC_OscConfig()的主要逻辑和实现方法f105时钟树详解图 HAL_RCC_OscConfig() 函数介绍&#xff1a; 此函数是一个用于初始化RCC&#xff08;Reset and Clock Control&#xff09;振荡器&#xff08;Osc…

【ArcGIS微课1000例】0120:ArcGIS批量修改符号的样式(轮廓)

ArcGIS可以批量修改符号的样式,如样式、填充颜色、轮廓等等。 文章目录 一、加载实验数据二、土地利用符号化三、批量修改符号样式四、注意事项一、加载实验数据 订阅专栏后,从私信查收专栏配套的完整实验数据包,打开0120.rar中的土地利用数据,如下图所示: 查看属性表: …

全光万兆时代来临:信而泰如何推动F5G-A(50PONFTTR)技术发展

技术背景 F5G-A&#xff08;Fifth Generation Fixed Network-Advanced&#xff0c;第五代固定网络接入&#xff09;是固定网络技术的一次重大升级&#xff0c;代表了光纤网络技术的最新发展。F5G-A旨在提供更高的带宽、更低的延迟、更可靠的连接以及更广泛的应用场景。 F5G-A六…

ORION Space Scene Generation Framework

ORION太空场景生成框架是一个涵盖所有太空场景生成方面的系统,从程序化的行星和宇宙飞船到任何相关的特效,支持所有管道。 重要提示!!!:ORION资产可以从Sky Master ULTIMATE升级,从而可以与Sky Master ULTIMATE的全容积行星云和大气效果相结合,最适合在云层中飞行。 这…

HTML静态网页成品作业(HTML+CSS)——美食火锅介绍网页(1个页面)

&#x1f389;不定期分享源码&#xff0c;关注不丢失哦 文章目录 一、作品介绍二、作品演示三、代码目录四、网站代码HTML部分代码 五、源码获取 一、作品介绍 &#x1f3f7;️本套采用HTMLCSS&#xff0c;未使用Javacsript代码&#xff0c;共有1个页面。 二、作品演示 三、代…

springboot知识点大全

文章目录 LombokLombok介绍Lombok常用注解Lombok应用实例代码实现idea安装lombok插件 Spring InitializrSpring Initializr介绍Spring Initializr使用演示需求说明方式1: IDEA创建方式2: start.spring.io创建 注意事项和说明 yaml语法yaml介绍使用文档yaml基本语法数据类型字面…