手写消息队列(基于RabbitMQ)

一、什么是消息队列?

提到消息队列是否唤醒了你脑海深处的记忆?回看前面的这篇文章:《Java 多线程系列Ⅳ(单例模式+阻塞式队列+定时器+线程池)》,其中我们在介绍阻塞队列时说过,阻塞队列最大的用途就是实现 生产者消费者模型

我们知道对于生产者消费者模型来说,它具有两个十分亮眼的特点:

  1. 解耦合.
  2. 削峰填谷.

(1)解耦合
在引入生产者消费者模型之前,两台服务器之间通常是直接交互,这种交互模式使得服务器之间的耦合是非常大的。而引入生产者消费者模型之后,两台服务器之间不再进行直接通信,而是借助阻塞队列进行业务处理,起到了解耦的效果。

(2)削峰填谷
在引入生产者消费者模型之前,同样是两台服务器进行直接通信,如果在一个时间点,服务器 A 突然发送一组请求峰值,此刻服务器 B 也会随之感受到峰值,这种情况下很可能造成服务器故障。如果此时使用阻塞队列,A 将收到的请求发给队列,虽然队列中有很多请求,但是服务器 B 仍然和以按照原有的节奏读取请求。

其实正是因为生产者消费者模型具有以上诸多好处,在实际的后端开发中,特别是分布式系统里,跨主机使用生产者消费者模型是非常普遍的需求。因此通常会把阻塞队列单独分离出来,赋予更加丰富的功能,封装成一个独立的服务器程序,这个程序就称为 消息队列

二、需求整理

1、生产者消费者模型核心概念

  1. 生产者 (Producer): 负责将消息发送到消息队列中。

  2. 消费者 (Consumer): 从消息队列中接收和处理消息。。

  3. 中间人 (Broker): 它负责接收发布者发送的消息,并将这些消息存储在队列中,然后将这些消息传递给订阅者。

  4. 发布 (Publish): 生产者将消息投递到中间人的过程。

  5. 订阅 (Subscribe): 消费者在中间人这里注册的过程。只有消费者注册之后,当一个消息发布到消息队列时消息才会被发送给相应的订阅者。

根据以上概念,我们可以大致画出生产者消费者模型概念图:(PS:下面的每个模块均表示服务器)

一个生产者,一个消费者:

N 个生产者,N 个消费者:

2、Broker 设计概要

