概念:CyclicBarrier翻译为循环(屏障/栅栏),当一组线程到达一个屏障(同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续工作。
设计目的:和CountDownLatch都为同步辅助类,因为CountDownLatch的计数器是一次性的,属于一对多,
1、【某一个线程需要其余线程执行完之后再执行 (一个等多个)】,
2、【一些线程需要在某个时刻同时执行,就像等待裁判员枪响后,才能同时起跑(多个等一个)】
所以,CyclicBarrier被设计成为计数器可循环使用,多对多。只要多个线程都达到后,自会执行接下来的事,没有CountDownLatch的一个等多个,多个等一个的现象。
源码解析:
public class CyclicBarrier {
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
* 屏障的每次使用都表示为一个生成实例。当屏障被触发或重置时,生成就会改变。
可以有许多代与使用barrier的线程相关联——由于锁可能以不确定的方式分配给等待线程——但是一次只能有一个是活动的({@code count}应用的那个),
其余的要么中断,要么被触发。如果有中断,但没有随后的重置,则不需要活动代。
*/
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry
保护屏障的锁,重入锁,调用await()方法的时候会用到,只有一个线程会执行成功,所以多线程高并发的情况下CyclicBarrier的执行效率不是很高 */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped
待跳闸的条件( Condition本身用于线程间通信,在这就是阻塞和唤醒线程用的)*/
private final Condition trip = lock.newCondition();
/** The number of parties
这个就是计数器初始值,新建类的时候传入的数值会赋值给他 */
private final int parties;
/* The command to run when tripped
跳闸时要执行操作的线程*/
private final Runnable barrierCommand;
/** The current generation
当前代*/
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
还有很多任务在等待。每一代从计数器初始值(parties)降到0。它会在每一代新创建或用尽时重置。
*/
private int count;
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
只在持有锁时调用,去更新屏障的状态并唤醒其他线程。(意思代表下一代,其实就是新建一个代对象重新赋值)
*/
private void nextGeneration() {
// signal completion of last generation
//当计数器减少为零证明这代已经满了,唤醒所有当前代阻塞的线程,这是Condition的方法
trip.signalAll();
// set up next generation
//将初始需要拦截的计数器初始值初始化给count计数器
count = parties;
//上一代已经结束了,就要重新开启下一代
generation = new Generation();
}
/**
这个方法就是告诉这一代线程出问题了: 屏障里面的某个线程中断了,那就唤醒所有线程,然后这个屏障拦截的所有线程都会被唤醒,然后抛出异常,并告诉这一代坏掉了
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
//将这代标记为true,告诉已经坏掉了
generation.broken = true;
//这一代重新计数
count = parties;
//唤醒这一代里面的所有阻塞线程
trip.signalAll();
}
/**
* Main barrier code, covering the various policies.
主要的屏障代码,涵盖了各种策略
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//当多线程并发情况下只有一个线程能获得锁(注意:该锁为jvm级锁,只能保证单实例有效,多进程下管理共享资源无效)
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前代的信息
final Generation g = generation;
//调用await方法之前需要判断这一代是否已经坏掉了,如果坏掉了直接抛出异常
if (g.broken)
throw new BrokenBarrierException();
//判断当前线程是否中断了,如果中断了也会抛出异常,并调用breakBarrier方法
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//如果这代没有坏掉,当前线程也没有中断,那么就将计数器减去1
int index = --count;
//减去1之后的数值如果为零,证明这一代已经满了,然后唤醒屏障拦截的所有线程
if (index == 0) { // tripped
boolean ranAction = false;
try {
//获取初始化的那个线程或者传入的线程
final Runnable command = barrierCommand;
if (command != null)
//线程执行任务
command.run();
ranAction = true;
//执行换代操作
nextGeneration();
return 0;
} finally {
//如果上的操作执行出现问题了,那么就执行breakBarrier方法,唤醒所有线程
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
//自旋
for (;;) {
try {
//await()方法可以传入两个参数的,一个是否需要判断等待时间,一个等待的时间
//如果不需要判断等待时间,那么直接调用await()方法,这是Condition类的方法
if (!timed)
trip.await();
//如果需要等待的时间大于0,那么线程阻塞要设置时间,超过这个时间需要被唤醒
else if (nanos > 0L)
//设置超时阻塞时间,线程就被阻塞了,程序执行到这里就不会继续向下执行,直到这个线程被唤醒,才会继续向下执行
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
//当前线程在阻塞过程,被中断了或者由于这一代屏障前的其他线程原因导致的代损坏了
//执行breakBarrier方法唤醒其他线程
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
//如果不是上述两个原因造成的,就中断当前线程
Thread.currentThread().interrupt();
}
}
// 当有任何一个线程中断,会调用 breakBarrier 方法.
// 就会唤醒其他的线程,其他线程醒来后,也要抛出异常
//走到这里这名线程被唤醒了,唤醒的方式可能是计数器为零被唤醒,也有可能调用breakBarrier
//方法导致代坏掉了而唤醒的,那么需要需要抛出代 破损异常
if (g.broken)
throw new BrokenBarrierException();
// g != generation 正常换代了,因为计数器为零了,代更新了
//一切正常,返回当前线程所在屏障的下标
// 如果 g == generation,说明还没有换代,那为什么会醒了?这里就是代的作用
// 因为一个线程可以使用多个屏障,当别的屏障唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。
// 正是因为这个原因,才需要 generation 来保证正确。
if (g != generation)
return index;
//如果不需要设置等待时间并且传入的时间不合法小于0,那么执行breakBarrier抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and which
* will execute the given barrier action when the barrier is tripped,
* performed by the last thread entering the barrier.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @param barrierAction the command to execute when the barrier is
* tripped, or {@code null} if there is no action
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
/**
* Creates a new {@code CyclicBarrier} that will trip when the
* given number of parties (threads) are waiting upon it, and
* does not perform a predefined action when the barrier is tripped.
*
* @param parties the number of threads that must invoke {@link #await}
* before the barrier is tripped
* @throws IllegalArgumentException if {@code parties} is less than 1
*/
public CyclicBarrier(int parties) {
this(parties, null);
}
/**
* Returns the number of parties required to trip this barrier.
*
* @return the number of parties required to trip this barrier
*/
public int getParties() {
return parties;
}
/**
* Waits until all {@linkplain #getParties parties} have invoked
* {@code await} on this barrier.
*
* <p>If the current thread is not the last to arrive then it is
* disabled for thread scheduling purposes and lies dormant until
* one of the following things happens:
* <ul>
* <li>The last thread arrives; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* one of the other waiting threads; or
* <li>Some other thread times out while waiting for barrier; or
* <li>Some other thread invokes {@link #reset} on this barrier.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the barrier is {@link #reset} while any thread is waiting,
* or if the barrier {@linkplain #isBroken is broken} when
* {@code await} is invoked, or while any thread is waiting, then
* {@link BrokenBarrierException} is thrown.
*
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while waiting,
* then all other waiting threads will throw
* {@link BrokenBarrierException} and the barrier is placed in the broken
* state.
*
* <p>If the current thread is the last thread to arrive, and a
* non-null barrier action was supplied in the constructor, then the
* current thread runs the action before allowing the other threads to
* continue.
* If an exception occurs during the barrier action then that exception
* will be propagated in the current thread and the barrier is placed in
* the broken state.
*
* @return the arrival index of the current thread, where index
* {@code getParties() - 1} indicates the first
* to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws BrokenBarrierException if <em>another</em> thread was
* interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was
* broken when {@code await} was called, or the barrier
* action (if present) failed due to an exception
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* Waits until all {@linkplain #getParties parties} have invoked
* {@code await} on this barrier, or the specified waiting time elapses.
*
* <p>If the current thread is not the last to arrive then it is
* disabled for thread scheduling purposes and lies dormant until
* one of the following things happens:
* <ul>
* <li>The last thread arrives; or
* <li>The specified timeout elapses; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* the current thread; or
* <li>Some other thread {@linkplain Thread#interrupt interrupts}
* one of the other waiting threads; or
* <li>Some other thread times out while waiting for barrier; or
* <li>Some other thread invokes {@link #reset} on this barrier.
* </ul>
*
* <p>If the current thread:
* <ul>
* <li>has its interrupted status set on entry to this method; or
* <li>is {@linkplain Thread#interrupt interrupted} while waiting
* </ul>
* then {@link InterruptedException} is thrown and the current thread's
* interrupted status is cleared.
*
* <p>If the specified waiting time elapses then {@link TimeoutException}
* is thrown. If the time is less than or equal to zero, the
* method will not wait at all.
*
* <p>If the barrier is {@link #reset} while any thread is waiting,
* or if the barrier {@linkplain #isBroken is broken} when
* {@code await} is invoked, or while any thread is waiting, then
* {@link BrokenBarrierException} is thrown.
*
* <p>If any thread is {@linkplain Thread#interrupt interrupted} while
* waiting, then all other waiting threads will throw {@link
* BrokenBarrierException} and the barrier is placed in the broken
* state.
*
* <p>If the current thread is the last thread to arrive, and a
* non-null barrier action was supplied in the constructor, then the
* current thread runs the action before allowing the other threads to
* continue.
* If an exception occurs during the barrier action then that exception
* will be propagated in the current thread and the barrier is placed in
* the broken state.
*
* @param timeout the time to wait for the barrier
* @param unit the time unit of the timeout parameter
* @return the arrival index of the current thread, where index
* {@code getParties() - 1} indicates the first
* to arrive and zero indicates the last to arrive
* @throws InterruptedException if the current thread was interrupted
* while waiting
* @throws TimeoutException if the specified timeout elapses.
* In this case the barrier will be broken.
* @throws BrokenBarrierException if <em>another</em> thread was
* interrupted or timed out while the current thread was
* waiting, or the barrier was reset, or the barrier was broken
* when {@code await} was called, or the barrier action (if
* present) failed due to an exception
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
/**
* Queries if this barrier is in a broken state.
*
* @return {@code true} if one or more parties broke out of this
* barrier due to interruption or timeout since
* construction or the last reset, or a barrier action
* failed due to an exception; {@code false} otherwise.
*/
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
/**
* Resets the barrier to its initial state. If any parties are
* currently waiting at the barrier, they will return with a
* {@link BrokenBarrierException}. Note that resets <em>after</em>
* a breakage has occurred for other reasons can be complicated to
* carry out; threads need to re-synchronize in some other way,
* and choose one to perform the reset. It may be preferable to
* instead create a new barrier for subsequent use.
重置整个屏障为初始状态,将已经拦截的释放掉,全部重新开始计数
*/
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//执行breakBarrier方法唤醒其他线程
breakBarrier(); // break the current generation
//更新屏障的状态并唤醒其他线程
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}
/**
* Returns the number of parties currently waiting at the barrier.
* This method is primarily useful for debugging and assertions.
*
* @return the number of parties currently blocked in {@link #await}
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
}
实例:
假如公司团建,大家一起做大巴车,在大巴车出发之前,肯定是需要点名的,只有大家都到车上之后,才会发车,然后到了到了目的地之后,肯定是所有人都下车了,司机才能把车开走,这个过程中涉及了2次大家都就位之后,司机才能继续操作,可以证明CyclicBarrier可以循环使用计数器。
package com.runlion.middleground.marginal;
import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
/**
*
* @description:
* @date 2024年04月30日 17:26:05
*/
@Slf4j
public class StudyTest {
@Getter
@Setter
static class Flag{
public int count=0;
}
@Test
@SneakyThrows
public void testCyclicBarrier() {
Flag flag=new Flag();
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
if(flag.getCount()==0){
System.out.println("所有人都上车了,可以发车了");
flag.setCount(1);
}else {
System.out.println("所有人都下车了,司机可以走了");
}
});
for (int i = 1; i < 6; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "号上车了");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "号开始休息了");
TimeUnit.MILLISECONDS.sleep(new Random().nextInt(2000));
System.out.println(Thread.currentThread().getName() + "号下车了");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "号到达目的地了");
}catch (Exception e){
log.info(e.getMessage());
}
}, String.valueOf(i)).start();
}
System.out.println("主线程不阻塞");
}
}