玩转SpringCloud Stream

背景及痛点

现如今消息中间件(MQ)在互联网项目中被广泛的应用,特别是大数据行业应用的特别的多,现在市面上也流行这多个消息中间件框架,比如ActiveMQRabbitMQRocketMQKafka等,这些消息中间件各有各的优劣,但是想要解决的问题都基本相同。由于每个框架都有它自己的使用方式,这无疑是增加了开发者的学习成本以及添加相同的业务复杂度。框架的变更或者多个中间件的混合使用使得业务逻辑代码中中间件的切换、项目的维护和开发都会变得更加繁琐。

有没有一种技术让我们不再需要关注具体MQ的使用细节,我们只需要专注业务逻辑的开发,让程序根据实际项目的使用自己去适配绑定,自动在各种MQ内切换呢?springcloud stream便为此而生。

关于stream

我们用一句话来描述stream就是:屏蔽底层消息中间件的差异,降低切换版本,统一消息的编程模型

官方定义SpringCloud Stream是一个构建消息驱动微服务的框架,应用通过inputs或者outputs来与SpringCloud Stream中的binder对象交互,我们通过配置来绑定消息中间件,而SpringCloud Streambinder对象负责与消息中间件交互,所以我们只需要搞清楚如何与SpringCloud Stream交互即可方便的使用消息中间件。

SpringCloud Stream通过Spring Integration来连接消息代理中间件以实现消息事件驱动,它提供了个性化的自动化配置,引用了发布订阅消费组分区的三个核心概念,但是目前仅支持RabbitMQKafka

设计思想

在此之前

以前的架构

生产者和消费者通过消息媒介(queue等)传递信息内容(Message),消息必须通过特定的通道(MessageChannel),通过消息的发布与订阅来决定消息的发送和消费(publish/subscrib)。

引入中间件

现在假如我们用到了RabbitMQKafka,由于这两个消息中间件的架构上的不同,像RabbitMQExchange,而KafkatopichePartitions分区

引入中间件之后

(binder中,input对于消费者,output对应生产者。)

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,但是后面因为业务需求,需要改用另外一种消息队列进行迁移,这时候无疑就是一 个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

屏蔽底层差异

在没有绑定器(Builder)这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间件,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道, 使得应用程序不需要再考虑各种不同的消息中间件实现。

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

处理架构

Stream对消息中间件的进一步封装可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

处理架构

其遵循了发布-订阅模式,主要使用的就是Topic主题进行广播,RabbitMQ就是Exchange,在Kafka中就是Topic

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

stream流程

stream流程
  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过对Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入

常用api和注解

常用api和注解

使用示例

基本环境

  • 注册中心:Eureka,可以是其他。

  • 消息中间件:RabbitMQ

    rabbitmq:
      host: localhost
      port: 5672
      username: guest
      password: guest
    

生产端

依赖
 <!--stream rabbit -->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--eureka client-->
<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
配置文件
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    # stream 配置
    stream:
      binders: # 配置绑定的消息中间件的服务信息
        defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定
          type: rabbit  # 消息组件的类型
          environment:  #相关环境配置,设置rabbitmq的环境
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output:  # 通道名称
          destination: testExchange # 定义要使用的Exchange的名称
          content-type: application/json  # 设置消息类型,对象为json,文本是text/plain
          binder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbit

eureka:
  client:
    #表示是否将自己注册进EurekaServer默认为true
    register-with-eureka: true
    #是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
    fetch-registry: true
    service-url:
      #单机版
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true
    instance-id: sender01
定义接口

这里需要定义一个接口并实现它,方便其他业务调用。

public interface IMessageProvider {
    /**
     * 发送接口
     * @param msg
     * @return
     */
    public String send(String msg);
}
接口实现

接口实现中需要添加@EnableBinding注解,并引入Source.class,为什么引入Source.class呢?因为它是生产者,我们参考stream流程图就可以知道

import com.martain.study.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder; 
import javax.annotation.Resource;

@EnableBinding(Source.class)
public class MessageProvider implements IMessageProvider {
    /**
     * 注入消息发送管道
     */
    @Resource
    private MessageChannel output;

