本篇內(nèi)容介紹了“JUC的PriorityBlockingQueue如何使用”的有關(guān)知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細閱讀,能夠?qū)W有所成!
專注于為中小企業(yè)提供成都做網(wǎng)站、成都網(wǎng)站制作服務(wù),電腦端+手機端+微信端的三站合一,更高效的管理,為中小企業(yè)松溪免費做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千多家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實現(xiàn)規(guī)模擴充和轉(zhuǎn)變。
PriorityBlockingQueue 底層依賴于數(shù)組作為存儲結(jié)構(gòu),最大容量上限是 Integer.MAX_VALUE - 8
,所以幾乎可以將其視為無界的。同 PriorityQueue 一樣,PriorityBlockingQueue 同樣引入了堆數(shù)據(jù)結(jié)構(gòu)來編排隊列元素的優(yōu)先級,默認使用最小堆結(jié)構(gòu)。
此外,由 Blocking 字樣我們可以推斷出 PriorityBlockingQueue 是一個阻塞隊列。PriorityBlockingQueue 實現(xiàn)自 BlockingQueue 接口,并基于 ReentrantLock 鎖保證線程安全。不過需要注意的一點是,PriorityBlockingQueue 的阻塞僅針對出隊列操作而言,當(dāng)隊列為空時出隊列的線程會阻塞等待其它線程往隊列中添加新的元素。對于入隊列操作來說,因為 PriorityBlockingQueue 定義為無界,所以執(zhí)行入隊列的線程會立即得到響應(yīng),如果隊列底層數(shù)組已滿則該線程會嘗試對底層數(shù)組進行擴容,當(dāng)?shù)讓訑?shù)據(jù)達到容量上限而無法繼續(xù)擴容時會拋出 OOM 異常。
下面先來了解一下 PriorityBlockingQueue 的字段定義,如下:
public class PriorityBlockingQueueextends AbstractQueue implements BlockingQueue , Serializable { /** 隊列默認初始容量 */ private static final int DEFAULT_INITIAL_CAPACITY = 11; /** * 隊列容量上限 * * Some VMs reserve some header words in an array. * Attempts to allocate larger arrays may result in OutOfMemoryError: Requested array size exceeds VM limit */ private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; /** * 存儲隊列元素的數(shù)組,按照最小堆組織 * * Priority queue represented as a balanced binary heap: * the two children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. * The priority queue is ordered by comparator, or by the elements' natural ordering, * if comparator is null: For each node n in the heap and each descendant d of n, n <= d. * The element with the lowest value is in queue[0], assuming the queue is nonempty. */ private transient Object[] queue; /** 隊列中元素個數(shù) */ private transient int size; /** 隊列元素比較器,如果為 null 則使用元素自帶的比較器 */ private transient Comparator super E> comparator; /** 保證隊列操作線程安全的可重入獨占鎖 */ private final ReentrantLock lock; /** 記錄因為隊列為空而阻塞的線程 */ private final Condition notEmpty; /** * 擴容標(biāo)記位,保證同一時間只有一個線程在擴容隊列,狀態(tài)為 0 或 1: * - 0: 表示當(dāng)前沒有在執(zhí)行擴容操作 * - 1: 表示當(dāng)前正在執(zhí)行擴容操作 */ private transient volatile int allocationSpinLock; /** 輔助支持序列化和反序列化 */ private PriorityQueue q; // ... 省略方法實現(xiàn) }
PriorityBlockingQueue 默認初始時的底層數(shù)組大小設(shè)置為 11,并在元素已滿時觸發(fā)擴容操作,字段 PriorityBlockingQueue#allocationSpinLock
用于控制同一時間只有一個線程在執(zhí)行擴容。當(dāng)某個線程檢測到當(dāng)前底層數(shù)組已滿時會基于 CAS 操作嘗試將該字段值由 0 改為 1,然后開始執(zhí)行擴容,并在完成之后重置該標(biāo)記字段。
字段 PriorityBlockingQueue#comparator
用于指定元素比較器以判定隊列元素的優(yōu)先級,如果該字段為 null,則 PriorityBlockingQueue 會基于元素自帶的比較器排列優(yōu)先級。對于基本類型而言則參考元素的自然順序,對于自定義對象來說,需要保證這些對象實現(xiàn)了 java.lang.Comparable
接口,否則會拋出 ClassCastException 異常。
PriorityBlockingQueue 實現(xiàn)自 BlockingQueue 接口,下面針對核心方法的實現(xiàn)逐一進行分析。
針對添加元素的操作,PriorityBlockingQueue 實現(xiàn)了 PriorityBlockingQueue#offer
、PriorityBlockingQueue#add
和 PriorityBlockingQueue#put
方法,不過后兩者都是直接調(diào)用了 PriorityBlockingQueue#offer
方法。
此外,該方法的超時版本 PriorityBlockingQueue#offer(E, long, TimeUnit)
也是直接委托給 PriorityBlockingQueue#offer
方法執(zhí)行,并沒有真正實現(xiàn)超時等待機制,這主要是因為 PriorityBlockingQueue 是無界的,所有的添加操作都能夠被立即響應(yīng),而不會阻塞。
下面展開分析一下 PriorityBlockingQueue#offer
方法的實現(xiàn),如下:
public boolean offer(E e) { // 待添加元素不能為 null if (e == null) { throw new NullPointerException(); } final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); int n, cap; Object[] array; // 如果隊列中的元素個數(shù)大于等于隊列的容量,則執(zhí)行擴容操作 while ((n = size) >= (cap = (array = queue).length)) { this.tryGrow(array, cap); // 擴容 } try { // 將待添加元素插入到堆的合適位置(最小堆) Comparator super E> cmp = comparator; if (cmp == null) { siftUpComparable(n, e, array); } else { // 自定義比較器 siftUpUsingComparator(n, e, array, cmp); } // 結(jié)點計數(shù)加 1 size = n + 1; // 喚醒一個之前因為隊列為空而阻塞的線程 notEmpty.signal(); } finally { // 釋放鎖 lock.unlock(); } return true; }
PriorityBlockingQueue 同樣不允許往其中添加 null 元素,如果待添加的元素值合法則執(zhí)行:
加鎖,保證同一時間只有一個線程在操作隊列;
判斷隊列是否已滿,如果是則執(zhí)行擴容操作;
將元素基于最小堆數(shù)據(jù)結(jié)構(gòu)的約束插入到底層數(shù)據(jù)的合適位置;
隊列結(jié)點計數(shù)加 1;
因為當(dāng)前隊列至少包含一個元素,所以嘗試喚醒一個之前因為隊列為空而阻塞的線程;
釋放鎖并返回。
繼續(xù)來看一下上述步驟中的擴容過程,實現(xiàn)位于 PriorityBlockingQueue#tryGrow
方法中,如下:
private void tryGrow(Object[] array, int oldCap) { // 擴容之前,先釋放鎖,避免擴容期間阻塞其它線程的出隊列、入隊列操作 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; if (allocationSpinLock == 0 && // 基于 CAS 操作將擴容標(biāo)記位由 0 改為 1 UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { // 如果當(dāng)前隊列長度小于 64,則擴容為 2(n + 1),否則擴容為 (1 + 1/2)n int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : (oldCap >> 1)); // grow faster if small // 避免隊列容量超過允許上限 if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) { throw new OutOfMemoryError(); } newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) { newArray = new Object[newCap]; } } finally { // 重置擴容標(biāo)記 allocationSpinLock = 0; } } // 當(dāng)前線程擴容失敗,則讓渡其它線程獲取鎖 if (newArray == null) { Thread.yield(); } // 加鎖 lock.lock(); // 替換底層存儲為擴容后的數(shù)組,并復(fù)制元素 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } }
在開始執(zhí)行擴容之前,當(dāng)前線程會釋放持有的鎖,以避免在擴容期間阻塞其它線程的出隊列操作,然后基于 CAS 操作修改擴容標(biāo)記位 PriorityBlockingQueue#allocationSpinLock
,保證同一時間只有一個線程在執(zhí)行擴容。一開始數(shù)組較?。ㄩL度小于 64)時,線程將對底層數(shù)組成倍擴容(即 2(n + 1)
),然后再按照 50% 的比例進行擴容(即 (1 + 1/2) * n
),如果底層數(shù)組已經(jīng)到達容量上限,則會拋出 OOM 異常。
線程在完成擴容操作之后會重置擴容標(biāo)記,如果有線程在競爭 CAS 時失敗則會嘗試讓渡其它線程獲取鎖。這里主要是讓渡給成功完成擴容操作的線程,因為此時擴容操作還未真正完成,該線程需要嘗試獲取鎖以繼續(xù)用擴容后的數(shù)組替換當(dāng)前底層數(shù)組。
繼續(xù)回到 PriorityBlockingQueue#offer
方法,如果擴容操作完成或者本次入隊列操作無需觸發(fā)擴容,則接下去線程會將待添加的元素按照最小堆的約束插入到底層數(shù)據(jù)的合適位置。此時需要區(qū)分兩種情況,如果在構(gòu)造 PriorityBlockingQueue 對象時指定了比較器 Comparator,則會調(diào)用 PriorityBlockingQueue#siftUpUsingComparator
方法基于該比較器執(zhí)行最小堆插入操作,否則調(diào)用 PriorityBlockingQueue#siftUpComparable
方法按照元素的自然順序?qū)?dāng)前元素插入到最小堆中。
基于數(shù)組實現(xiàn)的堆結(jié)構(gòu),在操作上是比較簡單的,讀者可以自行參考源碼,本文不對最小堆 siftUp*
和 siftDown*
操作展開分析。
前面幾篇介紹的隊列都滿足 FIFO 的特性,在執(zhí)行出隊列時返回的都是在隊列中存活時間最長的元素。對于 PriorityBlockingQueue 而言,結(jié)點的順序則按照優(yōu)先級進行編排,所以這里獲取元素的操作返回的是隊列中優(yōu)先級最高的結(jié)點。
針對獲取元素的操作,PriorityBlockingQueue 實現(xiàn)了 PriorityBlockingQueue#poll
、PriorityBlockingQueue#peek
和 PriorityBlockingQueue#take
方法。其中 PriorityBlockingQueue#peek
方法僅獲取最小堆堆頂結(jié)點元素值,而不移除該結(jié)點,實現(xiàn)上比較簡單。方法 PriorityBlockingQueue#take
相對于 PriorityBlockingQueue#poll
的區(qū)別在于,當(dāng)隊列為空時該方法會無限期阻塞,直到有其它線程往隊列中插入新的元素,或者該線程被中斷。實現(xiàn)層面,二者大同小異,所以下面以 PriorityBlockingQueue#poll
方法為例展開分析從 PriorityBlockingQueue 中獲取元素操作的具體實現(xiàn)。
PriorityBlockingQueue 針對 PriorityBlockingQueue#poll
方法定義了兩個版本,區(qū)別在于當(dāng)隊列為空時是立即返回還是阻塞等待一段時間,而在實現(xiàn)思路上是一致的。這里以不帶超時參數(shù)的版本為例展開分析,實現(xiàn)如下:
public E poll() { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 出隊列,獲取最小堆堆頂元素值,并移除堆頂結(jié)點,調(diào)整最小堆 return this.dequeue(); } finally { // 釋放鎖 lock.unlock(); } } private E dequeue() { int n = size - 1; if (n < 0) { // 當(dāng)前隊列為空,直接返回 null return null; } else { Object[] array = queue; // 獲取堆頂元素值 E result = (E) array[0]; // 調(diào)整堆的結(jié)構(gòu),以便再次滿足最小堆定義 E x = (E) array[n]; array[n] = null; Comparator super E> cmp = comparator; if (cmp == null) { siftDownComparable(0, x, array, n); } else { // 自定義比較器 siftDownUsingComparator(0, x, array, n, cmp); } // 隊列結(jié)點計數(shù)減 1 size = n; return result; } }
對于優(yōu)先級隊列而言,出隊列操作獲取到的是隊列中優(yōu)先級最高的元素,因為底層依賴于最小堆實現(xiàn),所以只需要移除最小堆堆頂結(jié)點,并返回結(jié)點元素即可。但是因為這樣破壞了堆的結(jié)構(gòu),所以需要調(diào)用 shiftDown*
方法從上往下進行調(diào)整,以再次滿足最小堆結(jié)構(gòu)的約束。
針對移除元素的操作,PriorityBlockingQueue 實現(xiàn)了 PriorityBlockingQueue#remove
方法,并提供了有參和無參的版本,其中無參版本實際上是委托給 PriorityBlockingQueue#poll
方法執(zhí)行的。下面來分析一下有參版本的實現(xiàn),如下:
public boolean remove(Object o) { final ReentrantLock lock = this.lock; // 加鎖 lock.lock(); try { // 獲取待刪除元素的數(shù)組下標(biāo) int i = this.indexOf(o); if (i == -1) { // 不存在 return false; } // 移除元素 this.removeAt(i); return true; } finally { // 釋放鎖 lock.unlock(); } } private void removeAt(int i) { Object[] array = queue; int n = size - 1; // 當(dāng)前移除的是最后一個元素 if (n == i) { // removed last element array[i] = null; } // 當(dāng)前移除的是中間元素 else { // 將數(shù)組最后一個位置置為 null,并調(diào)整堆的結(jié)構(gòu)以滿足最小堆定義 E moved = (E) array[n]; array[n] = null; Comparator super E> cmp = comparator; // 自上而下調(diào)整堆結(jié)構(gòu)以滿足最小堆定義 if (cmp == null) { siftDownComparable(i, moved, array, n); } else { siftDownUsingComparator(i, moved, array, n, cmp); } // 自下而上調(diào)整堆結(jié)構(gòu)以滿足最小堆定義 if (array[i] == moved) { if (cmp == null) { siftUpComparable(i, moved, array); } else { siftUpUsingComparator(i, moved, array, cmp); } } } // 隊列結(jié)點計數(shù)減 1 size = n; }
如果待刪除的元素是優(yōu)先級最低的元素,則只需要將底層數(shù)組末尾結(jié)點置為 null 即可,否則,對于其它優(yōu)先級的元素來說,在執(zhí)行刪除之后需要調(diào)整堆結(jié)構(gòu)以滿足最小堆定義。
方法 PriorityBlockingQueue#contains
接收一個參數(shù),用于判斷隊列中是否包含值等于參數(shù)的結(jié)點。
方法 PriorityBlockingQueue#size
用于返回當(dāng)前隊列中包含的結(jié)點個數(shù),因為 PriorityBlockingQueue 已經(jīng)定義了 PriorityBlockingQueue#size
字段,用于對隊列中的結(jié)點進行計數(shù),所以該方法只需要返回字段值即可。
“JUC的PriorityBlockingQueue如何使用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實用文章!