真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

PriorityBlockingQueue1.8源碼解析-創(chuàng)新互聯(lián)

[TOC]

創(chuàng)新互聯(lián)公司堅(jiān)持“要么做到,要么別承諾”的工作理念,服務(wù)領(lǐng)域包括:成都做網(wǎng)站、網(wǎng)站建設(shè)、外貿(mào)營(yíng)銷(xiāo)網(wǎng)站建設(shè)、企業(yè)官網(wǎng)、英文網(wǎng)站、手機(jī)端網(wǎng)站、網(wǎng)站推廣等服務(wù),滿(mǎn)足客戶(hù)于互聯(lián)網(wǎng)時(shí)代的泗縣網(wǎng)站設(shè)計(jì)、移動(dòng)媒體設(shè)計(jì)的需求,幫助企業(yè)找到有效的互聯(lián)網(wǎng)解決方案。努力成為您成熟可靠的網(wǎng)絡(luò)建設(shè)合作伙伴!

PriorityBlockingQueue 1.8 源碼解析

一,簡(jiǎn)介

PriorityBlockingQueue 是一個(gè)支持優(yōu)先級(jí)的×××阻塞隊(duì)列,數(shù)據(jù)結(jié)構(gòu)采用的是最小堆是通過(guò)一個(gè)數(shù)組實(shí)現(xiàn)的,隊(duì)列默認(rèn)采用自然排序的升序排序,如果需要自定義排序,需要在構(gòu)造隊(duì)列時(shí)指定Comparetor比較器,隊(duì)列也是使用ReentrantLock鎖來(lái)實(shí)現(xiàn)的同步機(jī)制。

二,UML圖

PriorityBlockingQueue 1.8 源碼解析

三,基本成員
// 數(shù)組的大容量 2^31 - 8
    private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

     // 二叉堆數(shù)組
    private transient Object[] queue;

    // 總數(shù)
    private transient int size;

    // /默認(rèn)比較器
    private transient Comparator comparator;

    // 鎖
    private final ReentrantLock lock;

    // 為空隊(duì)列
    private final Condition notEmpty;

    // 自旋鎖,在數(shù)組擴(kuò)容時(shí)使用
    private transient volatile int allocationSpinLock;

注意:這里解釋下這個(gè)Integer.MAX_VALUE - 8,為什么數(shù)組的大長(zhǎng)度是這么多了,這其實(shí)和int的大值有關(guān),大值就是(1 << 32) -1 ,大家有沒(méi)有發(fā)現(xiàn)數(shù)組的長(zhǎng)度類(lèi)型是int,為什么是int了???我也不知道,我也試了其它數(shù)據(jù)類(lèi)型發(fā)現(xiàn)數(shù)組的長(zhǎng)度必須是int類(lèi)型,哈哈,所以也可以理解為什么是大值了,至于為什么要減八了,是因?yàn)閯?chuàng)建數(shù)組本身的信息(對(duì)象頭,class信息?。┮彩切枰鎯?chǔ)空間的,所以需要這8位的空間。

四,常用方法
入隊(duì)方法
add 方法
public boolean add(E e) {
        return offer(e);
    }
put 方法

由于是×××隊(duì)列所以put方法不會(huì)阻塞,也是直接調(diào)用了offer方法.

public void put(E e) {
        offer(e); // never need to block
    }
offer 帶超時(shí)方法
public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e); // never need to block
    }
offer 方法
// 添加元素
    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] array;
        // size大于等于數(shù)組的長(zhǎng)度
        while ((n = size) >= (cap = (array = queue).length))
            // 擴(kuò)容
            tryGrow(array, cap);
        try {
            Comparator cmp = comparator;
            if (cmp == null) // 默認(rèn)排序
                siftUpComparable(n, e, array);
            else // 自定義排序
                siftUpUsingComparator(n, e, array, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }

這里我們主要分析下offer方法里面的兩個(gè)重要方法,擴(kuò)容和入隊(duì),tryGrow,siftUpComparable方法。

tryGrow 方法
// 擴(kuò)容方法
    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); // must release and then re-acquire main lock
        Object[] newArray = null;
        // 只允許一個(gè)線(xiàn)程去擴(kuò)容
        if (allocationSpinLock == 0 &&
                UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                        0, 1)) {
            try {
                // oldCap小于64  就加2 ,小于等于64就擴(kuò)容50%
                int newCap = oldCap + ((oldCap < 64) ?
                        (oldCap + 2) : // grow faster if small
                        (oldCap >> 1));
                // 不可以超過(guò)MAX_ARRAY_SIZE
                if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) // back off if another thread is allocating
            Thread.yield(); // 擴(kuò)容獲取鎖失敗的線(xiàn)程,盡量讓出cpu
        lock.lock(); // 重新獲取鎖
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);
        }
    }

