spring Cloud Stream 实战应用深度讲解

springCloudStream

简介

Spring Cloud Stream是一个框架,用于构建与共享消息传递系统连接的高度可扩展的事件驱动微服务。

该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的 Spring 习惯用语和最佳实践之上,包括对持久发布/订阅语义、消费者组和有状态分区的支持。

核心模块

  • Destination Binders: 负责提供与外部消息系统集成的组件
  • Destination Bindings: 外部消息系统和用户程序代码之间的桥梁(生产者-使用者之间的桥梁)
  • Message:生产者和消费者用于与Destination Binders(以及通过外部消息系统与其他应用程序)通信的规范数据结构。

历史

Spring 的数据集成之旅始于 Spring Integration。通过其编程模型,它提供了一致的开发人员体验来构建应用程序,这些应用程序可以采用企业集成模式来连接外部系统,例如数据库、消息代理等。

快进到云时代,微服务在企业环境中变得突出。Spring Boot 改变了开发人员构建应用程序的方式。借助 Spring 的编程模型和 Spring Boot 处理的运行时职责,可以无缝开发独立的、基于 Spring 的生产级微服务。

为了将其扩展到数据集成工作负载,Spring Integration 和 Spring Boot 被放在一个新项目中。Spring Cloud Stream 诞生了。

架构模型

在这里插入图片描述

这张图是spring-stream官网的,里面的Middleware指的就是RabbitMQ或者KafKa这些消息队列。

下图是我们原来和消息队列通信的方式。我们的程序直接发送数据给MQ或者监听到MQ的数据。

在这里插入图片描述

通过spring stream来做的话,就增加了Binder层来做统一调度,我们的程序只需要和Binder层通信,不需要关注底层的MQ是RabbitMQ还是Kafka

目前官方提供了两个Binder,分别是RabbitMQ的和Kafka的,其余队列的有一些第三方维护的。同时我们也可以自己实现Binder

一开始图中的InputOutput是对于spring stream来说的,input就是输入消息到stream中,output就是输出消息到我们的程序中。

简单介绍一下Binder,其实就是策略模式,统一接口实现,比如MQ1里面发送消息到MQ的方法叫Publish,MQ2里面发送消息到MQ的方法叫Release,但是在Binder接口里面提供了一个方法,就叫做add。也只需要提供一个Message消息。

public interface Binder{
    function add(Message msg);
}

// 连接MQ1的Binder
public class Binder1 implements Binder{
    public function add(Message msg){
        // 消息处理
        // 发送到MQ1
        publish(msg);
    }
}

// 连接MQ2的Binder
public class Binder2 implements Binder{
    public function add(Message msg){
        // 消息处理
        // 发送到MQ2
        release(msg);
    }
}

当我们使用的时候只需要自己决定使用哪个Binder就可以了。就是就和连接数据库一样,不需要关心连接的是Mysql还是PostgreSql。

public class main{
    public static function main() {
        Binder binder = new Binder1();
        Message msg = new Message();
        binder.add(msg);
    }
}
Bindings

Bindings作为一个桥梁,负责连接MQ和用户代码。比如绑定一个代码作为input往某一个Queue里面输入信息,绑定一个代码作为output从某个Queue里面接收信息。然后我们使用Binder来实现推送消息到MQ和消费消息。

这里是官网原文:The application communicates with the outside world by establishing bindings between destinations exposed by the external brokers and input/output arguments in your code. Broker specific details necessary to establish bindings are handled by middleware-specific Binder implementations.

下图为Bindings和Binder的关系

在这里插入图片描述

source 和 sink

source其实就是发送方的发送的Message. sink就是接收方接受的Message

注解实现

注解的实现已经被彻底删除,只有之前低版本的还能使用

函数式编程实现示例

依赖引入

将下面的代码加入pom文件,然后使用maven导入相关依赖即可。

