第十六章 Spring cloud stream应用

文章目录

  • 前言
  • 1、stream设计思想
  • 2、编码常用的注解
  • 3、编码步骤
    • 3.1、添加依赖
    • 3.2、修改配置文件
    • 3.3、生产
    • 3.4、消费
    • 3.5、延迟队列
      • 3.5.1、修改配置文件
      • 3.5.2、生产端
      • 3.5.2、消息确认机制 消费端


在这里插入图片描述

前言

https://github.com/spring-cloud/spring-cloud-stream-binder-rabbit

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。通过我们配置来binding(绑定),而Spring Cloud Stream的binder对象负责与消息中间件交互。
SpringCloud stream通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
Stream让我们不再关注具体MQ的细节我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换,总的来说Stream能够屏蔽底层消息中间件的差异、降低切换成本,是统一消息的编程模型。

1、stream设计思想


  • Binder:很方便的连接中间件,屏蔽差异
  • Channel:通道是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
  • Source和Sink:简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

2、编码常用的注解

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实现了Kafka和RabbitMQ的Binder,通过BInder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现。
@Input 注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EnableBinding 指信道channel和exchange绑定在一起

3、编码步骤

3.1、添加依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

3.2、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组

3.3、生产

/**
 * 订单消息输出通道处理器
 */
@Component
public interface OrderOutputChannelProcesor {
    @Output("saveOrderOutput")
    MessageChannel saveOrderOutput();
}
@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.4、消费

/**
 * 订单消息输入通道处理器
 */
@Component
public interface OrderInputChannelProcesor {
    @Input("saveOrderInput")
    SubscribableChannel saveOrderInput();
}
@Slf4j
@EnableBinding(OrderInputChannelProcesor.class)
public class OrderMessageConsumer {
    @StreamListener("saveOrderInput")
    public void receiveMsg(Message<UserInfo> userInfoMessage){
        log.info("接收消息成功:" + userInfoMessage.getPayload());
    }
}

3.5、延迟队列

安装延迟队列插件:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.11.1/rabbitmq_delayed_message_exchange-3.11.1.ez

下载解压,到plugins目录,执行以下的命令:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.5.1、修改配置文件

server:
  port: 8088

spring:
  cloud:
    stream:
      binders: #需要绑定的rabbitmq的服务信息
        defaultRabbit:  #定义的名称,用于bidding整合
          type: rabbit  #消息组件类型
          environment:  #配置rabbimq连接环境
            spring:
              rabbitmq:
                host: localhost   #rabbitmq 服务器的地址
                port: 5672           #rabbitmq 服务器端口
                username: tiger       #rabbitmq 用户名
                password: tiger       #rabbitmq 密码
                virtual-host: tiger_vh  #虚拟路径
      bindings:        #服务的整合处理
        saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
        saveOrderInput: #生产者绑定,这个是消息通道的名称---> 保存订单输入通道
          destination: exchange-saveOrder-delay     #exchange名称,交换模式默认是topic;把SpringCloud stream的消息输出通道绑定到RabbitMQ的exchange-saveOrder交换器。
          content-type: application/json      #设置消息的类型,本次为json
          default-binder: defaultRabbit
          group: saveOrderGroup               #分组
      rabbit:
        bindings: #服务的整合处理
          saveOrderOutput:    #这个是消息通道的名称 --->保存订单输出通道
            producer:
              delayed-exchange: true
          saveOrderInput:
            consumer:
              delayed-exchange: true

3.5.2、生产端

@Slf4j
@EnableBinding(OrderOutputChannelProcesor.class)
public class OrderMessageProducer {
    @Autowired
    @Output("saveOrderOutput")
    private MessageChannel messageChannel;

    public void sentMsg(UserInfo userInfo){
        messageChannel.send(MessageBuilder.withPayload(userInfo).setHeader("x-delay", 5000).build());
        log.info("消息发送成功:" + userInfo);
    }
}

