一、概述
1、作用?
允许一组线程互相等待,直到到达某个公共屏障(barrier)点。
- 因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
2、使用场景?
用于多线程计算数据,最后合并计算结果的场景。
3、常用类方法?
- await():告诉CyclicBarrier,线程已经到达了屏障,计数减一;然后阻塞线程,直到count为0。
- reset():重置CyclicBarrier的未加入到party的数量count和当前代Generation。
4、案例
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
new Thread(() -> {
System.out.println("work thread start + 1");
try {
TimeUnit.SECONDS.sleep(1);
cyclicBarrier.await();
// 睡一会,让主线程先干活,work thread再继续
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("main thread OK, work thread go on!");
} catch (Exception e) {
e.printStackTrace();
}
}, "work-thread").start();
// 主线程在这等着,等其他线程调用await()方法之后,大家再一起执行
cyclicBarrier.await();
System.out.println("main end");
}
}
二、原理
1)CyclicBarrier中一个generation代表了每一代,通过这个实现CyclicBarrier的复用。
- parties变量用来表示参与party的线程数;
- count变量代表了还没到party的线程数;
- 外加个ReentrantLock锁和一个Condition条件变量实现线程的并发和阻塞。
2)在CyclicBarrier类的内部有一个计数器count,每个线程在到达屏障点的时候都会调用
await()
方法将自己阻塞排队,并将计数器count减1;
3)当计数器count减为0的时候,所有因调用await()
方法而被阻塞的线程
将被唤醒
。
4)线程的排队
进入party通过ReentrantLock实现
;进入party后睡眠
等待所有参会者通过锁的条件等待Condition实现
。
三、源码解析
1、成员变量和构造器
- CyclicBarrier内部是通过条件队列
trip
对线程进行阻塞;- 两个int型的变量
parties
和count
:
parties
表示每次拦截的线程数,即party的所有参会者
;count
表示还未拦截的线程数,即还有多少参会者没到party
;它的初始值和parties相同,调用await()
方法减1,减为0时将所有阻塞在条件变量上线程唤醒。
- 静态内部类
Generation
,代表栅栏的当前代
,就像玩游戏时代表的本局游戏,利用它可以实现循环等待
。barrierCommand
,表示换代前执行的任务。当前代结束会执行该任务,然后自动开启下一代
。
public class CyclicBarrier {
// 该类用于CyclicBarrier的一次协同是否完成(正常完成、异常完成)
// reset后会复用该结构,每一次的party都会生成一个该类的新实例
private static class Generation {
// 当前party有没有被强制中断,false表示没有
boolean broken = false;
}
// 同步锁,线程进入 条件等待 时需要获取锁
private final ReentrantLock lock = new ReentrantLock();
// 用于阻塞线程的条件变量(有未到party的线程,已到party的线程则等待在该条件变量上)
private final Condition trip = lock.newCondition();
// 参与party的人数
private final int parties;
// 当所有的线程都参与到了party中, 回调的方法
private final Runnable barrierCommand;
// 当前party
private Generation generation = new Generation();
// 还未到party的线程数
private int count;
// 构造器
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
}
2、await()方法
CyclicBarrier的核心方法是await()
,该方法是线程相互等待的关键,它有两种实现:一种是带等待超时的,一种是不带等待超时,本质上都是调用了同一个方法dowait()
,只是带等待超时的多传了一个时间。
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 直接调用自己的dowait()方法
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
CyclicBarrier#dowait()方法:
- 首先因为await()是同步的,需要先加互斥锁ReentrantLock;
- 如果发现当前代generation已经崩了,则当前线程直接抛出异常BrokenBarrierException。
- 每次进来都将count减1,减完立马进行判断看看count是否等于0:
- 如果等于0,则执行
换代前要执行的任务barrierCommand
,然后唤醒所有阻塞等待的线程
,接着自动进入
CyclicBarrier的下一代
;将计数器count
的值重新设为parties。如果barrierCommand运行异常
,则打破栅栏的当前代
,唤醒所有阻塞等待的线程。- count不等于0,这进入for循环:
不是超时等待
,直接调用Condition.await()阻塞
当前线程。是超时等待
,就在nanos时间内循环竞争锁;
- 如果当前线程在
await()
获取锁过程中被中断了:
- 在当前代还没结束之前打破栅栏,即游戏在中途被打断,则设置generation的broken状态为true并唤醒所有线程。
- 当前代已经结束,则直接中断当前线程。
线程被唤醒后
进行下面三个判断:
- 如果线程因为
broken generation
操作(即调用breakBarrier()方法)而被唤醒则抛出异常
;- 如果线程因为CyclicBarrier
正常换代被唤醒
,则返回计数器count的值
;- 如果线程因为
超时而被唤醒打破栅栏并抛出TimeOut异常
。
注意
:如果其中有一个线程因为等待超时而退出
,那么整盘游戏也会结束,其他线程都会被唤醒
。
- 最后解锁。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 上锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 保存当前party时的generation快照,generation更新后不会影响这里
final Generation g = generation;
// 当party被强制中断时,抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 判断当前线程是否被中断
if (Thread.interrupted()) {
// 有中断的线程混入其中:
// 1)broken当前generation;
// 2)唤醒CyclicBarrier阻塞的所有线程;重新开始,注意:此时没有改变party的Generation
// 3)抛出线程中断异常
breakBarrier();
throw new InterruptedException();
}
// 每次都将计数器的值减1,即未到场的参会人个数减一。
int index = --count;
// 当前线程是最后一个到达party的线程时,回调barrierCommand,然后唤醒所有阻塞在条件变量上的线程。
// 若回调barrierCommand正常完成,则不需要手动调用reset()就可进入新的一代,因为这里调用了nextGeneration()
if (index == 0) {
boolean ranAction = false;
try {
// 唤醒所有线程前先执行指定的任务
final Runnable command = barrierCommand;
if (command != null)
// 若回调方法运行出现异常,异常直接上抛
command.run();
ranAction = true;
// 唤醒所有阻塞的线程,并进入下一代party,修改generation。
nextGeneration();
return 0;
} finally {
// barrierCommand回调方法发生了异常,那么设置broker标志位,并唤醒所有阻塞的线程
if (!ranAction)
breakBarrier();
}
}
// 循环等待最后一个参与party的线程 唤醒自己 或者 被中断 或者 等待超时
for (;;) {
try {
// 根据传入的参数决定是定时等待还是非定时等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 发生中断异常
// 如果当前线程在CyclicBarrier的等待唤醒期间(即:g没有被改变)被中断,则中断generation,唤醒所有线程
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 如果当前线程已经完成在CyclicBarrier上的等待(即:g被改变),则直接标志当前线程被中断
Thread.currentThread().interrupt();
}
}
// 如果线程因为broken generation操作而被唤醒则抛出异常
if (g.broken)
throw new BrokenBarrierException();
// 如果g != generation ,说明CyclicBarrier换代了(即:generation改变了),线程因此被唤醒的话,则返回还有多个参会者没进来,即计数器的值count。
if (g != generation)
return index;
// 如果线程因为时间到了而被唤醒,这broken generation 并抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
3、breakBarrier()方法:
意味着有人搞破坏,游戏中途结束,将所有的等待线程全部唤醒。
- await()方法通过抛出BrokenBarrierException 异常返回;
private void breakBarrier() {
// 中断当前generation,即打破栅栏
generation.broken = true;
// 设置 未参会者数量count 等于 所有需要参会者数量
count = parties;
// 唤醒所有阻塞的线程
trip.signalAll();
}
4、nextGeneration()方法:
开启栅栏新的一代。
private void nextGeneration() {
// 唤醒所有阻塞的线程
trip.signalAll();
// 设置 未参会者数量count 等于 所有需要参会者数量
count = parties;
// 生成栅栏的下一代Generation,这也是和breakBarrier()方法的区别
generation = new Generation();
}
5、reset()方法:
重置一个栅栏
:
- 打破栅栏 中断当前代,await()方法通过抛出BrokenBarrierException 异常返回;
- 开始新的下一代,重置count和generation。
public void reset() {
final ReentrantLock lock = this.lock;
// 上锁
lock.lock();
try {
breakBarrier(); // 将所有参与party的线程唤醒
nextGeneration(); // 生成下一代
} finally {
lock.unlock();
}
}
若barrierCommand正常完成
,则不需要手动调用reset()
就可自动进入新的一代
,因为运行barrierCommand之后调用了nextGeneration()。
四、总结
简单说就是一个ReentrantLock
加上一个Condition条件变量
实现并发控制和多个线程的阻塞等待。
并且采用多线程协作
机制,在多个线程协作过程中,只要有一个线程被中断或者发生异常
,则整个协作过程取消
。
CyclicBarrier和CountDownLatch相同的是,它们都能让多个线程协调在某一个节点上等待;下面我们看一下他们的区别?
2、CyclicBarrier和CountDownLatch的区别?
1.是否可复用?
- CountDownLatch的计数器只能使用一次;
- CyclicBarrier的计数器可以使用reset()方法重置进而循环使用。
2.唤醒方式?
- CountDownLatch是多线程阻塞后,需要等待外界条件达到某种状态才会被统一唤醒,即CountDownLatch还需要额外的countDown()唤醒操作。
- CyclicBarrier是多线程协作,当线程达到等待数量或者一个线程出现异常或被中断时自动放行;即CyclicBarrier中只需要await(),不需要额外的唤醒操作;
3.从性能来看:
- CountDownLatch的性能大于CyclicBarrier,因为CountDownLatch是自己采用CAS,利用共享锁的原理实现;
- 而CyclicBarrier是采用ReentrantLock独占锁 +Condition条件变量实现;
4.从应用场景来看:
- CyclicBarrier能处理更为复杂的业务场景。
5.简言之:
- 可以理解为CountDownLatch是一个计数器,线程完成一个记录一个,只不过计数不是递增而是递减。
- 而CyclicBarrier是一个阀门,所有线程都到了 ,阀门才打开。