目录
前言
List实现消息队列
PubSub消息队列
Stream消息队列
三种实现方式对比
前言
为什么要使用Redis的消息队列?
成本低,对于RabbitMQ或是Kafka来说,已经是重量级的消息队列。
Redis的三种实现方式:
- List结构:一种有序的双向链表
- PubSub发布订阅:基于点对点的消息模型
- Stream:在Redis5.0之后提供的,比较完善的消息队列模型
List实现消息队列
我们可以利用Redis中List的命令LPUSH与RPOP来实现消息的发送与接收,但是需要注意的是,队列中没有消息时,RPOP会返回null,不会向JVM中阻塞队列一样进行阻塞并等待消息,因此这里应该使用BRPOP来实现阻塞效果。
优点:利用Redis存储,不受限于JVM内存上限。
基于Redis的持久化机制,数据安全性有保证。
可以满足消息有序性。
缺点:无法避免消息丢失。
只支持单消费者。
PubSub消息队列
是Redis2.0版本引入的消息传递模型,顾名思义,消费者可以订阅一个或多个channel,生产者向对应的channel发送消息后,所有订阅者都能收到相关信息。
PubSub消息队列的基本命令
# 订阅一个或多个频道
SUBSCRIBE channel [channel]
# 向一个频道发送消息
PUBLISH channel msg
# 订阅与pattern格式匹配的所有频道
PSUBSCRIBE pattern [pattern]
优点:采用发布订阅模式,支持多生产者,多消费者。
缺点:不支持数据持久化。
无法避免消息丢失。
消息堆积有上限,超出时数据丢失。
Stream消息队列
Stream是Redis5.0之后引入新的数据类型,支持持久化,因此相比于PubSub更加安全,可以通过Stream实现一个功能完善的消息队列
发送消息的命令:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID filed value[filed value]
命令解释:
- key:队列名称
- NOMKSTREAM:如果队列不存在,是否自动创建队列,默认是自动创建
- MAXLEN|MINID [=|~] threshold [LIMIT count]:设置消息队列的最大消息数量
- *|ID:消息的唯一ID,*代表由Redis自动生成,格式是"时间戳-递增数字"
- field value:发送到队列的消息名称为Entry
读取消息的第一种方法
命令如下
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
命令解释:
- COUNT count:每次读取消息的最大数量
- BLOCK milliseconds:当没有消息时进行阻塞,并指定阻塞时长,如果为0则指永久阻塞
- STREAMS key:要从哪个队列读取消息
- ID:起始ID,只返回大于该ID的消息,0表示从第一个消息开始读取,$表示从最新消息开始
XREAD命令的特点:
- 消息可回溯
- 一个消息可以被多个消费者拿到
- 可以阻塞读取
- 有消息漏读的风险
读取消息的第二种方法
将多个消费者划分到一个组(Consumer Group)当中,监听同一个队列。特点如下
- 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度
- 消息标识:消费者组会维护一个标示记录最后一个被处理的消息哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
- 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移除。
创建消费者组命令:
XGROUP CREATE key groupName ID [MKSTREAM]
命令解释:
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标识,$代表队列中最后一个消息,0则代表队列中第一个消息
- MKSTREAM:队列不存在时自动创建
# 删除指定的消费者组
XGROUP DESTORY key groupName
# 给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumername
# 删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupname comsumername
从消费者组中读取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID[ID ...]
命令解释:
- group:消费组名称
- consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
- count:本次查询的最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无需手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID。" > "表示从下一个未消费的消息开始。其它则是根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
Group类型的消息队列特点:
- 消息可回溯
- 可以多消费者争抢消息
- 可以阻塞读取
- 没有消息漏读风险
- 有消息确认机制,保证消息至少被消费一次
三种实现方式对比
LIST | PubSub | Stream | |
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |