目录
引言
生产者消费者模型作用
消息队列核心概念
Broker Server 内部关键概念
Broker Server 核心 API
交换机(Exchange)类型
关于持久化
关于网络通信
总结
引言
问题:
- 什么是消息队列(Message Queue / MQ)?
回答:
- 阻塞队列(Blocking Queue)-> 生产者消费者模型(是在一个进程内部进行的)
- 消息队列就是将阻塞队列这样的数据结构,单独提取成了一个程序进行独立部署 -> 生产者消费者模型(进程和进程之间 / 服务和服务之间)
注意:
- 分布式系统 整个服务器程序不是一个单一的程序,而是由一组服务器构成的 集群
生产者消费者模型作用
1、实现了发送方和接收方之间的 解耦
- 如上图所示,服务器A 调用服务器B
- A 将请求转发给 B 处理,B 处理完将结果反回给 A,即 A 和 B 之间的耦合是比较大的
- 如果 A 要调用 B ,则 A 务必要知道 B 的存在
- 如果 B 挂了,则很容易引起 A 的 bug
- 如果要是再加一个服务器C,此时也需要对服务器A 进行修改
- 因此就需要针对 A 重新修改代码、重新测试、重新发布、重新部署 等,十分麻烦!
- 引入消息队列后,A 将请求发给消息队列,B 再从消息队列中获取到请求
- 此时 A 和 B 之间的耦合就降低了很多
- A 并不知道 B,A 只知道队列,即 A 中的代码没有任何一行与 B 相关
- B 也不知道 A,B 只知道队列,即 B 中的代码没有任何一行与 A 相关
- 如果 B 挂了,对 A 没有任何影响,因为只要队列在,A 仍可以继续给队列插入元素,如果队列满了,直接阻塞就行了
- 如果 A 挂了,对 B 没有任何影响,因为只要队列在,B 仍可以从队列中取元素,如果队列空了,直接阻塞就行了
- 同时当我们像要新增一个服务器C 来作为消费者时,对于 A 来说是无感知的
2、可以做到 削峰填谷,保证系统的稳定性
- 我们进行服务器开发,也和上述这个模型是非常相似的
- 上游就是用户发送的请求,下游就是一些执行具体业务的服务器
- 用户发多少请求是不可控的
具体理解:
- 比如 A 为入口服务器,A 调用 B 完成一些具体的任务
- 如果 A 与 B 直接通信,且 A 突然收到一组用户请求的峰值,此时 B 将随之感受到峰值
- 引入消息队列后,A 将请求发给队列,B 从队列中获取请求
- 虽然 A 收到的请求很多,队列收到的请求也不少,但是 B 仍可按照原有节奏来取请求,不至于说一下就收到太多的并发量
- 市面上一些比较知名的 mq:RabbitMQ、Kafka、RocketMQ、ActiveMQ
注意:
- 这些 mq 大同小异
消息队列核心概念
一个生产者 + 一个消费者
多个生产者 + 多个消费者
- 生产者(Producer):发布消息的客户端应用程序
- 消费者(Consumer):订阅消息的客户端应用程序,用于处理生产者的消息
- 中间人(Broker):消费者拿生产者的消息时,需经过中间人
- 发布(Publish):生产者向中间人投递消息的过程
- 订阅(Subscribe):消费者从中间人获取消息的前提为 先订阅消息
- 消费(Consume):消费者从中间人这里取数据的操作
注意:
- 图上画均为服务器!
Broker Server 内部关键概念
虚拟主机(Virtual Host)
- 类似于 MySQL 中的 database,算是一个 "逻辑" 上的数据集合
- 实际开发中,一个 Broker Server 上可组织多种不同类别的数据,即可能同时管理多组 业务线上的数据
- 此时便可以使用 Virtual Host 做出逻辑上的区分
交换机(Exchange)
- 实际上,当生产者将消息投递给 Broker Server 时,是先将消息交给了 Broker Server 上的某个交换机,再由交换机把消息转发给对应的队列
队列(Queue)
- 真正用来存储处理消息的实体,后续消费者也是从对应的队列中取数据
- 一个大的消息队列中,可以有很多具体的小的队列
绑定(Binding)
- 将交换机和队列之间建立起关联关系
- 可以把交换机和队列视为是 类似于 数据库 中的 多对多 这样的关系
- 一个交换机,可以对应到多个队列
- 一个队列,也可以对应多个交换机
- 在数据库中,标识多对多关系,会使用一个中间表 / 关联表
- 而在 mq 中,也是有一个这样的中间表的,所谓的 绑定 其实就是中间表中的一项
消息(Message)
- 具体来说,可以认为 服务器A 给 B 发送的请求(通过 mq 转发),就是一个消息
- 服务器B 给 A 返回的响应(通过 mq 转发),也是一个消息
- 一个消息可视为是一个 字符串(二进制数据)
- 消息中具体包含啥样的数据,都是程序员自定义的
- RabbitMQ 就是按照上述概念来组织的(基于 AMQP 协议)
Broker Server 核心 API
- 创建队列(queueDeclare)
- 销毁队列(queueDelete)
- 创建交换机(exchangeDeclare)
- 销毁交换机(exchangeDelete)
- 创建绑定(queueBind)
- 解除绑定(queueUnbind)
- 发布消息(basicPublish)
- 订阅消息(basicConsume)
- 确认消息(basicAck)
问题一:
- 创建 为啥不使用 Create 这样的术语,而是使用 Declare ?
回答:
- Create 仅表示单纯的创建
- Declare 起到的效果为 不存在则创建,存在就啥都不做了
问题二:
- 我们为啥不要搞一个 api,叫做 "消费消息" 呢?
- 让消费者通过该 api 从服务器上取走消息
回答:
- 在当前项目中,并不打算搞一个 "消费消息" 的 api
- mq 与 消费者之间有两种工作模式:
- Push(推):Broker 将收到的数据主动发送给订阅的消费者
- Pull(拉):消费者主动调用 Broker 的 api 获取数据
- 而 RabbitMQ 仅支持 Push 的方式(Kafka 就能支持 Pull)
注意点一:
- 确认消息(basicAck) 所起到的效果,是可以让消费者显式的告诉 Broker Server,这个消息我已经处理完毕了
- 用于提高整个系统的可靠性,以保证消息处理没有遗漏
注意点二:
- 消息应答模式有两种
- 自动应答:消费者将该消息取走了,便算作应答(相当于没应答)
- 手动应答:basicAck 方法属于手动应答(消费者需要主动调用这个 api 来进行应答)
注意点三:
- 对于 RabbitMQ 来说,除了提供肯定的确认,还提供了否定的确认
- 但此处我们主要实现肯定确认,就不实现否定确认了
注意点三:
- 此处的项目是以 RabbitMQ 作为蓝本的
- 即上述 API 名称以及用法均参考的 RabbitMQ
交换机(Exchange)类型
- 交换机在转发消息时,是有着一套转发规则的!
- 此处提供了几种不同的 交换机类型(Exchange Type)来描述这里的不同的转发规则
- RabbitMQ 主要实现了 四种交换机类型(也是 AMQP 协议定义的)
直接交换机(Direct )
- 生产者发送消息时,会先指定一个 目标队列 的名字
- 交换机收到之后,就看看绑定的队列里,有没有能够匹配的队列
- 如果有,则将消息塞进对应的队列中转发过去
- 如果没有,则直接丢弃掉该消息
扇出交换机(Fanout )
主题交换机(Topic )
- 有两个关键概念
- bindingKey:把队列和交换机绑定的时候,指定一个单词(像是个暗号一样)
- routingKey:生产者发送消息的时候,也指定一个单词
- 如果当前 routingKey 和 bindingKey 能够对上暗号了
- 此时就可把这个消息转发到对应的队列中了
- 咱们项目仅实现上述这三种交换机类型
关于持久化
- 虚拟主机、交换机、队列、绑定、消息等,这些概念对应的数据都需要让 Broker Server 组织管理并存储起来
- 此时内存和硬盘上都会各自存储一份,以内存为主,硬盘为辅
在内存中存储的原因:
- 对于 mq 来说,能够高效的转发处理数据,是非常关键的指标
- 因此使用内存来组织上述数据得到的效率,就比放硬盘中要高很多!
在硬盘中存储的原因:
- 为了防止内存中的数据随着 进程重启 或 主机重启 而丢失
注意:
- 硬盘上是能持久存储,但这个持久是相对于 内存 的
- 对于一个硬盘来说存储消息的寿命,一般为 几年到十几年(一直不通电的情况下)
关于网络通信
- 其他的服务器(生产者 / 消费者)通过网络与 Broker Server 进行交互
- 此处设定使用 TCP + 自定义的应用层协议 实现生产者 / 消费者和 Broker Server 之间的交互工作
- 自定义的应用层协议 要做的主要工作就是让客户端可以通过网络调用 Broker Server 提供的编程接口
- 因此,在客户端这边也需要提供对应的上述这些方法
- 如上图所示,服务器版本的方法为真正干实事的,即 将管理数据进行调整
- 而客户端版本的方法,则只是发送请求 / 接收响应的
具体理解:
- 此处客户端调用了一个本地方法,结果该方法的背后,又给服务器发来一系列消息,由服务器完成了一系列工作
- 站在调用者的角度,只是看到了这个功能已经完成了,却不知道这背后的细节
- 虽然调用的是一个本地方法,但实际上好像调用另一个远端服务器的方法一样
- 此处可视为是编写客户端服务器程序,通信过程的一种设计思想,即 远程过程调用(RPC)
- 客户端除了提供下方这 9 个和服务器对应的方法外
- 客户端还需再提供 4个方法来支撑其他工作
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
注意点一:
- 一个 Connection 对象,就代表一个 TCP 连接
注意点二:
- Channel ——> 通道/信道
- 一个 Connection 里面可以包含多个 Channel
- 每个 Channel 上面传输的数据均 互不相干
问题:
- 为什么有了 Connection,还要搞一个 Channel ?
回答:
- TCP 建立 / 断开一个连接的成本,其实还是比较高的
- 因此,很多时候并不希望频繁的建立 / 断开 TCP 连接
- 此处的 Channel 仅为一个逻辑上的概念,比起 TCP 连接的建立和断开 要轻量得多
- Connection 和 Channel 之间的关系就如图网线一样
总结
- 细化一下具体要做哪些工作?
- 需实现 生产者、Broker Server、消费者 这三个部分
- 针对生产者和消费者来说,主要编写的是 客户端和服务器 的网络通信部分,给客户端提供一组 api 让客户端的业务代码来调用,从而通过网络通信的方式远程调用 Broker Server 上的方法
- 实现 Broker Server 以及 Broker Server 内部的一些基本概念和核心 api【重点】
- 上述这些关键数据,如何在硬盘中存储?以啥格式存储?是存储在数据库还是文件中?后续服务器重启了,如何读取上述数据,并将内存中的内容给恢复回来呢?【持久化】
注意:
- 生产者 的数据从哪来?消费者取到数据之后要干啥?
- 生产者与消费者具体的业务逻辑都是通用的,无需太多关心
- 上述工作的最终目标,就是实现一个 分布式系统下 的生产者消费者模型
- 但是此处我们的 Broker Server 并不支持分布式部署(集群模式)
- 我们此处实现的仅为一个能够给多个生产者消费者提供服务的单机 Broker Server
- 但是专业的 mq ,如 RabbitMQ、kafka 这些均支持集群模式,可用性更高、可处理更高的并发、数据能够相互备份