這篇文章主要介紹“CountDownLatch、Semaphore、CyclicBarrier的原理和作用是什么”,在日常操作中,相信很多人在CountDownLatch、Semaphore、CyclicBarrier的原理和作用是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”CountDownLatch、Semaphore、CyclicBarrier的原理和作用是什么”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!
專注于為中小企業(yè)提供網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)甘泉免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千余家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。
CountDownLatch是一個計數(shù)器閉鎖,通過它可以完成類似于阻塞當(dāng)前線程的功能,即:一個線程或多個線程一直等待,直到其他線程執(zhí)行的操作完成。CountDownLatch用一個給定的計數(shù)器來初始化,該計數(shù)器的操作是原子操作,即同時只能有一個線程去操作該計數(shù)器。調(diào)用該類await方法的線程會一直處于阻塞狀態(tài),直到其他線程調(diào)用countDown方法使當(dāng)前計數(shù)器的值變?yōu)榱悖看握{(diào)用countDown計數(shù)器的值減1。當(dāng)計數(shù)器值減至零時,所有因調(diào)用await()方法而處于等待狀態(tài)的線程就會繼續(xù)往下執(zhí)行。這種現(xiàn)象只會出現(xiàn)一次,因為計數(shù)器不能被重置,如果業(yè)務(wù)上需要一個可以重置計數(shù)次數(shù)的版本,可以考慮使用CycliBarrier。
在某些業(yè)務(wù)場景中,程序執(zhí)行需要等待某個條件完成后才能繼續(xù)執(zhí)行后續(xù)的操作;典型的應(yīng)用如并行計算,當(dāng)某個處理的運算量很大時,可以將該運算任務(wù)拆分成多個子任務(wù),等待所有的子任務(wù)都完成之后,父任務(wù)再拿到所有子任務(wù)的運算結(jié)果進(jìn)行匯總。
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CountDownLatchExample1 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); Thread.sleep(100); } }
結(jié)果:
20:18:32.917 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 6 20:18:32.917 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 5 20:18:32.919 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 4 20:18:32.918 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 0 20:18:32.918 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 2 20:18:32.916 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 8 20:18:32.918 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 3 20:18:32.916 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 9 20:18:32.916 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 7 20:18:32.917 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - 1 20:18:33.032 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample1 - finish
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j public class CountDownLatchExample2 { private final static int threadCount = 200; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final CountDownLatch countDownLatch = new CountDownLatch(threadCount); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { test(threadNum); } catch (Exception e) { log.error("exception", e); } finally { countDownLatch.countDown(); } }); } countDownLatch.await(10, TimeUnit.MILLISECONDS); log.info("finish"); exec.shutdown(); } private static void test(int threadNum) throws Exception { Thread.sleep(100); log.info("{}", threadNum); } }
結(jié)果: 超過指定時間跳過等待
20:19:34.878 [main] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - finish 20:19:34.964 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 2 20:19:34.965 [pool-1-thread-10] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 9 20:19:34.964 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 0 20:19:34.965 [pool-1-thread-8] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 7 20:19:34.964 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 1 20:19:34.965 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 4 20:19:34.965 [pool-1-thread-7] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 6 20:19:34.964 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 3 20:19:34.965 [pool-1-thread-9] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 8 20:19:34.965 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CountDownLatchExample2 - 5
Semaphore與CountDownLatch相似,不同的地方在于Semaphore的值被獲取到后是可以釋放的,并不像CountDownLatch那樣一直減到底。它也被更多地用來限制流量,類似閥門的 功能。如果限定某些資源最多有N個線程可以訪問,那么超過N個主不允許再有線程來訪問,同時當(dāng)現(xiàn)有線程結(jié)束后,就會釋放,然后允許新的線程進(jìn)來。有點類似于鎖的lock與 unlock過程。相對來說他也有兩個主要的方法:
用于獲取權(quán)限的acquire(),其底層實現(xiàn)與CountDownLatch.countdown()類似;
用于釋放權(quán)限的release(),其底層實現(xiàn)與acquire()是一個互逆的過程。
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample1 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); // 每次最多三個線程獲取許可 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); // 獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; @Slf4j public class SemaphoreExample2 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(3); // 獲取多個許可 test(threadNum); semaphore.release(3); // 釋放多個許可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @Slf4j public class SemaphoreExample3 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire()) { // 嘗試獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @Slf4j public class SemaphoreExample4 { private final static int threadCount = 20; public static void main(String[] args) throws Exception { ExecutorService exec = Executors.newCachedThreadPool(); final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 嘗試獲取一個許可 test(threadNum); semaphore.release(); // 釋放一個許可 } } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); } }
CyclicBarrier也是一個同步輔助類,它允許一組線程相互等待,直到到達(dá)某個公共屏障點(common barrier point)。通過它可以完成多個線程之間相互等待,只有當(dāng)每個線程都準(zhǔn)備就緒后,才能各自繼續(xù)往下執(zhí)行后面的操作。類似于CountDownLatch,它也是通過計數(shù)器來實現(xiàn)的。當(dāng)某個線程調(diào)用await方法時,該線程進(jìn)入等待狀態(tài),且計數(shù)器加1,當(dāng)計數(shù)器的值達(dá)到設(shè)置的初始值時,所有因調(diào)用await進(jìn)入等待狀態(tài)的線程被喚醒,繼續(xù)執(zhí)行后續(xù)操作。因為CycliBarrier在釋放等待線程后可以重用,所以稱為循環(huán)barrier。CycliBarrier支持一個可選的Runnable,在計數(shù)器的值到達(dá)設(shè)定值后(但在釋放所有線程之前),該Runnable運行一次,注,Runnable在每個屏障點只運行一個。
使用場景類似于CountDownLatch與CountDownLatch的區(qū)別
CountDownLatch主要是實現(xiàn)了1個或N個線程需要等待其他線程完成某項操作之后才能繼續(xù)往下執(zhí)行操作,描述的是1個線程或N個線程等待其他線程的關(guān)系。CyclicBarrier主要是實現(xiàn)了多個線程之間相互等待,直到所有的線程都滿足了條件之后各自才能繼續(xù)執(zhí)行后續(xù)的操作,描述的多個線程內(nèi)部相互等待的關(guān)系。
CountDownLatch是一次性的,而CyclicBarrier則可以被重置而重復(fù)使用。
@Slf4j public class CyclicBarrierExample1 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
結(jié)果: ready ready .. go
20:24:34.616 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 0 is ready 20:24:35.610 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 1 is ready 20:24:36.610 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 2 is ready 20:24:37.611 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 3 is ready 20:24:38.612 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 4 is ready 20:24:38.612 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 0 continue 20:24:38.612 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 1 continue 20:24:38.612 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 4 continue 20:24:38.612 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 3 continue 20:24:38.612 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 2 continue 20:24:39.614 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 5 is ready 20:24:40.613 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 6 is ready 20:24:41.614 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 7 is ready 20:24:42.615 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 8 is ready 20:24:43.615 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 9 is ready 20:24:43.615 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 9 continue 20:24:43.615 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 5 continue 20:24:43.615 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 6 continue 20:24:43.615 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 7 continue 20:24:43.615 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample1 - 8 continue
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @Slf4j public class CyclicBarrierExample2 { private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); try { barrier.await(2000, TimeUnit.MILLISECONDS); } catch (Exception e) { log.warn("BarrierException", e); } log.info("{} continue", threadNum); } }
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Slf4j public class CyclicBarrierExample3 { private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); }); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready", threadNum); barrier.await(); log.info("{} continue", threadNum); } }
結(jié)果:
20:28:32.790 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 0 is ready 20:28:33.785 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 1 is ready 20:28:34.786 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 2 is ready 20:28:35.787 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 3 is ready 20:28:36.787 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 4 is ready 20:28:36.787 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - callback is running 20:28:36.787 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 4 continue 20:28:36.788 [pool-1-thread-1] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 0 continue 20:28:36.788 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 1 continue 20:28:36.788 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 2 continue 20:28:36.788 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 3 continue 20:28:37.788 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 5 is ready 20:28:38.789 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 6 is ready 20:28:39.789 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 7 is ready 20:28:40.790 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 8 is ready 20:28:41.791 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 9 is ready 20:28:41.791 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - callback is running 20:28:41.791 [pool-1-thread-3] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 9 continue 20:28:41.791 [pool-1-thread-6] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 5 continue 20:28:41.791 [pool-1-thread-4] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 6 continue 20:28:41.818 [pool-1-thread-2] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 8 continue 20:28:41.818 [pool-1-thread-5] INFO com.mmall.concurrency.example.aqs.CyclicBarrierExample3 - 7 c
1.CountDownLatch底層使用的是共享鎖,它有個內(nèi)部類Sync,這個Sync繼承AQS,實現(xiàn)了共享鎖。
具體參考JUC系列回顧之-CountDownLatch底層原理和示例
簡單畫了一下共享鎖的實現(xiàn)。
比如有4個線程在等待隊列里,并且節(jié)點類型都是共享鎖。 會喚醒head節(jié)點的下一節(jié)點中的線程Thread1。head節(jié)點就變成了之前head節(jié)點的下個節(jié)點,然后再做重復(fù)操作。 這個過程是一個傳播過程,會依次喚醒各個共享節(jié)點中的線程。
2.并發(fā)包下的另外一個工具類Semaphore底層也是使用共享鎖實現(xiàn)的。但是它跟CountDownLatch唯一的區(qū)別就是它不會喚醒所有的共享節(jié)點中的線程,而是喚醒它能喚醒的最大線程數(shù)(由信號量可用大小決定)。
3.CyclicBarrier底層使用的是ReentrantLock和這個lock的條件對象Condition。
到此,關(guān)于“CountDownLatch、Semaphore、CyclicBarrier的原理和作用是什么”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
文章標(biāo)題:CountDownLatch、Semaphore、CyclicBarrier的原理和作用是什么
分享地址:http://weahome.cn/article/jjechp.html