Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:Redis:原理速成+项目实战——Redis实战9(秒杀优化)
📚订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助

上一节已经实现了异步秒杀,也就是将秒杀分为两个环节:
1、判断是否有抢单资格(库存量是否充足、是否满足一人一单)、
2、下单操作(优惠券表中的库存量-1,订单表增加相应信息)
其中,第一步的操作放在了Redis中,可以有效提高效率,而真正大幅度提高效率的点还是因为我们将下单的操作交给了另一个开辟的线程,因为对数据库的操作并不需要什么时效性。

异步执行所需要的信息被封装并保存到了阻塞队列中,上一节分析了这会造成的问题:
1、内存限制问题
2、数据安全问题

消息队列可以解决这个问题,一般建议用专业的消息中间件来使用,最主流的当然就是RabbitMQ了,但是这边也讲解一下用Redis里面的一些数据结构来模拟出消息队列的效果,实现的话我感觉也挺容易的,只演示基于Stream消息队列实现异步秒杀。

Redis消息队列实现异步秒杀

  • 认识消息队列
  • 基于List实现消息队列
  • PubSub实现消息队列
  • Stream的单消费模式
  • Stream的消费者组模式
  • 基于Stream消息队列实现异步秒杀

认识消息队列

消息队列,也就是存放消息的队列,最简单的消息队列包括3个角色:
(1)消息队列(代理):存储和管理信息
(2)生产者:发消息到消息队列
(3)消费者:从消息队列中获取消息并处理
因此,异步秒杀的思路为:
在这里插入图片描述

这个思路与上一节用阻塞队列的思路是差不多的,但是有2点重要区别:
1、消息队列是JVM以外的独立服务,不受JVM内存的限制
2、消息队列不仅仅做数据存储,还确保了数据安全,存到消息队列中的消息会做持久化处理,并要求消费者要做出消息的确认,否则会持续将消息传递给消费者,确保消息至少被“签收”一次

基于List实现消息队列

List是一种双向链表,很容易模拟出队列。
需要注意的是,当消息队列中没有消息的时候,我们应当要让线程等待,而不是直接返回Null,因此这儿要用BRPOPBLPOP来实现阻塞效果(B表示阻塞)

优点:
(1)利用Redis存储,不受限于JVM内存上限
(2)基于Redis的持久化机制,保证数据安全性
(3)满足消息有序性
缺点:
(1)无法避免消息丢失(消息会从队列直接移除)
(2)只支持单消费者

PubSub实现消息队列

PubSub(发布订阅)是Redis2.0引入的,消费者可以订阅一个或多个channel(频道),生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

优点:采用发布订阅模型,支持多生产、多消费
缺点:
(1)不支持数据持久化
(2)无法避免消息丢失
(3)消息堆积有上线,超出时数据丢失

Stream的单消费模式

Stream是Redis5.0引入的一种新数据类型,可以实现功能完善的消息队列。
在这里插入图片描述
例如:
在这里插入图片描述
读取消息:XREAD
在这里插入图片描述
例如,用XREAD读第一个消息:

XREAD COUNT 1 STREAMS users 0

用XREAD阻塞方式读取最新消息:

XREAD COUNT 1 BLOCK STREAMS users $

所以,在开发的时候,可以循环调用XREAD阻塞方式来查询最新消息,从而实现持久监听队列。
但是,当指定起始ID为$读取最新消息,处理一条消息的过程中,又有超过1条以上的消息到达队列,则下次获取也只能获取到最新的一条,会出现消息漏读

特点:
(1)消息可回溯
(2)一个消息可以被多个消费者读取
(3)可以阻塞读取
(4)有消息漏读的风险

Stream的消费者组模式

这一部分命令还是麻烦了,理解就行,要使用就去看文档就好了。

消费者组可以解决消息漏读的问题。
消费者组:将多个消费者划分到一个组中,监听同一个队列。

特点:
1、消息分流:队列中的消息会分流给组内不同消费者,而不是重复消费,从而加快消息处理速度
2、消息标识:消费者组会维护一个标识,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标识之后去读取消息,确保了每一个消息都会被消费
3、消息确认:消费者获取消息后,消息处于pending状态,存入pending-list,当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list中移出,可以解决消息丢失的问题

创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]
删除指定消费者组
XGROUP DESTROY key groupName
给指定的消费者组添加消费者
XGROUP CREATECONSUMER key groupName consumerName
删除消费者组中的指定消费者
XGROUP DELCONSUMER key groupName consumerName

