分布式锁—2.Redisson的可重入锁二

大纲

1.Redisson可重入锁RedissonLock概述

2.可重入锁源码之创建RedissonClient实例

3.可重入锁源码之lua脚本加锁逻辑

4.可重入锁源码之WatchDog维持加锁逻辑

5.可重入锁源码之可重入加锁逻辑

6.可重入锁源码之锁的互斥阻塞逻辑

7.可重入锁源码之释放锁逻辑

8.可重入锁源码之获取锁超时与锁超时自动释放逻辑

9.可重入锁源码总结

4.可重入锁源码之WatchDog维持加锁逻辑

(1)异步执行完加锁的lua脚本会触发执行thenApply()里的回调

(2)根据执行加锁lua脚本的返回值来决定是否创建定时调度任务

(3)定时调度任务由Netty的时间轮机制来实现

(4)10秒后会执行定时任务并判断是否要再创建一个定时任务在10秒后执行

如果某个客户端上锁后,过了几分钟都没释放掉这个锁,而锁对应的key一开始的过期时间其实只设置了30秒而已,那么在这种场景下这个锁就不能在上锁的30秒后被自动释放。

RedissonLock中有一个WatchDog机制:

当客户端对myLock这个key成功加锁后,就会创建一个定时任务,这个定时任务会默认10秒后更新myLock这个key的过期时间为30秒。当然,前提是客户端一直持有myLock这个锁,还没有对锁进行释放。只要客户端一直持有myLock这个锁没有对其进行释放,那么就会不断创建一个定时任务,10秒后更新myLock这个key的过期时间。

下面详细介绍加锁成功后的定时任务,是如何每隔10秒去更新锁的过期时间的。

(1)异步执行完加锁的lua脚本会触发执行thenApply()里的回调

异步执行完RedissonLock.tryLockInnerAsync()方法里的加锁lua脚本内容后,会触发执行ttlRemainingFuture的thenApply()方法里的回调。其中加锁lua脚本会返回Long型的ttlRemaining变量,ttlRemainingFuture的thenApply()方法里的回调入参便是这个变量。

具体会先在RedissonLock的tryAcquireAsync()方法中,定义一个RFuture类型的名为ttlRemainingFuture的变量。这个变量封装了当前这个锁对应的key的剩余存活时间,单位毫秒,这个剩余存活时间是在执行完加锁lua脚本时返回的。

然后通过RFuture的thenApply()方法给ttlRemainingFuture添加一个回调。这样当lua脚本执行完成后,就会触发ttlRemainingFuture的thenApply()方法里的回调,而回调里的入参ttlRemaining就是执行加锁lua脚本的返回值。

public class RedissonLock extends RedissonBaseLock {
    protected long internalLockLeaseTime;
    protected final LockPubSub pubSub;
    final CommandAsyncExecutor commandExecutor;
    
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        //与WatchDog有关的internalLockLeaseTime
        //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
        //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
        //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    
    //加锁
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    ...
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //线程ID,用来生成设置Hash的值
        long threadId = Thread.currentThread().getId();
        //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        //ttl为null说明加锁成功
        if (ttl == null) {
            return;
        }      
        //加锁失败时的处理
        ...
    }
    ...
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法
        //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime
            //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            //异步执行完tryLockInnerAsync()里加锁lua脚本内容后
            //就会触发执行ttlRemainingFuture.thenApply()里的回调
            //加锁返回的ttlRemaining为null表示加锁成功
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    //传入加锁成功的线程ID,启动一个定时任务
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
    
    //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()),//锁的名字:KEYS[1]
            unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
            getLockName(threadId)//ARGV[2],值为UUID + 线程ID
        );
    }
    ...
}

(2)根据执行加锁lua脚本的返回值来决定是否创建定时调度任务

当执行加锁lua脚本的返回值ttlRemaining为null时,则表明锁获取成功。

如果锁获取成功,且指定锁的过期时间,即leaseTime不是默认的-1,那么此时并不会创建定时任务。如果锁获取成功,且没指定锁的过期时间,即leaseTime是默认的-1,那么此时才会创建定时调度任务,并且是根据Netty的时间轮来实现创建。

所以调用scheduleExpirationRenewal()方法会创建一个定时调度任务,只要锁还被客户端持有,定时任务就会不断更新锁对应的key的过期时间。

public class RedissonLock extends RedissonBaseLock {
    ...
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            //异步执行完tryLockInnerAsync()里的内容后,就会执行ttlRemainingFuture.thenApply()里的逻辑
            //加锁返回的ttlRemaining为null表示加锁成功
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    //如果指定了锁的过期时间,并不会启动定时任务
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    //传入加锁成功的线程ID,启动一个定时任务
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
    ...
}

(3)定时调度任务由Netty的时间轮机制来实现

Redisson的WatchDog机制底层并不是调度线程池,而是Netty时间轮。

scheduleExpirationRenewal()方法会创建一个定时调度任务TimerTask交给HashedWheelTimer,10秒后执行。

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    ...
    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
                //创建一个更新过期时间的定时调度任务
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
    
    //更新过期时间
    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        //使用了Netty的定时任务机制:HashedWheelTimer + TimerTask + Timeout
        //创建一个更新过期时间的定时调度任务,下面会调用MasterSlaveConnectionManager.newTimeout()方法
        //即创建一个定时调度任务TimerTask交给HashedWheelTimer,10秒后执行
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ...
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
    ...
}

public class MasterSlaveConnectionManager implements ConnectionManager {
    private HashedWheelTimer timer;//Netty的时间轮
    ...
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        try {
            //给Netty的时间轮添加任务task
            return timer.newTimeout(task, delay, unit);
        } catch (IllegalStateException e) {
            if (isShuttingDown()) {
                return DUMMY_TIMEOUT;
            }
            throw e;
        }
    }
    ...
}

//Netty的时间轮HashedWheelTimer
public class HashedWheelTimer implements Timer {
    private final HashedWheelBucket[] wheel;
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    private final Executor taskExecutor;
    ...
    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ...
        //启动Worker线程
        start();
        ...
        //封装定时调度任务TimerTask实例到HashedWheelTimeout实例
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }
    ...
    private final class Worker implements Runnable {
        private long tick;//轮次
        ...
        @Override
        public void run() {
            ...
            do {
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    HashedWheelBucket bucket = wheel[idx];
                    //将定时调度任务HashedWheelTimeout转换到时间轮分桶里
                    transferTimeoutsToBuckets();
                    //处理时间轮分桶HashedWheelBucket里的到期任务
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
            ...
        }
        
        private void transferTimeoutsToBuckets() {
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                ...
                long calculated = timeout.deadline / tickDuration;
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                int stopIndex = (int) (ticks & mask);//对mask取模
                HashedWheelBucket bucket = wheel[stopIndex];
                bucket.addTimeout(timeout);
            }
        }
        ...
    }
    
    private static final class HashedWheelBucket {
        ...
        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;
            //process all timeouts
            while (timeout != null) {
                ...
                //对定时调度任务timeout进行过期处理
                timeout.expire();
                ...
            }
        }
        ...
    }
    
    private static final class HashedWheelTimeout implements Timeout, Runnable {
        private final HashedWheelTimer timer;
        ...
        public void expire() {
            ...
            //执行定时调度任务,让Executor执行HashedWheelTimeout的run方法
            timer.taskExecutor.execute(this);
            ...
        }
        
        @Override
        public void run() {
            ...
            //执行定时调度任务
            task.run(this);
            ...
        }
        ...
    }
    ...
}

(4)10秒后会执行定时任务并判断是否要再创建一个定时任务在10秒后执行

该定时任务TimerTask实例会调用RedissonBaseLock的renewExpirationAsync()方法去执行lua脚本,renewExpirationAsync()方法会传入获取到锁的线程ID。

在lua脚本中,会通过Redis的hexists命令判断在key为锁名的Hash值里,field为UUID + 线程ID的映射是否存在。其中KEYS[1]就是锁的名字如myLock,ARGV[2]为UUID + 线程ID。

如果存在,说明获取锁的线程还在持有锁,并没有对锁进行释放。那么就通过Redis的pexpire命令,设置key的过期时间为30秒。

异步执行lua脚本完后,会传入布尔值触发future.whenComplete()方法的回调,回调中会根据布尔值来决定是否继续递归调用renewExpiration()方法。

也就是说,如果获取锁的线程还在持有锁,那么就重置锁的过期时间为30秒,并且lua脚本会返回1,接着在future.whenComplete()方法的回调中,继续调用RedissonBaseLock的renewExpiration()方法重新创建定时调度任务。

