前面的文章中我們講到了CyclicBarrier、CountDownLatch的使用,這里再回顧一下CountDownLatch主要用在一個線程等待多個線程執(zhí)行完畢的情況,而CyclicBarrier用在多個線程互相等待執(zhí)行完畢的情況。
創(chuàng)新互聯(lián)公司,是成都地區(qū)的互聯(lián)網(wǎng)解決方案提供商,用心服務(wù)為企業(yè)提供網(wǎng)站建設(shè)、成都APP應用開發(fā)、重慶小程序開發(fā)公司、系統(tǒng)定制網(wǎng)站和微信代運營服務(wù)。經(jīng)過數(shù)十載的沉淀與積累,沉淀的是技術(shù)和服務(wù),讓客戶少走彎路,踏實做事,誠實做人,用情服務(wù),致力做一個負責任、受尊敬的企業(yè)。對客戶負責,就是對自己負責,對企業(yè)負責。
Phaser是java 7 引入的新的并發(fā)API。他引入了新的Phaser的概念,我們可以將其看成一個一個的階段,每個階段都有需要執(zhí)行的線程任務(wù),任務(wù)執(zhí)行完畢就進入下一個階段。所以Phaser特別適合使用在重復執(zhí)行或者重用的情況。
基本使用
在CyclicBarrier、CountDownLatch中,我們使用計數(shù)器來控制程序的順序執(zhí)行,同樣的在Phaser中也是通過計數(shù)器來控制。在Phaser中計數(shù)器叫做parties, 我們可以通過Phaser的構(gòu)造函數(shù)或者register()方法來注冊。
通過調(diào)用register()方法,我們可以動態(tài)的控制phaser的個數(shù)。如果我們需要取消注冊,則可以調(diào)用arriveAndDeregister()方法。
我們看下arrive:
public int arrive() { return doArrive(ONE_ARRIVAL); }
Phaser中arrive實際上調(diào)用了doArrive方法,doArrive接收一個adjust參數(shù),ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister。
Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞,但是會返回相應的phase數(shù)字,當此phase中最后一個party也arrive以后,phase數(shù)字將會增加,即phase進入下一個周期,同時觸發(fā)(onAdvance)那些阻塞在上一phase的線程。這一點類似于CyclicBarrier的barrier到達機制;更靈活的是,我們可以通過重寫onAdvance方法來實現(xiàn)更多的觸發(fā)行為。
下面看一個基本的使用:
void runTasks(Listtasks) { final Phaser phaser = new Phaser(1); // "1" to register self // create and start threads for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { phaser.arriveAndAwaitAdvance(); // await all creation task.run(); } }.start(); } // allow threads to start and deregister self phaser.arriveAndDeregister(); }
上面的例子中,我們在執(zhí)行每個Runnable之前調(diào)用register()來注冊, 然后調(diào)用arriveAndAwaitAdvance()來等待這一個Phaser周期結(jié)束。最后我們調(diào)用 phaser.arriveAndDeregister();來取消注冊主線程。
多個Phaser周期
Phaser的值是從0到Integer.MAX_VALUE,每個周期過后該值就會加一,如果到達Integer.MAX_VALUE則會繼續(xù)從0開始。
如果我們執(zhí)行多個Phaser周期,則可以重寫onAdvance方法:
protected boolean onAdvance(int phase, int registeredParties) { return registeredParties == 0; }
onAdvance將會在最后一個arrive()調(diào)用的時候被調(diào)用,如果這個時候registeredParties為0的話,該Phaser將會調(diào)用isTerminated方法結(jié)束該Phaser。
如果要實現(xiàn)多周期的情況,我們可以重寫這個方法:
protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; }
上面的例子中,如果phase次數(shù)超過了指定的iterations次數(shù)則就會自動終止。
我們看下實際的例子:
void startTasks(Listtasks, final int iterations) { final Phaser phaser = new Phaser() { protected boolean onAdvance(int phase, int registeredParties) { return phase >= iterations || registeredParties == 0; } }; phaser.register(); for (final Runnable task : tasks) { phaser.register(); new Thread() { public void run() { do { task.run(); phaser.arriveAndAwaitAdvance(); } while (!phaser.isTerminated()); } }.start(); } phaser.arriveAndDeregister(); // deregister self, don't wait }
上面的例子將會執(zhí)行iterations次。
本文的例子請參考https://github.com/ddean2009/learn-java-concurrency/tree/master/Phaser
到此這篇關(guān)于java多線程之Phaser的使用的文章就介紹到這了,更多相關(guān)java多線程Phaser內(nèi)容請搜索創(chuàng)新互聯(lián)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持創(chuàng)新互聯(lián)!