消息中间件在现代分布式系统中起着关键作用,它们提供了一种可靠且高效的方法来进行异步通信和解耦。在这篇博客中,我们将重点介绍 RabbitMQ,一个广泛使用的开源消息中间件。我们将深入探讨 RabbitMQ 的特性、工作原理以及如何在应用程序中使用它来实现可靠的消息传递。
一、RabbitMQ 简介
RabbitMQ 是基于 AMQP(高级消息队列协议)的开源消息中间件。它提供了一个可靠的、灵活的、可扩展的消息传递机制,广泛应用于各行各业。RabbitMQ 的核心思想是生产者将消息发送到交换机,交换机根据路由规则将消息传递给队列,然后消费者从队列中获取并处理消息。
二、相关概念
RabbitMQ 是一个开源的消息中间件,它是由 Erlang 语言编写的,并且实现了高级消息队列协议(AMQP)。作为一种可靠、灵活和可扩展的消息传递系统,RabbitMQ 提供了在应用程序之间传输数据的可靠机制。
下面是 RabbitMQ 的一些关键特性和概念的详解:
-
消息队列:RabbitMQ 使用消息队列来存储和传递消息。消息队列是一种先进先出(FIFO)的数据结构,它将消息暂时存储在其中,直到消费者准备好接收并处理它们。
-
生产者和消费者:消息的发送者称为生产者,而消息的接收者称为消费者。生产者将消息发送到队列中,而消费者从队列中获取消息并进行处理。
-
队列:队列是 RabbitMQ 的核心部分,它是消息的存储和传递载体。生产者将消息发送到队列中,而消费者则从队列中获取消息。队列可以持久化,这意味着即使 RabbitMQ 服务器关闭,消息也不会丢失。
-
交换机(Exchange):交换机是消息的路由器,它将消息从生产者发送到队列。它根据特定的规则将消息路由到一个或多个队列。RabbitMQ 提供了不同类型的交换机,包括直连交换机、主题交换机、广播交换机等。
-
绑定(Binding):绑定将交换机和队列关联起来,以定义消息在交换机和队列之间的路由规则。绑定规定了消息应该如何从交换机路由到队列。
-
路由键(Routing Key):路由键是生产者在发送消息时与消息一起指定的属性。交换机根据路由键来确定将消息路由到哪个队列。
-
持久化:当队列或消息被标记为持久化时,它们会被保存到磁盘上,以防止在 RabbitMQ 重启后丢失数据。
-
发布-订阅模式:RabbitMQ 支持发布-订阅模式,其中一个生产者发送消息到交换机,交换机将消息广播给所有与其绑定的队列,然后每个队列的消费者都可以接收并处理消息。
-
ACK 机制:消费者可以使用 ACK 机制告知 RabbitMQ 已经成功接收和处理了消息。只有在消费者明确确认之后,RabbitMQ 才会将消息从队列中删除。
-
消息确认和持久化:RabbitMQ 允许生产者在发送消息时请求确认。如果消息无法成功路由到队列,或者在路由过程中发生错误,RabbitMQ 将通知生产者。此外,消息和队列的持久化可以确保即使在 RabbitMQ 重启后也不会丢失数据。
总之,RabbitMQ 是一个可靠和灵活的消息中间件,它以消息队列作为核心,使用交换机、队列、绑定等概念来进行消息的路由和传递。它提供了高度可靠的消息传递机制和丰富的特性,广泛用于分布式系统、微服务架构、任务队列等场景中,帮助应用程序实现解耦、异步通信和可靠性保证。
三、RabbitMQ 的工作原理
RabbitMQ 的工作原理可以简单地概括为以下几个步骤:
1、生产者发送消息到交换机:
生产者通过连接到 RabbitMQ,并将消息发送到预定义的交换机。消息可以包含任何结构化数据,如 JSON、XML 等格式。
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
2、交换机将消息路由到队列:
交换机根据预定义的路由规则将消息路由到一个或多个绑定到它上面的队列。路由规则可以根据完全匹配、模式匹配等方式进行。
channel.queueBind(queueName, exchangeName, routingKey);
3、消费者从队列中获取消息:
消费者通过连接到 RabbitMQ,并订阅感兴趣的队列。一旦有消息到达队列,RabbitMQ 将立即将该消息推送给订阅的消费者进行处理。
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received message: " + message);
}
});
4、消费者处理消息并发送确认:
消费者获取到消息后,可以进行相应的业务逻辑处理。一旦消息被成功处理,消费者将发送确认给 RabbitMQ,告知消息已经被消费,RabbitMQ 可以安全删除该消息。
生产者将消息发送到名为 "my_exchange" 的交换机,并通过路由键 "my_routing_key" 将消息路由到名为 "my_queue" 的队列。消费者订阅了 "my_queue" 队列,并在收到消息时调用回调函数进行处理。
四、RabbitMQ 的特性
RabbitMQ 提供了许多强大的特性,使其成为一个广泛使用的消息中间件:
- 持久化:RabbitMQ 可以将消息和队列持久化到磁盘,即使在服务器重启后也不会丢失消息。
- 灵活的路由规则:通过使用不同的交换机类型和路由键,可以实现精确的消息路由策略。
- 可靠性和可恢复性:RabbitMQ 提供了多种保证消息可靠传递的机制,如消息确认(acknowledgement)、事务、发布者确认等。
- 可扩展性:RabbitMQ 支持分布式部署和集群模式,可以实现高吞吐量和高可用性。
- 多语言客户端:RabbitMQ 提供了多种官方支持的客户端库,如 Java、Python、Ruby 等,方便开发者在不同的语言环境下使用。
五、RabbitMQ 在实际应用中的应用场景
参考文章:
【RabbitMQ】什么是RabbitMQ?RabbitMQ有什么用?应用场景有那些?_路遥叶子的博客-CSDN博客
RabbitMQ 在各种场景中都有广泛的应用,包括但不限于:
- 异步任务处理:将需要耗时较长的任务发送到 RabbitMQ,并由消费者异步执行,以提高系统的响应性能和可伸缩性。
- 事件驱动架构:通过使用 RabbitMQ 来实现事件的发布与订阅,不同的组件可以通过订阅感兴趣的事件来解耦。
- 应用解耦与流量控制:通过引入消息中间件,不同的应用程序可以进行解耦,并实现流量控制、服务降级等机制。
- 日志收集、分析:使用 RabbitMQ 将分布式系统中的日志消息发送到中央日志服务器进行集中管理和分析。
5.1 服务间解耦
用户订单,库存处理。【服务间解耦】
使用MQ前:
系统正常时,用户下单,订单系统调用库存系统进行删减操作,操作成功,将成返回消息,提醒下单成功。系统异常时,库存系统将无法访问,导致订单删减操作无法执行,最终导致下单失败。
使用MQ后:
订单系统和库存系统之间不在互相影响,独立运行,达到了应用解耦的目的。订单系统只需要将下单消息写入MQ,就可以直接执行下一步操作。这时即使库存系统出现异常也不会影响订单系统的操作,且下单的库存删减记录,将会被永久保存到MQ中,直到库存系统恢复正常,从MQ中订阅下单消息,进行消费成功为止。
使用MQ前:
使用MQ后:
5.2 实现异步通信
用户注册,发送手机短信,邮件。【实现异步通信】
使用MQ前:
整个操作流程,全部在主线程完成。点击用户注册 --》 入库添加用户 --》发送邮件 --》发送短信。每一步都需要等待上一步完成后才能执行。且每一步操作的响应时间不固定,如果请求过多,会导致主线程请求耗时很长,响应慢,甚至会导致死机的情况出现,严重影响了用户的体验。
使用MQ后:
主线程只需要处理耗时较低的入库操作,然后把需要处理的消息写进MQ消息队列中,然后由不同的独立的邮件系统和发短信系统,同时订阅消息队列中的消息进行消费。这样通过消息队列作为一个中间人去保存和传递消息,不仅仅耗时低消耗的资源也很少且单个服务器能够承受的并发请求将更多。
5.3 流量削峰
商品秒杀和抢购。【流量削峰】
流量削峰是消息队列中常用的场景 一般在秒杀或团购活动中使用广泛。
使用MQ前:对于秒杀、抢购活动,用户访问所产生的流量会很大,甚至会在同一时间段出现上万上亿条请求,这股瞬间的流量暴涨,我们的应用系统配置是无法承受的,会导致系统直接崩溃死机。
例如:A系统平时每秒请求100个,系统稳定运行; 但是晚上8点有秒杀活动 ,每秒并发增至1万条 ,系统最大处理每秒1000条 于是导致系统崩溃。
使用MQ后:我们在大量用户进行秒杀请求时,将那个巨大的流量请求拒在系统业务处理的上层,并将其转移至MQ中,而不是直接涌入我们的接口。在这里MQ消息队列起到了缓存作用。
例如:100万用户在高峰期,每秒请求5000个,将这5000个请求写入MQ系统每秒只能处理2000请求,因为MySQL只能处理2000个请求 ; 系统每秒拉取2000个请求 不超过自己的处理能力即可。
使用MQ前:
使用MQ后:
5.4 其他应用场景:
1、订单处理系统:
在一个电子商务平台中,可以使用 RabbitMQ 来处理订单。当用户下单时,订单信息被发布到 RabbitMQ 的交换机中,然后相关的消费者从队列中获取订单消息并进行处理,如验证订单、库存管理、支付等。
2、日志收集与分发:
假设有多个应用程序生成日志,并希望将它们集中处理和存储。每个应用程序可以将日志消息发布到一个名为 "log_exchange" 的交换机中,然后有不同的消费者订阅该交换机,并将日志消息写入数据库或发送到日志分析系统。
3、实时数据传输:
如果有一个实时监控系统,需要将传感器数据实时传输到监控平台进行处理和可视化展示。传感器将数据发布到 RabbitMQ 的交换机中,监控平台的消费者订阅交换机并处理数据,从而实现实时监控和报警功能。
4、异步任务处理:
假设有一个应用程序需要处理大量耗时的任务,如图像处理、PDF 转换等。应用程序将任务发布到 RabbitMQ 的队列中,然后有多个工作节点作为消费者从队列中获取任务并进行处理,以实现任务的并行处理和减轻主应用程序的压力。
5、消息通知系统:
假设在一个订阅系统中,用户可以订阅不同的主题或事件。当有新的消息发布时,RabbitMQ 会将消息路由到对应的队列,然后订阅该队列的用户会收到相应的通知。这种方式可以用于实现邮件订阅、新闻推送等功能。
6、微服务架构:
在一个微服务架构中,不同的服务之间可能需要进行消息传递和协作。使用 RabbitMQ 可以实现服务之间的解耦和异步通信,每个服务通过交换机和队列收发消息,从而实现微服务之间的松耦合。
六、 RabbitMQ 安装
官网地址:
RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
6.1 brew安装
brew update #更新一下homebrew
brew install rabbitmq #安装rabbitMQ
安装结果:
==> Caveats
==> rabbitmq
Management Plugin enabled by default at http://localhost:15672To restart rabbitmq after an upgrade:
brew services restart rabbitmq
Or, if you don't want/need a background service you can just run:
CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server
rabbitmq 的安装路径:
/opt/homebrew/opt/rabbitmq
6.2 、配置环境变量
1、
vi ~/.bash_profile
2、
export RABBIT_HOME=${PATH}:/opt/homebrew/opt/rabbitmq
export PATH=${PATH}:$RABBIT_HOME/sbin
3、
source ~/.bash_profile
6.3 、启动RabbitMQ
1、前台运行
rabbitmq-server
2、后台运行
rabbitmq-server -detached
3、查看运行状态
rabbitmqctl status
4、开始 Web插件
rabbitmq-plugins enable rabbitmq_management
5、重启
rabbitmq-server restart
5、关闭
rabbitmqctl stop
6.4、访问MQ
1、浏览器地址
http://localhost:15672/
默认用户名和密码为guest
添加用户
rabbitmqctl add_user miaojiang 123
设置用户为管理员
rabbitmqctl set_user_tags miaojiang administrator
配置用户可以远程登录
rabbitmqctl set_permissions -p "/" miaojaing ".*" ".*" ".*"
查看新添加的账户
rabbitmqctl list_users
查看用于的权限
rabbitmqctl list_permissions -p /
七、Spring Boot 项目应用RabbitMQ
7.1、添加Maven依赖:
在你的项目的pom.xml文件中添加RabbitMQ客户端库的依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.2、配置RabbitMQ连接:
在Spring Boot的配置文件(application.properties 或 application.yml)中添加RabbitMQ的连接信息。
application.properties:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
在application.yml配置mq的参数:
spring:
rabbitmq:
#设置RabbitMQ的IP地址
host: localhost
#设置rabbitmq服务器用户名
username: guest
#设置rabbitmq服务器密码
password: guest
#设置rabbitmq服务器连接端口
port: 5672
7.3 创建交换机
自定义交换机名称
创建名为“myExchange”的交换机
package com.example.usermanagement.mq;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
/*
使用 @Configuration 注解创建一个配置类,并通过 @Bean 注解创建了一个名为 declareExchange 的方法,用于声明创建交换机。请根据实际情况修改交换机名称、类型和持久化设置。
*/
public static final String EXCHANGE_NAME = "myExchange";
@Bean
public Exchange declareExchange() {
return ExchangeBuilder.directExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
}
7.4 创建消息发送者
创建消息发送者:创建一个消息发送者的类,用于发送消息到RabbitMQ
package com.example.usermanagement.mq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageSender{
private final AmqpTemplate amqpTemplate;
private final String exchangeName = "myExchange"; // 自定义交换机名称
@Autowired
public MessageSender(AmqpTemplate amqpTemplate) {
this.amqpTemplate = amqpTemplate;
}
public void sendMessage(Object message) {
amqpTemplate.convertAndSend(exchangeName, "", message); // 发送消息到默认交换机和空路由键
}
}
注意:
sendMessage 类型使用的是Object类型
7.5 RabbitMQ管理后台添加对列
步骤:
-
打开浏览器,输入RabbitMQ管理后台的URL。默认情况下,该URL为
http://localhost:15672/
。请确保你的RabbitMQ服务器正在运行,并且端口号正确。 -
输入用户名和密码以登录到RabbitMQ管理后台。默认情况下,用户名为
guest
,密码也为guest
。如果你修改过用户名和密码,请使用你的自定义凭据进行登录。 -
成功登录后,你将看到RabbitMQ管理后台的主界面。在顶部导航栏中,选择
Queues
选项卡。 -
在
Queues
页面上,你将看到已经存在的队列列表。如果你想要创建一个新队列,请点击Add a new queue
按钮。 -
在添加队列的页面上,填写以下信息:
Name
:队列的名称。为队列提供一个唯一的名称。(如myQueue)Durability
:队列的持久性。选择是或否,以指定队列是否应该在RabbitMQ服务重启后保留。Auto delete
:队列的自动删除。选择是或否,以指定当最后一个消费者断开连接后,是否删除队列。Arguments
:队列的其他参数。这是可选的,你可以为队列设置一些特定的参数。
-
填写完队列信息后,点击
Add queue
按钮以创建队列。 -
创建成功后,你将在
Queues
页面上看到新添加的队列。你可以在该页面上查看队列的详细信息,包括消息数量、消费者数量等。
http://localhost:15672/#/queues
只需要添加队列名称就可以
7.6 调用生产者
1、注入MessageSender
实例
@Autowired
private MessageSender messageSender;
2、在需要发送消息的地方调用messageSender.sendMessage
方法。根据你的业务逻辑,你可以在合适的位置调用该方法。例如,在订单创建成功后,你可以添加以下代码:
messageSender.sendMessage("订单已创建:" + order.getOrderId());
7.7 创建消息接收者
创建消息接收者:创建一个消息接收者的类,用于处理接收到的RabbitMQ消息。
这里就直接写处理RabbitMQ消息的逻辑。
package com.example.usermanagement.mq;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiver {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("your_queue_name"),
exchange = @Exchange(value = RabbitMQConfig.EXCHANGE_NAME)
// key = "your_routing_key"
))
public void receiveMessage(Object message) {
System.out.println("Received message: " + message);
// 处理消息逻辑
}
}
注意:
sendMessage 类型使用的是Object类型
your_queue_name
替换为你要监听的队列的名称,(如myQueue)
将 your_routing_key
替换为适当的路由键(如果使用)
八、MQ界面介绍
Overview(概览):提供了一份概览报告,包括服务器和集群信息、节点状态、队列和连接摘要、以及最近的相关日志条目。
Connections(连接):显示当前连接到 RabbitMQ 服务器的客户端应用程序,包括连接的名称、协议、虚拟主机等信息。
Channels(通道):显示每个连接上的活动通道,以及与每个通道相关的一些指标,如消费者数量、未确认的消息数量等。
Exchanges(交换机):列出了所有的交换机,包括名称、类型、绑定的队列和绑定的数量。
Queues(队列):显示了所有的队列,包括名称、消息数量、消费者数量等信息。您还可以通过队列进行一些操作,如创建、删除、清空等。
Admin(管理员):提供了一些高级管理功能,如用户和权限管理、虚拟主机管理、插件管理等。
8.1 Overview(概览)
Overview(概览):提供了一份概览报告,包括服务器和集群信息、节点状态、队列和连接摘要、以及最近的相关日志条目。
8.2 Connections(连接)
Connections(连接):显示当前连接到 RabbitMQ 服务器的客户端应用程序,包括连接的名称、协议、虚拟主机等信息。
8.3 Channels(通道)
Channels(通道):显示每个连接上的活动通道,以及与每个通道相关的一些指标,如消费者数量、未确认的消息数量等。
8.3.1 prefetch(预取)
在消息队列(Message Queue)中,prefetch(预取)是一个重要的概念,它用于控制消费者从消息队列中获取消息的速度。
Prefetch 是指在消费者端从消息队列中获取消息之前,先获取一定数量的消息到本地缓存中,以供消费者快速处理。这样可以提高系统的吞吐量和效率,减少网络传输的开销。
具体来说,prefetch 可以帮助避免以下情况发生:
-
公平分发(Fair dispatching):在多个消费者并行处理消息时,如果没有 prefetch 限制,一个消费者可能会一次性获取到过多的消息,导致其他消费者得到较少的机会。通过设置 prefetch,可以确保每个消费者只能获取一定数量的消息,实现更公平的消息分发。
-
消费者负载均衡(Consumer load balancing):当消息队列中有大量待处理的消息时,消费者可能会因为处理速度较慢而积压消息。通过设置合适的 prefetch 值,可以限制消费者每次获取的消息数量,使得消息能够均匀地分配给多个消费者,从而实现负载均衡。
在 RabbitMQ 中,prefetch 的设置可以通过 basic.qos
方法来进行配置。例如,以下代码将设置 prefetch 数量为 10:
channel.basic_qos(prefetch_count=10)
请注意,prefetch 的设置应该根据具体的应用场景和系统负载情况进行调优。合理设置 prefetch 可以提高系统的性能和稳定性。
8.4 Exchanges(交换机)
Exchanges(交换机):列出了所有的交换机,包括名称、类型、绑定的队列和绑定的数量。
8.5 Queues(队列)
Queues(队列):显示了所有的队列,包括名称、消息数量、消费者数量等信息。您还可以通过队列进行一些操作,如创建、删除、清空等。
8.6 Admin(管理员)
Admin(管理员):提供了一些高级管理功能,如用户和权限管理、虚拟主机管理、插件管理等