SpringCloud(17)之SpringCloud Stream

一、Spring Cloud Stream介绍

        Spring Cloud Stream是一个框架,用于构建与共享消息系统连接的高度可扩展的事件驱动微服务。该框架提供了一个灵活的编程模型,该模型建立在已经建立和熟悉的Spring习惯用法和最佳实践之上,包括对持久发布/子语义、使用者组和有状态分区的支持。  

        它可以基于 Spring Boot来创建独立的、可用于生产的  Spring应用程序,Spring Cloud Stream为一些供应商的消息   中间件产品提供了个性化的自动化配置实现,并引入了发布-订阅、消费组、分区这三个核心概念。通   过使用 Spring Cloud Stream ,可以有效简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。目前 Spring Cloud Stream 支持 RabbitMQ  Kafka 自动化配置。

        目前Spring Cloud Stream只适配以下中间件信息:

二、Spring Cloud Stream 工作流程

        Spring Cloud Stream应用程序由一个与中间件无关的核心组成。应用程序通过在外部代理公开的目的地和代码中的输入/输出参数之间建立绑定来与外部世界通信。建立绑定所需的特定于Broker的详细信息由特定于中间件的Binder实现来处理。

        通过Stream可以很好的屏蔽各个中间件的API差异,它统一了API,生产者通过OUTPUT向消息中间件发 送消息,此时并不需要关心消息中间件是Kafka还是RabbitMQ,不需要关注他们的API,只需要用到Stream的API,这样可以降低学习成本。消费方通过INPUT消费指定的消息,也不需要关注消息中间件 API,架构图如上图: 

        我们对上图的对象进行说明:

  • Application Core:生产者、消费者;
  • inputs:消费者;
  • ouputs:生产者;
  • Binder:绑定器,主要和消息中间件进行绑定操作;
  • Middleware:消息中间件服务;

        

        我们项目中真正应用到Stream,只需要按照如上流程图操作即可;

 生产者:

        1:使用Source绑定消息的输出管道。

        2:通过MessageChannel输出消息。

        3:通过@EnableBinding开启binder,将生产者绑定到指定的MQ服务。

消费者:      

        1:通过@EnableBinding绑定到MQ。

        2:通过Sink绑定到输入数据管道。

        3:@StreamListener监听指定管道数据。

 2.1 Spring Cloud Stream 实战

        

        如上图,当用户行程结束,用户需进入支付操作,当用户支付完成时,我们需要更新订单状态,此时我 们可以让支付系统将支付状态发送到MQ中,订单系统订阅MQ消息,根据MQ消息修改订单状态。我们 将使用 SpringCloud Stream实现该功能。

2.1.1 生产者

1)引入依赖 

   hailtaxi-pay 中引入依赖:

        <!--stream-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

2) 配置MQ服务

 修改 hailtaxi-pay  application.yml 添加如下配置:

server:
  port: 18083
spring:
  application:
    name: hailtaxi-pay
  cloud:
    #Consul配置
    consul:
      host: localhost
      port: 8500
      discovery:
        #注册到Consul中的服务名字
        service-name: ${spring.application.name}
    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置

3)消息输出管道绑定 

/***
 * 负责向MQ发送消息
 */
@EnableBinding(Source.class)
public class MessageSender {

    @Resource
    private MessageChannel output;//消息发送管道

    /***
     * 发送消息
     * @param message
     * @return
     */
    public Boolean send(Object message) {
        //消息发送
        boolean bo = output.send(MessageBuilder.withPayload(message).build());
        System.out.println("*******send message: "+message);
        return bo;
    }
}

 参数说明:

Source.class:绑定一个输出消息管道Channel。

MessageChannel:发送消息对象,默认是DirectWithAttributesChannel,发消息在 AbstractMessageChannel中完成。

MessageBuilder.withPayload:构建消息。

        此时大家可能会有一个疑问?如果我们多个channel,在rabbitMQ中就是说我一个服务有多个交换机该怎么办?

        我们来看下 Source.class里面定义的内容是什么,定义的内容如下:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

        所以说如果此时我们要新的管道的话,我们就可以参考Source来定义新的类,然后OUTPUT就定义新的管道名称,然后再配置文件中我们就定义这个新的管道名称。 

4)消息发送 

  com.itheima.pay.controller.TaxiPayController 中创建支付方法用于发送消息,代码如下:

    /***
     * 支付  http://localhost:18083/pay/wxpay/1
     * @return
     */
    @GetMapping(value = "/wxpay/{id}")
    public TaxiPay pay(@PathVariable(value = "id")String id){
        //支付操作
        TaxiPay taxiPay = new TaxiPay(id,310,3);
        //发送消息
        messageSender.send(taxiPay);
        return taxiPay;
    }

2.1.2 消费者 

1)修改配置 

 修改 hailtaxi-order 的核心配置文件 application.yml ,在文件中配置要监听MQ信息:

server:
  port: 18082
