Java并发:互斥锁,读写锁,Condition,StampedLock

3,Lock与Condition

3.1,互斥锁

3.1.1,可重入锁

锁的可重入性(Reentrant Locking)是指在同一个线程中,已经获取锁的线程可以再次获取该锁而不会导致死锁。这种特性允许线程在持有锁的情况下,可以递归地调用自身的同步方法或代码块,而不会因为再次尝试获取相同的锁而被阻塞。显然,通常的锁都要设计成可重入的。否则就会发生死锁。

synchronized关键字,就是可重入锁。在一个 synchronized 方法 method1() 里面调用另外一个 synchronized 方法 method2() 。如果 synchronized 关键字不可重入,那么再 method2() 处就会发生阻塞,这显然不可行。

public synchronized void method1() {
    method2();  // 同一个线程可以再次进入 method2()
}

public synchronized void method2() {
    // 执行某些操作
}

Concurrent包中的与互斥锁(ReentrantLock)相关类之间的继承层次:
在这里插入图片描述
Lock是一个接口,其定义如下:

  • lock():获取锁。如果锁已经被其他线程占用,则当前线程会被阻塞,直到锁被释放。
  • lockInterruptibly():它允许线程在等待锁的过程中响应中断。如果线程在等待锁时被中断,抛出 InterruptedException 并退出等待。
  • tryLock():尝试获取锁,但不会阻塞线程。如果锁定成功,返回 true;如果锁已被其他线程占用,立即返回 false。
  • tryLock(long time, TimeUnit unit):尝试在指定的时间内获取锁。如果锁在指定时间内被释放,则返回 true 并成功获取锁;否则返回 false。期间如果线程被中断,会抛出 InterruptedException。
  • unlock():释放锁。通常在获取锁之后的 finally 块中调用,确保锁在任务完成后被释放,避免死锁。
  • newCondition():返回一个绑定到该锁的新 Condition 实例。Condition 提供了类似于 Object 的 wait、notify、notifyAll 方法的功能,但更加灵活,可以实现多条件等待/通知机制。
package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;
public interface Lock {
    void lock();
    void lockInterruptibly() throws InterruptedException;
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    void unlock();
    Condition newCondition();
}

ReentrantLock本身没有代码逻辑,实现都在其内部类Sync中:

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

  	public void lock() {
        sync.lock();
    }
	public void lock() {
        sync.lock();
    }

3.1.2,公平锁&非公平锁( lock() & tryAcquire())

Sync是一个抽象类,它有两个子类FairSync和NonfairSync,分别对应公平锁和非公平锁。从下面的ReentrantLock否早方法可以看出,会传入一个布尔类型的变量fair指定锁是公平锁还是非公平锁。默认设置的是非共公平锁,是为了提高效率,减少线程切换。

public ReentrantLock() {
    sync = new NonfairSync();
}
    
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

公平锁: 公平锁是一种严格遵循先来先服务原则的锁机制。当多个线程争用同一个锁时,锁会按照线程请求锁的顺序来分配,即先请求的线程优先获取锁,后请求的线程则需要等待前面的线程释放锁。

Lock fairLock = new ReentrantLock(true);  // true 表示使用公平锁

static final class FairSync extends Sync {
//没有一上来就抢锁,在这个函数内部排队,是公平的。
    final void lock() {
        acquire(1);
    }
}
//AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //只有当c===0(没有线程持有锁)
    if (c == 0) {
    	//检查当前线程前面是否有其他线程排队等待获取锁
        if (!hasQueuedPredecessors() &&
        	//更改锁的状态
            compareAndSetState(0, acquires)) {
            //将当前线程设置为锁的独占持有者
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平锁: 非公平锁是一种不保证锁分配顺序的锁机制。线程在尝试获取锁时,可能会直接“插队”,即使有其他线程已经在等待锁。如果锁是空闲的,任何线程都可以获取它,无论它们何时请求锁。

Lock nonFairLock = new ReentrantLock();  // 默认构造方法,即非公平锁
static final class NonfairSync extends Sync {
   ...
   final void lock() {
      //一上来就尝试修改state值,也就是抢锁。
      //不考虑队列中有没有其他线程在排队。
      if (compareAndSetState(0, 1))
           setExclusiveOwnerThread(Thread.currentThread());
       else
           acquire(1);
   }
   
}
//AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
//NonfairSync
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
     int c = getState();
     //如果state为0,且队列中没有等待的线程,则设置当先线程为排他线程
     //同时设置state值。
     if (c == 0) {
         if (compareAndSetState(0, acquires)) {
             setExclusiveOwnerThread(current);
             return true;
         }
     }
     //如果排他线程就是当前线程,才直接设置state值。
     else if (current == getExclusiveOwnerThread()) {
         int nextc = c + acquires;
         if (nextc < 0) // overflow
             throw new Error("Maximum lock count exceeded");
         setState(nextc);
         return true;
     }
     return false;
 }

3.1.3,AbstractQueuedSynchronizer(AQS)

Sync的父类AbstractQueuedSynchronizer经常被称作队列同步器(AQS),这个类非常重要,该类的父类是AbstractOwnableSynchronizer。此处的锁具备synchronized功能,即可以阻塞一个线程。为了实现一把具有阻塞或唤醒功能的锁,需要几个核心要素:

  • ① 需要一个state变量,标记该锁的状态。state变量至少有两个值:0、1。对state变量的操作,使用CAS保证线程安全。
  • ② 需要记录当前是哪个线程持有锁。
  • ③ 需要底层支持对一个线程进行阻塞或唤醒操作。
  • ④ 需要有一个队列维护所有阻塞的线程,这个队列也必须是线程安全的无锁队列,也需要使用CAS。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    // ...
    private transient Thread exclusiveOwnerThread;  // 记录持有锁的线程 
}

public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {
    private volatile int state; // 记录锁的状态,通过CAS修改state的值。    
    // ...
}

针对要素 ① 和 ②,在上面两个类中有对应的体现:state取值可以是0,1,还可以大于1,就是为了支持锁的可重入性。例如,同样一个线程,调用5次lock,state会变成5;然后调用5次unlock,state减为0。

  • 当state=0,没有线程持有锁,exclusiveOwnerThread=null;
  • 当state=1,有一个线程持有锁,exclusiveOwnerThread=该线程;
  • 当state>0,说明该线程(exclusiveOwnerThread)重入了该锁。