从消费者组读取消息:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key ID
其中,ID表示获取消息的起始ID:
(1)“>”:从下一个未消费的消息开始
(2)其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中第一个消息开始

基于Stream消费者组,我们利用消费者监听消息的基本思路:
1、使用阻塞模式尝试监听队列,没消息就继续监听,有消息就开线程处理消息,并在完成后ACK。
2、若没有成功ACK,抛出异常,那么消息就会留在padding-list中,这时候就需要读取padding-list获取异常消息并处理。

STREAM类型消息队列的XREADGROUP命令特点:

1、消息可回溯
2、可以多消费者争抢消息,加快消费速度
3、可以阻塞读取
4、没有消息漏读的风险
5、有消息确认机制,保证消息至少被消费一次

基于Stream消息队列实现异步秒杀

1、创建Stream类型的消息队列stream.orders和消费者组:

XGROUP CREATE stream.orders g1 0 MKSTREAM # 组名g1,起始位置为0

在这里插入图片描述
2、修改之前秒杀下单的Lua脚本,在认定有抢购资格后,直接向stream.orders中添加消息,内容包括voucherId、userId、orderId:

-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]
-- 1.3 订单id
local orderId = ARGV[3]

-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2 库存不足,返回1
    return 1
end
-- 3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.2 存在,说明重复下单
    return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
-- 3.6 发送消息到队列中,orderId的key指定为Id更好,因为订单实体类是这么定义的
redis.call('xadd', 'stream.orders', '*', 'userId', userId, 'voucherId', voucherId, 'Id', orderId)
return 0

与上次的代码相比,我们多增加了一个参数,所以我们要修改一下函数的调用:
在这里插入图片描述
这个参数的增加,在后续的编写中会省去一些麻烦。

3、项目启动时,开启一个线程任务,尝试获取stream.orders中的消息,完成下单,整体的业务流程的代码如下:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    //注入秒杀优惠券的service
    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private RedissonClient redissonClient;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    //线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行
    @PostConstruct  //该注解表示在当前类初始化完毕以后立即执行
    private void init(){
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        //获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程
        Long userId = voucherOrder.getUserId();
        //创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁
        boolean isLock = lock.tryLock();
        //判断是否获取锁成功
        if(!isLock){
            log.error("不允许重复下单");//理论上不会发生
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }

    IVoucherOrderService proxy;

    private class VoucherOrderHandler implements Runnable{
        String queueName = "stream.orders";
        @Override
        public void run() {
            while (true){
                try {
                    //获取消息队列中的订单信息,XREADGROUP GROUP g1 c1 BLOCK 2000 STREAMS stream.orders
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"),
                            StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                            StreamOffset.create(queueName, ReadOffset.lastConsumed())
                    );
                    //判断消息获取是否成功
                    if(list == null || list.isEmpty()) {//获取失败,说明没有消息,继续下一次循环
                        continue;
                    }
                    //解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    //将其转变为VoucherOrder对象,忽略异常
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //获取成功,下单
                    handleVoucherOrder(voucherOrder);
                    //ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    //异常则表示没有被ACK确认,剩下的操作都是针对pending-list的
                    log.error("处理订单异常", e);
                    handlePendingList();
                }
            }
        }

        private void handlePendingList() {
            while (true){
                try {
                    //获取pending-list中的订单信息 XREADGROUP g1 c1 COUNT 1 STREAMS stream.orders 0
                    List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                            Consumer.from("g1", "c1"), //消费者信息
                            StreamReadOptions.empty().count(1),
                            StreamOffset.create(queueName, ReadOffset.from("0"))
                    );
                    if(list == null || list.isEmpty()) {
                        //获取失败,说明没有消息,结束循环
                        break;
                    }
                    //解析消息中的订单信息
                    MapRecord<String, Object, Object> record = list.get(0);
                    Map<Object, Object> values = record.getValue();
                    //将其转变为VoucherOrder对象,忽略异常
                    VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(values, new VoucherOrder(), true);
                    //获取成功,下单
                    handleVoucherOrder(voucherOrder);
                    //ACK确认
                    stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
                } catch (Exception e) {
                    log.error("处理pending-list订单异常", e);
                }
            }
        }
    }

    //秒杀优化,调用Lua的代码
    @Override
    public Result seckillVoucher(Long voucherId) {
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //获取订单id
        long orderId = redisIdWorker.nextId("order");
        //执行Lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString(), String.valueOf(orderId)
        );
        //判断结果是否为0
        int r = result.intValue();
        if(r != 0){
            //不为0,没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        //获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();

        //返回订单id
        return Result.ok(orderId);
    }
    
    @Transactional(rollbackFor = Exception.class)
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        Long voucherId = voucherOrder.getVoucherId();
        //查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherId).count();
        //判断是否存在
        if (count > 0) {
            log.error("不可重复购买");
        }
        //扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherId).
                gt("stock", 0).
                update();
        if (!success) {
            log.error("库存不足");
        }
        //保存订单
        this.save(voucherOrder);
    }
}

我觉得真的还是太麻烦了。。。而且我遇到了很多次bug,反正都跟线程池有关系,自己修改bug的能力一般,耽误了不少时间,这方面能力要提高。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/310685.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

zookerper入门

zookerper介绍 ZooKeeper 是一个开源的分布式协调框架,主要用来解决分布式集群中应用系统的一致性问题. ZooKeeper本质上是一个分布式的小文件存储系统&#xff08;Zookeeper文件系统监听机制&#xff09;.提供基于类似于文件系统的目录树方式的数据存储&#xff0c;并且可以…

PFA试剂瓶——实验室存储运输化学试剂样品容器

PFA是一种高性能的塑料材料。它是一种热塑性塑料&#xff0c;由全氟化&#xff0c;聚合物制成&#xff0c;具有高度的化学稳定性性。由于其优异的性能&#xff0c;PFA被广泛应用于多个领域&#xff0c;尤其是作为存储和运输各种化学试剂的容器&#xff0c;耐受-200℃至260C的温…

闩锁效应(Latch-up)

闩锁效应&#xff08;Latch-up&#xff09;原理解析 什么是闩锁效应&#xff08;Latch-up&#xff09;&#xff1f; 在CMOS N阱设计中&#xff0c;实际上是由于CMOS电路中基极和集电极相互连接的两个PNP和NPN双极性BJT管子(下图中&#xff0c;侧面式NPN和垂直式PNP)的回路放大…

AI实景自动直播项目怎么样?解决实体行业直播难题!

在如今的互联网时代&#xff0c;作为实体老板想要在激烈的同行竞争中占领优势&#xff0c;那短视频和直播必然是要做的推广渠道之二&#xff0c;但是最近短视频流量持续下滑&#xff0c;带来的订单量越来越少&#xff0c;必然直播将成为常态化的宣传动作&#xff0c;如今抖捧AI…

Allure07-动态生成用例标题

Allure07-动态生成用例标题 高清B站链接 Allure报告清空上一次运行的记录 使用pytest-h 可以查勘报告相关的三个参数 reporting 报告相关参数 –alluredirDIR 指定报告的目录路径 –clean-alluredir 如果已经存在报告&#xff0c;就先清空它 –allure-no-capture 不加载 log…

Ubuntu 22.04.3 LTS arm64 aarch64 ISO jammy-desktop-arm64.iso 下载

Ubuntu 22.04.3 LTS (Jammy Jellyfish) Daily Build 参考 Are there official Ubuntu ARM / aarch64 desktop images? - Ask Ubuntu

Linux第26步_在虚拟机中安装stm32wrapper4dbg工具

在Ubuntu下编译TF-A 或者 Uboot时&#xff0c;我们需要用到ST公司提供的stm32wrapper4dbg工具。stm32wrapper4dbg工具的源码下载地址为: GitHub - STMicroelectronics/stm32wrapper4dbg 记得我们在前面已经创建过的目录如下&#xff1a; 1&#xff09;、在根目录下&#xf…

0 ZigBee无线通信概念实验、抓包

胜达电子学习笔记&#xff1a;lesson5 ZigBee无线通信概念实验、抓包 5.无线通信概论5.1 理解 Lesson5-Sendmain&#xff1a;主函数halRfInit&#xff1a;射频初始化RFSend&#xff1a;无线数据发送出去SendPacket 数组 5.2 理解 Lesson5-ReceiveRevRFProc() 无线接收函数 5.3…

C#基础:通过QQ邮件发送验证码到指定邮箱

