5 转向事件驱动的架构

文章目录

  • 核心概念
    • 消息代理
    • 事件和消息
    • 了解事件
    • 异步消息通信
    • 响应式系统
  • 事件驱动的利弊
  • 消息传递模式
    • 发布—订阅
    • 工作队列
    • 过滤器
    • 数据持久性
  • 消息传递代理协议、标准和工具
  • AMQP和RabbitMQ
    • 基本概念
    • 交换类型和路由
    • 消息确认和拒绝
  • 设置RabbitMQ
    • 安装RabbitMQ
    • RabbitMQ管理界面
  • Spring AMQP和Spring Boot
  • 解决方案设计
    • 计划
    • 添加AMQP支持
    • 在Multiplication微服务中发布事件
    • 在Gamification微服务中订阅事件
  • 场景分析
    • 事件流
    • Gamification微服务不可用
    • 消息代理不可用
    • 事务性
    • 扩展微服务
  • 小结

相关代码可以从这里下载: 示例

前面的文章:
1、1 一个测试驱动的Spring Boot应用程序开发
2、2 使用React构造前端应用
3、3 试驱动的Spring Boot应用程序开发数据层示例
4、4 向微服务架构转变

后面的文章:
6、6 使用网关模式进行负载均衡

经过前面的示例,了解了微服务之间的接口的紧耦合关系,微服务Multiplication调用微服务Gamification,变成了流程的协调器。如果还有其他服务也需要查询每次尝试的数据,需要额外增加Multiplication对这些服务的调用,从而创建一个具有中央处理的分布式整体。这很显然会带来更多的问题。
现在,以Publish-Subscribe模式,重新设计这些接口,这种方式称为事件驱动的架构(Event-driven Architecture)。发布者(Publisher)不将数据传递到特定的目的地,而是不知道订阅者(Subscriber,即系统中接收数据的一方)的情况下,对事件进行分类和发送。这些事件使用者同样不需要知道发布者的逻辑。这种范式的变化使系统的耦合变得松散且可扩展,同时带来新的挑战。

核心概念

消息代理

事件驱动架构的要素是消息代理,系统组件与消息代理通信,而不是彼此直接连接,通过这种方式来保持彼此间的松散耦合。
消息代理通常包含理由功能,可以创建多个“通道(Channel)”,能根据需求划分消息。一个或多个发布者可以在每个通道中生成消息,这些消息可以被一个或多个订阅者(甚至没有订阅者)使用。下面是消息代理使用场景的概念视图。

消息代理
通道A
通道B
通道C
发布者1
发布者2
发布者3
订阅者1
订阅者2
订阅者3

这些概念不是新概念,经验丰富的开发人员肯定会在企业服务总线(ESB)架构中发现类似的模式。总线模式促进了系统不同部分之间的通信,提供了数据转换、映射、消息队列、排序、理由等功能。
企业服务总线(Enterprise Service Bus,ESB)的概念是从面向服务架构(Service Oriented Architecture, SOA)发展而来。ESB——企业服务总线,像一根管道,用来连接各个节点。为了集成不同系统,不同协议的服务,ESB做了消息的转换、解释与路由等工作,让不同的服务互联互通。
ESB是一种在松散耦合的服务和应用之间标准的集成方式。它可以作用于:

  • 面向服务的架构 - 分布式的应用由可重用的服务组成
  • 面向消息的架构 - 应用之间通过ESB发送和接受消息
  • 事件驱动的架构 - 应用之间异步地产生和接收消息

ESB就是在SOA架构中实现服务间智能化集成与管理的中介。

ESB架构和基于消息代理的架构之间的区别,还有一些争议。ESB中,通道本身在系统中具有更重要的意义。服务总线为通信设置协议标准、转换数据并将数据理由到特定目标。有些实现可以处理分布式事务。某些情况下,甚至有一个复杂的UI对业务流程进行建模,并将这些规则转换为配置和代码。ESB架构倾向于将绝大部分系统业务逻辑集中在总线内,构成系统的业务流程层。如图所示:

企业服务总线
智能路由
消息转换
中介
编排
拦截器
安全
服务1
服务2
服务3
服务4

将所有业务逻辑放到同一组件中且系统中有中央协调器的软件架构,往往是容易失败的软件架构模式。采用这种路线的系统只有一个失败点,因为整个系统都依赖核心部分(总线),随着时间的推移,会变得很难维护和扩展。嵌入总线的逻辑往往会变得一团糟。这也是ESB架构遭人诟病的原因之一。
基于这样的原因,逐渐放弃了这种集中协调的、过于智能的消息传送通道,更倾向于使用消息代理实现一种更简单的方式:只用它与其他不同组件进行通信。
可以将ESB视为复杂通道,而将消息代理视为简单通道,想要划清界限并不容易。一方面,可以使用ESB平台,但要适当隔离业务逻辑;另一方面,某些新的消息传递平台(如Kafka)提供了一些工具,可以在通道中嵌入一些逻辑。如果必要,也可以使用包含业务逻辑的函数来转换消息。也可以像使用数据库一样在通道中查询数据,并根据需要处理输出。因此,可在不同架构(ESB/消息代理)的相关工具之间进行切换,用类似的方式使用。也就是要了解模式,然后选择符合需求的工具。

建议:尽量避免在通信通道中包含业务逻辑,把业务流程保留在每个微服务中。

事件和消息

在事件驱动架构中,一个事件表明在系统中发生了某些事情。业务逻辑拥有发生这些事件的领域,可以将事件发布到消息通道(如消息代理)。构架中的其他组件如果到给定事件类型感兴趣,则订阅该通道,以消费所有后续事件实例。事件与发布-订阅模式有关,也与消息代理或总线关联,可以使用消息代理实现事件驱动架构,下面就来看看。
消息是一个更通用的术语。有人将消息视为直接指向系统组件的元素,将事件视为反映了给定领域中发生的事实的信息片段,而且没有专门指向某个系统组件,以此将两者区分开来。从技术角度看,通过消息代理发送事件时,事件实际上是一条消息。因此,在设计事件驱动架构时,就用“事件”来指代消息,而“消息”则指进入消息代理的通用消息。
这样,可对事件进行建模,使用REST API发送事件(类似于前面示例中的操作)。但是,生产者需要发现消费者,才能将事件发送给它们,这对降低耦合毫无帮助。
将事件与消息代理一起使用,可以隔离软件架构中的所有组件。发布者和订阅者不需要知道彼此存在,这非常适合微服务架构。使用这种策略,可引入新的微服务,它们来消费通道中的事件,不需要修改发布这些事件的微服务,也不需要修改其他订阅者。

了解事件

引入消息代理和一些Event类,并不能直接变成“事件驱动架构”。必须在设计软件时考虑到事件,需要付出努力。

  • 第一个场景

假设创建了一个Gamification API给用户分配分数和徽章,然后,微服务Multiplication调用接口updateScore,不仅发现了这个微服务,还成为这部分业务逻辑(通过为成功的尝试分配分数)的所有者。这是刚使用微服务架构时常犯的错误,源自命令式编程风格,倾向于在微服务之间用API调用替换方法调用,实现远程过程调用(RPC)模式,有时甚至不会注意到这一点。如图所示:

发送尝试
更新分数
Multiplication微服务
如果挑战正确,则更新分数
Gamification微服务
接口(API):更新分数
浏览器

更新分数(to:Gamification)
消息代理
Multiplication微服务
如果挑战正确,则更新分数
Gamification微服务
接口(API):更新分数
浏览器

为降低微服务之间的耦合,可以引入消息代理,将REST API调用替换成一条指向微服务Gamification的消息,即UpdateScore消息。但能否做到不改变系统呢?不能。因为消息仍然有一个目的地,它不能被任何新的微服务调用。此外,系统的两个部分仍然是紧耦合的,并且产生了一个副作用,用异步接口替换了同步接口,增加了额外的复杂性。

  • 第二个场景

基于当前的实现,如图所示:

发送尝试
发送成功的挑战
Multiplication微服务
挑战解决后将尝试发送到Gamification
Gamification微服务
接口(API):接收尝试并计算分数
浏览器

ChallengeSolvedEvent(给任意感兴趣的订阅者)
消息代理
Multiplication微服务
将尝试发送到通用通道
Gamification微服务
接口(API):接收尝试并计算分数
浏览器

将一个ChallengeSolvedDTO对象从Multiplication传递到Gamification,维护了域边界。不再在Multiplication服务中包含Gamification的逻辑,但是,仍然需要明确配置Gamification的地址连接,紧耦合依然存在。
通过引入消息代理,可以解决这个问题。微服务Multiplication可将ChallengeSolvedDTO发布到通用的通道,然后继续执行接下来的业务逻辑。第二个微服务可以订阅这个通道并处理消息(从概念上讲,这是一个事件),以计算相应的分数和徽章。如果新加入系统中的微服务也对ChallengeSolvedDTO消息感兴趣,例如生成报告或向用户发送通知,也可以透明地订阅该通道。

第一个场景实现了一种命令模式,其中微服务Multiplication对微服务Gamification执行的操作进行了指引(也称为编制)。第二个场景通过发送关于已发生的事件的通知以及上下文数据,实现了事件模式,消费者将处理此数据,这可能触发业务逻辑,也可能触发其他事件,这种方法称为编排,与编制相反。将软件架构建立在事件驱动设计的基础上时,称其为事件驱动架构。

要实现真正事件驱动架构,必须重新思考哪些可能以命令式表达的业务流程,重新定义动作和事件,不仅仅使用DDD来定义域,还应该将它们之间的交互作为事件。

不需要为了遵循事件驱动架构而更改系统中的每个通信接口。在某些事件驱动模式不适用的情况下,可能相应命令和请求/响应模式。不要试图将只适合命令模式的业务需求包装成事件。

异步消息通信

引入消息代理作为构建事件驱动架构的工具,也就接纳了异步消息传递。发布者发送事件,不需要等待任何事件消费者的回复,将使架构保持松耦合并具有可扩展性。如图所示:

ChallengeSolvedEvent(给任意感兴趣的订阅者)
1、发送尝试
2
4
更新分数
3、响应
尝试
浏览器
Multiplication微服务
Gamification微服务

当然,也可以使用消息代理来保持进程的同步。前面的例子中,计划用消息代理替换REST API接口,创建两个通道来传递事件,而不是只创建一个通道,使用第二个通道接收来自微服务Gamification的响应。可以阻塞请求的线程,并在继续处理之前等待确认。如图所示:

3
ChallengeSolvedEventProcessedByGamification
(给任意感兴趣的订阅者)
2
ChallengeSolvedEvent
(给任意感兴趣的订阅者)
1、发送尝试
4
更新分数
4、响应
尝试—响应
尝试
浏览器
Multiplication微服务
Gamification微服务

这实际上是基于消息代理的请求/响应模式,这种组合在某些情况下可能很有用,但不建议在事件驱动方法中使用。主要原因是:微服务Multiplication需要知道订阅者及订阅者的数量,以确保收到所有响应,系统组件会因此而紧耦合。但可从中得到好处,例如可伸缩性;也可以运用其他模式来提高可伸缩性,如负载均衡器。在进程要求必须同步的情况下,可以考虑使用一种更简单的同步接口,例如REST API。下表给出了一种建议,请注意,具体如何使用要具体分析。

模式类型实现方式
请求/响应同步REST API
需要阻塞的命令同步REST API
不需要阻塞的命令异步消息代理
事件异步消息代理

需要注意的是,尽管端到端采用了异步通信,但应用程序与消息代理之间用的是同步接口。当发布消息时,确保代理在继续其他操作之前已收到消息。这同样适用于订阅者,在订阅者消费了消息后,代理需要一个确保已将该消息标记为已处理,然后移到下一个消息。这两个步骤对保证数据的安全和系统的可靠性至关重要。

响应式系统

响应式系统是将其描述为一套应用于软件架构的设计原则,从而使系统响应敏捷(及时响应)、具有弹性(在出现故障时保持响应)、可灵活伸缩(适应在不同工作负载下响应)和基于消息驱动(确保松散耦合和边界隔离)。如果遵循遵循范式构建系统,可以称为响应式系统。
另外,响应式编程指的是一组在编程语言中使用的技术,围绕如下范式,如futures(或promise)、reactive streams、backpressure等。Java中,有一些流行的库,如Reactor或RxJava,可以实现这些范式。使用响应式编程,可将逻辑切分成一组较小的块,这些逻辑块可以异步运行,然后合成或转换结果。这也带来了并发性的改进,在并行执行任务时,可以更快完成任务。
使用响应式编程并不会让架构编程响应式架构。它们在不同的层面工作,响应式编程有利于在组件内和并发性方面做出改进,响应式系统是这些组件之间在更高层面上的变化,有助于构建松耦合、富有弹性和可伸缩性的系统。

事件驱动的利弊

前面的微服务架构,获得了灵活性和可伸缩性,但也面临挑战,如最终一致性、容错性和部分更新等。使用消息代理模式进行事件驱动有助于应对这些挑战。

  • 微服务之间的松耦合:已经找到了让微服务Multiplication不需要知道Gamification服务的方法。Multiplication给代理发送一个事件,Gamification向代理订阅事件,对其做出反应,为用户更新分数和徽章。
  • 可伸缩性:为系统进行水平扩展会很方便。此外,在架构中引入新的微服务也很容易,可以订阅事件并独立工作,例如,可以根据现有服务触发的事件生成报告或发送电子邮件等。
  • 容错性和最终一致性:如果消息代理足够可靠,那么即使系统组件出现了故障,也可以保证最终一致性。因为代理可以持久化消息,所以,假设微服务Gamification宕机了一段时间,在其重新上线后,可以通过事件进行数据补偿。

这里使用异步的过程来简单地通知其他系统组件,避免了创建阻塞的、命令式的流程,这需要一种不同的思维方式,要接受一种观念,即数据的状态可能不会在所有微服务中保持一致。
此外,随着消息代理的引入,需要添加一个新组件,因为不能断定消息代理一定不会出错,就必须让系统准备好以应对潜在的错误。

  • 丢弃的消息:可能是ChallengeSolvedEvent永远无法传递到Gamification服务等情况。如果构建一个不错过任何一个事件的系统,就应该配置消息代理以实现至少一次(at-least-once)的保证策略,以确保消息代理至少传递消息一次,尽管它们是可以重复的。
  • 重复消息:某些情况下,消息代理可能会传递多次仅被发布过一次的消息。在示例中,如果重复收到该事件,将会错误地增加分数。因此,必须考虑将事件的消费幂等化(如果多次调用且不会产生不同的结果就是幂等的)。示例中,可选的解决方案是标记已被Gamification端处理过的事件,并忽略任何重复的事件。一些消息代理还可以提供最多一次的良好保证,如RabbitMQ和Kafka,有助于防止重复。
  • 无序消息:消息代理会尽量避免无序消息,但如果出现故障或存在漏洞,这种情况仍会发生。必须做好处理的准备,如果可能,应该尽量避免按照与发布时间相同的顺序来消费事件。
  • 消息代理的宕机时间:最坏的情况下,代理可能不可用。发布者和订阅者都应该尝试处理这种情况(例如重试策略或缓存)。

事件驱动系统的另一个缺点是追溯变得更加困难。调用一个REST API,可能触发事件;然后,可能会有一些组件对这些事件做出响应,随后发布其他事件,继续延长这个链条。当只有几个分布式进程时,了解不同事件在不同的微服务中引起的动作可能问题不大,但随着系统的发展,要全面了解这些事件和操作链是一项巨大的挑战。需要这个视图对错误操作进行调试,并找出触发指定进程的原因。有一些工具可以实现分布式跟踪,将事件和动作链接起来,并可视化为一系列动作/响应,如Spring Cloud Sleuth,可在日志中自动注入标识符的工具,在发出/接收HTTP调用时,通过RabbitMQ发布/消费消息以及其他时候,将这些标识符一直传播下去,然后,如果使用集中式日志记录,可使用这些标识符链接所有的进程。

消息传递模式

要根据需要运用消息传递平台的几种模式,如图所示:

订阅者1
订阅者2
通道
消费 9 8 7 6 5 4 3 2 1
消费 9 7 5 3 1
消费 8 6 4 2
副本1
副本2
数据库1
工作队列
订阅者1需要两个实例来拆分负载。
副本1
数据库1
过滤
订阅者2对消息5和6不感兴趣。
生成 9 8 7 6 5 4 3 2 1
发布—订阅
不同的订阅者可以收到相同的消息。
发布者

发布—订阅

这种模式下,不同的订阅者将接收相同消息的副本。例如,系统中可能有多个对ChallengeSolvedEvent感兴趣的组件,如微服务Gamification和微服务Reporting。这种情况下,重要的是配置订阅者,使其接收相同的消息。每个订阅者将以不同的目的处理事件,以免引起重复操作。

这种模式更适合事件,不适用于发送到特定服务的消息。

工作队列

这种模式也称为竞争消费者模式,这种情况下,可以将工作拆分到同一个应用程序的多个实例之间。
如上图所示,同一个微服务有多个副本,目的是平衡它们之间的工作负载,每个实例将消费不同的消息、处理它们,将结果存储在数据库中。

注意:同一组件的多个副本应共享同一数据层,便于安全地分割工作。

过滤器

一种常见的情况是,一些订阅者对发布到一个通道的所有消息都感兴趣,而其他一些订阅者则只对其中一部分感兴趣。上图中的订阅者2就是这种情况。最简单的选择就是基于应用程序的过滤逻辑,在消息被消费后,立即丢弃掉。

一些消息代理也提供现成的过滤功能,系统组件可以使用给定的过滤器将自己注册为订阅者。

数据持久性

如果代理可以持久化消息,订阅者就不需要一直保持运行来消费所有数据。每个订阅者在消息代理中都有一个相关联的标记,以便知道最后消费的消息是什么。如果不能在指定的时间点获取消息,稍后数据流会从离开的节点重新传递。
即使所有订阅者检索到特定消息后,也可能想将其存储在消息代理中一段时间。如果新的订阅者想要获得其上线之前的消息,这就很有用了。如果要对某个订阅者进行重置,使所有消息重新得到处理,则持久化指定时间段的所有消息会有所帮助。
在一个将所有操作建模为事件的系统中,可以从中获益。想象一下,误删除了现有数据库中的所有数据。理论上,可以从头开始重播所有事件并重新建立相同的状态。因此,根本不需要在数据库中保留指定实体的最后状态,可将其视为多个事件的聚合,这就是事件追溯的核心概念。

当订阅者的运算不幂等时,可能很危险。

消息传递代理协议、标准和工具

多年来,出现了一些与消息代理有关的消息传递协议和标准:

  • 高级消息队列协议(AMQP):一种有线协议,将消息的数据格式定义为字节流。
  • 消息队列遥测传输(MQTT):一种协议,已成为物联网设备的流行协议,可用很少的代码实现,可在有限的带宽下工作。
  • 某些文本流的消息传递协议(STOMP):一种类似HTTP的、基于文本的协议,面向消息传递中间件。
  • Java消息服务(JMS):一种API标准,关注消息传递系统中应该实现的行为。可以找到不同的JMS客户端实现,使用不同的底层协议连接到消息代理。

下面是实现了一些协议和标准或有自己的协议和标准的流行软件工具:

  • RabbitMQ:一个开源的消息代理实现,支持AMQP、MQTT和STOMP等协议,还提供了强大路由配置的JMS API客户端。
  • Mosquito:一个实现了MQTT协议的Eclipse消息代理,是物联网系统的一个流行选项。
  • Kafka:最初由LinkedIn设计,在TCP上使用自己的二进制协议。尽管Kafka的核心功能不提供与传统消息代理(例如路由)相同的功能,当对消息中间件的要求很简单时,会是一个强大的消息传递平台。常用于需要处理流中大量数据的应用程序中。

任何情况下,如果需要在不同的工具中进行抉择,就应该熟悉它们,分析其功能是否能满足需求。在Java和Spring Boot中,RabbitMQ和Kafka是构建事件驱动架构的常用工具。
这里,使用RabbitMQ和AMQP协议,原因是它们提供了多种配置,可了解大多数选项,也可在其他消息传递平台重用这些知识。

AMQP和RabbitMQ

基本概念

前面说过,发布者是系统中向消息代理发布消息的组件或应用程序,消费者(或订阅者)接收并处理这些消息。AMQP还定义了交换(exchange)、队列(queue)和绑定(binding),如图所示:

消息代理
绑定键:*.correct
绑定键:*.wrong
交换 3 2 1
队列1 3 1
队列2 2
发布者
订阅者1
订阅者2
路由键:
1:“attempts.correct”
2:“attempts:wrong”