// 引入spring cloud stream依赖
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
// 引入spring cloud stream的rabbit binder依赖
// 如果是kafka,那么把这个换成kafka的binder
// 在这个binder里面已经引入了 rabbit MQ依赖,所以不需要再单独引入rabbit MQ了
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream: # stream的配置
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest

生产者

配置文件修改

对于函数式编程来说,spring cloud stream有一些约定或者说规定。比如我们注册了一个logPubBean,那么它对应的bindings配置的名称就是logPub-in-0或者logPub-out-0,前面是我们的方法名,中间表示生产者或消费者,in表示消费者,out表示生产者。这里的in or out是对于我们的代码来说的。后面的0就是一个序号。

写生产者之前我们需要加上对应的bindings配置。如果注册了多个Bean作为生产者或消费者,那么还需要配置哪些Bean是生产者和消费者。


spring:
  cloud:
    function: # 配置哪些Bean是Stream可以用的
        definition: log;logPub;sendLog
    stream: # stream的配置
        bindings: # 服务的整合处理
            logPub-out-0:
                destination: log # 表示要使用的Exchange名称定义,不存在会自动创建
                content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
写代码

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logProducer {

}

然后开始编写生产者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值只能是Supplier函数接口类型。不能是其他的。

方法里面可以写生产者的具体代码。会注册一个名为logPubBean作为生产者。

@Component
public class logListener {
    @Bean
    public Supplier<logListener.Person> logPub() {
        return () -> {
            Person person = new Person();
            person.setName("张三");
            System.out.println("生产者:"+person);
            return person;
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return this.name;
        }
    }
}

关于Supplier,这个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Supplier的注释和定义

//Represents a supplier of results.
//There is no requirement that a new or distinct result be returned each time the supplier is invoked.
//This is a functional interface whose functional method is get().

public interface Supplier<T>

翻译过来大概就是:一个结果的提供者或者一个结果的生产者。正好对应我们的生产者。该接口只有一个方法T get(),没有参数并且仅返回一个结果。

运行

运行的话会发现控制台一直在打印。我们的队列里面也一直在新增。

在这里插入图片描述

StreamBridge

当前的运行方式是当写完生产者以后,spring cloud stream会1/s次来调用我们的生产者,但是我们一般是自己来控制生产者的调用。就可以使用下面的方法。

我们可以通过StreamBridge来做到这一点。他有四个send方法。

  • public boolean send(String bindingName, Object data):第一个参数是bindingName,我们输入的是sendLog,就需要增加sendLog的配置,我们也可以用之前的logPub-out-0。第二个参数是发送的数据。
  • public boolean send(String bindingName, Object data, MimeType outputContentType):比上面的多了一个数据类型。
  • public boolean send(String bindingName, @Nullable String binderName, Object data):还可以指定Binder的name
  • public boolean send(String bindingName, @Nullable String binderName, Object data, MimeType outputContentType): 四个参数放在一起了。
@RestController
public class logController {

    @Autowired
    private StreamBridge streamBridge;

    @GetMapping("/sendLog")
    public void sendLog() {
        logListener.Person person = new logListener.Person();
        person.setName("李四");
        System.out.println("生产者发送消息"+person);
        streamBridge.send("sendLog", person);
    }
}

消费者

随便新建一个类,并标记为@Component,主要是要让spring知道这个类。类名可以随便起。

@Component
public class logListener {

}

然后开始编写消费者的代码。加入主要的方法log,方法名可以随便起,只需要记得把这个方法注册为一个Bean就可以了。一定要在上面加@Bean注解。

方法的返回值可以是Consumer,也可以是Function。不能是其他的。

方法里面就可以写消费的具体代码了。

@Component
public class logListener {
    @Bean
    public Consumer<logListener.Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public String toString() {
            return this.name;
        }
    }
}

关于ConsumerFunction,这两个是java提供的函数式编程的接口。从java8开始提供的,java8里面的stream功能也用到了函数式编程。

下面是Consumer接口的注释和接口的定义。

//Represents an operation that accepts a single input argument and returns no result. Unlike most other functional interfaces, Consumer is expected to operate via side-effects.
//This is a functional interface whose functional method is accept(Object).

