這篇文章主要介紹“JUC的CyclicBarrier怎么實現(xiàn)”,在日常操作中,相信很多人在JUC的CyclicBarrier怎么實現(xiàn)問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”JUC的CyclicBarrier怎么實現(xiàn)”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
創(chuàng)新互聯(lián)成立十載來,這條路我們正越走越好,積累了技術(shù)與客戶資源,形成了良好的口碑。為客戶提供成都做網(wǎng)站、網(wǎng)站建設(shè)、網(wǎng)站策劃、網(wǎng)頁設(shè)計、域名注冊、網(wǎng)絡(luò)營銷、VI設(shè)計、網(wǎng)站改版、漏洞修補等服務(wù)。網(wǎng)站是否美觀、功能強大、用戶體驗好、性價比高、打開快等等,這些對于網(wǎng)站建設(shè)都非常重要,創(chuàng)新互聯(lián)通過對建站技術(shù)性的掌握、對創(chuàng)意設(shè)計的研究為客戶提供一站式互聯(lián)網(wǎng)解決方案,攜手廣大客戶,共同發(fā)展進(jìn)步。
前面介紹的 CountDownLatch 同步器是基于 AQS 實現(xiàn)的,而本文要介紹的 CyclicBarrier 則沒有直接繼承 AQS 的 AbstractQueuedSynchronizer 抽象類,而是基于 ReentrantLock 鎖進(jìn)行實現(xiàn)。首先來看一下 CyclicBarrier 的字段定義,如下:
public class CyclicBarrier { /** 支撐 CyclicBarrier 的重入鎖 */ private final ReentrantLock lock = new ReentrantLock(); /** 條件隊列,已經(jīng)到達(dá)屏障的線程會在條件隊列中等待其它線程 */ private final Condition trip = lock.newCondition(); /** 參與的線程數(shù) */ private final int parties; /** 當(dāng)所有線程都到達(dá)屏障時的回調(diào)函數(shù) */ private final Runnable barrierCommand; /** 當(dāng)前年代對象 */ private Generation generation = new Generation(); /** 當(dāng)前剩余未完成的線程數(shù) */ private int count; // ... 省略方法定義 }
上述各個字段的含義如代碼注釋,這里我們進(jìn)一步解釋一下 generation 字段,該字段為 Generation 類型,用于表示當(dāng)前 CyclicBarrier 同步器的年代信息。Generation 內(nèi)部類定義如下:
private static class Generation { boolean broken = false; }
當(dāng)新建一個 CyclicBarrier 對象時會初始化 CyclicBarrier#generation
字段。此外,當(dāng)所有參與的線程都到達(dá)屏障后(也稱 tripped),或者 CyclicBarrier 被重置(即調(diào)用 CyclicBarrier#reset
方法)時,會新建一個 Generation 對象賦值給 CyclicBarrier#generation
字段,表示年代的更替。
Generation 定義的 Generation#broken
屬性用于標(biāo)識當(dāng)前屏障是否被打破。當(dāng) CyclicBarrier 被重置,或者參與到該屏障的某個線程被中斷、等待超時,亦或是執(zhí)行回調(diào)函數(shù)發(fā)生異常,都會導(dǎo)致屏障被打破。破損的屏障(即 broken=true
)會導(dǎo)致當(dāng)前參與等待的線程,以及已經(jīng)處于等待狀態(tài)的線程拋出 BrokenBarrierException 異常,并退出當(dāng)前屏障進(jìn)程。
因為 CyclicBarrier 的復(fù)用性,導(dǎo)致在程序運行期間可能并存多個年代信息,但是任何時刻只有一個年代對象是活躍的,剩余的年代對象對應(yīng)的 CyclicBarrier 要么是已經(jīng)用完的(tripped),要么就是已經(jīng)破損的。
介紹完了字段定義,下面來分析一下 CyclicBarrier 的方法實現(xiàn),首先來看一下構(gòu)造方法。CyclicBarrier 定義了兩個構(gòu)造方法,實現(xiàn)如下:
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) { throw new IllegalArgumentException(); } this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中 parties 參數(shù)用于指定當(dāng)前參與的線程數(shù),參數(shù) barrierAction 用于指定當(dāng)所有參與的線程都到達(dá)屏障時的回調(diào)邏輯。你可能有些疑問,既然設(shè)置了 parties 字段,為什么還要設(shè)置一個 count 字段呢?
考慮 CyclicBarrier 是可重用的,所以需要有一個字段記錄參與線程的數(shù)目,即 parties 字段,而 count 字段初始值等于 parties 字段值,但是在運行期間其值是會隨著參與線程逐一到達(dá)屏障而遞減的,所以 count 值始終記錄的是當(dāng)前未到達(dá)屏障的線程數(shù)。當(dāng) CyclicBarrier 被重置時,我們需要依據(jù) parties 字段值來重置 count 字段值。
繼續(xù)來看一下 CyclicBarrier 除構(gòu)造方法以外的剩余方法實現(xiàn),主要分析一下 CyclicBarrier#await
方法和 CyclicBarrier#reset
方法。首先來看一下 CyclicBarrier#reset
方法,當(dāng)我們希望復(fù)用 CyclicBarrier 對象時可以調(diào)用該方法,用于重置 count 值、年代信息,并喚醒所有位于條件隊列中等待的線程。方法實現(xiàn)如下:
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { this.breakBarrier(); // break the current generation this.nextGeneration(); // start a new generation } finally { lock.unlock(); } } private void breakBarrier() { // 標(biāo)識當(dāng)前屏障被打破 generation.broken = true; // 重置 count 字段值 count = parties; // 喚醒所有等待的線程 trip.signalAll(); } private void nextGeneration() { // 喚醒所有等待的線程 trip.signalAll(); // 重置 count 值 count = parties; generation = new Generation(); }
再來看一下 CyclicBarrier#await
方法,該方法用于阻塞當(dāng)前線程,以在屏障處等待其它線程到達(dá),CyclicBarrier 還為該方法定義了超時等待版本。當(dāng)一個線程因調(diào)用 CyclicBarrier#await
方法進(jìn)入等待狀態(tài)時,該線程將會在滿足以下條件之一時退出等待狀態(tài):
所有參與的線程都已經(jīng)到達(dá)了屏障。
當(dāng)前線程被中斷,或者其它處于等待狀態(tài)的線程被中斷。
如果啟用了超時機制,并且某個參與的線程等待超時。
CyclicBarrier 被重置。
方法 CyclicBarrier#await
的普通版本和超時版本在實現(xiàn)上都是直接委托給 CyclicBarrier#dowait
方法執(zhí)行,所以下面主要來分析一下該方法,實現(xiàn)如下:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 獲取當(dāng)前年代信息 final Generation g = generation; // 當(dāng)前屏障被打破,拋出異常 if (g.broken) { throw new BrokenBarrierException(); } // 當(dāng)前線程被中斷,打破屏障,并喚醒所有等待的線程 if (Thread.interrupted()) { this.breakBarrier(); throw new InterruptedException(); } int index = --count; // 如果 count 值為 0,說明所有的線程都已經(jīng)到達(dá)屏障 if (index == 0) { // tripped boolean ranAction = false; try { // 如果設(shè)置了回調(diào),則執(zhí)行 final Runnable command = barrierCommand; if (command != null) { command.run(); } ranAction = true; // 喚醒所有等待的線程,并重置屏障 this.nextGeneration(); return 0; } finally { // 如果執(zhí)行回調(diào)異常 if (!ranAction) { this.breakBarrier(); } } } // count 值不為 0,說明存在還未到達(dá)屏障的線程,則進(jìn)入條件隊列等待 // loop until tripped, broken, interrupted, or timed out for (; ; ) { try { if (!timed) { // 進(jìn)入條件隊列等待 trip.await(); } else if (nanos > 0L) { // 進(jìn)入條件隊列超時等待 nanos = trip.awaitNanos(nanos); } } catch (InterruptedException ie) { // 當(dāng)前線程被中斷,響應(yīng)中斷 if (g == generation && !g.broken) { this.breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not been interrupted, // so this interrupt is deemed to "belong" to subsequent execution. Thread.currentThread().interrupt(); } } // 屏障被打破 if (g.broken) { throw new BrokenBarrierException(); } // 當(dāng)前 CyclicBarrier 已經(jīng)被重置 if (g != generation) { return index; } // 等待超時 if (timed && nanos <= 0L) { this.breakBarrier(); throw new TimeoutException(); } } } finally { // 釋放鎖 lock.unlock(); } }
由上述實現(xiàn)我們可以總結(jié)線程在調(diào)用 CyclicBarrier#await
方法時的整體執(zhí)行流程。如果當(dāng)前線程不是最后一個到達(dá)屏障的線程(遞減 count 值之后仍然大于 0),則調(diào)用 Condition#await
方法(或超時版本)將當(dāng)前線程添加到條件隊列中等待。如果當(dāng)前線程是最后一個到達(dá)屏障的線程(遞減 count 值之后為 0),則在線程到達(dá)屏障后執(zhí)行:
如果指定了回調(diào)邏輯,則執(zhí)行該回調(diào),如果期間發(fā)生任何異常,則打破屏障、重置 count 值,并喚醒條件隊列中所有等待的線程;
否則,繼續(xù)調(diào)用 CyclicBarrier#nextGeneration
方法喚醒條件隊列中所有等待的線程,并重置 count 值和年代信息。
在上述過程中如果當(dāng)前線程或處于等待狀態(tài)的線程被中斷、屏障被打破、年代信息發(fā)生變化,或者等待超時(如果允許的話),則線程將會從 Condition#await
方法中退出,即當(dāng)前屏障失效。
到此,關(guān)于“JUC的CyclicBarrier怎么實現(xiàn)”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
網(wǎng)站名稱:JUC的CyclicBarrier怎么實現(xiàn)
鏈接地址:http://weahome.cn/article/pcsicj.html