交换是消息被发送到的实体,根据交换类型和规则定义的逻辑(称为绑定)路由到队列。交换可以是持久化的,即使消息代理重新启动,仍然存在;如果不存在,也是暂时的。
队列是AMQP中存储将被消费的消息的对象。一个队列可能有0个、1个或多个消费者,可以是持久或临时的,要注意,持久化的队列不意味着它的所有消息都是持久的。要使消息在消息代理重启后仍然存在,必须将其作为持久消息发布。
绑定是将发布到交换的消息路由到特定队列的规则,就是将队列绑定到找到的交换。一些交换支持通过一个可选的绑定键(binding key)来决定哪些发布到交换的消息最终应该到达指定的队列。从这个意义上,可以将绑定看作过滤器。
发布者可以在发送消息时指定路由键(routing key),如果使用这些配置,可以基于绑定键对其进行正确筛选。路由键由点分隔的单词组成,例如attempts.correct,绑定键也有类似的格式,还可能包含模式匹配器,具体取决于交换的类型。

交换类型和路由

RabbitMQ有几种交换类型可选,如图所示,通过绑定键定义的不同路由策略和每条消息对应的路由键进行组合。

  • 默认交换(default exchange)由消息代理预先声明。所有已创建的队列都使用与队列同名的绑定键与该交换进行绑定。从概念角度看,这意味着可考虑使用目标队列作为绑定键来发布消息(如果将相应的名称用作路由键)。从技术角度看,这些消息仍然要通过交换。这种设置不常用,因为它违背了整个路由的目的。
  • 直连交换(direct exchange)常用于单播路由。与默认交换的区别在于,可以使用自己的绑定键,也可以创建多个使用相同绑定键的队列,然后,这些队列都将获得其路由键与绑定键匹配的消息。从概念来看,在发布知道目的地(单播)的消息时使用,不需要知道有多少队列会获得消息。
  • 扇状交换(fanout exchange)不使用路由键。将所有消息路由到绑定到交换的所有队列,非常适合广播场景。
  • 主题交换(topic exchange)是最灵活的。可以使用一个模式,而不是使用具体的值将队列绑定到交换上,允许订阅者注册队列来使用经过筛选的消息集。在模式中可以使用#表示匹配任何一组单词,或使用*表示只匹配一个单词。
  • 头部交换(headers exchange)使用消息头部作为路由键以提高灵活性。因为可以设置一个或多个消息头部的匹配条件进行全匹配或任意匹配,可忽略标准路由键。
消息代理
q-gamification
q-reports
game
game
reports
(ignored)
(ignored)
#.correct
#.wrong
“correct=true,
version=1,
x-match=all”
“correct=true,
version=1,
x-match=any”
默认交换 3 2 1
q-gamification 3 1
q-reports 2
直连交换 3 2 1
q-gamification-1 3 1
q-gamification-2 3 1
q-reports 2
扇状交换 3 2 1
q-gamification 3 2 1
q-reports 3 2 1
主题交换 3 2 1
q-gamification 1
q-reports 3 2 1
头部交换 3 2 1
q-gamification 1
q-reports 3 1
发布者
路由键:
1:“q-gamification”
2:“q-reports”
3:“q-gamification”
发布者
路由键:
1:“game”
2:“reports”
3:“game”
发布者
没有考虑路由键
发布者
路由键:
1:“challengesolved.correct”
2:“challengesolved.wrong”
3:“challengesolved.wrong”
发布者
头部:
1:“correct=true”,”version=1“
2:“correct=true”,”version=2“
3:“correct=false”,”version=1“
Gamification
订阅者
Reports
订阅者
Gamification
订阅者
Reports
订阅者
Gamification
订阅者
Reports
订阅者
Gamification
订阅者
Reports
订阅者
Gamification
订阅者
Reports
订阅者

前面介绍的发布/订阅和过滤器模式都适用于这些场景。图中的直连交换可能看起来像是工作队列模式,实际上并非如此。在AMQP协议中,负载均衡发生在同一队列的消费者之间,而不是队列之间。
要实现工作队列模式,通常会多次订阅同一队列,如图所示。

消息代理
game
reports
直连交换 5 4 3 2 1
q-gamification-1 5 4 3 2 1
q-reports 2
发布者
路由键:
1:“game”
2:“reports”
3:“game”
4:“game”
5:“game”
Gamification
订阅者1
3 1
Gamification
订阅者2
5 2
Reports
订阅者
2

消息确认和拒绝

AMQP协议为消费者应用程序定义了两种不同的确认模式。由于消费者发送确认后,消息将从队列中删除,因此理解这两种确认模式很重要。
第一个可选项是自动确认,这种策略,消息在发送到应用程序时将被视为已交付。第二个选项称为显式确认,会等待应用程序发送确认信号。相比之下,第二个选项更好,可以确保所有消息都得到处理。
消费者可以读取信息、运行一些业务逻辑、将相关数据持久化,甚至可以在向消息代理发送确认信号之前触发后续的事件。这种场景下,只有当消息已被完全处理时,才会将其从队列中删除。如果消费者在发送信号之前宕机(或出现错误),那么消息代理将尝试把消息传递给另一个消费者。如果没有错误,将保持等待,直到有一个消费者开用。
消费者也可以拒绝消息。假设某个消费者实例由于网络错误而无法访问数据库:消费者可以拒绝该消息,并指定该消息重新回到队列或将其丢弃。请注意,如果导致拒绝消息的错误持续存在了一段时间,且没有其他消费者能够成功地处理它,最终可能陷入无限的“重入队列/拒绝消息”的循环之中。

设置RabbitMQ

安装RabbitMQ

可以到RabbitMQ的官网去下载(RabbitMQ)。安装之后启动代理即可,这里不再一一介绍。
下面介绍一下Docker下安装RabbitMQ的方法。

  • 拉取镜像

使用docker pull rabbitmq:management命令,推荐使用带有management版本的。

  • 创建数据卷

使用docker volume create rabbitmq-home命令,创建一个数据卷,专门用于持久化RabbitMQ的所有数据,方便管理。

  • 创建并运行容器

使用命令

docker run -id --name=rabbitmq 
-v rabbitmq-home:/var/lib/rabbitmq 
-p 15672:15672 -p 5672:5672 
-e RABBITMQ_DEFAULT_USER=admin 
-e RABBITMQ_DEFAULT_PASS=123456 
rabbitmq:management

这里除了挂载数据卷rabbitmq-home之外,还暴露了两个端口,以及设定了两个环境变量:

  • 15672端口:RabbitMQ的管理页面端口。
  • 5672端口:RabbitMQ的消息接收端口。
  • RABBITMQ_DEFAULT_USER环境变量:指定RabbitMQ的用户名,这里指定为admin,大家部署时替换成自己定义的。
  • RABBITMQ_DEFAULT_PASS环境变量:指定RabbitMQ的密码,这里指定为123456,大家部署时替换成自己定义的。

这样容器就部署完成了!启动容器后,在浏览器中访问服务器地址:15672即可访问到RabbitMQ的管理界面,用户名和密码即为刚刚指定的环境变量的配置值。
RabbitMQ容器是通过指定环境变量的方式进行配置的,这比修改配置文件便捷得多,还有更多的配置用的环境变量,大家可以参考官方文档。

RabbitMQ管理界面

在docker中安装带有management的版本,即可管理RabbitMQ,在浏览器中输入:http://localhost:15672/,可看到登录界面,使用自己设置的用户名和密码登录即可进入管理界面,如下所示:
RabbitMQ
通过这个界面,可监控队列中的消息、处理速率、不同注册节点的统计信息等,还有其他特性,如对队列和交换的监管,甚至可以创建或删除实体。

Spring AMQP和Spring Boot

前面使用Spring Boot构建微服务,也会使用Spring模块连接到RabbitMQ消息代理。这需要Spring AMQP,它包含两个工件:spring-rabbit,是一组与RabbitMQ代理一起使用的工具;spring-amqp,包括所有AMQP抽象,可使实现独立于供应商。当前,Spring提供AMQP协议的RabbitMQ实现。
Spring Boot为AMQP提供了一个启动程序,带有自动配置:spring-boot-starter-amqp,包含了上面的两个工件。

解决方案设计

下图对要构建的功能进行了说明,从微服务Multiplication到客户端的响应可能在Gamification微服务处理消息之前发生,这是一个异步的、最终一致的流程。这里将创建一个Topic类型的Attempts交换,在类似的事件驱动架构中,能灵活地使用特定的路由键发送事件,允许消费者订阅所有事件或在队列中设置自己的过滤器。

2
ChallengeSolvedEvent(给任意感兴趣的订阅者)
1、发送尝试
路由键:
- attempt.correct
- attempt.wrong
绑定:
attempt.correct
4
更新分数
3、响应
Attempts
(主题交换)
Gamification
尝试
浏览器
Multiplication
微服务
Gamification
微服务

