一、概念
Redis Streams是Redis5.0提供的一种消息队列机制,支持多播的可持久化的消息队列,用户实现发布订阅的功能,借鉴了kafka设计。
二、常用命令
命令名称 | 描述 |
XADD key ID field value [field value ...] | 添加一条消息
|
XREAD [COUNT count] STREAMS key [ID] | 从一个或多个 Stream 中读取消息。
|
XREADGROUP GROUP group consumer [COUNT count] STREAMS key [ID] | 从消费者组中读取消息。
|
XGROUP CREATE key groupname ID | 创建一个新的消费者组。
|
XGROUP DESTROY key groupname | 删除一个消费者组 |
XGROUP SETID key groupname ID | 修改消费者组的起始消费位置 |
XACK key groupname ID [ID ...] | 确认消息已被成功处理。
|
XPENDING key groupname | 查看消费者组中未确认的消息统计信息 |
XPENDING key groupname start end count [consumer] | 查看具体的未确认消息列表 |
XDEL key ID [ID ...] | 删除指定的消息。
|
XTRIM key MAXLEN [=|~] threshold | 控制 Stream 的长度,删除旧消息。
|
XINFO STREAM key | 获取 Stream 的基本信息 |
XINFO GROUPS key | 获取 Stream 的所有消费者组信息 |
XINFO CONSUMERS key groupname | 获取消费者组中的所有消费者信息 |
XLEN sms_stream | 查看Stream长度 |
三、应用场景
采用Redis Streams 实现短信发送队列的异步处理,替代传统List列表无数据轮询模式,降低系统资源消耗。
角色定义
-
生产者:短信网关服务,将待发送短信封装为消息写入Redis Stream。
-
消费者组:多个短信处理节点(Consumer Group),每个节点负责消费部分消息。
-
消息格式:使用Protobuf序列化消息体,包含手机号、内容、模板ID等字段。
1.Redis命令实现
生产者通过XADD
命令写入Stream
XADD sms_stream * mobile "13800138000" content "您的验证码是1234" template_id "T1001"
消费者通过XREADGROUP
阻塞读取消息
XREADGROUP GROUP sms_group consumer1 COUNT 1 BLOCK 5000 STREAMS sms_stream >
消息处理成功后发送XACK
确认
XACK sms_stream sms_group "1678905140000-0"
消费者组初始化
XGROUP CREATE sms_stream sms_group $ MKSTREAM
消息重试机制,Pending List处理:定时扫描未ACK的消息(超过30秒未确认),重新投递
local pending = XPENDING sms_stream sms_group - + 10
for each message in pending do
XCLAIM sms_stream sms_group consumer2 30000 <message-id>
end
限制Stream最大长度(避免内存溢出)
四、与传统轮询模式对比
维度 | 传统轮询(List/LPOP) | Redis Streams |
---|---|---|
资源消耗 | 高频空轮询消耗CPU/网络 | 阻塞读取,无消息时挂起,降低30% CPU |
消息可靠性 | 无ACK机制,消息可能丢失 | 支持ACK+Pending List,确保至少一次投递 |
吞吐量 | 单消费者约5万条/秒 | 消费者组多节点并行,可达20万条/秒 |
功能扩展 | 仅基础队列 | 支持多消费者组、消息回溯、阻塞读取 |