第十一章 Stream消息驱动

Stream消息驱动

gitee:springcloud_study: springcloud:服务集群、注册中心、配置中心(热更新)、服务网关(校验、路由、负载均衡)、分布式缓存、分布式搜索、消息队列(异步通信)、数据库集群、分布式日志、系统监控链路追踪。

1. 消息驱动概述

作用:屏蔽底层消息中间件的差异,降低切换成本,统—消息的编程模型。底层不管是什么中间件如kafka、rabbitmq,Stream可以解决不同中间件的通信。 官网:Spring Cloud Stream

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。
​
应用程序通过 inputs 或者 outputsj来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
​
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
​
但是Stream只支持kafka、rabbitmq。

img

设计思想 标准的MQ:

1.生产者/消费者之间靠消息媒介传递信息内容:Message
2.消息必须走特定的通道:消息通道MessageChannel
3.消息通道里的消息如何被消费呢,谁负责收发处理:消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

Cloud Stream:

Stream利用Binder来绑定中间件的输入流和输出流。如果系统使用到了两个中间件(kafka、rabbitmq):这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的人—大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

Stream中的消息通信方式遵循了发布-订阅模式:

Topic在Rabbitmq中是Exchange、在kafka中是Topic。

Spring Cloud Stream标准流程套路

img

Middleware:中间件,目前只支持RabbitMQ和Kafka
Binder:是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input:注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output:注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener:监听队列。用于消费者的队列的消息接收
@EnableBinding:指信道channel和exchange绑定在一起

Binder:很方便的连接中间件,屏蔽差异 Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。 Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2. 消息驱动之生产者

创建cloud-stream-rabbitmq-provider8801:作为生产者进行发消息模块

  1. pom文件

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.yaml

server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        output: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
    register-with-eureka: true
    fetch-registry: true
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: send-8801.com
    prefer-ip-address: true
  1. 主启动类

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class,args);
    }
}
  1. service层

public interface IMessageProvider {
    String send();
}
@EnableBinding(Source.class)  //定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {
​
    @Resource
    private MessageChannel output;  //消息发送管道
​
    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("********serial:"+serial);
        return null;
    }
}
  1. controller层

@RestController
public class SendMessageController {
​
    @Resource
    private IMessageProvider messageProvider;
​
    @GetMapping(value = "/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

测试:

3. 消息驱动之消费者

创建cloud-stream-rabbitmq-consumer8802,作为消息接收模块

  1. pom文件

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
  1. application.yml

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka
    register-with-eureka: true
    fetch-registry: true
  instance:
    lease-renewal-interval-in-seconds: 2
    lease-expiration-duration-in-seconds: 5
    instance-id: receive-8802.com
    prefer-ip-address: true
  1. controller层

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageController {
​
    @Value("${server.port}")
    private String serverPort;
​
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1号,------>接收到的消息:"+message.getPayload()+"\t port:"+serverPort);
    }
}
  1. 主启动类

@SpringBootApplication
@EnableEurekaClient
public class ConsumerMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerMQMain8802.class,args);
    }
}

测试:

启动loccalhost:8801/sendMessage就可以了,消费者就是一个监听器,有message就消费。

4. 分组消费与持久化

根据cloud-stream-rabbitmq-consumer8802创建8803项目,运行暴露问题:


消息重复消费和消息持久化问题,需要进行分组操作。注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

解决重复消费方法:加入同一个组(下图是不同分组的情况)

cloud-stream-rabbitmq-consumer8802和8803设置不同分组yicaiA/B

server:
  port: 8803
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
          group: yicaiB
server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
          group: yicaiA

cloud-stream-rabbitmq-consumer8802和8803设置同一个组yicaiA

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: #在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.25.153
                port: 5672
                username: admin
                password: aaaaaa
      bindings: #服务的整合处理
        input: #
          destination: studyExchange  #表示要使用的Exchange名称定义
          content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit #设置要绑定的消.息服务的具体设置
          group: yicaiA

测试:

