(1)LinkedTransferQueue是什么東東?
為西峽等地區(qū)用戶提供了全套網(wǎng)頁設(shè)計(jì)制作服務(wù),及西峽網(wǎng)站建設(shè)行業(yè)解決方案。主營業(yè)務(wù)為網(wǎng)站制作、網(wǎng)站設(shè)計(jì)、西峽網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會得到認(rèn)可,從而選擇與我們長期合作。這樣,我們也可以走得更遠(yuǎn)!
(2)LinkedTransferQueue是怎么實(shí)現(xiàn)阻塞隊(duì)列的?
(3)LinkedTransferQueue是怎么控制并發(fā)安全的?
(4)LinkedTransferQueue與SynchronousQueue有什么異同?
LinkedTransferQueue是LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合體,它綜合了這三者的方法,并且提供了更加高效的實(shí)現(xiàn)方式。
LinkedTransferQueue實(shí)現(xiàn)了TransferQueue接口,而TransferQueue接口是繼承自BlockingQueue的,所以LinkedTransferQueue也是一個(gè)阻塞隊(duì)列。
TransferQueue接口中定義了以下幾個(gè)方法:
// 嘗試移交元素
boolean tryTransfer(E e);
// 移交元素
void transfer(E e) throws InterruptedException;
// 嘗試移交元素(有超時(shí)時(shí)間)
boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
// 判斷是否有消費(fèi)者
boolean hasWaitingConsumer();
// 查看消費(fèi)者的數(shù)量
int getWaitingConsumerCount();
主要是定義了三個(gè)移交元素的方法,有阻塞的,有不阻塞的,有超時(shí)的。
LinkedTransferQueue使用了一個(gè)叫做dual data structure
的數(shù)據(jù)結(jié)構(gòu),或者叫做dual queue
,譯為雙重?cái)?shù)據(jù)結(jié)構(gòu)或者雙重隊(duì)列。
雙重隊(duì)列是什么意思呢?
放取元素使用同一個(gè)隊(duì)列,隊(duì)列中的節(jié)點(diǎn)具有兩種模式,一種是數(shù)據(jù)節(jié)點(diǎn),一種是非數(shù)據(jù)節(jié)點(diǎn)。
放元素時(shí)先跟隊(duì)列頭節(jié)點(diǎn)對比,如果頭節(jié)點(diǎn)是非數(shù)據(jù)節(jié)點(diǎn),就讓他們匹配,如果頭節(jié)點(diǎn)是數(shù)據(jù)節(jié)點(diǎn),就生成一個(gè)數(shù)據(jù)節(jié)點(diǎn)放在隊(duì)列尾端(入隊(duì))。
取元素時(shí)也是先跟隊(duì)列頭節(jié)點(diǎn)對比,如果頭節(jié)點(diǎn)是數(shù)據(jù)節(jié)點(diǎn),就讓他們匹配,如果頭節(jié)點(diǎn)是非數(shù)據(jù)節(jié)點(diǎn),就生成一個(gè)非數(shù)據(jù)節(jié)點(diǎn)放在隊(duì)列尾端(入隊(duì))。
用圖形來表示就是下面這樣:
不管是放元素還是取元素,都先跟頭節(jié)點(diǎn)對比,如果二者模式不一樣就匹配它們,如果二者模式一樣,就入隊(duì)。
// 頭節(jié)點(diǎn)
transient volatile Node head;
// 尾節(jié)點(diǎn)
private transient volatile Node tail;
// 放取元素的幾種方式:
// 立即返回,用于非超時(shí)的poll()和tryTransfer()方法中
private static final int NOW = 0; // for untimed poll, tryTransfer
// 異步,不會阻塞,用于放元素時(shí),因?yàn)閮?nèi)部使用×××單鏈表存儲元素,不會阻塞放元素的過程
private static final int ASYNC = 1; // for offer, put, add
// 同步,調(diào)用的時(shí)候如果沒有匹配到會阻塞直到匹配到為止
private static final int SYNC = 2; // for transfer, take
// 超時(shí),用于有超時(shí)的poll()和tryTransfer()方法中
private static final int TIMED = 3; // for timed poll, tryTransfer
static final class Node {
// 是否是數(shù)據(jù)節(jié)點(diǎn)(也就標(biāo)識了是生產(chǎn)者還是消費(fèi)者)
final boolean isData; // false if this is a request node
// 元素的值
volatile Object item; // initially non-null if isData; CASed to match
// 下一個(gè)節(jié)點(diǎn)
volatile Node next;
// 持有元素的線程
volatile Thread waiter; // null until waiting
}
典型的單鏈表結(jié)構(gòu),內(nèi)部除了存儲元素的值和下一個(gè)節(jié)點(diǎn)的指針外,還包含了是否為數(shù)據(jù)節(jié)點(diǎn)和持有元素的線程。
內(nèi)部通過isData區(qū)分是生產(chǎn)者還是消費(fèi)者。
public LinkedTransferQueue() {
}
public LinkedTransferQueue(Collection extends E> c) {
this();
addAll(c);
}
只有這兩個(gè)構(gòu)造方法,且沒有初始容量,所以是×××的一個(gè)阻塞隊(duì)列。
四個(gè)方法都是一樣的,使用異步的方式調(diào)用xfer()方法,傳入的參數(shù)都一模一樣。
public void put(E e) {
// 異步模式,不會阻塞,不會超時(shí)
// 因?yàn)槭欠旁?,單鏈表存儲,會一直往后? xfer(e, true, ASYNC, 0);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
xfer(E e, boolean haveData, int how, long nanos)的參數(shù)分別是:
(1)e表示元素;
(2)haveData表示是否是數(shù)據(jù)節(jié)點(diǎn),
(3)how表示放取元素的方式,上面提到的四種,NOW、ASYNC、SYNC、TIMED;
(4)nanos表示超時(shí)時(shí)間;
出隊(duì)的四個(gè)方法也是直接或間接的調(diào)用xfer()方法,放取元素的方式和超時(shí)規(guī)則略微不同,本質(zhì)沒有大的區(qū)別。
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
public E take() throws InterruptedException {
// 同步模式,會阻塞直到取到元素
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 有超時(shí)時(shí)間
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E poll() {
// 立即返回,沒取到元素返回null
return xfer(null, false, NOW, 0);
}
取元素就各有各的玩法了,有同步的,有超時(shí)的,有立即返回的。
public boolean tryTransfer(E e) {
// 立即返回
return xfer(e, true, NOW, 0) == null;
}
public void transfer(E e) throws InterruptedException {
// 同步模式
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
// 有超時(shí)時(shí)間
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
請注意第二個(gè)參數(shù),都是true,也就是這三個(gè)方法其實(shí)也是放元素的方法。
這里xfer()方法的幾種模式到底有什么區(qū)別呢?請看下面的分析。
private E xfer(E e, boolean haveData, int how, long nanos) {
// 不允許放入空元素
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
// 外層循環(huán),自旋,失敗就重試
retry:
for (;;) { // restart on append race
// 下面這個(gè)for循環(huán)用于控制匹配的過程
// 同一時(shí)刻隊(duì)列中只會存儲一種類型的節(jié)點(diǎn)
// 從頭節(jié)點(diǎn)開始嘗試匹配,如果頭節(jié)點(diǎn)被其它線程先一步匹配了
// 就再嘗試其下一個(gè),直到匹配到為止,或者到隊(duì)列中沒有元素為止
for (Node h = head, p = h; p != null;) { // find & match first node
// p節(jié)點(diǎn)的模式
boolean isData = p.isData;
// p節(jié)點(diǎn)的值
Object item = p.item;
// p沒有被匹配到
if (item != p && (item != null) == isData) { // unmatched
// 如果兩者模式一樣,則不能匹配,跳出循環(huán)后嘗試入隊(duì)
if (isData == haveData) // can't match
break;
// 如果兩者模式不一樣,則嘗試匹配
// 把p的值設(shè)置為e(如果是取元素則e是null,如果是放元素則e是元素值)
if (p.casItem(item, e)) { // match
// 匹配成功
// for里面的邏輯比較復(fù)雜,用于控制多線程同時(shí)放取元素時(shí)出現(xiàn)競爭的情況的
// 看不懂可以直接跳過
for (Node q = p; q != h;) {
// 進(jìn)入到這里可能是頭節(jié)點(diǎn)已經(jīng)被匹配,然后p會變成h的下一個(gè)節(jié)點(diǎn)
Node n = q.next; // update by 2 unless singleton
// 如果head還沒變,就把它更新成新的節(jié)點(diǎn)
// 并把它刪除(forgetNext()會把它的next設(shè)為自己,也就是從單鏈表中刪除了)
// 這時(shí)為什么要把head設(shè)為n呢?因?yàn)榈竭@里了,肯定head本身已經(jīng)被匹配掉了
// 而上面的p.casItem()又成功了,說明p也被當(dāng)前這個(gè)元素給匹配掉了
// 所以需要把它們倆都出隊(duì)列,讓其它線程可以從真正的頭開始,不用重復(fù)檢查了
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
// 如果新的頭節(jié)點(diǎn)為空,或者其next為空,或者其next未匹配,就重試
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 喚醒p中等待的線程
LockSupport.unpark(p.waiter);
// 并返回匹配到的元素
return LinkedTransferQueue.cast(item);
}
}
// p已經(jīng)被匹配了或者嘗試匹配的時(shí)候失敗了
// 也就是其它線程先一步匹配了p
// 這時(shí)候又分兩種情況,p的next還沒來得及修改,p的next指向了自己
// 如果p的next已經(jīng)指向了自己,就重新取head重試,否則就取其next重試
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 到這里肯定是隊(duì)列中存儲的節(jié)點(diǎn)類型和自己一樣
// 或者隊(duì)列中沒有元素了
// 就入隊(duì)(不管放元素還是取元素都得入隊(duì))
// 入隊(duì)又分成四種情況:
// NOW,立即返回,沒有匹配到立即返回,不做入隊(duì)操作
// ASYNC,異步,元素入隊(duì)但當(dāng)前線程不會阻塞(相當(dāng)于×××LinkedBlockingQueue的元素入隊(duì))
// SYNC,同步,元素入隊(duì)后當(dāng)前線程阻塞,等待被匹配到
// TIMED,有超時(shí),元素入隊(duì)后等待一段時(shí)間被匹配,時(shí)間到了還沒匹配到就返回元素本身
// 如果不是立即返回
if (how != NOW) { // No matches available
// 新建s節(jié)點(diǎn)
if (s == null)
s = new Node(e, haveData);
// 嘗試入隊(duì)
Node pred = tryAppend(s, haveData);
// 入隊(duì)失敗,重試
if (pred == null)
continue retry; // lost race vs opposite mode
// 如果不是異步(同步或者有超時(shí))
// 就等待被匹配
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
private Node tryAppend(Node s, boolean haveData) {
// 從tail開始遍歷,把s放到鏈表尾端
for (Node t = tail, p = t;;) { // move p to last node and append
Node n, u; // temps for reads of next & tail
// 如果首尾都是null,說明鏈表中還沒有元素
if (p == null && (p = head) == null) {
// 就讓首節(jié)點(diǎn)指向s
// 注意,這里插入第一個(gè)元素的時(shí)候tail指針并沒有指向s
if (casHead(null, s))
return s; // initialize
}
else if (p.cannotPrecede(haveData))
// 如果p無法處理,則返回null
// 這里無法處理的意思是,p和s節(jié)點(diǎn)的類型不一樣,不允許s入隊(duì)
// 比如,其它線程先入隊(duì)了一個(gè)數(shù)據(jù)節(jié)點(diǎn),這時(shí)候要入隊(duì)一個(gè)非數(shù)據(jù)節(jié)點(diǎn),就不允許,
// 隊(duì)列中所有的元素都要保證是同一種類型的節(jié)點(diǎn)
// 返回null后外面的方法會重新嘗試匹配重新入隊(duì)等
return null; // lost race vs opposite mode
else if ((n = p.next) != null) // not last; keep traversing
// 如果p的next不為空,說明不是最后一個(gè)節(jié)點(diǎn)
// 則讓p重新指向最后一個(gè)節(jié)點(diǎn)
p = p != t && t != (u = tail) ? (t = u) : // stale tail
(p != n) ? n : null; // restart if off list
else if (!p.casNext(null, s))
// 如果CAS更新s為p的next失敗
// 則說明有其它線程先一步更新到p的next了
// 就讓p指向p的next,重新嘗試讓s入隊(duì)
p = p.next; // re-read on CAS failure
else {
// 到這里說明s成功入隊(duì)了
// 如果p不等于t,就更新tail指針
// 還記得上面插入第一個(gè)元素時(shí)tail指針并沒有指向新元素嗎?
// 這里就是用來更新tail指針的
if (p != t) { // update if slack now >= 2
while ((tail != t || !casTail(t, s)) &&
(t = tail) != null &&
(s = t.next) != null && // advance and retry
(s = s.next) != null && s != t);
}
// 返回p,即s的前一個(gè)元素
return p;
}
}
}
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) {
// 如果是有超時(shí)的,計(jì)算其超時(shí)時(shí)間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
// 當(dāng)前線程
Thread w = Thread.currentThread();
// 自旋次數(shù)
int spins = -1; // initialized after first item and cancel checks
// 隨機(jī)數(shù),隨機(jī)讓一些自旋的線程讓出CPU
ThreadLocalRandom randomYields = null; // bound if needed
for (;;) {
Object item = s.item;
// 如果s元素的值不等于e,說明它被匹配到了
if (item != e) { // matched
// assert item != s;
// 把s的item更新為s本身
// 并把s中的waiter置為空
s.forgetContents(); // avoid garbage
// 返回匹配到的元素
return LinkedTransferQueue.cast(item);
}
// 如果當(dāng)前線程中斷了,或者有超時(shí)的到期了
// 就更新s的元素值指向s本身
if ((w.isInterrupted() || (timed && nanos <= 0)) &&
s.casItem(e, s)) { // cancel
// 嘗試解除s與其前一個(gè)節(jié)點(diǎn)的關(guān)系
// 也就是刪除s節(jié)點(diǎn)
unsplice(pred, s);
// 返回元素的值本身,說明沒匹配到
return e;
}
// 如果自旋次數(shù)小于0,就計(jì)算自旋次數(shù)
if (spins < 0) { // establish spins at/near front
// spinsFor()計(jì)算自旋次數(shù)
// 如果前面有節(jié)點(diǎn)未被匹配就返回0
// 如果前面有節(jié)點(diǎn)且正在匹配中就返回一定的次數(shù),等待
if ((spins = spinsFor(pred, s.isData)) > 0)
// 初始化隨機(jī)數(shù)
randomYields = ThreadLocalRandom.current();
}
else if (spins > 0) { // spin
// 還有自旋次數(shù)就減1
--spins;
// 并隨機(jī)讓出CPU
if (randomYields.nextInt(CHAINED_SPINS) == 0)
Thread.yield(); // occasionally yield
}
else if (s.waiter == null) {
// 更新s的waiter為當(dāng)前線程
s.waiter = w; // request unpark then recheck
}
else if (timed) {
// 如果有超時(shí),計(jì)算超時(shí)時(shí)間,并阻塞一定時(shí)間
nanos = deadline - System.nanoTime();
if (nanos > 0L)
LockSupport.parkNanos(this, nanos);
}
else {
// 不是超時(shí)的,直接阻塞,等待被喚醒
// 喚醒后進(jìn)入下一次循環(huán),走第一個(gè)if的邏輯就返回匹配的元素了
LockSupport.park(this);
}
}
}
這三個(gè)方法里的內(nèi)容特別復(fù)雜,很大一部分代碼都是在控制線程安全,各種CAS,我們這里簡單描述一下大致的邏輯:
(1)來了一個(gè)元素,我們先查看隊(duì)列頭的節(jié)點(diǎn),是否與這個(gè)元素的模式一樣;
(2)如果模式不一樣,就嘗試讓他們匹配,如果頭節(jié)點(diǎn)被別的線程先匹配走了,就嘗試與頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)匹配,如此一直往后,直到匹配到或到鏈表尾為止;
(3)如果模式一樣,或者到鏈表尾了,就嘗試入隊(duì);
(4)入隊(duì)的時(shí)候有可能鏈表尾修改了,那就尾指針后移,再重新嘗試入隊(duì),依此往復(fù);
(5)入隊(duì)成功了,就自旋或阻塞,阻塞了就等待被其它線程匹配到并喚醒;
(6)喚醒之后進(jìn)入下一次循環(huán)就匹配到元素了,返回匹配到的元素;
(7)是否需要入隊(duì)及阻塞有四種情況:
a)NOW,立即返回,沒有匹配到立即返回,不做入隊(duì)操作
對應(yīng)的方法有:poll()、tryTransfer(e)
b)ASYNC,異步,元素入隊(duì)但當(dāng)前線程不會阻塞(相當(dāng)于×××LinkedBlockingQueue的元素入隊(duì))
對應(yīng)的方法有:add(e)、offer(e)、put(e)、offer(e, timeout, unit)
c)SYNC,同步,元素入隊(duì)后當(dāng)前線程阻塞,等待被匹配到
對應(yīng)的方法有:take()、transfer(e)
d)TIMED,有超時(shí),元素入隊(duì)后等待一段時(shí)間被匹配,時(shí)間到了還沒匹配到就返回元素本身
對應(yīng)的方法有:poll(timeout, unit)、tryTransfer(e, timeout, unit)
(1)LinkedTransferQueue可以看作LinkedBlockingQueue、SynchronousQueue(公平模式)、ConcurrentLinkedQueue三者的集合體;
(2)LinkedTransferQueue的實(shí)現(xiàn)方式是使用一種叫做雙重隊(duì)列
的數(shù)據(jù)結(jié)構(gòu);
(3)不管是取元素還是放元素都會入隊(duì);
(4)先嘗試跟頭節(jié)點(diǎn)比較,如果二者模式不一樣,就匹配它們,組成CP,然后返回對方的值;
(5)如果二者模式一樣,就入隊(duì),并自旋或阻塞等待被喚醒;
(6)至于是否入隊(duì)及阻塞有四種模式,NOW、ASYNC、SYNC、TIMED;
(7)LinkedTransferQueue全程都沒有使用synchronized、重入鎖等比較重的鎖,基本是通過 自旋+CAS 實(shí)現(xiàn);
(8)對于入隊(duì)之后,先自旋一定次數(shù)后再調(diào)用LockSupport.park()或LockSupport.parkNanos阻塞;
LinkedTransferQueue與SynchronousQueue(公平模式)有什么異同呢?
(1)在java8中兩者的實(shí)現(xiàn)方式基本一致,都是使用的雙重隊(duì)列;
(2)前者完全實(shí)現(xiàn)了后者,但比后者更靈活;
(3)后者不管放元素還是取元素,如果沒有可匹配的元素,所在的線程都會阻塞;
(4)前者可以自己控制放元素是否需要阻塞線程,比如使用四個(gè)添加元素的方法就不會阻塞線程,只入隊(duì)元素,使用transfer()會阻塞線程;
(5)取元素兩者基本一樣,都會阻塞等待有新的元素進(jìn)入被匹配到;
歡迎關(guān)注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋。