AQS
- 概览
- AQS官方解释
- 简单来说
- JDK17 中 AQS源码分析
- Lock 阶段
- UnLock 阶段
- 什么时候取消排队呢?
在学习阳哥的 JUC课程的时候,阳哥讲AQS用的是JDK8,我用的是JDK17,想着自己分析一下,分析完之后发现JDK17与JDK8还是有些不同的,感觉更高效了。
概览
AQS全称AbstractQueuedSynchronizer
,为实现依赖先进先出(FIFO)等待队列的阻塞锁
和相关同步器
(信号量、事件等)提供了一个框架。
AQS在JUCL包下,如图:
AQS官方解释
该类被设计为大多数类型的同步器的抽象父类,这些同步器依赖于单个原子int值
来表示状态。子类必须定义 改变这种状态 的受保护的方法,这些方法定义了该对象被获取或释放时该状态的含义。考虑到这些,这个类中的其他方法执行所有排队和阻塞机制。子类可以维护其他状态字段,但是只有使getState、setState、compareAndSetState
方法操纵的自动更新的int值才会跟踪同步。
子类应该定义为非公共的内部助手类
(通常定义成了Sync
),用于实现其外围类的同步属性。类AbstractQueuedSynchronizer
不实现任何同步接口。相反,它定义了诸如acquireInterruptibly
之类的方法,具体 锁和相关同步器 可以在适当的时候调用这些方法来实现它们的公共方法。
该类支持默认独占模式
和共享模式
。当以独占模式获取时,其他线程的尝试获取无法成功。多线程获取共享模式可能(但不需要)成功。这个类不“理解”这些差异,除非从机械意义上说,当共享模式获取成功时,下一个等待线程也必须确定它是否也可以获取。在不同模式下等待的线程共享同一个FIFO队列。通常,实现的子类只支持这些模式中的一种,但这两种模式都可以发挥作用,例如在ReadWriteLock中。仅支持独占模式或仅支持共享模式的子类不需要定义支持未使用模式的方法。
简单来说
就是AQS作为一个模板父类,提供了一个大致的办事流程,具体的细节由子类实现。比如说一个锁,如果有线程使用,当前的state就是1,表示使用中,如果又有线程使用,那么进入AQS的FIFO双端队列当中,FIFO双端队列保存的是Node节点,每个Node节点有下述属性:
volatile Node prev;
volatile Node next;
Thread waiter; // 等待线程
volatile int status; // 当前线程的等待状态
线程的等待状态
static final int WAITING = 1; // 等待状态
static final int CANCELLED = 0x80000000; // 不等待了
static final int COND = 2; // in a condition wait // 不太清楚
JDK17 中 AQS源码分析
以ReentrantLock
的 非公平锁
为例
ReentrantLock reentrantLock = new ReentrantLock(false);
try {
reentrantLock.lock();
} finally {
reentrantLock.unlock();
}
假设有三个线程A,B,C
依次获取锁
Lock 阶段
难懂的函数:
setPrevRelaxed(t)
:将调用者的pre设置成节点t。tryInitializeHead
:初始化dummy节点,当tail为空的时候调用,调用之后head、tail都指向dummy节点。casTail(ex, now)
:通过cas的方式将tail节点指向now节点。initialTryLock()
:先抢占一下锁,如果没有人用或者是可以重入,返回true,否则返回false
对于线程A来说
在lock的时候会调用helper的lock方法
public void lock() {
sync.lock();
}
在helper中的lock方法
final void lock() {
if (!initialTryLock())
acquire(1);
}
abstract boolean initialTryLock();
非公平锁的initialTryLock()
方法
final boolean initialTryLock() {
Thread current = Thread.currentThread();
// 尝试获取锁,如果没有人拥有锁,当前线程拥有锁
if (compareAndSetState(0, 1)) { // first attempt is unguarded
// 将当前线程设置为锁的拥有者
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) { // 是否是自己,因为可能是可重入的
int c = getState() + 1; // 锁冲入次数 + 1
if (c < 0) // overflow // 重入次数太多,溢出了
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else // 已经有人用了,返回false
return false;
}
经过源码分析,可以根据initialTryLock的返回值得出结论:
false
:已经有线程获取锁了,排队等候取True
:自己拥有锁,或者重入锁
线程A是第一个lock
的线程,因此方法返回true
,AQS的 state = 1。
对于线程B来说,initialTryLock
方法返回false,进入acquire(1)
方法,排队获取锁
/**
以独占模式获取,忽略中断。通过调用至少一次tryAcquire实现,成功时返回。
否则,线程将被排队,可能会反复阻塞和解除阻塞,调用tryAcquire直到成功。
*/
public final void acquire(int arg) {
// 尝试获取锁
if (!tryAcquire(arg))
// 排队获取锁
acquire(null, arg, false, false, false, 0L);
}
这里调用了tryAcquire
方法,看一下非公平锁的实现方法
protected final boolean tryAcquire(int acquires) {
// 第一个 getState()获取当前锁的状态,已经被获取了,返回false,没有的话是true
if (getState() == 0 && compareAndSetState(0, acquires)) {
// 没人用,抢占锁
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
由于A已经获得锁,因此state >= 1
,所以这次返回了false, 进入acquire(null, arg, false, false, false, 0L);
方法
// 主获取方法,由所有导出的获取方法调用。
/**
param:
node: 除非有重新获取条件,否则是null
arg: 获取参数,1表示要抢占
shared: 如果是共享模式则为true
interruptible: 如果true,中断的时候返回负数,否则中断当前线程
timed: 如果为真,则使用定时等待
time: 如果timed为true,这里表示定时多久后抢占
return:
如果获取了返回正值,如果超时了返回0,如果被中断了,返回负数
*/
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
// 获取当前线程
Thread current = Thread.currentThread();
// 取消第一个线程的停放后重试
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
// 是否中断?
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
/**
满足:
1. 不是第一个节点
2. pred表示当前节点的前一个节点,前一个节点不是Null
3. 不是第一个节点
*/
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
/**
满足: 是第一个节点或者前一个是null(没有入队呢)
*/
if (first || pred == null) {
// 是否获得锁
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg); // 独占模式,再次尝试获取锁
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
// 如果当前node是空node,创建node
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) {
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
对于线程B,B不是FIFO队列的第一个节点,并且node是空,会先创建第一个node节点。
然后再次尝试获取锁,如果没有的话初始化当前节点,waiter设置为当前线程
如果FIFO队列的tail节点是空的,说明此时队列为空,调用tryInitializeHead
方法创建一个dummy node。如下图所示:
然后再次循环到这里,Node t = tail;node.setPrevRelaxed(t);
之后,当前节点的pre指向了tail节点,然后调用casTail
方法,将当前B线程所在的node设置成tail,如果成功,将上一次的tail的next指向当前node.
再次回到循环开始的地方,当前是第一个节点,不进入第一个循环,再次尝试获取锁,这里假设还没有获取锁,将node的状态设置成WAITING
即1表示进入等待状态,只等资源释放了。
然后再次循环到spins = postSpins = (byte)((postSpins << 1) | 1);if (!timed)LockSupport.park(this);
,将自己挂起,等待唤醒。
此时 spins = 1((postSpins << 1) | 1 等价于 2 * postSpins + 1)
当线程C尝试获取锁的时候,进入acquire方法 当前节点是null,创建一个node节点,将waiter设置为当前线程。然后获取tail节点,也就是B所在的node,将当前节点指向tail。
然后将当前节点设置成tail节点,上一次的tail节点(也就是B)指向当前节点
然后回到循环的头部,第一个if条件满足了,也就是下面的满足了,但是里面的又都不满足,因此什么也不做.
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
直接到node.status = WAITING;
将当前的status也设置成1,之后进入最后一个else,park挂起一下,等待唤醒。
这就是lock的过程。
这里每次都要tryAcquire
是因为非公平的模式。
UnLock 阶段
难懂的方法
getAndUnsetStatus(status)
:将调用者的status字段设置成0
unlock会调用helper的release方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
首先尝试释放,如果是最后一次重入,那么执行signalNext(head);
方法
tryRelease 方法
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 当前状态减去依次,可能重入,所以此时c可能依然大于0
if (getExclusiveOwnerThread() != Thread.currentThread()) // 无意外都是true
throw new IllegalMonitorStateException();
boolean free = (c == 0); // 重入的线程是否都出去了
if (free) // 如果是,让出锁来
setExclusiveOwnerThread(null);
setState(c); // 更新重入次数
return free; // 返回释放结果
}
signalNext 方法
private static void signalNext(Node h) {
// 后继节点
Node s;
// 当前 FIFO队列不空,并且不是初始化状态
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
这个时候满足条件,会进入if代码块,将当前状态设置为0
final int getAndUnsetStatus(int v) { // for signalling
return U.getAndBitwiseAndInt(this, STATUS, ~v);
}
// ~ 按位取反
然后将当前线程unpark
此时,在acquire挂起的线程继续执行
然后执行下面的尝试抢占锁
if (first || pred == null) {
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) {
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
如果抢到了,将当前node内容置空,dummy的next指向null,当前节点的pre指向null。FIFO双链表的头节点指向当前节点,如下:
然后一步步退回到acquire, lock,方法。继续执行操作共享资源代码。
这只是正常流程。如果排队的CANCELLED , 在后面线程lock的时候,会clean掉它.
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
}
这里就暂时不看具体逻辑了
什么时候取消排队呢?
if ((interrupted |= Thread.interrupted()) && interruptible)
也就是说如果 出现超时或者线程被打断了 并且 当前是可以被打断的。或者出现异常了 进入下面方法
private int cancelAcquire(Node node, boolean interrupted,
boolean interruptible) {
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
if (node.prev != null) // 如果当前不是第一个,调用一次cleanQueue清空一下队列中状态是取消的节点
cleanQueue();
}
if (interrupted) {
if (interruptible)
return CANCELLED;
else
Thread.currentThread().interrupt();
}
return 0;
}
to be continue…