分布式锁—7.Curator的分布式锁

大纲

1.Curator的可重入锁的源码

2.Curator的非可重入锁的源码

3.Curator的可重入读写锁的源码

4.Curator的MultiLock源码

5.Curator的Semaphore源码

1.Curator的可重入锁的源码

(1)InterProcessMutex获取分布式锁

(2)InterProcessMutex的初始化

(3)InterProcessMutex.acquire()尝试获取锁

(4)LockInternals.attemptLock()尝试获取锁

(5)不同客户端线程获取锁时的互斥实现

(6)同一客户端线程可重入加锁的实现

(7)客户端线程释放锁的实现

(8)客户端线程释放锁后其他线程获取锁的实现

(9)InterProcessMutex就是一个公平锁

(1)InterProcessMutex获取分布式锁

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181", 
            5000, 
            3000, 
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端");
        
        //获取分布式锁
        InterProcessMutex lock = new InterProcessMutex(client, "/locks/myLock");
        lock.acquire();
        Thread.sleep(1000);
        lock.release();
    }
}

(2)InterProcessMutex的初始化

设置锁的节点路径basePath + 初始化一个LockInternals对象实例。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final String basePath;
    private static final String LOCK_NAME = "lock-";
    ...
    public InterProcessMutex(CuratorFramework client, String path) {
        this(client, path, new StandardLockInternalsDriver());
    }
    
    public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver) {
        this(client, path, LOCK_NAME, 1, driver);
    }
    
    //初始化InterProcessMutex
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        //1.设置锁的节点路径
        basePath = PathUtils.validatePath(path);
        //2.初始化一个LockInternals对象实例
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
}

public class LockInternals {
    private final LockInternalsDriver driver;
    private final String lockName;
    private volatile int maxLeases;
    private final WatcherRemoveCuratorFramework client;
    private final String basePath;
    private final String path;
    ...
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        this.driver = driver;
        this.lockName = lockName;
        this.maxLeases = maxLeases;
        this.client = client.newWatcherRemoveCuratorFramework();
        this.basePath = PathUtils.validatePath(path);
        this.path = ZKPaths.makePath(path, lockName);
    }
    ...
}

(3)InterProcessMutex.acquire()尝试获取锁

LockData是InterProcessMutex的一个静态内部类。一个线程对应一个LockData实例对象,用来描述线程持有的锁的具体情况。多个线程对应的LockData存放在一个叫threadData的ConcurrentMap中。LockData中有一个原子变量lockCount,用于锁的重入次数计数。

在执行InterProcessMutex的acquire()方法尝试获取锁时:首先会尝试取出当前线程对应的LockData数据,判断是否存在。如果存在,则说明锁正在被当前线程重入,重入次数自增后直接返回。如果不存在,则调用LockInternals的attemptLock()方法尝试获取锁。默认情况下,attemptLock()方法传入的等待获取锁的时间time = -1。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final String basePath;
    private static final String LOCK_NAME = "lock-";
    //一个线程对应一个LockData数据对象
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    ...
    //初始化InterProcessMutex
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        //设置锁的路径
        basePath = PathUtils.validatePath(path);
        //初始化LockInternals
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
    
    @Override
    public void acquire() throws Exception {
        //获取分布式锁,会一直阻塞等待直到获取成功
        //相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程对应的LockData数据
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            //可重入计算
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if (lockPath != null) {
            //获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
            LockData newLockData = new LockData(currentThread, lockPath);
            //然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
    
    //LockData是InterProcessMutex的一个静态内部类
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
    
    protected byte[] getLockNodeBytes() {
        return null;
    }
    ...
}

(4)LockInternals.attemptLock()尝试获取锁

先创建临时节点,再判断是否满足获取锁的条件。

步骤一:首先调用LockInternalsDriver的createsTheLock()方法创建一个临时顺序节点。其中creatingParentContainersIfNeeded()表示级联创建,forPath(path)表示创建的节点路径名称,withMode(CreateMode.EPHEMERAL_SEQUENTIAL)表示临时顺序节点。

步骤二:然后调用LockInternals的internalLockLoop()方法检查是否获取到了锁。在LockInternals的internalLockLoop()方法的while循环中,会先获取排好序的客户端线程尝试获取锁时创建的临时顺序节点名称列表。然后获取当前客户端线程尝试获取锁时创建的临时顺序节点的名称,再根据名称获取在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径,也就是获取一个封装好这些信息的PredicateResults对象。

具体会根据节点名称获取当前线程创建的临时顺序节点在节点列表的位置,然后会比较当前线程创建的节点的位置和maxLeases的大小。其中maxLeases代表了同时允许多少个客户端可以获取到锁,默认是1。如果当前线程创建的节点的位置小,则表示可以获取锁。如果当前线程创建的节点的位置大,则表示获取锁失败。

获取锁成功,则会中断LockInternals的internalLockLoop()方法的while循环,然后向外返回当前客户端线程创建的临时顺序节点路径。接着在InterProcessMutex的internalLock()方法中,会将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象,然后把该LockData对象存放到InterProcessMutex.threadData这个Map中。

获取锁失败,则通过PredicateResults对象先获取前一个节点路径名称。然后通过getData()方法获取前一个节点路径在zk的信息,并添加Watcher监听。该Watcher监听主要是用来唤醒在LockInternals中被wait()阻塞的线程。添加完Watcher监听后,便会调用wait()方法将当前线程挂起。

所以前一个节点发生变化时,便会通知添加的Watcher监听。然后便会唤醒阻塞的线程,继续执行internalLockLoop()方法的while循环。while循环又会继续获取排序的节点列表 + 判断当前线程是否已获取锁。

public class LockInternals {
    private final LockInternalsDriver driver;
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        this.driver = driver;
        this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
        ...
    }
    ...
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        //获取当前时间
        final long startMillis = System.currentTimeMillis();
        //默认情况下millisToWait=null
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //默认情况下localLockNodeBytes也是null
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
     
        String ourPath = null;
        boolean hasTheLock = false;//是否已经获取到锁
        boolean isDone = false;//是否正在获取锁
        while (!isDone) {
            isDone = true;
            //1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            //2.检查是否获取到了锁
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }
    
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒LockInternals中被wait()阻塞的线程
            client.postSafeNotify(LockInternals.this);
        }
    };
    
    //检查是否获取到了锁
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        ...
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
            List<String> children = getSortedChildren();
            //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            //5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {//获取锁成功
                //返回true
                haveTheLock = true;
            } else {//获取锁失败
                //获取前一个节点路径名称
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this) {
                    //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                    //通过getData()获取前一个节点路径在zk的信息,并添加watch监听
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    //默认情况下,millisToWait = null
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true;//timed out - delete our node
                            break;
                        }
                        wait(millisToWait);//阻塞
                    } else {
                        wait();//阻塞
                    }
                }
            }
        }
        ...
        return haveTheLock;
    }
    
    List<String> getSortedChildren() throws Exception {
        //获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        return getSortedChildren(client, basePath, lockName, driver);
    }
    
    public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
        //获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        List<String> children = client.getChildren().forPath(basePath);
        //对节点名称进行排序
        List<String> sortedList = Lists.newArrayList(children);
        Collections.sort(
            sortedList,
            new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            }
        );
        return sortedList;
    }
    ...
}

public class StandardLockInternalsDriver implements LockInternalsDriver {
    ...
    //级联创建一个临时顺序节点
    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        //默认情况下传入的lockNodeBytes=null
        if (lockNodeBytes != null) {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
            //创建临时顺序节点
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    
    //获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        //根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases代表的是同时允许多少个客户端可以获取到锁
        //getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
        boolean getsTheLock = ourIndex < maxLeases;
        //获取当前节点需要watch的前一个节点路径
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(5)不同客户端线程获取锁时的互斥实现

maxLeases代表了同时允许多少个客户端可以获取到锁,默认值是1。能否获取锁的判断就是:线程创建的节点的位置outIndex < maxLeases。当线程1创建的节点在节点列表中排第一时,满足outIndex = 0 < maxLeases = 1,可以获取锁。当线程2创建的节点再节点列表中排第二时,不满足outIndex = 1 < maxLeases = 1,所以不能获取锁。从而实现线程1和线程2获取锁时的互斥。

(6)同一客户端线程可重入加锁的实现

客户端线程重复获取锁时,会重复调用InterProcessMutex的internalLock()方法。在InterProcessMutex的internalLock()方法中:线程第一次获取锁成功会创建一个LockData对象,并存放在一个Map中。线程第二次获取锁时,便会从这个Map中取出这个LockData对象,并对LockData对象中的重入计数器lockCount进行递增,接着就返回true。以此实现可重入加锁。

(7)客户端线程释放锁的实现

客户端线程释放锁时会调用InterProcessMutex的release()方法。

首先对LockData里的重入计数器进行递减。当重入计数器大于0时,直接返回。当重入计数器为0时才执行下一步删除节点的操作。

然后删除客户端线程创建的临时顺序节点,client.delete().guaranteed().forPath(ourPath)。

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    ...
    @Override
    public void release() throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程对应的LockData对象
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + basePath);
        }
        //1.首先对LockData里的重入计数器lockCount进行递减
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            //当重入计数器大于0时,直接返回
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + basePath);
        }
        try {
            //2.当重入计数器为0时执行删除节点的操作
            internals.releaseLock(lockData.lockPath);
        } finally {
            threadData.remove(currentThread);
        }
    }
    ...
}

public class LockInternals {
    ...
    final void releaseLock(String lockPath) throws Exception {
        client.removeWatchers();
        revocable.set(null);
        deleteOurPath(lockPath);
    }
    
    private void deleteOurPath(String ourPath) throws Exception {
        //删除节点
        client.delete().guaranteed().forPath(ourPath);
    }
    ...
}

(8)客户端线程释放锁后其他线程获取锁的实现

由于在节点列表里排第二的节点对应的线程会监听排第一的节点,而当持有锁的客户端线程释放锁后,排第一的节点会被删除掉。所以在节点列表里排第二的节点对应的客户端,便会收到zk的通知。于是会回调执行该线程添加的Watcher的process()方法,也就是唤醒该线程,让其继续执行while循环获取锁。

public class LockInternals {
    ...
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒LockInternals中被wait()阻塞的线程
            client.postSafeNotify(LockInternals.this);
        }
    };
    
    //检查是否获取到了锁
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        ...
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
            List<String> children = getSortedChildren();
            //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            //5.获取当前线程创建的节点在节点列表中的位置+是否可以获取锁+前一个节点的路径名称
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {//获取锁成功
                //返回true
                haveTheLock = true;
            } else {//获取锁失败
                //获取前一个节点路径名称
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this) {
                    //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                    //通过getData()获取前一个节点路径在zk的信息,并添加watch监听
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    //默认情况下,millisToWait = null
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true;//timed out - delete our node
                            break;
                        }
                        wait(millisToWait);//阻塞
                    } else {
                        wait();//阻塞
                    }
                }
            }
        }
        ...
        return haveTheLock;
    }
    ...
}

(9)InterProcessMutex就是一个公平锁

因为所有客户端线程都会创建一个顺序节点,然后按申请锁的顺序进行排序。最后会依次按自己所在的排序来尝试获取锁,实现了所有客户端排队获取锁。

图片

2.Curator的非可重入锁的源码

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

(1)Curator的非可重入锁InterProcessSemaphoreMutex的使用