public interface Consumer<T>

翻译过来大概就是说Consumer接口仅接收一个参数并且没有返回值,我们的代码里面也可以看到,接收了一个person参数,没有return。

该接口只有一个方法void accept(T t),T类型就是我们的Person类型。

下面是Function接口的注释和定义

//Represents a function that accepts one argument and produces a result.
//This is a functional interface whose functional method is apply(Object).
public interface Function<T, R>

翻译过来大概就是说Function接口仅接收一个参数并且返回一个结果。该接口只有一个方法R apply(T t),接收一个T类型的参数,返回一个R类型的结果。

在这里插入图片描述

手动ACK

通过禁止使用死信队列来执行手动的ACK,这个时候如果抛出异常,则会重试。如果开启了死信队列,那么抛出异常以后则会进入死信队列。

log-in-0:
    consumer:
        auto-bind-dlq: false

队列持久化

上面可以看出来,创建的都是匿名队列,当程序启动的时候自动创建,当程序关闭的时候自动删除。

但是正常开发中,很少使用这种,都会指定一个持久化的队列,不管程序是否运行,队列都存在。

我们可以在bindings的配置里面增加group配置来显式指定哪个队列,我们指定log123队列。

log-in-0:
    destination: log
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    group: log123
sendLog:
    destination: log
    content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
    group: log123

再次运行程序,可以看到该队列被创建。接下来停止程序,可以看到队列还存在那里。

bindings重命名

默认约定的名称为log-in-0这种形式

但是我们也可以将它重命名。通过配置文件可以将log-in-0重命名为input,不过这样的话,所有的log-in-0的bindings配置都需要修改成input,使用上也是。注意官方并不推荐这种做法,他们认为在大多数情况下,这有点矫枉过正。

spring:
    cloud:
        stream:
            function:
                bindings:
                    log-in-0: input

显式绑定创建

默认约定的是log-in-0负责输入,log-out-0负责输出,我们也可以显式的创建这些。

通过配置文件

spring:
    cloud:
        stream:
            input-bindings: login;fooin
            output-bindings: logout;fooout

轮询配置属性

spring:
    integration:
        poller:
            # 全局配置
            fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L 
            maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L
            cron: none # Cron 触发器的 Cron 表达式值。默认 none
            initialDelay: 0 # 周期性触发的初始延迟。 默认0
            timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

也可以单独为某个bindings来配置

spring:
    cloud:
        stream:
            bindings:
                log-out-0:
                    producer:
                        poller:
                            # log-out-0的单独配置
                            fixedDelay: 1000L # 默认轮询器的延迟 单位毫秒,默认1000L 
                            maxMessagesPerPoll: 1L # 默认轮询器的每个轮询事件的最大消息数。默认 1L
                            cron: none # Cron 触发器的 Cron 表达式值。默认 none
                            initialDelay: 0 # 周期性触发的初始延迟。 默认0
                            timeUnit: MILLISECONDS # 要应用于延迟值的 TimeUnit。默认 MILLISECONDS

函数组合

假设我们有两个处理Bean,enrich负责检查header,如果缺少foo,就添加为foo,bar。然后第二个echo则负责检查是否包含foo这个Header然后输出消息内容。

@Bean
public Function<Message<String>, Message<String>> enrich() {
    return message -> {
        Assert.isTrue(!message.getHeaders().containsKey("foo"), "Should NOT contain 'foo' header");
        return MessageBuilder.fromMessage(message).setHeader("foo", "bar").build();
    };
}

@Bean
public Function<Message<String>, Message<String>> echo() {
    return message -> {
        Assert.isTrue(message.getHeaders().containsKey("foo"), "Should contain 'foo' header");
        System.out.println("Incoming message " + message);
        return message;
    };
}

通过配置将这两个bean组合起来,组合之后,这个bean名称就编程了enrich|echo,后续的配置都需要这种冗长的名称,所以这里官方推荐使用重命名的方式将它变成简单的名称。

