一、概述
1、作用
允许一个或多个线程等待,直到在其他线程中执行的一组操作完成,再继续执行。
2、demo
常用方法:
// 创建一个count为2的计数器;
CountDownLatch count = new CountDownLatch(2);
// 对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程;
count.countDown();
//阻塞当前线程,将当前线程加入阻塞队列;直到count为0时 唤醒当前线程。
count.await();
// 在timeout的时间之内阻塞当前线程,时间一过当前线程就可以继续执行。
count.await(long timeout, TimeUnit unit);
该demo源自CountDownLatch源码上的注释。
public class CountDownLatchTest {
static class WorkerRunnable implements Runnable {
private final CountDownLatch doneSignal;
private final int i;
WorkerRunnable(CountDownLatch doneSignal, int i) {
this.doneSignal = doneSignal;
this.i = i;
}
public void run() {
doWork(i);
doneSignal.countDown();
}
void doWork(int i) {
System.out.println(i);
}
}
public static void main(String[] args) throws InterruptedException {
CountDownLatch doneSignal = new CountDownLatch(5);
ExecutorService e = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; ++i) // create and start threads
e.execute(new WorkerRunnable(doneSignal, i));
doneSignal.await(); // wait for all to finish
System.out.println("所有线程执行结束,回到主线程");
e.shutdown();
}
}
3、应用场景
CountDownLatch可以用于 一个或者多个线程在执行之前 依赖于 某些前提业务先执行的场景。
- 多线程加载文件中的数据到内存中,等到所有的文件数据都加载完之后;主线程继续执行。
源码中的应用:
- RocketMQ中BrokerOuterAPI#registerBrokerAll()方法中,Broker采用线程池机制向所有的NameServer中发送心跳时用CountDownLatch来阻塞主线程,保证同步发送
二、实现原理
1> CountDownLatch在并发编程中充当一个计时器,其维护一个变量count用于表示操作数;
- 当一个操作执行完成时,原子性的减少一个count;
- 当count变量为0时,代表所有操作均已完成,唤醒等待的线程。
2> 具体代码实现:
CountDownLatch内部集成一个Sync类,从其构造函数可以看到:表示操作数的count值和核心操作都由同步器类Sync完成。
而Sync集成自AQS,所以本质上CountDownLatch就是使用AQS的共享锁机制,实现锁的获取、线程的阻塞/唤醒;
当调用countDownLatch.await()方法时,会尝试获取共享锁,只有当count的值为0时才能获取到;
未获取到锁的话,则创建一个线程节点Node,通过AQS的
addWaiter()
方法加入到AQS的阻塞队列,并把当前线程阻塞挂起。当调用countDownLatch.countDown()方法时,会在AQS中释放一次共享锁,即对count,也就是AQS的state变量进行减一操作;
当state=0时,将AQS阻塞队列里的阻塞线程全部唤醒;即阻塞在await()方法那的线程获取到共享锁,继续往下执行。
3> 另外根据happens-before原则,await()方法一定happens-before countDown()方法。
三、源码解析
1、核心变量和构造函数
CountDownLatch下就一个Sync变量,由此可见:CountDownLatch的所有操作都是围绕Sync类来的。
// Sync同步器,通过该对象调用AQS的方法,进而实现线程的阻塞和唤醒
private final Sync sync;
// 根据传入的count值初始化计数器
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 创建一个Sync对象 并传入count值,Sync内部将会执行setState(count)
this.sync = new Sync(count);
}
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
setState(count);
}
}
2、await()
当我们调用countDownLatch.wait()的时候,会创建一个线程节点Node,加入到AQS的阻塞队列,并把改线程挂起阻塞。
内部调用AQS的实现方法acquireSharedInterruptibly()
,我们可以看到模板方法acquireSharedInterruptibly()
,响应线程中断式的获取锁。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
下面是AQS的acquireSharedInterruptibly()
方法:
// AQS类的模板方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 响应线程的中断
if (Thread.interrupted())
throw new InterruptedException();
// 调用子类tryAcquireShared()方法的实现,让子类实现自己获取信号量的机制。
// tryAcquireShared()方法返回-1表示获取锁失败,阻塞当前线程节点
if (tryAcquireShared(arg) < 0)
// 线程获取锁失败,加入到AQS的同步队列中阻塞等待。
doAcquireSharedInterruptibly(arg);
}
CountDownLatch的内部类Sync中实现了尝试获取共享锁(tryAcquireShared
)逻辑;
// Sync类中
protected int tryAcquireShared(int acquires) {
// 如果state变量为0,表示共享锁已经被取完了,无法再获取共享锁。即:当AQS中的state变量不为0时,也就是CountDownLatch中的count还不为0,需要阻塞线程。
return (getState() == 0) ? 1 : -1;
}
CountDownLatch实际上是一种共享锁机制
,即锁可以同时被多个线程获取。因为一旦count被减小为0,所有的线程走到await()方法时,都能够顺利通过,不会因为获取不到锁而阻塞。
而且Sync直接将count值作为AQS的state的值,只有state的值为0,线程才能获取锁,即获得执行线程的权限。
3、countDown()
当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放共享锁的方式,对state进行减1操作;当state=0的时候会将AQS阻塞队列里的阻塞线程全部唤醒。
public void countDown() {
sync.releaseShared(1);
}
滋滋滋,又是完成依赖AQS的实现;
// 调用AQS类的模板方法
public final boolean releaseShared(int arg) {
// 子类自己实现自己的释放锁逻辑
if (tryReleaseShared(arg)) {
// 释放锁成功之后,唤醒阻塞队列中的线程。
doReleaseShared();
return true;
}
return false;
}
CountDownLatch的内部类Sync中实现了尝试释放共享锁(tryAcquireShared
)逻辑;
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
// 如果 state == 0,表明没有线程持有锁
if (c == 0)
return false;
int nextc = c-1;
// CAS 修改state变量,如果修改后的state = 0 表明释放锁成功
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
四、总结
啊这,CountDownLatch真的是非常简单,代码包括注释也就300多行,除了其内部类Sync重写了AQS的两个方法,底层真的是几乎完全靠AQS实现。
关于AQS的实现原理,我们后面的文章见。