从概念上讲,Multiplication微服务拥有Attempts交换,将使用它来发布与用户尝试有关的事件,原则上,将发布正确和错误的事件,因为对消费者的逻辑一无所知。另一方面,Gamification微服务使用满足其要求的绑定键来声明队列,路由键用作过滤器,仅接收正确的尝试。如上图所示,可能有多个Gamification微服务实例从同一队列中消费,那么,代理将平衡所有实例之间的负载。
假设另一种微服务也对ChallengeSolvedEvent感兴趣,该微服务需要声明自己的队列来消费相同的消息。例如,引入Reports微服务,该微服务将创建reports队列并使用绑定键attempt.*(或#)来消费正确和错误的尝试。
可以结合发布/订阅和工作队列模式,以便多个微服务可以处理相同的消息,且同一微服务的多个实例可以共享负载。此外,让发布者负责交换和订阅者负责队列,可构建事件驱动的微服务架构,通过引入消息代理实现了二者之间的松耦合。

计划

要实现事件驱动架构改变,需要完成以下工作:

  1. 添加新的依赖到pom.xml中,以扩充Spring Boot应用程序支持。
  2. 删除将挑战显式发送给Gamification和相应控制器的REST API客户端。
  3. 将ChallengeSolvedDTO重命名为ChallengeSolvedEvent。
  4. 作为发布者,声明Multiplication微服务上的交换。
  5. 修改Multiplication微服务的逻辑以发布事件,而不是调用REST API。
  6. 作为订阅者,在Gamification微服务上声明队列。
  7. 添加消费者逻辑以获取队列中的事件,并将其连接到现有的服务层,以处理正确尝试的分数和徽章。
  8. 同时重构测试。

添加AMQP支持

在Spring Boot应用程序中添加AMQP和RabbitMQ支持,需要添加相应的启动项,在pom.xml中添加相应的依赖,如下所示:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

该启动项包括了spring-rabbit和spring-amqp,通过spring-boot-autoconfigure,RabbitAutoConfiguration类提供连接RabbitMQ服务的默认配置,方便使用。
RabbitAutoConfiguration类使用在RabbitProperties类中定义的一组属性,可以在application.properties文件中覆盖这些属性。这里可以找到预定义的主机名(localhost)、端口(5672)、用户名(guest)和密码(guest)等。自动配置类为RabbitTemplate对象构建连接工厂和配置重新,可用它们接收消息或发送消息到RabbitMQ。也可以使用抽象接口AmqpTemplate。自动配置还包括一些使用其他机制接收消息的默认配置,如RabbitListener注解等。

在Multiplication微服务中发布事件

在微服务Multiplication在添加了消息代理支持,就可以发布事件,进行配置了。

  • 交换的名称:保存在配置中会很方便,就不必在以后根据运行应用程序的环境对其进行修改,也不必在应用程序之间共享。
  • 日志记录设置:可以在应用程序与RabbitMQ进行交互时查看日志。可以将RabbitAdmin类的日志级别改为DEBUG,该类与RatherMQ代理进行交互以声明交换、队列和绑定。

另外,可删除指向Gamification服务的属性,现在不再需要了,对application.properties修改如下:

# 配置交换
amqp.exchange.attempts=attempts.topic
# 配置日志记录,显式交换、队列、绑定等声明信息
logging.level.org.springframework.amqp.rabbit.core.RabbitAdmin=DEBUG

接着,将Exchange声明添加到AMQP的独立配置文件中,可使用Spring的交换生成器:ExchangeBuilder,在配置中添加代理中声明的主题类型的Bean,配置使用JSON进行序列化的消息转换器。AMQPConfiguration类代码如下:

package cn.zhangjuli.multiplication.configuration;

import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ,通过AMQP抽象在应用程序中使用事件
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Configuration
public class AMQPConfiguration {
    /**
     * 配置主题交换
     * @param exchangeName 交换名称,从配置中获取
     * @return TopicExchange,使用配置的交换名称构建主题交换
     */
    @Bean
    public TopicExchange challengesTopicExchange(
            @Value("${amqp.exchange.attempts}") final String exchangeName
    ) {
        return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
    }

    /**
     * 配置消息转换器,用JSON对象序列化程序覆盖默认的Java对象序列化程序,以避免Java对象序列化的缺陷。
     * @return 消息转换器
     */
    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

用JSON对象序列化程序覆盖默认的Java对象序列化程序,可以避免Java对象序列化的缺陷。

  • 在不同编程语言之间使用可不好。如果要引入不是用Java编写的消费者,则必须找一个特定的库来执行跨语言反序列化。
  • 在消息的标题中使用硬编码完全限定类型名称。反序列化程序希望Java Bean位于相同的程序包中,并具有相同的名称和字段,这根本不灵活,因为可能只想反序列化某些属性,并遵循良好的领域驱动设计实践来保留自己的事件数据版本。

发布者端,Jackson2JsonMessageConverter 使用预先配置的ObjectMapper,然后,RabbitTemple实现将使用这个Bean,该类将序列化对象并将其作为AMQP消息发送给代理。订阅者端,可从JSON格式中受益,从而可使用任何编程语言对内容进行反序列化,还可以使用自己的对象表示形式,忽略消费者端不需要的属性,从而减少微服务之间的耦合。如果发布者在有效载荷中包括新字段,订阅者也不必更改任何内容。
JSON不是Spring AMQP消息转换器支持的唯一格式,还可以使用XML或Google的协议缓冲区(即protobuf)。在性能至关重要的实际系统中,应考虑二进制格式(如protobuf)。

下面,删除GamificationServiceClient类,同时需要删除ChallengeServiceImpl类中的相关引用(只有有限的几行)。然后,将ChallengeSolvedDTO重命名为ChallengeSolvedEvent,不需要修改任何字段,仅此而已。ChallengeSolvedEvent类如下:

package cn.zhangjuli.multiplication.challenge;

import lombok.Value;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Value
public class ChallengeSolvedEvent {
    long attemptId;
    boolean correct;
    int factorA;
    int factorB;
    long userId;
    String userAlias;
}

注意:使用显式的命名约定是一种良好的实践。

下面,在服务层创建一个新组件以发布事件,等效于已经删除的REST API客户端,但它与消息代理进行通信。ChallengeEventPublisher 类的代码如下:

package cn.zhangjuli.multiplication.challenge;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Service
public class ChallengeEventPublisher {
    private final AmqpTemplate amqpTemplate;
    private final String challengesTopicExchange;

    public ChallengeEventPublisher(AmqpTemplate amqpTemplate,
                                   @Value("${amqp.exchange.attempts}")
                                   String challengesTopicExchange) {
        this.amqpTemplate = amqpTemplate;
        this.challengesTopicExchange = challengesTopicExchange;
    }

    /**
     * 发布事件,设置路由键,然后发送到代理。
     * @param challengeAttempt 消息源,需要转换为发布的事件。
     */
    public void challengesSolved(final ChallengeAttempt challengeAttempt) {
        ChallengeSolvedEvent event = buildEvent(challengeAttempt);
        // 路由键是 attempt.correct 或 attempt.wrong
        String routingKey = "attempt." + (event.isCorrect() ? "correct" : "wrong");
        amqpTemplate.convertAndSend(challengesTopicExchange, routingKey, event);
    }

    /**
     * 将ChallengeAttempt转换为ChallengeSolvedEvent 对象
     * @param challengeAttempt 消息源
     * @return 事件,要发布的数据。
     */
    private ChallengeSolvedEvent buildEvent(final ChallengeAttempt challengeAttempt) {
        return new ChallengeSolvedEvent(challengeAttempt.getId(),
                challengeAttempt.isCorrect(), challengeAttempt.getFactorA(),
                challengeAttempt.getFactorB(), challengeAttempt.getUser().getId(),
                challengeAttempt.getUser().getAlias());
    }
}

还记得在删除GamificationServiceClient类时,要在ChallengeServiceImpl类中删除的引用吗?这是关联点,现在要使用ChallengeEventPublisher替换GamificationServiceClient功能,即可将消息发布到RabbitMQ代理上,这样,感兴趣的任何组件就可以使用这些消息了。修改ChallengeServiceImpl类以发送新事件,代码如下:

package cn.zhangjuli.multiplication.challenge;

import cn.zhangjuli.multiplication.user.User;
import cn.zhangjuli.multiplication.user.UserRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class ChallengeServiceImpl implements ChallengeService {
    private final UserRepository userRepository;
    private final ChallengeAttemptRepository attemptRepository;
    private final ChallengeEventPublisher challengeEventPublisher;

    @Override
    public ChallengeAttempt verifyAttempt(ChallengeAttemptDTO attemptDTO) {
        // Check if the attempt is correct
        boolean isCorrect =
                attemptDTO.getGuess() == attemptDTO.getFactorA() * attemptDTO.getFactorB();

        // 检查alias用户是否存在,不存在就创建
        User user = userRepository.findByAlias(attemptDTO.getUserAlias())
                .orElseGet(() -> {
                    log.info("Creating new user with alias {}", attemptDTO.getUserAlias());
                    return userRepository.save(
                            new User(attemptDTO.getUserAlias())
                    );
                });

        // Builds the domain object. Null id for now.
        ChallengeAttempt checkedAttempt = new ChallengeAttempt(null,
                user,
                attemptDTO.getFactorA(),
                attemptDTO.getFactorB(),
                attemptDTO.getGuess(),
                isCorrect);

        // Stores the attempt
        ChallengeAttempt storedAttempt = attemptRepository.save(checkedAttempt);
        log.info("attempt: {}", storedAttempt);

        // 发布事件来通知潜在的感兴趣的订阅者
        challengeEventPublisher.challengesSolved(storedAttempt);

        return storedAttempt;
    }

    @Override
    public List<ChallengeAttempt> getStatisticsForUser(final String userAlias) {
        return attemptRepository.findTop10ByUserAliasOrderByIdDesc(userAlias);
    }
}

同样地,修改ChallengeServiceTest 类以测试发送新事件,代码如下:

package cn.zhangjuli.multiplication.challenge;

import cn.zhangjuli.multiplication.user.User;
import cn.zhangjuli.multiplication.user.UserRepository;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.List;
import java.util.Optional;

import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */

@ExtendWith(MockitoExtension.class)
public class ChallengeServiceTest {
    private ChallengeService challengeService;
    // 使用Mockito进行模拟
    @Mock
    private UserRepository userRepository;
    @Mock
    private ChallengeAttemptRepository attemptRepository;
    @Mock
    private ChallengeEventPublisher challengeEventPublisher;

    @BeforeEach
    public void setUp() {
        challengeService = new ChallengeServiceImpl(
                userRepository,
                attemptRepository,
                challengeEventPublisher
        );
    }

    @Test
    public void checkCorrectAttemptTest() {
        // given
        // 这里希望save方法什么都不做,只返回第一个(也是唯一一个)传递的参数,这样不必调用真实的存储库即可测试该层。
        given(attemptRepository.save(any()))
                .will(returnsFirstArg());
        ChallengeAttemptDTO attemptDTO = new ChallengeAttemptDTO(50, 60, "noise", 3000);

        // when
        ChallengeAttempt resultAttempt = challengeService.verifyAttempt(attemptDTO);

        // then
        then(resultAttempt.isCorrect()).isTrue();
        verify(attemptRepository).save(resultAttempt);
        verify(challengeEventPublisher).challengesSolved(resultAttempt);
    }

    @Test
    public void checkWrongAttemptTest() {
        // given
        given(attemptRepository.save(any()))
                .will(returnsFirstArg());
        ChallengeAttemptDTO attemptDTO = new ChallengeAttemptDTO(50, 60, "noise", 5000);

        // when
        ChallengeAttempt resultAttempt = challengeService.verifyAttempt(attemptDTO);

        // then
        then(resultAttempt.isCorrect()).isFalse();
        verify(userRepository).save(new User("noise"));
        verify(attemptRepository).save(resultAttempt);
        verify(challengeEventPublisher).challengesSolved(resultAttempt);
    }

    @Test
    public void checkExistingUserTest() {
        // given
        given(attemptRepository.save(any()))
                .will(returnsFirstArg());
        User existingUser = new User(1L, "john_doe");
        given(userRepository.findByAlias("john_doe"))
                .willReturn(Optional.of(existingUser));
        ChallengeAttemptDTO attemptDTO = new ChallengeAttemptDTO(50, 60, "john_doe", 5000);

        // when
        ChallengeAttempt resultAttempt = challengeService.verifyAttempt(attemptDTO);

        // then
        then(resultAttempt.isCorrect()).isFalse();
        then(resultAttempt.getUser()).isEqualTo(existingUser);
        verify(userRepository, never()).save(any());
        verify(attemptRepository).save(resultAttempt);
        verify(challengeEventPublisher).challengesSolved(resultAttempt);
    }

    @Test
    public void retrieveStatisticsTest() {
        // given
        User user = new User("john_doe");
        ChallengeAttempt attempt1 = new ChallengeAttempt(1L, user, 50, 60, 3010, false);
        ChallengeAttempt attempt2 = new ChallengeAttempt(2L, user, 50, 60, 3051, false);
        List<ChallengeAttempt> lastAttempts = List.of(attempt1, attempt2);
        given(attemptRepository.findTop10ByUserAliasOrderByIdDesc("john_doe"))
                .willReturn(lastAttempts);

        // when
        List<ChallengeAttempt> latestAttemptsResult = challengeService.getStatisticsForUser("john_doe");

        // then
        then(latestAttemptsResult).isEqualTo(lastAttempts);
    }
}

在ChallengeServiceTest 类中,并没有模拟AmqpTemplate的行为。需要一个新的ChallengeEventPublisherTest类,使用Mockit的ArgumentCaptor类捕获传递给Mock的参数,来模拟AmqpTemplate的路由键和事件对象调用。代码如下:

package cn.zhangjuli.multiplication.challenge;

import cn.zhangjuli.multiplication.user.User;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.amqp.core.AmqpTemplate;

import static org.assertj.core.api.BDDAssertions.then;
import static org.mockito.Mockito.verify;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@ExtendWith(MockitoExtension.class)
public class ChallengeEventPublisherTest {
    private ChallengeEventPublisher challengeEventPublisher;

    @Mock
    private AmqpTemplate amqpTemplate;

    @BeforeEach
    public void setUp() {
        challengeEventPublisher = new ChallengeEventPublisher(amqpTemplate, "test.topic");
    }

    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void sendsAttempt(boolean correct) {
        // given
        ChallengeAttempt challengeAttempt = createTestChallengeAttempt(correct);

        // when
        challengeEventPublisher.challengesSolved(challengeAttempt);

        // then
        var exchangeCaptor = ArgumentCaptor.forClass(String.class);
        var routingKeyCaptor = ArgumentCaptor.forClass(String.class);
        var eventCaptor = ArgumentCaptor.forClass(ChallengeSolvedEvent.class);

        verify(amqpTemplate).convertAndSend(exchangeCaptor.capture(),
                routingKeyCaptor.capture(), eventCaptor.capture());
        then(exchangeCaptor.getValue()).isEqualTo("test.topic");
        then(routingKeyCaptor.getValue()).isEqualTo("attempt." +
                (correct ? "correct" : "wrong"));
        then(eventCaptor.getValue()).isEqualTo(solvedEvent(correct));
    }

    private ChallengeSolvedEvent solvedEvent(boolean correct) {
        return new ChallengeSolvedEvent(1L, correct, 30, 40, 10L, "john_doe");
    }

    private ChallengeAttempt createTestChallengeAttempt(boolean correct) {
        return new ChallengeAttempt(1L,
                new User(10L, "john_doe"),
                30,
                40,
                correct ? 1200 : 1300,
                correct);
    }
}

在Gamification微服务中订阅事件

已经在Multiplication微服务中发布了事件,那么Gamification微服务中怎么订阅事件呢?和Multiplication类似,需要改变事件的使用方式,要替换现有的接受事件订阅者的控制器。
首先,在微服务Gamification的pom.xml中添加消息代理支持,就可以订阅事件,进行配置了。

  • 交换的名称:发布者将事件发送到交换。
  • 队列的名称:交换将消息路由到一个或者多个队列中,订阅者再去消费消息。
  • 日志记录设置:可以在应用程序与RabbitMQ进行交互时查看日志。可以将RabbitAdmin类的日志级别改为DEBUG,该类与RatherMQ代理进行交互以声明交换、队列和绑定。

在application.properties配置这些信息,修改如下:

# 交换名称
amqp.exchange.attempts=attempts.topic
# 队列名称
amqp.queue.gamification=gamification.queue
# 配置日志记录,显式交换、队列、绑定等声明信息
logging.level.org.springframework.amqp.rabbit.core.RabbitAdmin=DEBUG

实际上,生产者将消息发送到 Exchange (交换器:“X”),由交换器将消息路由到一个或者多个队列中,消费者再去消费消息。如果路由不到,或许会返回给生产者,或许直接丢弃。要明白两个概念:

  • BindingKey:绑定键,交换器都是需要与队列进行绑定,这里交换键可以简单的理解为交换器与队列之间的路径的名称(可以重复,即可以把多条队列以同一绑定键与路由器绑定)。
  • RoutingKey:路由键,生产者发送消息的时候可以带上路由键发送给交换器,交换器就会根据路由键去匹配队列(路由键与交换器的匹配)。

注意,消费端也应该声明交换,当声明队列时,交换必须存在。交换使用持久化,这仅在第一次声明时适用。但在微服务中声明所需的交换和队列是一种良好的编程习惯。另外,RabbitMQ实体的声明是幂等操作;如果实体存在,则该操作无效。

同样,需要在AMQPConfiguration类中进行配置,以使用JSON进行序列化的消息转换器,而不是默认消息转换器提供的格式。AMQPConfiguration类代码如下:

package cn.zhangjuli.gamification.configuration;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.module.paramnames.ParameterNamesModule;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Configuration
public class AMQPConfiguration {
    /**
     * 声明交换
     * @param exchangeName 交换名
     * @return 主题交换
     */
    @Bean
    public TopicExchange challengesTopicExchange(
            @Value("${amqp.exchange.attempts}") final String exchangeName
    ) {
        return ExchangeBuilder.topicExchange(exchangeName).durable(true).build();
    }

    /**
     * 声明队列
     * @param queueName 队列名
     * @return 队列
     */
    @Bean
    public Queue gamificationQueue(
            @Value("${amqp.queue.gamification}") final String queueName
    ) {
        return QueueBuilder.durable(queueName).build();
    }

    /**
     * 声明绑定,绑定队列到交换器上
     * @param gamificationQueue 队列
     * @param attemptsExchange 交换器
     * @return 绑定
     */
    @Bean
    public Binding correctAttemptsBuilding(
            final Queue gamificationQueue,
            final TopicExchange attemptsExchange
    ) {
        return BindingBuilder.bind(gamificationQueue)
                .to(attemptsExchange)
                .with("attempt.correct");
    }

    /**
     * 设置MessageHandlerMethodFactory替换默认Bean,使用默认工厂为基准,将其消息转换器替换为
     * MappingJackson2MessageConverter实例,处理从JSON到Java类的消息反序列化。并对其包含的
     * ObjectMapper进行微调,添加ParameterNamesModule以避免必须为事件类使用空的构造方法。
     * @return 设置后的MessageHandlerMethodFactory工厂
     */
    @Bean
    public MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();

        final MappingJackson2MessageConverter jsonConverter = new MappingJackson2MessageConverter();
        jsonConverter.getObjectMapper().registerModule(new ParameterNamesModule(JsonCreator.Mode.PROPERTIES));

        factory.setMessageConverter(jsonConverter);
        return factory;
    }

    /**
     * 设置RabbitListenerConfigurer,以使监听器使用JSON反序列化,使用MessageHandlerMethodFactory的
     * 实现来覆盖RabbitListenerConfigurer。
     * @param messageHandlerMethodFactory 工厂
     * @return RabbitListenerConfigurer
     */
    @Bean
    public RabbitListenerConfigurer rabbitListenerConfigurer(
            final MessageHandlerMethodFactory messageHandlerMethodFactory
    ) {
        return (c) -> c.setMessageHandlerMethodFactory(messageHandlerMethodFactory);
    }
}

这里不使用AmqpTemplate来接收消息,因为这是基于轮询的,会消耗网络资源;而想使用代理在有消息时通知订阅者,会使用异步通信,AMQP抽象不支持这些功能,spring-rabbit则提供了异步消费消息的机制。

下面将ChallengeSolvedDTO重构为ChallengeSolvedEvent,从技术上讲,不需要改名,因为JSON格式仅指定字段名和值,但应养成良好的编程习惯,使用合适的名称来命名,更容易找到相关的类。ChallengeSolvedEvent代码如下:

package cn.zhangjuli.gamification.challenge;

import lombok.Value;

/**
 * 定义了微服务Multiplication和微服务Gamification之间的契约,为保持独立性,在两个项目中都需要创建。
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
// 表明该类是不可变类
@Value
public class ChallengeSolvedEvent {
    long attemptId;
    boolean correct;
    int factorA;
    int factorB;
    long userId;
    String userAlias;
}

遵循领域驱动的设计实践,可以调整此事件的反序列化字段。例如,不需要userAlias作为Gamification的业务逻辑,可以将其从消费事件中删除。因为Spring Boot默认情况下将ObjectMapper配置为忽略未知属性,所以无需其他配置即可工作。

最好不要在微服务之间共享代码,因为要支持松耦合、向后兼容和独立部署。想象一下,微服务Multiplication将进一步发展并存储额外的数据,假设这是更艰巨挑战的额外因素。然后,这些额外因素将被添加到已发布事件的代码中,好的一面是,通过在每个领域中使用事件的不同表示形式并将映射器配置为忽略未知属性,Gamification微服务仍然有效,而不必更新其事件表示形式。

现在,就来消费事件,使用@RabbitListener注解,使其在消息到达时充当消息的处理逻辑。这里,只需要指定要订阅的队列名称,因为已经在单独的配置类中声明了RabbitMQ实体。GameEventHandler 类代码如下:

package cn.zhangjuli.gamification.game;

import cn.zhangjuli.gamification.challenge.ChallengeSolvedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class GameEventHandler {
    private final GameService gameService;

    /**
     * 实现RabbitMQ订阅者代码很少,可将队列传递给@RabbitListener注解。
     * 以ChallengeSolvedEvent对象为期望的输入,Spring自动配置一个反序列化器,将消息从代理转换为该对象类型。
     * 因为AMQPConfiguration中的配置,将使用JSON。
     * 默认情况下,当方法最终正常完成时,基于@RabbitListener注解构建的逻辑会将确认发送给代理,
     * 在Spring Rabbit中,称为AUTO确认模式。
     * 如果想在处理之前就发送ACK信号,可将其更改为NONE;如果想完全控制,则可将其修改为MANUAL。
     * 可在工厂级别(全局配置)或监听器级别(通过将额外的参数传递给@RabbitListener注解)设置此参数和其他配置值。
     * 这里使用默认的错误策略:AUTO,捕获任何可能的异常,记录错误,然后重新抛出AmqpRejectAndDontRequeueException。
     * 这是Spring AMQP的快捷方式,用于拒绝该消息并告诉代理不要重新排队,
     * 这意味着,如果消费者逻辑中出现意外错误,将丢失消息。这里是可以接受的。
     * 如果要避免这种情况,还可以设置代码以多次重试,方法是重新抛出InstantRequeueAmqpException,
     * 或使用Spring AMQP中提供的一些工具(如错误处理程序或邮件恢复程序)来处理这些无效的消息。
     * @param event 期望的输入,事件
     */
    @RabbitListener(queues = "${amqp.queue.gamification}")
    void handleMultiplicationSolved(final ChallengeSolvedEvent event) {
        log.info("已接收到成功挑战事件:{}", event.getAttemptId());
        try {
            gameService.newAttemptForUser(event);
        } catch (final Exception e) {
            log.error("在处理ChallengeSolvedEvent时发生错误:", e);
            // 避免事件重新排队和处理
            throw new AmqpRejectAndDontRequeueException(e);
        }
    }
}