分析擴(kuò)容:

  • lock.unlock()第一行為什么就要釋放鎖了,因?yàn)槲覀冊(cè)谌腙?duì)之前就獲取到了鎖,如果我們不釋放,那么別的線(xiàn)程無(wú)法入隊(duì)也無(wú)法出隊(duì),這就大大降低了并發(fā)性,釋放鎖,別的線(xiàn)程就可以進(jìn)行入隊(duì)或者出隊(duì)操作。
  • allocationSpinLock 這里為什么要用這樣一個(gè)鎖了,其實(shí)和我們第一行的釋放鎖有關(guān)系,我們?cè)跀U(kuò)容時(shí)釋放了鎖,那就代表了其它線(xiàn)程也可以入隊(duì),但是隊(duì)列滿(mǎn)了,也需要擴(kuò)容,所以這個(gè)鎖就是為了讓擴(kuò)容只有一個(gè)線(xiàn)程來(lái)操作。
  • 在獲取了allocationSpinLock 鎖的線(xiàn)程在擴(kuò)容中,我們發(fā)現(xiàn)其實(shí)只是創(chuàng)建了一個(gè)新的數(shù)組,并沒(méi)有數(shù)據(jù)的遷移啥的,這是為什么了???后面再解釋。
  • newArray == null ,其實(shí)就是別的線(xiàn)程來(lái)爭(zhēng)奪擴(kuò)容失敗,然后盡量讓出執(zhí)行權(quán)(Thread.yield,線(xiàn)程從運(yùn)行中變成就緒狀態(tài)),讓獲取鎖的線(xiàn)程去執(zhí)行,但不是一定的,我們可以模擬一種可能,假設(shè)沒(méi)有讓出執(zhí)行權(quán),然后下一步獲取到了鎖。這時(shí)這個(gè)線(xiàn)程看見(jiàn)的newArray可能是null,所以就繼續(xù)走offer方法的while ((n = size) >= (cap = (array = queue).length))循環(huán),直到擴(kuò)容線(xiàn)程完成對(duì)newArray 的改變。
  • lock.lock() 因?yàn)槲覀冊(cè)跀U(kuò)容前釋放了鎖,允許別的線(xiàn)程對(duì)數(shù)組的操作,所以獲取鎖的一方面目的是為了控制只有一個(gè)線(xiàn)程對(duì)數(shù)組進(jìn)行操作,第二個(gè)目的其實(shí)就是保證數(shù)組的可見(jiàn)性,別的線(xiàn)程可能在擴(kuò)容期間執(zhí)行了出隊(duì)操作,保證下面數(shù)組的拷貝是準(zhǔn)確的數(shù)據(jù)。
siftUpComparable 方法

最小堆的構(gòu)建

// 保證了每條鏈的順序小到大
    private static  void siftUpComparable(int k, T x, Object[] array) {
        Comparable key = (Comparable) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = array[parent];
            if (key.compareTo((T) e) >= 0)
                break;
            array[k] = e;
            k = parent;
        }
        array[k] = key;
    }

分析:

先得解釋下(k - 1) >>> 1,就是求的商,我們來(lái)模擬插入五個(gè)數(shù)吧,默認(rèn)容量是11.

  • 第一次插入一個(gè)1,此時(shí)的k是0,x是1,k不大于0,直接插入。

    索引0
    1
  • 第二次我們插入一個(gè)0,此時(shí)的k是1,x是0,parent是0,然后獲取0位置索引的值和現(xiàn)在的比較,現(xiàn)在其實(shí)是不大于0的,所以此時(shí)交換了位置,array[k] = e; k = parent;parent是0,所以結(jié)束循環(huán)然后在0的位置設(shè)置當(dāng)前x是1。

    索引01
    01
  • 第三次我們插入一個(gè)5,此時(shí)的k是2,x是5,parent 是0,然后獲取0位置的值和插入值標(biāo)記,發(fā)現(xiàn)是大于0的所以直接插入,在2的位置插入5。

    索引012
    015
  • 第四次我們插入一個(gè)4,此時(shí)的k是3,x是4,parent是1,然后獲取1位置的值和插入值比較,發(fā)現(xiàn)是大于0的,所以直接插入在3的位置插入。

    索引0123
    0154
  • 第五次我們插入一個(gè)3,此時(shí)的k是4,x是3,parent是1,然后獲取1位置值和插入值做比較,發(fā)現(xiàn)大于0的,所以直接在4的位置插入。

    索引01234
    01543

