Redis-Redis 高并发分布式锁

集群分布式场景高并发

1.negix配置代理和路由

高并发场景超卖问题

1.使用原生redis控制超卖时(若是商品,则可以将商品id作为锁对象),会遇到的问题

问题一:若直接使用:将获取锁的对象和设置的超时的时间分开,则不能控制原子性,如下所示

         Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
        stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);

 问题二:若直接使用:将获取锁的对象和设置的超时的时间放在一个原子操作里执行时,在临界条件下,当程序执行到最后准备释放锁时候,锁的超时时间已到,则此时的锁成为已过期,则释放不了锁而当下一个线程也来执行任务时,前一个任务将这个任务所拿的所给释放掉了(释放掉不属于自己的锁对象);则引入redisson分布式锁来解决当前的问题,redisson具有锁续命机制

@RestController
public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() {
        String lockKey = "lock:product_101";
        //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
        //stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
       String clientId = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v)
        if (!result) {
            return "error_code";
        }
  
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
           if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
            }
        }


        return "end";
    }

使用分布式锁redisson

redisson使用

引入对应的redission的jar包

        <dependency>
			<groupId>org.redisson</groupId>
			<artifactId>redisson</artifactId>
			<version>3.6.5</version>
		</dependency>

 设置redission配置


@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public Redisson redisson() {
        // 此为单机模式
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

}

 redission的基本使用


@RestController
public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() {
        String lockKey = "lock:product_101";
        //Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
        //stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
        /*String clientId = UUID.randomUUID().toString();
        Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS); //jedis.setnx(k,v)
        if (!result) {
            return "error_code";
        }*/
        //获取锁对象
        RLock redissonLock = redisson.getLock(lockKey);
        //加分布式锁
        redissonLock.lock();  //  .setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
        try {
            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); // jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock", realStock + ""); // jedis.set(key,value)
                System.out.println("扣减成功,剩余库存:" + realStock);
            } else {
                System.out.println("扣减失败,库存不足");
            }
        } finally {
            /*if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {
                stringRedisTemplate.delete(lockKey);
            }*/
            //解锁
            redissonLock.unlock();
        }


        return "end";
    }

Redission执行的逻辑流程

 Redission分布式锁加锁源码分析

        redissonLock.lock(); 

加锁 

 @Override
    public void lockInterruptibly() throws InterruptedException {
        lockInterruptibly(-1, null);
    }

执行 lockInterruptibly加锁逻辑

@Override
    public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId); //尝试去加锁,返回的时加锁后的过期时间
        // lock acquired
        if (ttl == null) {  //若ttl为null ,则表示加锁成功 ;若ttl不为null ,则往下走
            return;
        }

        RFuture<RedissonLockEntry> future = subscribe(threadId); //发布订阅,订阅前者执行的任务若提前执行完,则唤醒机制,去重新获取锁
        commandExecutor.syncSubscription(future);

        try {
            while (true) { //进入循环
                ttl = tryAcquire(leaseTime, unit, threadId); //再次尝试获取锁,返回加锁成功后的过期时间
                // lock acquired
                if (ttl == null) {  //若ttl为null ,则表示加锁成功 ;若ttl不为null ,则往下走
                    break;
                }

                // waiting for message
                if (ttl >= 0) { 若ttl大于o
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
//进行间歇性锁自旋逻辑,不占用cpu资源
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

执行tryAcquire(leaseTime, unit, threadId)尝试加锁逻辑

 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
//leaseTime 默认设置为-1
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//执行加锁逻辑
        ttlRemainingFuture.addListener(new FutureListener<Long>() { //异步执行,锁续命逻辑
            @Override
            public void operationComplete(Future<Long> future) throws Exception {
                if (!future.isSuccess()) { //若加锁不成功,则退出
                    return;
                }

                Long ttlRemaining = future.getNow(); //若加锁成功,则ttlRemaining 为null
                // lock acquired
                if (ttlRemaining == null) {
                    scheduleExpirationRenewal(threadId); //加锁成功,则执行锁续命逻辑
                }
            }
        });
        return ttlRemainingFuture;
    }

执行tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); 加锁逻辑

  <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
//通过lua脚本来执行加锁逻辑,来保证原子性
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            //执行加锁逻辑 (key,argv与下面所传参数一一对应)
                  "if (redis.call('exists', KEYS[1]) == 0) then " +      
            //KEY[1]表示下面的getName(),
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +    
            //ARGV[2]表示下面的getLockName(threadId)
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " + 
            //ARGV[1]表示下面的internalLockLeaseTime
                      "return nil; " +
                  "end; " +
//执行锁重入逻辑(一个线程对同一个锁对象进行多次加锁,此为重入锁逻辑)
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);", //设置返回过期时间
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

执行scheduleExpirationRenewal(threadId);锁续命逻辑

private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                //执行锁续命逻辑
                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//KEYS[1]表示getName(),ARGV[2]表示锁对象getLockName(threadId), ARGV[1]表示过期时间
//判断当前锁的对象,为当前的线程对象,那么则当前的锁的对象设置原始的过期时间,以达到续命效果
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
                
                //异步监听执行        
                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }
                        //判断是否任然持有锁,是的话,则getNow()为null
                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);//再次执行续命逻辑
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

在外层未获取到锁的线程  getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);间隙自旋获取锁对象

getLatch()表示信号量,信号量为1,则表示阻塞状态,最终通过发布订阅方式来唤醒当前被阻塞的线程,唤醒后则执行获取锁的逻辑 doAcquireSharedNanos(arg, nanosTimeout);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/189364.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

桥接设计模式

package com.jmj.pattern.bridge;/*** 视频文件(实现化角色)*/ public interface VideoFile {void decode(String fileName); }package com.jmj.pattern.bridge;public class RmvFile implements VideoFile{Overridepublic void decode(String fileName) {System.out.println(&…

论文阅读——MCAN(cvpr2019)

补充一下MCAN-VQA&#xff1a; 对图片的处理&#xff1a;首先输入图片到Faster R-CNN&#xff0c;会先设定一个判断是否检测到物体的阈值&#xff0c;这样动态的生成m∈[10,100]个目标&#xff0c;然后从检测到的对应的区域通过平均池化提取特征。第i个物体特征表示为&#xff…

ubuntu22.04系统下载程序和依赖,并拷贝到指定路径下

脚本1 apt install aptitude apt-get -d install xxx #xxx是待下载的安装包 mv /var/cache/apt/archives/* /home/tuners/1apt install aptitude apt-get -d install xxx mv /var/cache/apt/archives/*.deb /home/tuners/1 xxx 为程序包名称 /home/tuners/1为保存程序包的…

网络通信基础概念介绍

网络通信基础概念介绍 局域网LAN 局域网&#xff0c;即 Local Area Network&#xff0c;简称LAN。 局域网内的主机之间能方便的进行网络通信&#xff0c;又称为内网&#xff1b;局域网和局域网之间在没有连接的情况下&#xff0c;是无法通信的。 局域网是指在一个相对较小的…

微机课设--汇编语言在51单片机上写一个四位十进制加法器

代码如下 KEYVAL EQU 30HKEYTM EQU 31HKEYSCAN EQU 32HDAT EQU 33HSCANLED EQU 37HS_DAT EQU 38HD_DAT EQU 39HR_DATL EQU 3AHR_DATH EQU 3BH CALFLAG EQU 3CHFLAG BIT 00HORG 0000HLJMP MAINORG 000BHLJMP T0ISRORG 0030HMAIN:MOV SP,#5FHMOV TMOD,#01HMOV TH0,#0D8HMOV TL0,…

过渡曲线的构造之平面PH曲线

平面PH曲线的构造及其相应性质 平面PH曲线的构造及其相应性质PH曲线理论三次PH曲线的构造及性质四次PH曲线的构造及性质五次PH曲线的构造及性质非尖点五次PH曲线尖点五次PH曲线 参考文献 平面PH曲线的构造及其相应性质 过渡曲线常需要满足在连接点处位置连续、曲率连续以及切线…

如何看待 2023 OPPO 开发者大会?潘塔纳尔进展如何?AndesGPT 有哪些亮点?

在2023年11月16日举行的OPPO开发者大会&#xff08;ODC23&#xff09;上&#xff0c;OPPO带来了全新ColorOS 14、全新互联网服务生态以及健康服务进展&#xff0c;这些新动态中有许多值得关注的地方。 1、全新ColorOS 14&#xff1a; 效率提升&#xff1a;ColorOS 14通过一系列…

java基于springboot公益帮学网站 新闻发布系统的设计与实现vue

以Java为开发平台&#xff0c;综合利用Java Web开发技术、数据库技术等&#xff0c;开发出公益帮学网站。用户使用版块&#xff1a;可以选择注册并登录&#xff0c;可以浏览信息、可以网上互动、发布文章、内容推荐等。后台管理员管理版块&#xff1a;以管理员身份登录网站后台…

常见的数据库面试题含答案

1、什么是数据库&#xff1f; 数据库是一个组织和存储数据的集合&#xff0c;它采用特定的数据结构和管理模式&#xff0c;以提供对数据的高效访问和管理。 2、请解释 SQL 是什么&#xff1f; SQL&#xff08;Structured Query Language&#xff09;是一种用于管理和操作关系…

机器学习---贝叶斯网络与朴素贝叶斯

1. 贝叶斯法则 如何判定一个人是好人还是坏人&#xff1f; 当你无法准确的熟悉一个事物的本质时&#xff0c;你可以依靠与事物特定本质相关的事件出现的次数来判断 其本质属性的概率。如果你看到一个人总是做一些好事&#xff0c;那这个人就越可能是一个好人。 数学语言表达…

JVM字节码文件的相关概述解读

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 1、字节码文件 从下面这个图就可以看出&#xff0c;字节码文件是可以跨平台使用的 想要让一个Java程序正确地运行在JVM中&#xff0c;Java源码就必须要被编译为符合JVM规范的字节码。 https://docs.oracle.com/java…

【一文讲清楚 Anaconda 相关环境配置】

文章目录 0 前言1 Package 与环境1.1 module1.2 package1.3 环境 2 Conda、Miniconda、Anaconda和Pip & PyPI2.1 Conda2. 2 Miniconda2.3 Anaconda2.3.1 Anaconda Navigator2.3.2 Anaconda PowerShell Prompt & Anaconda Prompt2.3.3 Jupyter notebook 2.4 Pip & P…

【数据结构】什么是栈?

&#x1f984;个人主页:修修修也 &#x1f38f;所属专栏:数据结构 ⚙️操作环境:Visual Studio 2022 目录 &#x1f4cc;栈的定义 &#x1f4cc;元素进栈出栈的顺序 &#x1f4cc;栈的抽象数据类型 &#x1f4cc;栈的顺序存储结构 &#x1f4cc;栈的链式存储结构 链栈的进…

MySql之索引,视图,事务以及存储过程举例详解

一.数据准备 数据准备可参考下面的链接中的数据准备步骤 MySql之内连接&#xff0c;外连接&#xff0c;左连接&#xff0c;右连接以及子查询举例详解-CSDN博客 &#xff08;如有问题可在评论区留言&#xff09; 二.存储过程 1.定义 存储过程 PROCEDURE &#xff0c;也翻译…

【代码】基于量子粒子群算法(QPSO)优化LSTM的风电、负荷等时间序列预测算法matlab

程序名称&#xff1a;基于量子粒子群算法&#xff08;QPSO&#xff09;优化LSTM的风电、负荷等时间序列预测算法 实现平台&#xff1a;matlab 代码简介&#xff1a;代码是基于QPSO-LSTM的负荷、光伏、风电等时间序列预测&#xff0c;MATLAB编写。包含LSTM&#xff08;长短时记…

大数据技术之数据安全与网络安全——CMS靶场(文章管理系统)实训

大数据技术之数据安全与网络安全——CMS靶场(文章管理系统)实训 在当今数字化时代&#xff0c;大数据技术的迅猛发展带来了前所未有的数据增长&#xff0c;同时也催生了对数据安全和网络安全的更为迫切的需求。本篇博客将聚焦于大数据技术背景下的数据安全与网络安全&#xff…

面对困境时的力量——《难不难》与歌手荆涛的坚持

歌手荆涛演唱的《难不难》不仅是一首歌曲&#xff0c;更是一种精神的呈现。它告诉我们&#xff0c;面对问题时&#xff0c;只要我们坚持并勇往直前&#xff0c;一切困难都会变得简单。无论前方有多少险阻&#xff0c;总有过去的那一天&#xff0c;只要我们不放弃&#xff0c;就…

【计算机网络学习之路】日志和守护进程

文章目录 前言一. 日志介绍二. 简单日志1. 左字符串2. 右字符串 三. 守护进程1. ps -axj命令2. 会话扩展命令 3. 创建守护进程 结束语 前言 本系列文章是计算机网络学习的笔记&#xff0c;欢迎大佬们阅读&#xff0c;纠错&#xff0c;分享相关知识。希望可以与你共同进步。 本…

JDK、JRE、JVM的特点和关联

Java 的三个重要的概念是 JDK&#xff08;Java Development Kit&#xff09;、JRE&#xff08;Java Runtime Environment&#xff09;和 JVM&#xff08;Java Virtual Machine&#xff09;。它们之间有着密切的关联&#xff0c;同时又有不同的职责和特点。 JDK&#xff08;Java…

中伟视界:创新解决方案,搭建自适应的AI算法模型训练平台

搭建AI算法模型自训练平台是当今人工智能领域的热门话题&#xff0c;但是其中存在着许多技术难点需要克服。 自训练平台需要具备高效的算法模型&#xff0c;这就要求能够处理庞大的数据量并进行高速计算。 平台需要具备强大的数据管理及存储能力&#xff0c;以满足训练过程中的…