spring-cloud-stream

系列文章目录

第一章 Java线程池技术应用
第二章 CountDownLatch和Semaphone的应用
第三章 Spring Cloud 简介
第四章 Spring Cloud Netflix 之 Eureka
第五章 Spring Cloud Netflix 之 Ribbon
第六章 Spring Cloud 之 OpenFeign
第七章 Spring Cloud 之 GateWay
第八章 Spring Cloud Netflix 之 Hystrix
第九章 代码管理gitlab 使用
第十章 SpringCloud Alibaba 之 Nacos discovery
第十一章 SpringCloud Alibaba 之 Nacos Config
第十二章 Spring Cloud Alibaba 之 Sentinel
第十三章 JWT
第十四章 RabbitMQ应用
第十五章 RabbitMQ 延迟队列
第十六章 spring-cloud-stream

在这里插入图片描述


文章目录

  • 系列文章目录
    • @[TOC](文章目录)
  • 前言
  • 1、stream设计思想
  • 2、编码常用的注解
  • 3、编码步骤
    • 3.1、添加依赖
    • 3.2、修改配置文件
    • 3.3、生产
    • 3.4、消费
    • 3.5、延迟队列
      • 3.5.1、修改配置文件
      • 3.5.2、生产端
      • 3.5.2、消息确认机制 消费端
  • 总结

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit
官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。

SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想

在这里插入图片描述
在这里插入图片描述

  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

在这里插入图片描述

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez
下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){

    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: orderRoutingKey
        exchangeType: direct

总结

spring-cloud-stream目前支持RabbitMQ和Kafka,与spring-cloud无缝集成,非常方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/135478.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

gma 2.0.3 (2023.11.12) 更新日志

安装 gma 2.0.3 pip install gma2.0.3新增 此版本为 gma 2 功能更新最大的版本&#xff0c;且主要集中在矢量数据处理上。 0.1 io.ReadVector&#xff1a;直接打开矢量数据为Layer&#xff0c;用以简化io.Open.GetLayer 过程。Layer的新增功能如下&#xff1a; 序号功能性质说…

Mac电脑专业raw图像处理 DxO PhotoLab 7中文最新 for mac

DxO PhotoLab 7是一款专业的图像处理软件&#xff0c;为摄影师和摄影爱好者提供了强大而全面的照片处理和编辑功能。 该软件可以处理来自各种相机的RAW格式图像&#xff0c;包括佳能、尼康、索尼、富士等品牌&#xff0c;同时也支持JPEG格式的处理。这使得用户可以在不损失图像…

【Proteus仿真】【STM32单片机】多路温度控制系统

文章目录 一、功能简介二、软件设计三、实验现象联系作者 一、功能简介 本项目使用Proteus8仿真STM32单片机控制器&#xff0c;使用按键、LED、蜂鸣器、LCD1602、DS18B20温度传感器、HC05蓝牙模块等。 主要功能&#xff1a; 系统运行后&#xff0c;默认LCD1602显示前4路采集的…

4.HTML网页开发的工具

4. 网页开发的工具 4.1 快捷键 4.1.1 快速复制一行 快捷键&#xff1a;shiftalt下箭头&#xff08;上箭头&#xff09; 或者ctrlc 然后 ctrlv 4.1.2 选定多个相同的单词 快捷键&#xff1a; ctrld 4.1.3 添加多个光标 快捷键&#xff1a;ctrlalt上箭头&#xff08;下箭头&…

不使用 pip 安装 Python 包

在本文中&#xff0c;我们将学习如何在 Python 中安装没有 pip 的库。 我们还将学习如何使用 conda 命令在 Python 中安装包。 不使用 pip 命令安装 Python 库 在 Python 中&#xff0c;pip 命令是我们系统中安装开源库最常用的方法。 但是&#xff0c;除了 pip 命令之外&…

springboot模板引擎

1.服务端渲染时相比与前后端分离开发 原理是 跳过前端这一层 直接到服务端 通过数据和模板 生成页面返回前端 springboot包含如下模板引擎 典型如thymeleaf 1>导入依赖 2>查看路径 模板页面在 public static final String DEFAULT_PREFIX “classpath:/templates/”; 即…

Zabbix SNMPv3

一、Snmpv3简述 SNMPv3是Simple Network Management Protocol version 3&#xff08;简单网络管理协议第三版&#xff09;的缩写。它是一种网络管理协议&#xff0c;用于监控和管理网络中的设备、系统和应用程序。 相对于之前的版本&#xff0c;SNMPv3具有更强的安全性和扩展…

[HXPCTF 2021]includer‘s revenge

文章目录 方法一前置知识Nginx 在后端 Fastcgi 响应过大产生临时文件竞争包含绕过include_once限制 解题过程 方法二前置知识Base64 Filter 宽松解析iconv filter 解题过程 方法一 NginxFastCGI临时文件 前置知识 Nginx 在后端 Fastcgi 响应过大产生临时文件 www-data用户在n…

网页判断版本更新

一、需求解析 为什么我会想到这个技术呢&#xff0c;是因为我有一次发现&#xff0c;我司的用户在使用网页的时候&#xff0c;经常会出现一个页面放很久&#xff0c;下班也不关这个页面&#xff0c;这样就会导致页面的代码长时间处于不更新的状态。 在使用到一个功能出了bug&a…

消息队列原理和实现

实现原理 消息队列的本质就是在内核态开辟一块内核态的内存&#xff0c;用于存储数据和从这块内存读取数据而已。 实现函数

模拟实现string类——【C++】

W...Y的主页 &#x1f60a; 代码仓库分享 &#x1f495; &#x1f354;前言&#xff1a; 我们已经将STL中的string类重要接口全部认识并熟练掌握&#xff0c;为了让我们对string与C类与对象更深层次的了解&#xff0c;我们这篇博客将string类进行模拟实现。 目录 string类的…

【C++】C++入门详解 I【C++入门 这一篇文章就够了】

C入门 前言一、C关键字&#xff08;C98&#xff09;二、命名空间 namespace&#xff08;一&#xff09;namespace的出现&#xff08;二&#xff09;namespace的定义&#xff08;1&#xff09;namespace 的正常定义&#xff08;2&#xff09;namespace的功能特性1. 命名空间 可嵌…

SharePoint 的 Web Parts 是什么

Web Parts 可以说是微软 SharePoint 的基础组件。 根据微软自己的描述&#xff0c;Web Parts 是 SharePoint 对内容进行构建的基础&#xff0c;可以想想成一块一块的砖块。 我们需要使用这些砖块来完成一个页面的构建。 我们可以利用 Web Parts 在 SharePoint 中添加文本&am…

时序预测 | MATLAB实现基于BP-Adaboost的BP神经网络结合AdaBoost时间序列预测

时序预测 | MATLAB实现基于BP-Adaboost的BP神经网络结合AdaBoost时间序列预测 目录 时序预测 | MATLAB实现基于BP-Adaboost的BP神经网络结合AdaBoost时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 1.MATLAB实现基于BP-Adaboost的BP神经网络结合AdaB…

NZ系列工具NZ08:图表添加标签工具

我的教程一共九套及VBA汉英手册一部&#xff0c;分为初级、中级、高级三大部分。是对VBA的系统讲解&#xff0c;从简单的入门&#xff0c;到数据库&#xff0c;到字典&#xff0c;到高级的网抓及类的应用。大家在学习的过程中可能会存在困惑&#xff0c;这么多知识点该如何组织…

(二)正点原子I.MX6ULL u-boot移植

一、概述 这里使用的是NXP官方2022.04发布的uboot&#xff0c;移植到正点原子阿尔法开发板&#xff08;v2.1&#xff09; u-boot下载&#xff1a;gitgithub.com:nxp-imx/uboot-imx.git 移植是基于NXP的mx6ull_14x14_evk 二、编译NXP官方uboot 进入NXP的u-boot目录 先在Makefile…

通过设置响应头解决跨域问题

网上很多文章都是告诉你直接Nginx添加这几个响应头信息就能解决跨域&#xff0c;当然大部分情况是能解决&#xff0c;但是我相信还是有很多情况&#xff0c;明明配置上了&#xff0c;也同样会报跨域问题。 这大概率是因为&#xff0c;服务端没有正确处理预检请求也就是OPTIONS请…

Word 插入的 Visio 图片显示为{EMBED Visio.Drawing.11} 解决方案

World中&#xff0c;如果我们插入了Visio图还用了Endnote&#xff0c; 就可能出现&#xff1a;{EMBED Visio.Drawing.11}问题 解决方案&#xff1a; 1.在相应的文字上右击&#xff0c;在出现的快捷菜单中单击“切换域代码”&#xff0c;一个一个的修复。 2.在菜单工具–>…

配置交换机将Log发送到日志服务器

文章目录 一、配置说明二、配置步骤推荐阅读 一、配置说明 配置将实现如下&#xff1a; 配置交换机将Log发送到日志服务器。 将信息等级高于等于 debug 的日志信息将会发送到日志服务器上。 允许输出日志信息的模块为default所有应用模块日志信息。 SW1为我们日志源交换机…

【Redis】redis的下载安装

目录 1.window安装 2.Linux安装 下载 解压 移动redis目录 编译 1.window安装 在github 下载redis的安装包 https://github.com/microsoftarchive/redis/releases 下载完后安装相应的目录下&#xff0c;比如我是放在c盘的Program Files下 开启redis&#xff0c;双击运行…