3.5.2、消息确认机制 消费端

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        acknowledge-mode: MANUAL #手动确认
@StreamListener("saveOrderInput")
public void receiveMsg(Message<UserInfo> userInfoMessage){

    log.info("接收消息成功:" + userInfoMessage.getPayload());
    Channel channel = (Channel) userInfoMessage.getHeaders().get(AmqpHeaders.CHANNEL);
    Long delieverTag = (Long) userInfoMessage.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
    /*
     * deliveryTag:Channel的消息投递的唯一标识符。
     * multiple:是否否定应答多条消息。如果设置为true,则否定应答带指定deliveryTag的消息及该deliveryTag之前的多条消息;
     * 如果设置为false,则仅否定应答带指定deliveryTag的单条消息。
     * requeue:被否定应答的消息是否重入队列。如果设置为true,则消息重入队列;
     * 如果设置为false,则消息被丢弃或发送到死信Exchange。
     */
    try {
        channel.basicAck(delieverTag,true);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

定义交换机类型为direct

rabbit:
  bindings: #服务的整合处理
    saveOrderInput:
      consumer:
        bindingRoutingKey: orderRoutingKey
        bindQueue: true
        exchangeType: direct
    saveOrderOutput:
      producer:
        routingKeyExpression: '''orderRoutingKey'''
        exchangeType: direct


在这里插入图片描述



本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/352694.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

GPT-SoVITS 本地搭建踩坑

GPT-SoVITS 本地搭建踩坑 前言搭建下载解压VSCode打开安装依赖包修改内容1.重新安装版本2.修改文件内容 运行总结 前言 传言GPT-SoVITS作为当前与BertVits2.3并列的TTS大模型&#xff0c;于是本地搭了一个&#xff0c;简单说一下坑。 搭建 下载 到GitHub点击此处下载 http…

【网站项目】基于SSM的246品牌手机销售信息系统

&#x1f64a;作者简介&#xff1a;拥有多年开发工作经验&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的项目或者毕业设计。 代码可以私聊博主获取。&#x1f339;赠送计算机毕业设计600个选题excel文件&#xff0c;帮助大学选题。赠送开题报告模板&#xff…

蓝桥杯——每日一练(简单题)

题目 给定n个十六进制正整数&#xff0c;输出它们对应的八进制数。 解析 一、通过input&#xff08;&#xff09;函数获得需要转化的数字个数 二、for循环的到数字 三、for循环先将16进制转化为10进制&#xff0c;再输出8进制 代码 运行结果

直线拟合(支持任意维空间的直线拟合,附代码)

文章目录 一、问题描述二、推导步骤三、 M A T L A B MATLAB MATLAB代码 一、问题描述 给定一系列的三维空间点 ( x i , y i , z i ) , i 1 , 2 , . . . , n (x_i,y_i,z_i),i1,2,...,n (xi​,yi​,zi​),i1,2,...,n&#xff0c;拟合得到直线的方程。本文的直线拟合方法适用于任…

如何用一根网线和51单片机做简单门禁[带破解器]

仓库:https://github.com/MartinxMax/Simple_Door 支持原创是您给我的最大动力… 原理 -基础设备代码程序- -Arduino爆破器程序 or 51爆破器程序- 任意选一个都可以用… —Arduino带TFT屏幕——— —51带LCD1602——— 基础设备的最大密码长度是0x7F&#xff0c;因为有一位…

10.Golang中的map

目录 概述map实践map声明代码 map使用代码 结束 概述 map实践 map声明 代码 package mainimport ("fmt" )func main() {// 声明方式1var map1 map[string]stringif map1 nil {fmt.Println("map1为空")}// 没有分配空间&#xff0c;是不能使用的// map…

Vulnhub-dc6

信息收集 # nmap -sn 192.168.1.0/24 -oN live.port Starting Nmap 7.94 ( https://nmap.org ) at 2024-01-25 14:39 CST Nmap scan report for 192.168.1.1 Host is up (0.00075s latency). MAC Address: 00:50:56:C0:00:08 (VMware) Nmap scan report for 192.168.1.2…

IS-IS:10 ISIS路由渗透

ISIS的非骨干区域&#xff0c;无明细路由&#xff0c;容易导致次优路径问题。可以引入明细路由。 在IS-IS 网络中&#xff0c;所有的 level-2 和 level-1-2 路由器构成了一个连续的骨干区域。 level-1区域必须且只能与骨干区域相连&#xff0c;不同 level-1 区域之间不能直接…

.NET高级面试指南专题一【委托和事件】

在C#中&#xff0c;委托&#xff08;Delegate&#xff09;和事件&#xff08;Event&#xff09;是两个重要的概念&#xff0c;它们通常用于实现事件驱动编程和回调机制。 委托定义&#xff1a; 委托是一个类&#xff0c;它定义了方法的类型&#xff0c;使得可以将方法当作另一个…

SpringMVC第六天(拦截器)

概念 拦截器(Interceptor)是一种动态拦截方法调用的机制&#xff0c;在SpringMVC中动态拦截控制器方法的执行 作用&#xff1a; 在指定的方法调用前后执行预先设定的代码 阻止原始方法的执行 拦截器与过滤器的区别 归属不同&#xff1a;Filter属于Servlet技术&#xff0c;I…

递归方法猴子吃桃问题

public class A {public static void main(String[] args) {System.out.println("第一天有&#xff1a;"f(1)"个");System.out.println("第二天有&#xff1a;"f(2)"个");System.out.println(".....");System.out.println(&…

【揭秘】ForkJoinTask全面解析

内容摘要 ForkJoinTask的显著优点在于其高效的并行处理能力&#xff0c;它能够将复杂任务拆分成多个子任务&#xff0c;并利用多核处理器同时执行&#xff0c;从而显著提升计算性能&#xff0c;此外&#xff0c;ForkJoinTask还提供了简洁的API和强大的任务管理机制&#xff0c…

保姆级教学:Java项目从0到1部署到云服务器

目录 1、明确内容 2、apt 2.1、apt 语法 2.2、常用命令 2.3、更新apt 3、安装JDK17 4、安装MySQL 4.1、安装 4.2、检查版本及安装位置 4.3、初始化MySQL配置⭐ 4.4、检查状态 4.5、配置远程访问⭐ 4.6、登录MySQL 4.7、测试数据库 4.8、设置权限与密码⭐ 5、安…

cmake工具的安装

1、简介 CMake 是一个开源的、跨平台的自动化建构系统。它用配置文件控制编译过程的方式和Unix的make相似&#xff0c;只是CMake并不依赖特定的编译器。CMake并不直接建构出最终的软件&#xff0c;而是产生标准的建构文件&#xff08;如 Unix 的 Makefile 或 Windows Visual C …

GPT 如何不挂VPN使用

1、下载 Home | Tampermonkey 将下载的文件tampermonkey_stable.crx 拖到上面的扩展程序里面 2、登录Greasy Fork - 安全、实用的用户脚本大全 搜索自己想要使用的东西&#xff0c;如GPT 找到 CHAT网页增强了 点击按安装&#xff0c;然后打开使用方法里面的 网址就可以使用

day04 两两交换链表中的节点、删除链表倒数第N个节点、链表相交、环形链表II

题目链接&#xff1a;leetcode24-两两交换链表中的节点, leetcode19-删除链表倒数第N个节点, leetcode160-链表相交, leetcode142-环形链表II 两两交换链表中的节点 基础题没有什么技巧 解题思路见代码注释 时间复杂度: O(n) 空间复杂度: O(1) Go func swapPairs(head *Li…

JavaEE-自定义SSM-编写核心-解析yml文件

3.3.1 加载yml文件 编写yaml工厂&#xff0c;用于加载yml文件 package com.czxy.yaml;import java.io.InputStream;/*** 用于处理 application.yml文件* 1. 加载application.yml文件* 2. yaml工具类进行解析* Map<String, Map<String, Map<....>> >* …

[数据结构]-哈希

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 本期学习目标&…

代码随想录刷题笔记-Day12

1. 二叉树的递归遍历 144. 二叉树的前序遍历https://leetcode.cn/problems/binary-tree-preorder-traversal/94. 二叉树的中序遍历https://leetcode.cn/problems/binary-tree-inorder-traversal/145. 二叉树的后续遍历https://leetcode.cn/problems/binary-tree-postorder-tra…

混淆矩阵、准确率、查准率、查全率、DSC、IoU、敏感度的计算

1.背景介绍 在训练的模型的时候&#xff0c;需要评价模型的好坏&#xff0c;就涉及到混淆矩阵、准确率、查准率、查全率、DSC、IoU、敏感度的计算。 2、混淆矩阵的概念 所谓的混淆矩阵如下表所示&#xff1a; TP:真正类&#xff0c;真的正例被预测为正例 FN:假负类&#xf…