java线程栅栏_Java 多线程基础 - CyclicBarrier
我的博客 轉(zhuǎn)載請注明原創(chuàng)出處。
序
java.util.concurrent包里有幾個能幫助人們管理相互合作的線程集的類,為多線程常見的應(yīng)用場景預(yù)置了抽象好的類庫。在遇到這些應(yīng)用場景時應(yīng)該直接重用合適的庫類而不要試圖提供手工的鎖與條件的集合。
同步屏障 CyclicBarrier
官方定義上文已經(jīng)給出,人話版是等待特定數(shù)量的線程都到達同步屏障后各線程才繼續(xù)執(zhí)行。
同步屏障有兩個構(gòu)造函數(shù),第一個構(gòu)造函數(shù)只需要指定需要等待的線程數(shù)量,第二構(gòu)造函數(shù)多了一個在特定數(shù)量的線程都到達同步屏障時優(yōu)先執(zhí)行的Runnable。
例子:
public class CyclicBarrierTest {
// 等待4個線程到達同步屏障,全部到達后優(yōu)先執(zhí)行一個 Runnable
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(4,
() -> System.out.println("全部到達同步屏障" + LocalDateTime.now()));
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
Runnable runnable = () -> {
System.out.println("到達同步屏障" + LocalDateTime.now());
try {
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("繼續(xù)執(zhí)行");
};
List list = Arrays.asList(runnable, runnable, runnable);
list.forEach(runnable1 -> new Thread(runnable1).start());
Thread.sleep(1000);
System.out.println("最后一個線程到達同步屏障");
cyclicBarrier.await();
}
}
輸出:
到達同步屏障2018-08-12T14:33:16.769
到達同步屏障2018-08-12T14:33:16.769
到達同步屏障2018-08-12T14:33:16.769
最后一個線程到達同步屏障
全部到達同步屏障2018-08-12T14:33:17.746
繼續(xù)執(zhí)行
繼續(xù)執(zhí)行
繼續(xù)執(zhí)行
Process finished with exit code 0
同步屏障的應(yīng)用場景是那種多線程執(zhí)行任務(wù),在全部任務(wù)執(zhí)行完成后需要進行一些操作的場景。比如對每個用戶進行充值統(tǒng)計,最后匯總返回。
CyclicBarrier的方法如上,分別是
getParties() 返回需要到達同步屏障的線程數(shù)量
await() 等待所有線程到達
await(long, TimeUnit) 帶時間限制的await()
isBroken() 判斷阻塞的線程是否被中斷
reset() 重置計數(shù)器
getNumberWaiting() 當(dāng)前被阻塞的線程數(shù)量,該方法主要用于調(diào)試和斷言
源碼分析
那么CyclicBarrier是怎么實現(xiàn)這個效果的呢?我們從最常用的await()方法入手。
可以看到await()方法主要是調(diào)用了CyclicBarrier私有的dowait()方法
如注釋所言,dowait()方法就是實現(xiàn)功能的主要方法了。
首先拿到可重入的鎖
然后通過內(nèi)部類Generation判斷阻塞的線程是否被中斷或該屏障已經(jīng)失效。
如果線程沒有被中斷,那么就獲取還沒有到達的線程數(shù)量并減一。如果已經(jīng)沒有需要等待的線程了,就判斷是否有需要執(zhí)行的Runnable。如果沒報錯就更新屏障狀態(tài)并喚醒所有線程繼續(xù)執(zhí)行。Runnable執(zhí)行報錯的話執(zhí)行breakBarrier()方法。
如果還有未到達的線程,就進入一個死循環(huán),直到超時、線程中斷、屏障失效、全部完成等情況下退出。
完整的代碼:
/**
* Each use of the barrier is represented as a generation instance.
* The generation changes whenever the barrier is tripped, or
* is reset. There can be many generations associated with threads
* using the barrier - due to the non-deterministic way the lock
* may be allocated to waiting threads - but only one of these
* can be active at a time (the one to which {@code count} applies)
* and all the rest are either broken or tripped.
* There need not be an active generation if there has been a break
* but no subsequent reset.
*/
private static class Generation {
boolean broken = false;
}
/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();
/** The number of parties */
private final int parties;
/* The command to run when tripped */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation. It is reset to parties on each new
* generation or when broken.
*/
private int count;
/**
* Updates state on barrier trip and wakes up everyone.
* Called only while holding lock.
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
/**
* Sets current barrier generation as broken and wakes up everyone.
* Called only while holding lock.
*/
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
總結(jié)
以上是生活随笔為你收集整理的java线程栅栏_Java 多线程基础 - CyclicBarrier的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 如何清理水草泥里的粑粑?
 - 下一篇: java私塾 设计模式 视频_[章节]J