非可重入锁:同一个时间只能有一个客户端线程获取到锁,其他线程都要排队,而且同一个客户端线程是不可重入加锁的。

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        //非可重入锁
        InterProcessSemaphoreMutex lock = new InterProcessSemaphoreMutex(client, "/locks");
        lock.acquire();
        Thread.sleep(3000);
        lock.release();
    }
}

(2)Curator的非可重入锁InterProcessSemaphoreMutex的源码

Curator的非可重入锁是基于Semaphore来实现的,也就是将Semaphore允许获取Lease的客户端线程数设置为1,从而实现同一时间只能有一个客户端线程获取到Lease。

public class InterProcessSemaphoreMutex implements InterProcessLock {
    private final InterProcessSemaphoreV2 semaphore;
    private final WatcherRemoveCuratorFramework watcherRemoveClient;
    private volatile Lease lease;

    public InterProcessSemaphoreMutex(CuratorFramework client, String path) {
        watcherRemoveClient = client.newWatcherRemoveCuratorFramework();
        this.semaphore = new InterProcessSemaphoreV2(watcherRemoveClient, path, 1);
    }

    @Override
    public void acquire() throws Exception {
        //获取非可重入锁就是获取Semaphore的Lease
        lease = semaphore.acquire();
    }

    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        Lease acquiredLease = semaphore.acquire(time, unit);
        if (acquiredLease == null) {
            return false;
        }
        lease = acquiredLease;
        return true;
    }

    @Override
    public void release() throws Exception {
        //释放非可重入锁就是释放Semaphore的Lease
        Lease lease = this.lease;
        Preconditions.checkState(lease != null, "Not acquired");
        this.lease = null;
        lease.close();
        watcherRemoveClient.removeWatchers();
    }
}

3.Curator的可重入读写锁的源码

(1)Curator的可重入读写锁InterProcessReadWriteLock的使用

(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化

(3)InterProcessMutex获取锁的源码

(4)先获取读锁 + 后获取读锁的情形分析

(5)先获取读锁 + 后获取写锁的情形分析

(6)先获取写锁 + 后获取读锁的情形分析

(7)先获取写锁 + 再获取写锁的情形分析

(1)Curator的可重入读写锁InterProcessReadWriteLock的使用

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        //读写锁
        InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, "/locks");
        lock.readLock().acquire();
        lock.readLock().release();
        lock.writeLock().acquire();
        lock.writeLock().release();
    }
}

(2)Curator的可重入读写锁InterProcessReadWriteLock的初始化

读锁和写锁都是基于可重入锁InterProcessMutex的子类来实现的。读锁和写锁的获取锁和释放锁逻辑,就是使用InterProcessMutex的逻辑。

public class InterProcessReadWriteLock {
    private final InterProcessMutex readMutex;//读锁
    private final InterProcessMutex writeMutex;//写锁
    //must be the same length. LockInternals depends on it
    private static final String READ_LOCK_NAME  = "__READ__";
    private static final String WRITE_LOCK_NAME = "__WRIT__";
    ...
    //InterProcessReadWriteLock的初始化
    public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {
        lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
        //写锁的初始化
        writeMutex = new InternalInterProcessMutex(
            client,
            basePath,
            WRITE_LOCK_NAME,//写锁的lockName='__WRIT__'
            lockData,
            1,//写锁的maxLeases
            new SortingLockInternalsDriver() {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                    return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
                }
            }
        );
        //读锁的初始化
        readMutex = new InternalInterProcessMutex(
            client,
            basePath,
            READ_LOCK_NAME,//读锁的lockName='__READ__'
            lockData,
            Integer.MAX_VALUE,//读锁的maxLeases
            new SortingLockInternalsDriver() {
                @Override
                public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
                    return readLockPredicate(children, sequenceNodeName);
                }
            }
        );
    }
    
    private static class InternalInterProcessMutex extends InterProcessMutex {
        private final String lockName;
        private final byte[] lockData;
        InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver) {
            super(client, path, lockName, maxLeases, driver);
            this.lockName = lockName;
            this.lockData = lockData;
        }
         ...
    }
    
    public InterProcessMutex readLock() {
        return readMutex;
    }
    
    public InterProcessMutex writeLock() {
        return writeMutex;
    }
    ...
}

(3)InterProcessMutex获取锁的源码

public class InterProcessMutex implements InterProcessLock, Revocable<InterProcessMutex> {
    private final LockInternals internals;
    private final String basePath;
    private static final String LOCK_NAME = "lock-";
    //一个线程对应一个LockData数据对象
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();
    ...
    //初始化InterProcessMutex
    InterProcessMutex(CuratorFramework client, String path, String lockName, int maxLeases, LockInternalsDriver driver) {
        //设置锁的路径
        basePath = PathUtils.validatePath(path);
        //初始化LockInternals
        internals = new LockInternals(client, driver, path, lockName, maxLeases);
    }
    
    @Override
    public void acquire() throws Exception {
        //获取分布式锁,会一直阻塞等待直到获取成功
        //相同的线程可以重入锁,每一次调用acquire()方法都要匹配一个release()方法的调用
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
    
    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        //获取当前线程
        Thread currentThread = Thread.currentThread();
        //获取当前线程对应的LockData数据
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            //可重入计算
            lockData.lockCount.incrementAndGet();
            return true;
        }
        //调用LockInternals.attemptLock()方法尝试获取锁,默认情况下,传入的time=-1,表示等待获取锁的时间
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if (lockPath != null) {
            //获取锁成功,将当前线程 + 其创建的临时顺序节点路径,封装成一个LockData对象
            LockData newLockData = new LockData(currentThread, lockPath);
            //然后把该LockData对象存放到InterProcessMutex.threadData这个Map中
            threadData.put(currentThread, newLockData);
            return true;
        }
        return false;
    }
    
    //LockData是InterProcessMutex的一个静态内部类
    private static class LockData {
        final Thread owningThread;
        final String lockPath;
        final AtomicInteger lockCount = new AtomicInteger(1);//用于锁的重入次数计数
        private LockData(Thread owningThread, String lockPath) {
            this.owningThread = owningThread;
            this.lockPath = lockPath;
        }
    }
    
    protected byte[] getLockNodeBytes() {
        return null;
    }
    ...
}

