CountDownLatch的簡單實現(xiàn)
創(chuàng)新互聯(lián)從2013年創(chuàng)立,我們提供高端網(wǎng)站建設(shè)、重慶小程序開發(fā)公司、電商視覺設(shè)計、重慶APP開發(fā)公司及網(wǎng)絡(luò)營銷搜索優(yōu)化服務,在傳統(tǒng)互聯(lián)網(wǎng)與移動互聯(lián)網(wǎng)發(fā)展的背景下,我們堅守著用標準的設(shè)計方案與技術(shù)開發(fā)實力作基礎(chǔ),以企業(yè)及品牌的互聯(lián)網(wǎng)商業(yè)目標為核心,為客戶打造具商業(yè)價值與用戶體驗的互聯(lián)網(wǎng)+產(chǎn)品。
業(yè)務背景假設(shè):現(xiàn)在一個前端頁面的展示需要調(diào)用3個外部電商平臺接口的數(shù)據(jù),所以在我們提供給前端的這個接口上,我們要調(diào)用3個外部電商接口,最后需要對所有的數(shù)據(jù)做一個整合,方便前端展示。
一般情況我們都是順序調(diào)用3個電商接口,得到數(shù)據(jù)后調(diào)用整合方法,假設(shè)每個電商接口調(diào)用時間為2秒,如下:
public static void main(String[] args) throws Exception{
long startTime = new Date().getTime();
//調(diào)用第一個電商平臺的接口取得訂單數(shù),用時2s
Thread.sleep(2000);
System.out.println("獲取電商平臺1的數(shù)據(jù)");
//調(diào)用第二個電商平臺的接口取得訂單數(shù),用時2s
Thread.sleep(2000);
System.out.println("獲取電商平臺2的數(shù)據(jù)");
//調(diào)用第三個電商平臺的接口取得訂單數(shù),用時2s
Thread.sleep(2000);
System.out.println("獲取電商平臺3的數(shù)據(jù)");
System.out.println("對三個電商平臺的數(shù)據(jù)進行合并");
long endTime = new Date().getTime();
long time = endTime - startTime;
System.out.println("總耗時" + time);
}
調(diào)用后耗時6s,如下:
以上方法耗時太長了,需要優(yōu)化,優(yōu)化思路:因為3個接口沒有先后關(guān)系,所以完全可以并行執(zhí)行,之后再做數(shù)據(jù)的整合,這樣設(shè)計接口耗時肯定會節(jié)省很多
使用CountDownLatch來實現(xiàn)以上優(yōu)化思路
CountDownLatch是什么:CountDownLatch是java.util.concurrent包下的類,它在多線程并發(fā)編程里充當這計數(shù)器的功能,通過構(gòu)造函數(shù)維護一個int類型的初始值,如果一個線程調(diào)用await()方法,那么該線程就會進入阻塞狀態(tài),直到初始值變?yōu)?后,調(diào)用await()方法的阻塞線程將會被喚醒,執(zhí)行后續(xù)操作,而通過countDown()這個方法,我們就能夠?qū)崿F(xiàn)初始值的減法,每調(diào)用一次,初始值減一。
具體實現(xiàn)代碼如下:
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(3);
long startTime = new Date().getTime();
//調(diào)用第一個電商平臺的接口取得訂單數(shù),用時2s
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺一的數(shù)據(jù)");
countDownLatch.countDown();
});
//調(diào)用第二個電商平臺的接口取得訂單數(shù),用時2s
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺二的數(shù)據(jù)");
countDownLatch.countDown();
});
//調(diào)用第二個電商平臺的接口取得訂單數(shù),用時2s
Thread thread3 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺三的數(shù)據(jù)");
countDownLatch.countDown();
});
thread1.start();
thread2.start();
thread3.start();
countDownLatch.await();
System.out.println("對三個電商平臺的數(shù)據(jù)進行合并");
long endTime = new Date().getTime();
long time = endTime - startTime;
System.out.println("總耗時" + time);
}
因為電商接口有3個,所以CountDownLatch的初始值設(shè)為3,之后多線程執(zhí)行3個電商接口,每執(zhí)行完一個,調(diào)用countDown()方法把初始值減一,同時主線程調(diào)用await()進入阻塞狀態(tài),直到初始值減為0,就被重新喚醒,開始執(zhí)行數(shù)據(jù)的合并邏輯。
執(zhí)行效果如下:
可以看到總耗時節(jié)省了約三分之二
CountDownLatch的實現(xiàn)原理
先看CountDownLatch的構(gòu)造方法:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
可以看到它除了做個初始值的異常判斷外,實際上是構(gòu)造了一個Sync的對象,賦值給自己的屬性sync,那么看下Sync對象的源碼:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//Sync對象的構(gòu)造方法
Sync(int count) {
setState(count);
}
}
從以上源碼可以看出,Sync對象繼承了AQS,所以調(diào)用CountDownLatch的構(gòu)造方法實際上就是調(diào)用Sync對象的構(gòu)造方法,然后通過setState(count)方法設(shè)置AQS的state值。
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
protected final void setState(int newState) {
state = newState;
}
}
再看countDown()方法的實現(xiàn):
public void countDown() {
sync.releaseShared(1);
}
實際上是調(diào)用了Sync對象的releaseShared()方法,參數(shù)固定為1
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
//嘗試釋放共享模式的鎖
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
}
其中方法tryReleaseShared()的具體實現(xiàn)是在CountDownLatch類,如下:
public class CountDownLatch {
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
//獲取計數(shù)器的值
protected final int getState() {
return state;
}
}
通過循環(huán)和compareAndSetState()方法我們可以看出這是一個自旋的CAS(Compare And Set)操作,先獲取state的值,為0則返回false,否則執(zhí)行減1操作,失敗就重試,直到減為0,則返回true,之后執(zhí)行doReleaseShared()方法
await()方法的實現(xiàn)
public class CountDownLatch {
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}
可以看到await()實際是調(diào)用Sync對象的acquireSharedInterruptibly()方法:
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
其中tryAcquireShared()方法的具體實現(xiàn)是在CountDownLatch類:
public class CountDownLatch {
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
}
通過該方法可以判斷出如果計數(shù)器值為0則返回1,否則返回-1,然后為0則會執(zhí)行之后的方法,如果繼續(xù)跟下去,最后會發(fā)現(xiàn)還是調(diào)用到了AQS的doReleaseShared()方法,所有阻塞的線程會被放開。
CountDownLatch和.join()的使用區(qū)別
CountDownLatch和.join()方法的作用其實很像,join()方法的使用可參考Java多線程中join()方法的使用,不過CountDownLatch使用起來會比join()方法更有靈活性。假設(shè)電商接口調(diào)用其實有兩個步驟,在每個接口的第一步獲取完數(shù)據(jù)后,還要做個數(shù)據(jù)記錄,耗時也是2s,下面給出示例代碼:
使用join()方法:鄭州人流多少錢 http://mobile.sgyy029.com/
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(3);
long startTime = new Date().getTime();
//調(diào)用第一個電商平臺的接口取得訂單數(shù),用時2s
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺一的數(shù)據(jù)");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺一的數(shù)據(jù)后做個記錄");
});
//調(diào)用第二個電商平臺的接口取得訂單數(shù),用時2s
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺二的數(shù)據(jù)");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺二的數(shù)據(jù)后做個記錄");
});
//調(diào)用第三個電商平臺的接口取得訂單數(shù),用時2s
Thread thread3 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺三的數(shù)據(jù)");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺三的數(shù)據(jù)后做個記錄");
});
thread1.start();
thread2.start();
thread3.start();
thread1.join();
thread2.join();
thread3.join();
System.out.println("對三個電商平臺的數(shù)據(jù)進行合并");
long endTime = new Date().getTime();
long time = endTime - startTime;
System.out.println("總耗時" + time);
}
耗時效果如下:
可以發(fā)現(xiàn),使用join()方法,必須得等到每個線程都結(jié)束后才會接著執(zhí)行之后的主線程,這樣總耗時就會被數(shù)據(jù)記錄的方法拖慢,達到4311ms
使用CountDownLatch,在獲取數(shù)據(jù)后就對初始值減1,而不是等到記錄方法完成才減1,如下:
public static void main(String[] args) throws Exception {
CountDownLatch countDownLatch = new CountDownLatch(3);
long startTime = new Date().getTime();
//調(diào)用第一個電商平臺的接口取得訂單數(shù),用時2s
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺一的數(shù)據(jù)");
countDownLatch.countDown();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺一的數(shù)據(jù)后做個記錄");
});
//調(diào)用第二個電商平臺的接口取得訂單數(shù),用時2s
Thread thread2 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺二的數(shù)據(jù)");
countDownLatch.countDown();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺二的數(shù)據(jù)后做個記錄");
});
//調(diào)用第二個電商平臺的接口取得訂單數(shù),用時2s
Thread thread3 = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺三的數(shù)據(jù)");
countDownLatch.countDown();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
System.out.println("獲取電商平臺三的數(shù)據(jù)后做個記錄");
});
thread1.start();
thread2.start();
thread3.start();
countDownLatch.await();
System.out.println("對三個電商平臺的數(shù)據(jù)進行合并");
long endTime = new Date().getTime();
long time = endTime - startTime;
System.out.println("總耗時" + time);
}
耗時效果如下:
可以發(fā)現(xiàn)耗時才2185ms
以上就是CountDownLatch和join()方法的使用區(qū)別,相比起join()方法要等線程都執(zhí)行完才會執(zhí)行阻塞的線程,CountDownLatch就能夠靈活控制阻塞線程的執(zhí)行時機,耗時可以更少。