如果获取锁的线程已经释放了锁,那么lua脚本就会返回0,接着在future.whenComplete()方法的回调中,会调用RedissonBaseLock的cancelExpirationRenewal()方法执行清理工作。

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    ...
    protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            try {
                //创建一个更新过期时间的定时调度任务
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
    
    //更新过期时间
    private void renewExpiration() {
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        //使用了Netty的定时任务机制:HashedWheelTimer + TimerTask + Timeout
        //创建一个更新过期时间的定时调度任务,下面会调用MasterSlaveConnectionManager.newTimeout()方法
        //即创建一个定时调度任务TimerTask交给HashedWheelTimer,10秒后执行
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                //异步执行lua脚本去更新锁的过期时间
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    //res就是执行renewExpirationAsync()里的lua脚本的返回值
                    if (res) {
                        //重新调度自己
                        renewExpiration();
                    } else {
                        //执行清理工作
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        ee.setTimeout(task);
    }
    
    protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        //其中KEYS[1]就是锁的名字如myLock,ARGV[2]为UUID+线程ID;
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
            Collections.singletonList(getRawName()),
            internalLockLeaseTime, getLockName(threadId));
        }
    }
    
    protected void cancelExpirationRenewal(Long threadId) {
        ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (task == null) {
            return;
        }
        if (threadId != null) {
            task.removeThreadId(threadId);
        }
        if (threadId == null || task.hasNoThreads()) {
            Timeout timeout = task.getTimeout();
            if (timeout != null) {
                timeout.cancel();
            }
            EXPIRATION_RENEWAL_MAP.remove(getEntryName());
        }
    }
    ...
}

注意:如果持有锁的机器宕机了,就会导致该机器上的锁的WatchDog不执行了。从而锁的key会在30秒内自动过期,释放掉锁。此时其他客户端最多再等30秒就可获取到这把锁了。

5.可重入锁源码之可重入加锁逻辑

(1)核心一是key的命名:第一层key是锁名,第二层field是UUID + 线程ID

(2)核心二是Redis的exists命令、hexists命令、hincrby命令

public class Application {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        //获取可重入锁
        RLock lock = redisson.getLock("myLock");
        //首次加锁
        lock.lock();
        //重入加锁
        lock.lock();
        //重入加锁
        lock.lock();
        lock.unlock();
        lock.unlock();
        lock.unlock();
        ...
    }
}

public class RedissonLock extends RedissonBaseLock {
    ...
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()),//锁的名字:KEYS[1]
            unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
            getLockName(threadId)//ARGV[2],值为UUID + 线程ID
        );
    }
    ...
}

实现可重入加锁的核心就是加锁的lua脚本。

(1)核心一是key的命名:第一层key是锁名,第二层field是UUID+线程ID

KEYS[1]的值就是锁的名字,ARGV[2]的值就是客户端UUID + 线程ID。由于第二层field包含了线程ID,所以可以区分申请加锁的线程是否在重入。对在key为锁名的Hash值中,field为UUID + 线程ID的value值进行递增1操作。

(2)核心二是Redis的exists命令、hexists命令、hincrby命令

判断锁是否存在,使用的是Redis的exists命令。判断锁是否正在被线程重入,则使用的是Redis的hexists命令。对field映射的value值进行递增,使用的是Redis的hincrby命令。

所以,持有锁的线程每重入锁一次,就会对field映射的value值递增1。比如:hincrby key UUID:ThreadID 1。

6.可重入锁源码之锁的互斥阻塞逻辑

(1)获取锁失败时会执行Redis的pttl命令返回锁的剩余存活时间

(2)ttlRemainingFuture的回调方法发现ttlRemaining不是null

(3)RedissonLock的tryAcquire()方法会返回这个剩余存活时间ttl

(4)RedissonLock的lock()方法会对tryAcquire()方法返回的ttl进行判断

(5)RedissonLock的lock()方法在ttl不为null时利用Semaphore进行阻塞

(1)获取锁失败时会执行Redis的pttl命令返回锁的剩余存活时间

在执行加锁的lua脚本中:首先通过"exists myLock",判断是否存在myLock这个锁key,发现已经存在。接着判断"hexists myLock 另一个UUID:另一个Thread的ID",发现不存在。于是执行"pttl myLock"并返回,也就是返回myLock这个锁的剩余存活时间。

(2)ttlRemainingFuture的回调方法发现ttlRemaining不是null

当执行加锁的lua脚本发现加锁不成功,返回锁的剩余存活时间时,ttlRemainingFuture的回调方法发现ttlRemaining不是null,于是便不会创建定时调度任务在10秒之后去检查锁是否还被申请的线程持有,而是会将返回的剩余存活时间封装到RFuture中并向上返回。

(3)RedissonLock的tryAcquire()方法会返回这个剩余存活时间ttl

主要会通过RedisObject的get()方法去获取RFuture中封装的ttl值。其中异步转同步就是将异步的tryAcquireAsync()方法通过get()方法转同步。

(4)RedissonLock的lock()方法会对tryAcquire()方法返回的ttl进行判断

如果当前线程是第一次加锁,那么ttl一定是null。如果当前线程是多次加锁(可重入锁),那么ttl也一定是null。如果当前线程加锁没成功,锁被其他机器或线程占用,那么ttl就是执行加锁lua脚本时获取到的锁对应的key的剩余存活时间。