public class LockInternals {
    private final LockInternalsDriver driver;
    LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases) {
        this.driver = driver;
        this.path = ZKPaths.makePath(path, lockName);//生成要创建的临时节点路径名称
        ...
    }
    ...
    String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        //获取当前时间
        final long startMillis = System.currentTimeMillis();
        //默认情况下millisToWait=null
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        //默认情况下localLockNodeBytes也是null
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;
     
        String ourPath = null;
        boolean hasTheLock = false;//是否已经获取到锁
        boolean isDone = false;//是否正在获取锁
        while (!isDone) {
            isDone = true;
            //1.这里是关键性的加锁代码,会去级联创建一个临时顺序节点
            ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
            //2.检查是否获取到了锁
            hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
        }
        if (hasTheLock) {
            return ourPath;
        }
        return null;
    }
    
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒LockInternals中被wait()阻塞的线程
            client.postSafeNotify(LockInternals.this);
        }
    };
    
    //检查是否获取到了锁
    private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        ...
        while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            //3.获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
            List<String> children = getSortedChildren();
            //4.获取当前客户端线程尝试获取分布式锁时创建的临时顺序节点的名称
            String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
            //5.获取当前线程创建的节点在节点列表中的位置 + 是否可以获取锁 + 前一个节点的路径名称
            PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
            if (predicateResults.getsTheLock()) {//获取锁成功
                //返回true
                haveTheLock = true;
            } else {//获取锁失败
                //获取前一个节点路径名称
                String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
                synchronized(this) {
                    //use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                    //通过getData()获取前一个节点路径在zk的信息,并添加watch监听
                    client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                    //默认情况下,millisToWait = null
                    if (millisToWait != null) {
                        millisToWait -= (System.currentTimeMillis() - startMillis);
                        startMillis = System.currentTimeMillis();
                        if (millisToWait <= 0) {
                            doDelete = true;//timed out - delete our node
                            break;
                        }
                        wait(millisToWait);//阻塞
                    } else {
                        wait();//阻塞
                    }
                }
            }
        }
        ...
        return haveTheLock;
    }
    
    List<String> getSortedChildren() throws Exception {
        //获取排好序的各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        return getSortedChildren(client, basePath, lockName, driver);
    }
    
    public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception {
        //获取各个客户端线程尝试获取分布式锁时创建的临时顺序节点名称列表
        List<String> children = client.getChildren().forPath(basePath);
        //对节点名称进行排序
        List<String> sortedList = Lists.newArrayList(children);
        Collections.sort(
            sortedList,
            new Comparator<String>() {
                @Override
                public int compare(String lhs, String rhs) {
                    return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
                }
            }
        );
        return sortedList;
    }
    ...
}

public class StandardLockInternalsDriver implements LockInternalsDriver {
    ...
    //级联创建一个临时顺序节点
    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        //默认情况下传入的lockNodeBytes=null
        if (lockNodeBytes != null) {
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path, lockNodeBytes);
        } else {
            //创建临时顺序节点
            ourPath = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(path);
        }
        return ourPath;
    }
    
    //获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        //根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases代表的是同时允许多少个客户端可以获取到锁
        //getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
        boolean getsTheLock = ourIndex < maxLeases;
        //获取当前节点需要watch的前一个节点路径
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(4)先获取读锁 + 后获取读锁的情形分析

当线程创建完临时顺序节点,并获取到排好序的节点列表children后,执行LockInternalsDriver的getsTheLock()方法获取能否成功加锁的信息时,会执行到InterProcessReadWriteLock的readLockPredicate()方法。

由于此时firstWriteIndex = Integer.MAX_VALUE,所以无论多少线程尝试获取读锁,都能满足ourIndex < firstWriteIndex,也就是getsTheLock的值会为true,即表示可以获取读锁。

所以读读不互斥。

