[TOC]
創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站建設(shè)、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿(mǎn)足客戶(hù)于互聯(lián)網(wǎng)時(shí)代的泗縣網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的×××阻塞隊(duì)列,數(shù)據(jù)結(jié)構(gòu)采用的是最小堆是通過(guò)一個(gè)數(shù)組實(shí)現(xiàn)的,隊(duì)列默認(rèn)采用自然排序的升序排序,如果需要自定義排序,需要在構(gòu)造隊(duì)列時(shí)指定Comparetor比較器,隊(duì)列也是使用ReentrantLock鎖來(lái)實(shí)現(xiàn)的同步機(jī)制。
// 數(shù)組的大容量 2^31 - 8
private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
// 二叉堆數(shù)組
private transient Object[] queue;
// 總數(shù)
private transient int size;
// /默認(rèn)比較器
private transient Comparator super E> comparator;
// 鎖
private final ReentrantLock lock;
// 為空隊(duì)列
private final Condition notEmpty;
// 自旋鎖,在數(shù)組擴(kuò)容時(shí)使用
private transient volatile int allocationSpinLock;
注意:這里解釋下這個(gè)Integer.MAX_VALUE - 8,為什么數(shù)組的大長(zhǎng)度是這么多了,這其實(shí)和int的大值有關(guān),大值就是(1 << 32) -1 ,大家有沒(méi)有發(fā)現(xiàn)數(shù)組的長(zhǎng)度類(lèi)型是int,為什么是int了???我也不知道,我也試了其它數(shù)據(jù)類(lèi)型發(fā)現(xiàn)數(shù)組的長(zhǎng)度必須是int類(lèi)型,哈哈,所以也可以理解為什么是大值了,至于為什么要減八了,是因?yàn)閯?chuàng)建數(shù)組本身的信息(對(duì)象頭,class信息?。┮彩切枰鎯?chǔ)空間的,所以需要這8位的空間。
public boolean add(E e) {
return offer(e);
}
由于是×××隊(duì)列所以put方法不會(huì)阻塞,也是直接調(diào)用了offer方法.
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e); // never need to block
}
// 添加元素
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// size大于等于數(shù)組的長(zhǎng)度
while ((n = size) >= (cap = (array = queue).length))
// 擴(kuò)容
tryGrow(array, cap);
try {
Comparator super E> cmp = comparator;
if (cmp == null) // 默認(rèn)排序
siftUpComparable(n, e, array);
else // 自定義排序
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
這里我們主要分析下offer方法里面的兩個(gè)重要方法,擴(kuò)容和入隊(duì),tryGrow,siftUpComparable方法。
// 擴(kuò)容方法
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
// 只允許一個(gè)線(xiàn)程去擴(kuò)容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
// oldCap小于64 就加2 ,小于等于64就擴(kuò)容50%
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
// 不可以超過(guò)MAX_ARRAY_SIZE
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield(); // 擴(kuò)容獲取鎖失敗的線(xiàn)程,盡量讓出cpu
lock.lock(); // 重新獲取鎖
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
分析擴(kuò)容:
最小堆的構(gòu)建
// 保證了每條鏈的順序小到大
private static void siftUpComparable(int k, T x, Object[] array) {
Comparable super T> key = (Comparable super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
分析:
先得解釋下(k - 1) >>> 1,就是求的商,我們來(lái)模擬插入五個(gè)數(shù)吧,默認(rèn)容量是11.
第一次插入一個(gè)1,此時(shí)的k是0,x是1,k不大于0,直接插入。
索引 | 0 | ||||
---|---|---|---|---|---|
值 | 1 |
第二次我們插入一個(gè)0,此時(shí)的k是1,x是0,parent是0,然后獲取0位置索引的值和現(xiàn)在的比較,現(xiàn)在其實(shí)是不大于0的,所以此時(shí)交換了位置,array[k] = e; k = parent;parent是0,所以結(jié)束循環(huán)然后在0的位置設(shè)置當(dāng)前x是1。
索引 | 0 | 1 | |||
---|---|---|---|---|---|
值 | 0 | 1 |
第三次我們插入一個(gè)5,此時(shí)的k是2,x是5,parent 是0,然后獲取0位置的值和插入值標(biāo)記,發(fā)現(xiàn)是大于0的所以直接插入,在2的位置插入5。
索引 | 0 | 1 | 2 | ||
---|---|---|---|---|---|
值 | 0 | 1 | 5 |
第四次我們插入一個(gè)4,此時(shí)的k是3,x是4,parent是1,然后獲取1位置的值和插入值比較,發(fā)現(xiàn)是大于0的,所以直接插入在3的位置插入。
索引 | 0 | 1 | 2 | 3 | |
---|---|---|---|---|---|
值 | 0 | 1 | 5 | 4 |
第五次我們插入一個(gè)3,此時(shí)的k是4,x是3,parent是1,然后獲取1位置值和插入值做比較,發(fā)現(xiàn)大于0的,所以直接在4的位置插入。
索引 | 0 | 1 | 2 | 3 | 4 |
---|---|---|---|---|---|
值 | 0 | 1 | 5 | 4 | 3 |
我們用一個(gè)圖來(lái)描繪下這個(gè)數(shù)組,怎么出現(xiàn)的這個(gè)圖了,我們發(fā)現(xiàn)每次插入的數(shù)的索引就是數(shù)組的長(zhǎng)度,然后通過(guò)(i - 1)>>> 2 = n求父節(jié)點(diǎn),通過(guò)比較和父節(jié)點(diǎn)比較確認(rèn)自己的位置,左右子節(jié)點(diǎn)其實(shí)就是2n+1,2n+2,左右子節(jié)點(diǎn)就是數(shù)組的相鄰元素,我們發(fā)現(xiàn)子節(jié)點(diǎn)一定比父節(jié)點(diǎn)大,這就是最小堆;每次插入一個(gè)元素都是從最底層向上冒泡,維護(hù)最小堆的次序。
調(diào)用了 dequeue方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 彈出根節(jié)點(diǎn)
return dequeue();
} finally {
lock.unlock();
}
}
// 帶超時(shí)時(shí)間
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
也是調(diào)用了dequeue方法,這個(gè)方法支持線(xiàn)程的中斷。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
// 隊(duì)列還沒(méi)有初始化
if (n < 0)
return null;
else {
Object[] array = queue;
// 獲取根節(jié)點(diǎn)
E result = (E) array[0];
// 獲取尾節(jié)點(diǎn)
E x = (E) array[n];
// 尾節(jié)點(diǎn)置位null
array[n] = null;
Comparator super E> cmp = comparator;
// 重新排序最小堆
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
其實(shí)上面就是返回了根節(jié)點(diǎn),然后獲取尾節(jié)點(diǎn)放在根節(jié)點(diǎn)的位置調(diào)整最小堆請(qǐng)看siftDownComparable方法。
private static void siftDownComparable(int k, T x, Object[] array,
int n) {
// n是數(shù)組的大索引 k開(kāi)始是0 x就是尾節(jié)點(diǎn)的值
if (n > 0) {
// x是最后一個(gè)節(jié)點(diǎn)的值
Comparable super T> key = (Comparable super T>)x;
int half = n >>> 1; // 最后一個(gè)節(jié)點(diǎn)的父節(jié)點(diǎn) // loop while a non-leaf
while (k < half) { // k是頭節(jié)點(diǎn) k> 了 說(shuō)明到最后了
int child = (k << 1) + 1; // assume left child is least // 左子節(jié)點(diǎn)
Object c = array[child]; // 左節(jié)點(diǎn)的值
int right = child + 1; // 右子節(jié)點(diǎn)
if (right < n && // 左子節(jié)點(diǎn)大于由子節(jié)點(diǎn)
((Comparable super T>) c).compareTo((T) array[right]) > 0)
c = array[child = right]; // c就是右子節(jié)點(diǎn)
if (key.compareTo((T) c) <= 0) // 找到了子節(jié)點(diǎn)比自己大的
break;
array[k] = c;
k = child;
}
array[k] = key;
}
}
分析:
我們上圖的5個(gè)元素為例,進(jìn)行一次出隊(duì)操作。
我們調(diào)用siftDownComparable 方法調(diào)整最小堆,我們看下參數(shù),此時(shí)的k是0,x是3,array就是這個(gè)數(shù)組,n就是4,key就是3,然后算half(half可以理解為堆中父節(jié)點(diǎn)大索引位置,找到這個(gè)節(jié)點(diǎn)說(shuō)明已經(jīng)沒(méi)有子節(jié)點(diǎn)了),half = 2。
說(shuō)下調(diào)整最小堆的過(guò)程,其實(shí)就是從根節(jié)點(diǎn)開(kāi)始,重新構(gòu)建父節(jié)點(diǎn)的過(guò)程,不過(guò)不是每個(gè)都需要重新構(gòu)建,只需要構(gòu)造子節(jié)點(diǎn)小的那邊的的父節(jié)點(diǎn),因?yàn)樾〉墓?jié)點(diǎn)都去頂替原來(lái)的父節(jié)點(diǎn)了;我們彈出的是根節(jié)點(diǎn),所以要從他的左右子節(jié)點(diǎn)找個(gè)根節(jié)點(diǎn)(但是要滿(mǎn)足子節(jié)點(diǎn)大于父節(jié)點(diǎn)的規(guī)則),那么左右子節(jié)點(diǎn)有一個(gè)去當(dāng)父節(jié)點(diǎn)了,它的位置也需要有節(jié)點(diǎn)代替,所以又從他的子節(jié)點(diǎn)開(kāi)始找接替的節(jié)點(diǎn),以此類(lèi)推,直到找到最后一個(gè)父節(jié)點(diǎn)的位置。
使用了鎖,這個(gè)是精確的值。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
PriorityBlockingQueue 是一個(gè)wujie的隊(duì)列,使用put方法不會(huì)阻塞,使用時(shí)一定要注意內(nèi)存溢出的問(wèn)題;整個(gè)隊(duì)列的出隊(duì)和入隊(duì)都是通過(guò)最小堆來(lái)實(shí)現(xiàn)的,理解最小堆是這個(gè)隊(duì)列的關(guān)鍵;這個(gè)一個(gè)優(yōu)先級(jí)的隊(duì)列,適合有優(yōu)先級(jí)的場(chǎng)景。
參考《Java 并發(fā)編程的藝術(shù)》
創(chuàng)新互聯(lián)www.cdcxhl.cn,專(zhuān)業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開(kāi)啟,新人活動(dòng)云服務(wù)器買(mǎi)多久送多久。