redis消息队列
redis可以直接实现消息队列,无需学习别的技术
list——本质是链表,数据存储
启动同一个IP和端口的2台客户端,一边阻塞弹出,一边添加元素
在20s内,如果有元素就弹出,没有元素就等待,超过20s就结束等待
优点:先进先出,消息有序
没有内存上限
有持久化机制,保证数据安全
缺点:
因为pop是remove and get,可能导致消息丢失,刚取出消息没来得及处理就宕机了
只能被一个消费者处理
pubsub——不可靠
-- 订阅一个或多个频道
subscribe
-- 发送消息给一个频道
publish
-- 订阅格式匹配的频道
psubscribe
通配符
?一个字符
*0个或多个
[]指定字符
再启动一个客户端
确保都操作数据库0——select 0
我是先发送再订阅,所以msg1没有收到,但是订阅之后再发的msg2、msg3都被收到了,而且消费者会一直处于订阅就是接收消息状态,天生就是阻塞式的
pmessage order.* order.queue msg2表示订阅的order.*频道接收到了order.queue频道发布的msg2消息
pmessage order.queue order.queue msg2表示订阅的order.queue频道接收到了order.queue频道发布的msg2消息
发布消息的返回值就是订阅该频道的消费者数量!!
所以一开始发布消息时没消费者订阅该频道,返回值是0,后面的是2,最后一个发布到one,只有一个消费者订阅
优点:
支持多个生产者,多个消费者
缺点:
不支持数据持久化:本身就是消息发送,不是数据存储,不支持持久化
无法避免消息丢失:如果发布的消息没有消费者订阅,则直接丢失
消息堆积有上限:发布消息时有消费者订阅,则会存储到消费者缓冲区,缓存空间有上限
不太可靠!!
stream
-- 添加消息
xadd 队列名称 默认自动创建 消息上限 消息id 字段 值
-- 消息长度
xlen 队列名称
-- 读取消息,阻塞时长为0表示一直阻塞
xread 读取数量 是否阻塞 阻塞时长 streams 队列名称 读取起始id
循环读取最新消息时可能有漏读,在处理消息时,收到了很多条,但是下次只会处理最新的,造成漏读
同一个消息可以被不同消费者反复读取,数据持久化
可以阻塞读取
stream消费者组——解决漏读
只要在消费者组就是竞争关系,处理消息速度加快,避免消息堆积
记录最后一个被处理的消息,有点像PC程序计数器,解决漏读
消费者接受到消息后存入pending-list,当处理完后,移除,解决消息丢失
如何让消费者拿到我想发送给他的消息?
订阅多个频道,比如我想拿到order.queue,又想拿到order.one,为了确保该消费者一定能竞争到queue,就订阅order.queue和order.*
pending-list和消费者缓冲区有什么区别?
xgroup create 队列名称 消费者组名称 起始id 是否自动创建(默认false)
xreadgroup group 消费者组名称 消费者名称 读取数量 是否阻塞 阻塞时长 自动确认 streams 队列名称 起始id
一个消息只能被读取一次
以下是正常情况,获取消息并返回ack
如果收到消息就宕机了,没来及ack则会存储在pending-list中,如何查看pending-list勒?
-和+表示从最小到最大的id
读取消息时先用>,读取未消费的消息。如果ack过程中出现异常,则查看pending-list中的消息,然后用0,读取已经消费但是没有Ack的消息