springCould中的Stream-从小白开始【12】

🥚今日鸡汤🥚

        见过一些人,他们朝九晚五😭,有时也要加班,却能把生活过得很😎有趣。他们有自己的爱好,不怕独处。他们有自己的坚持,哪怕没人在乎。🤦‍♂️

                                                               开心一点😁

                                                               认真一点🤔

                                                               努力一点🫡

目录

😶‍🌫️1.为什么引入Stream

🥚2.什么是Stream 

🧇3.Steam设计思想 

🥓4.案例说明 

🧂5.重复消费 


1.为什么引入Stream🥚🥚🥚

  • 屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

1.1无感知的使用消息中间件

Stream解决了开发人员无感知的使用消息中间件的问题,因为Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知。

1.2中间件和服务的高度解耦

Spring Cloud Stream进行了配置隔离,只需要调整配置,开发中可以动态的切换中间件(如rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

2.什么是Stream 🥚🥚🥚

  • 官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架
  • 应用程序通过inputs或者outputs来与Spring Cloud Stream中binder对象交互。
  • 通过我们配置binding(绑定),而Spring Cloud Stream的 binder对象负责与消息中间件交互

所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。

3.Steam设计思想🥚🥚🥚 

  • 通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
  • 通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离

4.案例说明 🥚🥚🥚

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

4.1消息驱动-生产者

1.加pom

   <dependencies>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-web</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-actuator</artifactId>
       </dependency>
       <dependency>
           <groupId>org.springframework.boot </groupId>
           <artifactId>spring-boot-starter-test</artifactId>
       </dependency>
       <!--基础依赖-->
       <dependency>
           <groupId>org.projectlombok</groupId>
           <artifactId>lombok</artifactId>
       </dependency>
       <!--eureka客户端-->
       <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
       </dependency>
       <!--消息驱动-->
       <dependency>
           <groupId>org.springframework.cloud</groupId>
           <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
       </dependency>
   </dependencies>

2.改yml

  • 注意:小张的Rabbitmq是在Linux上的,所以配置如下:
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: 192.168.20.129
    port: 5672
    username: root
    password: 123456
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        output:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

3.主启动类

@SpringBootApplication
public class StreamMqMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMqMain8801.class);

    }
}

4.业务类

  • 1.创建接口
  • 2.创建接口实现类
  • @EnableBinding:Spring Cloud Stream中用来启用消息传递功能的注释。
  • 它用于将应用程序绑定到消息传递系统(例如,Apache Kafka, RabbitMQ),并声明用于发送和接收消息的输入和输出通道。
  • 通过使用@EnableBinding,您可以定义应用程序所需的通道和消息处理程序
@EnableBinding(Source.class)//定义消息的推送管道
public class IMessageProviderImpl implements IMessageProvider {

    @Autowired
    private MessageChannel output; //消息发送管道

    @Override
    public String send() {
        String serial= UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("======serial:"+serial);
        return null;
    }
}

5.测试

  • 1.浏览器192.168.20.129:15672访问RabbitMQ
  • 2.localhost:8801/sendMessage访问

4.2消息驱动-消费者

1.建模块

  • 1.在父工程下创建模块cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • 2.注意jdk和maven版本号

2.加pom

  • 1.springboot依赖
  • 2.通用依赖
  • 3.eureka客户端依赖
  • 4.消息驱动rabbitmq

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot </groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <!--基础依赖-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--eureka客户端-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!--消息驱动-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
    </dependencies>

3.添yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: 192.168.20.129
    port: 5672
    username: root
    password: 123456
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka,http://eureka7002.com:7002/eureka,http://eureka7003.com:7003/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

4.主启动类

@SpringBootApplication
public class StreamMqMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMqMain8802.class);
    }
}

5.业务类

  • 1.@StreamListener注解是Spring Cloud Stream框架提供的一个注解,用于定义一个消息监听器
  • 2.通过使用@StreamListener注解,可以将一个方法标记为消息的消费者并指定该方法要监听的消息通道
  • 3.当有消息到达指定的通道时,该方法会被自动触发执行,从而处理这个消息。
  • 4.@StreamListener注解通常与@EnableBinding注解一起使用,用于指定所要绑定的消息通道。
  • 5.@EnableBinding注解用于绑定消息通道与应用程序中的输入输出接口,@StreamListener注解则用于标记一个方法作为消息的消费者。
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("消费者1---接受消息:"+message.getPayload()+",port:"+serverPort);

    }

}

6.测试

  • 1.使用8801生产者发送消息
  • 2.使用8802消费者接受消息