可使用@RabbitListener注解做很多事:

  • 声明交换、队列和绑定。
  • 使用相同的方式从多个队列接收消息。
  • 通过使用@Header(对单个值)或@Header(对映射)注解额外的参数来处理消息头。
  • 注入Channel参数,例如,确认控制。
  • 通过从监听器返回值来实现请求/响应模式。
  • 将注解移到类级别,并使用@RabbitHandler注解方法,可以配置多种方法来处理同一队列中出现的不同消息类型。

启动应用程序,在RabbitMQ中发布消息,就可以在控制台接收到消息,如图所示:
发布消息测试
修改了订阅者的逻辑,就可以删除GameController类了。需要重构GameService的接口及其实现GameServiceImpl,来处理ChallengeSolvedEvent,基本逻辑不变。代码如下:

package cn.zhangjuli.gamification.game;

import cn.zhangjuli.gamification.challenge.ChallengeSolvedEvent;
import cn.zhangjuli.gamification.game.badgeprocessors.BadgeProcessor;
import cn.zhangjuli.gamification.game.domain.BadgeCard;
import cn.zhangjuli.gamification.game.domain.BadgeType;
import cn.zhangjuli.gamification.game.domain.ScoreCard;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * @author Juli Zhang, <a href="mailto:zhjl@lut.edu.cn">Contact me</a> <br>
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class GameServiceImpl implements GameService {
    private final ScoreRepository scoreRepository;
    private final BadgeRepository badgeRepository;
    private final List<BadgeProcessor> badgeProcessorList;

    @Override
    public GameResult newAttemptForUser(ChallengeSolvedEvent challenge) {
        if (challenge.isCorrect()) {
            ScoreCard scoreCard = new ScoreCard(challenge.getUserId(), challenge.getAttemptId());
            scoreRepository.save(scoreCard);
            log.info("用户 {} 的尝试 {} 得分 {}",
                    challenge.getUserAlias(), challenge.getAttemptId(), scoreCard.getScore());
            List<BadgeCard> badgeCards = processForBadges(challenge);
            return new GameResult(scoreCard.getScore(),
                    badgeCards.stream().map(BadgeCard::getBadgeType).collect(Collectors.toList()));
        } else {
            log.info("尝试 {} 不正确。用户 {} 没有得分。",
                    challenge.getAttemptId(), challenge.getUserAlias());
            return new GameResult(0, List.of());
        }
    }

    /**
     * 检查总分和不同的得分来得到徽章
     * @param challengeSolved 新的尝试
     * @return 徽章的列表
     */
    private List<BadgeCard> processForBadges(final ChallengeSolvedEvent challengeSolved) {
        Optional<Integer> optionalTotalScore =
                scoreRepository.getTotalScoreForUser(challengeSolved.getUserId());
        if (optionalTotalScore.isEmpty()) {
            return Collections.emptyList();
        }
        int totalScore = optionalTotalScore.get();

        // 得到用户的总分和徽章
        List<ScoreCard> scoreCardList =
                scoreRepository.findByUserIdOrderByScoreTimestampDesc(challengeSolved.getUserId());
        Set<BadgeType> alreadyGetBadges =
                badgeRepository.findByUserIdOrderByBadgeTimestampDesc(challengeSolved.getUserId())
                        .stream()
                        .map(BadgeCard::getBadgeType)
                        .collect(Collectors.toSet());

        // 调用徽章处理器来处理还没有得到的徽章
        List<BadgeCard> newBadgeCards = badgeProcessorList.stream()
                .filter(badgeProcessor -> !alreadyGetBadges.contains(badgeProcessor.badgeType()))
                .map(badgeProcessor -> badgeProcessor.processForOptionalBadge(totalScore,
                        scoreCardList, challengeSolved))
                .flatMap(Optional::stream)
                .map(badgeType -> new BadgeCard(challengeSolved.getUserId(), badgeType))
                .collect(Collectors.toList());

        badgeRepository.saveAll(newBadgeCards);

        return newBadgeCards;
    }
}