spring:
  application:
    name: hailtaxi-order
  zipkin:
    #zipkin服务地址
    base-url: http://localhost:9411
  sleuth:
    sampler:
      probability: 1  #采样值,0~1之间,1表示全部信息都手机,值越大,效率越低
  cloud:
    #Consul配置
    consul:
      host: localhost
      port: 8500
      discovery:
        #注册到Consul中的服务名字
        service-name: ${spring.application.name}
    #Stream
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.211.145
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: payExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit  # 设置要绑定的消息服务的具体设置
          group: paygroup #所属分组

2)消息监听 

  hailtaxi-order 中创建消息监听对象 com.itheima.order.mq.MessageReceiver ,代码如下:

@EnableBinding(Sink.class)
public class MessageReceiver {

    @Value("${server.port}")
    private String port;

    /****
     * 消息监听
     * @param message
     */
    @StreamListener(Sink.INPUT)
    public void receive(String message) {
        System.out.println("消息监听(增加用户积分、修改订单状态)-->" + message+"-->port:"+port);
    }
}

参数说明:

Sink.class:绑定消费者管道。

@StreamListener(Sink.INPUT):监听消息配置,指定了消息为application中的input


1.3 消息分组

         消息分组有2个好处,分别是集群合理消费、数据持久化。

 1.3.1集群消费下的分组

1)分组的意义

        分组在项目中是有非常重大的意义,通常应用于消息并发高、消息堆积的场景,这些场景服务消费方通 常会做集群操作,一旦做集群操作,我们又需要项目中的消费者合理消费,比如用户打车支付完成后, 我们需要增加用户积分同时修改订单状态,如果集群环境中有2台服务器都执行该消费操作,此时用户  积分会增加两次,就会造成非幂等问题。 

 

        此时集群中相同服务应该属于同一个组,同一个组中只允许有一个足节点消费某一个信息,这样就可以 避免费幂等问题的出现。

2)分组实战 

        新增一个 hailtaxi-order消费者节点:

  

        此时运行起来,  18082  18182 节点会同时消费所有数据。 

        修改 hailtaxi-order 的核心配置文件 application.yml ,添加分组: 

 

        此时再次测试,可以发现消费者不会重复消费数据。 

1.3.2 数据持久化

        我们把分组去掉,停掉 hailtaxi-order 服务,然后请求 http://localhost:18083/pay/wxpay/1 送数据,发送完数据后,再启动 hailtaxi-order服务,此时发现没有数据可以消费,这是因为数据没 有持久化,是一种广播模式,如果需要数据持久化,得给每个消费节点添加group组即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/409628.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

《高考》期刊杂志投稿邮箱知网教育类期刊发表

《高考》杂志是由国家新闻出版总署批准的正规教育类期刊。主要宣传高中新课程改革的专业性&#xff0c;是教育管理工作者、高中一线教师交流经验、探讨问题的重要平台&#xff0c;期刊突出政策性、针对性、指导性&#xff0c;是一本以教育科研成果展示为主&#xff0c;兼具教育…

redis数据结构源码分析——压缩列表ziplist(I)

前面讲了跳表的源码分析&#xff0c;本篇我们来聊一聊另外一个重点结构——压缩列表 文章目录 存储结构字节数组结构节点结构 压缩编码zipEntryzlEntry ZIP_DECODE_PREVLENZIP_DECODE_LENGTH API解析ziplistNew(创建压缩列表)ziplistInsert(插入)ziplistDelete(删除)ziplistFi…

一. demo

1. 舞台-场景-控件 import javafx.application.Application; import javafx.scene.Scene; import javafx.scene.control.Button; import javafx.scene.layout.Pane; import javafx.scene.layout.VBox; import javafx.stage.Stage;import java.util.Arrays;public class Main e…

数据结构(算法竞赛、蓝桥杯)--线段树+懒标记

1、B站视频链接&#xff1a;C02【模板】线段树懒标记 Luogu P3372 线段树 1_哔哩哔哩_bilibili 题目链接&#xff1a;P3372 【模板】线段树 1 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn) void build(int p,int l,int r){tr[p]{l,r,w[l],0};if(lr)return;//叶子节点返回int…

快速搭建keepalived+nginx

1.工作原理 keepalived是以VRRP协议为实现基础的,VRRP全称Virtual Router Redundancy Protocol,即虚拟路由冗余协议。 虚拟路由冗余协议,可以认为是实现路由器高可用的协议,即将N台提供相同功能的路由器组成一个路由器组,这个组里面有一个master和多个backup,master上面…

蓝桥杯《修剪灌木》

题目描述 爱丽丝要完成一项修剪灌木的工作。有 N 棵灌木整齐的从左到右排成一排。爱丽丝在每天傍晚会修剪一棵灌木&#xff0c;让灌木的高度变为 0 厘米。爱丽丝修剪灌木的顺序是从最左侧的灌木开始&#xff0c;每天向右修剪一棵灌木。当修剪了最右侧的灌木后&#xff0c;她会…

Java 中常用的数据结构类 API

目录 常用数据结构API 对应的线程安全的api 高可用衡量标准 常用数据结构API ArrayList: 实现了动态数组&#xff0c;允许快速随机访问元素。 import java.util.ArrayList; LinkedList: 实现了双向链表&#xff0c;适用于频繁插入和删除操作。 import java.util.LinkedLis…

