微信公众号访问地址:Redis实现消息队列
推荐文章:
1、springBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表;
2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;
3、为什么引入Redisson分布式锁?
4、Redisson可重入锁原理
5、SpringBoot整合多数据源,并支持动态新增与切换(详细教程)
一、简介
消息队列是一种常用的通信模式,用于解耦消息的发送者和接收者,并实现异步处理。目前基于redis(版本大于5.0)实现消息队列的方案如下:
消息队列的定义及实现方式:
消息队列需要满足的要求:
1 | 顺序一致 | 即要保证消息发送的顺序和消费的顺序是一致的,不一致的话可能会导致业务上的错误。 |
2 | 消息确认机制 | 对于一个已经被消费的消息(已经收到ACK)不能再次被消费。 |
3 | 消息持久化 | 要具有持久化的能力,避免消息丢失,这样当消费者异常宕机导致再次重启后需要重新消费消息时可以再次获取。 |
二、基于LIST结构模拟消息队列
备注:一对一模式
案例如下:
消息发送端:
127.0.0.1:6379> lpush mesg 1 2
(integer) 2
127.0.0.1:6379> lpush mesg 3 4 5
(integer) 3
127.0.0.1:6379>
消息接收端:
127.0.0.1:6379> brpop mesg 20
1) "mesg"
2) "1"
(11.11s)
127.0.0.1:6379> brpop mesg 20
1) "mesg"
2) "2"
127.0.0.1:6379> brpop mesg 20
1) "mesg"
2) "3"
(12.37s)
127.0.0.1:6379> brpop mesg 20
1) "mesg"
2) "4"
127.0.0.1:6379> brpop mesg 20
1) "mesg"
2) "5"
127.0.0.1:6379> brpop mesg 20
(nil)
(20.09s)
127.0.0.1:6379>
总结如下:
三、基于PubSub的消息队列
备注:一对多模式(广播模式)
案例如下:
发布端:
127.0.0.1:6379> publish order.msg01 msg001
(integer) 2
127.0.0.1:6379> publish order.msg01 msg002
(integer) 2
127.0.0.1:6379> publish order.msg01 msg003
(integer) 2
127.0.0.1:6379> publish order.msg02 msg004
订阅端:
1、指定channel的订阅
127.0.0.1:6379> SUBSCRIBE order.msg01
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "order.msg01"
3) (integer) 1
1) "message"
2) "order.msg01"
3) "msg001"
1) "message"
2) "order.msg01"
3) "msg002"
1) "message"
2) "order.msg01"
3) "msg003"
2、模糊channel的订阅
127.0.0.1:6379> PSUBSCRIBE order.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "order.*"
3) (integer) 1
1) "pmessage"
2) "order.*"
3) "order.msg01"
4) "msg001"
1) "pmessage"
2) "order.*"
3) "order.msg01"
4) "msg002"
1) "pmessage"
2) "order.*"
3) "order.msg01"
4) "msg003"
1) "pmessage"
2) "order.*"
3) "order.msg02"
4) "msg004"
运行情况如下:
总结如下:
四、基于Stream的消息队列
详细内容参考:https://www.knowledgedict.com/tutorial/redis-streams.html
4.1、发送消息的命令:xadd
案例如下:
10.1.50.157:0>XADD student * name xiaoli sex 男 age 21
"1691919968658-0"
10.1.50.157:0>XLEN student
"1"
10.1.50.157:0>
效果如下:
4.2、读取消息的方式之一:xread
案例如下:
发送端:
10.1.50.157:0>XADD student * name xiaoli sex 男 age 21
"1691919968658-0"
10.1.50.157:0>XLEN student
"1"
10.1.50.157:0>XADD student * name xiaoli02 sex 女 age 20
"1691920556051-0"
10.1.50.157:0>XADD student * name xiaoli02 sex 女 age 28
"1691920957334-0"
10.1.50.157:0>XADD student * name xiaoli02 sex 女 age 29
"1691920975931-0"
10.1.50.157:0>
消费端:
10.1.50.157:0>XREAD COUNT 1 BLOCK 0 STREAMS student $
1) 1) "student"
2) 1) 1) "1691920851885-0"
2) 1) "name"
2) "xiaoli02"
3) "sex"
4) "女"
5) "age"
6) "21"
数据展示:
总结如下:
4.3、读取消息的方式之二:消费者组
常见使用命令如下:
4.3.1、创建消费者组命令:
参考案例:
10.1.50.157:0>XGROUP CREATE student gs01 0
"OK"
4.3.2、从消费者组读取消息命令:
参考案例:
-- 消费者c01
10.1.50.157:0>XREADGROUP GROUP gs01 c01 COUNT 1 BLOCK 2000 STREAMS student >
1) 1) "student"
2) 1) 1) "1691920851885-0"
2) 1) "name"
2) "xiaoli02"
3) "sex"
4) "女"
5) "age"
6) "21"
10.1.50.157:0>XREADGROUP GROUP gs01 c01 COUNT 1 BLOCK 2000 STREAMS student >
1) 1) "student"
2) 1) 1) "1691920917913-0"
2) 1) "name"
2) "xiaoli02"
3) "sex"
4) "女"
5) "age"
6) "25"
-- 消费者c02
10.1.50.157:0>XREADGROUP GROUP gs01 c02 COUNT 1 BLOCK 2000 STREAMS student >
1) 1) "student"
2) 1) 1) "1691920870086-0"
2) 1) "name"
2) "xiaoli02"
3) "sex"
4) "女"
5) "age"
6) "23"
XACK命令:
参考案例:
10.1.50.157:0>XACK student gs01 1691920917913-0
"1"
XPENDING命令:
参考案例:
10.1.50.157:0>XPENDING student gs01 - + 10
1) 1) "1691919968658-0"
2) "c01"
3) "2523762"
4) "1"
2) 1) "1691920556051-0"
2) "c01"
3) "2500637"
4) "1"
4.4、在java中的应用
4.5、总结
五、全文总结
对于中小型企业,对于消息机制要求不算太严格,推荐使用Stream,基本上满足要求了。但是对于大型企业,对消息要求比较严格,还是推荐使用更更专业的消息中间件,像:RabbitMQ、Kafka等等,因为Stream只满足消费者的ACK确认机制,生产者并不满足。
更多优秀文章,请关注个人微信公众号或搜索“程序猿小杨”查阅。