如果ttl是null,说明当前线程加锁成功,于是直接返回。如果ttl不是null,说明当前线程加锁不成功,于是会执行阻塞逻辑。

(5)RedissonLock的lock()方法在ttl不为null时利用Semaphore进行阻塞

如果ttl不为null,即加锁不成功,那么会进入while(true)死循环。在死循环内,再次执行RedissonLock的tryAcquire()方法尝试获取锁。如果再次获取锁时返回的ttl为null,即获取到锁,就退出while循环。如果再次获取锁时返回的ttl不为null,则说明其他客户端或线程还持有锁。

于是就利用同步组件Semaphore进行阻塞等待一段ttl的时间。阻塞等待一段时间后,会继续执行while循环里的逻辑,再次尝试获取锁。以此循环往复,直到获得锁。

public class RedissonLock extends RedissonBaseLock {
    protected long internalLockLeaseTime;
    protected final LockPubSub pubSub;
    final CommandAsyncExecutor commandExecutor;
    
    public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
        super(commandExecutor, name);
        this.commandExecutor = commandExecutor;
        //与WatchDog有关的internalLockLeaseTime
        //通过命令执行器CommandExecutor可以获取连接管理器ConnectionManager
        //通过连接管理器ConnectionManager可以获取Redis的配置信息类Config
        //通过Redis的配置信息类Config可以获取lockWatchdogTimeout超时时间
        this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
        this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
    }
    ...
    //加锁
    @Override
    public void lock() {
        try {
            lock(-1, null, false);
        } catch (InterruptedException e) {
            throw new IllegalStateException();
        }
    }
    
    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        //线程ID,用来生成设置Hash的值
        long threadId = Thread.currentThread().getId();
        //尝试加锁,此时执行RedissonLock.lock()方法默认传入的leaseTime=-1
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        //ttl为null说明加锁成功
        if (ttl == null) {
            return;
        }
      
        //加锁失败时的处理
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        if (interruptibly) {
            commandExecutor.syncSubscriptionInterrupted(future);
        } else {
            commandExecutor.syncSubscription(future);
        }

        try {
            while (true) {
                //再次尝试获取锁
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                //返回的ttl为null,获取到锁,就退出while循环
                if (ttl == null) {
                    break;
                }
                //返回的ttl不为null,则说明其他客户端或线程还持有锁
                //那么就利用同步组件Semaphore进行阻塞等待一段ttl的时间
                if (ttl >= 0) {
                    try {
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        commandExecutor.getNow(future).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                } else {
                    if (interruptibly) {
                        commandExecutor.getNow(future).getLatch().acquire();
                    } else {
                        commandExecutor.getNow(future).getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(future), threadId);
        }
    }
    ...
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法
        //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime
            //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            //加锁返回的ttlRemaining为null表示加锁成功
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
    
    //默认情况下,外部传入的leaseTime=-1时,会取lockWatchdogTimeout的默认值=30秒
    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()),//锁的名字:KEYS[1]
            unit.toMillis(leaseTime),//过期时间:ARGV[1],默认时为30秒
            getLockName(threadId)//ARGV[2],值为UUID + 线程ID
        );
    }
    ...
}

public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
    private final Semaphore latch;
    ...
    public Semaphore getLatch() {
        return latch;
    }
    ...
}

7.可重入锁源码之释放锁逻辑

(1)宕机自动释放锁

(2)线程主动释放锁的流程

(3)主动释放锁的lua脚本分析

(1)宕机自动释放锁

如果获取锁的所在机器宕机,那么10秒后检查锁是否还在被线程持有的定时调度任务就没了。于是锁对应Redis里的key,就会在最多30秒内进行过期删除。之后,其他的客户端就能成功获取锁了。

当然,前提是创建锁的时候没有设置锁的存活时间leaseTime。如果指定了leaseTime,那么就不会存在10秒后检查锁的定时调度任务。此时获取锁的所在机器宕机,那么锁对应的key会在最多leaseTime后过期。

(2)线程主动释放锁的流程

主动释放锁调用的是RedissonLock的unlock()方法。在RedissonLock的unlock()方法中,会调用get(unlockAsync())方法。也就是首先调用RedissonBaseLock的unlockAsync()方法,然后再调用RedissonObject的get()方法。

其中unlockAsync()方法是异步化执行的方法,释放锁的操作就是异步执行的。而RedisObject的get()方法会通过RFuture同步等待获取异步执行的结果,可以将get(unlockAsync())理解为异步转同步。

在RedissonBaseLock的unlockAsync()方法中:首先会调用RedissonLock的unlockInnerAsync()方法进行异步释放锁,然后当完成释放锁的处理后,再通过异步去取消定时调度任务。

public class Application {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        //获取可重入锁
        RLock lock = redisson.getLock("myLock");
        lock.lock();//获取锁
        lock.unlock();//释放锁
        ...
    }
}

public class RedissonLock extends RedissonBaseLock {
    ...
    @Override
    public void unlock() {
        ...
        //异步转同步
        //首先调用的是RedissonBaseLock的unlockAsync()方法
        //然后调用的是RedissonObject的get()方法
        get(unlockAsync(Thread.currentThread().getId()));
        ...
    }
    
    //异步执行释放锁的lua脚本
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(
            getRawName(), 
            LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return nil;",
            Arrays.asList(getRawName(), getChannelName()),//KEYS[1] + KEYS[2]
            LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
            internalLockLeaseTime,//ARGV[2]
            getLockName(threadId)//ARGV[3]
        );
    }
    ...
}

public abstract class RedissonBaseLock extends RedissonExpirable implements RLock {
    ...
    @Override
    public RFuture<Void> unlockAsync(long threadId) {
        //异步执行释放锁的lua脚本
        RFuture<Boolean> future = unlockInnerAsync(threadId);
        CompletionStage<Void> f = future.handle((opStatus, e) -> {
            //取消定时调度任务
            cancelExpirationRenewal(threadId);
            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }
            return null;
        });
        return new CompletableFutureWrapper<>(f);
    }
    ...
}

abstract class RedissonExpirable extends RedissonObject implements RExpirable {
    ...
    RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
        super(connectionManager, name);
    }
    ...
}

public abstract class RedissonObject implements RObject {
    protected final CommandAsyncExecutor commandExecutor;
    protected String name;
    protected final Codec codec;
    
    public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
        this.codec = codec;
        this.commandExecutor = commandExecutor;
        if (name == null) {
            throw new NullPointerException("name can't be null");
        }
        setName(name);
    }
    ...
    protected final <V> V get(RFuture<V> future) {
        //下面会调用CommandAsyncService.get()方法
        return commandExecutor.get(future);
    }
    ...
}

public class CommandAsyncService implements CommandAsyncExecutor {
    ...
    @Override
    public <V> V get(RFuture<V> future) {
        if (Thread.currentThread().getName().startsWith("redisson-netty")) {
            throw new IllegalStateException("Sync methods can't be invoked from async/rx/reactive listeners");
        }
        try {
            return future.toCompletableFuture().get();
        } catch (InterruptedException e) {
            future.cancel(true);
            Thread.currentThread().interrupt();
            throw new RedisException(e);
        } catch (ExecutionException e) {
            throw convertException(e);
        }
    }
    ...
}

(3)主动释放锁的lua脚本分析

首先判断在key为锁名的Hash值中,field为UUID + 线程ID的映射是否存在。如果不存在,则表示锁已经被释放了,直接返回。如果存在,则在key为锁名的Hash值中对field为UUID + 线程ID的value值递减1。即调用Redis的hincrby命令,进行递减1处理。

接着对递减1后的结果进行如下判断:如果递减1后的结果大于0,则表示线程还在持有锁。这对应于持有锁的线程多次重入锁,此时需要重置锁的过期时间为30秒。如果递减1后的结果小于0,则表示线程不再持有锁,于是删除锁对应的key,并且通过Redis的publish命令,发布一个事件。

public class RedissonLock extends RedissonBaseLock {
    ...
    //异步执行释放锁的lua脚本
    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(
            getRawName(), 
            LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return nil;",
            Arrays.asList(getRawName(), getChannelName()),//KEYS[1] + KEYS[2]
            LockPubSub.UNLOCK_MESSAGE,//ARGV[1]
            internalLockLeaseTime,//ARGV[2]
            getLockName(threadId)//ARGV[3]
        );
    }
    ...
}

8.可重入锁源码之获取锁超时与锁超时自动释放逻辑

(1)尝试获取锁超时

(2)锁超时自动释放

针对如下代码方式去获取锁,如果超过60秒获取不到锁,就自动放弃获取锁,不会进行永久性阻塞。如果获取到锁之后,锁在10秒内没有被主动释放,那么就会自动释放锁。

public class Application {
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.useClusterServers().addNodeAddress("redis://192.168.1.110:7001");
        //创建RedissonClient实例
        RedissonClient redisson = Redisson.create(config);
        //获取可重入锁
        RLock lock = redisson.getLock("myLock");
        boolean res = lock.tryLock(60, 10, TimeUnit.SECONDS);//尝试获取锁
        if (res) lock.unlock();//尝试获取锁成功,则释放锁
        ...
    }
}

(1)尝试获取锁超时

RedissonLock的tryLock()方法,会传入waitTime和leaseTime尝试获取锁。其中waitTime就是最多可以等待多少时间去获取锁,比如60秒。leaseTime就是获取锁后锁的自动过期时间,比如10秒。