5.重复消费 🥚🥚🥚

问题描述:

  • 1.根据8802,重新创建cloud-stream-rabbitmq-consumer8803,作为消息接收模块
  • 2.8801生产者发送消息
  • 3.8802,8803都可以接收到

 如果一个订单同时被两个服务获取到,就会造成数据错误

注意:在Stream中处于同一个group中的多个消费者竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

5.1自定义分组

在消费者端添加group配置:分为xzA,xzB

5.2轮询分组 

8802/8803实现了轮询分组,每次只有一个消费者8801模块的发的消息只能被8802或8803其中一个接收到,这样避免了重复消费。

8802,8803的group配置相同名称,重新启动 ,使用8801发送两条消息,8802接受一条,8803接收一条

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/312494.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

靶机实战(10):OSCP备考之VulnHub Tre 1

靶机官网&#xff1a;Tre: 1[1] 实战思路&#xff1a; 一、主机发现二、端口发现&#xff08;服务、组件、版本&#xff09;三、漏洞发现&#xff08;获取权限&#xff09; 8082端口/HTTP服务 组件漏洞URL漏洞&#xff08;目录、文件&#xff09;80端口/HTTP服务 组件漏洞URL漏…

华为ipv4+ipv6双栈加isis多拓扑配置案例

实现效果&#xff1a;sw1中的ipv4和ipv6地址能ping通sw2中的ipv4和ipv6地址 R2-R4为存IPV4连接&#xff0c;其它为ipv6和ipv4双连接 sw1 ipv6 interface Vlanif1 ipv6 enable ip address 10.0.11.1 255.255.255.0 ipv6 address 2001:DB8:11::1/64 interface MEth0/0/1 inter…

K8S的dashboard使用账号密码登录

原文网址&#xff1a;K8S的dashboard使用账号密码登录-CSDN博客 简介 本文介绍K8S的dashboard使用账号密码登录的方法。 ----------------------------------------------------------------------------------------------- 分享Java真实高频面试题&#xff0c;吊打面试官&…

服务器数据传输安全如何保障?保障意义是什么?

数据安全&#xff0c;是指通过采取必要措施确保数据处于有效保护和合法利用的状态&#xff0c;以及具备保障持续安全状态的能力。数据安全应保证数据生产、存储、传输、访问、使用、销毁、公开等全过程的安全&#xff0c;并保证数据处理过程的保密性、完整性、可用性。无论是互…

网络安全B模块(笔记详解)- 隐藏信息探索

隐藏信息探索 1.访问服务器的FTP服务,下载图片QR,从图片中获取flag,并将flag提交; ​ 通过windows电脑自带的图片编辑工具画图将打乱的二维码分割成四个部分,然后将四个部分通过旋转、移动拼接成正确的二维码 ​ 使用二维码扫描工具CQR.exe扫描该二维码 ​ 获得一串…

MT8766安卓核心板/开发板_MTK联发科4G安卓手机主板方案定制开发

MT8766采用台积电 12 nm FinFET 制程工艺&#xff0c;4*A53架构&#xff0c;Android 9.0操作系统&#xff0c;搭载2.0GHz 的 Arm NEON 引擎。提供了支持最新 OpenOS 及其要求苛刻的应用程序所需的处理能力&#xff0c;专为具有全球蜂窝连接的高移动性和功能强大的平板设备而设计…