持久化 加上group就算实现类持久化。所谓的持久化就是如果没有分组,一个服务发送消息,其他服务由于没有分组,如果其他哪些服务断开,又继续重启,这样就会导致以前那些消息丢失。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/276847.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

32阵元 MVDR和DREC DOA估计波束方向图对比

32阵元 MVDR和DREC DOA估计波束方向图对比 一、原理 MVDR原理&#xff1a;https://zhuanlan.zhihu.com/p/457528114 DREC原理&#xff08;无失真响应特征干扰相消器&#xff09;&#xff1a;http://radarst.ijournal.cn/html/2019/3/201903018.html 主要参数&#xff1a; 阵…

Idea如何从磁盘中应用 下载好的插件流程,安装zip压缩包。

1、将下载的插件文件&#xff08;通常是一个ZIP文件&#xff09;复制到IntelliJ IDEA的“plugins”文件夹中。 IDEA版本 2、重启IntelliJ IDEA。 3、在设置窗口中&#xff0c;选择左侧的“Plugins”。 4、选择之前复制到“plugins”文件夹中的插件文件&#xff0c;点击“OK”按…

大数据与人工智能|万物皆算法(第三节)

要点一&#xff1a;数据与智能的关系 1. 一切的核心都是数据&#xff0c;数据和智能之间是密切相关的。 数据是对客观现实的描述&#xff0c;而信息是数据转化而来的。 例如&#xff0c;24是数据&#xff0c;但说“今天的气温是24摄氏度”是信息&#xff0c;而说“班可以分成24…

Unity Shader UVLightReveal (紫外线显示,验钞效果)

Unity Shader UVLightReveal &#xff08;紫外线显示&#xff0c;验钞效果&#xff09; UVLight Reveal 实现验钞机的效果实现方案操作实现1.Light2.将另一个图形加入3.加上图形效果4.加上灯光的颜色自定义判定 源码 UVLight Reveal 实现验钞机的效果 大家应该都有见过验钞机验…

电脑系统坏了用U盘重装系统教程

我们平时办公、学习都会用到电脑&#xff0c;如果电脑系统坏了&#xff0c;就会影响自己正常使用电脑&#xff0c;这时候就可以通过U盘来重装一个正常的操作系统。如果您不知道具体的重装操作步骤&#xff0c;那么可以参考下面小编分享的利用U盘快速完成操作系统重装的步骤介绍…

VSCODE : SSH远程配置+免密登录

SSH基础配置 填入地址&#xff0c;回车 ssh userhost-or-ip 然后选择默认的配置&#xff0c;回车&#xff0c;得到以下结果&#xff1a; 点击链接 选择远程的系统 输入密码 免密登录 生成SSH密钥&#xff1a; 首先&#xff0c;确保你已经在本地生成了SSH密钥。你可以使…

在电脑上免费分区的 5 个有效磁盘分区软件工具

磁盘分区可能是一个脆弱而复杂的过程&#xff0c;磁盘崩溃或用户设备受到病毒攻击的风险很高。因此&#xff0c;它们很难由用户单独或手动管理。本文详细介绍了可以帮助简化磁盘分区过程的不同软件工具、它们的功能和优点。那么让我们开始吧。 什么是磁盘分区工具&#xff1f; …

企业级依赖管理: 深入解读 Maven BOM

一、背景 当开发者在一个大型项目中使用 Maven 进行依赖管理时&#xff0c;项目往往会包含多个模块或子项目&#xff0c;并且这些模块会共享相同的依赖项。但是&#xff0c;不同模块可能会独立地指定各自的依赖版本&#xff0c;这可能导致以下问题&#xff1a; 依赖版本不一致…

116基于matlab的盲源信号分离

基于matlab的盲源信号分离。FASTICA方法&#xff0c;能够很好的将信号解混&#xff0c;可以替换数据进行分析。具有GUI界面&#xff0c;可以很好的进行操作。程序已调通&#xff0c;可直接运行。 116matlab盲源信号分离FASTICA (xiaohongshu.com)