如果第一次获取锁的时间超过了等待获取锁的最大时间waitTime,那么就会返回false。

如果第一次获取锁的时间还没超过等待获取锁的最大时间waitTime,那么就进入while循环,不断地再次尝试获取锁。

如果再次尝试获取锁成功,那么就返回true。如果再次尝试获取锁失败,那么计算还剩下多少时间可以继续等待获取锁。也就是使用time去自减每次尝试获取锁的耗时以及自减每次等待的耗时。

如果发现time小于0,表示等待了waitTime都无法获取锁,则返回false。如果发现time大于0,继续下一轮while循环,尝试获取锁 + time自减耗时。

public class RedissonLock extends RedissonBaseLock {
    ...
    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);//最多可以等待多少时间去获取锁
        long current = System.currentTimeMillis();//current是第一次尝试获取锁之前的时间戳
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
        //尝试获取锁成功
        if (ttl == null) {
            return true;
        }
        //第一次尝试获取锁失败
        //当前时间减去第一次获取锁之前的时间戳,就是这次尝试获取锁的耗费时间,使用time进行自减
        time -= System.currentTimeMillis() - current;
        //如果第一次获取锁的时间超过了waitTime等待获取锁的最大时间,那么就会直接返回获取锁失败
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
        //如果第一次获取锁的时间还没达到waitTime等待获取锁的最大时间
        current = System.currentTimeMillis();
        CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        try {
            subscribeFuture.toCompletableFuture().get(time, TimeUnit.MILLISECONDS);
        } catch (ExecutionException | TimeoutException e) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.whenComplete((res, ex) -> {
                    if (ex == null) {
                        unsubscribe(res, threadId);
                    }
                });
            }
            acquireFailed(waitTime, unit, threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
                //再次尝试获取锁成功
                if (ttl == null) {
                    return true;
                }
                //剩余可用等待的时间,使用time去自减每次尝试获取锁的耗时
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
                //返回的ttl不为null,则说明其他客户端或线程还持有锁
                //那么就利用同步组件Semaphore进行阻塞等待一段ttl的时间
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
                //使用time去自减每次等待的耗时
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(waitTime, unit, threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
        }
    }
    ...
}

(2)锁超时自动释放

当使用RedissonLock.tryAcquire方法尝试获取锁时,传入的leaseTime不是-1,而是一个指定的锁存活时间,那么锁超时就会自动被释放。

当leaseTime不是-1时:

一.锁的过期时间就不是lockWatchdogTimeout=30秒,而是leaseTime

二.执行加锁lua脚本成功后,不会创建一个定时调度任务在10秒后检查锁

总结:当指定了leaseTime后,那么锁最多在leaseTime后,就必须被删除。因为此时没有定时调度任务去检查锁释放还在被线程持有并更新过期时间。所以,锁要么被主动删除,要么在存活leaseTime后自动过期。

public class RedissonLock extends RedissonBaseLock {
    ...
    private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        //默认下waitTime和leaseTime都是-1,下面调用的get方法是来自于RedissonObject的get()方法
        //可以理解为异步转同步:将异步的tryAcquireAsync通过get转同步
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
    
