文章目录
- 1. 什么是ReentrantLock
- 2. AQS
- 2.1 CLH队列
- 3. ReentrantLock源码解析
- 3.1 非公平锁 NonfairSync
- 3.2 公平锁 FairSync
- 3.3 解锁
1. 什么是ReentrantLock
ReentrantLock
是一个互斥锁,能够实现共享数据做互斥同步,这样在同一个时刻保证了只有一个线程能执行某个方法或者是某个代码块。
同时,ReentrantLock
也是一个可重入锁,能够保证一个线程能够对临界共享资源进行重复加锁。
对于互斥锁,JDK中还提供了一种实现synchronized
,两种锁的区别如下
Synchronized | ReentrantLock | |
---|---|---|
锁实现机制 | 对象头监视器模式 | 依赖 AQS |
灵活性 | 不灵活 | 支持响应中断、超时、尝试获取锁 |
释放锁形式 | 自动释放锁 | 显示调用 unlock() |
支持锁类型 | 非公平锁 | 公平锁 & 非公平锁 |
条件队列 | 单条件队列 | 多个条件队列 |
是否支持可重入 | 支持 | 支持 |
可见,ReentrantLock
是要比 synchronized
灵活以及支持功能更丰富。
2. AQS
synchronized
的实现是靠底层的JVM,而ReentrantLock
则是靠AQS。
AQS是一个用来构建锁和同步器的抽象框架,只需要继承AQS就可以很方便的实现我们自定义的多线程同步器、锁。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer implements java.io.Serializable {
// CLH 变体队列头、尾节点
private transient volatile Node head;
private transient volatile Node tail;
// AQS 同步状态
private volatile int state;
// 独占线程(不参与序列化)
private transient Thread exclusiveOwnerThread;
// 设置当前独占的线程
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 返回当前独占的线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
// CAS 方式更新 state
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}
AQS的核心思想如下:
AQS使用一个volatile
修饰的int
类型的成员变量state
来表示同步状态,1表示其他线程持有锁,0表示锁未被其他线程持有。
之所以使用volatile
是因为需要保证state
成员变量在多线程之间保持可见性,修改state
的时候需要使用CAS机制来确保原子性。
如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS同步队列的抽象表现。它将要请求共享资源的线程及自身的等待状态封装成队列的结点对象(Node)
,通过 CAS
、自以及LockSupportpark()
的方式,维护state变量的状态,使并发达到同步的效果。
2.1 CLH队列
CLH队列是一个单链表实现的队列。申请线程只在本地变量上进行自旋,不断进行轮询前驱的状态,如果发现前驱节点释放了锁,就结束自旋。
CLH队列有以下特点:
- 单向链表,保证了FIFO先进先出的队列特性。
- 通过
tail
尾节点(原子引用)来构建队列,总是指向最后一个节点 - 未获得锁节点会进行自旋,而不是切换线程状态
- 并发时性能比较差,因为未获得锁节点会不断轮询前驱节点的状态来查看是否获取到锁。
而AQS中的队列是CLH 变体的虚拟双向队列,通过将每条请求共享资源的线程封装成一个节点来实现锁的分配
AQS 中的 CLH 变体等待队列拥有以下特点
- AQS 中队列是个双向链表,也是 FIFO 先进先出的特性
- 通过 Head、Tail 头尾两个节点来组成队列结构,通过 volatile 修饰保证可见性
- Head 指向节点为已获得锁的节点,是一个虚拟节点,节点本身不持有具体线程
- 获取不到同步状态,会将节点进行自旋获取锁,自旋一定次数失败后会将线程阻塞,相对于 CLH 队列性能较好
3. ReentrantLock源码解析
ReentrantLock
是一个独占锁,也就是说锁一次只能被一个线程持有,如果别的线程想要获取到锁,只能等待其他线程释放锁才能获取。
**与独占锁对立的则是共享锁,**共享锁则是可以被多个线程持有,如果线程T对数据A加上共享锁后,则其他线程只能对A再加共享锁,不能加排它锁。获取到共享锁后只能读数据而不能修改数据。
在ReentrantLock
中,有公平锁与非公平锁之分,默认无参创建的是非公平锁。
// 非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
// 公平锁,true代表公平锁,false代表非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
3.1 非公平锁 NonfairSync
- 非公平锁是多个线程加锁时会直接获取到锁,获取不到锁就会等待队列的队尾等待,但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁。
- 非公平锁的优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU 不必唤醒所有线程。缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
NonfairSync
继承的是ReentrantLock
类中的Sync
,而Sync
继承AQS
。
FairSync
也是继承的是ReentrantLock
类中的Sync
,而Sync
继承AQS
。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
之所以这些锁没有直接继承AQS,而是定义一个Sync
类去继承一个AQS
这样做是因为锁是面向使用用户的,而同步器是面向线程控制的,那么在锁的实现中聚合同步器而不是直接继承 AQS 就可以很好的 隔离二者所关注的事情
而非公平锁的加锁具体过程如下
接下来进行源码分析,在新建一个非公平之后,底层创建了一个NonfairSync
类
ReentrantLock lock = new ReentrantLock();
public ReentrantLock() {
sync = new NonfairSync();
}
当调用lock
方法的时候,最终会调用sync
的lock
方法
public void lock() {
sync.lock();
}
也就是其子类NonfairSync
的lock
方法
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
这里的lock
方法有两个逻辑
- 首先,通过比较并替换
state
(state
是AbstractQueuedSynchronizer
的一个成员变量),如果成功将state
设置为1表示成功获取到锁,并将当前线程设置为独占线程(这是一个插队的步骤)。 - 如果修改
state
为1失败,说明获取锁失败,则进入acquire
方法
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, StateOffset, expect, update);
}
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
主要关注一下acquire
方法,在acquire
方法中,有两个逻辑
- 首先调用模板方法
tryAcquire
再次尝试获取锁 - 如果再次获取锁失败,就会将当前线程封装程一个
Node
节点加入等待队列
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里的tryAcquire
调用的是子类NonfairSync
的tryAcquire
方法,这个方法在AQS中也有实现,默认实现为抛出异常(之所以不设置为抽象方法,是因为AQS不仅对独占锁实现了抽象,还包括共享锁,共享锁不需要tryAcquire
,也就是不需要实现tryAcquire
方法)
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
在子类NonfairSync
的tryAcquire
方法中调用了nonfairTryAcquire
方法,步骤如下:
- 首先获取当前线程ID
- 获取
state
当前的值 - 如果
state
为0,说明没有线程占用锁,则使用CAS再次获取锁,这里体现了非公平锁的特性。如果获取锁成功,则将当前线程设置为独占锁线程,此时nonfairTryAcquire
方法结束,返回true
- 否则,如果
state
为1,则先判断当前线程是否已经获取了锁线程,如果是则表现为可重入性,锁计数器+1 - 如果都不是,则说明锁被其他线程占有,获取锁失败
//再次尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// State 等于0表示此时无锁
if (c == 0) {
// 再次使用CAS尝试获取锁, 表现为非公平锁特性
if (compareAndSetState(0, acquires)) {
// 设置线程为独占锁线程
setExclusiveOwnerThread(current);
return true;
}
// 如果当前线程等于已获取锁线程, 表现为可重入锁特性
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置 State
setState(nextc);
return true;
}
// 如果state不等于0并且独占线程不是当前线程, 返回 false
return false;
}
获取锁失败后,则就要将线程组装成Node
入队了。
Node 是 AQS 中最基本的数据结构,也是 CLH 变体队列中的节点,Node 有 SHARED(共享)、EXCLUSIVE(独占) 两种模式。
下面列出关于 Node EXCLUSIVE 模式的一些关键方法以及状态信息
关键方法与属性 | 对应含义 |
---|---|
waitStatus | 当前节点在队列中处于什么状态 |
thread | 表示节点对应的线程 |
prev | 前驱指针,指向本节点的上一个节点 |
next | 后继指针,指向本节点的下一个节点 |
predecessor | 返回前驱节点,没有的话抛出 NPE 异常 |
Node
中独占锁相关的 waitStatus
属性分别有以下几种状态
属性值 | 值含义 |
---|---|
0 | Node 被初始化后的默认值 |
CANCELLED | 值为1,由于中断或超时,节点被取消 |
SIGNAL | 值为-1,表示节点的后继节点即将被阻塞 |
CONDITION | 值为-2,表示节点在等待队列中,节点线程等待唤醒 |
入队主要是依靠addWaiter
方法,该方法步骤如下:
- 首先,将当前线程封装成一个Node类,mode是
Node.EXCLUSIVE
,代表这是一个独占锁的Node - 接着获取等待队列的尾节点
- 如果为尾节点不为null,说明这个等待队列已经被初始化了,那这样只要将node设置为尾节点
- 首先,将原本尾节点设置为
node
的前驱节点 - 接着,将
node
通过 CAS 方式设置为 AQS 等待队列的尾节点,替换成功后将原来的尾节点后继指针指向新的尾节点。
- 首先,将原本尾节点设置为
- 如果尾节点为null,说明等待队列没有被初始化,则需要调用
enq
方法完成等待队列的初始化。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 获取等待队列的尾节点
Node pred = tail;
// 如果尾节点不为空, 将 node 设置为尾节点, 并将原尾节点 next 指向 新的尾节点node
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾部为空,enq 执行
enq(node);
return node;
}
enq
方法是完成初始化等待队列的操作的,等待队列的虚拟化头节点也在这里产生
- 获取
tail
节点,也就是尾节点,再次判断尾节点是否为null- 再次判断尾节点是否为null,是因为
enq
方法是一个死循环,循环过程中 t 的值是不固定的。假如执行 enq 方法时队列为空,for 循环会执行两遍不同的处理逻辑
- 再次判断尾节点是否为null,是因为
- 如果
tail
节点为null,那么就需要虚拟化出一个新的Node
节点,这时候队列中只有一个节点,为了保证 AQS 队列结构的完整性,会将尾节点指向头节点,第一遍循环结束 - 第二次循环,
tail
节点不为null,那么就需要将node
进行入队操作,node 节点前驱指针指向尾节点,并将 node 通过 CAS 设置为新的尾节点,成功后设置原尾节点的后继指针指向 node,至此入队成功。返回的 t 无意义,只是为了终止死循环。
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
if (t == null) {
// 虚拟化一个空Node, 并将head指向空Node
if (compareAndSetHead(new Node()))
// 将尾节点等于头节点
tail = head;
} else {
// node上一条指向尾节点
node.prev = t;
// 设置node为尾节点
if (compareAndSetTail(t, node)) {
// 设置原尾节点的下一条指向node
t.next = node;
return t;
}
}
}
}
因此,可以看出,addWaiter
方法就是为了让 Node 入队,并且维护出一个双向队列模型。
回到acquire
方法,入队成功后,会返回当前线程封装后的Node
类,要执行acquireQueued
再次尝试获取锁。如果获取锁失败,则会进行线程阻塞。
acquireQueued
方法的步骤如下,acquireQueued
的返回值决定了是否阻塞:
- 获取当前线程的
Node
类的上一个节点,如果没有则会抛出异常,详情可查看predecessor
方法 - 然后,判断
node
的前驱节点是不是头节点,如果是则尝试获取锁。- 如果是头节点,并且尝试获取锁成功,则将当前的
node
设置为新的头节点,并将原头节点的后驱指针设为空。在setHead
会将node
的前驱节点设置为null,这一步是有助于垃圾回收,清空未使用的数据,然后返回false,则不会阻塞
- 如果是头节点,并且尝试获取锁成功,则将当前的
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
// 获取node上一个节点
final Node p = node.predecessor();
// 如果node为头节点 & 尝试获取锁成功
if (p == head && tryAcquire(arg)) {
// 此时当前node线程获取到了锁
// 将node设置为新的头节点
setHead(node);
// help GC
//并将原头节点的后驱指针设为空
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
前驱节点不是头节点或者尝试加锁失败,则先调用shouldParkAfterFailedAcquire
方法,流程如下
-
pred
代表了node
的前驱节点,node
代表当前线程的Node
类 -
首先,拿到
node
的前驱节点的等待状态ws
,等待状态有四种,忘记可以看看上面的表格ws == Node.SIGNAL
,表示需要将申请锁节点进行阻塞ws > 0
,表示等待队列中包含被取消节点,需要调整队列- 将
pre
设置为pre
的前驱节点,而node
设置为pre
的前驱节点,也就是过滤掉被取消的节点 - 找到前面的
pre
前面的一个不是被取消的节点,将这个节点的next
设置为node
- 将
ws <= 0
,则需要设置当前节点的前置节点等待状态为Node.SIGNAL
,表示当前节点获取锁失败,需要进行阻塞操作。
设置当前节点的前置节点等待状态为
Node.SIGNAL
,表示当前节点获取锁失败,需要进行阻塞操作有点懵的话,看一下
waitState
的各个数值的定义
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
假设当前有T1、T2线程抢夺锁
T1 线程获得锁,T2 进入 AQS 等待队列排队,并通过 CAS 将 T2 节点的前驱节点等待状态置为 SIGNAL
执行切换前驱节点等待状态后返回 false,继续进行循环尝试获取同步状态
这一步操作保证了线程能进行多次重试,尽量避免线程状态切换
如果 T1 线程没有释放锁,T2 线程第二次执行到 shouldParkAfterFailedAcquire
方法,因为前驱节点已设置为 SIGNAL,所以会直接返回 true,执行线程阻塞操作。
进行阻塞需要调用parkAndCheckInterrupt
方法
LockSupport.park
方法将当前等待队列中线程进行阻塞操作,线程执行一个从RUNNABLE
到WAITING
状态转变- 方法返回了当前线程的中断状态,并将当前线程的中断标识设置为false
private final boolean parkAndCheckInterrupt() {
// 将当前线程进行阻塞
LockSupport.park(this);
// 方法返回了当前线程的中断状态,并将当前线程的中断标识设置为false
return Thread.interrupted();
}
如果线程被唤醒,通过执行 Thread.interrupted
查看中断状态,这里的中断状态会被传递到 acquire
方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果线程被中断, 这里会再次设置中断状态
// 因为如果线程中断, 调用 Thread.interrupted 虽然会返回 true, 但是会清除线程中断状态
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
即使线程从 park
方法中唤醒后发现自己被中断了,但是不影响接下来的获取锁操作,如果需要设置线程中断来影响流程,可以使用 lockInterruptibly
获得锁,抛出检查异常 InterruptedException
。
回顾到原来的acquireQueued
方法,可以看到这里面是一个死循环,如果执行了if (p == head && tryAcquire(arg))
里面的代码,那么会将failed
设置为false
,那么finally
代码块的代码就不会执行。
所以,当线程因为自旋或者异常等情况获取锁失败,finally
代码块就会调用cancelAcquire
方法取消正在获取锁的操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
// 获取node上一个节点
final Node p = node.predecessor();
// 如果node为头节点 & 尝试获取锁成功
if (p == head && tryAcquire(arg)) {
// 此时当前node线程获取到了锁
// 将node设置为新的头节点
setHead(node);
// help GC
//并将原头节点的后驱指针设为空
p.next = null;
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
在cancelAcquire
传入参数为当前想要获取锁的线程封装的Node
节点
- 步骤一如果当前节点
node
为尾节点的话,则将pred
(也就是node
的前驱节点)设置为新的尾节点,成功设置后再将 pred 后继节点设置为空。
- 步骤二需要满足以下四个条件才会将前驱节点(非取消状态)的后继指针指向当前节点的后继指针
- 当前节点不等于尾节点
- 当前节点前驱节点不等于头节点
- 前驱节点的等待状态不为取消状态
- 前驱节点的拥有线程不为空
- 如果不满足步骤二的话,会执行步骤三相关逻辑,唤醒后继节点
private void cancelAcquire(Node node) {
// 不存在的节点直接返回
if (node == null)
return;
node.thread = null;
/**
* waitStatus > 0 代表节点为取消状态
* while循环会将node节点的前驱指针指向一个非取消状态的节点
* pred等于当前节点的前驱节点(非取消状态)
*/
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取过滤后的前驱节点的后继节点
Node predNext = pred.next;
// 设置node等待状态为取消状态
node.waitStatus = Node.CANCELLED;
// 🚩步骤一,如果node是尾节点,使用CAS将pred设置为新的尾节点
if (node == tail && compareAndSetTail(node, pred)) {
// 设置pred(新tail)的后驱指针为空
compareAndSetNext(pred, predNext, null);
} else {
int ws;
// 🚩步骤二,node的前驱节点pred(非取消状态)!= 头节点
if (pred != head
/**
* 1. pred等待状态等于SIGNAL
* 2. ws <= 0并且设置pred等待状态为SIGNAL
*/
&& ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
// pred中线程不为空
&& pred.thread != null) {
Node next = node.next;
/**
* 1. 当前节点的后继节点不为空
* 2. 后继节点等待状态<=0(表示非取消状态)
*/
if (next != null && next.waitStatus <= 0)
// 设置pred的后继节点设置为当前节点的后继节点
compareAndSetNext(pred, predNext, next);
} else {
// 🚩步骤三,如果当前节点为头节点或者上述条件不满足, 执行唤醒当前节点的后继节点流程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
AQS 等待队列中取消状态节点会在 shouldParkAfterFailedAcquire
方法中被 GC 垃圾回收
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
等待队列中取消状态节点就可以被 GC 垃圾回收了,至此加锁流程也就结束了
3.2 公平锁 FairSync
- 公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁
- 公平锁的优点是等待锁的线程不会饿死。缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大
在ReentrantLock
有一个公平锁的静态内部类FairSync
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
那么公平锁的底层实现和非公平锁有什么不一样呢?
首先就是lock
方法的实现,非公平锁没有了if-else说明了新来的线程没有插队的机会,所有线程都必须扔到队伍
// 非公平锁实现
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//公平锁实现
final void lock() {
acquire(1);
}
其次就是不同之处在于其tryAcquire
方法,与非公平锁的nonfairTryAcquire
方法对比,可以发现两者不同就是多了一个hasQueuedPredecessors
方法
//只有队列为空或着当前tryAcquire的node线程是head
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//拿到当前的同步状态, 如果是无锁状态, 则进行hasQueuedPredecessors方法逻辑
//逻辑含义是:当前队列为空或本身是同步队列中的头结点。如果满足条件CAS获取同步状态,并设置当前独占线程。
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//重入锁逻辑 和非公平锁一样 不解释
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
这个方法就是实现公平锁的重要方法!!!!
- 首先判断一下,
h != t
,如果是false
,说明等待队列为空,直接return false
,开始抢占锁 - 如果是
true
就要继续进一步判断,否则如果head
有next
,next
持有的线程不是当前线程,则true
,当前线程要到队尾排队 - 如果
head
有next
,next
持有的线程是当前线程,则return false
说明当前线程在队伍第一位可以抢夺锁。
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
//head没有next ----> false
//head有next,next持有的线程不是当前线程 ----> true
//head有next,next持有的线程是当前线程 ----> false
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
3.3 解锁
解锁调用的是unlock
方法,unlock
方法调用了release
方法
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先,调用tryRelease
释放锁的同步状态,tryRelease
是定义在 AQS 中的抽象方法,通过 Sync
类重写了其实现。
- 首先获取当前
state
的值,使用这个值减去releases(也就是1)
,获取到解锁后的state
也就是c
- 如果
c = 0
,说明锁的计数器为0,可以彻底释放锁 - 最后将
c
的值设置为state
的值
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 如果当前线程不等于拥有锁线程, 抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
// 将拥有锁线程设置为空
setExclusiveOwnerThread(null);
}
// 设置State状态为0, 解锁成功
setState(c);
return free;
}
释放掉锁后,需要唤醒后续节点。
唤醒后续节点,需要满足两个条件头节点head
不为空,并且头节点的等待状态不为0
当线程还在争夺锁,队列还未初始化,头节点必然是为空的。
当头节点等待状态等于0,证明后继节点还在自旋,不需要进行后继节点唤醒。
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
满足这两个条件后,就能调用unparkSuccessor
方法进行对等待队列头节点的后继节点进行唤醒操作
- 获取
node
节点的等待状态,node
指的是head
,也就是头节点 - 如果
node
的等待状态小于0,说明后继节点被阻塞或者节点在等待队列中,节点线程等待被唤醒,这时候需要将node
节点的状态设置为0 - 接着,获取
node
的后继结点 - 如果下个节点为空或者被取消(状态大于0), 遍历队列查询非取消节点,从队尾开始查找, 等待状态 <= 0 的节点
- 当满足了
s != null && s.waitStatus <= 0
,就执行unpark
,也就是唤醒操作。
这里或许有个疑问?为什么查找队列中未被取消的节点需要从尾部开始?
这个问题有两个原因可以解释,分别是 Node 入队和清理取消状态的节点
- 先从
addWaiter
入队时说起,compareAndSetTail(pred, node)、pred.next = node
并非原子操作,如果在执行pred.next = node 前进行 unparkSuccessor
,就没有办法通过next
指针向后遍历,所以才会从后向前找寻非取消的节点 cancelAcquire
方法也有导致使用head
无法遍历全部Node
的因素,因为先断开的是next
指针,prev
指针并未断开
private void unparkSuccessor(Node node) {
// 获取node等待状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取node的后继节点
Node s = node.next;
// 如果下个节点为空或者被取消, 遍历队列查询非取消节点
if (s == null || s.waitStatus > 0) {
s = null;
// 从队尾开始查找, 等待状态 <= 0 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 满足 s != null && s.waitStatus <= 0
// 执行 unpark
if (s != null)
LockSupport.unpark(s.thread);
}
参考:
- 万字图文 | 聊一聊 ReentrantLock 和 AQS 那点事(看完不会你找我) - 掘金 (juejin.cn)
- 深入剖析ReentrantLock公平锁与非公平锁源码实现 - 掘金 (juejin.cn)
- 面试官:说一下公平锁和非公平锁的区别? - 掘金 (juejin.cn)