作者:廖祥俐
CountDownLatch和CyclicBarrier是Java JDK中提供的辅助类用来简化并发编程,在此分别介绍CountDownLatch和CyclicBarrier的作用,常用Api,使用场景,并给出简单的示例,最后对一些注意点以及两者的不同点进行阐述。
CountDownLatch 允许一个或多个线程等待其他线程完成操作,首先初始化一个计数值N,然后通过调用 await可以让当前线程阻塞,直到计数值N在被调用N次countDown置为0后,再继续执行。
1,构造方法
public CountDownLatch(int count) // 创建一个新的 CountDownLatch,它将在计数值count减为0时启动
2,主要方法
public void await() throws InterruptedException; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public void countDown(); //将count值减1
一个线程需要等待其它线程完成操作后才能进行后续的操作。
比如在进行歌曲匹配时,当传入的歌曲songId量很大(如50w),单线程执行会导致处理速度过慢,这时候希望能有多个线程同时进行处理,提高匹配的效率,在所有匹配完成后,需要往数据库里插入一条匹配完成的记录。
int countSize = 5;
CountDownLatch countDownLatch = new CountDownLatch(countSize);
for(int i=0; i<countSize; i++){ // 如果这里为i<countSize-1,则主线程会一直在await()这里hold住,因为计数器不会归0;如果这里为i<countSize+1,则主线程等待到第countSize个countDown后,就会去执行await()后面的任务
new CuntDownTestThread(countDownLatch).start();
}
countDownLatch.await();
System.out.println("wait all finished."); // 这里会等到把计数器countDown到0才执行
----------------------------------------------------------------------------
class CuntDownTestThread extends Thread{
private CountDownLatch countDownLatch;
public CuntDownTestThread(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
// TODO
}
System.out.println("before count down. Thread id:" + Thread.currentThread().getName());
doMatchTask();
this.countDownLatch.countDown();
System.out.println("after count down. Thread id:" + Thread.currentThread().getName());
}
}
1)初始化值
初始化值如下:
CountDownLatch countDownLatch = new CountDownLatch(countSize);
若countSize小于0,则会直接抛出异常 若countSize等于0,计数器就是零,调用await方法时不会阻塞当前线程 只有当countSize大于0时,调用await方法会阻塞当前线程。
2)忘记countDown或其它原因导致没有countDown
在初始化CountDownLatch 的计数器值N后(大于0),如果由于某种原因,countDown
的执行次数小于N,则会导致计数器值一直大于0,则会导致调用了await方法的线程一直处于等待状态。
3)如果是当前线程是等待N个线程完成操作,然后执行后续的任务,则注意在每个线程里,有且仅有一次:
countDownLatch.countDown();
4)如果当前线程在await时,不关心N个线程是否执行到countDown
而使得计数器置为0,则可通过设置超时时间来进行控制,当达到超时时间后,将计数器置为0,当前线程继续执行。
await(long timeout, TimeUnit unit)
CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续工作。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier当前线程已经到达了屏障,然后当前线程被阻塞
1,构造方法
CyclicBarrier(int parties) //创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction) // 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
2,常用方法
int await() // 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit) // 在所有参与者都已经在此屏障上调用 await 方法之前将一直等待,或者超出了指定的等待时间。
int getNumberWaiting() //返回当前在屏障处等待的参与者数目。
int getParties() //返回要求启动此 barrier 的参与者数目。
boolean isBroken() // 查询此屏障是否处于损坏状态。
void reset() // 将屏障重置为其初始状态。
需要所有的子任务都完成时,才执行主任务,这个时候就可以选择使用CyclicBarrier
如在进行版权库匹配任务时,需要把曲库数据迁移至版权库,版权库中的匹配结果显示中才会有相应的数据,即多个线程一起迁移数据,等数据迁移完之后,再分别执行匹配任务。
int countSize = 5;
CyclicBarrier cyclicBarrier = new CyclicBarrier(countSize);
for(int i=0; i<countSize; i++){
new CyclicBarrierTestThread(cyclicBarrier).start();
}
System.out.println("wait all finished."); // 这里会直接执行
----------------------------------------------------------------------------------------
class CyclicBarrierTestThread extends Thread{
private CyclicBarrier cyclicBarrier;
public CyclicBarrierTestThread(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run(){
try {
Thread.sleep((long)(Math.random()*10000));
} catch (InterruptedException e) {
// TODO
}
try {
System.out.println("before await. Thread id:" + Thread.currentThread().getName());
doImportTask() ; // 导入数据
this.cyclicBarrier.await(); // 线程在此阻塞,直到集齐
doMatchTask(); // 进行匹配操作
System.out.println("after await. Thread id:" + Thread.currentThread().getName());
} catch (InterruptedException e) {
// TODO
} catch (BrokenBarrierException e) {
// TODO
}
}
}
1)对于指定计数值parties,若由于某种原因,没有足够的线程调用CyclicBarrier的await,则所有调用await的线程都会被阻塞;
2)同样的CyclicBarrier也可以调用await(timeout, unit),设置超时时间,在设定时间内,如果没有足够线程到达,则解除阻塞状态,继续工作;
3)通过reset重置计数,会使得进入await的线程出现BrokenBarrierException;
4)如果采用是CyclicBarrier(int parties, Runnable barrierAction) 构造方法,执行barrierAction操作的是最后一个到达的线程
1,同一个线程调用CountDownLatch的countDown可以将计数减为0,使等待线程解除阻塞状态;而对于CyclicBarrier,如果调用多次await时,线程会在第一个await处阻塞,直到集齐所有成员通过屏障,再到第二个await阻塞,进行下一轮“集齐成员”...,可以阅读源码,如下注释:
// CyclicBarrier 中 private int dowait(boolean timed, long nanos) 部分源码
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand; // 这里的command 是初始构造函数为CyclicBarrier(int parties, Runnable barrierAction) 的barrierAction
if (command != null)
command.run();
ranAction = true; // 如果command执行异常,这里就会被跳过
nextGeneration(); // 下一个轮次(重置count等)
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
2,CountDownLatch的计数器值减为0后,不能再重置;而CyclicBarrier可以通过reset重置,并且在计数器值减为0后,会自动重置(进入下一个Cyclic)。
3,CountDownLatch调用await时,是当前线程阻塞等待计数值置为0;而CyclicBarrier是每个调用await的线程都进行阻塞,直到调用await的线程达到计数值
4,CyclicBarrier提供了其它辅助方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量,isBroken方法用来知道阻塞的线程是否被中断;.CountDownLatch提供了getCount()方法可以获取当前的计数值。
网易云大礼包:https://www.163yun.com/gift
本文来自网易实践者社区,经作者廖祥俐授权发布