(1)CyclicBarrier是什么?
創(chuàng)新互聯(lián)是一家網(wǎng)站設(shè)計(jì)、做網(wǎng)站,提供網(wǎng)頁設(shè)計(jì),網(wǎng)站設(shè)計(jì),網(wǎng)站制作,建網(wǎng)站,按需策劃設(shè)計(jì),網(wǎng)站開發(fā)公司,公司2013年成立是互聯(lián)行業(yè)建設(shè)者,服務(wù)者。以提升客戶品牌價(jià)值為核心業(yè)務(wù),全程參與項(xiàng)目的網(wǎng)站策劃設(shè)計(jì)制作,前端開發(fā),后臺程序制作以及后期項(xiàng)目運(yùn)營并提出專業(yè)建議和思路。(2)CyclicBarrier具有什么特性?
(3)CyclicBarrier與CountDownLatch的對比?
CyclicBarrier,回環(huán)柵欄,它會阻塞一組線程直到這些線程同時(shí)達(dá)到某個(gè)條件才繼續(xù)執(zhí)行。它與CountDownLatch很類似,但又不同,CountDownLatch需要調(diào)用countDown()方法觸發(fā)事件,而CyclicBarrier不需要,它就像一個(gè)柵欄一樣,當(dāng)一組線程都到達(dá)了柵欄處才繼續(xù)往下走。
public class CyclicBarrierTest {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
new Thread(()->{
System.out.println("before");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("after");
}).start();
}
}
}
這段方法很簡單,使用一個(gè)CyclicBarrier使得三個(gè)線程保持同步,當(dāng)三個(gè)線程同時(shí)到達(dá)cyclicBarrier.await();
處大家再一起往下運(yùn)行。
private static class Generation {
boolean broken = false;
}
Generation,中文翻譯為代,一代人的代,用于控制CyclicBarrier的循環(huán)使用。
比如,上面示例中的三個(gè)線程完成后進(jìn)入下一代,繼續(xù)等待三個(gè)線程達(dá)到柵欄處再一起執(zhí)行,而CountDownLatch則做不到這一點(diǎn),CountDownLatch是一次性的,無法重置其次數(shù)。
// 重入鎖
private final ReentrantLock lock = new ReentrantLock();
// 條件鎖,名稱為trip,絆倒的意思,可能是指線程來了先絆倒,等達(dá)到一定數(shù)量了再喚醒
private final Condition trip = lock.newCondition();
// 需要等待的線程數(shù)量
private final int parties;
// 當(dāng)喚醒的時(shí)候執(zhí)行的命令
private final Runnable barrierCommand;
// 代
private Generation generation = new Generation();
// 當(dāng)前這一代還需要等待的線程數(shù)
private int count;
通過屬性可以看到,CyclicBarrier內(nèi)部是通過重入鎖的條件鎖來實(shí)現(xiàn)的,那么你可以腦補(bǔ)一下這個(gè)場景嗎?
彤哥來腦補(bǔ)一下:假如初始時(shí)count = parties = 3
,當(dāng)?shù)谝粋€(gè)線程到達(dá)柵欄處,count減1,然后把它加入到Condition的隊(duì)列中,第二個(gè)線程到達(dá)柵欄處也是如此,第三個(gè)線程到達(dá)柵欄處,count減為0,調(diào)用Condition的signalAll()通知另外兩個(gè)線程,然后把它們加入到AQS的隊(duì)列中,等待當(dāng)前線程運(yùn)行完畢,調(diào)用lock.unlock()的時(shí)候依次從AQS的隊(duì)列中喚醒一個(gè)線程繼續(xù)運(yùn)行,也就是說實(shí)際上三個(gè)線程先依次(排隊(duì))到達(dá)柵欄處,再依次往下運(yùn)行。
以上純屬彤哥腦補(bǔ)的內(nèi)容,真實(shí)情況是不是如此呢,且往后看。
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
// 初始化parties
this.parties = parties;
// 初始化count等于parties
this.count = parties;
// 初始化都到達(dá)柵欄處執(zhí)行的命令
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
構(gòu)造方法需要傳入一個(gè)parties變量,也就是需要等待的線程數(shù)。
每個(gè)需要在柵欄處等待的線程都需要顯式地調(diào)用await()方法等待其它線程的到來。
public int await() throws InterruptedException, BrokenBarrierException {
try {
// 調(diào)用dowait方法,不需要超時(shí)
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
// 加鎖
lock.lock();
try {
// 當(dāng)前代
final Generation g = generation;
// 檢查
if (g.broken)
throw new BrokenBarrierException();
// 中斷檢查
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// count的值減1
int index = --count;
// 如果數(shù)量減到0了,走這段邏輯(最后一個(gè)線程走這里)
if (index == 0) { // tripped
boolean ranAction = false;
try {
// 如果初始化的時(shí)候傳了命令,這里執(zhí)行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
// 調(diào)用下一代方法
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// 這個(gè)循環(huán)只有非最后一個(gè)線程可以走到
for (;;) {
try {
if (!timed)
// 調(diào)用condition的await()方法
trip.await();
else if (nanos > 0L)
// 超時(shí)等待方法
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
// 檢查
if (g.broken)
throw new BrokenBarrierException();
// 正常來說這里肯定不相等
// 因?yàn)樯厦娲蚱茤艡诘臅r(shí)候調(diào)用nextGeneration()方法時(shí)generation的引用已經(jīng)變化了
if (g != generation)
return index;
// 超時(shí)檢查
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// 調(diào)用condition的signalAll()將其隊(duì)列中的等待者全部轉(zhuǎn)移到AQS的隊(duì)列中
trip.signalAll();
// 重置count
count = parties;
// 進(jìn)入下一代
generation = new Generation();
}
dowait()方法里的整個(gè)邏輯分成兩部分:
(1)最后一個(gè)線程走上面的邏輯,當(dāng)count減為0的時(shí)候,打破柵欄,它調(diào)用nextGeneration()方法通知條件隊(duì)列中的等待線程轉(zhuǎn)移到AQS的隊(duì)列中等待被喚醒,并進(jìn)入下一代。
(2)非最后一個(gè)線程走下面的for循環(huán)邏輯,這些線程會阻塞在condition的await()方法處,它們會加入到條件隊(duì)列中,等待被通知,當(dāng)它們喚醒的時(shí)候已經(jīng)更新?lián)Q“代”了,這時(shí)候返回。
學(xué)習(xí)過前面的章節(jié),看這個(gè)圖很簡單了,看不懂的同學(xué)還需要把推薦的內(nèi)容好好看看哦^^
(1)CyclicBarrier會使一組線程阻塞在await()處,當(dāng)最后一個(gè)線程到達(dá)時(shí)喚醒(只是從條件隊(duì)列轉(zhuǎn)移到AQS隊(duì)列中)前面的線程大家再繼續(xù)往下走;
(2)CyclicBarrier不是直接使用AQS實(shí)現(xiàn)的一個(gè)同步器;
(3)CyclicBarrier基于ReentrantLock及其Condition實(shí)現(xiàn)整個(gè)同步邏輯;
CyclicBarrier與CountDownLatch的異同?
(1)兩者都能實(shí)現(xiàn)阻塞一組線程等待被喚醒;
(2)前者是最后一個(gè)線程到達(dá)時(shí)自動喚醒;
(3)后者是通過顯式地調(diào)用countDown()實(shí)現(xiàn)的;
(4)前者是通過重入鎖及其條件鎖實(shí)現(xiàn)的,后者是直接基于AQS實(shí)現(xiàn)的;
(5)前者具有“代”的概念,可以重復(fù)使用,后者只能使用一次;
(6)前者只能實(shí)現(xiàn)多個(gè)線程到達(dá)柵欄處一起運(yùn)行;
(7)后者不僅可以實(shí)現(xiàn)多個(gè)線程等待一個(gè)線程條件成立,還能實(shí)現(xiàn)一個(gè)線程等待多個(gè)線程條件成立(詳見CountDownLatch那章使用案例);
1、死磕 java同步系列之開篇
2、死磕 java魔法類之Unsafe解析
3、死磕 java同步系列之JMM(Java Memory Model)
4、死磕 java同步系列之volatile解析
5、死磕 java同步系列之synchronized解析
6、死磕 java同步系列之自己動手寫一個(gè)鎖Lock
7、死磕 java同步系列之AQS起篇
8、死磕 java同步系列之ReentrantLock源碼解析(一)——公平鎖、非公平鎖
9、死磕 java同步系列之ReentrantLock源碼解析(二)——條件鎖
10、死磕 java同步系列之ReentrantLock VS synchronized
11、死磕 java同步系列之ReentrantReadWriteLock源碼解析
12、死磕 java同步系列之Semaphore源碼解析
13、死磕 java同步系列之CountDownLatch源碼解析
14、死磕 java同步系列之AQS終篇
15、死磕 java同步系列之StampedLock源碼解析
歡迎關(guān)注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。
另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時(shí)售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價(jià)比高”等特點(diǎn)與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。