java并發(fā)容器J.U.C AQS怎么用,很多新手對(duì)此不是很清楚,為了幫助大家解決這個(gè)難題,下面小編將為大家詳細(xì)講解,有這方面需求的人可以來(lái)學(xué)習(xí)下,希望你能有所收獲。
成都創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站設(shè)計(jì)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿(mǎn)足客戶(hù)于互聯(lián)網(wǎng)時(shí)代的江陰網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!
J.U.C 大大提高了java并發(fā)的性能,而AQS則是J.U.C的核心。
AQS底層使用雙向列表(隊(duì)列的一種實(shí)現(xiàn))。
使用Node實(shí)現(xiàn)FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他同步裝置的基礎(chǔ)框架
利用了一個(gè)int類(lèi)型表示狀態(tài)。 在AQS中有一個(gè)status的成員變量,基于AQS有一個(gè)同步組件ReentrantLock,在這個(gè)ReentrantLock中status表示獲取鎖的線(xiàn)程數(shù),例如status=0表示還沒(méi)有線(xiàn)程獲取鎖,status=1表示已經(jīng)有線(xiàn)程獲取了鎖,status>1表示重入鎖的數(shù)量。
使用方法:繼承
子類(lèi)通過(guò)繼承并通過(guò)實(shí)現(xiàn)它的方法管理其狀態(tài){acquire和release}的方法操縱狀態(tài)
可以同時(shí)實(shí)現(xiàn)排它鎖和共享鎖模式(獨(dú)占,共享)
countdownLatch,閉鎖,通過(guò)一個(gè)計(jì)數(shù)來(lái)保證線(xiàn)程是否需要一直阻塞。
semaphore,控制同一時(shí)間并發(fā)線(xiàn)程的數(shù)量。
cyclicbarrier,與countdownlatch很像,都能阻塞進(jìn)程。
reentrantlock
condition
futuretask
是一個(gè)同步輔助類(lèi),通過(guò)他可以實(shí)現(xiàn)類(lèi)似于阻塞當(dāng)前線(xiàn)程的功能。一個(gè)線(xiàn)程或多個(gè)線(xiàn)程一直等待,直到其他線(xiàn)程操作完成,countdownlatch用了一個(gè)給定的計(jì)數(shù)器來(lái)進(jìn)行初始化,該計(jì)數(shù)器的操作是原子操作,也就是同時(shí)只能有一個(gè)線(xiàn)程操作該計(jì)數(shù)器。調(diào)用該類(lèi)的await()方法則會(huì)一直處于阻塞狀態(tài),直到其他線(xiàn)程調(diào)用countdown()方法,每次調(diào)用countdown()方法會(huì)使得計(jì)數(shù)器的值減1,當(dāng)計(jì)數(shù)器的值減為0時(shí),所有因調(diào)用await方法處于等待狀態(tài)的線(xiàn)程就會(huì)繼續(xù)往下執(zhí)行。這種狀態(tài)只會(huì)出現(xiàn)一次,因?yàn)檫@里的計(jì)數(shù)器是不能被重置的,如果業(yè)務(wù)上需要一個(gè)可以重置計(jì)數(shù)次數(shù)的版本,可以考慮使用cyclicbarrier。
在某些業(yè)務(wù)場(chǎng)景中,程序執(zhí)行需要等到某個(gè)條件完成后才能繼續(xù)執(zhí)行后續(xù)的操作,典型的應(yīng)用例如并行計(jì)算:當(dāng)某個(gè)處理的運(yùn)算量很大時(shí),可以將該運(yùn)算任務(wù)拆分多個(gè)子任務(wù),等待所有的子任務(wù)都完成之后,父任務(wù)拿到所有的子任務(wù)的運(yùn)行結(jié)果進(jìn)行匯總。
下面舉例countdownlatch的基本用法:
@Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for(int i = 0; i< threadCount; i++) { final int threadNum = i; executorService.execute(() ->{ try { test(threadNum); } catch (InterruptedException e) { log.error("exception", e); }finally { countDownLatch.countDown(); } }); } //可以保證之前的線(xiàn)程都執(zhí)行完成 countDownLatch.await(); log.info("finish"); executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
一個(gè)復(fù)雜的場(chǎng)景:我們開(kāi)了很多個(gè)線(xiàn)程去完成一個(gè)任務(wù),但是這個(gè)任務(wù)需要在指定的時(shí)間內(nèi)完成,如果超過(guò)一定的時(shí)間沒(méi)有完成則放棄該任務(wù)。
@Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for(int i = 0; i< threadCount; i++) { final int threadNum = i; executorService.execute(() ->{ try { test(threadNum); } catch (InterruptedException e) { log.error("exception", e); }finally { countDownLatch.countDown(); } }); } //可以保證之前的線(xiàn)程都執(zhí)行完成 countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); // 第一時(shí)間內(nèi)并不會(huì)把所有線(xiàn)程都銷(xiāo)毀,而是讓當(dāng)前已有線(xiàn)程執(zhí)行完之后在把線(xiàn)程池銷(xiāo)毀。 executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { Thread.sleep(100); log.info("{}", threadNum); } }
可以控制某個(gè)資源可被同時(shí)訪(fǎng)問(wèn)的個(gè)數(shù),與countdownlatch有些類(lèi)似,提供了兩個(gè)核心方法:aquire和release。aquire表示獲取一個(gè)許可,如果沒(méi)有則等待,release表示操作完成后釋放一個(gè)許可。semaphore維護(hù)了當(dāng)前訪(fǎng)問(wèn)的個(gè)數(shù),提供同步機(jī)制控制訪(fǎng)問(wèn)的個(gè)數(shù)。
常用于僅能提供有限訪(fǎng)問(wèn)的資源例如數(shù)據(jù)庫(kù)連接數(shù)是有限的,而上層應(yīng)用的并發(fā)數(shù)會(huì)遠(yuǎn)遠(yuǎn)大于連接數(shù),如果同時(shí)對(duì)數(shù)據(jù)庫(kù)進(jìn)行操作可能出現(xiàn)因?yàn)闊o(wú)法獲取數(shù)據(jù)庫(kù)連接而導(dǎo)致異常。這時(shí)可以通過(guò)信號(hào)量semaphore來(lái)并發(fā)訪(fǎng)問(wèn)控制。當(dāng)semaphore把并發(fā)數(shù)控制到1時(shí)就跟單線(xiàn)程運(yùn)行很相似了。
舉例如下:
@Slf4j public class SemaphoreExample1 { private final static int threadCount = 20; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //允許的并發(fā)數(shù) final Semaphore semaphore = new Semaphore(3); for(int i = 0; i< threadCount; i++) { final int threadNum = i; executorService.execute(() ->{ try { // 獲取一個(gè)許可 semaphore.acquire(); test(threadNum); // 釋放一個(gè)許可 semaphore.release(); } catch (InterruptedException e) { log.error("exception", e); } }); } log.info("finish"); executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
運(yùn)行結(jié)果可以看到同時(shí)3個(gè)線(xiàn)程在執(zhí)行。
也可以獲得多個(gè)許可:
@Slf4j public class SemaphoreExample2 { private final static int threadCount = 20; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //允許的并發(fā)數(shù) final Semaphore semaphore = new Semaphore(3); for(int i = 0; i< threadCount; i++) { final int threadNum = i; executorService.execute(() ->{ try { // 獲取多個(gè)許可 semaphore.acquire(3); test(threadNum); // 釋放多個(gè)許可 semaphore.release(3); } catch (InterruptedException e) { log.error("exception", e); } }); } log.info("finish"); executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
每一次獲取三個(gè)許可,而同時(shí)只允許3個(gè)并發(fā)數(shù),相當(dāng)于單線(xiàn)程在運(yùn)行。
@Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newCachedThreadPool(); //允許的并發(fā)數(shù) final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; executorService.execute(() -> { try { // 嘗試獲取一個(gè)許可 if (semaphore.tryAcquire()) { test(threadNum); // 釋放一個(gè)許可 semaphore.release(); } } catch (InterruptedException e) { log.error("exception", e); } }); } log.info("finish"); executorService.shutdown(); } private static void test(int threadNum) throws InterruptedException { log.info("{}", threadNum); Thread.sleep(1000); } }
輸出結(jié)果:
15:24:21.098 [pool-1-thread-1] INFO com.vincent.example.aqs.SemaphoreExample3 - 0 15:24:21.098 [pool-1-thread-2] INFO com.vincent.example.aqs.SemaphoreExample3 - 1 15:24:21.098 [main] INFO com.vincent.example.aqs.SemaphoreExample3 - finish 15:24:21.098 [pool-1-thread-3] INFO com.vincent.example.aqs.SemaphoreExample3 - 2
因?yàn)槲覀兺€(xiàn)程池中放了二十個(gè)請(qǐng)求,二十個(gè)請(qǐng)求在同一時(shí)間內(nèi)都會(huì)嘗試去執(zhí)行,semaphore會(huì)嘗試讓每個(gè)線(xiàn)程去獲取許可,而同一時(shí)刻內(nèi)我們的并發(fā)數(shù)是3,也就是只有三個(gè)線(xiàn)程獲取到了許可,而test方法內(nèi)有Thread.sleep(1000),因此其余17個(gè)線(xiàn)程都不能拿到許可,直接結(jié)束。
semaphore.tryAcquire(3, TimeUnit.SECONDS)
表示可以等3秒,如果3秒內(nèi)沒(méi)拿到許可就結(jié)束。
也是一個(gè)同步輔助類(lèi),允許一組線(xiàn)程相互等待,直到到達(dá)某個(gè)公共的屏障點(diǎn)??梢酝瓿啥鄠€(gè)線(xiàn)程之間相互等待,只有當(dāng)每個(gè)線(xiàn)程都準(zhǔn)備就緒后,才能各自繼續(xù)往下執(zhí)行謀面的操作。它和countdownlatch有相似的地方,都是通過(guò)計(jì)數(shù)器來(lái)實(shí)現(xiàn)的,當(dāng)一個(gè)線(xiàn)程調(diào)用await()方法后,該線(xiàn)程就進(jìn)入了等待狀態(tài)。當(dāng)循環(huán)計(jì)數(shù)器的值達(dá)到設(shè)置的初始值之后,進(jìn)入等待狀態(tài)的線(xiàn)程會(huì)被喚醒,繼續(xù)執(zhí)行后續(xù)操作。因?yàn)镃yclicBarrier在釋放等待線(xiàn)程后可以重用,所以稱(chēng)他為循環(huán)屏障。
CyclicBarrier的使用場(chǎng)景與countdownlatch類(lèi)似,CyclicBarrier可以用于多線(xiàn)程計(jì)算數(shù)據(jù),最后合并計(jì)算結(jié)果的應(yīng)用場(chǎng)景。
CyclicBarrier與Countdownlatch的區(qū)別:
countdownlatch的計(jì)數(shù)器只能使用一次,CyclicBarrier可以使用reset方法重復(fù)使用
countdownlatch主要是實(shí)現(xiàn)一個(gè)或n個(gè)線(xiàn)程需要等待其他線(xiàn)程完成某項(xiàng)操作之后,才能繼續(xù)往下執(zhí)行,他描述的是1個(gè)或n個(gè)線(xiàn)程等待其他線(xiàn)程的關(guān)系。而CyclicBarrier主要實(shí)現(xiàn)了多個(gè)線(xiàn)程之間相互等待,直到所有線(xiàn)程都滿(mǎn)足了條件之后才能繼續(xù)執(zhí)行后續(xù)的操作,它描述的是各個(gè)線(xiàn)程內(nèi)部相互等待的關(guān)系。所以CyclicBarrier可以處理更復(fù)雜的業(yè)務(wù)場(chǎng)景,例如計(jì)數(shù)器發(fā)生錯(cuò)誤可以重置計(jì)數(shù)器,讓線(xiàn)程重新執(zhí)行一次。
看完上述內(nèi)容是否對(duì)您有幫助呢?如果還想對(duì)相關(guān)知識(shí)有進(jìn)一步的了解或閱讀更多相關(guān)文章,請(qǐng)關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對(duì)創(chuàng)新互聯(lián)的支持。