【MySQL面试复习】详细说下事务的特性

系列文章目录 在MySQL中&#xff0c;如何定位慢查询&#xff1f; 发现了某个SQL语句执行很慢&#xff0c;如何进行分析&#xff1f; 了解过索引吗&#xff1f;(索引的底层原理)/B 树和B树的区别是什么&#xff1f; 什么是聚簇索引&#xff08;聚集索引&#xff09;和非聚簇索引…

【泰山派RK3566】智能语音助手(一)移植Kaldi语音转文字

文章目录 移植过程硬件资源下载测试 移植过程 参考我的这篇博客 【RV1126】移植kaldi实时语音识别 硬件 资源下载 链接&#xff1a;https://pan.baidu.com/s/1x1udT5eNzzQHoPOTCQ182A?pwdlief 提取码&#xff1a;lief –来自百度网盘超级会员V6的分享 下载的文件里面有一个…

leetcode:46.全排列

1.什么是排列&#xff1f; 有顺序&#xff01;&#xff01; 2.树形结构&#xff1a; 使用used数组进行标记取过的元素&#xff0c;一个元素一个元素地进行取值&#xff0c;取完之后将used数组进行标记。 3.代码实现&#xff1a;&#xff08;循环从i0开始&#xff0c;而不是…

区分服务 DiffServ

目录 区分服务 DiffServ 区分服务的基本概念 区分服务 DiffServ 的要点 每跳行为 PHB DiffServ 定义的两种 PHB 区分服务 DiffServ 区分服务的基本概念 由于综合服务 IntServ 和资源预留协议 RSVP 都较复杂&#xff0c;很难在大规模的网络中实现&#xff0c;因此 IET…

C#常识篇(二)

委托和事件的区别 委托可以认为是对指定签名的函数的引用&#xff0c;通过委托可以实现将函数作为参数传递或者间接调用函数&#xff0c;委托是类型安全的&#xff0c;仅指向与其声明时指定签名相匹配的函数。委托可以分为单播委托和多播委托&#xff0c;二者的区别在于是对单个…

IO 作业 24/2/26

1>思维导图 1> 使用消息队列完成两个进程间相互通信 #include<myhead.h> //定义一个消息类型 struct msgbuf {long mtype; //消息类型char mtext[1024]; //消息正文 }; //定义一个宏&#xff0c;表示消息正文大小 #define MSGSIZE sizeof(struct msgbuf…

深入理解计算机系统学习笔记

2.3整数运算 有时候会发现两个正数相加会得出一个负数&#xff0c;而比较表达式x<y和比较表达式x-y<0会产生不同的结果。这些属性是由于计算机运算的有限性造成的。理解计算机运算的细微之处能够帮助程序员编写更可靠的代码。 2 .3. 1 无符号加法 原理&#xff1a; 在正…

【技术分享】使用nginx完成动静分离➕集成SpringSession➕集成sentinel➕集成seata

&#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 接下来看看由辉辉所写的关于技术点的相关分享吧 目录 &#x1f973;&#x1f973;Welcome 的Huihuis Code World ! !&#x1f973;&#x1f973; 一、 使用nginx完成动静分离 1.下载…

c语言经典测试题5

1.题1 t0; while(printf("*")) { t; if (t<3) break; }关于上述代码描述正确的是&#xff1f; A: 其中循环控制表达式与0等价 B: 其中循环控制表达式与0等价 C: 其中循环控制表达式是不合法的 D: 以上说法都不对 我们来分析一下&#xff1a;printf的返回值…

mac 安装hbuilderx

下载 HBuilderX下载地址: 下载地址 选额mac版本点击下载 安装 如图&#xff0c;将HBuilderX拖到Applications&#xff0c;才是正确的安装姿势。 MacOSX&#xff0c;软件必须安装到/Applications目录&#xff0c;如未安装到此目录&#xff0c;可能会出现插件安装失败、项目创建…

使用Django的admin功能管理数据_vscode

之前的文章 项目 hello_django, app名 hello&#xff0c;已有的model LogMessage&#xff1a; https://blog.csdn.net/weixin_44741835/article/details/136202771?spm1001.2014.3001.5502 参考得到电子书&#xff1a;第八章。 https://www.dedao.cn/ebook/reader?idrEQKv6…

深入理解指针2

各位小伙伴们&#xff0c;我们继续来学习指针&#xff0c;指针和结构体以及动态内存管理对后面的数据结构学习有非常大的帮助&#xff0c;所有我们一定要把这些知识点学会。OK,正式进入学习之旅吧 1.数组名的理解 在上⼀个章节我们在使⽤指针访问数组的内容时&#xff0c;有这…

thinkphp+vue+mysql学生宿舍水电费报修管理系统 0s7h5

本文首先实现了学生宿舍管理系统技术的发展随后依照传统的软件开发流程&#xff0c;最先为系统挑选适用的言语和软件开发平台&#xff0c;依据需求分析开展控制模块制做和数据库查询构造设计&#xff0c;随后依据系统整体功能模块的设计&#xff0c;制作系统的功能模块图、E-R图…