AbstractQueuedSynchronizer
- 1、AbstractQueuedSynchronizer概述
- 2、AbstractQueuedSynchronizer源码分析
- 2.1 AQS源码
- 2.2 Node类
如有侵权,请联系~
如有问题,也欢迎批评指正~
1、AbstractQueuedSynchronizer概述
AbstractQueuedSynchronizer(抽象队列同步器,简称 AQS)是 Java 并发包中的一个重要类,它为实现基于队列的同步器提供了一个框架。AQS 主要用于构建各种同步机制,比如锁、信号量、读写锁等。它通过一个 FIFO 队列(先进先出)来管理线程的获取和释放同步状态(如锁定状态),从而有效地处理多线程之间的竞争。
JUC包下的lock锁子包,以及JUC下的阻塞队列ArrayBlockingQueue、CountDownLatch等等这些常用的类都是基于AQS实现的。
2、AbstractQueuedSynchronizer源码分析
2.1 AQS源码
AQS使用的管程模型,本篇文章作为JUC-locks锁的基础补充。首先先看下AQS的架构图:
AQS的源码注释【AQS主要是一个队列(线程阻塞队列,等待资源)和一个共用争取资源状态(state),方法都是围绕着阻塞队列以及资源的】:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 上述的阻塞队列的头
private transient volatile Node head; // 阻塞队列头,每个阻塞线程都被封装成node节点
// 阻塞队列尾
private transient volatile Node tail;
// 表示争夺资源目前的状态。
// 值为0:表示该资源空闲
// 值大于0:表示被n个线程所持有【例如:CountDownLatch】或者一个线程被持有n次【例如:可重入锁】
private volatile int state;
// 当前持有该资源的线程,这个属性来自于父类AbstractOwnableSynchronizer
private transient Thread exclusiveOwnerThread;
// 获取资源 addWaiter(Node.EXCLUSIVE), arg)将当前线程放在阻塞队列中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 当前线程会判断当前是不是阻塞队列的第一个并且是否能获取到资源。如果能够获取到资源则返回,如果获取不到则等待。
// 并且在等待过程中是否被中断也会将作为结果返回
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 使用UnSafe类的CAS+自旋 将node节点加入到 AQS队列的末尾
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 方法中带tryXXX的都会响应中断,不带的不响应中断
// 这个方法是获取对象的锁,争夺资源。所以这个交给实现类进行操作。例如公平锁和非公平锁就是通过这个方法实现体现的
// arg一般是获取资源的数量,这个方法尝试非阻塞地获取资源,如果成功就立即返回true,如果失败就立即返回false,不会将线程加入到等待队列,也不会使线程阻塞。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 这个方法尝试在给定的时间内获取资源,如果在指定的时间内成功获取到资源就立即返回true,如果在指定的时间内没有获取到资源就返回false。这个方法也是阻塞的,如果在阻塞状态下被中断,它会响应中断,抛出InterruptedException,并返回false。
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE); // 添加到阻塞队列队尾
boolean failed = true;
try { // 如果等待时间超过spinForTimeoutThreshold,则线程挂起等待被唤醒;否则在这for自旋进行判断。这样剩去了线程切换的时间
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //如果这个线程在对头并且尝试一下并获取到资源,直接返回
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) // 到时间还没获取到资源,失败
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold) //spinForTimeoutThreshold=1000
LockSupport.parkNanos(this, nanosTimeout); // 如果等待时间比较久,那就使该线程进行阻塞。当等待时间超过spinForTimeoutThreshold时,该线程挂起等待超时被唤醒
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// AQS尝试获取和尝试释放都未实现,等待子类实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
}
2.2 Node类
Node类对应于上述中阻塞队列中的node。这里比较重要的是每个node的状态标识:
static final class Node {
static final Node SHARED = new Node(); // 表示这个node(线程)对资源是共享的还是需要独占的
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1; // 该node的线程被取消了(可能超时或者中断导致的)
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1; // 后继的node中的线程需要被其唤醒
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2; // 当前node节点在条件队列中
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3; // 共享模式下的节点释放资源后应该将信息传播给其他节点
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus; // node中线程的状态,取值为上面的值
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev; // 前一个node节点指针
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next; // 后一个node的指针
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread; // 等待资源的线程
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter; // 指向下一个条件等待的node。所以,猜想一个等待队列和一个条件等待队列
}