java对象整理

1.对象的创建过程 首先class文件加载到内存中 这个过程如下 “加载”是“类加载”(Class Loading)过程的第一步。这个加载过程主要就是靠类加载器实现的&#xff0c; 包括用户自定义类加载器。 加载到内存后做的事情 申请对象内存 成员变量赋默认值 调用构造方法 成员变量顺序…

nginx报错upstream sent invalid header

nginx报错upstream sent invalid header 1.报错背景 最近由于nginx 1.20的某个漏洞需要升级到nginx1.25的版本。在测试环境升级完nginx后&#xff0c;发现应用直接报错502 bad gateway了。 然后查看nginx的errlog&#xff0c;发现&#xff1a; upstream sent invalid head…

【计算机视觉】角点检测(Harris、SIFT)

Harris 角点指的是窗口延任意方向移动&#xff0c;都有很大变化量的点。 用数学公式表示为&#xff1a; E(u,v)反映的移动后窗口的差异&#xff0c;w(x,y)为每个像素的点权值&#xff0c;I(xu,yv)是移动的像素值&#xff0c;I(x,y)是移动前的像素值。 将E(u,v)进行泰勒展开&am…

uni-appcss语法

锋哥原创的uni-app视频教程&#xff1a; 2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版)&#xff0c;火爆更新中...共计23条视频&#xff0c;包括&#xff1a;第1讲 uni…

二-从C到C++

本章会介绍一些C拓展的非面向对象的功能 引用 1 概念 引用从一定程度上讲是指针的平替&#xff0c;几乎被所有的面向对象语言所使用。引用相当于对某一目标变量起“别名”。 操作这个别名&#xff0c;与操作原变量一样。&#xff08;操作同一块地址&#xff09;不能有相同别名 …

ssrf之dict协议和file协议

1.dict协议 dict是什么协议呢&#xff1f; 定义&#xff1a;词典网络协议&#xff0c;在RFC 2009中进行描述。它的目标是超越Webster protocol&#xff0c;并允许客户端在使 用过程中访问更多字典。Dict服务器和客户机使用TCP端口2628。 官方介绍&#xff1a;http://dict.o…

文件监控-IT安全管理软件

文件监控和IT安全管理软件是用于保护企业数据和网络安全的工具。这些工具可以帮助企业监控文件的变化&#xff0c;防止未经授权的访问和修改&#xff0c;并确保数据的安全性和完整性。 一、具有哪些功能 文件监控软件可以实时监控文件系统的活动&#xff0c;包括文件的创建、修…

k8s配置安装ingress服务

k8s配置安装ingress服务 在kuboard页面,网络安装 安装完配置名称保存 apiVersion: networking.k8s.io/v1 kind: Ingress metadata:namespace: testname: pipeline spec:ingressClassName: ingressrules:- host: test.pipeline.comhttp:paths:- path: /pathType: Prefixbacke…

3D视觉-结构光测量法

概述 结构光测量法是一种通过光学投射模块将具有编码信息的结构光投射到物体表面&#xff0c;在被测物表面上形成由被测物体表面形状调制的光条图像&#xff0c;再由图像采集系统采集被测物表面漫反射的光条图像&#xff0c;通过高精度算法处理后&#xff0c;得出被测物表面的三…

oracle与mysql的分析函数(窗口函数)

分析函数定义 在SQL语句中&#xff0c;很多查询语句需要进行GROUP BY分组汇总&#xff0c;但是一旦经过分组&#xff0c;SELECT返回的记录数就会减少。为了保留所有原始行记录&#xff0c;并且仍可以进行分组数据分析&#xff0c;分析函数应运而生。 Oracle 8i 版本开始支持窗…

WebService

调试工具&#xff1a;Postman、SoapUI Soap WebService :.net WCF 、Java CFX WebService三要素&#xff1a; SOAP&#xff08;Simple Object Access Protocol&#xff09;&#xff1a;用来描述传递信息的格式&#xff0c; 可以和现存的许多因特网协议和格式结合使用&#x…