介绍
AQS 的全称为 AbstractQueuedSynchronizer
,就是抽象队列同步器。
从源码上可以看到AQS 就是一个抽象类,它继承了AbstractOwnableSynchronizer,实现了java.io.Serializable接口。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable{}
它主要用来构建锁和同步器,为构建锁和同步器提供了一些通用功能的实现,因此,使用 AQS 能简单且高效地构造出应用广泛的大量同步器,比如 ReentrantLock
,Semaphore,
ReentrantReadWriteLock
,SynchronousQueue
等。
核心思想
AQS 核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制 AQS 是基于 CLH 锁 (Craig, Landin, and Hagersten locks) 实现的。
CLH 锁是对自旋锁的一种改进,是一个虚拟的双向队列(虚拟的双向队列即不存在队列实例,仅存在结点之间的关联关系),暂时获取不到锁的线程将被加入到该队列中。AQS 将每条请求共享资源的线程封装成一个 CLH 队列锁的一个结点(Node)来实现锁的分配。在 CLH 队列锁中,一个节点表示一个线程,它保存着线程的引用(thread)、 当前节点在队列中的状态(waitStatus)、前驱节点(prev)、后继节点(next)。
从源码中可以看到Node具有以下几个属性:
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
并且设置了一个 头节点和一个尾节点
private transient volatile Node head;
private transient volatile Node tail;
对应CLH队列的结构如下:
CLH的优点:
-
性能优异,获取和释放锁开销小。CLH 的锁状态不再是单一的原子变量,而是分散在每个节点的状态中,降低了自旋锁在竞争激烈时频繁同步的开销。在释放锁的开销也因为不需要使用 CAS 指令而降低了。
-
公平锁。先入队的线程会先得到锁。
-
实现简单,易于理解。
-
扩展性强。下面会提到 AQS 如何扩展 CLH 锁实现了 j.u.c 包下各类丰富的同步器。
CLH还有两个缺点
- 有自旋操作,当锁持有时间长时会带来较大的 CPU 开销。
- 基本的 CLH 锁功能单一,不改造不能支持复杂的功能。
针对 CLH 的缺点,AQS 对 CLH 队列锁进行了一定的改造。针对第一个缺点,AQS 将自旋操作改为阻塞线程操作。针对第二个缺点,AQS 对 CLH 锁进行改造和扩展
如何改造的这里不多做解释,想了解具体的信息,极力推荐大佬的一篇文章
Java AQS 核心数据结构-CLH 锁
回来继续来谈AQS
AQS中 使用 int类型变量 state
表示同步状态,通过内置的 FIFO 线程等待/等待队列 来完成获取资源线程的排队工作。state
变量由 volatile
修饰,用于展示当前临界资源的获锁情况。
状态信息 state
可以通过 protected
类型的getState()
、setState()
和compareAndSetState()
进行操作。并且,这几个方法都是 final
修饰的,在子类中无法被重写。
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
state
变量可以被用来表达以下几种情况:
独占模式下的状态:
当 state
为 0 时,表示同步器当前没有被任何线程所持有,可以被任意线程获取。
当 state
大于 0 时,表示有一个线程已经获取了同步器,并且可以是同一个线程多次获取,例如 ReentrantLock
中的重入
共享模式下的状态:
对于共享模式,state
的含义取决于具体实现,例如在 CountDownLatch
中,state
表示还需要等待的线程数,而在 Semaphore
中,state
表示当前可用的许可数。
举个例子
以可重入的互斥锁 ReentrantLock
为例,它的内部维护了一个 state
变量,用来表示锁的占用状态。state
的初始值为 0,表示锁处于未锁定状态。当线程 A 调用 lock()
方法时,会尝试通过 tryAcquire()
方法独占该锁,并让 state
的值加 1。如果成功了,那么线程 A 就获取到了锁。如果失败了,那么线程 A 就会被加入到一个等待队列(CLH 队列)中,直到其他线程释放该锁。假设线程 A 获取锁成功了,释放锁之前,A 线程自己是可以重复获取此锁的(state
会累加)。这就是可重入性的体现:一个线程可以多次获取同一个锁而不会被阻塞。但是,这也意味着,一个线程必须释放与获取的次数相同的锁,才能让 state
的值回到 0,也就是让锁恢复到未锁定状态。只有这样,其他等待的线程才能有机会获取该锁。
源码解析
上面已经看到了aqs部分的源码,在这里再次详细解释一下
AQS 主要有三个属性,有两个Node节点,分别是 head表示待队列的头结点 ,tail表示等待队列的尾节点
还有个state示同步状态。
在node节点中有如下几个属性
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
其中上面几个都是final修饰,是不可改变的。
下面的waitStatus可以取上面的几个值分别表示不同的涵义。
- 当waitStatus为0时,为node节点刚被创建时的初始值
- 为CANCELLED,也就是为1时,表示线程获取锁的请求已经取消了
- 为SIGNAL也就是-1时,表示此节点后面的节点被阻塞(park),避免竞争,资源浪费,此节点释放后通知后面的节点
- 为CONDITION也就是-2时,表示节点在等待队列中,等待被唤醒
- 为PROPAGATE也就是-3时,表示当前线程处于SHARED状态,表示锁的下一次获取可以无条件传播
lock方法
acquire()
上锁时主要就是在调用acquire方法,该方法中调用了很多其他的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先会调用tryAcquire()尝试直接去获取资源,如果成功则直接返回
addWaiter()方法说将该线程加入等待队列的尾部,并标记为独占模式
acquireQueued()方法使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
然后我们依次来看一下这些方法
首先是tryAcquire()方法
tryAcquire()
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
调用了另一个方法nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//再次尝试抢占锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 重入锁的情况
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// false 表示抢占失败
return false;
}
首先会先判断锁状态,如果未被持有,则尝试抢占,否则进行重入锁。
addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先会先按传参的模式来构建节点,然后直接插入到队列尾部,如果失败了就调用enq方法入队。
再来看enq方法
enq()
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
如果此时队列为空,则创建一个头节点,否则正常插入
acquireQueued()
经过上面的代码,如果可以运行到这里,那证明此时线程获取获取资源已经失败,进入了等待队列休息,然后在等待队列中就要等待被唤醒。
方法流程如下:
先标记了是否已经拿到了资源,标记等待中是否被打断过,随后进入自旋
自旋中先拿到前驱节点,如果前驱节点就是头节点,那证明自己有资格去竞争资源,获取中让head指向自己,然后标记成功获取资源并返回等待中是否被打断过。
如果前驱节点不是头节点,那就让自己休息进入等待状态。
最后如果等待过程中没有获取到资源则取消等待
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;、
try {
boolean interrupted = false;、
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
然后我们再来看shouldParkAfterFailedAcquire()方法和parkAndCheckInterrupt()方法
shouldParkAfterFailedAcquire()
首先拿到前驱节点的状态,如果此前已经获取过了前驱状态为SIGNAL,则自己进入休息即可。
如果前驱节点被放弃了那就一直向前寻找,直到找到正常的节点,找到了将其设为SIGNAL,后面通知自己,让自己休息。
因此,一个正常前驱状态为SIGNAL则自己就可以安心的休息,如果不是则一直向前寻找,直到找到正常的,让自己可以安心休息。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//拿到前驱的状态
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
这里面很简单,就是让线程去休息
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//调用park()使线程进入waiting状态
return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的。
}
截至到这里整个获取流程执行完毕。
unlock方法
看完获取锁,接着看解锁
解锁过程比较简单
首先时release方法
release()
它会释放指定量的资源,如果彻底释放了(即state=0),他会找到头节点,然后去唤醒等待队列中的下一个正在等待的线程。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease()
这个方法在尝试释放一定量的资源
首先让state减一,如果当前线程不是持有锁的线程则抛出异常
如果减完为0则表示已经释放了,最后更改掉state的值
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}