Springcloud-消息总线-Bus

1.消息总线在微服务中的应用

BUS- 消息总线-将消息变更发送给所有的服务节点。
在微服务架构的系统中,通常我们会使用消息代理来构建一个Topic,让所有
服务节点监听这个主题,当生产者向topic中发送变更时,这个主题产生的消息会被
所有实例消费,这就是消息总线的工作模式。也是我们熟悉的发布-订阅模型。
其实广义的消息总线不单指这种“发布-订阅”模型!也可以代指分布式服务间进行通信,
消息分发的单播模式,甚至有的公司既不使用HTTP也不用RPC来构建微服务。完全靠消息
总线来做服务调用。比如,银行老系统采用总线型架构,在不同服务结点之间做消息分发

SpringCloud中的Bus职责范围就相对小了很多,因为还有一个Stream组件代理了大部分的消息中间件通信服务,因此BUS在实际应用中大多是为了应对“消息广播”的场景。比如和config异同搭配使用推送配置信息。

总线式架构的完整流程
在这里插入图片描述
我们要关注一下白底红框那三个和BUS有关系的步骤
MQ/KAFKA:BUS是一个调用封装,它的背后还是需要依赖消息中间件来完成底层的消息
分发,实际项目中最常用的两个中间件分别是RabbitMQ和kafka。
BUS:作为对接上游应用和下游中间件系统的中间层,当接到刷新请求的时候,通知底层中间件向所有服务结点推送消息。
Refresh:类比config-center中可以通过actuator的Refresh请求刷新配置,那么对于总线式架构的ReFresh请求来说,有两个需要解决的问题:
谁来发起变更,服务结点还是由ConfigServer发起变更请求?
何时发起变更-时手工发起变更?还是每次Github改动完成后自动推送?

2.BUS简介

BUS实现:
加入我们所有的节点都订阅了topic(消息组件这个属性刷新这个topic)当你的属性发生变动的时候,只要发送一个广播消息,所有的节点都会消费消息,并且触发刷新动作。

BUS的标签:
BUS只是对消息进行了简单的封装,底层是依赖Stream(专业用来与消息中间件进行通信的组件)来广播消息。

在这里插入图片描述
BUS的两个场景:
配置变更通知;自定义消息广播;

3.BUS体系结构解析

BUS的三个角色:
消息的发布者,是一个中间件;
事件监听者,监听事件动态,各个监听消息的服务节点;
事件主体,配置变更就是事件

事件的架构:
在BUS配置刷新的事件类是RefreshRemoteApplicationEvent。在 BUS的规范下,所有事件都包含三个维度的信息:
**source:**这是一个必填信息,它可以是一个自定义并且能够被序列化反序列化的pojo对象,它包含了一个事件想要传达的信息;
Original Service 消息来源方,通常是事件发布方的机器ID,或者AppId等;
Destination Service 目标机器,Bus会根据Destination Service指定的过滤条件(比如服务名,端口等),只让指定的监听者响应事件;

消息发布者
我们所有的“事件”都是通过Bus来发布的,Bus默认提供了两个Endpoint作为消息发布者:
bus-env:在本地发布EnvironmentChangeRemoteApplicationEvent事件,表示一个远程环境变更事件。进一步查看这个事件的内容,我们发现其中包含了一个Map<String, String>属性,事件监听者接收到这个事件之后,会将事件中的Map添加到Spring环境变量中(由Spring Cloud的EnvironmentManager负责具体处理),从而达到修改环境变量的目的
bus-refresh:发布RefreshRemoteApplicationEvent事件,表示一个远程配置刷新事件,这个事件会触发@RefreshScope注解所修饰的Java类中属性的刷新(@RefreshScope修饰的类可以在运行期更改属性)
以上两个ENDpoint就是BUS通过、actuator服务对外提供出来的

消息监听者:
BUS中默认创建了两个消息监听器,分别对应上面两个消息发布的Endpoints。
在这里插入图片描述
在spring-cloud-context这个依赖中定义了大量的事件。

4.Bus的接入方式RabbitMQ & Kafka

Spring的组件一向是以一种插件式的方式提供功能,将组件自身和我们项目中的业务代码隔离,使得我们更换组件的成本可以降到最低。Spring Cloud Bus也不例外,它的底层消息组件非常容易替换,替换过程不需要对业务代码引入任何变更。Bus就像一道隔离了底层消息组件和业务应用的中间层,比如我们从RabbitMQ切换为Kafka的时候,只需要做两件事就好了:
在项目pom中替换依赖组件;
更改配置文件里的连接信息。

