文章目录
- AQS (AbstractQueuedSynchronizer^1.5+^)
- CLH 锁队列
- AbstractQueuedSynchronizer 成员变量说明
- AbstractQueuedSynchronizer.Node 源码
- CLH 队列原理图
- 入队逻辑方法
- 出队逻辑方法
- 继承 AQS 需要实现的几个方法
- AQS 对象序列化
- ReentrantLock 源码解析
- ReentrantLock Lock 加锁过程源码解析
- ReentrantLock unlock 解锁过程源码解析
- 总结
AQS (AbstractQueuedSynchronizer1.5+)
按照类名直译过来就是抽象队列同步器。用于实现依赖先进先出(FIFO)等待队列的阻塞式锁或相关的同步器。它是这些锁、同步器的基础,提供了一个整体框架实现,我们只需要实现 AQS 的几个方法就能实现自己的 ReentrantLock
CLH 锁队列
CLH(Craig, Landin, Hagersten 三位大神发明的队列,其本身是一个单向链表,但在 AQS 中是使用的双向链表实现),它是 AQS 实现等待队列的基础
AbstractQueuedSynchronizer 成员变量说明
/**
* 等待队列的头,懒初始化的,在初始化后,只能通过 setHead 方法修改
* 如果 head 节点的存在,则 head 的状态 waitStatus 不会是 Node.CANCELLED 的
* 出队,就是从 head 出
*/
private transient volatile Node head;
/**
* 等待队列的尾部,懒初始化,初始化后只能通过 enq 方法添加新的等待节点
*/
private transient volatile Node tail;
/**
* 同步器的状态
* 通过 getState、setState、compareAndSetState 方法操作
*
*/
private volatile int state;
AbstractQueuedSynchronizer.Node 源码
static final class Node {
/** 标识节点为共享模式 */
static final Node SHARED = new Node();
/** 标识节点为独占模式 */
static final Node EXCLUSIVE = null;
/** waitStatus 的值为1,标明为当前节点已取消 */
static final int CANCELLED = 1;
/** waitStatus 的值为 -1,标明当前节点的后续节点需要取消等待(调用unpark)*/
static final int SIGNAL = -1;
/** waitStatus 的值为 -2,标明后续节点在 condition 上等待*/
static final int CONDITION = -2;
/**
* waitStatus 的值为 -3,通知后续节点当前节点当前已 releaseShared
*/
static final int PROPAGATE = -3;
/**
* 状态字段,取值如下:
* SIGNAL -1: 此节点之后的节点为阻塞(park),当此节点在释放后,后续的节点应该upark
* CANCELLED 1: 此节点由于超时或中断而被取消,具有取消节点的线程永远不会再阻塞。
* CONDITION -2: 此节点当前位于条件队列中。在传输之前,它不会被用作同步队列节点,此时状态将被设置为0。
* PROPAGATE -3: 需要保证传播 releaseShared 的其他节点
* 0: 以上状态之外的值
*
* 非负数字不需要发送信号
*
* 普通同步节点初始化为 0
* 条件节点初始化为 CONDITION
*/
volatile int waitStatus;
/**
* 当前节点的前一个节点,在出队时被设置为 null
*/
volatile Node prev;
/**
* 当前节点的 next 节点
*/
volatile Node next;
/**
* 进入该节点的线程
*/
volatile Thread thread;
/**
* 下一个等待条件的节点或特殊节点 SHARED
*/
Node nextWaiter;
/**
* 判断节点是否在共享模式下等待
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回 prev 节点,如果为空,这抛出 NullPointerException
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
CLH 队列原理图
CLH 队列是双向链表的实现,只有一个地方比较特殊,就是入队时,第一个入队节点之前会加入一个没有设置节点对应的线程和mode模式的 new Node() 节点,这个主要是因为出队时的逻辑限制,CLH 队列入队和出队主要对应如下几个方法
入队逻辑方法
// 没有获得锁,当前线程加入队列排队
// 实际就是 CHL 队列的入队逻辑
private Node addWaiter(Node mode) {
// 通过当前线程和排队模式新建 Node 节点
// Node.EXCLUSIVE 为独占模式; Node.SHARED 为共享模式
Node node = new Node(Thread.currentThread(), mode);
// 当前队列的队尾元素
Node pred = tail;
// 当前队列的队尾不为 null
if (pred != null) {
// 设置当前节点的 prev(前置节点) 为 tail
node.prev = pred;
// CAS 设置 pred 为 node(就是将当前 node 设置为 tail)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 当前队尾为 null 执行 enq 逻辑
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
// t 指向队尾元素
Node t = tail;
// 当前队尾为 null
if (t == null) { // Must initialize
// 新建一个 Node 设置为 head
if (compareAndSetHead(new Node()))
tail = head;
// 这里没有 return 会继续循环下一次
} else {
// 当前节点的 prev 指向队尾元素
node.prev = t;
// 设置 t 为 当前节点
if (compareAndSetTail(t, node)) {
// 设置 t 的 next 为当前节点
t.next = node;
return t;
}
}
}
}
出队逻辑方法
final boolean acquireQueued(final Node node, int arg) {
// 是否失败,默认为 true
boolean failed = true;
try {
// 是否被打断,默认否
boolean interrupted = false;
for (;;) {
// 获取当前节点的前驱节点(前驱节点为空将抛异常)
// 这也是为什么 CLH 队列第一个节点为 new Node() 的原因
// new Node只是一个占位节点
final Node p = node.predecessor();
// 如果前驱节点为 head 节点
// 且 tryAcquire 成功,则前驱节点出队
if (p == head && tryAcquire(arg)) {
// 设置 head 为当前节点
setHead(node);
p.next = null; // help GC
failed = false;
// 返回否 false,将不会执行 selfInterrupt()
// 即不会执行当前线程的 interrupt 方法
return interrupted;
}
// shouldParkAfterFailedAcquire 检查 prev 的 waitStatus 状态
// 为 SIGNAL 才返回 true
// parkAndCheckInterrupt() park 当前线程(开始阻塞)
// 当执行的线程 unpark 后,返回当前线程是否被打断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果出队失败,则取消掉当前节点
if (failed)
cancelAcquire(node);
}
}
出队逻辑和入队逻辑都是 AQS 类的方法。
继承 AQS 需要实现的几个方法
/**
* 尝试以独占模式获取锁
*
* 执行获取锁的线程都会调用此方法,如果此方法返回失败,则将尚未排队的线程加入队列排队,直到收到其他线程发出的释放信号。
* 此方法用于实现 Lock.tryLock() 方法
*
* 默认实现是抛出 UnsupportedOperationException 异常
*
* @param arg 原则上是可以表示任何内容的值。主要用于判断是否可以获取、以及是否需要等待等
* @return true:表示可以获取锁,false 表示无法获取锁
* @throws IllegalMonitorStateException 非法的同步器状态
* @throws UnsupportedOperationException 如果不支持独占模式
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
/**
* 尝试释放独占锁
*
* 所有的执行线程都会调用此方法来释放锁
*
*
* 默认实现是抛出 UnsupportedOperationException 异常
*
* @param arg 原则上是可以表示任何内容的值。主要用于判断是否可以获取、以及是否需要等待等
* @return true:表示锁释放成功,其他等待的线程可以尝试获取了,false 表示释放锁失败
* @throws IllegalMonitorStateException 非法的同步器状态
* @throws UnsupportedOperationException 如果不支持独占模式
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* 尝试以共享模式获取锁
*
* 执行获取锁的线程都会调用此方法,如果此方法返回失败,则将尚未排队的线程加入队列排队,直到收到其他线程发出的释放信号。
*
* 默认实现抛出:UnsupportedOperationException 异常
*
* @param arg 原则上是可以表示任何内容的值。主要用于判断是否可以获取、以及是否需要等待等
* @return 小于 0,表示在共享模式下获取失败
* 等于 0,表示在共享模式下获取成功,且没有后续的共享模式能够成功获取
* 大于 0,表示在共享模式下获取成功,且后续的共享模式获取也能够成功。在此情况下,后续的等待线程必须检查可用性。
* @throws IllegalMonitorStateException 非法的同步器状态
* @throws UnsupportedOperationException 如果不支持共享模式
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* 尝试释放共享锁
*
* 所以执行的线程都会执行此方法来释放共享锁
*
* 默认实现抛出:UnsupportedOperationException 异常
*
* @param arg 原则上是可以表示任何内容的值。主要用于判断是否可以获取、以及是否需要等待等
* @return 成功释放,且其他等待线程可以尝试获取锁,则返回 true,否则返回 false
* @throws IllegalMonitorStateException 非法的同步器状态
* @throws UnsupportedOperationException 如果不支持共享模式
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Returns 如果当前线程持有锁,则返回 true.
* ConditionObject 的 signal 方法在调用时都会调用此方法
*
* 此方法仅在 AbstractQueuedSynchronizer 内部调用。
* 用于 ConditionObject ,如果不使用 ConditionObject 可以不用实现
*
* @return 如果当前线程持有锁,则返回 true.
* @throws UnsupportedOperationException 如果 ConditionObject 不支持
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
AQS 对象序列化
AQS 的序列化只存储底层的原子整数维护状态,因此反序列化的对象具有空线程队列。所以我们需要定义一个 readObject 方法,该方法在反序列化时将其恢复到已知的初始状态。
ReentrantLock 源码解析
ReentrantLock 涉及到的源码继承关系图
这里的红色线都表示为静态内部类,ReentrantLock 有 Sync 和 FairSync、NonFairSync 三个静态内部类,AQS 有 Node 和 ConditionObject 两个静态内部类。
ReentrantLock Lock 加锁过程源码解析
注:这里是使用的 new ReentrantLock() 方式,其实就是 NonfairSync(非公平锁的方式)
看不清可以点击大图查看,也可以通过本文最上面的链接下载图片来查看,由于平台限制(最5M)我本地画的高清图片无法上传。
ReentrantLock unlock 解锁过程源码解析
关于源码的解析,要写成文章,真是太难了,这里也只是把相关的逻辑做成图片的形式更方便梳理其中的逻辑,图中也将相关的源码方法都贴了上去,并做了相关的逻辑解释,这些解释并不一定准确,希望对大家有所帮助
总结
这里我们只分析了 ReentrantLock 的 lock、unlock 方法的源码分析,ReentrantLock 的其他方法以及 ReentrantReadWriteLock、CountDownLatch、Semaphore 等实现都与我们分析的这些差不太多,我们就不再一一的做解析了,这里我们先总结一下,源码分析的过程与方法,最后再总结一下结论吧。
- CLH 队列由双向链表实现
- CLH 队列的元素第一个 head 节点为 new Node()
- CLH 队列的每个元素包含要执行的线程 thread、锁的共享方式 mode、前驱节点 prev、后继节点 next、状态 waitStatus
- AQS 底层实现等待的方法是 LockSupport.park 和 LockSupport.unpark
建议大家最好是自己通过 IDE debug 走一走 lock、unlock 或者 tryLock 等方法的源码,这样才能更有印象。