spring:
    cloud:
        function:
            definition: enrich|echo # 函数组合
        stream:
            function: 
                bindings:
                    enrich|echo-in-0: input # 重命名

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/344878.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

matlab appdesigner系列-常用18-表格

表格&#xff0c;常用来导入外部表格数据 示例&#xff1a; 导入外界excel数据&#xff1a;data.xlsx 姓名年龄城市王一18长沙王二21上海王三56武汉王四47北京王五88成都王六23长春 操作步骤如下&#xff1a; 1&#xff09;将表格拖拽到画布上 2&#xff09;对app1右键进行…

解决方案 | 基于SFTP协议的文件传输断点续传Java实现方案

背景 因项目需要&#xff0c;我们服务每天都需要通过SFTP协议来对接上下游进行文件传输&#xff0c;但是对于一些大文件&#xff0c;在与第三方公司的服务器对接过程中很可能会因为网络问题或上下游服务器性能问题导致文件上传或者下载被中断&#xff0c;每次重试都需要重新对…

骨传导是表示啥?骨传导耳机的工作原理是什么

近期&#xff0c;骨传导耳机这种创新的产品引发了市场关注&#xff0c;也引起了大量用户的疑问&#xff1a;骨传导是表示啥&#xff1f;骨传导的工作原理是什么&#xff1f;下面让我们来详细解析一下骨传是表示啥&#xff1f;以及骨传导耳机的工作原理是什么。 骨传导是表示啥&…

在Word中插入高亮/好看代码

Md2All 一个markdown工具 Md2All 网址 代码一定要用code 高亮主题可选 atom-one-light > 复制到word > 调节字体可选Cnsolas, 间距等 效果 另一个高亮工具 效果

集简云新增邮件发送功能,适用多种创意场景并提升邮件发送效率

在数字营销中&#xff0c;电子邮件依旧是连接企业与客户的重要桥梁。集简云深知这一点&#xff0c;本周推出为企业通讯打造的内置应用——集简云邮件发送&#xff0c;帮助用户创建充满个性化的交易电子邮件&#xff0c;还能通过HTML自定义代码来实现用户的创意场景。可与近千款…

多态应用实例

目录 多态优点1.模拟冲泡饮品过程2.电脑组装 多态优点 多态的优点&#xff1a; 代码组织结构清晰&#xff1b;可读性强&#xff1b;利于前期和后期的扩展以及维护。 如果想扩展新的功能&#xff0c;不需要修改源码&#xff0c;遵循开发中开闭原则&#xff0c;只需在补充所需…

yml配置文件怎么引用pom.xml中的属性

目录 前言配置测试 前言 配置文件中的一些参数有时要用到pom文件中的属性&#xff0c;做到pom文件变配置文件中也跟着变&#xff0c;那如何才能做到呢&#xff0c;下面咱们来一起探讨学习。 配置 1.首先要在pom.xml中做如下配置&#xff0c;让maven渲染src/main/resources下配…

性能优化(CPU优化技术)-NEON 介绍

「发表于知乎专栏《移动端算法优化》」 本节主要介绍基本 SIMD 及其他的指令流与数据流的处理方式&#xff0c;NEON 的基本原理、指令以及与其他平台及硬件的对比。 &#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;…

【力扣】记录一下竞赛分上 Knight

记录一下力扣上 Knight 力扣的题还是相对来说比较简单的&#xff0c;前两个月写的题多一点&#xff0c;后面几乎都是只做了每日一题&#xff0c;感觉正常来说刷个两三个月的题水平就差不多够了&#xff0c;甚至在我才刷半个月的时候就可以做三题了&#xff0c;排名和现在差不多…

vue3源码(二)reactiveeffect

一.reactive与effect功能 reactive方法会将对象变成proxy对象&#xff0c; effect中使用reactive对象时会进行依赖收集&#xff0c;稍后属性变化时会重新执行effect函数。 <div id"app"></div><script type"module">import {reactive,…

