真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

ConcurrentLinkedQueue1.8源碼淺析

[TOC]

讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡(jiǎn)單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名與空間、網(wǎng)絡(luò)空間、營(yíng)銷軟件、網(wǎng)站建設(shè)、鄂城網(wǎng)站維護(hù)、網(wǎng)站推廣。

ConcurrentLinkedQueue 1.8 源碼淺析

一,簡(jiǎn)介

ConcurrentlinkedQueue 還是一個(gè)基于鏈表的,×××的,線程安全的單端隊(duì)列,它采用先進(jìn)先出(FIFO)的規(guī)則對(duì)節(jié)點(diǎn)進(jìn)行排序,當(dāng)我們加入一個(gè)元素時(shí),它會(huì)插入隊(duì)列的尾部,當(dāng)我們獲取元素時(shí),會(huì)從隊(duì)列的首部獲取元素。它沒有使用鎖來保證線程安全,使用的是“wait-free”算法來保證整個(gè)隊(duì)列的線程安全。

二,基本成員簡(jiǎn)介
Node 節(jié)點(diǎn)對(duì)象
        // 存儲(chǔ)的數(shù)據(jù)
        volatile E item;
        // 下一個(gè)節(jié)點(diǎn)引用
        volatile Node next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        // 構(gòu)造一個(gè)node節(jié)點(diǎn)
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        // 修改節(jié)點(diǎn)的item
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        // 懶修改節(jié)點(diǎn)的next
        void lazySetNext(Node val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        // cas修改節(jié)點(diǎn)的next節(jié)點(diǎn)
        boolean casNext(Node cmp, Node val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
head 頭節(jié)點(diǎn)
 private transient volatile Node head;
tail 尾節(jié)點(diǎn)
private transient volatile Node tail;
三,常用方法解析
無參構(gòu)造方法
    public ConcurrentLinkedQueue() {
        head = tail = new Node(null);
    }

構(gòu)造節(jié)點(diǎn)是其實(shí)就是構(gòu)造了一個(gè)node的item為null的節(jié)點(diǎn),然后head和tail指向這個(gè)節(jié)點(diǎn),如下圖所示:

ConcurrentLinkedQueue 1.8 源碼淺析

add 方法
public boolean add(E e) {
        return offer(e);
    }

我們可以看出其實(shí)調(diào)用的是offer方法,具體參考o(jì)ffer方法的講解。

offer 方法

源碼解析:

    public boolean offer(E e) {
        // 入隊(duì)元素不能為null
        checkNotNull(e);
        // 創(chuàng)建新的節(jié)點(diǎn)
        final Node newNode = new Node(e);
        // 死循環(huán),設(shè)置節(jié)點(diǎn)
        // p獲取尾節(jié)點(diǎn)
        for (Node t = tail, p = t;;) {
            // q是p的next節(jié)點(diǎn)
            Node q = p.next;
            // 獲取尾節(jié)點(diǎn)的next節(jié)點(diǎn)
            // 尾節(jié)點(diǎn)沒有下一個(gè)節(jié)點(diǎn)
            if (q == null) {
                // p is last node
                // 這一步說明p是尾節(jié)點(diǎn),新的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)的next節(jié)點(diǎn)
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    // 設(shè)置尾節(jié)點(diǎn),當(dāng)之前的尾節(jié)點(diǎn)和現(xiàn)在插入的節(jié)點(diǎn)之間有一個(gè)節(jié)點(diǎn)時(shí)
                    // 并不是每一次都cas設(shè)置尾節(jié)點(diǎn)(優(yōu)化手段,是怎么想到這種優(yōu)化的??)
                    if (p != t) // hop two nodes at a time
                        // cas設(shè)置尾節(jié)點(diǎn),可能會(huì)失敗
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            // 多線程操作時(shí)候,由于poll時(shí)候會(huì)把舊的head變?yōu)樽砸?,然后將head的next設(shè)置為新的head
            // 所以這里需要重新找新的head
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // 尋找尾節(jié)點(diǎn)
                // Check for tail updates after two hops.
                // p!=t
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

分析插入過程,我們插入使用3個(gè)線程來調(diào)用offer 方法,ThreadA,ThreadB同時(shí)運(yùn)行,ThreadC最后插入,分析下offer方法的流程。

第一步,隊(duì)列屬于初始化狀態(tài),ThreadA,ThreadB同時(shí)調(diào)用offer方法;創(chuàng)建節(jié)點(diǎn),死循環(huán)設(shè)置節(jié)點(diǎn),獲取尾節(jié)點(diǎn)的next節(jié)點(diǎn),此時(shí)q== null,兩個(gè)線程都同時(shí)可能看見,然后cas設(shè)置尾節(jié)點(diǎn)的next節(jié)點(diǎn)(隊(duì)列狀態(tài)如圖A所示),我們假設(shè)是ThreadA線程cas設(shè)置成功了,然后p==t此時(shí)的尾節(jié)點(diǎn)其實(shí)沒有發(fā)生變化;此時(shí)我們來看ThreadB由于A成功了,所以ThreadB cas失敗了,重新循環(huán),此時(shí)q != null了,p == q顯然不等于,再看下一個(gè)else判斷p!=t,此時(shí)顯然p == t,所以才是p = q,然后再次循環(huán),此時(shí)的q==null,我們假設(shè)沒有線程來和ThreadB競(jìng)爭(zhēng),所以cas設(shè)置成功,然后p!=t嗎,顯然滿足所以設(shè)置尾節(jié)點(diǎn),此時(shí)的設(shè)置尾節(jié)點(diǎn)的節(jié)點(diǎn)和之前的尾節(jié)點(diǎn)之間剛剛好有一個(gè)節(jié)點(diǎn)(如圖B所示)。

ConcurrentLinkedQueue 1.8 源碼淺析

ConcurrentLinkedQueue 1.8 源碼淺析

第二步,ThreadC插入,此時(shí)的尾節(jié)點(diǎn)是ThreadB插入的節(jié)點(diǎn)假設(shè)是B,獲取B的next節(jié)點(diǎn),q == null,然后cas設(shè)置節(jié)點(diǎn),完成,p==t,所以不用更新尾節(jié)點(diǎn)(如圖C所示)。

ConcurrentLinkedQueue 1.8 源碼淺析

peek 方法

注意:不會(huì)刪除元素,要和poll方法區(qū)別

    public E peek() {
        restartFromHead:
        for (;;) {
            for (Node h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    // 更新頭結(jié)點(diǎn)
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }
poll 方法
    public E poll() {
        restartFromHead:
        // 循環(huán)
        for (;;) {
            // 獲取頭結(jié)點(diǎn)
            for (Node h = head, p = h, q;;) {
                // 獲取節(jié)點(diǎn)的內(nèi)容
                E item = p.item;
                // item不為null ,使用cas設(shè)置item為空
                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    // 更新頭結(jié)點(diǎn),和尾節(jié)點(diǎn)一樣,不是每次都更新
                    // 頭結(jié)點(diǎn)item為null是,下個(gè)節(jié)點(diǎn)就必須更新頭結(jié)點(diǎn)
                    // 頭結(jié)點(diǎn)item不為null時(shí),規(guī)則和更新尾節(jié)點(diǎn)一樣
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                // 那么獲取p節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),如果p節(jié)點(diǎn)的下一節(jié)點(diǎn)為null,則表明隊(duì)列已經(jīng)空了
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                // p == q,說明別的線程調(diào)用了updateHead,
                // 自己的next 指向了自己,重新循環(huán),獲取最新的頭結(jié)點(diǎn)
                else if (p == q)
                    continue restartFromHead;
                // 如果下一個(gè)元素不為空,則將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn)
                else
                    p = q;
            }
        }
    }

    final void updateHead(Node h, Node p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

分析我們按照offer時(shí)候元素來執(zhí)行poll方法,ThreadD和ThreadE同時(shí)執(zhí)行來分析下隊(duì)列的變化(主要分析p==q的產(chǎn)生)。

初始狀態(tài)(如圖C所示)

ConcurrentLinkedQueue 1.8 源碼淺析

第一步,ThreadD和ThreadE執(zhí)行poll操作,item等于null,所以執(zhí)行執(zhí)行下面的操作(q = p.next) == null不等于,p == q不等于,所以p = q,其實(shí)就是上圖的ThreadA插入的節(jié)點(diǎn),此時(shí)的item已經(jīng)不為null了,所以執(zhí)行cas設(shè)置item為null的操作,假設(shè)ThreadD執(zhí)行成功了,那么此時(shí)p!=h就滿足了,所以此時(shí)要更新頭結(jié)點(diǎn)調(diào)用updateHead,這個(gè)方法會(huì)更新頭結(jié)點(diǎn),并且把原來的頭節(jié)點(diǎn)的next設(shè)置為自己)(如圖D所示);接下我們分析ThreadE,cas失敗了需要重新執(zhí)行,此時(shí)的item已經(jīng)不為null,所以執(zhí)行執(zhí)行下面的操作(q = p.next) == null不等于,p == q這使其實(shí)已經(jīng)是等于了,因?yàn)門hreadD改變了了以前頭結(jié)點(diǎn)的next節(jié)點(diǎn)為自己,所以需要重新遍歷,獲取最新的頭結(jié)點(diǎn),此時(shí)的頭結(jié)點(diǎn)其實(shí)就是ThreadA插入的節(jié)點(diǎn),然后item為null,接著執(zhí)行下面的判斷,最終p就是p.next節(jié)點(diǎn)也就是ThreadB節(jié)點(diǎn),然后cas設(shè)置item為null,由于p=p.next,所以p發(fā)生了變化,所以需要設(shè)置ThreadB為頭結(jié)點(diǎn)(如圖E所示)。

ConcurrentLinkedQueue 1.8 源碼淺析

ConcurrentLinkedQueue 1.8 源碼淺析
看到上面的執(zhí)行流程可能就有人有疑問了,這不是每次都更新頭結(jié)點(diǎn)嗎,沒有優(yōu)化啊,只看poll方法確實(shí)是這樣,那什么時(shí)候會(huì)產(chǎn)生不是每次都更新頭節(jié)點(diǎn)了,那就是當(dāng)頭節(jié)點(diǎn)的item不為null的時(shí)候,但是如果按初始化的狀況來看,頭結(jié)點(diǎn)的item一直是null,但是當(dāng)我看了peek方法之后才發(fā)現(xiàn),peek可以改變這個(gè)情況,可以設(shè)置item不為null的頭結(jié)點(diǎn),其實(shí)我們可以在poll方法前調(diào)用下peek方法,其實(shí)就啟動(dòng)了優(yōu)化策略,不是每次更新頭結(jié)點(diǎn),不知道作者是不是這么設(shè)計(jì)的,反正就是牛皮。

size 方法
public int size() {
        int count = 0;
        for (Node p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

我們可以發(fā)現(xiàn)size沒有加鎖,就是遍歷了整個(gè)隊(duì)列,但是遍歷的同時(shí)可能在發(fā)生poll或者offer,所以size不是特別的精確,用的時(shí)候要注意。

四,總結(jié)

ConcurrentLinkedQueue是×××的隊(duì)列,所以使用時(shí)一定要注意內(nèi)存溢出的問題,還有在執(zhí)行size方法時(shí)一定要注意這個(gè)是不準(zhǔn)確的值;在學(xué)poll和offer方法時(shí),一定要理解更新head和tail節(jié)點(diǎn)的時(shí)機(jī),這種優(yōu)化手段值得我們?nèi)W(xué)習(xí),我覺得這就是學(xué)習(xí)源碼的作用,就是學(xué)習(xí)作者的源碼思想。

參考《Java 并發(fā)編程的藝術(shù)》


當(dāng)前名稱:ConcurrentLinkedQueue1.8源碼淺析
網(wǎng)站網(wǎng)址:http://weahome.cn/article/ihodjs.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部