public class InterProcessReadWriteLock {
    ...
    //sequenceNodeName是当前线程创建的临时顺序节点的路径名称
    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
        if (writeMutex.isOwnedByCurrentThread()) {
            return new PredicateResults(null, true);
        }
        int index = 0;
        int firstWriteIndex = Integer.MAX_VALUE;
        int ourIndex = -1;
        for (String node : children) {
            if (node.contains(WRITE_LOCK_NAME)) {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            } else if (node.startsWith(sequenceNodeName)) {
                //找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
                ourIndex = index;
                break;
            }
            ++index;
        }
        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = (ourIndex < firstWriteIndex);
        String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(5)先获取读锁 + 后获取写锁的情形分析

一.假设客户端线程1首先成功获取了读锁

那么在/locks目录下,此时已经有了如下这个读锁的临时顺序节点。

/locks/43f3-4c2f-ba98-07a641d351f2-__READ__0000000004

二.然后另一个客户端线程2过来尝试获取写锁

于是该线程2会也会先在/locks目录下创建出如下写锁的临时顺序节点:

/locks/9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005

接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:

[43f3-4c2f-ba98-07a641d351f2-__READ__0000000004,
9361-4fb7-8420-a8d4911d2c99-__WRIT__0000000005]

然后会执行StandardLockInternalsDriver的getsTheLock()方法。由于初始化写锁时,设置了其maxLeases是1,而在StandardLockInternalsDriver的getsTheLock()方法中,判断线程能成功获取写锁的依据是:ourIndex < maxLeases。即如果要成功获取写锁,那么线程创建的节点在子节点列表里必须排第一。

而此时,由于之前已有线程获取过一个读锁,而后来又有其他线程往里面创建一个写锁的临时顺序节点。所以写锁的临时顺序节点在子节点列表children里排第二,ourIndex是1。所以index = 1 < maxLeases = 1,条件不成立。

因此,此时客户端线程2获取写锁失败。于是该线程便会给前一个节点添加一个监听器,并调用wait()方法把自己挂起。如果前面一个节点被删除释放了锁,那么该线程就会被唤醒,从而再次尝试判断自己创建的节点是否在当前子节点列表中排第一。如果是,那么就表示获取写锁成功。

public class StandardLockInternalsDriver implements LockInternalsDriver {
    ...
    //获取当前线程创建的节点在节点列表中的位置以及是否可以获取锁
    @Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        //根据节点名称获取当前线程创建的临时顺序节点在节点列表中的位置
        int ourIndex = children.indexOf(sequenceNodeName);
        validateOurIndex(sequenceNodeName, ourIndex);
        //maxLeases代表的是同时允许多少个客户端可以获取到锁
        //getsTheLock为true表示可以获取锁,getsTheLock为false表示获取锁失败
        boolean getsTheLock = ourIndex < maxLeases;
        //获取当前节点需要watch的前一个节点路径
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

(6)先获取写锁 + 后获取读锁的情形分析

一.假设客户端线程1先获取了写锁

那么在/locks目录下,此时已经有了如下这个写锁的临时顺序节点。

/locks/4383-466e-9b86-fda522ea061a-__WRIT__0000000006

二.然后另一个客户端线程2过来尝试获取读锁

于是该线程2会也会先在/locks目录下创建出如下读锁的临时顺序节点:

/locks/5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007

接着该线程会获取/locks目录的当前子节点列表并进行排序,结果如下:

[4383-466e-9b86-fda522ea061a-__WRIT__0000000006,
5ba2-488f-93a4-f85fafd5cc32-__READ__0000000007]

然后会执行LockInternalsDriver的getsTheLock()方法获取能否加锁的信息,也就是会执行InterProcessReadWriteLock的readLockPredicate()方法。

public class InterProcessReadWriteLock {
    ...
    //sequenceNodeName是当前线程创建的临时顺序节点的路径名称
    private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception {
        //如果是同一个客户端线程,先加写锁,再加读锁,是可以成功的,不会互斥
        if (writeMutex.isOwnedByCurrentThread()) {
            return new PredicateResults(null, true);
        }
        int index = 0;
        int firstWriteIndex = Integer.MAX_VALUE;
        int ourIndex = -1;
        for (String node : children) {
            if (node.contains(WRITE_LOCK_NAME)) {
                firstWriteIndex = Math.min(index, firstWriteIndex);
            } else if (node.startsWith(sequenceNodeName)) {
                //找出当前线程创建的临时顺序节点在节点列表中的位置,用ourIndex表示
                ourIndex = index;
                break;
            }
            ++index;
        }
        StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
        boolean getsTheLock = (ourIndex < firstWriteIndex);
        String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
        return new PredicateResults(pathToWatch, getsTheLock);
    }
    ...
}

在InterProcessReadWriteLock的readLockPredicate()方法中,如果是同一个客户端线程,先获取写锁,再获取读锁,是不会互斥的。如果是不同的客户端线程,线程1先获取写锁,线程2再获取读锁,则互斥。因为线程2执行readLockPredicate()方法在遍历子节点列表(children)时,如果在子节点列表(children)中发现了一个写锁,会设置firstWriteIndex=0。而此时线程2创建的临时顺序节点的ourIndex=1,所以不满足ourIndex(1) < firstWriteIndex(0),于是线程2获取读锁失败。

总结,获取读锁时,在当前线程创建的节点前面:如果还有写锁对应的节点,那么firstWriteIndex就会被重置为具体位置。如果没有写锁对应的节点,那么firstWriteIndex就是MAX_VALUE。而只要firstWriteIndex为MAX_VALUE,那么就可以不断允许获取读锁。

(7)先获取写锁 + 再获取写锁的情形分析

如果客户端线程1先获取了写锁,然后后面客户端线程2来获取这个写锁。此时线程2会发现自己创建的节点排在节点列表中的第二,不是第一。于是获取写锁失败,进行阻塞挂起。等线程1释放了写锁后,才会唤醒线程2继续尝试获取写锁。

4.Curator的MultiLock源码

(1)Curator的MultiLock的使用

(2)Curator的MultiLock的源码

(1)Curator的MultiLock的使用

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        //MultiLock
        InterProcessLock lock1 = new InterProcessMutex(client, "/locks/lock_01");
        InterProcessLock lock2 = new InterProcessMutex(client, "/locks/lock_02");
        InterProcessLock lock3 = new InterProcessMutex(client, "/locks/lock_03");
        List<InterProcessLock> locks = new ArrayList<InterProcessLock>();
        locks.add(lock1);
        locks.add(lock2);
        locks.add(lock3);
        InterProcessMultiLock multiLock = new InterProcessMultiLock(locks);
    }
}

(2)Curator的MultiLock的源码

MultiLock原理:依次遍历获取每个锁,阻塞直到获取每个锁为止,然后返回true。如果过程中有报错,依次释放已经获取到的锁,然后返回false。

public class InterProcessMultiLock implements InterProcessLock {
    private final List<InterProcessLock> locks;
    public InterProcessMultiLock(List<InterProcessLock> locks) {
        this.locks = ImmutableList.copyOf(locks);
    }
    