对于要素 ③,Unsafe类提供了阻塞或唤醒线程的一堆操作原语,也就是park/unpark。

public final class Unsafe {
	public native void unpark(Object var1);
	public native void park(boolean var1, long var2);
}

有一个LockSupport的工具类,对这一原语做了简单封装:

public class LockSupport {    
    // ...
    private static final Unsafe U = Unsafe.getUnsafe();    
    public static void park() {
        U.park(false, 0L);  
    }
    public static void unpark(Thread thread) {        
        if (thread != null)
        U.unpark(thread);  
    }
}

在当前线程中调用park(),该线程就会被阻塞;在另外一个线程中,调用unpark(Thread thread),传入一个被阻塞的线程,就可以唤醒阻塞在park()地方的线程。 unpark(Thread thread),它实现了一个线程对另外一个线程的"精准唤醒"。notify也只是唤醒某一个线程,但无法执行指定唤醒哪个线程。

针对要素 ④,在AQS中利用双向链表和CAS实现了一个阻塞队列。

public abstract class AbstractQueuedSynchronizer {    
    // ...
    static final class Node {
        volatile Thread thread; // 每个Node对应一个被阻塞的线程
        volatile Node prev;
        volatile Node next;        
        // ...
    }
    private transient volatile Node head;    
    private transient volatile Node tail;    
    // ...
}

阻塞队列是整个AQS核心中的核心。 head指向双向链表头部,tail指向双向链表尾部。入队就是把新的Node加到tail后面,然后对tail进行CAS操作;出队就是对head进行CAS操作,把head向后移一个位置。
在这里插入图片描述
初始时,head=tail=NULL; 然后,在往队列中加入阻塞的线程时,会新建一个空的Node,让head和tail都指向这个空的Node;之后,在后面加入被阻塞的线程对象。所以,当head=tail的时候,说明队列为空。

3.1.4,阻塞队列与唤醒机制(⭐)

addWaiter(…)方法将当前线程封装成一个 Node,然后添加到等待队列的尾部。该方法的目的是让线程进入同步队列,以便在适当的时机被唤醒或中断。

//AbstractQueuedSynchronizer
private Node addWaiter(Node mode) {
	//创建节点,尝试将节点追加到队列尾部。
    Node node = new Node(Thread.currentThread(), mode);
    //获取tail节点,将tail节点的next设置为当前节点。
    Node pred = tail;
    //如果tail不存在,就初始化队列。
    if (pred != null) {
        node.prev = pred;
        //先尝试加到队列尾部,如果不成功,则执行enq(node);
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //enq内部会进行队列的初始化,新建一个空的Node。然后不断尝试自旋,直至成功把该Node加入队列尾部为止。
    enq(node);
    return node;
}

在addWaiter(…)方法把Thread对象加入阻塞队列之后的工作就要靠acquireQueued(…)方法完成。 线程一旦进入acquireQueued(…)就会被无限期阻塞,即使有其他线程调用interrupt()方法也不能将其唤醒,除非有其他线程释放了锁,并且该线程拿到了锁,才会从acquireQueued(…)返回。

进入acquireQueued(…),该线程被阻塞。在该方法返回的一刻,就是拿到锁的那一刻,也就是被唤醒的那一刻,此时会删除队列的第一个元素(head指针前移一个节点)。

//AbstractQueuedSynchronizer
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //被唤醒,如果自己在队列头部,则尝试拿锁。
            if (p == head && tryAcquire(arg)) {
            	//拿锁成功,出队列;
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

首先,acquireQueued(…)方法有一个返回值,表示什么意思?虽然该方法不会中断响应,但它会记录被阻塞期间有没有其他线程向它发送过中断信号。如果有,则该方法返回true;否则,返回false。

//AbstractQueuedSynchronizer
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

当acquireQueued(…)返回true时,会调用selfInterrupt(),自己给自己发送中断信号,也就是自己把自己的中断标志位设为true。之所以要这么做,是因为自己在阻塞期间,收到其他线程中断信号没有及时响应,现在要进行补偿。这样一来,如果该线程在lock代码块内部有调用sleep()之类的阻塞方法,就可以抛出异常,响应该中断信号。

阻塞就发生在下面这个方法中:

//AbstractQueuedSynchronizer
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

线程调用park()方法,自己把自己阻塞起来,直到被其他线程唤醒,该方法返回。

park()返回有两种情况:

  • 其他线程调用了unpark(Thread t)
  • 其他线程调用了t.interrupt()。这里要注意的是,lock()不能响应中断,但LockSupport.park()会响应中断。

也正因为LockSupport.park()可能被中断唤醒,acquireQueued(…)方法才写了一个for死循环。唤醒之后,如果发现自己排在队列头部,就去拿锁;如果拿不到锁,则再次阻塞自己。不断循环重复此过程,直到拿到锁。

被唤醒之后,通过Thread.interrupted()来判断是否被中断唤醒。如果是情况1,返回fasle;如果是情况2,返回true。

3.1.5,unlock()

unlock不区分公平还是非公平:当前线程要释放锁,先调用tryRelease(arg)方法,如果返回true,则取出head,让head获取锁。

//ReentrantLock
public void unlock() {
    sync.release(1);
}
//AbstractQueuedSynchronizer
public final boolean release(int arg) {
	//tryRelease(...)方法释放锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
        	//unparkSuccessor(...)方法唤醒队列中的后继者
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease方法:

//ReentrantLock
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //只有锁的拥有者才有资格调用unlock()函数,否则抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //每次调用tryRelease,state值减1,直到0,才代表锁可以被成功释放
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    //没有使用CAS,而直接用set。因为是排他锁,只有一个线程能调减state值。
    setState(c);
    return free;
}

unparkSuccessor方法:

//ReentrantLock
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

3.1.6,trylock()

tryLock()实现基于调用非公平锁的tryAcquire(…),对state进行CAS操作,如果操作成功就拿到锁;如果操作不成功则直接返回false,也不阻塞。

//ReentrantLock
public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

3.2,读写锁

和互斥锁相比,读写锁(ReentrantReadWriteLock)就是读线程和读线程之间不互斥。

3.2.1,类继承层次

ReadWriteLock是一个接口,内部由两个Lock接口组成。

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

在这里插入图片描述
ReentrantReadWriteLock实现了该接口,使用方式如下:

ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); 
Lock readLock = readWriteLock.readLock();
readLock.lock(); 
// 进行读取操作 
readLock.unlock();

Lock writeLock = readWriteLock.writeLock(); 
writeLock.lock();
// 进行写操作 
writeLock.unlock();

也就是说,当使用ReadWriteLock的时候,并不是直接使用,而是获得其内部的读锁和写锁,然后分别调用lock/unlock。

3.2.2,读写锁基本原理

从表面来看,ReadLock和WriteLock是两把锁,实际上它只是同一把锁的两个视图而已。什么叫两个视图呢?可以理解为一把锁,线程分为两类:读线程和写线程。读线程和写线程之间不互斥(可以同时拿到这把锁),读线程之间不互斥、写线程之间互斥。

从下面的构造方法也可以看出,readerLock和writerLock实际公用同一个sync对象。sync对象同互斥锁一样,分为非公平锁和公平两种策略,并继承自AQS。

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

同互斥锁一样,读写锁也是用state变量表示锁状态的。只是state变量在这里的含义和互斥锁完全不同。在内部类Sync中,对state变量进行重新定义,如下所示:

abstract static class Sync extends AbstractQueuedSynchronizer {    
    // ...
    static final int SHARED_SHIFT   = 16;
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;    
    // 持有读锁的线程的重入次数
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }    
    // 持有写锁的线程的重入次数
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }    
    // ...
    private volatile int state;
}