    @Override
    public String send(String msg) {
        output.send(MessageBuilder.withPayload(msg).build());
        System.out.println("******send message:"+msg);
        return msg;
    }
}
定义测试controller

@RestController
public class TestController {

    @Autowired
    IMessageProvider messageProvider;

    @GetMapping("/sendMsg")
    public String sendMsg(){
        String msg = UUID.randomUUID().toString();
        return messageProvider.send(msg);
    }
 
}
启动类
@SpringBootApplication
public class StreamProviderApplication8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamProviderApplication8801.class,args);
    }
} 

服务启动之后,多次请求/sendMsg,发送了多条消息。

生产服务生产消息

消费端

依赖
   <!--stream rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--eureka client-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
配置文件

与生产者类似,只是bindings中的output改成了input

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consume
  cloud:
    # stream 配置
    stream:
      binders: # 配置绑定的消息中间件的服务信息
        defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定
          type: rabbit  # 消息组件的类型
          environment:  #相关环境配置,设置rabbitmq的环境
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input:  # 通道名称
          destination: testExchange # 定义要使用的Exchange的名称
          content-type: application/json  # 设置消息类型,对象为json,文本是text/plain
          binder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbit

eureka:
  client:
    #表示是否将自己注册进EurekaServer默认为true
    register-with-eureka: true
    #是否从EurekaServer抓取已有的注册消息,默认为true,单节点无所谓,集群必须设置为true才能配合ribbon使用负载均衡
    fetch-registry: true
    service-url:
      #单机版
      defaultZone: http://localhost:8080/eureka/
  instance:
    prefer-ip-address: true
    instance-id: recover01

接收服务

接收服务只需要再类名前添加@EnableBinding()注解,并引入Sink.class类,而实际接收的方法中需要添加@StreamListener(Sink.INPUT)注解。