    //获取锁
    @Override
    public void acquire() throws Exception {
        acquire(-1, null);
    }
    
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        Exception exception = null;
        List<InterProcessLock> acquired = Lists.newArrayList();
        boolean success = true;
        //依次遍历获取每个锁,阻塞直到获取每个锁为止
        for (InterProcessLock lock : locks) {
            try {
                if (unit == null) {
                    lock.acquire();
                    acquired.add(lock);
                } else  {
                    if (lock.acquire(time, unit)) {
                        acquired.add(lock);
                    } else {
                        success = false;
                        break;
                    }
                }
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                success = false;
                exception = e;
            }
        }
        if (!success) {
            for (InterProcessLock lock : reverse(acquired)) {
                try {
                    lock.release();
                } catch (Exception e) {
                    ThreadUtils.checkInterrupted(e);
                    // ignore
                }
            }
        }
        if (exception != null) {
            throw exception;
        }
        return success;
    }
    
    @Override
    public synchronized void release() throws Exception {
        Exception baseException = null;
        for (InterProcessLock lock : reverse(locks)) {
            try {
                lock.release();
            } catch (Exception e) {
                ThreadUtils.checkInterrupted(e);
                if (baseException == null) {
                    baseException = e;
                } else {
                    baseException = new Exception(baseException);
                }
            }
        }
        if (baseException != null) {
            throw baseException;
        }
    }
    ...
}

5.Curator的Semaphore源码

(1)基于InterProcessSemaphoreV2使用Semaphore

(2)InterProcessSemaphoreV2的初始化

(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease

(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease

Semaphore信号量,就是指定同时可以有多个线程获取到锁。

(1)基于InterProcessSemaphoreV2使用Semaphore

public class Demo {
    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        final CuratorFramework client = CuratorFrameworkFactory.newClient(
            "127.0.0.1:2181",//zk的地址
            5000,//客户端和zk的心跳超时时间,超过该时间没心跳,Session就会被断开
            3000,//连接zk时的超时时间
            retryPolicy
        );
        client.start();
        System.out.println("已经启动Curator客户端,完成zk的连接");

        //获取Semaphore
        InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphore", 3);
        Lease lease = semaphore.acquire();//获取Semaphore的一个锁
        Thread.sleep(3000);
        semaphore.returnLease(lease);//向Semaphore返还一个锁
    }
}

(2)InterProcessSemaphoreV2的初始化

public class InterProcessSemaphoreV2 {
    private final WatcherRemoveCuratorFramework client;
    private final InterProcessMutex lock;
    private final String leasesPath;
    private volatile int maxLeases;
    ...
    //maxLeases表示该实例可以允许获取的lease数量
    public InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases) {
        this(client, path, maxLeases, null);
    }
    
    //初始化InterProcessSemaphoreV2时,传入的参数path = "/semaphore",参数maxLeases = 3
    private InterProcessSemaphoreV2(CuratorFramework client, String path, int maxLeases, SharedCountReader count) {
        this.client = client.newWatcherRemoveCuratorFramework();
        path = PathUtils.validatePath(path);
        //锁的path是ZKPaths.makePath(path, LOCK_PARENT) => '/semaphore/locks'
        //初始化一个InterProcessMutex分布式锁
        this.lock = new InterProcessMutex(client, ZKPaths.makePath(path, LOCK_PARENT));
        this.maxLeases = (count != null) ? count.getCount() : maxLeases;
        //lease的path是:'/semaphore/leases'
        this.leasesPath = ZKPaths.makePath(path, LEASE_PARENT);
        ...
    }
    ...
}

(3)InterProcessSemaphoreV2.acquire()方法获取Semaphore的Lease

客户端线程尝试获取Semaphore的一个Lease。

步骤一:首先会获取初始化时创建的锁InterProcessMutex

锁的路径是:/semaphore/locks。当多个客户端线程同时执行acquire()获取Lease时只会有一个线程成功,而其他线程会基于锁路径下的临时顺序节点来排队获取锁。

步骤二:获取锁成功后才会尝试获取Semaphore的Lease

Lease的路径是:/semaphore/leases。此时会先到'/semaphore/leases'目录下创建一个临时顺序节点,然后会调用InterProcessSemaphoreV2的makeLease()方法创建一个Lease。这个Lease对象就是客户端线程成功获取Semaphore的一个Lease。

创建完Lease对象后,接着会进入一个for循环,会先获取/semaphore/leases目录下的所有临时顺序节点,并添加监听。然后判断/semaphore/leases目录下节点的数量是否大于maxLeases。如果临时顺序节点的数量小于maxLeases,那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环。如果临时顺序节点的数量大于maxLeases,那么当前客户端线程就要调用wait()进行阻塞等待。