注意,将ChallengeSolvedDTO修改为ChallengeSolvedEvent,会影响多个类,使用IDEA环境将自动进行重构,如果不能,需要人工干预。

至此,完成了如下工作:

  1. 在Spring Boot应用程序中添加AMQP依赖启动项,以使用AMQP和RabbitMQ。
  2. 删除REST API客户端(Multiplication)和控制器(Gamification),将切换到事件驱动的架构。
  3. 将ChallengeSolvedDTO重命名为ChallengeSolvedEvent,并修改相关的类。
  4. 在微服务中声明主题交换。
  5. 更改Multiplication微服务的逻辑,发布事件而不是调用REST API。
  6. 在Gamification微服务定义队列。
  7. 在Gamification微服务中实现RabbitMQ消费者逻辑。
  8. 重构测试,使其适应新界面。

因为没有改变开放的前端交互的API,所以,前端界面不需要改变。

场景分析

现在,系统已经实现了事件驱动的架构,就来看看使用消息代理带来的优势。系统基本逻辑如图所示。

2
ChallengeSolvedEvent
(给任意感兴趣的订阅者)
绑定:
attempt.correct
1、发送尝试
Attempts
(主题交换)
4
更新分数
3、响应
Gamification
队列
尝试
浏览器
Multiplication微服务
Gamification微服务

下面就启动系统,来验证整个系统逻辑:

  • 确保RabbitMQ服务正常。
  • 运行两个微服务:Multiplication和Gamification。
  • 运行React界面。
  • 在浏览器中,进入http://localhost:15672/的RabbitMQ管理界面,使用相应的密码登录,Spring Boot默认guest/guest,如果不存在,就使用管理员创建之。

事件流

先来看看Gamification微服务的启动日志,会看到下面一些与RabbitMQ有关的信息,如下所示:

2023-12-10T09:10:56.980+08:00  INFO 32272 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-12-10T09:10:57.010+08:00  INFO 32272 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#4603845b:0/SimpleConnection@383c94ed [delegate=amqp://admin@127.0.0.1:5672/, localPort=53842]
2023-12-10T09:10:57.013+08:00 DEBUG 32272 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2023-12-10T09:10:57.023+08:00 DEBUG 32272 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'attempts.topic'
2023-12-10T09:10:57.025+08:00 DEBUG 32272 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Queue 'gamification.queue'
2023-12-10T09:10:57.027+08:00 DEBUG 32272 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Binding destination [gamification.queue (QUEUE)] to exchange [attempts.topic] with routing key [attempt.correct]
2023-12-10T09:10:57.029+08:00 DEBUG 32272 --- [           main] o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished

使用Spring AMQP时,会记录前面两行,表明已成功连接到代理。另外的日志信息,是因为开启了RabbitAdmin类的日志记录级别为DEBUG。
Multiplication微服务端,还没有RabbitMQ日志,原因是连接和交换的声明仅在发布第一条消息时发生。当前端界面有重试到来时,就启动连接,控制台日志就会显示相关日志。如下所示:

2023-12-09T15:38:48.050+08:00  INFO 59044 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-12-09T15:38:48.081+08:00  INFO 59044 --- [nio-8080-exec-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#15f229e8:0/SimpleConnection@3da194a4 [delegate=amqp://guest@127.0.0.1:5672/, localPort=65533]
2023-12-09T15:38:48.083+08:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin         : Initializing declarations
2023-12-09T15:38:48.095+08:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin         : declaring Exchange 'attempts.topic'
2023-12-09T15:38:48.099+08:00 DEBUG 59044 --- [nio-8080-exec-1] o.s.amqp.rabbit.core.RabbitAdmin         : Declarations finished

可以通过RabbitMQ管理界面,了解当前状态。在Connections选项卡中,可看到微服务创建的连接,如图所示:
connections
切换到Exchanges选项卡,会看到topic类型的attempts.topic交换,声明为durable(D),如图所示:
exchanges
单击交换名称进入详细信息页面,可以看到相关信息,可以绑定队列和设置路由键,还可以发布消息,如图所示:
Exchanges详细信息
Queues选项卡显示了队列,该队列也配置为durable(D)。如图所示:
Queues列表
Channels选项卡显示了通道信息,如图所示:
Channels
了解了基本情况后,就来看看发送重试信息后的响应,使用HTTPie产生正确的尝试挑战,如下所示:

> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise guess=3000

在Multiplication日志中,可以看到如何连接代理并声明交换的(第一次可见,如前面的日志所示,这里已经存在,就无效了),可以在控制台看到发布消息的输出,如下所示:

2023-12-10T09:49:49.270+08:00  INFO 59044 --- [nio-8080-exec-5] c.z.m.challenge.ChallengeServiceImpl     : attempt: ChallengeAttempt(id=1756, user=User(id=1, alias=noise), factorA=50, factorB=60, resultAttempt=3000, correct=true)

Gamification微服务将反映事件的消费情况并更新分数,日志如下:

2023-12-10T09:49:49.304+08:00  INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1756
2023-12-10T09:49:49.351+08:00  INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise 的尝试 1756 得分 10
2023-12-10T10:02:25.740+08:00  INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1757
2023-12-10T10:02:25.741+08:00  INFO 32272 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise 的尝试 1757 得分 10

多发布几次消息,可以从RabbitMQ管理界面中,从Queues选项卡,单击想要查看的队列名称,即可看到队列的整体情况,如图所示:
队列详细信息
当然,也可以使用前端界面来进行尝试,如图所示:
前端页面

Gamification微服务不可用

前面的微服务实现具有弹性,即使Gamification微服务不可用,也不会失效。但是会错过这段时间发送的成功尝试,现在,引入了消息代理之后呢?
停止Gamification微服务,使用HTTPie发送尝试,看看如何:

> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise1 guess=3000

RabbitMQ的“队列详细信息”页面将显示排队消息,如图所示:
排队消息这些消息仍然存在,但没有消费者消费。检查Multiplication微服务日志,也没有错误。它将消息发布到代理,并向API客户端返回OK响应,实现了松耦合,Multiplication不需要知道消费者是否可用。整个过程是异步的,基于事件驱动的。
再次回到Gamification微服务,启动后将在日志中看到它会从代理接收所有事件消息。然后,会触发其业务逻辑,更新分数。现在就没有丢失任何数据。如下所示:

2023-12-10T11:00:59.801+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1781
2023-12-10T11:00:59.861+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1781 得分 10
2023-12-10T11:00:59.896+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1782
2023-12-10T11:00:59.898+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1782 得分 10
2023-12-10T11:00:59.905+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1783
2023-12-10T11:00:59.909+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1783 得分 10
2023-12-10T11:00:59.916+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1784
2023-12-10T11:00:59.918+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1784 得分 10
2023-12-10T11:00:59.924+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1785
2023-12-10T11:00:59.926+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1785 得分 10
2023-12-10T11:00:59.931+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1786
2023-12-10T11:00:59.931+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1786 得分 10
2023-12-10T11:00:59.938+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameEventHandler   : 已接收到成功挑战事件:1787
2023-12-10T11:00:59.939+08:00  INFO 59708 --- [ntContainer#0-1] c.z.gamification.game.GameServiceImpl    : 用户 noise1 的尝试 1787 得分 10

还可用用更新的分数来验证排行榜。系统不仅有弹性,还可用在故障后恢复。RabbitMQ界面的队列详细信息也显示了排队消息的计数为0,因为已经消费完了。
想象一下,如果RabbitMQ可以在丢弃消息之前配置将消息保留在队列中的时间(生存时间,TTL),还可以配置队列的最大长度,就可以根据用户需要进行调整了。
例如,可以将前面Queue配置进行设置,来提供相关的配置,可以配置队列使其具有6小时TTL和最大长度为25000条消息,代码如下:

    @Bean
    public Queue gamificationQueue(
            @Value("${amqp.queue.gamification}") final String queueName
    ) {
        return QueueBuilder.durable(queueName)
                .ttl((int) Duration.ofHours(6).toMillis())
                .maxLength(25000L)
                .build();
    }

消息代理不可用

在队列有待传递的消息时关闭代理呢?要测试这种情况,按照如下步骤操作:

  1. 停止Gamification服务。
  2. 使用一个用户别名发送一些正确的测试,并在RabbitMQ管理界面验证队列是否保存了这些消息。
  3. 停止RabbitMQ代理。
  4. 再次发送正确的测试。
  5. 启动Gamification微服务。
  6. 大约10秒后,再次启动RabbitMQ代理。

和前面一样,手动测试后,代理关闭前,发送的尝试被RabbitMQ放入队列,但没有得到处理。停止RabbitMQ后,再次执行测试,将获得HTTP错误响应,日志如下:

2023-12-10T15:07:21.148+08:00  INFO 59044 --- [nio-8080-exec-6] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-12-10T15:07:21.153+08:00 ERROR 59044 --- [nio-8080-exec-6] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information] with root cause

java.net.ConnectException: Connection refused: no further information
//...其余内容省略

可添加一个try/catch子句,来抑制错误,更好的方法是实现自定义HTTP错误处理程序,以返回特定的错误响应,如:503服务不可达,指示与代理断开时系统无法运行,可有多种选择。
再次启动Gamification微服务后,仍然可以运行,但会不断重试连接,日志如下:

2023-12-10T15:14:05.528+08:00  WARN 21044 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: no further information
2023-12-10T15:14:05.528+08:00  INFO 21044 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@52903e9b: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
2023-12-10T15:14:05.529+08:00  INFO 21044 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]
2023-12-10T15:14:05.530+08:00 ERROR 21044 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Failed to check/redeclare auto-delete queue(s).
2023-12-10T15:14:05.530+08:00  INFO 21044 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [localhost:5672]

而且,新的尝试请求到来时,Multiplication微服务也会执行相同的操作。再次启动RabbitMQ代理时,两个微服务恢复连接。当Gamification重新连接到RabbitMQ后,就会收到排队的事件,进行后续的处理。这是因为声明了持久化的交换和队列,而且Spring实现在发布消息时使用了持久化传递模式。如果使用RabbitTemplate(而不是AmqpTemplate)来发布消息,可以自己设置消息的属性,将消息传递模式修改为非持久化示例如下:

MessageProperties properties = MessagePropertiesBuilder.newInstance()
    .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
    .build();
rabbitTemplate.getMessageConverter().toMessage(challengeAttempt, properties);
rabbitTemplate.convertAndSend(challengesTopicExchange, routingKey, event);

这个例子告诉大家应该了解所使用的工具的配置项。以持久性方式发送所有消息带来了不错的优势,但会带来额外的性能开销。如果配置正确分布的RabbitMQ集群,宕机的可能性会很小,也可能愿意接受潜在的消息丢失以提高性能,这取决于需求。

事务性

前面的测试暴露了一个意外情况,代理关闭后发送尝试,将收到服务器错误,错误代码为500,给客户端造成了一种印象,未正常处理该消息,实际上已经处理了一部分了。
停止代理,运行Multiplication,再次发送尝试,将得到错误响应,例如:

> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise3 guess=3000
HTTP/1.1 500
Connection: close
Content-Type: application/json
Date: Sun, 10 Dec 2023 07:44:59 GMT
Transfer-Encoding: chunked
Vary: Origin, Access-Control-Request-Method, Access-Control-Request-Headers

{
    "error": "Internal Server Error",
    "path": "/attempts",
    "status": 500,
    "timestamp": "2023-12-10T07:44:59.980+00:00"
}

但是,查看数据库,进入Multiplication数据库的H2控制台,查看ChallengeAttempt数据表,可以看到最后添加了1行尝试,这意味着,即使收到了错误响应,该信息也已经存储了。如果代码在尝试将消息发送给代理之前持久化对象,这就无法避免。
结果
可将服务方法verifyAttempt中包含的整个逻辑视为一个事务,可回滚数据库事务(不执行)。即使在调用存储库中的save方法之后仍然出现错误,仍然可以回滚。使用Spring框架来实现该操作很容易,只需要将@Transactional注解添加到方法上即可,代码如下:

    @Transactional
    @Override
    public ChallengeAttempt verifyAttempt(ChallengeAttemptDTO attemptDTO) {
        // ...
    }

如果在这段代码中存在异常,则事务将回滚。如果要求服务中的所有方法都具有事务性,可以在类级别添加该注解。
导致 @Transactional 失效的常见场景有以下 5 个:

  1. 非 public 修饰的方法;
  2. timeout 超时时间设置过小;
  3. 代码中使用 try/catch 处理异常;
  4. 调用类内部的 @Transactional 方法;
  5. 数据库不支持事务。

修改后重试相同的步骤:重启Multiplication微服务,关闭代理发送尝试,查看数据库存储,发现没有存储。由于抛出异常,Spring回滚数据库操作,就像未执行过。
Spring还支持发布者和订阅者双方的RabbitMQ事务,在@Transactional注解的方法范围内使用AmqpTemplate或RabbitTemplate实现发送消息,当在通道(RabbitMQ)中启动事务性时,即使在发送这些消息的方法调用之后发生异常,这些消息也不会到达代理。在消费端,也可以使用事务拒绝已处理的消息。这种情况下,需要设置队列以重新排列被拒绝的消息(这是默认情况)。
类似情况下,可简化事务处理策略,将其限制在数据库中:

  • 发布时,如果只有一个代理操作,则可在流程结束时发布消息,在发送消息之前或期间发生的任何错误都将导致数据库操作回滚。
  • 在订阅者端,如果有异常,默认情况下将拒绝该消息。如果是想要的消息,则可以对其重新排列。然后,还可以在方法中使用@Transactional注解,如果发生故障,数据库将回滚。

微服务内的本地事务性对保持数据一致性并避免部分流程完成至关重要,当业务逻辑涉及多个步骤并可能与数据库或消息代理之类的外部组件交互时,应该考虑到业务逻辑可能出错。

扩展微服务

微服务架构的优势之一是可以独立扩展系统的各个部分,在消息代理中引入事件驱动方法的好处,可以透明地添加更多的发布者和订阅者,目前为止一直在运行微服务的单个实例,不能确定架构还支持为每个微服务添加更多的副本。
不能使用多个微服务实例的第一个原因是:数据库。当水平扩展微服务时,所有副本都应共享数据层,并将数据存储在一个公共位置,而不是每个实例都隔离。微服务必须是无状态的,原因是不同的请求或消息可能最终出现在不同的微服务实例中。例如,不应该假定同一Multiplication实例将处理来自同一用户发送的两次尝试,因此,不能在尝试中保持任何内存状态,如图所示:

8080
9080
发送尝试/获取用户别名
Multiplication实例1
Multiplication实例2
Multiplication

好的一面是,微服务是无状态的,可以独立处理每个请求或消息,结果最终存储在数据库中。但有一个技术问题,如果在端口9080上启动第二个实例,将无法启动,因为它试图创建新的数据库实例。
下面就来看看这个问题,先正常运行应该8080端口的Multiplication实例。然后,启动第二个实例,覆盖server.port参数为9080,以避免端口冲突。可执行如下命令:

./mvnw spring-boot:run -D"spring-boot.run.arguments"="--server.port=9080"

当启动第二个实例时,日志会提示错误:

org.h2.jdbc.JdbcSQLNonTransientConnectionException: Database may be already in use: "~/multiplication.mv.db". Possible solutions: close all other connection(s); use the server mode [90020-214]

发生错误的原因是H2数据库引擎,该引擎默认情况下被设计成嵌入式进程,而不是服务器。需要改变的是使用H2数据库服务器的模式,以便可以使用多个实例进行连接,在application.properties中添加配置如下:

spring.datasource.url=jdbc:h2:file:~/multiplication1;DB_CLOSE_ON_EXIT=FALSE;AUTO_SERVER=true;

注意,AUTO_SAVE=true可能不起作用,可以换成MySQL来试试。

第二个挑战是负载均衡。如果启动多个实例,如何从用户界面连接到它们?下面看看消息代理是如何实现RabbitMQ消息订阅者之间的负载均衡的。如图所示:

2 负载均衡
1 负载均衡
发送尝试/获取用户别名
8080
发送尝试/获取用户别名
9080
获取排行榜
8081
获取排行榜
9081
3
3
4
4
Gamification实例1
Gamification实例2
Multiplication实例1
Multiplication实例2
浏览器
Multiplication
Gamification
尝试
(主题交换)
Gamification队列