也就是把state变量拆成两半,低16位,用来记录写锁;高16位,用来记录"读"锁。但同一时间既然只能有一个线程写,为什么还需要16位呢?因为一个写线程可能多次重入。16位的数值范围是0到65535,这意味着一个线程最多可以重入写锁65535次。这个范围通常已经足够大,能够满足绝大多数场景中的需求;高16位的值等于5,既可以表示5个线程都拿到了该锁;也可以表示一个读线程重入了5次。

为什么要把一个int类型变量拆成两半,而不是用两个int型变量分别表示读锁和写锁的状态呢?

CAS操作只能在一次操作中对一个内存地址的值进行比较和交换。无法用一次CAS同时操作两个int变量,所以用来一个int型的高16位和低16位分别表示读锁和写锁的状态。

  • 当state = 0时,说明既没有线程持有读锁,也没有线程持有写锁;
  • 当state !=0时,要么有线程持有读锁,要么有线程持有写锁,两者不能同时成立,因为读和写互斥。这时再进一步通过sharedCount(state)和exclusiveCount(state)判断到底时读线程还是写线程持有了该锁。

3.2.3,lock() & unlock()

public static class ReadLock implements Lock, java.io.Serializable {    
    // ...
    public void lock() {
        sync.acquireShared(1);  
    }
    public void unlock() {
        sync.releaseShared(1);  
    }
    // ... 
}
public static class WriteLock implements Lock, java.io.Serializable {    
    // ...
    public void lock() {        
        sync.acquire(1);  
    }
    public void unlock() {        
        sync.release(1);  
    }
    // ... 
}

acquire/release、acquireShared/releaseShared是AQS里面的两对模板方法。互斥锁和读写锁的写锁都是基于acquire/release模板方法来实现的。读写锁的读锁是基于acquireShared/releaseShared这对模板方法来实现的。这两对模板方法的代码如下:

public abstract class AbstractQueuedSynchronizer 
    extends AbstractOwnableSynchronizer implements java.io.Serializable {    
    // ...
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&                 // tryAcquire方法由多个Sync子类实现
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))            
            selfInterrupt();
    }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0) // tryAcquireShared方法由多个Sync子类实现
            doAcquireShared(arg);
    }
    
    public final boolean release(int arg) {
        if (tryRelease(arg)) {  // tryRelease方法由多个Sync子类实现            
            Node h = head;
            if (h != null && h.waitStatus != 0)                
                unparkSuccessor(h);
            return true;      
        }
        return false;  
    }
    
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {  // tryReleaseShared方法由多个Sync子类实现
            doReleaseShared();
            return true;     
        }
        return false;  
    }
    // ...
}

将读/写、公平/非公平进行排列组合,就有4种组合。如下,上面的两个方法都是在Sync种实现的。Sync种的两个方法又是模板方法,在NonfairSync和FairSync种分别有实现。最终的对应关系如下:

  • 读锁的公平实现:Sync.tryAcquireShared() + FairSync 中的 lock() & tryAcquire()。tryAcquire())。
  • 读锁的非公平实现:Sync.tryAcquireShared() + NonfairSync中的 lock() & tryAcquire()。
  • 写锁的公平实现:Sync.tryAcquire() + FairSync中的 lock() & tryAcquire()。
  • 写锁的非公平实现:Sync.tryAcquire() + NonfairSync中的 lock() & tryAcquire()。
    在这里插入图片描述
/**
 * Nonfair version of Sync
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    // 写线程枪锁的时候是否应该阻塞
    final boolean writerShouldBlock() {
        // 写线程在抢锁之前永远不被阻塞,非公平锁
        return false; // writers can always barge
    }
    // 读线程抢锁的时候是否应该阻塞
    final boolean readerShouldBlock() {
        // 读线程抢锁的时候,当队列中第一个元素是写线程的时候要阻塞
        return apparentlyFirstQueuedIsExclusive();
    }
}

/**
 * Fair version of Sync
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    // 写线程抢锁的时候是否应该阻塞
    final boolean writerShouldBlock() {
        // 写线程在抢锁之前,如果队列中有其他线程在排队,则阻塞。公平锁
        return hasQueuedPredecessors();
    }
    // 读线程抢锁的时候是否应该阻塞
    final boolean readerShouldBlock() {
        // 读线程在抢锁之前,如果队列中有其他线程在排队,阻塞。公平锁
        return hasQueuedPredecessors();
    }
}

对于非公平,读锁和写锁的实现策略稍有差异:

  • 写锁能抢锁,前提是state=0,只有在没有其他线程持有读锁或写锁的情况下,它才有机会去抢锁。或者state !=0,但哪个持有写锁的线程是自己,再次重入。写线程是非公平的,即writerShouldBlock()方法一直返回false。
  • 对于读线程,假设当前线程被读线程持有,然后其他读线程还非公平地一直去抢,可能导致写线程永远拿不到锁,所以对于读线程的非公平,要做一些"约束"。当发现队列的第一个元素是写线程的时候,读线程也要阻塞,不能直接去抢。即偏向写线程。

3.2.4,WriteLock公平锁&非公平锁

写锁是排他锁,实现策略类似于互斥锁,重写了tryAcquire/tryRelease方法。

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    //写线程只能有一个,但写线程可以多次重入
    int w = exclusiveCount(c);
    //当c!=0 说明有读线程或者写线程持有锁
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        // w == 0, 说明锁被读线程持有,只能返回:w!=0,持有写锁的线程不是自己,也只能返回。
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //16位用满了,超过了最大重入次数。
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    //公平锁实现和非公平锁实现只是writerShouldBlock()分别被FairSync和NonfairSync实现。
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    //抢锁成功后,将ownerThread设成自己。
    setExclusiveOwnerThread(current);
    return true;
}
  • c!=0 and w==0,说明当前一定是读线程拿着锁,写锁一定拿不到,返回false。
  • c!=0 and w!=0,说明当前一定是写线程拿着锁,执行current!=getExclusive-OwnerThread()的判断,发现ownerThread不是自己,返回false。
  • c!=0 and w!=0 and current = getExclusiveOwnerThread(),才会走到 if (w + exclusiveCount(acquires) > MAX_COUNT)。判断重入次数,重入次数超过最大值,抛出异常。
  • if (c=0),说明当前既没有读线程,也没有写线程持有该锁。可以通过CAS操作开抢。

tryRelease()分析

protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    //因为写锁是排他的,在当前线程持有写锁的时候,其他线程不会持有写锁也不会持有读锁。所以,这里对state值的调减不需要CAS操作,直接减1即可。
    setState(nextc);
    return free;
}

tryLock和lock方法不区分公平/非公平。

//ReentrantReadWriteLock
public boolean tryLock( ) {
   return sync.tryWriteLock();
}
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
    int c = getState();
    //当state不是0的时候,如果写线程获取锁的个数是0,或者写线程不是当前线程,则返回枪锁失败。
    if (c != 0) {
        int w = exclusiveCount(c);
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
    }
    //只要不是上面的情况,则通过CAS设置state的值。
    //如果设置成功,就将排他线程设置为当前线程并返回true。
    if (!compareAndSetState(c, c + 1))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

3.2.5,ReadLock公平锁&非公平锁

读锁是共享锁,重写了tryAcquireShared/tryReleaseShared方法其实现策略和排他锁有很大差异。

//ReentrantReadWriteLock
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //写锁被某线程持有,并且这个线程还不是自己,读锁肯定拿不到,直接返回。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    //公平和非公平的差异就在于这个函数
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        //CAS拿读锁,高16位加1
        compareAndSetState(c, c + SHARED_UNIT)) {
        //r之前等于0,说明这是第一个拿到读锁的线程
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        //不是第一个
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    //上面拿读锁失败,进入这个函数不断自旋拿读锁
    return fullTryAcquireShared(current);
}
if (exclusiveCount(c) != 0 &&
	getExclusiveOwnerThread() != current)
	return -1;

低16位不等于0,说明有写线程持有锁,并且只有当ownerThread !=自己时,才返回-1。如果current=ownerThread,则这段代码不会返回。这是因为一个写线程可以再次去拿读锁!也就是说,一个线程在持有WriteLock后,再去调用ReadLock.lock也是可以的。

compareAndSetState(c, c + SHARED_UNIT))

把state的高16位加1(读锁的状态),但因为是在高16位,必须把1左移16位再加1。

firstReader,cacheHoldConunter之类的变量,只是一些统计变量,在ReentrantRead-WriteLock对外的一些查询函数中会用到,例如,查询持有读锁的线程列表,但对整个读写互斥机制没有影响。

protected final boolean tryReleaseShared(int unused) {    
    Thread current = Thread.currentThread();
    // ...
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
        // Releasing the read lock has no effect on readers,            
        // but it may allow waiting writers to proceed if            
        // both read and write locks are now free.
        return nextc == 0;  
    }
}

因为读锁是共享锁,多个线程会同时持有读锁,所以对读锁的释放不能直接减1,而是需要通过一个for循环 + CAS操作不断重试。这是tryReleaseShared和tryReleased的根本差异所在。

3.3,Condition

3.3.1,Condition与Lock的关系

Condition本身也是一个接口,其功能和wait/notify类似,如下:

public interface Condition {
    void await() throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;    
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    void awaitUninterruptibly();
    boolean awaitUntil(Date deadline) throws InterruptedException;    
    void signal();
    void signalAll(); 
}

wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口:

public interface Lock {    
    void lock();
    void lockInterruptibly() throws InterruptedException;    
    // 所有的Condition都是从Lock中构造出来的
    Condition newCondition();
    boolean tryLock();
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;    
    void unlock();
}

3.3.2,Condition的使用场景

以ArrayBlockingQueue为例。如下面所示为一个用数组实现的阻塞队列,执行put(…)操作的时候,队列满了,生产者线程被阻塞;执行take()的时候,队列为空,消费者线程被阻塞。

public class ArrayBlockingQueue<E> extends AbstractQueue<E>        
        implements BlockingQueue<E>, java.io.Serializable {    
    //...
    final Object[] items;
    int takeIndex;
    int putIndex;
    int count;
    // 一把锁+两个条件
    final ReentrantLock lock;
    private final Condition notEmpty;    
    private final Condition notFull;
    
    public ArrayBlockingQueue(int capacity, boolean fair) {        
        if (capacity <= 0)
            throw new IllegalArgumentException();        
        this.items = new Object[capacity];
        // 构造器中创建一把锁加两个条件
        lock = new ReentrantLock(fair);        
        // 构造器中创建一把锁加两个条件
        notEmpty = lock.newCondition();        
        // 构造器中创建一把锁加两个条件
        notFull =  lock.newCondition();  
    }
    
    public void put(E e) throws InterruptedException {        
        Objects.requireNonNull(e);
        final ReentrantLock lock = this.lock;        
        lock.lockInterruptibly();
        try {
            while (count == items.length)                
                // 非满条件阻塞,队列容量已满                
                notFull.await();
            enqueue(e);        
        } finally {
            lock.unlock();      
        }
    }
    
    private void enqueue(E e) {
        // assert lock.isHeldByCurrentThread();        
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;        
        final Object[] items = this.items;        
        items[putIndex] = e;
        if (++putIndex == items.length) putIndex = 0;        
        count++;
        // put数据结束,通知消费者非空条件        
        notEmpty.signal();
    }
    
    public E take() throws InterruptedException {        
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();        
        try {
            while (count == 0)
                // 阻塞于非空条件,队列元素个数为0,无法消费                
                notEmpty.await();
            return dequeue();        
        } finally {
            lock.unlock();      
        }
    }
    
    private E dequeue() {
        // assert lock.isHeldByCurrentThread();        
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;        
        final Object[] items = this.items;        
        @SuppressWarnings("unchecked")
        E e = (E) items[takeIndex];        
        items[takeIndex] = null;
        if (++takeIndex == items.length) takeIndex = 0;        
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        // 消费成功,通知非满条件,队列中有空间,可以生产元素了。        
        notFull.signal();
        return e;  
    }
    // ...
}

3.3.3,Condition实现原理

可以发现,Condition的使用很方便,避免了wait/notify的生产者通知生产者,消费者通知消费者的问题。具体实现如下:

由于Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分。首先查看互斥锁和读写锁中Condition的构造方法

public class ReentrantLock implements Lock, java.io.Serializable {    
    // ...
    public Condition newCondition() {
        return sync.newCondition();  
    }
}

public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {    
    // ...
    private final ReentrantReadWriteLock.ReadLock readerLock;    
    private final ReentrantReadWriteLock.WriteLock writerLock;    
    // ...
    public static class ReadLock implements Lock, java.io.Serializable {        
        // 读锁不支持Condition
        public Condition newCondition() {
            // 抛异常
            throw new UnsupportedOperationException();      
        }
    }
    
    public static class WriteLock implements Lock, java.io.Serializable {        
        // ...
        public Condition newCondition() {
            return sync.newCondition(); 
        }
        // ...  
    }
    // ... 
}

首先,读写锁中的ReadLock是不支持Condition的,读写锁的写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承紫AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition:

public abstract class AbstractQueuedSynchronizer 
        extends AbstractOwnableSynchronizer implements java.io.Serializable {
        
    public class ConditionObject implements Condition, java.io.Serializable {
       // Condition的所有实现,都在ConditionObject类中  
    }
}

public class ReentrantLock implements Lock, java.io.Serializable {
    abstract static class Sync extends AbstractQueuedSynchronizer {   
        final ConditionObject newCondition() {
            return new ConditionObject();  
        }
    }
}

每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表组成的队列,如下:

public class ConditionObject implements Condition, java.io.Serializable {    
    private transient Node firstWaiter;
    private transient Node lastWaiter; 
}
static final class Node {
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;    
    Node nextWaiter;
}

3.3.4,await()实现分析

public final void await() throws InterruptedException {
    // 刚要执行await()操作,收到中断信号,抛异常
    if (Thread.interrupted())
        throw new InterruptedException();
    // 加入Condition的等待队列
    Node node = addConditionWaiter();
    // 阻塞在Condition之前必须先释放锁,否则会死锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 重新获取锁
    if (acquiraeQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        // 被中断唤醒,抛中断异常
        reportInterruptAfterWait(interruptMode);
}
  • 线程调用await()的时候,肯定已经先拿到了锁。所以,在addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程是安全的,代码如下:
private Node addConditionWaiter() {
    // ...
    Node t = lastWaiter;
    // ...
    Node node = new Node(Node.CONDITION);

    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
  • 在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。

  • 线程从wait中被唤醒后,必须用acquiraeQueued(node, savedState)方法重新拿锁。

  • checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:

    • 一种是其他线程调用了unpark;
    • 另一种是收到中断信号。
  • 这里的await()方法是可以响应中断,所以当发现自己被中断唤醒的,而不是被unpark唤醒时,会直接退出while循环,await()方法也会返回。

  • isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node值在Condition的队列里,而不在AQS的队列里。但执行notify操作的时候,会放进AQS的同步队列。

3.3.5,awaitUniterruptibly()实现分析

与await()不同,awaitUninterruptibly()不会响应中断,其方法的定义中不会有中断异常抛出,下面分析其实现和await()的区别。

public final void awaitUninterruptibly() {
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //当线程唤醒后,如果被中断过,仅记录,不处理,继续进行while循环
        if (Thread.interrupted())
            interrupted = true;
    }
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

可以看出,整体代码和await()类似,区别在于收到异常后,不会抛出异常,而是继续执行while循环。

3.3.6,notify()实现分析

public final void signal() {
    // 只有持有锁的线程,才有资格调用signal()方法
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        // 发起通知
        doSignal(first);
}

// 唤醒队列中的第1个线程
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
         first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
        
final boolean transferForSignal(Node node) {
    if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
        return false;
	// 先把Node放入互斥锁的同步队列中,再调用unpark方法
    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

同await()一样,在调用notify()的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了。

然后从队列中取出firstWaiter,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)方法把这个Node放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await()方法里面的判断条件:while (!isOnSyncQueue(node)) ,这个判断条件满足,说明await线程不是被中断的,而是被unpark唤醒的。

notifyAll()与此类似。

3.4,StampedLock

3.4.1,StampedLock使用场景

并发度
ReentrantLock读与读互斥,读与写互斥,写与写互斥
ReentrantReadWriteLock读与读不互斥,读与写互斥,写与写互斥
StampedLock读与读不互斥,读与写不互斥,写与写互斥

可以看到,从ReentrantLock到StampedLock,并发度依次提高。StampedLock是如何做到“读”与“写”也不互斥、并发地访问的呢?MySQL 高并发的核心机制 MVCC,也就是一份数据多个版本,此处的StampedLock有异曲同工之妙。

另一方面,因为ReentrantLock采用的是“悲观读”的策略,当第一个读线程拿到锁之后,第二个、第三个读线程还可以拿到锁,使得写线程一直拿不到锁,可能导致写线程“饿死”。虽然在其公平或非公平的实现中,都尽量避免这种情形,但还有可能发生。StampedLock引入了“乐观读”策略,读的时候不加读锁,读出来发现数据被修改了,再升级为“悲观读”,相当于降低了“读”的地位,把抢锁的天平往“写”的一方倾斜了一下,避免写线程被饿死。

class Point {
    private double x,y;
    private final StampedLock s1 = new StampedLock();
    //多个线程调用该函数,修改x,y的值
    void move(double deltaX, double deltaY) {
        long stamp = s1.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            s1.unlockWrite(stamp);
        }
    }
    //多个线程调用该函数,求距离。使用“乐观读”将共享变量拷贝到线程栈中。
    //读的期间,其他线程修改了共享变量(读到脏数据),放弃。升级为悲观锁。
    double distanceFromOrigin() {
        long stamp = s1.tryOptimisticRead();
        double currentX = x, currentY = y;
        if (!s1.validate(stamp)) {
            stamp = s1.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                s1.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX*currentX + currentY*currentY);
    }
}

首先,执行move操作的时候,要加锁。这个用法和ReadWriteLock的用法没有区别,写操作和写操作也是互斥的。关键在于读的时候,用了一个“乐观读”sl.tryOptimisticRead(),相当于在读之前给数据的状态做了一个“快照”。然后,把数据拷贝到内存里面,在用之前,再比对一次版本号。如果版本号变了,则说明在读的期间有其他线程修改了数据。读出来的数据废弃,重新获取读锁。关键代码就是下面这三行:

//在读之前,获取数据的版本号。
//读:将一份数据拷贝到线程的栈内存中
//读之后:将读之前的版本号和当前版本号比对,判断读出来数据是否可以使用(期间没有被其他线程修改)
long stamp = s1.tryOptimisticRead();
double currentX = x, currentY = y;
if (!s1.validate(stamp)) {

要说明的是,这三行关键代码对顺序非常敏感,不能有重排序。因为 state 变量已经是volatile,所以可以禁止重排序,但stamp并不是volatile 的。为此,在validate (stamp )函数里面插入内存屏障。

3.4.2,“乐观读”的实现原理

首先,StampedLock是一个读写锁,因此也会像读写锁那样,把一个state变量分成两半,分别表示读锁和写锁的状态。同时,它还需要一个数据的version。但正如前面所说,一次CAS没有办法操作两个变量,所以这个state变量本身同时也表示了数据的version。下面先分析state变量。

public class StampedLock implements java.io.Serializable {
	private static final long RUNIT = 1L;
	//第8位表示写锁
    private static final long WBIT  = 1L << LG_READERS;
    //最低的7位表示读锁
    private static final long RBITS = WBIT - 1L;
    //读锁的数目
    private static final long RFULL = RBITS - 1L;
    //读锁和写锁的状态整合到一起
    private static final long ABITS = RBITS | WBIT;
    private static final long SBITS = ~RBITS; // note overlap with ABITS

    // Initial value for lock state; avoid failure value zero
    private static final long ORIGIN = WBIT << 1;
    //state的初始值
    private transient volatile long state;

用最低的8位表示读和写的状态,其中第8位表示写锁的状态,最低的7位表示读锁的状态。因为写锁只有一个bit位,所以写锁是不可重入的。
在这里插入图片描述
初始值不为0,而是把WBIT 向左移动了一位,也就是上面的ORIGIN 常量,构造函数如下所示。

// Initial value for lock state; avoid failure value zero
private static final long ORIGIN = WBIT << 1;

public StampedLock() {
   state = ORIGIN;
}

为什么state的初始值不设为0呢?这就要从乐观锁的实现说起。

public long tryOptimisticRead() {
    long s;
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}
public boolean validate(long stamp) {
    U.loadFence();
    return (stamp & SBITS) == (state & SBITS);
}

上面两个函数必须结合起来看:当state&WBIT!=0的时候,说明有线程持有写锁,上面的tryOptimisticRead会永远返回0。这样,再调用validate(stamp),也就是validate(0)也会永远返回false。这正是我们想要的逻辑:当有线程持有写锁的时候,validate永远返回false,无论写线程是否释放了写锁。因为无论是否释放了(state回到初始值)写锁,state值都不为0,所以validate(0)永远为false。

为什么上面的validate(…)函数不直接比较stamp=state,而要比较state&SBITS=state&SBITS 呢?因为读锁和读锁是不互斥的!所以,即使在“乐观读”的时候,state 值被修改了,但如果它改的是第7位,validate(…)还是会返回true。

另外要说明的一点是,上面使用了内存屏障 U.loadFence(),是因为在这行代码的下一行里面的stamp、SBITS变量不是volatile
的,由此可以禁止其和前面的currentX=X,currentY=Y进行重排序。通过上面的分析,可以发现state的设计非常巧妙。只通过一个变量,既实现了读锁、写锁的状态记录,还实现了数据的版本号的记录。

3.4.3,悲观读/写:“阻塞”和“自旋”策略实现差异

和ReadWriteLock一样,StampedLock也要进行悲观的读锁和写锁操作。不过它不是基于AQS实现的,而是内部重新实现了一个阻塞队列。

static final class WNode {
    volatile WNode prev;
    volatile WNode next;
    volatile WNode cowait;    // list of linked readers
    volatile Thread thread;   // non-null while possibly parked
    volatile int status;      // 0, WAITING, or CANCELLED
    final int mode;           // RMODE or WMODE
    WNode(int m, WNode p) { mode = m; prev = p; }
}
/** Head of CLH queue */
private transient volatile WNode whead;
/** Tail (last) of CLH queue */
private transient volatile WNode wtail;

这个阻塞队列和 AQS 里面的很像。刚开始的时候,whead=wtail=NULL,然后初始化,建一个空节点,whead和wtail都指向这个空节点,之后往里面加入一个个读线程或写线程节点。但基于这个阻塞队列实现的锁的调度策略和AQS很不一样,也就是“自旋”。在AQS里面,当一个线程CAS state失败之后,会立即加入阻塞队列,并且进入阻塞状态。但在StampedLock中,CAS state失败之后,会不断自旋,自旋足够多的次数之后,如果还拿不到锁,才进入阻塞状态。为此,根据CPU的核数,定义了自旋次数的常量值。如果是单核的CPU,肯定不能自旋,在多核情况下,才采用自旋策略。

private static final int NCPU = Runtime.getRuntime().availableProcessors();

private static final int SPINS = (NCPU > 1) ? 1 << 6 : 0;

下面以写锁的加锁,也就是StampedLock的writeLock()函数为
例,来看一下自旋的实现。

 
public long writeLock() {
    long s, next;  
    return ((((s = state) & ABITS) == 0L &&
             U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
            next : acquireWrite(false, 0L));
}

如上面代码所示,当state&ABITS==0的时候,说明既没有线程持有读锁,也没有线程持有写锁,此时当前线程才有资格通过CAS操作state。若操作不成功,则调用acquireWrite()函数进入阻塞队列,并进行自旋,这个函数是整个加锁操作的核心,代码如下。

private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        //入队列时自选
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                	//自旋的时候拿到了锁,函数返回
                    return ns;
            }
            else if (spins < 0)
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                	//不断自旋,以一定的概率把spins值往下累减
                    --spins;
            }
            else if ((p = wtail) == null) { //初始化队列
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(WMODE, p);
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                //for循环唯一的break,CAS tail成功(成功加入队列尾部),才会退出for循环
                break;
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    //再次尝试拿锁
                    if (((s = state) & ABITS) == 0L) {
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0) //不断自旋
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                //自己从阻塞中唤醒,然后唤醒cowait中所有reader线程
                while ((c = h.cowait) != null) { 
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        //进入阻塞状态,之后被另外一个线程release唤醒,接着往下执行这个for循环
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

整个acquireWrite(…)函数是两个大的for循环,内部实现了非常复杂的自旋策略。在第一个大的for循环里面,目的就是把该Node加入队列的尾部,一边加入,一边通过CAS操作尝试获得锁。如果获得了,整个函数就会返回;如果不能获得锁,会一直自旋,直到加入队列尾部。

在第二个大的for循环里,也就是该Node已经在队列尾部了。这个时候,如果发现自己刚好也在队列头部,说明队列中除了空的Head节点,就是当前线程了。此时,再进行新一轮的自旋,直到达到MAX_HEAD_SPINS次数,然后进入阻塞。这里有一个关键点要说明:当release(…)函数被调用之后,会唤醒队列头部的第1个元素,此时会执行第二个大的for循环里面的逻辑,也就是接着for循环里面park()函数后面的代码往下执行。

另外一个不同于AQS的阻塞队列的地方是,在每个WNode里面有一个cowait指针,用于串联起所有的读线程。例如,队列尾部阻塞的是一个读线程 1,现在又来了读线程 2、3,那么会通过cowait指针,把1、2、3串联起来。1被唤醒之后,2、3也随之一起被唤醒,因为读和读之间不互斥。

明白加锁的自旋策略后,下面来看锁的释放操作。和读写锁的实现类似,也是做了两件事情:一是把state变量置回原位,二是唤醒阻塞队列中的第一个节点。节点被唤醒之后,会继续执行上面的第二个大的for循环,自旋拿锁。如果成功拿到,则出队列;如果拿不到,则再次进入阻塞,等待下一次被唤醒。

// java.util.concurrent.locks.StampedLock#unlockWrite
public void unlockWrite(long stamp) {
    WNode h;
    if (state != stamp || (stamp & WBIT) == 0L)
        throw new IllegalMonitorStateException();
    state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
    if ((h = whead) != null && h.status != 0)
        release(h);
}
// 唤醒队列的队首节点【头结点whead的后继节点】
private void release(WNode h) {
    if (h != null) {
        WNode q; Thread w;
        U.compareAndSwapInt(h, WSTATUS, WAITING, 0); // 将头结点状态从-1变为0,标识要唤醒其后继节点
        if ((q = h.next) == null || q.status == CANCELLED) { // 判断头结点的后继节点是否为null或状态为取消
            for (WNode t = wtail; t != null && t != h; t = t.prev) // 从队尾查找距头结点最近的状态为等待的节点
                if (t.status <= 0)
                    q = t; // 赋值
        }
        if (q != null && (w = q.thread) != null)
            U.unpark(w); // 唤醒队首节点
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/875238.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

如何在 Selenium 中获取网络调用请求?

引言 捕获网络请求对于理解网站的工作方式以及传输的数据至关重要。Selenium 作为一种 Web 自动化工具,可以用于捕获网络请求。本文将讨论如何使用 Selenium 在 Java 中捕获网络请求并从网站检索数据。 我们可以使用浏览器开发者工具轻松捕获网络请求或日志。大多数现代 Web…

【iOS】UIViewController的生命周期

UIViewController的生命周期 文章目录 UIViewController的生命周期前言UIViewController的一个结构UIViewController的函数的执行顺序运行代码viewWillAppear && viewDidAppear多个视图控制器跳转时的生命周期pushpresent 小结 前言 之前对于有关于UIViewControlller的…

MySQL:bin log

redo log 它是物理日志&#xff0c;记录内容是“在某个数据页上做了什么修改”&#xff0c;属于 InnoDB 存储引擎。 而 binlog 是逻辑日志&#xff0c;记录内容是语句的原始逻辑&#xff0c;类似于“给 ID2 这一行的 c 字段加 1”&#xff0c;属于MySQL Server 层。 不管用什…

学习平台|基于java的移动学习平台系统小程序(源码+数据库+文档)

学习平台|学习平台系统|在线学习平台系统小程序 目录 基于java的移动学习平台系统小程序 一、前言 二、系统设计 三、系统功能设计 四、数据库设计 五、核心代码 六、论文参考 七、最新计算机毕设选题推荐 八、源码获取&#xff1a; 博主介绍&#xff1a;✌️大厂码…

C++奇迹之旅:快速上手Priority_queue的使用与模拟实现

文章目录 &#x1f4dd;priority_queue的介绍和使用&#x1f320; priority_queue的介绍&#x1f309;priority_queue的使用 &#x1f320;仿函数的使用&#x1f320;C语言有趣的模仿push_back&#x1f320;priority_queue的模拟实现&#x1f6a9;总结 &#x1f4dd;priority_q…

java重点学习-集合(List)

七 集合&#xff08;List&#xff09; 7.1 复杂度分析 7.2 数组 1.数组(Array)是一种用连续的内存空间存储相同数据类型 数据的线性数据结构。 2.数组下标为什么从0开始 寻址公式是:baseAddressi*dataTypeSize&#xff0c;计算下标的内存地址效率较高 3.查找的时间复杂度 随机(…

HarmonyOS Next系列之实现一个左右露出中间大两边小带缩放动画的轮播图(十二)

系列文章目录 HarmonyOS Next 系列之省市区弹窗选择器实现&#xff08;一&#xff09; HarmonyOS Next 系列之验证码输入组件实现&#xff08;二&#xff09; HarmonyOS Next 系列之底部标签栏TabBar实现&#xff08;三&#xff09; HarmonyOS Next 系列之HTTP请求封装和Token…

一种多策略改进小龙虾智能优化算法MSCOA 改进策略:种群混沌映射初始化+透镜成像反向学习+黄金正弦变异策略

一种多策略改进小龙虾智能优化算法MSCOA 改进策略&#xff1a;种群初始化精英反向透镜成像反向学习黄金正弦变异策略 文章目录 一、小龙虾COA基本原理二、改进策略2.1种群初始化 映射2.2 透镜成像反向学习2.3 黄金正弦变异策略 三、实验结果四、核心代码五、代码获取六、总结 一…

小型企业如何利用人工智能的生产力

尽管生产力低下是一个长期存在的问题&#xff0c;但最近严峻的经济逆风加剧了这一问题&#xff0c;企业清算数量同比增长了 19&#xff05;。 Xero 的报告《小企业生产力&#xff1a;趋势、影响和战略》反映了这些宏观经济变化&#xff0c;显示 2023 年新西兰小企业生产力与 …

SiC,GaN驱动优选驱动方案SiLM5350系列SiLM5350MDDCM-DG 带米勒钳位Clamp保护功能 单通道隔离栅极驱动器

SiLM5350MDDCM-DG是一款适用于IGBT、MOSFET的单通道 隔离门极驱动器&#xff0c;具有10A拉电流和10A灌电流驱动能 力。提供内部钳位功能&#xff0c;可单独控制 上升时间和下降时间。 在 SOP8 封 装 中 具 有 3000VRMS 隔 离 耐 压 &#xff08; 符 合 UL1577&#xff09;。 与…

抖音微信超火国庆节国旗头像生成源码

源码介绍&#xff1a; 抖音微信超火国庆节国旗头像生成源码&#xff0c;静态页前端生成速度超快&#xff01;源码直接上传到服务器即可使用。 1、打开地址后点击上传->选一张你喜欢的头像->然后点右边箭头符合选款式->最后点保存头像->按照提示 2、保存到手机即…

c/c++面试100道

1.一道笔试题解析_哔哩哔哩_bilibili P20&#xff1a;#define offsetof(TYPE, MEMBER) ((size_t)&((TYPE*)0)->MEMBER) 1、 offsetof 宏是 C 语言中用于计算结构体成员相对于结构体起始地址的偏移量的宏定义。这个宏的定义如下&#xff1a; #define offsetof(TYPE, …

macOS上谷歌浏览器的十大隐藏功能

谷歌浏览器&#xff08;Google Chrome&#xff09;在macOS上拥有一系列强大而隐蔽的特性&#xff0c;这些功能能显著提高您的浏览体验。从多设备同步到提升安全性和效率&#xff0c;这些被低估的功能等待着被发掘。我们将逐步探索这些功能&#xff0c;帮助您最大化利用谷歌浏览…

09-排序1 排序(C)

这一节&#xff0c;测试各类排序算法的运行速度&#xff08;没有基数排序&#xff08;桶&#xff09; 其实在实际学习中&#xff0c;还是有意义的 给定 n 个&#xff08;长整型范围内的&#xff09;整数&#xff0c;要求输出从小到大排序后的结果。 本题旨在测试各种不同的排序…

【代码随想录训练营第42期 Day57打卡 - 图论Part7 - Prim算法

一、Prim算法 Prim算法是一种贪心算法&#xff0c;用于求解加权无向图的最小生成树问题。其中&#xff0c;最小生成树是指一个边的子集&#xff0c;它连接图中的所有顶点&#xff0c;且边的总权重最小&#xff0c;并且没有形成环。 对于Prim算法的简单了解&#xff0c;这里推…

基于小程序的教学辅助微信小程序设计+ssm(lw+演示+源码+运行)

教学辅助微信小程序 摘 要 随着移动应用技术的发展&#xff0c;越来越多的学生借助于移动手机、电脑完成生活中的事务&#xff0c;许多的传统行业也更加重视与互联网的结合&#xff0c;由于学生学习的压力越来越大&#xff0c;教学辅助是一个非常不错的教育平台&#xff0c;对…

9.12-kubeadm方式安装k8s+基础命令的使用

一、安装环境 编号主机名称ip地址1k8s-master192.168.2.662k8s-node01192.168.2.773k8s-node02192.168.2.88 二、前期准备 1.设置免密登录 [rootk8s-master ~]# ssh-keygen[rootk8s-master ~]# ssh-copy-id root192.168.2.77[rootk8s-master ~]# ssh-copy-id root192.168.2.…

指令——计算机的语言(part 2)

目录 1.1 翻译并执行程序 1.1.1 编译器 1.1.2 汇编器 1.1.3 链接器 1.1.4 加载器 1.1.5 动态链接库 接上一篇文章: 指令——计算机的语言(part 1) 1.1 翻译并执行程序 程序翻译层次图如下: 首先高级语言比如说C&#xff0c;会被编译器编译成汇编语言&#xff0c;然后汇…

Python面试宝典第48题:找丑数

题目 我们把只包含质因子2、3和5的数称作丑数&#xff08;Ugly Number&#xff09;。比如&#xff1a;6、8都是丑数&#xff0c;但14不是&#xff0c;因为它包含质因子7。习惯上&#xff0c;我们把1当做是第一个丑数。求按从小到大的顺序的第n个丑数。 示例 1&#xff1a; 输入…

另类动态规划

前言&#xff1a;一开始我根本想不到这个题目是一个动态规划的题目&#xff0c;而且我一开始的初始状态还写错了 我还忘记了写算法题的基本步骤&#xff0c;先看数据范围&#xff0c;再考虑能不能用动态规划写 题目地址 #include <bits/stdc.h> using namespace std; #de…