package com.martain.study.controller; 
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; 

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    /**
     * 获取本服务的端口
     */
    @Value("${server.port}")
    private String serverPort;
    /**
     * 这里表示监听sink的input
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("**** recv msg :"+message.getPayload()+"   in port "+serverPort);
    }
}

启动类
@SpringBootApplication
public class StreamConsumerApplication8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamConsumerApplication8802.class,args);
    }
}

启动生产服务后,在启动消费服务,多次请求生产服务发送消息,我们可以发现消费者能很快的消费这些消息。

消费者消费消息

消息分组

当我们有多个消费者时,这个时候生产者生产一条消息,会发现所有的消费者都会消费这个消息。比如在一些订单系统的场景中,如果一个订单被多个处理服务一起获取到,就容易造成数据错误,那我们如何避免这种情况呢?这时我们就可以使用Stream的消息分组来解决重复消费问题。

如何实现Stream的消息分组呢?我们只要简单的在yml文件中配置spring.cloud.stream.bindings.input.group即可。示例如下:

...
spring:
  application:
    name: cloud-stream-consume
  cloud:
    # stream 配置
    stream:
      binders: # 配置绑定的消息中间件的服务信息
        defaultRabbit: # 自定义的一个名称,用来下面 bindings 绑定
          type: rabbit  # 消息组件的类型
          environment:  #相关环境配置,设置rabbitmq的环境
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input:  # 通道名称
          destination: testExchange # 定义要使用的Exchange的名称
          content-type: application/json  # 设置消息类型,对象为json,文本是text/plain
          binder: defaultRabbit # 设置要绑定的服务的具体设置,就是我们上面配置的defaultRabbit
          group: groupA # 配置分组
          
            ...

如果没有设置该属性,当消费服务启动时,会有个随机的组名

如果我们将所有的消费服务的group熟悉都设置成一致的话,这些服务就会在同一个组里面,从而能够保证消息只被应用消费一次。

同一组的消费者是竞争关系,不可以重复消费。

消息持久化

当生产者在持续生产消息,消费服务突然挂了,使得拥有许多消息并没有被消费,如果消费没有配置分组的话,消费服务重启是无法消费未消费的消息的,如果配置了分组的话,当消费服务重启之后可以自动去消费未消费的数据。



喜欢的朋友记得点赞、收藏、关注哦!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972720.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Window下Redis的安装和部署详细图文教程(Redis的安装和可视化工具的使用)

文章目录 Redis下载地址&#xff1a;一、zip压缩包方式下载安装 1、下载Redis压缩包2、解压到文件夹3、启动Redis服务4、打开Redis客户端进行连接5、使用一些基础操作来测试 二、msi安装包方式下载安装 1、下载Redis安装包2、进行安装3、进行配置4、启动服务5、测试能否正常工…

SOME/IP--协议英文原文讲解9

前言 SOME/IP协议越来越多的用于汽车电子行业中&#xff0c;关于协议详细完全的中文资料却没有&#xff0c;所以我将结合工作经验并对照英文原版协议做一系列的文章。基本分三大块&#xff1a; 1. SOME/IP协议讲解 2. SOME/IP-SD协议讲解 3. python/C举例调试讲解 4.2.1.4 T…

容器网络(三)- calico网络IPIP模式

一、前置知识 calico的IPIP模式使用到了tun设备&#xff0c;先来了解下什么是tun设备&#xff0c;它的作用是什么&#xff0c;以及使用到tun设备的IPIP隧道是如何工作的。 1.1 tun设备 tun是网络层的虚拟网络设备&#xff0c;可以收发第三层数据报文包&#xff0c;如IP封包&…

网络原理-HTTP/HTTPS

文章目录 HTTPHTTP 是什么&#xff1f;理解“应用层协议”理解 HTTP 协议的⼯作过程HTTP 协议格式抓包⼯具的使用抓包⼯具的原理抓包结果协议格式总结 HTTP 请求&#xff08;Request&#xff09;认识 URLURL 的基本格式关于URL encode 认识“⽅法”&#xff08;method&#xff…

sentinel集成nacos做持久化配置

sentinel提供了非常强大的控制台来提供流控等功能&#xff0c;但是控制台只是临时的配置&#xff0c;想要将流控配置永久的保存&#xff0c;或者在项目启动的时候就加载&#xff0c;不需要手动设置&#xff0c;就需要使用到nacos与sentinel做集成配置。这里都是不变代码&#x…

网络安全技术pat实验 网络安全 实验

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 网络安全实验3 前言Kali 常用指令工具教程 ettercap 基本使用 一、口令破解 John the ripper 破解 linux 密码l0phtcrack7 破解 windows 密码John 破解 zip 压…

大模型工具大比拼:SGLang、Ollama、VLLM、LLaMA.cpp 如何选择?

简介&#xff1a;在人工智能飞速发展的今天&#xff0c;大模型已经成为推动技术革新的核心力量。无论是智能客服、内容创作&#xff0c;还是科研辅助、代码生成&#xff0c;大模型的身影无处不在。然而&#xff0c;面对市场上琳琅满目的工具&#xff0c;如何挑选最适合自己的那…

个人简历html网页模板,科技感炫酷html简历模板

炫酷动效登录页 引言 在网页设计中,按钮是用户交互的重要元素之一。这样一款黑色个人简历html网页模板,科技感炫酷html简历模板,设计效果类似科技看板图,可帮您展示技能、任职经历、作品等,喜欢这种风格的小伙伴不要犹豫哦。该素材呈现了数据符号排版显示出人形的动画效…

解决 Mac 只显示文件大小,不显示目录大小

前言 在使用 mac 的时候总是只显示文件的大小&#xff0c;不显示文件夹的大小&#xff0c;为了解决问题可以开启“计算文件夹”。 步骤 1.进入访达 2.工具栏点击“显示”选项&#xff0c;点击 “查看显示选项” 3.勾选 显示“资源库"文件夹 和 计算所有大小 或者点击…

UE5.3 C++ 通过Spline样条实现三维连线,自己UV贴图。

一.制作了基于USplineComponent的画线插件&#xff0c;就是我们常说的样条线。 直接看怎么用&#xff0c;关于插件实现细节&#xff0c;后续会更新&#xff0c;看思路就行。通过ID,管理每一条线。移除删掉上一帧的线条Mesh。第一个点&#xff0c;是本身直接放过去。第二个点是…

[qt5学习笔记]Application Example示例程序源码解析

开发环境问题 vs2022下直接打开ui、ts文件失败 解决办法如下图&#xff0c; 设置designer独立运行。估计是嵌入运行存在些许bug。 同理&#xff0c;ts编辑工具linguist也存在这个问题。 qrc rc的编辑嵌入编辑都正常&#xff0c;但分离式更稳定可靠。 qt creator编译失败 原…

Ubuntu 系统 LVM 逻辑卷扩容教程

Ubuntu 系统 LVM 逻辑卷扩容教程 前言 在 Linux 系统中&#xff0c;LVM&#xff08;Logical Volume Manager&#xff09;是一种逻辑卷管理工具&#xff0c;允许管理员动态调整磁盘空间&#xff0c;而无需重启系统。 本文将详细介绍如何使用 LVM 扩容逻辑卷&#xff0c;以实现…

Javascript网页设计实例:通过JS实现上传Markdown转化为脑图并下载脑图

功能预览 深度与密度测试 对于测试部分&#xff0c;分别对深度和密度进行了测试&#xff1a; 注意&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;只实现了识别Markdown中的#代表的层级&#xff0c;所以不能使用其余标识符&#xff0…

什么是tomcat

什么是tomcat Tomcat 是由Apache软件基金会开发的一个开源的轻量级Web服务器&#xff0c;主要用于部署和运行Java Servlet和JavaServer Pages (JSP) 技术构建的Web应用。以下是关于Tomcat的一些关键点&#xff1a; Java Web应用容器&#xff1a;Tomcat作为Servlet容器&#x…

一.AI大模型开发-初识机器学习

机器学习基本概念 前言 本文主要介绍了深度学习基础&#xff0c;包括机器学习、深度学习的概念&#xff0c;机器学习的两种典型任务分类任务和回归任务&#xff0c;机器学习中的基础名词解释以及模型训练的基本流程等。 一.认识机器学习 1.人工智能和机器学习 人工智能&am…

Redis未授权访问漏洞原理

redis未授权访问漏洞 目录 redis未授权访问漏洞一、Redis介绍二、redis环境安装三、漏洞原理四、漏洞复现4.1 webshell提权4.2redis写入计划任务反弹shell4.3 ssh key免密登录4.4 Redis基于主从复制的RCE方式 五、Redis加固建议 一、Redis介绍 Redis&#xff0c;全称为Remote …

什么是网络安全?网络安全防范技术包括哪些?

伴随着互联网的发展&#xff0c;它已经成为我们生活中不可或缺的存在&#xff0c;无论是个人还是企业&#xff0c;都离不开互联网。正因为互联网得到了重视&#xff0c;网络安全问题也随之加剧&#xff0c;给我们的信息安全造成严重威胁&#xff0c;而想要有效规避这些风险&…

使用VSCODE开发C语言程序

使用vscode配置C语言开发环境 一、安装VSCODE 1、下载vscode ​ 从官方网站&#xff08;https://code.visualstudio.com/Download&#xff09;上&#xff0c;下载windows版本的vscode 2、安装vscode ​ 下载完毕后&#xff0c;按照提示进行安装即可&#xff08;尽可能不要安…

轴承故障特征—SHAP 模型 3D 可视化

往期精彩内容&#xff1a; Python-凯斯西储大学&#xff08;CWRU&#xff09;轴承数据解读与分类处理 基于FFT CNN - BiGRU-Attention 时域、频域特征注意力融合的轴承故障识别模型-CSDN博客 基于FFT CNN - Transformer 时域、频域特征融合的轴承故障识别模型-CSDN博客 P…

ComfyUI多功能插件安装-Comfy UI Manager

原生ComfyUI中的节点内容较少&#xff0c;在使用过程中所需要的很多实用插件没有 可以安装 Comfy UI Manager 以帮助我们更高效率的使用ComfyUI&#xff08;Comfy UI Manager 也相当于一个节点&#xff0c;但是可以管理、更新其他实用节点&#xff09; ComfyUI是由Dr.Lt.Data开…