文章目录
- 1、CountDownLatch介绍
- 1.1、功能介绍
- 1.2、demo
- 1.3、问题
- 2、前置知识
- 2.1、AQS整体结构
- 2.1.1、整体结构
- 2.1.2、state属性
- 2.1.3、head和tail属性
- 3、CountDownLatchAPI源码解析
- 3.1、countDown方法
- 3.1.1、Sync类
- 3.1.2、releaseShared方法
- 3.1.3、tryReleaseShared方法
- 3.2、await方法
- 3.2.1、tryAcquireShared方法
- 3.2.2、doAcquireSharedInterruptibly方法
- 3.2.2.1、addWaiter方法。
- 3.2.2.2、setHeadAndPropagate方法
- 3.2.2.3、shouldParkAfterFailedAcquire方法
- 3.2.2.4、parkAndCheckInterrupt方法
- 3.2.2.5、cancelAcquire方法
- 3.3、doReleaseShared方法
- 4、总结
- 5、参考资料
1、CountDownLatch介绍
1.1、功能介绍
CountDownLatch,是一个同步器,常见的使用场景是:
我们在主线程中需要执行一些异步任务,但是在全部的异步任务都执行完成前,主线程需要阻塞等待,等待异步任务全部执行完成后,主线程才能响应客户端。
1.2、demo
基于以上场景,搞了一个demo。
下面的这个demo。总体上分为了三个小步骤,代表了CountDownLatch的三个核心API
1)、初始化一个CountDownLatch。构造器中传入了10,代表这个CountDownLatch需要执行10次countDown方法。
2)、然后在一个循环中提交异步任务,总共提交10个异步任务,每个异步任务执行一次countDown方法。
3)、循环执行完成后,执行CountDownLatch.await方法
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
60L,
TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
new ThreadPoolExecutor.CallerRunsPolicy());
@Test
public void testDel(){
//初始化一个CountDownLatch
CountDownLatch countDownLatch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
//给线程池提交异步任务,总共提交10个
threadPoolExecutor.execute(() -> {
try {
System.out.println("执行一个异步任务");
} catch (Exception e) {
e.printStackTrace();
}finally {
//执行countDownLatch.countDown(),该方法一定要写在finally中,确保无论什么情况,都能执行到
countDownLatch.countDown();
}
});
}
try {
System.out.println("主线程等待");
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
1.3、问题
使用起来很简单,也就是三步。我有3个问题
1)、countDown方法执行后,在做什么操作?
2)、await方法执行后,countDown方法全部执行完成前,流程阻塞在了await处。CountDownLatch怎么实现的阻塞呢?
3)、countDown方法全部执行完成后,await方法被唤醒,继续执行下面的业务逻辑,怎么唤醒的呢?
这里多说一句。看源码,最好能带着问题看,这样看起来,不会太枯燥,因为你要搞明白一些问题。
说回文章。我们下面就围绕我的这3个问题来展开。如果你还有其他问题,看懂我的3个问题后,可以自己再研究一下源码
2、前置知识
CountDownLatch是借助AQS实现的,大家应该都听说过AQS。其实不光CountDownLatch,像我们常使用的ReenTrantLock、Semaphore等同步器类也都是借助AQS实现的。所以在介绍CountDownLatch前,需要先介绍AQS。
AQS全称是:AbstractQueuedSynchronizer
先从类名上剖析一下这个类。
Abstract:抽象的。说明内部有一些方法需要子类实现
Queued:队列。内部实现依赖了队列
Synchronizer:同步器。这个类是用来设计同步器的
2.1、AQS整体结构
2.1.1、整体结构
AQS抽象类的属性及主要方法,如下图(原图链接)
带颜色的都是方法,不带颜色的都是属性。
从上到下,难度由浅入深。像CountDownLatch相关的API就是和AQS的API层进行交互。从图上也能看出来,AQS内容很多,为了避免篇幅过大,我们只介绍和CountDownLatch有关系的。其他方法或属性,感兴趣可以自己研究下
AQS中和CountDownLatch有关系的属性如下:
private volatile int state;
private transient volatile Node head;
private transient volatile Node tail;
2.1.2、state属性
state属性是AQS中的许可或者说资源,不管是CountDownLatch,还是ReentrantLock或者Semaphore,本质上都是在操作这个state属性。我们以CountDownLatch举例。
初始化时,new CountDownLatch(10),就是将state的值设置为了10。看一下源码
//CountDownLatch构造器
public CountDownLatch(int count) {
if (count < 0){
throw new IllegalArgumentException("count < 0");
}
this.sync = new Sync(count);
}
//Sync构造器
Sync(int count) {
setState(count);
}
//setState方法
protected final void setState(int newState) {
state = newState;
}
countDown方法,最终会对state值减一
await方法,监控state值的变化,当变成0时,阻塞结束
这两个方法的源码,下面会详细的解析,这里就不细说了。
2.1.3、head和tail属性
看命名,就知道这两个属性是链表上的头节点和尾节点,初始值为null。数据类型是Node。说明,AQS内部存在一条Node链表
看一下Node的属性
static final class Node {
......
//共享模式的Node
static final Node SHARED = new Node();
//独占模式的Node
static final Node EXCLUSIVE = null;
//节点的等待状态,初始值是0
volatile int waitStatus;
//前置节点
volatile Node prev;
//后置节点
volatile Node next;
//节点代表的线程。每一个需要阻塞的线程,都会被封装成一个Node
volatile Thread thread;
......
}
那Node链表又长什么样子呢?
就长这个样子
其中的waitStatus值很重要,关于Node链表,AQS基本就是围绕Node节点的waitStatus在做文章。除了初始化值0之外,还有另外4个取值
//节点处于取消状态。这个值是大于0的,另外的3个值都是小于0的。记一下这个点,源码中会经常遇到
static final int CANCELLED = 1;
//节点处于被唤醒状态。
static final int SIGNAL = -1;
//节点处于条件状态,这个CONDITION一般在CONDITION队列中使用。关于CountDownLatch,我们主要是讨论Sync队列
static final int CONDITION = -2;
//对于共享模式的Node来说,这个状态代表了传播
static final int PROPAGATE = -3;
AQS重要属性说完了。下面开始介绍CountDownLatch的重要API源码实现
3、CountDownLatchAPI源码解析
3.1、countDown方法
public void countDown() {
sync.releaseShared(1);
}
这里调用了Sync类的releaseShard方法,接下来,看一下Sync类的实现。
3.1.1、Sync类
Sync类定义如下:
可以看到Sync类继承了AQS,所以Sync类是连接CountDownLatch和AQS的桥梁。
private static final class Sync extends AbstractQueuedSynchronizer {
......
//这个构造方法,我们在上面已经见过了,new CountDownLatch(10),设置CountDownLatch的许可数量时,就会使用到这个构造器。
Sync(int count) {
setState(count);
}
//获取state
int getCount() {
return getState();
}
//判断AQS中state的值状态。这是重写的AQS的方法
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
......
}
3.1.2、releaseShared方法
在Sync类中,我们并没有看到releaseShared方法,说明releaseShared是父类AQS的方法。
我们进入AQS看一下releaseShard方法的逻辑
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
先执行了tryReleaseShared方法,如果返回true,再执行doReleaseShared方法。先看tryReleaseShared方法
3.1.3、tryReleaseShared方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
原方法是抛出异常,说明这个方法需要子类重新实现,这就又回到了CountDownLatch类的Sync类中,Sync类中实现了tryReleaseShared方法,从方法名中能看出,这是共享模式释放。啥是共享模式呢?这里简单提一句,共享模式的对立面是独占模式。类似ReentrantLock,加锁操作就是独占模式,同一时刻,只能有一个线程加锁成功。共享模式,就是多个线程可以同时执行,对于CountDownLatch来说,多个线程可以同时执行countDown操作,所以countDown操作是共享模式。内部相应的方法命名时就会添加shared标识。比如此处的tryReleaseShared方法
看一下tryReleaseShared的实现
protected boolean tryReleaseShared(int releases) {
//开启死循环
for (;;) {
//获取state数量
int c = getState();
if (c == 0)
//返回失败
return false;
//如果state数量不是0,state减一
int nextc = c-1;
//CAS设置新的state值
if (compareAndSetState(c, nextc))
//如果state执行减一操作后值为0,说明异步线程全部执行countDown完成,此时可以唤醒主线程
return nextc == 0;
}
}
这个方法最终就是对state执行减一操作。
tryReleaseShared执行完成,如果返回结果是true,会回到releaseShared方法中,然后就会执行doReleaseShared方法。
这里停一下,我们先不看doReleaseShared方法实现。先回到我们的CountDownLatch使用场景上。我们知道CountDownLatch.countDown一般是子线程执行,子线程全部执行countDown完成前,主线程是阻塞在countDownLatch.await处的,等待CountDownLatch.countDown全部执行完成,然后才会继续执行。
所以在执行doReleaseShared方法前,一般会先执行countDownLatch.await方法。所以,我们先看countDownLatch.await的实现逻辑,然后再看doReleaseShared方法逻辑。
3.2、await方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await方法抛出了InterruptedException,说明这个方法是能响应线程中断的。在await方法内部执行了Sync类的acquireSharedInterruptibly方法
这个方法,Sync类中并没有,所以是父类AQS中的。我们到AQS中看一下这个方法的逻辑
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//检查当前线程是否被中断。如果被中断,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
先是判断了线程是否被中断,如果被中断,会直接抛出中断异常
如果线程没有被中断,然后会执行tryAcquireShared。我们看一下tryAcquireShared的方法实现
3.2.1、tryAcquireShared方法
tryAcquireShared实现如下:
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
可以看到,AQS中的tryAcquireShared方法逻辑是直接抛出了UnsupportedOperationException异常(不支持的操作),说明该方法是需要子类重写的。我们回到CountDownLatch的Sync类中看下Sync类重写后的方法实现
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
可以看到,就是判断state的值。如果getState == 0的话,就返回1,此时代表CountDownLatch.countDown已经全部执行完成。如果getState != 0的话,就返回-1,代表CountDownLatch.countDown未全部执行完成。
tryAcquireShared,如果返回-1的话,接着就会执行doAcquireSharedInterruptibly方法
3.2.2、doAcquireSharedInterruptibly方法
这是CountDownLatch实现await方法的关键逻辑
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//将当前线程以共享模式加入Node链表中
final Node node = addWaiter(Node.SHARED);
//获取锁的结果,初始化值为true,代表获取锁失败
boolean failed = true;
try {
//开启一个死循环,这就实现了CountDownLatch.await阻塞的效果。
for (;;) {
//获取当前节点的前置节点
final Node p = node.predecessor();
//如果前置节点是头节点,再次执行tryAcquireShared方法判断state的值
if (p == head) {
//获取state的状态
int r = tryAcquireShared(arg);
//如果返回值r >= 0代表state是0,此时说明CountDownLatch.countDown已经执行完成,此时会结束死循环,主线程继续往下执行
if (r >= 0) {
//将当前节点设置为头节点并且进行传播
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//1、shouldParkAfterFailedAcquire判断是否需要执行线程阻塞。
//2、parkAndCheckInterrupt是阻塞当前线程,避免一直获取不到锁,浪费cpu
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
//获取锁失败的话,执行cancelAcquire方法
cancelAcquire(node);
}
}
doAcquireSharedInterruptibly方法中有很多逻辑,这个方法的主要作用是:
开启死循环,线程不断地获取锁。如果线程满足阻塞条件的话,就对该线程执行阻塞操作,然后休眠等待被唤醒,避免浪费cpu。被唤醒后,继续获取锁。获取锁成功,就退出死循环。
下面挨个分析其中的方法
3.2.2.1、addWaiter方法。
看这个方法名,也能知道这是添加一个等待者,也就是我们执行了CountDownLatch.await方法的主线程
private Node addWaiter(Node mode) {
//创建一个Node,Node的线程是当前线程,节点模式是共享模式
Node node = new Node(Thread.currentThread(), mode);
//获取Node链表的尾指针
Node pred = tail;
//如果尾指针不为空,说明Node链表已经被初始化。将当前Node节点入队
if (pred != null) {
//获取node节点的前向指针
node.prev = pred;
//将当前节点设置为尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾指针为空,说明Node链表未被初始化,需要先初始化Node链表,再将当前Node节点入队
enq(node);
return node;
}
//enq方法逻辑
private Node enq(final Node node) {
//开启死循环
for (;;) {
//获取Node链表尾节点
Node t = tail;
//如果尾节点为空,说明Node链表未被初始化,需要初始化
if (t == null) {
//初始化一个Node节点,利用CAS将其设置为头节点
if (compareAndSetHead(new Node()))
//tail节点指向head节点
tail = head;
} else {
//当前Node节点前向指针指向尾部节点
node.prev = t;
//将当前节点设置为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这里总结一下,addWaiter方法就是将执行CountDownLatch.await的主线程加入到等待链表中
3.2.2.2、setHeadAndPropagate方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//将当前节点设置为头节点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
//取当前节点的下一个节点,对于我们这个场景来说,只有一个节点,没有下一个节点,所以下一个节点是null
Node s = node.next;
//如果下一个节点是null,执行doReleaseShared方法
if (s == null || s.isShared())
doReleaseShared();
}
doReleaseShared方法的逻辑如下
private void doReleaseShared() {
//开启一个死循环
for (;;) {
//取头节点。头节点就是我们的节点
Node h = head;
//头结点不为空 && 头结点不等于尾节点。这说明Node链表中间有其他元素
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
在CountDownLatch的流程中,只有一个主线程,并且在setHeadAndPropagate方法中,先执行了setHead(node)方法,将当前节点设置为头节点,所以此时的头节点和尾节点是相同的。所以h != null && h != tail结果为假,doReleaseShared方法会直接返回
执行完setHeadAndPropagate方法,下面就该执行shouldParkAfterFailedAcquire和parkAndCheckInterrupt了。这两个方法,前一个是判断能否执行阻塞,后一个是执行线程阻塞操作的。
我们看一下shouldParkAfterFailedAcquire的实现逻辑。
3.2.2.3、shouldParkAfterFailedAcquire方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前置节点状态是SIGNAL,此时返回true,说明需要阻塞当前节点
if (ws == Node.SIGNAL)
return true;
//waitStatus > 0,说明当前节点的前置节点是取消状态
if (ws > 0) {
do {
//向前遍历,删除已取消节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//找到不是取消节点的节点,将当前节点放入该节点后
pred.next = node;
} else {
//将当前节点的waitStatus置为SIGNAL状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
3.2.2.4、parkAndCheckInterrupt方法
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
逻辑里,调用了LockSupport.park方法,阻塞当前线程,然后返回了线程的中断状态
为什么要返回中断状态呢?
因为LockSupport.park方法是能响应线程中断的,但是响应完中断后,并不会抛出异常,那这个时候就会有一个问题。我们怎么知道park方法是怎么被唤醒的呢?
是CountDownLatch.countDown执行完成后,正常被唤醒的。
还是执行CountDownLatch.await方法的线程执行了中断唤醒的。
区分不出来吧?所以此处要执行Thread.interrupted(),如果线程执行了中断,此时会Thread.interrupted()会返回true。以此就可以判断LockSupport.park的唤醒原因
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
如果线程发生了中断,此时就会抛出中断异常
3.2.2.5、cancelAcquire方法
抛出中断异常后,doAcquireSharedInterruptibly方法中的finally代码块就会开始执行。
如果failed值为true的话,此时会执行cancelAcquire方法
if (failed)
//获取锁失败的话,执行cancelAcquire方法
cancelAcquire(node);
从方法名上看,cancelAcquire方法是取消获取的意思。意思就是取消获取锁,说明节点对应的thread不再竞争锁。在这个方法中,会将Node节点的waitStatus值置为CANCELLED。这个方法和我们要研究的CountDownLatch流程没有什么关系,就不看了
以上,就是await方法的流程。
3.3、doReleaseShared方法
我们在介绍countDown方法的时候说过,有一个方法等介绍完await方法再进行介绍,还有印象吧?就是在介绍countDown方法逻辑时,tryReleaseShared方法执行后,应该执行doReleaseShared方法。再看一下countDown流程中的这段源码
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared方法执行成功后,会执行doReleaseShared方法
看一下doReleaseShared方法的实现
private void doReleaseShared() {
//开启一个死循环
for (;;) {
//获取头节点
Node h = head;
//头结点不等于空 && 头节点不等于尾节点
if (h != null && h != tail) {
//获取头结点的waitStatus值
int ws = h.waitStatus;
//如果头节点状态是SIGNAL
if (ws == Node.SIGNAL) {
//将头结点状态改成0,初始状态
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//唤醒后面节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
这个方法,对于CountDownLatch的流程来说,其实可以不用关注。为什么这样说呢?
对于CountDownLatch来说,Node链表上其实只有一个节点。并且在执行
setHeadAndPropagate方法时,将当前主线程代表的Node设置为了头节点,所以头尾是相同的。在执行h != null && h != tail判断条件时,就不满足判断条件了,所以在执行h == head判断条件时,结果就会为true,方法就直接返回了
4、总结
以上就是CountDownLatch核心API的源码解析。
我画了一张图,我们从整体上看一下CountDownLatch的操作流程图
CountDownLatch本身的API不多,也都很简单。但是因为CountDownLatch是借助AQS框架实现的,所以会涉及到一些AQS的逻辑,稍微复杂一点。
5、参考资料
1)、https://tech.meituan.com/2019/12/05/aqs-theory-and-apply.html。这是美团技术团队写的一篇文章,我最开始看不懂AQS源码的时候,搜到了这篇文章,连续读了3遍,然后对AQS的整体技术脉络有了一个大体的认识,后面再看CountDownLatch相关的代码就清楚多了。推荐大家也读一下。