RabbitMQ支持多种来源的消息发布,这意味着可以有多个Multiplication微服务副本将事件发布到同一主题交换,这是透明的,这些实例打开不同的连接,声明交换(仅在第一次创建),然后发布数据而不必知道还有其他发布者。在订阅者端,由多个消费者共享,当启动多个Gamification微服务实例时,所有实例都声明相同的队列和绑定,代理足够智能,可在它们之间进行负载均衡。这就在消息级别解决了负载均衡的问题。
启动每个微服务的实例、前端和RabbitMQ服务。在两个单独的终端运行另外的两个微服务实例,让Multiplication和Gamification都有两个副本,注意端口不同:

> ./mvnw spring-boot:run -D"spring-boot.run.arguments"="--server.port=9080"
> ./mvnw spring-boot:run -D"spring-boot.run.arguments"="--server.port=9081"

一旦启动并运行了所有实例,在前端重试尝试,这些尝试会使用Multiplication实例1的服务,但事件消费在两个Gamification副本之间平衡。检查日志可以验证Gamification实例如何处理事件的。另外,由于数据库在各个实例之间共享,因此前端可以从运行的端口8081的实例请求排行榜,此实例将聚合所有副本存储的得分和徽章。如图所示:

发送尝试/获取用户别名
8080
发送尝试/获取用户别名
9080
获取排行榜
8081
获取排行榜
9081
发送尝试
浏览器
Multiplication实例1
Multiplication实例2
Multiplication
Gamification实例1
Gamification实例2
Gamification
尝试
(主题交换)
Gamification队列
HTTP命令行

还可以验证是否有多个发布者一起使用命令行将正确的尝试发送到Multiplication微服务的第二个实例,使用命令行向位于端口9080的实例发送调用,并检查,可以看到,消息在各个订阅者之间是平衡的,命令行如下:

> http POST :8080/attempts factorA=50 factorB=60 userAlias=noise6 guess=3000

现在,通过消息代理可以实现良好的系统可伸缩性,在多个订阅者之间实现负载均衡,也提高了系统的弹性。停止一个Gamification实例,代理会自动将消息定向到另一个实例。经过测试可以发现整个系统仍然能够正常工作。这里的问题是,前端无法进行负载均衡,或检测到一个副本已关闭。

小结

文章介绍了事件驱动架构的实现过程,了解了如何通过消息代理实现微服务之间的松耦合,事件模式不针对特定目标,仅表示在特定领域中发生的事实,通过对不同的消息类型进行建模,从而实现松耦合,结合RabbitMQ和AMQP提供的方案,实现消息发布和交换、订阅消息的队列和路由绑定,配置相关参数,适应功能性和非功能性需求。使用Spring Boot实现AMQP和RabbitMQ的集成,很容易构建事件驱动架构的应用程序。通过示例,了解事件驱动架构的使用场景,实现了良好的系统可伸缩性,很容易在多个订阅者之间实现负载均衡,也提高了系统的弹性。不同的工具,可能有不同的利弊,应根据需要来选择。

前面的文章:
1、1 一个测试驱动的Spring Boot应用程序开发
2、2 使用React构造前端应用
3、3 试驱动的Spring Boot应用程序开发数据层示例
4、4 向微服务架构转变

后面的文章:
6、6 使用网关模式进行负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/241317.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于ssm大学生创新创业平台项目管理子系统设计与实现论文

摘 要 互联网发展至今&#xff0c;无论是其理论还是技术都已经成熟&#xff0c;而且它广泛参与在社会中的方方面面。它让信息都可以通过网络传播&#xff0c;搭配信息管理工具可以很好地为人们提供服务。针对大学生创新创业项目信息管理混乱&#xff0c;出错率高&#xff0c;信…

后台业务管理系统原型模板,Axure后台组件库(整套后台管理页面)

后台业务系统需要产品经理超强的逻辑思维能力和业务理解能力&#xff0c;整理了一批后台原型组件及完整的用 Axure 8 制作的后台系统页面&#xff0c;方便产品经理们快速上手制作后台原型。 包括交互元件、首页、商品、订单、库存、用户、促销、运营、内容、统计、财务、设置、…

跨品牌的手机要怎样相互投屏?iPhone和iPad怎么相互投屏?

选择买不同品牌的手机是基于品牌声誉、产品特点、价格和性价比等多个因素的综合考虑。每个人的需求和偏好不同&#xff0c;选择适合自己的手机品牌是一个个人化的决策。 一些品牌可能更加注重摄影功能&#xff0c;而其他品牌可能更加注重性能和速度。选择不同品牌的手机可以根据…

Transformer预测销售量

&#x1f916; 专栏《人工智能》 &#x1f4d6; 博客说明&#xff1a; 本专栏记录我个人学习和实践人工智能相关算法的心得与内容&#xff0c;一同探索人工智能的奇妙世界吧&#xff01; &#x1f680; 零、说明 心血来潮&#xff0c;想利用Transformer做一个销售量预测的内容…

k8s集群部分使用gpu资源的pod出现UnexpectedAdmissionError问题

记录一次排查UnexpectedAdmissionError问题的过程 1. 问题 环境 3master节点N个GPU节点 kubelet版本&#xff1a;v1.19.4 kubernetes版本&#xff1a;v1.19.4 生产环境K8S集群&#xff0c;莫名其妙的出现大量UnexpectedAdmissionError状态的Pod&#xff0c;导致部分任务执…

C# | CountdownEvent使用教程 (通过与ManualResetEvent对比,快速了解其特性)

C# CountdownEvent使用教程 对于熟悉ManualResetEvent的同学来说&#xff0c;了解CountdownEvent的差异对于更好地利用它们是非常重要的。通过对ManualResetEvent和CountdownEvent的对比&#xff0c;我们可以更好地理解CountdownEvent的特点和使用场景。 ManualResetEvent回顾…

SpringCloud微服务 【实用篇】| Docker启示录

目录 一&#xff1a;Docker启示录 1. Docker启示录 2. Docker和虚拟机的区别 3. Docker架构 4. Centos7安装Docker 4.1. 卸载 4.2. 安装docker 4.3. 启动docker 4.4. 配置镜像加速 前些天突然发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽…

阿里云国际版如何为SSL证书更换域名?

如果您需要为已签发的SSL证书更换绑定的域名&#xff0c;您可以使用数字证书管理服务提供的更换域名功能。本文介绍如何为SSL证书更换域名。 操作步骤 为SSL证书变更域名&#xff0c;您相当于重新购买了一张新证书&#xff0c;需要支付一定的费用。 您在更换域名前&#xff…

快速解决Edge浏览器常见问题:完整教程

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 目录 文章目录 前言 一、Edge浏览器是什么&#xff1f; 二、常见的问题 1. DNS服务器出错 解决方案一&#xff1a;清除浏览器缓存和Cookie 2.网络问题 3.缓存和Cook…

Java .shp文件解析转换成地图可用的经纬度格式

1.新建ShapeUtils工具类解析shp文件 package com.ruoyi.info.geotoolsUtils; import java.io.File; import java.io.IOException; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.Lis…

C语言实现在顺序表中找到最大值

用C语言实现在顺序表中找到最大值&#xff1a; #include <stdio.h> #define MAX_SIZE 100 int findMax(int arr[], int size) { int max arr[0]; // 假设第一个元素为最大值 for (int i 1; i < size; i) { // 从第二个元素开始遍历列表 if (…

新手上路:盘点「性能测试」必须掌握的技术点

前段时间&#xff0c;有一些小伙伴提出希望我们推送点性能测试的技术干货。所以&#xff0c;小编今天通过上网查资料&#xff0c;结合项目实操过程中的一些问题&#xff0c;总结了一些关于性能测试的内容&#xff0c;希望是大家想要了解的内容哈。 1、性能测试的目的 首先&am…

数据可视化:解锁企业经营的智慧之道

在现代企业管理中&#xff0c;数据可视化已经成为了一项重要的工具。它不仅仅是简单地展示数据&#xff0c;更是提供了深入理解数据、做出更明智决策的方法。作为一名可视化设计从业人员&#xff0c;我经手过一些企业自用的数据可视化项目&#xff0c;今天就来和大家聊聊数据可…

vue3 echarts 各省地图展示

效果&#xff1a; 1.在src下新建utils文件夹添加各省地图的json文件&#xff08;下载各省地图的网址 DataV.GeoAtlas地理小工具系列&#xff09; 2.安装echarts npm install echarts 3.在项目文件中中引入json <template><div class"back"><div id…

在接触新的游戏引擎的时候,如何能快速地熟悉并开发出一款新游戏?

引言 大家好&#xff0c;今天分享点个人经验。 有一定编程经验或者游戏开发经验的小伙伴&#xff0c;在接触新的游戏引擎的时候&#xff0c;如何能快速地熟悉并开发出一款新游戏&#xff1f; 利用现成开发框架。 1.什么是开发框架&#xff1f; 开发框架&#xff0c;顾名思…

微信小程序、uniapp仿扎克新闻(附源码)

介绍 本着试试 mpvue 的态度开发此程序&#xff0c;界面主要是模仿 ZAKER 新闻&#xff0c;数据全部是由 Mock 随机生成的&#xff0c;使用的是 Easy-Mock 服务。本程序只开发了的几个页面&#xff0c;尝试了自定义组件&#xff0c;路由跳转及参数传递等功能。再开发下去只是组…

微信游戏开发:连接社交与娱乐的创新之路

在移动互联网时代&#xff0c;微信已经成为了人们日常生活中不可或缺的社交工具。而微信游戏&#xff0c;作为在这一平台上崛起的新兴产业&#xff0c;不仅给用户提供了更多娱乐选择&#xff0c;也为开发者们创造了独特的机遇。本文将探讨微信游戏开发的关键步骤、技术要点以及…

C# OpenCvSharp DNN 部署yolov5旋转目标检测

目录 效果 模型信息 项目 代码 下载 C# OpenCvSharp DNN 部署yolov5旋转目标检测 效果 模型信息 Inputs ------------------------- name&#xff1a;images tensor&#xff1a;Float[1, 3, 1024, 1024] -------------------------------------------------------------…

六、CM4树莓派USBRS转485串口通讯

一、串行通讯接口 串行通讯接口简称串口&#xff08;UART&#xff09; 采用串行通信方式的扩展接口&#xff0c;数据位一位一位的按照顺序传送 优点&#xff1a;通信线路简单&#xff0c;只要一对传输线就可以实现双向通信能够大大降低成本&#xff0c;适合远距离通信。 缺点…

【后端学前端】第三天 css动画 动态搜索框(定位、动态设置宽度)

1、学习信息 视频地址&#xff1a;css动画 动态搜索框&#xff08;定位、动态设置宽度&#xff09;_哔哩哔哩_bilibili 2、源码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>test3</title>…