菱形以及各种组合图形讲解(*#@¥$)

引言&#xff1a; ***形对于新手了解循环以及嵌套循环帮助是非常大的。&#xff08;以下的题各题之间有关联&#xff09; 我们最终目的&#xff0c;就是会编程写菱形&#xff1b;看下面的图片 解题思路&#xff1a;运用拆分法&#xff0c;我们将菱形分为4个部分&#xff0c;看…

时间差异导致数据缺失,如何调整Grafana时间与Prometheus保持同步?

Grafana时间如何调快或调慢&#xff1f; 在k8s环境中&#xff0c;常使用prometheusgrafana做监控组件&#xff0c;prometheus负责采集、存储数据&#xff0c;grafana负责监控数据的可视化。 在实际的使用中&#xff0c;有时会遇到这样的问题&#xff0c;k8s集群中的时间比真实…

Spark on Hive及 Spark SQL的运行机制

Spark on Hive 集成原理 HiveServer2的主要作用: 接收SQL语句&#xff0c;进行语法检查&#xff1b;解析SQL语句&#xff1b;优化&#xff1b;将SQL转变成MapReduce程序&#xff0c;提交到Yarn集群上运行SparkSQL与Hive集成&#xff0c;实际上是替换掉HiveServer2。是SparkSQL…

“三指针法“合并两个有序数组(力扣每日一练)

我的第一想法确实是&#xff1a;先合并数组&#xff0c;再排序&#xff0c;搞完。 哈哈哈&#xff0c;想那么多干嘛&#xff0c;目的达成了就好了。 力扣官方题解是双指针&#xff1a; 还有糕手&#xff1a; Python&#xff1a; def merge(nums1, m, nums2, n):# 两个指针分别…

27 代码星球卡片

效果演示 实现了一个卡片式的网站页面&#xff0c;其中卡片的背景颜色和字体颜色随着鼠标移动而变化&#xff0c;鼠标悬停时会出现一个渐变的背景和文字颜色&#xff0c;卡片上方还有一个按钮&#xff0c;当鼠标点击按钮时会出现一个动态的渐变背景和文字颜色。整个页面的背景颜…

SQL-修改表操作

目录 DDL-表操作-修改 添加字段 &#xff08;方括号内容可选&#xff09; 修改字段 修改指定字段的数据类型 修改字段名和字段类型 删除字段 修改表名 删除表 删除指定表&#xff0c;并重新创建该表 总结 &#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦…

虹科分享 | 用Redis为LangChain定制AI代理——OpenGPTs

文章速览&#xff1a; OpenGPTs简介Redis在OpenGPTs中的作用在本地使用OpenGPTs在云端使用OpenGPTsRedis与LangChain赋能创新 OpenAI最近推出了OpenAI GPTs——一个构建定制化AI代理的无代码“应用商店”&#xff0c;随后LangChain开发了类似的开源工具OpenGPTs。OpenGPTs是一…

Qt/C++音视频开发63-设置视频旋转角度/支持0-90-180-270度旋转/自定义旋转角度

一、前言 设置旋转角度,相对来说是一个比较小众的需求,如果视频本身带了旋转角度,则解码播放的时候本身就会旋转到对应的角度显示,比如手机上拍摄的视频一般是旋转了90度的,如果该视频文件放到电脑上打开,一些早期的播放器可能播放的时候是躺着的,因为早期播放器设计的…

基于VSG控制的MMC并网逆变器MATLAB仿真模型

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 模型简介 根据传统同步发电机的运行特性设计了MMC-VSG功频控制器和励磁控制器&#xff0c; 实现了MMC-VSG逆变器对高压电网电压和频率的支撑。该模型包含MMC变流器模块&#xff0c;环流抑制模块&#xff0c;…

【算法】增减序列(贪心,差分)

题目 给定一个长度为 n 的数列 a1,a2,…,an&#xff0c;每次可以选择一个区间 [l,r]&#xff0c;使下标在这个区间内的数都加一或者都减一。 求至少需要多少次操作才能使数列中的所有数都一样&#xff0c;并求出在保证最少次数的前提下&#xff0c;最终得到的数列可能有多少种…

21道Java Spring MVC综合面试题详解含答案(值得珍藏)

1.概述 1.1 什么是Spring MVC&#xff1f;简单介绍下你对Spring MVC的理解&#xff1f; Spring MVC是一个基于Java的实现了MVC设计模式的请求驱动类型的轻量级Web框架&#xff0c;通过把模型-视图-控制器分离&#xff0c;将web层进行职责解耦&#xff0c;把复杂的web应用分成…

推荐熊猫电竞赏金电竞系统源码

熊猫电竞赏金电竞系统源码&#xff0c;包含APP、H5和搭建视频教程&#xff0c;支持运营级搭建&#xff0c;这套源码是基于ThinkPHPUniaapp框架开发的。 系统是一套完整的电竞平台开发源码&#xff0c;包括赛事管理、用户系统、竞猜系统、支付系统等模块。源码结构清晰&#xff…

如何从多个文件夹里各提取相应数量的文件放一起到新文件夹中形成多文件夹组合

首先&#xff0c;需要用到的这个工具&#xff1a; 百度 密码&#xff1a;qwu2蓝奏云 密码&#xff1a;2r1z 说明一下情况 文件夹&#xff1a;1、2、3里面分别放置了各100张动物的图片&#xff0c;模拟实际情况的各种文件 操作&#xff1a;这里演示的是从3个文件夹里各取2张图…

Open3D (C++) 计算条件数

目录 一、算法原理1、条件数2、参考文献二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT。 一、算法原理 1、条件数 条件数法是目前应用最为广泛的一种病态诊断方法。条件数的定义为: