M功能-分布式锁-支付平台(五)

target:离开柬埔寨倒计时-218day

在这里插入图片描述

珍藏的图片又拿出来了

前言

M系统中的撮合引擎是最最核心的功能,第一版的撮合引擎不是我写的,也没有做交易对的动态分配这样的功能,都是基于抢锁方式来决定谁拥有该交易对的撮合权限,所以锁就至关重要了,本来最简单的方法就是只起一个java进程,然后用jdk的锁就不用担心这些问题了,但是当交易对多的时候,一个进程就不一定能及时的处理这些订单,所以还是需要多台机器同步进行处理,所以还是需要分布式锁。

我接触到的最初版本

我初次接触这个系统是在2019年初

记得是在那年4月还是5月的时候,发生了一个异常,同一个订单撮合了两次,本来那个订单在第一次撮合后就已经全部成交了,所以紧跟着就来了第二笔撮合,那时的负责人让我协助排查这个问题,我就一脸懵的开始了排查之路

  • 首先我快速熟悉这套交易流程,让负责人给我讲解;
  • 根据交易流程,发现问题出现的原因一定在撮合引擎上面;
  • 查看撮合引擎的日志

当时撮合引擎的线程名称是撮合引擎前缀+交易对+编号,排查日志很容易发现其中有两个线程名称和相似,只有编号不一样,交易对是一样的,这就意味着同一个交易对有两个线程在进行撮合,因为这两个线程处于不同的jvm进程内,所以就没办法共享订单簿内存,这样就会出现撮合多次的情况了。

看到这里我不禁心想,这不是锁住了吗,怎么会还出现同一个交易对被两个线程都撮合的情况呢,除非这个锁没有锁住,我先是去查看了加锁的逻辑,加锁使用的是redisson,加锁的key是交易对,所以从逻辑上看是没什么问题的;然后我就继续排查日志,我看到第二台服务器的那个线程产生撮合日志的时间就在几个小时前,属于我就着重去找了那段时间的日志;

从里面的日志我看到了一条很有嫌疑的日志,不能更改锁的过期时间,这时候我隐约知道问题出现的原因了

先来看一段redisson锁里面的一段关键代码片段

类:org.redisson.RedissonLock

// 这个其实就是给redisson锁保活的一个续命任务
private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    // 就是这里,如果这里发生了异常,就不会执行下面的对自己的调用
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    // 其实当时key是存在的,只是发生了网络问题,所以没有到这个分支
                    if (res) {
                        // reschedule itself
                        // 每次续命成功才会继续发起下一次的续命
                        renewExpiration();
                    }
                });
            }
           // 这里续命时间默认是锁超时时间的1/3,也就是说默认30s的话,会每10s发起一次续命
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

其实这个续命任务在多数场景下都是足以支持的了,像我遇到的这个场景是比较少见的,当然也可以增大锁的超时时间,但是多长的时间能满足呢,这些都是问题,所以基于这个场景我写了个基于mysql的锁来支持这个功能。

Mysql实现简单的分布式锁

首先是一个大的抽象类,实现lock接口

package com.littlehow.lock;

