1 简介
在多线程编程中, 协调和同步线程的执行是至关重要的。Java 提供了许多并发工具来帮助开发人员有效地管理多线程应用程序。
其中之一是 CyclicBarrier, 它是一个强大的同步辅助类, 可用于在多个线程之间创建同步点, 以便它们可以在同一时间点协调执行某个任务。
CyclicBarrier 和 CountDownLatch 一样具有等待计数的功能, 但是相比于 CountDownLatch 功能更加强大。
其本身具备了以下特点
- 同步多个线程: 允许多个线程在达到共同的同步点之前进行等待, 然后同时开始执行
- 可重用性: 一旦所有等待线程达到同步点, CyclicBarrier 会重置计数器, 可以被重复使用。这使得它适用于循环的多阶段任务
- 自定义回调动作: 可以在所有线程达到同步点后执行自定义的回调动作, 以便在同步完成后执行一些特定的逻辑
- 指定等待时间: 提供了一个带有超时参数的 await() 方法, 允许等待一段指定的时间后, 即使没有足够的线程达到同步点, 也会继续执行
- 可检测是否被破坏: 提供的 isBroken() 方法允许检测在等待过程中是否有线程被中断, 以及是否有异常发生
- 内存一致性效果: 在所有线程达到同步点时, 对于之前的所有写入操作, 对于当前线程都是可以见的
为了理解 CyclicBarrier, 这里举一个通俗的例子。
开运动会时, 会有跑步这一项运动, 我们来模拟下运动员入场时的情况。
假设有 6 条跑道, 在比赛开始时, 就需要 6 个运动员在比赛开始的时候都站在起点了, 裁判员吹哨后才能开始跑步。跑道起点就相当于 “barrier”, 是临界点,
而这 6 个运动员就类比成线程的话, 就是这 6 个线程都必须到达指定点了, 意味着凑齐了一波, 然后才能继续执行, 否则每个线程都得阻塞等待, 直至凑齐一
波即可。cyclic 是循环的意思, 也就是说 CyclicBarrier 当多个线程凑齐了一波之后, 仍然有效, 可以继续凑齐下一波。CyclicBarrier的执行。
示意图如下:
当多个线程都达到了指定点后, 才能继续往下继续执行。这就有点像报数的感觉, 假设 6 个线程就相当于 6 个运动员, 到赛道起点时会报数进行统计, 如果刚好是 6 的话, 这一波就凑齐了, 才能往下执行。
CyclicBarrier 在使用一次后, 可以进行重置, 继续当做计数器使用, 这是与 CountDownLatch 的区别之一。
2 CyclicBarrier 的方法
上面说到的 6 个运动员, 6 个线程, 指的就是计数器的初始值 6, 可以通过 CyclicBarrier 的构造方法传入的
public class CyclicBarrier {
public CyclicBarrier(int parties) {
this(parties, null);
}
// barrierAction 指定了回调函数, 在维护的个数达到了 0 了, 就会执行这个函数
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0)
throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}
CyclicBarrier 的主要方法
- await() throws InterruptedException, BrokenBarrierException : 等到所有的线程都到达指定的临界点
- await(long timeout, TimeUnit unit) : 与上面的 await 方法功能一致, 只不过这里有了时间限制, 调用该方法的线程等到指定的 timeout 时间后, 不管 N 是否减至为 0, 都会继续往下执行
- getNumberWaiting() : 获取当前距离线程执行还差多少值
- reset(): CyclicBarrier 状态重置, 重新开始
- isBroken(): 判断当前的 CyclicBarrier 是否为一个打破状态, 可以理解为当前的栅栏被破坏了, 无法继续使用了, 线程这时在调用 await 方法会抛出异常
CyclicBarrier 进入打破状态的场景
- 线程被设置了中断标识为 true 的情况下, 调用了 await 方法
- 调用了 await 带超时时间的方法, 超时了也会进入 break 状态
- 线程调用 await, 这时需要等待的线程数为 0 了, 执行回调函数, 这个过程失败了, 也会进入 break 状态
- 调用 reset 方法, 也会先设置为 break 状态, 在重试设置为 false
下面用一个具体的例子来说明 CyclicBarrier 的具体用法:
public class CyclicBarrierDemo {
//指定必须有 6 个运动员到达才行, 同时搭配一个线程数达到了, 就执行的回调的函数
private static CyclicBarrier barrier = new CyclicBarrier(6, () -> {
System.out.println("所有运动员入场, 裁判员一声令下!!!!!");
});
public static void main(String[] args) {
System.out.println("运动员准备进场, 全场欢呼............");
ExecutorService service = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
service.execute(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 运动员, 进场");
// 线程挂起, 得到计数器达到了 0
barrier.await();
// 线程又被唤醒, 继续执行
System.out.println(Thread.currentThread().getName() + " 运动员出发");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
}
输出结果:
运动员准备进场, 全场欢呼............
pool-1-thread-2 运动员, 进场
pool-1-thread-1 运动员, 进场
pool-1-thread-3 运动员, 进场
pool-1-thread-4 运动员, 进场
pool-1-thread-5 运动员, 进场
pool-1-thread-6 运动员, 进场
所有运动员入场, 裁判员一声令下!!!!!
pool-1-thread-6 运动员出发
pool-1-thread-1 运动员出发
pool-1-thread-5 运动员出发
pool-1-thread-4 运动员出发
pool-1-thread-3 运动员出发
pool-1-thread-2 运动员出发
3 CyclicBarrier 的源码实现
CyclicBarrier 内部是通过 ReentrantLock 和 Condition 实现的。
而 ReentrantLock 和 Condition 是基于 AQS 实现的, 所以还有 AQS 的基础, 就能很容易理解。
3.1 CyclicBarrier 中的 Generation
CyclicBarrier 内部定义了一个很简单的内部类 Generation
private static class Generation {
Generation() {}
// 默认值为 false, 表示当前的栅栏是否为破坏状态, 也就是当前的 CyclicBarrier 已经遭到了破坏, 不适用了
boolean broken;
}
3.2 CyclicBarrier 的 await 方法
CyclicBarrier 的 await 方法可以让线程进行等待所有线程的到齐, 一起执行, 没到齐的情况进行挂起的操作。
public class CyclicBarrier {
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 参数 1: 是否会超时
// 参数 2: 0L 超时时间 0 纳秒, 即不超时
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe);
}
}
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {
final ReentrantLock lock = this.lock;
// 获取可重入锁, 加锁, 加锁失败会被阻塞
lock.lock();
try {
final Generation g = generation;
// 当前的栅栏状态已经为破坏状态, 直接抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 线程是中断状态
if (Thread.interrupted()) {
// 直接将栅栏设置为破坏状态, 通过 condition 将所有在等待的队列的线程移到同步队列, 等待获取锁唤醒
breakBarrier();
throw new InterruptedException();
}
// 还需要等待的线程数 - 1
int index = --count;
// 还需要等待的线程数为 0 了
if (index == 0) {
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
// 有设置了回调函数, 执行回调函数
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
// 把当前的线程放到等待队列, 等待唤醒
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
// 线程唤醒了, 继续执行其他的逻辑
if (g.broken)
throw new BrokenBarrierException();
// 唤醒后, 保留的 Generation 引用和最新的 Generation 不一样了, 直接返回当前的 index, 既还需要等待多少个线程
if (g != generation)
return index;
// 设置了超时, 同时超时时间小于等于 0 了
if (timed && nanos <= 0L) {
// 直接将栅栏设置为爆破状态, 通过 condition 将所有在等待的队列的线程移到同步队列, 等待获取锁唤醒
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void breakBarrier() {
// 设置 Generation 的 broken 为 true,
generation.broken = true;
count = parties;
// 将在等待队列中的线程移到同步队列
trip.signalAll();
}
private void nextGeneration() {
// 将在等待队列中的线程移到同步队列
trip.signalAll();
// 更新新的计算值
count = parties;
// 重试设置一个新的 Generation, broken 默认为 false
generation = new Generation();
}
}
3.3 CyclicBarrier 的 isBroken 方法
public class CyclicBarrier {
public boolean isBroken() {
// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 返回当前的 Generation 的 broken 状态
return generation.broken;
} finally {
lock.unlock();
}
}
}
3.3 CyclicBarrier 的 reset 方法
public class CyclicBarrier {
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 设置为 break 状态, 更新新的等待线程个数
breakBarrier();
// 设置一个新的 Generation
nextGeneration();
} finally {
lock.unlock();
}
}
}
4 CountDownLatch 与 CyclicBarrier 的比较
CountDownLatch 与 CyclicBarrier 都是用于控制并发的工具类, 都可以理解成维护的就是一个计数器, 但是这两者还是各有不同侧重点的:
- CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后, 它才执行; 而 CyclicBarrier 一般用于一组线程互相等待至某个状态,
然后这一组线程再同时执行; CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等, 等大家都完成, 再携手共进。- 调用 CountDownLatch 的 countDown 方法后, 当前线程并不会阻塞, 会继续往下执行; 而调用 CyclicBarrier 的 await 方法, 会阻塞当前
线程, 直到 CyclicBarrier 指定的线程全部都到达了指定点的时候, 才能继续往下执行- CountDownLatch 方法比较少, 操作比较简单, 而 CyclicBarrier 提供的方法更多, 比如能够通过 getNumberWaiting(), isBroken() 这些
方法获取当前多个线程的状态, 并且 CyclicBarrier 的构造方法可以传入 barrierAction, 指定当所有线程都到达时执行的业务功能- CountDownLatch 是不能复用的, 而 CyclicLatch 是可以复用的, 可以通过 reset 方法进行重置
5 参考
大白话说java并发工具类-CountDownLatch, CyclicBarrier