本篇內(nèi)容主要講解“JUC的LinkedTransferQueue怎么使用”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“JUC的LinkedTransferQueue怎么使用”吧!
網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)!專注于網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、微信小程序開發(fā)、集團企業(yè)網(wǎng)站建設(shè)等服務(wù)項目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了興慶免費建站歡迎大家使用!
LinkedTransferQueue 在 jdk 1.7 被引入,是一個基于 Dual Queue 數(shù)據(jù)結(jié)構(gòu)實現(xiàn)的無界線程安全隊列,其作者 Doug Lea 描述 LinkedTransferQueue 從功能上來說是 ConcurrentLinkedQueue、SynchronousQueue(公平模式),以及 LinkedBlockingQueue 的超集,并且更加實用和高效。
下面的章節(jié)我們將一起來分析 LinkedTransferQueue 的設(shè)計與實現(xiàn),不過在開始之前還是需要先對兩個名詞做一下解釋,即匹配和松弛度。
我們在上一篇介紹 SynchronousQueue 隊列時已經(jīng)解釋了 匹配的概念,這里再重復(fù)介紹一下。LinkedTransferQueue 在內(nèi)部基于隊列實現(xiàn)線程間的交互,以“生產(chǎn)者-消費者”為例,當生產(chǎn)者往 LinkedTransferQueue 中插入一個元素時,通常情況下該生產(chǎn)者線程在插入成功之后并不會立即返回,而是等待消費者前來消費。當消費者執(zhí)行消費時發(fā)現(xiàn)隊列上正好有生產(chǎn)者在等待,于是執(zhí)行消費邏輯,也稱為開始執(zhí)行匹配進程,將當前消費者與生產(chǎn)者匹配成一對兒紛紛出隊列。
匹配描述的是 Dual Queue 的運行機制,而 松弛度(slack)則是一種優(yōu)化策略。為了避免頻繁移動隊列的 head 和 tail 指針,作者引入了松弛度的概念,以度量 head 結(jié)點(或 tail 結(jié)點)與最近一個未匹配結(jié)點之間的距離。當一個結(jié)點被匹配(或取消,或插入)時,LinkedTransferQueue 并不會立即更新相應(yīng)的 head 或 tail 指針,而是當松弛度大于指定閾值時才觸發(fā)更新。這個閾值的取值范圍一般設(shè)置在 1 到 3 之間,如果太大會降低有效結(jié)點命中率,增加遍歷的長度,太小則會增加 CAS 的競爭和開銷。
TransferQueue 接口在 JDK 1.7 被引入,用于描述一種全新的阻塞隊列。LinkedTransferQueue 實現(xiàn)自 TransferQueue 接口,并且是目前(JDK 1.8)該接口的唯一實現(xiàn)類。TransferQueue 接口繼承自 BlockingQueue 接口,由 BlockingQueue 描述的阻塞隊列在隊列為空或者已滿時,相應(yīng)的出隊列線程或入隊列線程會阻塞等待,而 TransferQueue 則更進一步。以入隊列操作為例,當線程成功將元素添加到由 TransferQueue 描述的阻塞隊列中后,該線程通常會一直阻塞直到某個出隊列線程從隊列中取走該入隊列線程添加的元素。
TransferQueue 在 BlockingQueue 接口的基礎(chǔ)上增加了以下方法:
public interface TransferQueueextends BlockingQueue { void transfer(E e) throws InterruptedException; boolean tryTransfer(E e); boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException; boolean hasWaitingConsumer(); int getWaitingConsumerCount(); }
針對各方法的含義說明如下:
transfer
:生產(chǎn)者將元素直接傳遞給正在等待的消費者,而不執(zhí)行入隊列操作,如果沒有正在等待的消費者則無限期等待,期間支持響應(yīng)中斷。
tryTransfer
:生產(chǎn)者將元素直接傳遞給正在等待的消費者,而不執(zhí)行入隊列操作,如果沒有正在等待的消費者則返回 false,提供相應(yīng)的超時版本。
hasWaitingConsumer
:檢查是否存在正在等待的消費者。
getWaitingConsumerCount
:返回當前正在等待的消費者數(shù)目(近似值)。
由上述接口方法釋義我們可以了解到,TransferQueue 系的隊列支持在兩個線程之間直接交換數(shù)據(jù),而無需先將數(shù)據(jù)落地存儲到隊列中,如果確實需要落地,則線程可以隨數(shù)據(jù)一起在隊列上等待。
LinkedTransferQueue 針對 BlockingQueue 和 TransferQueue 接口中聲明的方法,在實現(xiàn)上均委托給 LinkedTransferQueue#xfer
方法執(zhí)行,該方法也是本小節(jié)將要重點分析的方法。
在開始分析 LinkedTransferQueue#xfer
方法的實現(xiàn)之前,我們先介紹一下 LinkedTransferQueue 的基本字段定義。LinkedTransferQueue 基于 Dual Queue 作為底層存儲結(jié)構(gòu),并定義了 Node 類描述 Dual Queue 上的結(jié)點,字段 LinkedTransferQueue#head
和 LinkedTransferQueue#tail
分別指向底層隊列的頭結(jié)點和尾結(jié)點。
Node 類的字段定義如下:
static final class Node { /** 標識當前結(jié)點是一個數(shù)據(jù)結(jié)點,還是一個請求結(jié)點 */ final boolean isData; // false if this is a request node /** * 存放數(shù)據(jù),并標識匹配狀態(tài): * - 對于請求結(jié)點初始為 null,匹配之后指向自己 * - 對于數(shù)據(jù)結(jié)點初始為 data,匹配之后為 null */ volatile Object item; // initially non-null if isData; CASed to match /** 后繼指針 */ volatile Node next; /** 記錄在當前結(jié)點上等待的線程對象 */ volatile Thread waiter; // null until waiting // ... 省略方法定義 }
LinkedTransferQueue 中的結(jié)點分為 數(shù)據(jù)結(jié)點和 請求結(jié)點兩類,可以簡單將數(shù)據(jù)結(jié)點理解為生產(chǎn)者結(jié)點,將請求結(jié)點理解為消費者結(jié)點。Node 類通過 Node#isData
字段標記一個結(jié)點是數(shù)據(jù)結(jié)點還是請求結(jié)點,并通過 Node#item
字段承載數(shù)據(jù)和標識對應(yīng)結(jié)點的匹配狀態(tài)。下表展示了數(shù)據(jù)結(jié)點和請求結(jié)點在匹配前后,字段 Node#item
的變化:
結(jié)點類型 | 數(shù)據(jù)結(jié)點 | 請求結(jié)點 |
---|---|---|
匹配前 | isData = true; item != null | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this |
注意:當一個結(jié)點被取消后,該結(jié)點的 Node#item
字段同樣指向結(jié)點自己。
由上述表格我們可以設(shè)計一個判斷結(jié)點是否已經(jīng)匹配的方法,如下:
// Node#isMatched final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); }
如果一個結(jié)點的 item 字段指向自己(即 x == this
),說明該結(jié)點被取消,或者對于請求結(jié)點而言,該結(jié)點已經(jīng)被匹配,否則我們就可以繼續(xù)執(zhí)行 (x == null) == isData
進行判斷,具體如下:
如果當前結(jié)點是數(shù)據(jù)結(jié)點(即 isData = true
),如果該結(jié)點被匹配則結(jié)點的 item 應(yīng)該為 null,所以滿足 (x == null) == isData
。
如果當前結(jié)點是請求結(jié)點(即 isData = false
),如果該結(jié)點被匹配則結(jié)點的 item 應(yīng)該不為 null,所以滿足 (x == null) == isData
。
接下來我們開始分析 LinkedTransferQueue#xfer
方法的實現(xiàn),首先來看一下方法的參數(shù)定義,如下:
private E xfer(E e, boolean haveData, int how, long nanos) { // ... 省略方法實現(xiàn) }
其中參數(shù) e 表示待添加的元素值,如果是出隊列操作,則為 null;參數(shù) haveData 用于指定當前是入隊列操作還是出隊列操作,如果是入隊列則 haveData 為 true,否則為 false;參數(shù) how 對應(yīng)當前的操作模式,分為:NOW、ASYNC、SYNC,以及 TIMED,如果是 TIMED 模式,則參數(shù) nanos 用于指定當前等待的納秒值。
下面進一步介紹一下 how 參數(shù),我們知道 LinkedTransferQueue 的隊列操作方法基本上都是直接委托給 LinkedTransferQueue#xfer
方法執(zhí)行,而參數(shù) how 則用于控制在不同調(diào)用場景下該方法的運行邏輯。LinkedTransferQueue 定義了 4 個 int 類型常量,分別表示不同的操作模式,如下:
private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
針對各個模式的含義說明如下:
NOW:當隊列中沒有匹配的結(jié)點時立即返回而不等待,例如當生產(chǎn)者執(zhí)行入隊列操作時,如果隊列中沒有正在等待的消費者則立即返回。
ASYNC:當隊列中沒有匹配的結(jié)點時將元素入隊列,但是當前線程本身并不等待而是立即返回,主要用于入隊列操作。
SYNC:當隊列中沒有匹配的結(jié)點時將元素入隊列,并且當前線程會依附在對應(yīng)結(jié)點上無限期等待。
TIMED:當隊列中沒有匹配的結(jié)點時將元素入隊列,并且當前線程會依附在對應(yīng)結(jié)點上超時等待。
LinkedTransferQueue 實現(xiàn)的主要入隊列和出隊列方法在委托執(zhí)行 LinkedTransferQueue#xfer
方法時的參數(shù)值設(shè)置如下表:
方法 | e | haveData | how | nanos |
---|---|---|---|---|
LinkedTransferQueue#put | e | true | ASYNC | 0 |
LinkedTransferQueue#add | e | true | ASYNC | 0 |
LinkedTransferQueue#offer(E) | e | true | ASYNC | 0 |
LinkedTransferQueue#offer(E, long, TimeUnit) | e | true | ASYNC | 0 |
LinkedTransferQueue#take | null | false | SYNC | 0 |
LinkedTransferQueue#poll() | null | false | NOW | 0 |
LinkedTransferQueue#poll(long, TimeUnit) | null | false | TIMED | timeout |
LinkedTransferQueue#transfer | e | true | SYNC | 0 |
LinkedTransferQueue#tryTransfer(E) | e | true | NOW | 0 |
LinkedTransferQueue#tryTransfer(E, long, TimeUnit) | e | true | TIMED | timeout |
下面開始分析方法 LinkedTransferQueue#xfer
的實現(xiàn),如下:
private E xfer(E e, boolean haveData, int how, long nanos) { // 如果是入隊列操作,則不允許待添加元素值為 null if (haveData && (e == null)) { throw new NullPointerException(); } // the node to append, if needed Node s = null; retry: for (; ; ) { // restart on append race /* 1. Try to match an existing node */ // 從頭開始遍歷隊列,對第一個未匹配的結(jié)點執(zhí)行匹配操作 for (Node h = head, p = h; p != null; ) { // find & match first node boolean isData = p.isData; Object item = p.item; // 找到第一個未匹配且未被取消的結(jié)點 if (item != p && (item != null) == isData) { // unmatched // 結(jié)點模式與本次操作模式一致,無法匹配,退出循環(huán)并進入下一步 if (isData == haveData) { // can't match break; } // 模式互補,執(zhí)行匹配操作,將匹配結(jié)點 p 的 item 值修改為 e // 如果 item 為 null,則 e 為 data,如果 item 為 data,則 e 為 null if (p.casItem(item, e)) { // 匹配成功 // 如果當前被匹配的結(jié)點不是 head 結(jié)點,需要更新 head 指針,保證松弛度小于 2 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton // 更新 head 為匹配結(jié)點 p 的 next 結(jié)點,如果 next 結(jié)點為 null 則更新為當前匹配結(jié)點 if (head == h && this.casHead(h, n == null ? q : n)) { // 將之前的 head 結(jié)點自引用,等待 GC h.forgetNext(); break; } // 如果松弛度(slack)小于 2,則退出循環(huán),否則繼續(xù)循環(huán)后移 head 指針 if ((h = head) == null || (q = h.next) == null || !q.isMatched()) { break; // unless slack < 2 } } // 喚醒在剛剛完成匹配結(jié)點上等待的線程 LockSupport.unpark(p.waiter); return cast(item); } } // 結(jié)點已被其它線程匹配,繼續(xù)往后遍歷尋找下一個可匹配結(jié)點 Node n = p.next; p = (p != n) ? n : (h = head); // 如果 p 已經(jīng)脫離隊列,則從 head 開始尋找 } // end of for // 未找到可以匹配的結(jié)點,將當前結(jié)點添加到隊列末端 if (how != NOW) { // 上游函數(shù)不期望立即返回 if (s == null) { s = new Node(e, haveData); } /* 2. Try to append a new node */ // 將結(jié)點 s 添加到隊列末端,如果成功則返回 s 的前驅(qū)結(jié)點 Node pred = this.tryAppend(s, haveData); // 返回 null 說明結(jié)點 s 入隊列失敗,重試 if (pred == null) { continue retry; // lost race vs opposite mode } // 阻塞(或自旋)等待匹配 if (how != ASYNC) { /* 3. Await match or cancellation */ return this.awaitMatch(s, pred, e, (how == TIMED), nanos); } } return e; // not waiting } }
由上述實現(xiàn)可以看出,整個 LinkedTransferQueue#xfer
方法的執(zhí)行分為 3 個階段(已在代碼中標出),針對各個階段的說明作者在文檔中已經(jīng)給出了概述,這里直接摘錄作者的原話:
Try to match an existing node;
Try to append a new node;
Await match or cancellation.
也就是說當一個線程進入 LinkedTransferQueue#xfer
方法時,第 1 步會嘗試在隊列中尋找可以匹配的結(jié)點,如果存在則執(zhí)行匹配操作;否則如果上游方法不期望立即返回(即不為 NOW 操作模式)則執(zhí)行第 2 步,將當前元素添加到隊列中;如果上游方法允許當前線程等待(即不為 ASYNC 操作模式),則進入等待狀態(tài),也就是第 3 步。
下面我們分步驟對這 3 個階段逐一進行分析,首先來看 步驟 1,作者對這一步的詳細概述摘錄如下:
Try to match an existing node
Starting at head, skip already-matched nodes until finding an unmatched node of opposite mode, if one exists, in which case matching it and returning, also if necessary updating head to one past the matched node (or the node itself if the list has no other unmatched nodes). If the CAS misses, then a loop retries advancing head by two steps until either success or the slack is at most two. By requiring that each attempt advances head by two (if applicable), we ensure that the slack does not grow without bound. Traversals also check if the initial head is now off-list, in which case they start at the new head.
If no candidates are found and the call was untimed poll/offer, (argument "how" is NOW) return.
這一步的核心邏輯在于從隊列中尋找可以匹配的結(jié)點,并執(zhí)行匹配操作,具體執(zhí)行流程概括為:
從隊列頭部開始遍歷隊列,尋找第一個未被取消且未被匹配的結(jié)點 p,如果存在則進入匹配進程;
校驗結(jié)點 p 的模式是否與當前操作模式互補,如果相同則無法匹配,需要轉(zhuǎn)而執(zhí)行步驟 2,將當前結(jié)點添加到隊列末端;
否則,基于 CAS 修改結(jié)點 p 的 item 值(如果是請求結(jié)點,則更新 item 為元素值 e;如果是數(shù)據(jù)結(jié)點,則更新 item 為 null),即執(zhí)行匹配操作;
如果匹配失敗,則說明存在其它線程先于完成了匹配操作,繼續(xù)往后尋找下一個可以匹配的結(jié)點;
如果匹配成功,則嘗試后移 head 指針,保證 head 結(jié)點的松弛度小于 2,并喚醒在匹配結(jié)點上阻塞的線程,最后返回本次匹配結(jié)點的 item 值。
下面利用圖示演示上述執(zhí)行流程,其中黃色表示消費者結(jié)點,青色表示生產(chǎn)者結(jié)點(M 表示已匹配,U 表示未匹配),紅色表示當前匹配結(jié)點。假設(shè)當前操作是一個消費者線程,則從隊列頭部開始往后尋找第一個未被取消且未被匹配的結(jié)點,此時各指針的指向如下圖 1 所示。在執(zhí)行完幾輪循環(huán)之后,當前線程在隊列上找到了第一個可以匹配的結(jié)點 p,如下圖 2 所示。然后執(zhí)行匹配操作,基于 CAS 嘗試將待匹配結(jié)點 p 的 item 值修改為 null,如下圖 3 所示。
接下來線程會進入最內(nèi)側(cè) for 循環(huán),嘗試后移 head 指針,以保證 head 結(jié)點的松弛度小于 2,如果期間正好有另外一個線程更新了 head 指針的指向,此時各指針的指向如上圖 4 所示。此時 head 指針與 h 指針指向不同,所以繼續(xù)執(zhí)行最內(nèi)側(cè) for 循環(huán)的第二個 if 判斷,執(zhí)行完后各個指針的指向如上圖 5 所示。此時因為指針 q 所指向的結(jié)點已經(jīng)完成匹配,所以繼續(xù)進入下一輪最內(nèi)側(cè) for 循環(huán),此時滿足最內(nèi)側(cè) for 循環(huán)的第一個 if 判斷,基于 CAS 更新 head 指針,并將之前 head 結(jié)點的 next 指針指向自己(自引用),等待 GC 回收,如上圖 6 所示。最后喚醒在本次匹配結(jié)點上等待的線程,并返回。
如果上述步驟沒有找到可以匹配的結(jié)點,則嘗試為當前元素構(gòu)造一個新的結(jié)點并插入到隊列中,即執(zhí)行 步驟 2,作者對這一步的詳細概述摘錄如下:
Try to append a new node
Starting at current tail pointer, find the actual last node and try to append a new node (or if head was null, establish the first node). Nodes can be appended only if their predecessors are either already matched or are of the same mode. If we detect otherwise, then a new node with opposite mode must have been appended during traversal, so we must restart at phase 1. The traversal and update steps are otherwise similar to phase 1: Retrying upon CAS misses and checking for staleness. In particular, if a self-link is encountered, then we can safely jump to a node on the list by continuing the traversal at current head.
On successful append, if the call was ASYNC, return.
如果當前操作模式為 NOW,則說明上游方法要求當隊列中不存在可以匹配的結(jié)點時立即返回,則不執(zhí)行本步驟,否則執(zhí)行 LinkedTransferQueue#tryAppend
方法嘗試將當前結(jié)點 s 入隊列。該方法在執(zhí)行失敗的情況下會返回 null,否則返回新添加結(jié)點 s 的前驅(qū)結(jié)點,如果沒有前驅(qū)結(jié)點則返回結(jié)點 s 自己。
方法 LinkedTransferQueue#tryAppend
的實現(xiàn)如下:
private Node tryAppend(Node s, boolean haveData) { // 嘗試將結(jié)點 s 入隊列 for (Node t = tail, p = t; ; ) { // move p to last node and append Node n, u; // temps for reads of next & tail // 當前隊列為空 if (p == null && (p = head) == null) { // 1 // 直接將結(jié)點 s 設(shè)置為 head,并返回 s 結(jié)點 if (this.casHead(null, s)) { return s; // initialize } } // 結(jié)點 s 不能作為結(jié)點 p 的后繼結(jié)點,因為 p 和 s 的模式互補,且 p 未匹配 else if (p.cannotPrecede(haveData)) { // 2 return null; // lost race vs opposite mode } // p 已經(jīng)不是最新的尾結(jié)點,更新 else if ((n = p.next) != null) { // 3 // not last; keep traversing p = p != t && t != (u = tail) ? (t = u) // stale tail : (p != n) ? n : null; // restart if off list } // 結(jié)點 s 入隊列失敗,說明 p 未指向最新的尾結(jié)點 else if (!p.casNext(null, s)) { // 4 p = p.next; // re-read on CAS failure } // 將結(jié)點 s 入隊列成功,后移 tail 指針,保證松弛度小于 2 else { // 5 if (p != t) { // update if slack now >= 2 while ((tail != t || !this.casTail(t, s)) // 后移 tail 指針 && (t = tail) != null && (s = t.next) != null // advance and retry && (s = s.next) != null && s != t) { } } return p; } } }
這一步的核心邏輯在于將結(jié)點 s 入隊列,并在 tail 結(jié)點松弛度較大時后移 tail 指針。具體執(zhí)行流程概括為:
如果隊列為空,則直接將結(jié)點 s 入隊列,并返回結(jié)點 s 對象;
否則,校驗結(jié)點 s 能否入隊列,如果前驅(qū)結(jié)點與結(jié)點 s 模式互補且未匹配,則不能入隊列,此時直接返回 null 并退回步驟 1 開始執(zhí)行;
如果結(jié)點 s 可以入隊列,則尋找隊列當前真正的 tail 結(jié)點,并將結(jié)點 s 作為后繼結(jié)點入隊列;
如果入隊列失敗,則說明前驅(qū)結(jié)點不是最新的隊列 tail 結(jié)點,繼續(xù)進入下一輪循環(huán)重試;
如果入隊列成功,則判斷 tail 結(jié)點的松弛度是否較大,如果較大則后移 tail 指針,以降低 tail 結(jié)點的松弛度。
下面利用圖示演示上述執(zhí)行流程。假設(shè)當前操作是一個生產(chǎn)者線程,期望向隊列插入一個元素值為 5 的結(jié)點,并且隊列中存在的都是未匹配的生產(chǎn)者結(jié)點,如下圖 1 所示。此時隊列不為空,且結(jié)點 s 可以入隊列,此時各指針指向如下圖 2 所示。因為結(jié)點 p 的 next 結(jié)點不為 null,說明 p 未指向最新的 tail 結(jié)點,需要后移 p、t 和 n 指針,直到 p 指向 tail 結(jié)點,如下圖 3、4 和 5 所示。
接下來執(zhí)行代碼 4,基于 CAS 嘗試將 p 結(jié)點的 next 結(jié)點由 null 更新為 s,即將結(jié)點 s 入隊列,如上圖 6 所示。如果入隊列成功,則繼續(xù)執(zhí)行代碼 5,后移 tail 指針,保證 tail 結(jié)點的松弛度小于 2,最后返回結(jié)點 s 的前驅(qū)結(jié)點,如上圖 7 和 8 所示。
最后來看 步驟 3,作者對這一步的詳細概述摘錄如下:
Await match or cancellation
Wait for another thread to match node; instead cancelling if the current thread was interrupted or the wait timed out. On multiprocessors, we use front-of-queue spinning: If a node appears to be the first unmatched node in the queue, it spins a bit before blocking. In either case, before blocking it tries to unsplice any nodes between the current "head" and the first unmatched node.
Front-of-queue spinning vastly improves performance of heavily contended queues. And so long as it is relatively brief and "quiet", spinning does not much impact performance of less-contended queues. During spins threads check their interrupt status and generate a thread-local random number to decide to occasionally perform a Thread.yield. While yield has underdefined specs, we assume that it might help, and will not hurt, in limiting impact of spinning on busy systems. We also use smaller (1/2) spins for nodes that are not known to be front but whose predecessors have not blocked -- these "chained" spins avoid artifacts of front-of-queue rules which otherwise lead to alternating nodes spinning vs blocking. Further, front threads that represent phase changes (from data to request node or vice versa) compared to their predecessors receive additional chained spins, reflecting longer paths typically required to unblock threads during phase changes.
如果當前操作模式為 ASYNC,則說明上游方法要求線程在完成入隊列操作之后不阻塞等待,而是立即返回。對于其它操作模式(除 NOW 和 ASYNC 以外)則需要執(zhí)行 LinkedTransferQueue#awaitMatch
方法讓當前線程依附在剛剛?cè)腙犃械慕Y(jié)點上等待。如果是 TIMED 操作模式,則執(zhí)行超時等待,否則執(zhí)行無限期等待,期間支持響應(yīng)中斷。
方法 LinkedTransferQueue#awaitMatch
實現(xiàn)如下:
private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { // 如果設(shè)置超時,則計算到期時間戳 final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); int spins = -1; // initialized after first item and cancel checks ThreadLocalRandom randomYields = null; // bound if needed for (; ; ) { Object item = s.item; // 當前結(jié)點已匹配 if (item != e) { // matched s.forgetContents(); // avoid garbage return cast(item); } // 線程被中斷,或者等待超時,則取消 if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // 將結(jié)點的 item 指向結(jié)點自己,表示取消 // 移除結(jié)點 s this.unsplice(pred, s); return e; } // 初始化自旋次數(shù) if (spins < 0) { // establish spins at/near front // 依據(jù)前驅(qū)結(jié)點的狀態(tài)計算當前結(jié)點的自旋次數(shù) if ((spins = spinsFor(pred, s.isData)) > 0) { randomYields = ThreadLocalRandom.current(); } } // 在阻塞之前先自旋幾次 else if (spins > 0) { // spin --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) { // 隨機讓步 Thread.yield(); // occasionally yield } } // 將當前線程對象綁定到 s 結(jié)點上 else if (s.waiter == null) { s.waiter = w; // request unpark then recheck } // 如果設(shè)置了超時,則超時等待 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos > 0L) { LockSupport.parkNanos(this, nanos); } } // 如果未設(shè)置超時,則無限期等待 else { LockSupport.park(this); } } }
可以看到在線程進入阻塞狀態(tài)之前會先自旋幾次,這樣主要是為了提升 LinkedTransferQueue 在多核 CPU 上的性能,在入隊列和出隊列比較頻繁的場景下避免線程不必要的阻塞和喚醒操作。上述方法的實現(xiàn)與上一篇介紹 SynchronousQueue 中的 TransferStack#awaitFulfill
方法的執(zhí)行過程基本一致。
到此,相信大家對“JUC的LinkedTransferQueue怎么使用”有了更深的了解,不妨來實際操作一番吧!這里是創(chuàng)新互聯(lián)網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!