import java.util.Date;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public abstract class DefaultLock implements Lock {

    private final String id;
    protected final String key;
	// 主要是此处存放锁定key和id使用
    protected DefaultLock(String key) {
        this.id = UUID.randomUUID().toString().replace("-", "");
        this.key = key;
    }

    // 获取锁id
    public String getId(long threadId) {
        return id + ":" + threadId;
    }

    public String getKey() {
        return key;
    }

    @Override
    public boolean tryLock() {
        try {
            return tryLock(-1, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

    @Override
    public void lock() {
        try {
            lockInterruptibly();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public Condition newCondition() {
        return new Condition() {
            @Override
            public void await() throws InterruptedException {

            }

            @Override
            public void awaitUninterruptibly() {

            }

            @Override
            public long awaitNanos(long nanosTimeout) throws InterruptedException {
                return 0;
            }

            @Override
            public boolean await(long time, TimeUnit unit) throws InterruptedException {
                return false;
            }

            @Override
            public boolean awaitUntil(Date deadline) throws InterruptedException {
                return false;
            }

            @Override
            public void signal() {

            }

            @Override
            public void signalAll() {

            }
        };
    }
}

真正实现逻辑的分布式锁实现类

package com.littlehow.lock;

import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class DistributedLock extends DefaultLock {
    private final static Map<String, ScheduledFuture> scheduleFuture = new ConcurrentHashMap<>();
    private final static AtomicInteger threadId = new AtomicInteger(1);
    // 使用默认的拒绝策略AbortPolicy 新任务来了抛出拒绝异常即可
    private final ExecutorService pool = new ThreadPoolExecutor(8, 20, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100000),
            r -> new Thread(r, "littlehow-lock-" + threadId.getAndIncrement()));

    private final ScheduledExecutorService schedule = Executors.newSingleThreadScheduledExecutor((r) -> new Thread(r,"DistributedLock-thread"));
    private static final long defaultTimeout = 60000L;
    private final long expired;
    private ContinueLife continueLife;
    private final LockService lockService;

    public DistributedLock(long expired, LockService lockService, String key) {
        this(expired, null, lockService, key);
    }

    public DistributedLock(long expired, ContinueLife continueLife, LockService lockService, String key) {
        super(key);
        this.expired = expired;
        this.continueLife = continueLife;
        this.lockService = lockService;
    }

    @Override
    public void lockInterruptibly() {
        tryLock(-1, TimeUnit.MILLISECONDS);
    }

    /**
     * 如果要实现重入,可以在这里获取锁成功后计数到ThreadLocal,不用考虑计数失败,因为在这里操作计数失败只能是发生了不可控的异常
     * 想要保证原子性的话,计数就可以放到底层,如mysql表这些来设置,此处因为没有重入的需求,所以就没有实现加锁去锁的计数
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) {
        final String id = getId(Thread.currentThread().getId());
        // 这里实际上使用的ip获取工具获取的,此处就写死
        String ip = "192.168.1.1";
        log.debug("get lock key={}, id={}, ip={}", key, id, ip);
        Future<Boolean> future = pool.submit(() ->  this.lockService.tryLock(this.key, id, System.currentTimeMillis() + expired, ip));
        try {
            boolean lock = future.get(time == -1L ? defaultTimeout : time, unit);
            if (lock && continueLife != null) {
                final String cacheKey = key + "-" + id;
                if (!scheduleFuture.containsKey(cacheKey)) {
                    ScheduledFuture taskFuture = schedule.scheduleWithFixedDelay(() -> {
                            boolean flag = this.continueLife.flushLife(key, id, System.currentTimeMillis() + expired) ;
                            //如果续命返回false,则会清除续命任务
                            if (!flag) {
                                cancelContinueTask(cacheKey);
                            }
                        },
                        expired / 3, expired / 3, TimeUnit.MILLISECONDS);
                    scheduleFuture.put(cacheKey, taskFuture);
                }
            }
            return lock;
        } catch (Exception e) {
            log.debug("get lock fail key={} id={} message={}", key, getId(Thread.currentThread().getId()), e.getMessage());
        }
        return false;
    }

    @Override
    public void unlock() {
        String id = getId(Thread.currentThread().getId());
        try {
            log.info("unlock key={}, id={}", key, id);
            this.lockService.unlock(this.key, id);
        } catch (Throwable t) {
            log.error("解锁异常", t);
            cancelContinueTask(key + "-" + id);
        }
    }

    private void cancelContinueTask(String cacheKey) {
        //停止相应的续命任务
        ScheduledFuture tf = scheduleFuture.get(cacheKey);
        if (tf == null) return;
        log.info("continue life fail key={}", key);
        tf.cancel(true);
        log.info("clear task key={}, result={}", key, tf.isCancelled());
        scheduleFuture.remove(cacheKey);
    }
}

下面是锁接口和续命接口

package com.littlehow.lock;

public interface LockService {
    // 获取锁
    boolean tryLock(String key, String id, long expired, String ip);
	
    // 解锁
    void unlock(String key, String id);
}

=====================================================================================

package com.littlehow.lock;

public interface ContinueLife {
    // 刷新过期时间
    boolean flushLife(String key, String id, long time);
}

然后是mysql实现的一套锁,基于上面的基础接口和类

package com.littlehow.lock.support.mysql;

import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;

/**
 * @author littlehow
 * @since 5/28/24 19:43
 */
@Setter
@Getter
@Accessors(chain = true)
public class LockModel {
    /**
     * 锁的关键key
     */
    private String key;

    /**
     * 锁的机器ip地址
     */
    private String ip;

    /**
     * 锁的实际id
     */
    private String lockId;

    /**
     * 锁的过期时间
     */
    private Long expireTime;
}

=====================================================================================
package com.littlehow.lock.support.mysql;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;


@Component
@Slf4j
public class MysqlLockSupport {
    @Value("${lock.warn.time:30000}")
    private long warnTime;

    public boolean tryLock(String key, String id, long expired, String ip) {
        Assert.hasText(id, "lock id must be not null");
        LockModel lockModel = new LockModel().setLockId(id).setExpireTime(expired)
                .setKey(key).setIp(ip);
        // mysql实际实现细节就不具体写出来了,下面就写个伪代码
        // 实际代码是去数据库拉取信息,然后根据数据库信息进行下面的判定
        LockModel dbLock = lockModel;
        if (dbLock == null) {
            // 进行保存,保存成功才返回true,否则返回false,对唯一约束异常也要做保存失败处理
            return true;
        } else if (id.equals(dbLock.getLockId())) {
            // 同一个线程获取两次锁,直接返回true
            // 重入逻辑可以在上层使用ThreadLocal实现,这里就不实现数据库的计数了
            return true;
        } else {
            // 这里就是其他线程在对此进行抢锁操作
            // 如果时间超过了配置的警告时间,则进行错误日志答应,报警处理
            if (System.currentTimeMillis() - warnTime > dbLock.getExpireTime()) {
                log.error("key {} deadlock for {}, ip address {}", key, dbLock.getLockId(), dbLock.getIp());
            }
        }
        return false;
    }

    public void unlock(String key, String id) {
        // 如果支持重入的锁,那么上层逻辑一定要减去对应的值,最终等于1才调用此处的逻辑
        // 此处的代码就相当于是更新三个值,一个锁的过期时间,一个是锁的lockId。一个是ip地址,都进行置空处理
        // 因为这个是为撮合引擎定制的锁,所以这个key才不进行删除,因为此处的key就相当于是交易对,这些交易对基本都是固定的,只会增加,基本不会出现减少的情况
    }

    public boolean updateLockExpired(String key, String id, long time) {
        log.info("start continue life key={}, id={}, time={}", key, id, time);
        // 这里是更新锁的续命时间, 如果更新续命时间成功,则返回true即可
        return true;
    }
}

=====================================================================================
package com.littlehow.lock.support.mysql;

import com.littlehow.lock.LockService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MysqlLockService implements LockService {

    @Autowired
    private MysqlLockSupport lockSupport;

    @Override
    public boolean tryLock(String key, String id, long expired, String ip) {
        try {
            return lockSupport.tryLock(key, id, expired, ip);
        } catch (Throwable t) {
            log.error("获取锁异常", t);
            return false;
        }
    }

    @Override
    public void unlock(String key, String id) {
        lockSupport.unlock(key, id);
    }

}

=====================================================================================
package com.littlehow.lock.support.mysql;

import com.littlehow.lock.ContinueLife;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MysqlContinueLife implements ContinueLife {

    @Autowired
    private MysqlLockSupport lockSupport;

    @Override
    public boolean flushLife(String key, String id, long time) {
        try {
            return lockSupport.updateLockExpired(key, id, time);
        } catch (Throwable t) {//出现异常返回true,下次续命任务会继续进行
            log.error("锁续命异常", t);
        }
        return true;
    }
}

=====================================================================================
package com.littlehow.lock.support.mysql;

import com.littlehow.lock.DistributedLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;

@Component
public class MysqlLockFactory {
    @Value("${lock.expired:30000}")
    private long expired;

    @Autowired
    private MysqlContinueLife continueLife;

    @Autowired
    private MysqlLockService lockService;

    private static final Map<String, Lock> locks = new HashMap<>();

    /**
     * 获取锁信息
     * @param key
     * @return
     */
    public Lock getLock(String key) {
        Lock lock = locks.get(key);
        if (lock == null) {
            synchronized (this) {
                lock = locks.get(key);
                if (lock == null) {
                    lock = new DistributedLock(expired, continueLife, lockService, key);
                    locks.put(key, lock);
                }
            }
        }
        return lock;
    }
}

然后就是调用了

package com.littlehow.lock;

import com.littlehow.lock.support.mysql.MysqlLockFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.concurrent.locks.Lock;


/**
 * @author littlehow
 * @since 5/28/24 20:04
 */
@Slf4j
public class TestLock {

    @Autowired
    private MysqlLockFactory mysqlLockFactory;

    /**
     * 这里可以使用junit进行测试调用
     */
    public void test() {
        Lock lock = mysqlLockFactory.getLock("USD/CNY");
        try {
            if (lock.tryLock()) {
                // 已经获取到锁,可以进行业务处理
            } else {
                log.info("获取锁失败");
            }
        } finally {
            lock.unlock();
        }
    }
}

所以整个锁的获取流程图如下

在这里插入图片描述

后记

这几天很忙很忙,差点就中断制定的日更博客了,做M功能时的苦难感情戏本来就要登场的,结果一直酝酿不出当时的情绪,感觉写不好,所以就先更新一些我在M项目里面做的一些事情,也算是解析了一点点分布式锁在超长事务里面使用的一些注意事项吧!

今天又看到别人在翻新自己的“沙滩排球”场地,有时候真的羡慕他们呀,没有那么卷的生活,每天都开开心心,还能忙里偷闲做自己喜欢做的事情!
在这里插入图片描述

加油吧littlehow
北京时间:2024-05-28 21:10

金边时间:2024-05-28 20:10

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/655517.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【加密与解密(第四版)】第十四章笔记

第十四章 漏洞分析技术 14.1 软件漏洞原理 缓冲区溢出漏洞&#xff1a;栈溢出 堆溢出、整型溢出&#xff08;存储溢出、计算溢出、符号问题&#xff09; UAF&#xff08;Use-After-Free&#xff09;漏洞 14.2 ShellCode 功能模块&#xff1a;下载执行、捆绑、反弹shell 14.3 …

构造器--5.28

不用一个个属性赋值的方法&#xff1a; 知道了类的创建与使用&#xff0c;但是每次赋值都是一个个调用&#xff0c;我们可以用构造器使得方法简单一点&#xff0c;不用一个个调用属性赋值&#xff0c;直接传参就OK了&#xff1b; 点击类名然后ctrl可以查看构造器 public yanxi…

2024最新(PC+WEB+IOS+Android)即时通讯系统客户端仿默往IM源码下载

2024最新(PCWEBIOSAndroid)即时通讯系统客户端仿默往IM源码下载(总大小约2.4G&#xff09; 系统功能配置灵活、海量并发、稳定可靠、数据安全&#xff0c;2小时快速部署、数据安全、单聊群聊、系统通知等通信功能&#xff0c;支持App、PC、Web等多端快速接入。 群功能&#xf…

Vue热更新出现内存溢出

Vue热更新出现内存溢出 vue-cli2遇到此问题的解决办法&#xff1a;vue-cli3遇到此问题的解决办法&#xff1a;方法一&#xff08;已测试ok&#xff09;方法二&#xff08;未尝试&#xff09; 开发项目有一段时间了&#xff0c;随着项目越来越大&#xff0c;打包的时间也相应的变…

《中国科技投资》是什么级别的期刊?是正规期刊吗?能评职称吗?

问题解答&#xff1a; 问&#xff1a;《中国科技投资》期刊什么级别&#xff1f; 答&#xff1a;国家级 问&#xff1a;《中国科技投资》期刊是核心期刊吗? 答&#xff1a;不是&#xff0c;是万方维普收录的正规期刊。 主管单位&#xff1a;中国信息协会 主办单位&#…

SMB工具横向移动

一. SMB工具介绍和使用 1.介绍 2013年的Defcon上&#xff0c;就引入了smbexec&#xff0c;后续 smbexec 被 Impacket 进一步完善了。在Impacket中支持明文认证&#xff0c;NTLM认证&#xff0c;Aeskey认证等方式&#xff01; 2. 使用方法 命令&#xff1a; smbexec.exe 用户…

【热门话题】CentOS 常见命令指南

&#x1f308;个人主页: 鑫宝Code &#x1f525;热门专栏: 闲话杂谈&#xff5c; 炫酷HTML | JavaScript基础 ​&#x1f4ab;个人格言: "如无必要&#xff0c;勿增实体" 文章目录 CentOS 常见命令指南一、文件与目录操作1. 切换目录2. 查看当前目录3. 列出目录…

学习javascript的函数

1.什么是函数&#xff1f; 可以重复被使用的代码块 作用&#xff1a;函数可以把具有相同或者相似逻辑的代码“包裹起来”&#xff0c;有利于代码的复用。 2.函数的基本使用 1.定义函数 利用关键字Function 定义函数&#xff08;声明函数&#xff09; function 函数名(){函…

《TCP/IP网络编程》(第十一章)进程间通信

进程间通信意味着两个不同的进程间可以交换数据&#xff0c;它使得不同的进程能够协同工作&#xff0c;实现复杂的系统功能。 1.通过管道实现进程间通信 下图是基于 管道&#xff08;PIPE&#xff09; 的进程间通信结构模型 管道不属于进程的资源&#xff0c;属于操作系统的资…

Python | Leetcode Python题解之第104题二叉树的最大深度

题目&#xff1a; 题解&#xff1a; class Solution:def maxDepth(self, root: TreeNode) -> int:if not root: return 0queue, res [root], 0while queue:tmp []for node in queue:if node.left: tmp.append(node.left)if node.right: tmp.append(node.right)queue tmp…

基于Vue+SpirngBoot的博客管理平台的设计与实现(论文+源码)_kaic

摘 要 随着当下社会的发展&#xff0c;互联网已经成为时代的主流&#xff0c;从此进入了互联网时代&#xff0c;对大部分人来说&#xff0c;互联网在日常生活中的应用是越来越频繁&#xff0c;大家都在互联网当中互相交流、学习、娱乐。博客正是扮演这样一个角色。博客已成为当…

《我的阿勒泰》最经典的6句话

这是首部散文影视化改编的作品&#xff0c;剧集里的每一帧画面&#xff0c;都堪比电影大作。 阿勒泰壮丽广阔的风光&#xff0c;如同一幅幅动人的画卷展现在我们面前&#xff0c;让人沉醉其中。李文秀平淡朴实的生活&#xff0c;却溢出了蓬勃的生命力&#xff0c;直击心灵。只…

git将某次提交合并到另一个分支

一、需求背景 将分支b中的某一次提交单独合并到分支a 二、实现方案 需求&#xff1a;将分支b中的某一次提交单独合并到分支a 1.在git上查看指定某次提交的id&#xff0c;如下图所示&#xff1a; 也可以通过git log命令查看提交的id&#xff0c;如下图&#xff1a; git log…

Java Web集成开发环境Eclipse的安装及web项目创建

第一步&#xff1a;下载安装JDK http://t.csdnimg.cn/RzTBXhttp://t.csdnimg.cn/RzTBX 第二步&#xff1a;下载安装Tomcat Tomcat下载安装以及配置_tomcat下载配置-CSDN博客文章浏览阅读2.5k次&#xff0c;点赞2次&#xff0c;收藏13次。Tomcat下载安装及其配置_tomcat下载配…

范罗士、希喂、安德迈爆款宠物空气净化器哪款好?深度对比测评

作为一名深受养猫过敏困扰的铲屎官&#xff0c;我经常提醒新手铲屎官重视家里的空气环境。宠物的浮毛和皮屑不仅会引发过敏&#xff0c;还可能传播细菌和病毒。很多人以为普通空气净化器能解决问题&#xff0c;但这些产品并未针对宠物家庭的特殊需求。经过多次研究和测试&#…

狂暴少帅短视频:成都科成博通文化传媒公司

狂暴少帅短视频&#xff1a;热血与激情的碰撞 在当下这个信息爆炸的时代&#xff0c;短视频以其独特的魅力迅速占领了人们的视线。而在众多短视频创作者中&#xff0c;一位名为“狂暴少帅”的创作者以其独特的风格和引人入胜的内容&#xff0c;赢得了广大网友的喜爱和追捧。今…

MySQL数据库案例实战教程:数据类型、语法与高级查询详解

✨✨ 欢迎大家来访Srlua的博文&#xff08;づ&#xffe3;3&#xffe3;&#xff09;づ╭❤&#xff5e;✨✨ &#x1f31f;&#x1f31f; 欢迎各位亲爱的读者&#xff0c;感谢你们抽出宝贵的时间来阅读我的文章。 我是Srlua小谢&#xff0c;在这里我会分享我的知识和经验。&am…

P7-P9【分配器】【源文件】【OOPvs.GP】

分配器 如何分配&#xff0c;如何释放 源文件 标准库源代码文件VC布局 标准库源代码文件GCC布局 OOP(面向对象编程) VS GP(泛型编程) 这两种编程的区别&#xff1a; 面向对象编程是将数据和方法联系在一起&#xff0c;更注重对不同的对象做出不同的响应&#xff0c;更适合…

浅谈redis未授权漏洞

redis未授权漏洞 利用条件 版本比较高的redis需要修改redis的配置文件&#xff0c;将bind前面#注释符去掉&#xff0c;将protected-mode 后面改为no 写入webshell 读者福利 | CSDN大礼包&#xff1a;《网络安全入门&进阶学习资源包》免费分享&#xff08;安全链接&#xff…