RabbitMQ和Kafka两种消息组件如何接入Bus
接入RabbitMQ
RabbitMQ是实现了AMQP(Advanced Message Queue Protocal)的开源消息代理软件,也是平时项目中应用最广泛的消息分发组件之一。
接入RabbitMQ的方式很简单,我们只要在项目中引入以下依赖:

org.springframework.cloud
spring-cloud-starter-bus-amqp

点进去发现,它还依赖于spring-cloud-starter-stream-rabbit。
也就是说stream组件是被真正用来发送广播消息到RabbitMQ,
BUS只是帮我们封装了整个消息的发布和监听动作!
项目所需要的具体的配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

接入Kafka;
要使用kafka来实现消息代理,只需要把上一步中引入spring-cloud-starter-bus-amqp
依赖替换成spring-cloud-starter-bus-kafka依赖

org.springframework.cloud
spring-cloud-starter-bus-kafka

如果大家的Kafka和ZooKeeper都运行在本地,并且采用了默认配置,那么不需要做任何额外的配置,就可以直接使用。但是在生产环境中往往Kafka和ZooKeeper会部署在不同的环境,所以就需要做一些额外配置:
spring.cloud.stream.kafka.binder.brokers Kafka服务节点(默认localhost)
spring.cloud.stream.kafka.binder.defaultBrokerPort Kafka端口(默认9092)
spring.cloud.stream.kafka.binder.zkNodes ZooKeeper服务节点(默认localhost)
zspring.cloud.stream.kafka.binder.defaultZkPort ZooKeeper端口(默认2181)

5.部分关键源码:

内置事件的架构RefreshRemoteApplicationEvent
刷新事件的发送端-RefreshBusEndpoint

开端:RefreshRemoteApplicationEvent

public class RefreshRemoteApplicationEvent extends RemoteApplicationEvent {

   @SuppressWarnings("unused")
   private RefreshRemoteApplicationEvent() {
      // for serializers
   }

   public RefreshRemoteApplicationEvent(Object source, String originService,
         String destinationService) {
      super(source, originService, destinationService);
   }

查看find usage:有两个大类:RefreshBusEndpoint以及RefreshListener类。
一个是起点RefreshBusEndpoint,一个是终点RefreshListener。
关注起点:RefreshBusEndpoint

@Endpoint(id = "bus-refresh") // TODO: document new id
public class RefreshBusEndpoint extends AbstractBusEndpoint {

   public RefreshBusEndpoint(ApplicationEventPublisher context, String id) {
      super(context, id);
   }

   @WriteOperation
   public void busRefreshWithDestination(@Selector String destination) { // TODO:
                                                         // document
                                                         // destination
      publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), destination));
   }

   @WriteOperation
   public void busRefresh() {
      publish(new RefreshRemoteApplicationEvent(this, getInstanceId(), null));
   }

}

关注到主类的super方法,就是到了RemoteApplicationEvent类

protected RemoteApplicationEvent(Object source, String originService,
      String destinationService) {
   super(source);
   this.originService = originService;
   if (destinationService == null) {
      destinationService = "**";
   }
   // If the destinationService is not already a wildcard, match everything that
   // follows
   // if there at most two path elements, and last element is not a global wildcard
   // already
   if (!"**".equals(destinationService)) {
      if (StringUtils.countOccurrencesOf(destinationService, ":") <= 1
            && !StringUtils.endsWithIgnoreCase(destinationService, ":**")) {
         // All instances of the destination unless specifically requested
         destinationService = destinationService + ":**";
      }
   }
   this.destinationService = destinationService;
   this.id = UUID.randomUUID().toString();
}

本人进行测试的接口是:
测试的接口是:localhost:60002/actuator/bus-refresh
在这里插入图片描述

研究了发现对于RemoteApplicationEvent就是确定destination!

在RefreshBusEndpoint中,将contex存放在ApplicationEventPublisher里。
这就是ApplicationEventPublisher,用来发布上下文消息的!

接下来到了AbstractApplicationContext中

protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
   Assert.notNull(event, "Event must not be null");

   // Decorate event as an ApplicationEvent if necessary
   ApplicationEvent applicationEvent;
   if (event instanceof ApplicationEvent) {
      applicationEvent = (ApplicationEvent) event;
   }
   else {
      applicationEvent = new PayloadApplicationEvent<>(this, event);
      if (eventType == null) {
         eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
      }
   }

   // Multicast right now if possible - or lazily once the multicaster is initialized
   if (this.earlyApplicationEvents != null) {
      this.earlyApplicationEvents.add(applicationEvent);
   }
   else {
      getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
   }

   // Publish event via parent context as well...
   if (this.parent != null) {
      if (this.parent instanceof AbstractApplicationContext) {
         ((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
      }
      else {
         this.parent.publishEvent(event);
      }
   }
}

整个过程是事件驱动,编程解耦!

6.如何实现自动推送?Git WebHook

问题:由谁来发起状态的变更请求?
如何通过GitHub的Webhook机制实现自动推送!

Webhook?Git的一种机制,可以用于自动化的构建。
当每次提交代码到Git以后,会触发Webhook执行一段程序,来完成预定义的操作。比如说让钩子通知CI/CD系统从Github拉取最新代码开始执行构建过程或者执行其他操作!

Webhook三步走:
设置encrypt.key;
将上一步中的key添加到Github仓库设置中;
设置Webhook url;
设置encrypt.key,类似属性加解密方式,只需要在application.yml中设置一个key就好!
encrypt:
key: yourKey

自动推送需要注意的问题
无法测试:改动只要一提交就被推送到所有机器,假如不小心修改错了属性,那所有服务器就要团灭了
定点推送:尽管Bus支持在URL中添加目标范围,定向推送到指定机器,但毕竟URL在Webhook里面是写死的,不方便我们根据实际情况做定点推送

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/756708.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【论文阅读】transformer及其变体

写在前面&#xff1a; transformer模型已经是老生常谈的一个东西&#xff0c;以transformer为基础出现了很多变体和文章&#xff0c;Informer、autoformer、itransformer等等都是顶刊顶会。一提到transformer自然就是注意力机制&#xff0c;变体更是数不胜数&#xff0c;一提到…

解决error Error: certificate has expired问题

安装环境遇到下面问题&#xff1a; 产生原因&#xff1a;可能是开了服务器代理访问导致ssl安全证书失效 解决办法&#xff1a; 在终端输入以下命令&#xff1a; yarn config set "strict-ssl" false -g

Element UI搭建使用过程

本章内容基于上一篇---Vue-cli搭建项目基础版 Vue-cli搭建项目----基础版-CSDN博客 官网地址:Element - The worlds most popular Vue UI framework 介绍:完全基于Vue.js ,用于快速搭建用户界面. 第一步:安装ElementUI 在终端输入 npm i element-ui -S 在main.js输入 …

《SpringBoot+Vue》Chapter04 SpringBoot整合Web开发

返回JSON数据 默认实现 依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency>在springboot web依赖中加入了jackson-databind作为JSON处理器 创建一个实体类对象…

Python逻辑控制语句 之 判断语句--if、if else 和逻辑运算符结合

逻辑运算符&#xff1a; and or not 1.案例一 需求&#xff1a; 1. 获取⽤户输⼊的⽤户名和密码 2. 判断⽤户名是 admin 并且密码是 123456 时, 在控制台输出: 登录成功! 3. 否则在控制台输出: 登录信息错误! # 需求&#xff1a; # 1. 获取用户输入的用户名和密码 # 2. 判断…

Vue3使用jsbarcode生成条形码,以及循环生成条形码

前言&#xff1a;哈喽&#xff0c;大家好&#xff0c;我是前端菜鸟的自我修养&#xff01;今天给大家分享Vue3使用jsbarcode生成条形码&#xff0c;以及循环生成条形码&#xff0c;介绍了JsBarcode插件的详细使用方法&#xff0c;并提供具体代码帮助大家深入理解&#xff0c;彻…

005-GeoGebra基础篇-GeoGebra的点

新手刚开始操作GeoGebra的时候一般都会恨之入骨&#xff0c;因为有些操作不进行学习确实有些难以凭自己发现。 目录 一、点的基本操作1. 通过工具界面添加点2. 关于点的选择&#xff08;对象选择通用方法&#xff09;&#xff08;1&#xff09;选择工具法&#xff08;2&#xf…

synchronized 锁优化原理

目录 一、轻量级锁 二、锁膨胀 三、自旋优化 四、偏向锁 五、锁消除 一、轻量级锁 1. 会创建一个锁记录 Lock Record&#xff08;保存在线程栈中&#xff09;&#xff0c;尝试 CAS 修改 Mark Word 中的对象头&#xff0c;是一种乐观锁的思想&#xff0c;而不是将 Java 对…

如何选择适合的接口自动化测试工具!

引言&#xff1a; 在现代软件开发中&#xff0c;接口自动化测试已经成为保证软件质量的重要环节。通过自动化测试工具&#xff0c;可以有效地提高测试效率、减少人力成本&#xff0c;并且能够更好地发现和解决潜在的问题。然而&#xff0c;面对众多的接口自动化测试工具&#…

React+TS前台项目实战(十九)-- 全局常用组件封装:带加载状态和清除等功能的Input组件实现

文章目录 前言Input组件1. 功能分析2. 代码详细注释3. 使用方式4. 效果展示 总结 前言 今天我们来封装一个input输入框组件&#xff0c;并提供一些常用的功能&#xff0c;你可以选择不同的 尺寸、添加前缀、显示加载状态、触发回调函数、自定义样式 等等。这些功能在这个项目中…

数据结构-分析期末选择题考点(排序)

何似清歌倚桃李 一炉沈水醉红灯 契子 ✨ 上一期给大家提供了大概会考的题型给老铁们复习的大致思路 这一期还会是一样&#xff0c;我将整理一下排序的题型以及解题方法给你们 由于时间还很多&#xff0c;我就慢慢总结吧&#xff0c;一天一章的样子&#xff0c;明天总结串、后天…

开发技术-Java集合(List)删除元素的几种方式

文章目录 1. 错误的删除2. 正确的方法2.1 倒叙删除2.2 迭代器删除2.3 removeAll() 删除2.4 removeIf() 最简单的删除 3. 总结 1. 错误的删除 在写代码时&#xff0c;想将其中的一个元素删除&#xff0c;就遍历了 list &#xff0c;使用了 remove()&#xff0c;发现效果并不是想…

fiddler 返回Raw乱码

有时会发现自己发送的请求后&#xff0c;返回结果Raw里面是乱码&#xff0c;可以勾选Decode并重新发送请求就解决了 这个时候将Decode勾选一下 此时就好了

车辆数据的提取、定位和融合(其二.一 共十二篇)

第一篇&#xff1a; System Introduction 第二篇&#xff1a;State of the Art 第三篇&#xff1a;localization 第四篇&#xff1a;Submapping and temporal weighting 第五篇&#xff1a;Mapping of Point-shaped landmark data 第六篇&#xff1a;Clustering of landma…

基于MDEV的PCI设备虚拟化DEMO实现

利用周末时间做了一个MDEV虚拟化PCI设备的小试验&#xff0c;简单记录一下&#xff1a; DEMO架构&#xff0c;此图参考了内核文档&#xff1a;Documentation/driver-api/vfio-mediated-device.rst host kernel watchdog pci driver: #include <linux/init.h> #include …

yolov8obb角度预测原理解析

预测头 ultralytics/nn/modules/head.py class OBB(Detect):"""YOLOv8 OBB detection head for detection with rotation models."""def __init__(self, nc80, ne1, ch()):"""Initialize OBB with number of classes nc and la…

【Dison夏令营 Day 02】使用 Python 玩井字游戏

在本文中&#xff0c;我们将介绍使用 Python 语言从零开始创建井字游戏的步骤。 在本文中&#xff0c;我们将介绍使用 Python 语言从零开始创建井字游戏的步骤。 游戏简介 井字游戏是一种双人游戏&#xff0c;在 33 正方形网格上进行。每位玩家轮流占据一个单元格&#xff0c…

CMake(1)基础使用

CMake之(1)基础使用 Author: Once Day Date: 2024年6月29日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文章可参考专栏: Linux实践记录_Once-Day的博客-CSDN博客…

双指针算法第一弹(移动零 复写零 快乐数)

目录 前言 1. 移动零 &#xff08;1&#xff09;题目及示例 &#xff08;2&#xff09;一般思路 &#xff08;3&#xff09;双指针解法 2. 复写零 &#xff08;1&#xff09;题目及示例 &#xff08;2&#xff09;一般解法 &#xff08;3&#xff09;双指针解法 3. 快…

计算机基础知识——C基础+C指针+char类型

指针 这里讲的很细 https://blog.csdn.net/weixin_43624626/article/details/130715839 内存地址&#xff1a;内存中每个字节单位都有一个编号&#xff08;一般用十六进制表示&#xff09; 存储类型 数据类型 *指针变量名&#xff1b;int *p; //定义了一个指针变量p,指向的数…