文章目录
- 基本使用
- 源码分析
- await
- countDown
- getCount
- 可重置的CountDownLatch
- 总结
CountDownLatch是一个多线程同步工具类,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。主要用来解决一个线程或多个线程等待另外多个线程的场景。
初始化需要指定一个计数器,表示需要等待操作完成的操作数量。由于调用了countDown方法,wait方法会一直阻塞,直到当前计数达到零,之后所有等待的线程都会被释放,任何后续的wait调用都会立即返回。这是一种一次性现象,计数无法重置。如果需要一个重置计数的版本,可以考虑使用CyclicBarrier。
java.util.concurrent.CountDownLatch
没有继承和实现其他的类或者接口,同时只暴露了下面几个方法:
- void await() throws InterruptedException
- boolean await(long timeout, TimeUnit unit) throws InterruptedException
- void countDown()
- long getCount()
基本使用
CountDownLatch 是 Java 中一个非常有用的同步工具,通常用于等待一组线程完成某些操作。以下是一个简单的使用实例,演示如何使用 CountDownLatch 来等待多个线程完成任务。
import java.util.concurrent.CountDownLatch;
public class CountDownLatchExample {
public static void main(String[] args) {
// 创建一个 CountDownLatch,计数为3,参数为需要等待的线程数量
CountDownLatch latch = new CountDownLatch(3);
// 创建并启动三个线程
for (int i = 1; i <= 3; i++) {
final int threadNumber = i;
new Thread(() -> {
try {
// 模拟任务执行
System.out.println("线程 " + threadNumber + " 正在执行任务...");
// 随机休眠
Thread.sleep((long) (Math.random() * 1000));
System.out.println("线程 " + threadNumber + " 完成任务.");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 每个线程在完成任务后调用 latch.countDown(),使计数器减1
latch.countDown();
}
}).start();
}
try {
// 主线程等待,直到计数器减为0
latch.await();
System.out.println("所有线程已完成任务,主线程继续执行...");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
源码分析
CountDownLatch基于AQS实现,源码很短,大部分在AQS中已经实现了
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
await
await会调用AQS的获取资源的方法,CountDownLatch调用await是会阻塞的,说明获取资源会失败,原因就在于Sync中的tryAcquireShared方法
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
超时版本
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
Sync类重写了AQS的tryAcquireShared
方法,如果当前计数不为0则返回-1,表示获取资源失败,然后被阻塞,当调用countDown时会将计数器减少1,同时唤醒阻塞的线程判断
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
countDown
通过AQS释放资源,将AQS维护的资源数量减少1
public void countDown() {
sync.releaseShared(1);
}
getCount
这个方法很简单,就是获取AQS的状态值
public long getCount() {
return sync.getCount();
}
可重置的CountDownLatch
在RocketMQ源码中有实现,其实就是调用AQS的setState方法重新设置资源数量
private static final class Sync extends AbstractQueuedSynchronizer {
private final int startCount;
Sync(int count) {
setState(count);
}
// 重置状态
protected void reset() {
setState(startCount);
}
}
总结
- CountDownLatch工作在共享模式,和Semaphore一样
- 可中断
- 只能使用一次,计数器不可重置