一、控制台程序 using System; using System.Net; using System.Net.Mail;public class EmailSender {public void SendEmail(string toAddress, string subject, string body){// 设置发件人邮箱地址以及授权码string fromAddress "xxxxxqq.com";string password …

SIT1050ISO具有隔离功能,1Mbps,高速 CAN 总线收发器

➢ 完全兼容“ ISO 11898 ”标准&#xff1b; ➢ 内置过温保护&#xff1b; ➢ 100kV/s 瞬态抗扰度&#xff1b; ➢ 显性超时功能&#xff1b; ➢ -40V 至 40V 的总线故障保护&#xff1b; ➢ I/O 电压范围支持 3.3V 和 5V MCU &#xff1b; ➢ 低环路延迟…

MS6001S1A低功耗、低噪声 CMOS 轨到轨输入输出运算放大器

MS6001S1A 运算放大器具有极低功耗&#xff0c;轨到轨输入输出&#xff0c;低 的输入失调电压和低的电流噪声。具体表现为可工作在幅度为 1.8V 到 5V 的单电源或者双电源条件&#xff0c;低功耗和低噪声使得 MS6001S1A 能够用在可移动设备上&#xff0c;输入输出的轨到…

【YOLO系列】 Smooth L1 Loss、IOU、GIOU、DIOU、CIOU(附代码实现)

Smooth L1 Loss、IOU、GIOU、DIOU和CIOU都是用于评估模型预测准确性的指标&#xff0c;但它们在计算方式和应用场景上有所不同。 一、Smooth L1 Loss Smooth L1 Loss主要用于回归问题&#xff0c;是由微软的Ross Girshick大神在Fast R-CNN论文中提出的。将Smooth L1 Loss之前应…

vue.js环境在window和linux安装

nodei官网&#xff1a;https://nodejs.org/en/download/ 一.windows环境下安装vue 1&#xff1a;node安装 在node.js的官网上下载node的安装包&#xff0c;下载下来之间安装即可&#xff0c;在命令行输入 npm -vnode -v 如下表示安装成功 2&#xff1a;cnpm安装 npm inst…

怎么修改照片尺寸的?分享3个实用的工具!

在数字时代&#xff0c;照片已经成为我们记录生活、分享经历的重要方式。然而&#xff0c;不同的平台和应用对照片尺寸的要求各不相同&#xff0c;这就需要我们经常对照片进行修改。本文将为您介绍如何修改照片尺寸&#xff0c;以及一些实用的工具。 一、手机应用 手机应用同样…

Backtrader 文档学习-Strategy with Signals

Backtrader 文档学习-Strategy with Signals backtrader可以不通过重写策略的方式触发交易&#xff0c;尽管重写策略是首选通用的方式。 下面介绍通过使用信号也是可以实现交易触发的。 1.定义signal import backtrader as btdata bt.feeds.OneOfTheFeeds(datanamemydatana…

非常好用的个人工作学习记事本Obsidian

现在记事本有两大流派&#xff1a;Obsidian 和Notion&#xff0c;同时据说logseq也很不错 由于在FreeBSD下后两种都没有相关ports&#xff0c;所以优先尝试使用Obsidian Obsidian简介 Obsidian是基于Markdown文件的本地知识管理软件&#xff0c;并且开发者承诺Obsidian对于个…

手机直连卫星及NTN简介

一、手机直连卫星的发展现状 近日&#xff0c;华为推出了支持北斗卫星短报文的Mate 50旗舰机、P60系列&#xff0c;苹果也跟Globalstar&#xff08;全球星&#xff09;合作推出了支持卫星求救的iPhone14&#xff0c;最亮眼的还是华为的。这几款产品揭开了卫星通信探索消费领域…

Aigtek高压放大器的工作原理和指标应用介绍

高压放大器是一种用于放大高压信号的电子设备&#xff0c;具有高压输出&#xff0c;低噪声&#xff0c;高精度&#xff0c;高稳定性&#xff0c;高可靠性&#xff0c;低功耗&#xff0c;低成本等的优点&#xff0c;所以才被广泛应用在磁场探测、电磁脉冲放大、电磁波放大、电磁…

omics简介

omics简介 公众号pythonic生物人写的系列文章介绍了组学的相关内容&#xff0c;本文仅做了一个简单的知识框架&#xff0c;供后面遇到对应问题&#xff0c;快速查阅。欢迎大家去关注原作者。 这篇文章也非常值得阅读&#xff1a;肿瘤NGS行业新人如何构建自己的知识体系-思考问题…

Win系统搭建Elasticsearch实现公网远程访问本地服务

文章目录 系统环境1. Windows 安装Elasticsearch2. 本地访问Elasticsearch3. Windows 安装 Cpolar4. 创建Elasticsearch公网访问地址5. 远程访问Elasticsearch6. 设置固定二级子域名 Elasticsearch是一个基于Lucene库的分布式搜索和分析引擎&#xff0c;它提供了一个分布式、多…