我們用一個(gè)圖來(lái)描繪下這個(gè)數(shù)組,怎么出現(xiàn)的這個(gè)圖了,我們發(fā)現(xiàn)每次插入的數(shù)的索引就是數(shù)組的長(zhǎng)度,然后通過(guò)(i - 1)>>> 2 = n求父節(jié)點(diǎn),通過(guò)比較和父節(jié)點(diǎn)比較確認(rèn)自己的位置,左右子節(jié)點(diǎn)其實(shí)就是2n+1,2n+2,左右子節(jié)點(diǎn)就是數(shù)組的相鄰元素,我們發(fā)現(xiàn)子節(jié)點(diǎn)一定比父節(jié)點(diǎn)大,這就是最小堆;每次插入一個(gè)元素都是從最底層向上冒泡,維護(hù)最小堆的次序。

PriorityBlockingQueue 1.8 源碼解析

出隊(duì)方法
poll 方法

調(diào)用了 dequeue方法。

public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                // 彈出根節(jié)點(diǎn)
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    // 帶超時(shí)時(shí)間
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null && nanos > 0)
                nanos = notEmpty.awaitNanos(nanos);
        } finally {
            lock.unlock();
        }
        return result;
    }
take 方法

也是調(diào)用了dequeue方法,這個(gè)方法支持線(xiàn)程的中斷。

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
    }
dequeue 方法
private E dequeue() {
        int n = size - 1;
        // 隊(duì)列還沒(méi)有初始化
        if (n < 0)
            return null;
        else {
            Object[] array = queue;
            // 獲取根節(jié)點(diǎn)
            E result = (E) array[0];
            // 獲取尾節(jié)點(diǎn)
            E x = (E) array[n];
            // 尾節(jié)點(diǎn)置位null
            array[n] = null;
            Comparator cmp = comparator;
            // 重新排序最小堆
            if (cmp == null)
                siftDownComparable(0, x, array, n);
            else
                siftDownUsingComparator(0, x, array, n, cmp);
            size = n;
            return result;
        }
    }

其實(shí)上面就是返回了根節(jié)點(diǎn),然后獲取尾節(jié)點(diǎn)放在根節(jié)點(diǎn)的位置調(diào)整最小堆請(qǐng)看siftDownComparable方法。

siftDownComparable 方法
private static  void siftDownComparable(int k, T x, Object[] array,
                                               int n) {
        // n是數(shù)組的大索引 k開(kāi)始是0 x就是尾節(jié)點(diǎn)的值
        if (n > 0) {
            // x是最后一個(gè)節(jié)點(diǎn)的值
            Comparable key = (Comparable)x;
            int half = n >>> 1; // 最后一個(gè)節(jié)點(diǎn)的父節(jié)點(diǎn)           // loop while a non-leaf
            while (k < half) { // k是頭節(jié)點(diǎn) k> 了 說(shuō)明到最后了
                int child = (k << 1) + 1; // assume left child is least // 左子節(jié)點(diǎn)
                Object c = array[child]; // 左節(jié)點(diǎn)的值
                int right = child + 1;  // 右子節(jié)點(diǎn)
                if (right < n && // 左子節(jié)點(diǎn)大于由子節(jié)點(diǎn)
                        ((Comparable) c).compareTo((T) array[right]) > 0)
                    c = array[child = right]; // c就是右子節(jié)點(diǎn)
                if (key.compareTo((T) c) <= 0) // 找到了子節(jié)點(diǎn)比自己大的
                    break;
                array[k] = c;
                k = child;
            }
            array[k] = key;
        }

    }

分析:

我們上圖的5個(gè)元素為例,進(jìn)行一次出隊(duì)操作。

  • 彈出根節(jié)點(diǎn)0,然后調(diào)整最小堆。

PriorityBlockingQueue 1.8 源碼解析

  • 我們調(diào)用siftDownComparable 方法調(diào)整最小堆,我們看下參數(shù),此時(shí)的k是0,x是3,array就是這個(gè)數(shù)組,n就是4,key就是3,然后算half(half可以理解為堆中父節(jié)點(diǎn)大索引位置,找到這個(gè)節(jié)點(diǎn)說(shuō)明已經(jīng)沒(méi)有子節(jié)點(diǎn)了),half = 2。

  • 第一次,從0開(kāi)始,此時(shí)的child是1,c=1,right是2,c>arrray[right],不大于,此時(shí)的key小于c不小于,所以此時(shí)0的位置就變成了1(array[k] = c),k = child。

PriorityBlockingQueue 1.8 源碼解析

  • 第二次,從1開(kāi)始,此時(shí)的child是3,c是4,right是4,左邊大于右邊所以,c=array[child=right ] = 3,所以此時(shí)的child 是4,c是3,由于key <= c,所以借宿循環(huán),直接在1的位置設(shè)置c,調(diào)整結(jié)束。

PriorityBlockingQueue 1.8 源碼解析

說(shuō)下調(diào)整最小堆的過(guò)程,其實(shí)就是從根節(jié)點(diǎn)開(kāi)始,重新構(gòu)建父節(jié)點(diǎn)的過(guò)程,不過(guò)不是每個(gè)都需要重新構(gòu)建,只需要構(gòu)造子節(jié)點(diǎn)小的那邊的的父節(jié)點(diǎn),因?yàn)樾〉墓?jié)點(diǎn)都去頂替原來(lái)的父節(jié)點(diǎn)了;我們彈出的是根節(jié)點(diǎn),所以要從他的左右子節(jié)點(diǎn)找個(gè)根節(jié)點(diǎn)(但是要滿(mǎn)足子節(jié)點(diǎn)大于父節(jié)點(diǎn)的規(guī)則),那么左右子節(jié)點(diǎn)有一個(gè)去當(dāng)父節(jié)點(diǎn)了,它的位置也需要有節(jié)點(diǎn)代替,所以又從他的子節(jié)點(diǎn)開(kāi)始找接替的節(jié)點(diǎn),以此類(lèi)推,直到找到最后一個(gè)父節(jié)點(diǎn)的位置。

size 方法

使用了鎖,這個(gè)是精確的值。

public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return size;
        } finally {
            lock.unlock();
        }
    }
五,總結(jié)

PriorityBlockingQueue 是一個(gè)wujie的隊(duì)列,使用put方法不會(huì)阻塞,使用時(shí)一定要注意內(nèi)存溢出的問(wèn)題;整個(gè)隊(duì)列的出隊(duì)和入隊(duì)都是通過(guò)最小堆來(lái)實(shí)現(xiàn)的,理解最小堆是這個(gè)隊(duì)列的關(guān)鍵;這個(gè)一個(gè)優(yōu)先級(jí)的隊(duì)列,適合有優(yōu)先級(jí)的場(chǎng)景。

參考《Java 并發(fā)編程的藝術(shù)》

創(chuàng)新互聯(lián)www.cdcxhl.cn,專(zhuān)業(yè)提供香港、美國(guó)云服務(wù)器,動(dòng)態(tài)BGP最優(yōu)骨干路由自動(dòng)選擇,持續(xù)穩(wěn)定高效的網(wǎng)絡(luò)助力業(yè)務(wù)部署。公司持有工信部辦法的idc、isp許可證, 機(jī)房獨(dú)有T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確進(jìn)行流量調(diào)度,確保服務(wù)器高可用性。佳節(jié)活動(dòng)現(xiàn)已開(kāi)啟,新人活動(dòng)云服務(wù)器買(mǎi)多久送多久。


新聞標(biāo)題:PriorityBlockingQueue1.8源碼解析-創(chuàng)新互聯(lián)
瀏覽路徑:http://weahome.cn/article/ddpegd.html

其他資訊

在線(xiàn)咨詢(xún)

微信咨詢(xún)

電話(huà)咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部