2024年需要重点关注的15种计算机病毒

2024年&#xff0c;计算机病毒威胁变得愈发多元化和复杂化。涉及勒索病毒、二维码病毒、挖矿木马等15种类型&#xff0c;这些病毒从数据勒索到系统入侵&#xff0c;对全球网络安全构成严峻挑战。 2024年&#xff0c;计算机病毒威胁变得愈发多元化和复杂化。涉及勒索病毒、二维码…

Java 报错java.Net.UnknownHostException:raw.githubusercontent.com

1.问题 今天在vscode 学习如何使用 plantUML生成图片的时候&#xff0c;发生错误 java.util.concurrent.ExecutionException: java.net.UnknownHostException: raw.githubusercontent.com issue raw.githubusercontent.com java.util.concurrent.ExecutionException: java.n…

vulhub之Zabbix篇

CVE-2016-10134--SQL注入 一、漏洞介绍 zabbix是一款服务器监控软件&#xff0c;其由server、agent、web等模块组成&#xff0c;其中web模块由PHP编写&#xff0c;用来显示数据库中的结果。 漏洞环境 在vulhub靶场进行复现&#xff0c;启动zabbix 3.0.3。 二、复现步骤 1…

JL-03-Q6 校园气象站

产品概述 校园气象站针对测量与环境、科学研究等相关的气象指标进行设计制造&#xff0c;气象站对采集数据信息以图表、数据的形式真实、直观的反应当前环境数据指标。可通过各种传感器对气压、气温、相对湿度、风向、风速、雨量、太阳辐射、乃至空气质量等要素进行采集、存储…

Type-C平板接口协议芯片介绍,实现单C口充放电功能

在现代平板电脑中&#xff0c;Type-C接口已经成为了一个非常常见的接口类型。相比于传统的USB接口&#xff0c;Type-C接口具有更小的体积、更快的传输速度和更方便的插拔体验。但是&#xff0c;在使用Type-C接口的平板电脑上&#xff0c;如何实现单C口充电、放电和USB2.0数据传…

【后端技术】术有千法,道本归一

目录 1.概述 2.机器的问题 2.1.计算 2.2.存储 2.3.传输 3.人的问题 3.1.代码工程的管理 3.2.过程的把控 4.总结 1.概述 术有千法&#xff0c;道本归一。 之所以这样说&#xff0c;是因为当前出现的纷繁复杂的后端技术&#xff0c;其本质其实都是为了解决同一套问题。…

C++:迭代器失效问题

目录 1.vector迭代器失效问题 1.底层空间改变 ​编辑 2.指定位置元素的删除操作 2.Linux下的迭代器失效检测 1.扩容 2.删除 3.解决方法 1.vector迭代器失效问题 迭代器的主要作用就是让算法能够不用关心底层数据结构&#xff0c;其底层实际就是一个指针&#xff0c;或者是…

uniapp设置隐藏原生导航栏(3)

1、单个页面隐藏 在pages.json里配置 (第一种方式) {"path": "pages/home/index","style": {"navigationBarTitleText": "首页","navigationStyle": "custom" // 使用自定义导航栏&#xff0c;系统会关…

LoadRunner从零开始之走近LoadRunner

3.1 LoadRunner 的运行原理 安装LoadRunner 后&#xff0c;在菜单“开始” 一“MercuryLoadRunner” 中&#xff0c;你会看 到这样一组程序&#xff0c;如图 3-1 所示。 • 其中Applications 下面的Analysis、Controller 和Virtual User Generator 是我们 做性能测试最常用的…

【基础配置】Python2/Python3并存安装配置教程

Nx01 产品简介 Python是一种高级的、解释型的、面向对象的通用编程语言&#xff0c;具有简单易学、代码可读性强、功能强大、可移植性好等特点。它可以应用于多种领域&#xff0c;如Web开发、数据科学、人工智能、机器学习、科学计算、自动化测试等。Python由Guido van Rossum于…