我们当前的目的是为了实现一个消息队列,其中 Broker 是最核心的部分,它主要负责消息的 存储转发,其中涉及到的核心概念如下:

  1. 虚拟主机 (VirtualHost): 类似于 MySQL 的 “database”,是⼀个逻辑上的集合。在实际的开发中一个 BrokerServer 可能会同时管理多组业务线上的数据,此时可以使用不同的 VirtualHost 进行区分。

  2. 交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则,把消息转发给不同的 Queue。

  3. 队列 (Queue): 真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息(根据订阅的队列)。⼀个 Exchange 可以绑定多个 Queue (可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定
    (一个 Queue 中的消息可以来自于多个 Exchange)。

  4. 绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系。使用一个关联表就可以把这两个概念联系起来。

  5. 消息 (Message): 具体来说是服务器之间的请求和响应。一个消息,可以视为一个字符串(二进制数据),具体由程序员自定义。

上述概念在 Broker 中的体现如图所示:

补充说明1:数据存储

以上这些概念对应的数据,既需要在内存中存储,也需要在硬盘上存储,以内存为主,硬盘为辅:

  • 内存存储:对于 MQ 来说,能够高效的处理转发数据时非常关键的指标,因此使用内存组织上述数据,能够得到较高的效率。
  • 硬盘存储:主要是为了防止内存中的数据随着进程/主机重启而丢失。

补充说明:2: 交换机类型与转发规则

上面我们提到,在生产者发送消息时,首先会将消息发送到 Broker 的交换机上,再由交换机根据不同的规则转发到相应的队列中。在 MQ 中支持四种类型的交换机,它们分别是: Direct(直接交换机)、Fanout(扇出交换机)、Topic(主题交换机)、Header(头部交换机)。其中 Header 这种方式比较复杂,也比较少见,当前项目中主要实现了前三种,下面分别对他们进行详细介绍:

前要说明:

  • 以下 bindingKey(绑定键)是在创建队列和交换机绑定关系时指定的关键字。
  • 以下 routingKey(路由键)是生产者发送消息时指定的关键字。

(1)Direct(直接交换机)

  1. 生产者发送消息时,会指定一个"目标队列"的名字(此时的 routingKey 就是 队列的名字,bindingKey 无效)
  2. 交换机收到消息后,查看当前交换机对应的绑定里面是否存在队列名字为routingKey的队列
  3. 如果有,就转发过去(把消息塞进对应的“目标队列”中)
  4. 如果没有,消息直接丢弃

(2)Fanout(扇出交换机)

  1. 生产者无需指定routingKey,直接发送消息到指定交换机
  2. 交换机收到消息后,直接将消息转发给当前交换机已绑定的所有队列中。(此时的 bindingKey 和 routingKey 对扇出交换机无效。)

(3)Topic(主题交换机)

  1. 生产者发送消息时,指定一个 routingKey
  2. 交换机收到消息后,查看当前交换机对应的绑定中是否存在一个 bindingKey 通过一定的规则和 routingKey 相匹配
  3. 如果有,就将消息转发到对应的绑定队列中。
  4. 如果没有,则将消息丢弃。

PS:以上所有概念出自 AMQP 协议:一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

3、Broker 核心 API

Broker 基于以上概念和功能,需要实现的核心 API 如下:

  1. 创建队列 (queueDeclare)
  2. 销毁队列 (queueDelete)
  3. 创建交换机 (exchangeDeclare)
  4. 销毁交换机 (exchangeDelete)
  5. 创建绑定 (queueBind)
  6. 解除绑定 (queueUnbind)
  7. 发布消息 (basicPublish)
  8. 订阅消息 (basicConsume)
  9. 确认消息 (basicAck)

补充说明:

  1. 从上面可以看出来,在进行创建操作时并没有使用 create,而是使用 declare,这是从语义上说明,这里的创建起到的效果是不存在则创建,存在则啥也不做。
  2. 上述并没有创建一个“消费消息”的API,这是因为当前我们使用的工作模式是 push(推),Broker 会将消息主动推送给订阅的消费者。当然也有 pull(拉) 工作模式,需要消费者主动调用 Broker 的 API 获取消息,当前项目不涉及这种模式。
  3. 在MQ中有两种应答方式,一种是自动应答,这种方式下 Broker 将消息推送给订阅的消费者后就算应答完毕。另一种应答方式是手动应答,上述确认消息(basicAck)起到的效果,是可以让消费者 显式 的告诉 Broker,这个消息我处理完毕了,提高整个系统的可靠性。

4、客户端设计概要

生产者、消费者都是客户端程序,broker 则是作为服务器,通过网络进行通信。此处设定,使用 TCP + 自定义的应用层协议 实现 生产者/消费者 和 BrokerServer 之间的交互工作,这里需要给客户端提供一组 API,让客户端的业务代码来调用,从而通过网络通信的方式远程调用 brokerserver 上的方法。

客户端核心API:

  1. 创建 Connection
  2. 关闭 Connection
  3. 创建 Channel
  4. 关闭 Channel
  5. 创建队列 (queueDeclare)
  6. 销毁队列 (queueDelete)
  7. 创建交换机 (exchangeDeclare)
  8. 销毁交换机 (exchangeDelete)
  9. 创建绑定 (queueBind)
  10. 解除绑定 (queueUnbind)
  11. 发布消息 (basicPublish)
  12. 订阅消息 (basicConsume)
  13. 确认消息 (basicAck)

补充说明:

  1. 和 Broker 服务器 API 相比,客户端程序还提供了如下 4 个 API:创建 Connection、关闭 Connection、创建 Channel、关闭 Channel。
  2. 这里的一个Connection对象代表一个TCP连接。
  3. Channel 是 Connection 内部逻辑上的链接,多个Channel复用同一个TCP连接,一个Connection 中可以包含多个Channel,每个Channel负责客户端中不同的模块,其中传输的数据是互不相干的。
  4. 这样的设定主要是为了能够更好的复用 TCP 连接, 达到长连接的效果, 避免频繁的创建关闭 TCP 连接。
  5. 上述客户端提供的 API 只是给业务代码进行调用,真正的方法执行是交给了BrokerServer。这个过程称为 远程过程调用(Remote Procedure Call,简称RPC)是一种计算机通信协议,它允许程序调用另一个地址空间(通常是不同的计算机)中的过程或函数,而无需程序员显式编写网络代码。通过使用RPC,应用程序可以像调用本地进程一样调用远程服务器上的进程。

5、小结

最后简单总结一下,我们大致需要做的工作,其中涉及到的细节问题,我们后面在进行补充:

  1. 实现生产者、BrokerServer、消费者三个部分
  2. 针对生产者、消费者来说,主要编写客户端和服务器的网络通信部分。
  3. 重点实现BrokerServer,包括内部的基本概念和核心 API。
  4. 数据的持久化存储。

三、具体实现

附上连接:

1、消息队列详细设计与实现思维导图
2、Gitee 完整代码地址

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/165818.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

入股合作协议要不要写章程

公司章程,是注册公司的基本文件,也公司必备的规定公司组织及活动基本规则的书面文件,是公司成立的必不可少的基础,也是公司赖以生存的灵魂。那么,这次要和大家讨论的是有关于入股合作协议要不要写章程的问题了。 入股合…

windows上安装MySQL Server.

进入官网 MySQL 找到 下载,并点进入。 往下翻,找到社区下载,进入页面 选择 Mysql community Server 选择系统,下载 之后解压。 将解压文件夹下的bin路径添加到变量值中 配置初始化的my.ini文件 [mysqld] # 设置3306端口 port330…

蓝桥杯每日一题2023.11.19

题目描述 “蓝桥杯”练习系统 (lanqiao.cn) 题目分析 首先想到的方法为dfs去寻找每一个数&#xff0c;但发现会有超时 #include<bits/stdc.h> using namespace std; const int N 2e5 10; int n, cnt, a[N]; void dfs(int dep, int sum, int start) {if(dep 4){if(s…

Python 自动化(十七)ORM操作

ORM-查询操作 查询简介 数据库的查询需要使用管理器对象 objects 进行 通过 自定义模型类.objects 管理器调用查询方法 查询方法 all()方法 概念与理解 用法&#xff1a;自定义模型类.objects.all()作用&#xff1a;查询自定义模型实体中所有的数据等同于 select * fr…

【好用的个人工具】搭建一款实用的个人IT工具箱——it-tools

【好用的个人工具】搭建一款实用的个人IT工具箱——it-tools 一、it-tools介绍二、本地环境介绍2.1 本地环境规划2.2 本次实践介绍 三、本地环境检查3.1 检查Docker服务状态3.2 检查Docker版本3.3 检查docker compose 版本 四、下载it-tools镜像五、部署it-tools工具箱5.1 创建…

【机器学习】划分训练集和测试集的方法

在机器学习中&#xff0c;我们的模型建立完成后&#xff0c;通常要根据评估指标来对模型进行评估&#xff0c;以此来判断模型的可用性。而评估指标主要的目的是让模型在未知数据上的预测能力最好。因此&#xff0c;我们在模型训练之前&#xff0c;要对训练集和测试集进行划分。…

vue3项目安装eslint和prettier

【几乎最全/全网最长的 2 万 字】前端工程化完整流程&#xff1a;从头搭到尾&#xff08;vue3 vite qiankun docker tailwindcss iview......&#xff09;_前端工程化流程-CSDN博客 vue3tsvite项目中使用eslintprettierstylelinthusky指南 - 掘金 上面两篇文章相互结合操…

mybatis使用xml形式配置

以这个注解形式的查询代码为例 Select("select * from emp where name like concat(%,#{name},%) and gender #{gender} and entrydate between #{begin} and #{end} order by update_time desc ")public List<Emp> list(String name, Short gender, LocalDat…

【libGDX】使用ShapeRenderer绘制几何图形

1 ShapeRenderer 简介 ShapeRenderer 是 libGDX 中用于绘制基本形状的工具之一。它可以绘制点、线、矩形、多边形、圆形、椭圆形、扇形、立方体、圆锥体等几何图形。这对于在游戏或图形应用程序中绘制简单的形状是很有用的。 ShapeRenderer 的主要方法如下&#xff1a; 1&…

左支座零件的机械加工工艺规程及工艺装备设计【计算机辅助设计与制造CAD】

wx供重浩&#xff1a;创享日记 对话框发送&#xff1a;左支座 获取完整CAD工程源文件论文报告说明书等 一、论文目录 二、论文部分内容 设计任务 1.完成左支座零件—毛坯合图及左支座零件图 2.完成左支座零件工艺规程设计 3.完成左支座零件加工工艺卡 4.机床专用夹具装备总图 …

【LeetCode刷题-树】--1367.二叉树中的链表

1367.二叉树中的链表 方法&#xff1a;枚举 枚举二叉树中的每个节点为起点往下的路径是否与链表相匹配的路径&#xff0c;为了判断是否匹配设计了一个递归函数dfs(root,head),其中root表示当前匹配到的二叉树节点&#xff0c;head表示当前匹配到的链表节点&#xff0c;整个函数…

【寒武纪(9)】MLU架构

⼀个MLU 设备由 Memory ⼦系统、MTP&#xff08;Multi Tensor Processor&#xff09;⼦系统、Media ⼦系统等构成。MTP⼦系统是寒武纪MLU 架构的核⼼。 文章目录 TP1 架构TP2 架构TP3 1⾯向不同 MLU 架构的 Cambricon BANG 编程最佳实践1.1 Device 级异构调优指南1.2 Cluster …

Javaweb之Vue生命周期的详细解析

2.4 生命周期 vue的生命周期&#xff1a;指的是vue对象从创建到销毁的过程。vue的生命周期包含8个阶段&#xff1a;每触发一个生命周期事件&#xff0c;会自动执行一个生命周期方法&#xff0c;这些生命周期方法也被称为钩子方法。其完整的生命周期如下图所示&#xff1a; 状…

云课五分钟-0B快速排序C++示例代码-注释和编译指令

前篇&#xff1a; 云课五分钟-0ALinux文件系统及权限-查询命令如何使用 智能大模型个人感觉完全颠覆式改变了学习和教学的模式&#xff0c;知识的重要性荡然无存。 越来越需要重视思路和方法&#xff0c;创新和创意。 090A&#xff1a;接着如下 Linux基础入门的内容包括以…

Asp.net MVC Api项目搭建

整个解决方案按照分层思想来划分不同功能模块&#xff0c;以提供User服务的Api为需求&#xff0c;各个层次的具体实现如下所示&#xff1a; 1、新建数据库User表 数据库使用SQLExpress版本&#xff0c;表的定义如下所示&#xff1a; CREATE TABLE [dbo].[User] ([Id] …

阅读芯片源码(RTL)

part one 主要的原则。 一个rtl可以是这样的&#xff1a; 经常大家习惯于算法和数据结构。对于设计的部分&#xff0c;落实不一定多。 另外一个rtl也可以是这样的&#xff1a; 所以从不同的层面来讲&#xff0c;一个Rtl有不同的表述。 首先大概把所有的部分浏览一遍&#x…

麒麟系统安装找不到安装源!!!!设置基础软件仓库时出错

记录--华为RH2288 V3服务器安装麒麟系统遇到的问题 1.遇到的问题--“设置基础软件仓库时出错”报错导致无法继续安装 没办法下一步 先说结论&#xff1a;系统bug 该问题在CentOS、Rocky Linux最新版中均存在 解决&#xff1a; &#xff08;一&#xff09;、如果是外网直接配…

Linux|僵死进程

1.僵死进程产生的原因或者条件: 什么是僵死进程? 当子进程先于父进程结束,父进程没有获取子进程的退出码,此时子进程变成僵死进程. 简而言之,就是子进程先结束,并且父进程没有获取它的退出码; 那么僵死进程产生的原因或者条件就是:子进程先于父进程结束,并且父进程没有获取…

场景交互与场景漫游-交运算与对象选取(8-1)

交运算与对象选取 在面对大规模的场景管理时&#xff0c;场景图形的交运算和图形对象的拾取变成了一项基本工作。OSG作为一个场景管理系统&#xff0c;自然也实现了场景图形的交运算&#xff0c;交运算主要封装在osgUtil 工具中在OSG中&#xff0c;osgUtil是一个非常强有力的工…

SDUT OJ《算法分析与设计》贪心算法

A - 汽车加油问题 Description 一辆汽车加满油后可行驶n公里。旅途中有若干个加油站。设计一个有效算法&#xff0c;指出应在哪些加油站停靠加油&#xff0c;使沿途加油次数最少。并证明算法能产生一个最优解。 对于给定的n和k个加油站位置&#xff0c;计算最少加油次数。 I…