    private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime != -1) {
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            //默认情况下,由于leaseTime=-1,所以会使用初始化RedissonLock实例时的internalLockLeaseTime
            //internalLockLeaseTime的默认值就是lockWatchdogTimeout的默认值,30秒
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            //加锁返回的ttlRemaining为null表示加锁成功
            if (ttlRemaining == null) {
                if (leaseTime != -1) {
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }
    ...
}

9.可重入锁源码总结

(1)加锁

(2)WatchDog维持加锁

(3)可重入锁

(4)锁互斥

(5)手动释放锁

(6)宕机自动释放锁

(7)尝试加锁超时

(8)超时锁自动释放

(1)加锁

在Redis里设置两层Hash数据结构,默认的过期时间是30秒。第一层Hash值的key是锁名,第二层Hash值的key是UUID + 线程ID。涉及Redis的exists命令、hexists命令、hincrby命令。

(2)WatchDog维持加锁

如果获取锁的线程一直持有锁,那么Redis里的key就会一直保持存活。获取锁成功时会创建一个定时任务10秒后检查锁是否还被线程持有。如果线程还在持有锁,就会重置key的过期时间为30秒,并且创建一个新的定时任务在10秒后继续检查锁是否还被线程持有。

(3)可重入锁

同一个线程可以多次加锁,对第二层的key为UUID + 线程ID的Hash值,每次获取锁就进行累加1。

(4)锁互斥

不同线程尝试加锁时,会由于第二层Hash的key不同,而导致加锁不成功。也就是执行加锁lua脚本时会返回锁的剩余过期时间ttl。然后利用同步组件Semaphore进行阻塞等待一段ttl时间。阻塞等待一段时间后,继续在while循环里再次尝试获取锁。以此循环等待 + 尝试,直到获得锁。

(5)手动释放锁

使用hincrby命令对第二层key(UUID + 线程ID)的Hash值递减1。递减1后还大于0表示锁被重入,需要重置锁的过期时间。递减1后小于等于0表示锁释放完毕,需要删除锁key及取消定时调度任务。

(6)宕机自动释放锁

如果持有锁的客户端宕机,那么锁的WatchDog定时调度任务也没了。此时不会重置锁key的过期时间,于是锁key会自动释放。

(7)尝试加锁超时

在while循环中不断地尝试获取锁。使用time表示还可以等待获取锁的时间。每次循环都对time:自减尝试获取锁的耗时 + 自减每次等待的耗时。在指定时间内没有成功加锁(即time小于0),就退出循环,表示加锁失败。

(8)超时锁自动释放

当指定了leaseTime时,如果获取锁没有在leaseTime内手动释放锁,那么Redis里的锁key会自动过期,自动释放锁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/980936.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

视频教育网站开源系统的部署安装 (roncoo-education)服务器为ubuntu22.04.05

一、说明 前端技术体系&#xff1a;Vue3 Nuxt3 Vite5 Vue-Router Element-Plus Pinia Axios 后端技术体系&#xff1a;Spring Cloud Alibaba2021 MySQL8 Nacos Seata Mybatis Druid redis 后端系统&#xff1a;roncoo-education&#xff08;核心框架&#xff1a;S…

线程相关八股

1. 线程和进程的区别&#xff1f; 进程&#xff1a;进程可以简单理解为进行一个程序&#xff0c;比如说我们打开一个浏览器&#xff0c;打开一个文本&#xff0c;这就是开启了一个进程&#xff0c;一个进程想要在计算机中运行&#xff0c;需要将程序交给CPU&#xff0c;将数据…

Python 绘制迷宫游戏,自带最优解路线

1、需要安装pygame 2、上下左右移动&#xff0c;空格实现物体所在位置到终点的路线&#xff0c;会有虚线绘制。 import pygame import random import math# 迷宫单元格类 class Cell:def __init__(self, x, y):self.x xself.y yself.walls {top: True, right: True, botto…

【音视频】VLC播放器

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 一、vlc是什么&#xff1f; VLC Media Player&#xff08;简称VLC&#xff09;是一款免费、开源、跨平台的多媒体播放器&#xff0c;由非营利组织VideoLAN开发&#xff0c;最…

vue2+ele-ui实践

前言&#xff1a;真理先于实践&#xff0c;实践发现真理&#xff0c;再实践检验真理 环境&#xff1a;vue2 & element-ui 正片&#xff1a; Select 选择器 简称 下拉框 下拉框完整的使用循环 下拉框 → 点击下拉框 → 展示数据 → 选择数据 → 下拉框显示数据 核心具有…

刷题日记——部分二分算法题目分享

前言 咱们紧跟上一期结合时间复杂度浅谈二分法的好处, 并分享部分二分题目(将持续更新题目,绝对值你一个收藏)-CSDN博客 笔者接着分享一些刷过的关于二分算法的题目. 第一题 1283. 使结果不超过阈值的最小除数 - 力扣&#xff08;LeetCode&#xff09; 这道题就是典型的二…

excel 斜向拆分单元格

右键-合并单元格 右键-设置单元格格式-边框 在设置好分割线后&#xff0c;你可以开始输入文字。 需要注意的是&#xff0c;文字并不会自动分成上下两行。 为了达到你期望的效果&#xff0c;你可以通过 同过左对齐、上对齐 空格键或使用【AltEnter】组合键来调整单元格中内容的…

关于常规模式下运行VScode无法正确执行“pwsh”问题

前言&#xff1a; pwsh在系统环境中正确配置&#xff0c;且可以运行在cmd&#xff0c; powshell&#xff08;5.1&#xff09;--- 都需要在管理员权限下运行 &#xff08;打开setting&#xff09; 打开setting.json &#xff08;在vscode中添加 powershell 7 路径&…

企微审批中MySQL字段TEXT类型被截断的排查与修复实践

在MySQL中&#xff0c;TEXT类型字段常用于存储较大的文本数据&#xff0c;但在一些应用场景中&#xff0c;当文本内容较大时&#xff0c;TEXT类型字段可能无法满足需求&#xff0c;导致数据截断或插入失败。为了避免这种问题&#xff0c;了解不同文本类型&#xff08;如TEXT、M…

异常 PipeMapRed.waitOutputThreads(): subprocess failed with code 127

直接放问题异常 hadoop jar /opt/module/hadoop-3.3.2/share/hadoop/tools/lib/hadoop-streaming-3.3.2.jar \ -D mapreduce.map.memory.mb100 \ -D mapreduce.reduce.memory.mb100 \ -D mapred.map.tasks1 \ -D stream.num.map.output.key.fields2 \ -D num.key.fields.for.pa…

Focal Loss (聚焦损失) :解决类别不平衡与难易样本的利器,让模型学会“重点学习”

1. 为什么需要Focal Loss&#xff1f; 2. 交叉熵损失的问题 3.Focal Loss的智慧&#xff1a;给不同的错误“区别对待” 4.代码演示 1. 为什么需要Focal Loss&#xff1f; 在机器学习和深度学习中&#xff0c;类别不平衡&#xff08;Class Imbalance&#xff09; 是一个普遍…

算法系列之数据结构-二叉树

在计算机科学中&#xff0c;数据结构是组织和存储数据的方式&#xff0c;以便能够高效地访问和修改数据。树&#xff08;Tree&#xff09;是一种非常重要的非线性数据结构&#xff0c;广泛应用于各种算法和应用中。本文将详细介绍树的基本概念、常见类型以及用Java实现树的遍历…

进来了解一下python的深浅拷贝

深浅拷贝是什么&#xff1a;在Python中&#xff0c;理解深拷贝&#xff08;deep copy&#xff09;和浅拷贝&#xff08;shallow copy&#xff09;对于处理复杂的数据结构&#xff0c;如列表、字典或自定义对象&#xff0c;是非常重要的。这两种拷贝方式决定了数据在内存中的复制…

磁盘空间不足|如何安全清理以释放磁盘空间(开源+节流)

背景&#xff1a; 最近往数据库里存的东西有点多&#xff0c;磁盘不够用 查看磁盘使用情况 df -h /dev/sda5&#xff08;根目录 /&#xff09; 已使用 92% 咱们来开源节流 目录 背景&#xff1a; 一、开源 二、节流 1.查找 大于 500MB 的文件&#xff1a; 1. Snap 缓存…

vue3学习-2(深入组件)

vue3学习-2&#xff08;深入组件&#xff09; 1.开始2.基础3.深入组件注册全局注册局部注册组件名格式 PropsProps 声明响应式 Props 解构 3.5将解构的 props 传递到函数中单向数据流更改对象 / 数组类型的 propsProp 校验 事件触发与监听事件事件参数声明触发的事件事件校验 组…

Java 入门 (超级详细)

一、什么是Java Java是一种高级编程语言&#xff0c;由Sun Microsystems公司于1995年推出。Java具有跨平台性、面向对象、健壮性、安全性、可移植性等特点&#xff0c;被广泛应用于企业级应用开发、移动应用开发、大数据处理、云计算等领域。Java程序可以在不同的操作系统上运…

23种设计模式之工厂方法模式(Factory Method Pattern)【设计模式】

文章目录 一、工厂方法模式简介二、关键点三、代码示例3.1 定义抽象产品3.2 实现具体产品3.3 创建抽象工厂3.4 实现具体工厂3.5 客户端代码 四、解释五、优缺点5.1 优点5.2 缺点 六、适用场景 一、工厂方法模式简介 工厂方法模式&#xff08;Factory Method Pattern&#xff0…

io学习----->标准io

思维导图&#xff1a; 一.io的作用 io是实现对文件的操作&#xff0c;把运行结果存到文件中&#xff0c;读取文件的数据&#xff0c;方便后期查询。 二.io的概念 io是指系统 和外部设备或用户之间的数据交互 I:input 表示数据从外部设备输入到内存中&#xff1b; O:output…

从 R1 到 Sonnet 3.7,Reasoning Model 首轮竞赛中有哪些关键信号?

DeepSeek R1 催化了 reasoning model 的竞争&#xff1a;在过去的一个月里&#xff0c;头部 AI labs 已经发布了三个 SOTA reasoning models&#xff1a;OpenAI 的 o3-mini 和deep research&#xff0c; xAI 的 Grok 3 和 Anthropic 的 Claude 3.7 Sonnet。随着头部 Al labs 先…

FPGA开发,使用Deepseek V3还是R1(7):以“FPGA的整体设计框架”为例

以下都是Deepseek生成的答案 FPGA开发&#xff0c;使用Deepseek V3还是R1&#xff08;1&#xff09;&#xff1a;应用场景 FPGA开发&#xff0c;使用Deepseek V3还是R1&#xff08;2&#xff09;&#xff1a;V3和R1的区别 FPGA开发&#xff0c;使用Deepseek V3还是R1&#x…