public class InterProcessSemaphoreV2 {
    private final InterProcessMutex lock;
    private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            //唤醒在InterProcessSemaphoreV2对象中执行wait()而被阻塞的线程
            client.postSafeNotify(InterProcessSemaphoreV2.this);
        }
    };
    ...
    public Lease acquire() throws Exception {
        Collection<Lease> leases = acquire(1, 0, null);
        return leases.iterator().next();
    }
    
    public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception {
        long startMs = System.currentTimeMillis();
        boolean hasWait = (unit != null);
        long waitMs = hasWait ? TimeUnit.MILLISECONDS.convert(time, unit) : 0;
        Preconditions.checkArgument(qty > 0, "qty cannot be 0");


        ImmutableList.Builder<Lease> builder = ImmutableList.builder();
        boolean success = false;
        try {
            while (qty-- > 0) {
                int retryCount = 0;
                long startMillis = System.currentTimeMillis();
                boolean isDone = false;
                while (!isDone) {
                    switch (internalAcquire1Lease(builder, startMs, hasWait, waitMs)) {
                        case CONTINUE: {
                            isDone = true;
                            break;
                        }
                        case RETURN_NULL: {
                            return null;
                        }
                        case RETRY_DUE_TO_MISSING_NODE: {
                            if (!client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                                throw new KeeperException.NoNodeException("Sequential path not found - possible session loss");
                            }
                            //try again
                            break;
                        }
                    }
                }
            }
            success = true;
        } finally {
            if (!success) {
                returnAll(builder.build());
            }
        }
        return builder.build();
    }
    
    private InternalAcquireResult internalAcquire1Lease(ImmutableList.Builder<Lease> builder, long startMs, boolean hasWait, long waitMs) throws Exception {
        if (client.getState() != CuratorFrameworkState.STARTED) {
            return InternalAcquireResult.RETURN_NULL;
        }
        if (hasWait) {
            long thisWaitMs = getThisWaitMs(startMs, waitMs);
            if (!lock.acquire(thisWaitMs, TimeUnit.MILLISECONDS)) {
                return InternalAcquireResult.RETURN_NULL;
            }
        } else {
            //1.首先获取一个分布式锁
            lock.acquire();
        }
        Lease lease = null;
        boolean success = false;
        try {
            //2.尝试获取Semaphore的Lease:创建一个临时顺序节点
            PathAndBytesable<String> createBuilder = client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL);
            String path = (nodeData != null) ? createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME), nodeData) : createBuilder.forPath(ZKPaths.makePath(leasesPath, LEASE_BASE_NAME));
            String nodeName = ZKPaths.getNodeFromPath(path);
            lease = makeLease(path);
            ...
            try {
                synchronized(this) {
                    for(;;) {
                        List<String> children;
                        //3.获取./lease目录下的所有临时顺序节点,并添加watcher监听
                        children = client.getChildren().usingWatcher(watcher).forPath(leasesPath);
                        ...
                        //4.判断临时顺序节点的数量是否大于maxLeases
                        //maxLeases表示最多允许多少个客户端线程获取Semaphore的Lease
                        if (children.size() <= maxLeases) {
                            //如果临时顺序节点的数量小于maxLeases
                            //那么说明当前客户端线程成功获取Semaphore的Lease,于是退出循环
                            break;
                        }
                        //如果临时顺序节点的数量大于maxLeases
                        //那么当前客户端线程就要调用wait()进行阻塞等待
                        if (hasWait) {
                            long thisWaitMs = getThisWaitMs(startMs, waitMs);
                            if (thisWaitMs <= 0) {
                                return InternalAcquireResult.RETURN_NULL;
                            }
                            ...
                            wait(thisWaitMs);
                        } else {
                            ...
                            wait();
                        }
                    }
                    success = true;
                }
            } finally {
                if (!success) {
                    returnLease(lease);
                }
                client.removeWatchers();
            }
        } finally {
            //释放掉之前获取的锁
            lock.release();
        }
        builder.add(Preconditions.checkNotNull(lease));
        return InternalAcquireResult.CONTINUE;
    }
    ...
}

(4)InterProcessSemaphoreV2.returnLease()方法释放Semaphore的Lease

执行InterProcessSemaphoreV2的returnLease()方法时,最终会执行makeLease()生成的Lease对象的close()方法,而close()方法会删除在/semaphore/leases目录下创建的临时顺序节点。

当/semaphore/leases目录下的节点发生变化时,那些对该目录进行Watcher监听的客户端就会收到通知,于是就会执行Watcher里的process()方法,唤醒执行wait()时被阻塞的线程,从而让这些没有成功获取Semaphore的Lease的线程继续尝试获取Lease。

public class InterProcessSemaphoreV2 {
    ...
    public void returnLease(Lease lease) {
        //执行Lease的close()方法
        CloseableUtils.closeQuietly(lease);
    }
    
    private Lease makeLease(final String path) {
        return new Lease() {
            @Override
            public void close() throws IOException {
                try {
                    client.delete().guaranteed().forPath(path);
                } catch (KeeperException.NoNodeException e) {
                    log.warn("Lease already released", e);
                } catch (Exception e) {
                    ThreadUtils.checkInterrupted(e);
                    throw new IOException(e);
                }
            }
            
            @Override
            public byte[] getData() throws Exception {
                return client.getData().forPath(path);
            }
            
            @Override
            public String getNodeName() {
                return ZKPaths.getNodeFromPath(path);
            }
        };
    }
    ...
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/983505.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

电脑睡眠智能管控:定时、依状态灵活调整,多模式随心选

软件介绍 今天要给大家介绍一款十分实用的软件——DontSleep&#xff0c;从名字就能看出&#xff0c;它的核心功能之一是阻止电脑进入睡眠状态&#xff0c;不过它的能耐可远不止于此&#xff0c;还具备强大的定时以及根据电脑实时状况灵活调整的功能。 这款软件采用绿色单文件…

装饰器模式--RequestWrapper、请求流request无法被重复读取

目录 前言一、场景二、原因分析三、解决四、更多 前言 曾经遇见这么一段代码&#xff0c;能看出来是把request又重新包装了一下&#xff0c;核心信息都不会改变 后面了解到这叫 装饰器模式&#xff08;Decorator Pattern&#xff09; &#xff1a;也称为包装模式(Wrapper Pat…

C++20 格式化库:强大的字符串格式化工具

文章目录 格式化语法常见用法1. 填充和对齐2. 数值格式化3. 进制格式化4. 自定义类型 示例代码注意事项 C20 的格式化库是一个强大的工具&#xff0c;用于处理字符串的格式化操作。它提供了类似于 Python 中 str.format() 的功能&#xff0c;但语法和用法更符合 C 的风格。以下…

Linux基础--文件权限+软件包管理+管道符

目录 基础权限 更改文件权限 使用命令:chmod 更改文件属主和数组 使用命令: chown 权限掩码 使用命令:umask 高级权限 软件包管理 使用命令: rpm 使用命令: yum 管道符&#xff0c;重定向 基础权限 文件基础权限表 符号含义数字r读权限4w写权限2x执行权限1 更改文件…

css画出带圆角平行四边形效果

使用css画出平行四边形效果如下图 HTML代码 <div class"badge"><span>营业中</span> </div> 关键代码&#xff1a; transform: skewX(-15deg); /* 让元素倾斜&#xff0c;形成平行四边形的视觉效果 */ 如果倾斜的元素里面需要放文字&…

postman接口请求中的 Raw是什么

前言 在现代的网络开发中&#xff0c;API 的使用已经成为数据交换的核心方式之一。然而&#xff0c;在与 API 打交道时&#xff0c;关于如何发送请求体&#xff08;body&#xff09;内容类型的问题常常困扰着开发者们&#xff0c;尤其是“raw”和“json”这两个术语之间的区别…

Gartner:数据安全平台DSP提升数据流转及使用安全

2025 年 1 月 7 日&#xff0c;Gartner 发布“China Context&#xff1a;Market Guide for Data Security Platforms”&#xff08;《数据安全平台市场指南——中国篇》&#xff0c;以下简称指南&#xff09;&#xff0c;报告主要聚焦中国数据安全平台&#xff08;Data Securit…

记录一次wifi版有人物联串口服务器调试经过

1、首先买了一个华为的wifi路由器&#xff0c;连接上以后&#xff0c;设置好网络名字和wifi密码 2、用网线连接串口服务器&#xff0c;通过192.168.1.1登录&#xff0c;进行配置 找到无线客户端配置&#xff0c;先在基本配置中打开5G配置&#xff0c;然后再去5.8G配置中设置 …

百货店的诞生与现代商业革命:结合开源AI智能客服、AI智能名片与S2B2C商城小程序的新视角

摘要&#xff1a;本文深入探讨了百货店作为现代商业革命的标志性事件&#xff0c;其出现对销售方式、经营方式、组织管理三个方面的根本性变革。同时&#xff0c;本文也展望了在数字化时代背景下&#xff0c;开源AI智能客服、AI智能名片以及S2B2C商城小程序等新兴技术如何为传统…

初学STM32之简单认识IO口配置(学习笔记)

在使用51单片机的时候基本上不需要额外的配置IO&#xff0c;不过在使用特定的IO的时候需要额外的设计外围电路&#xff0c;比如PO口它是没有内置上拉电阻的。因此若想P0输出高电平&#xff0c;它就需要外接上拉电平。&#xff08;当然这不是说它输入不需要上拉电阻&#xff0c;…

HPC超算系列3——新手指南2

可以参考我的上一篇博客&#xff1a; https://blog.csdn.net/weixin_62528784/article/details/146122850?sharetypeblogdetail&sharerId146122850&sharereferPC&sharesourceweixin_62528784&spm1011.2480.3001.8118 这一节主要是对上一节的一些内容的补充&…

Ubuntu20.04搭建gerrit code review

一、环境准备 1. 安装 Java 环境‌ Gerrit 依赖 Java 运行环境&#xff08;推荐 JDK 8&#xff09;&#xff1a; sudo apt install openjdk-11-jdk 验证安装&#xff1a; java -version ‌2. 安装 Git sudo apt install git ‌3. 可选依赖 数据库‌&#xff1a;Gerrit …

爬虫案例七Python协程爬取视频

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、Python协程爬取视频 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 爬虫案例七协程爬取视频 提示&#xff1a;以下是本篇文章正文…

GB28181视频监控流媒体平台LiveGBS如何自定义收流端口区间以便减少收流端口数或解决端口冲突问题

LiveGBS GB28181流媒体服务在接收视频的时候默认是使用30000-30249&#xff0c; webrtc流播放端口区间默认是UDP的30250-30500区间。有些网络环境不方便开放这么大的端口区间&#xff0c;下面介绍下如何修改配置这个区间。 从页面上修改这个区间&#xff0c;端口区间尽量设置大…

从连接到交互:SDN 架构下 OpenFlow 协议的流程与报文剖析

在SDN架构中&#xff0c;交换机与控制器之间的通信基于 OpenFlow协议&#xff0c;其设计目的是实现控制平面与数据平面的解耦。以下是 交换机连接控制器 和 数据包进入交换机触发交互 的详细流程及协议报文分析&#xff1a; 一、交换机连接控制器的流程&#xff08;初始化阶段&…

每日一练之反转链表

题目&#xff1a; 画图解答&#xff1a; 方法&#xff1a;三指针 代码解答&#xff08;带解析&#xff09;&#xff1a; //题目给的结构体 /*** Definition for singly-linked list.* struct ListNode {* int val;* struct ListNode *next;* };*/typedef struct Lis…

虚拟机总结| 关于虚拟机的一些配置总结

前言 每次安装新的虚拟机都需要重新在网上搜索如何配置网络&#xff0c;我需要写一个自己的部署步骤&#xff0c;增加工作效率&#xff0c;不用每次配置的时候再去网上去翻找。 1.只需要联网功能记录(不固定IP) 1.1 修改ifcfg-ens33 vi etc/sysconfig/network-scripts/ifcfg…

【数据结构初阶】---堆的实现、堆排序以及文件中的TopK问题

1.树的概念及结构 1.1树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝下的。 有一个特殊的结点&…

Autojs无线连接vscode方法

1.获得电脑的IP 在电脑的CMD界面输入 ipconfig 然后找到ipv4的那一行&#xff0c;后面的即是你的电脑IP地址 2.打开vscode的autojs服务 安装autojs插件 在vscode界面按下ctrlshiftp 输入autojs 找到 点击 之后打开手机上的autojs 之后输入刚刚电脑上的地址 可以看到vsc…

【Java开发指南 | 第三十五篇】Maven + Tomcat Web应用程序搭建

读者可订阅专栏&#xff1a;Java开发指南 |【CSDN秋说】 文章目录 前言Maven Tomcat Web应用程序搭建1、使用Maven构建新项目2、单击项目&#xff0c;连续按两次shift键&#xff0c;输入"添加"&#xff0c;选择"添加框架支持"3、选择Java Web程序4、点击&…