前言:前面一章内容太多,写了kafka,这里就写一下同类产品rabbitmq,rabbitmq内容较少,正好用来过度一下,概念还是会用一些例子来说明,实际部署的内容会放在概念之后。
1、基础概念
1.1、MQ消息队列简介
MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间,通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信。队列的使用除去了接收和发送应用程序,同时执行的要求。 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式,大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
想象你在一家快餐店里点餐。通常情况下,你会排队等待点餐和取餐,这可能会花费很长时间,特别是在高峰时段。现在,快餐店引入了一个新的系统:你可以在点餐时拿到一个号码牌,然后继续坐下或者做其他事情。当你的餐点准备好时,店员会按照号码牌号叫你去取。这样一来,你不必在柜台前等待,可以自由地做其他事情,而店员也能够更高效地处理点餐和准备食物,避免了大家在柜台前的拥堵和等待。
在这个比喻中:
- 号码牌就像消息队列中的消息,每个点餐的顾客(应用程序)都有一个唯一的号码(消息),用来标识自己的订单(任务)。
- 店员就像消息队列的处理程序,负责处理顾客(应用程序)的订单(任务),并通知顾客(应用程序)何时可以取餐(任务完成)。
- 等待时间减少,就像消息队列能够减少应用程序之间直接等待的时间,提高系统的效率和吞吐量
1.2、消息队列核心功能
解耦(将不同的系统分离开):避免操作等待时长不同的俩个系统互相浪费资源
冗余(存储):主要是用于存储请求
扩展性:扩充消息队列应对可能的数据流
削峰:核心功能,即时流量,闲时处理
可恢复性:数据丢失可以恢复
顺序保证:保证访问顺序
缓冲:作为大量数据(消息)的暂存点
异步通信:减少应用程序之间直接等待的时间
1.3、消息队列分类
1)P2P模式(安全)
Point-to-Point(P2P)点到点,P2P模式包含三个角色:
1.消息队列(Queue)
2.发送者(Sender)
3.接收者(Receiver)。
每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。 P2P的特点如下:
• 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中
• 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行它不会影响到消息被发送到队列 • 接收者在成功接收消息之后,需向队列应答成功。
• 如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式
2)Pub/Sub模式(并发)
Pub/Sub模式包含三个角色:
主题(Topic)
发布者(Publisher)
订阅者(Subscriber) 。
多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。Pub/Sub的特点如下:
• 每个消息可以有多个消费者 • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息
• 为了消费消息,订阅者必须保持运行的状态 • 如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型
主要是应对超高并发,成本远高于第一种。
1.4、同类产品
Kafka:属于Apache,上一章内容
RabbitMQ:RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现,RabbitMQ比Kafka可靠。
RocketMQ:RocketMQ是阿里开源的消息中间件,它是纯Java开发,阿里用的多。
1.5、RabbitMQ简介
RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它可以用于大型软件系统,各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。
1.6、RabbitMQ架构图示
RabbitMQ从整体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息
1.7、基础术语
这部分内容在工作中基本用不上,只是辅助理解一下
Message (消息):消息由消息头和消息体组成。消息体是不透明的,包含一些可选属性,如路由键 (routing-key)、优先级 (priority)、持久性存储 (delivery-mode) 等。
Publisher (消息生产者):发布者是向交换器发送消息的客户端程序,例如一个Java或PHP程序。
Exchange (交换器):交换器接收发布者发送的消息,并根据规则将消息路由到一个或多个队列中去。消息必须经过交换器才能进入队列。
Queue (消息队列):队列用于存储消息,直到消费者连接并将其取走。队列是消息的容器和终点,可以由一个或多个发布者投递消息。
Binding (绑定):绑定定义了交换器和队列之间的关联规则。基于路由键,绑定将消息从交换器路由到相应的队列中。
Virtual Host (虚拟主机):虚拟主机是独立的消息服务器区域,包含交换器、队列及相关对象。每个虚拟主机有自己的身份认证和加密环境,可以看作是RabbitMQ的一个小型实例。
Broker (代理):代理是RabbitMQ的服务器实体,类似于在Linux上创建的虚拟机。
Connection (连接):连接是RabbitMQ服务器和应用程序服务之间的TCP连接。
Channel (信道):信道是在TCP连接内创建的虚拟连接,用于在服务器和客户端之间传递消息。一条TCP连接可以包含多个信道,用于并行地发送、接收和订阅消息。
Consumer (消息消费者):消费者是从消息队列中获取消息的客户端应用程序,与生产者类似,是处理消息的程序。
消息是像是装在信封里的邮件,它由消息头和消息体组成。消息体是不透明的,包含了一些可选属性,如邮件的送达地址、优先级别、是否需要特殊保护等。
发布者就像是寄信人,负责将信封(消息)投递到邮箱(交换器)里。你可以把发布者想象成是发送电子邮件的Java或PHP程序。
交换器是一个智能的邮局,它接收发布者寄来的信件(消息),然后根据地址(路由键)将这些信件发送到正确的邮箱(队列)里。消息必须先经过交换器,才能最终进入队列中等待被取走。
队列就像是一个装满信件的邮箱,它保存信件直到有消费者来取走。队列不仅是存储消息的地方,也是消息的目的地。一个消息可以被寄到一个或多个邮箱里,这取决于发布者的设定。消息会一直留在队列里,直到有消费者连接到队列并把它取走为止。
绑定就像是一条明确的送达规则,连接了邮箱(队列)和寄信人(交换器)。通过绑定,可以根据地址(路由键)将交换器和队列联系起来,使交换器像是一个根据地址送信的邮递员。
虚拟主机就像是一片独立的邮政区域,其中包含了若干个交换器、队列和相关的物件。每个虚拟主机都有自己独特的身份认证和安全加密环境。在RabbitMQ中,虚拟主机可以看作是一个完整的消息服务器,拥有自己的邮箱、邮递员和规则。
代理可以理解为RabbitMQ的实际运行服务器,就像是在计算机系统中创建的一个虚拟邮局。
连接是建立在RabbitMQ服务器和应用程序服务之间的通道,就像是打开了一条电线,让信件可以在两者之间传递。
信道是连接中的虚拟通道,可以理解为一条独立的光纤。在一条连接上可以创建多个信道,每个信道独立处理消息的发送、接收和订阅。
消费者就像是一个专门去邮箱里取信的人,它表示一个从队列中取走消息的客户端程序。消费者和发布者类似,只不过它们的工作是接收和处理消息。
1.8、RabbitMQ的通信过程
这是关于RabbitMQ通信过程的整理:
-
生产者 P1 发送消息: P1生产消息并将其发送到RabbitMQ服务器的 Exchange。
-
Exchange 接收消息: Exchange接收到消息后,根据消息的 ROUTING KEY 将消息路由到匹配的 Queue1。
-
Queue1 接收消息: Queue1收到消息并将其存储,准备发送给订阅者。
-
消费者 C1 订阅消息: 订阅者 C1 已经订阅了 Queue1 中的消息。
-
C1 接收消息: Queue1将消息发送给订阅者 C1,C1接收到消息并进行处理。
-
C1 发送 ACK 确认: C1处理完消息后,向 Queue1 发送 ACK 确认消息,表示已成功接收并处理了消息。
-
Queue1 收到 ACK: Queue1收到 ACK 后,确认消息已成功传递给订阅者,并可以安全地删除队列中缓存的此条消息。
这个流程展示了RabbitMQ中消息从生产者到消费者的完整传递过程,包括消息的路由、存储和确认处理。
2、单机部署RabbitMQ
2.1、基础环境
环境已更换国内源与epel源,三台之间域名解析已配置,防火墙和selinux已关闭。
主机名需配置完毕重新加载,否则后续无法设置节点!!!
IP | 角色/主机名/域名 |
192.168.188.128 | rabbitmq1 |
192.168.188.129 | rabbitmq2 |
192.168.188.130 | rabbitmq3 |
2.2、安装Erlang
这里官网推荐的方式为配置erlang源,然后使用yum下载,这种方式适用于外网环境较稳定的操作环境
[root@rabbitmq1 ~]# wget https://objects.githubusercontent.com/github-production-release-asset-2e65be/47679505/a5e9b597-d9aa-44a2-960f-670142bae779?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=releaseassetproduction%2F20240705%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240705T132027Z&X-Amz-Expires=300&X-Amz-Signature=4b4e673b94952cf24ce0086e6a957ea2e584553fda51bacae46a8031b502b189&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=47679505&response-content-disposition=attachment%3B%20filename%3Derlang-23.3.4.18-1.el7.x86_64.rpm&response-content-type=application%2Foctet-stream
[root@rabbitmq1 ~]# yum install -y erlang-23.3.4.18-1.el7.x86_64.rpm
2.3、安装RabbitMQ
这里有时候会断连,多试几次
[root@rabbitmq1 ~]# wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm
[root@rabbitmq1 ~]# yum install -y rabbitmq-server-3.7.13-1.el7.noarch.rpm
2.4、修改配置文件
[root@rabbitmq1 ~]# cp /usr/share/doc/rabbitmq-server-3.7.13/rabbitmq.config.example /etc/rabbitmq/rabbitmq.config
[root@rabbitmq1 ~]# vim /etc/rabbitmq/rabbitmq.config
# 去掉注释%%和逗号,如下图所示,此处为开启默认账号
2.5、安装插件并启动服务
安装启动rabbitmq管理插件,启动rabbitmq,查看节点状态
[root@rabbitmq1 ~]# rabbitmq-plugins enable rabbitmq_management
Enabling plugins on node rabbit@rabbitmq1:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@rabbitmq1...
The following plugins have been enabled:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
set 3 plugins.
Offline change; changes will take effect at broker restart.
[root@rabbitmq1 ~]# systemctl restart rabbitmq-server
[root@rabbitmq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1]}]},
{running_nodes,[rabbit@rabbitmq1]},
{cluster_name,<<"rabbit@rabbitmq1">>},
{partitions,[]},
{alarms,[{rabbit@rabbitmq1,[]}]}]
2.6、使用浏览器访问
访问rabbitmq服务器的15672端口,默认用户名/密码:guest/guest
2.7、web页面管理
这里为开发人员使用,运维尽量不要去这个页面,这里需要开发人员根据项目需要配置,运维只负责搭建环境!!!
添加用户
2.8、 使用命令行进行管理
执行以下操作
[root@rabbitmq1 ~]# rabbitmqctl list_queues
# 查看所有的队列
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
[root@rabbitmq1 ~]# rabbitmqctl reset
# 清除所有的队列
Error: this command requires the 'rabbit' app to be stopped on the target node. Stop it with 'rabbitmqctl stop_app'.
Arguments given:
reset
Usage:
rabbitmqctl [-n <node>] [-l] [-q] reset
[root@rabbitmq1 ~]# rabbitmqctl add_user yonghuming zheshimima
# 添加用户,语法看命令拼音部分
Adding user "yonghuming" ...
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags yonghuming administrator
# 分配角色为管理员
Setting tags for user "yonghuming" to [administrator] ...
返回web页面查看新增账号
授权用户操作步骤如下
[root@rabbitmq1 ~]# rabbitmqctl add_vhost xunizhujimingzi
# 新增虚拟主机
Adding vhost "xunizhujimingzi" ...
[root@rabbitmq1 ~]# rabbitmqctl set_permissions -p xunizhujimingzi yonghuming ".*" ".*" ".*"
Setting permissions for user "yonghuming" in vhost "xunizhujimingzi" ...
# 将新虚拟主机授权给新用户,后面三个”*”代表用户拥有配置、写、读全部权限
返回web页面查看
查看虚拟主机选项卡
3、RabbitMQ集群部署
3.1、简介
消息中间件RabbitMQ,一般以集群方式部署, 主要提供消息的接受和发送,实现各微服务之间的消息异步。可以与负载均衡结合形成RabbitMQ+HA架构
3.2、原理介绍
1)cookie
RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证来实现分布式,所以部署Rabbitmq分布式集群时要先安装Erlang,并把其中一个服务的cookie复制到另外的节点。
2)内存节点和磁盘节点
RabbitMQ集群中,各个RabbitMQ为对等节点,即每个节点均提供给客户端连接,进行消息的接收和发送。节点分为内存节点和磁盘节点,区别是消息存储的位置不同,一般都建立为磁盘节点,为了防止机器重启后的消息消失;
3)普通模式和镜像模式
RabbitMQ的传统集群模式一般分为两种,普通模式和镜像模式。消息队列通过RabbitMQ HA镜像队列进行消息队列实体复制。
普通模式
普通模式下,以两个节点(rabbit01、rabbit02)为例来进行说明。对于消息队列来说,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit01、rabbit02间进行消息传输,把A中的消息实体取出并经过B发送给consumer。所以consumer应尽量连接每一个节点,从中取消息。即对于同一个逻辑队列,要在多个节点建立物理Queue。否则无论consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈
镜像模式
镜像模式下,将需要消费的队列变为镜像队列,存在于多个节点,这样就可以实现RabbitMQ的HA高可用性。作用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通讯会占用大量的网络带宽。
3.3、环境要求
1、所有节点需要再同一个局域网内;
2、所有节点需要有相同的 erlang cookie,否则不能正常通信,为了实现cookie内容一致,采用scp的方式进行。
3、准备三台虚拟机,配置相同
4、集群中所有节点都需要hosts文件解析
使用上一个示例的环境继续操作
三台全部安装安装Erlang,安装rabbitmq,修改配置文件同上一示例
3.4、安装插件并启动服务
[root@rabbitmq2 ~]# rabbitmq-plugins enable rabbitmq_management
[root@rabbitmq3 ~]# rabbitmq-plugins enable rabbitmq_management
此处不要启动!!!一定要保证三台机器的cookie内容一致
找到erlang cookie文件的位置,源码包部署一般会存在.erlang.cookie文件;rpm包部署一般是在/var/lib/rabbitmq/.erlang.cookie。将 node1 的该文件使用rsync或者是scp复制到 node2、node3,文件权限需要是400。
[root@rabbitmq1 ~]# scp -r /var/lib/rabbitmq/.erlang.cookie rabbitmq2:/var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq1 ~]# scp -r /var/lib/rabbitmq/.erlang.cookie rabbitmq3:/var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq1 ~]# cat /var/lib/rabbitmq/.erlang.cookie
# 此处必须检查是否完全一致
[root@rabbitmq2 ~]# cat /var/lib/rabbitmq/.erlang.cookie
[root@rabbitmq3 ~]# cat /var/lib/rabbitmq/.erlang.cookie
启动rabbitmq
[root@rabbitmq1 ~]# systemctl restart rabbitmq-server
[root@rabbitmq2 ~]# systemctl restart rabbitmq-server
[root@rabbitmq3 ~]# systemctl restart rabbitmq-server
仅关闭2,3节点的对外连接(集群功能),不关闭rabbitmq服务
[root@rabbitmq2 ~]# rabbitmqctl stop
[root@rabbitmq3 ~]# rabbitmqctl stop
将2,3暂时设置为独立节点运行
[root@rabbitmq2 ~]# rabbitmq-server -detached
[root@rabbitmq3 ~]# rabbitmq-server -detached
# 忽略此处pid文件不可写的警告,这个警告不影响后续操作
查看各个节点状态
[root@rabbitmq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1]}]},
{running_nodes,[rabbit@rabbitmq1]},
{cluster_name,<<"rabbit@rabbitmq1">>},
{partitions,[]},
{alarms,[{rabbit@rabbitmq1,[]}]}]
# 2,3此处省略。每台主机看到的只有一个的server信息。目前尚未组合成集群
添加用户并为其授权,此时每个节点是单独的一台RabbitMQ,下面来将他们组成集群。
1
[root@rabbitmq1 ~]# rabbitmqctl add_user admin admin
Adding user "admin" ...
User "admin" already exists
[root@rabbitmq1 ~]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@rabbitmq1 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
[root@rabbitmq1 ~]#
2
[root@rabbitmq2 ~]# rabbitmqctl add_user admin admin
Adding user "admin" ...
[root@rabbitmq2 ~]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@rabbitmq2 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
[root@rabbitmq2 ~]#
3
[root@rabbitmq3 ~]# rabbitmqctl add_user admin admin
Adding user "admin" ...
[root@rabbitmq3 ~]# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
Setting permissions for user "admin" in vhost "/" ...
[root@rabbitmq3 ~]# rabbitmqctl set_user_tags admin administrator
Setting tags for user "admin" to [administrator] ...
[root@rabbitmq3 ~]#
最后一步组成集群
rabbitmq-server 启动时,会一起启动:节点和应用,它预先设置RabbitMQ应用为standalone(脱机)模式。要将一个节点加入到现有的集群中,你需要停止这个应用,并将节点设置为原始状态。如果使用rabbitmqctl stop,应用和节点都将被关闭。所以使用rabbitmqctl stop_app仅仅关闭应用。(停应用,不停止节点)
此处以磁盘节点示例
2
[root@rabbitmq2 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rabbitmq2 ...
[root@rabbitmq2 ~]# rabbitmqctl join_cluster rabbit@rabbitmq1
Clustering node rabbit@rabbitmq2 with rabbit@rabbitmq1
[root@rabbitmq2 ~]# rabbitmqctl start_app
Starting node rabbit@rabbitmq2 ...
completed with 3 plugins.
3
[root@rabbitmq3 ~]# rabbitmqctl stop_app
Stopping rabbit application on node rabbit@rabbitmq3 ...
[root@rabbitmq3 ~]# rabbitmqctl join_cluster rabbit@rabbitmq1
Clustering node rabbit@rabbitmq3 with rabbit@rabbitmq1
[root@rabbitmq3 ~]# rabbitmqctl start_app
Starting node rabbit@rabbitmq3 ...
completed with 3 plugins.
此时集群状态
[root@rabbitmq1 ~]# rabbitmqctl cluster_status
Cluster status of node rabbit@rabbitmq1 ...
[{nodes,[{disc,[rabbit@rabbitmq1,rabbit@rabbitmq2,rabbit@rabbitmq3]}]},
{running_nodes,[rabbit@rabbitmq3,rabbit@rabbitmq2,rabbit@rabbitmq1]},
{cluster_name,<<"rabbit@rabbitmq1">>},
{partitions,[]},
{alarms,[{rabbit@rabbitmq3,[]},{rabbit@rabbitmq2,[]},{rabbit@rabbitmq1,[]}]}]
正常工作中此时移交admin/admin账号给开发组即可
4、Rabbitmq+HAproxy
此处保留haproxy配置文件模板供复制粘贴使用
global
log 127.0.0.1 local2
chroot /var/lib/haproxy
pidfile /var/run/haproxy.pid
maxconn 4000
user haproxy
group haproxy
daemon
stats socket /var/lib/haproxy/stats
defaults
log global
mode tcp
option tcplog
option dontlognull
retries 3
option redispatch
maxconn 2000
contimeout 5s
clitimeout 120s
srvtimeout 120s
listen rabbitmq_cluster 0.0.0.0:80#作为代理的服务器的IP和端口
mode tcp
balance roundrobin
server rabbit1 192.168.188.128:15672 check inter 5000 rise 2 fall 2
server rabbit2 192.168.188.129:15672 check inter 5000 rise 2 fall 2
server rabbit3 192.168.188.130:15672 check inter 2000 rise 2 fall 3
listen monitor
bind 0.0.0.0:8100#监控页面的访问端口
mode http
option httplog
stats enable
stats uri /rabbitmqstats
stats refresh 30s
stats auth admin:admin