真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

如何理解ArrayBlockingQueue的線程安全

本篇內(nèi)容介紹了“如何理解ArrayBlockingQueue的線程安全”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!

你所需要的網(wǎng)站建設(shè)服務(wù),我們均能行業(yè)靠前的水平為你提供.標(biāo)準(zhǔn)是產(chǎn)品質(zhì)量的保證,主要從事網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)、企業(yè)網(wǎng)站建設(shè)、手機(jī)網(wǎng)站制作、網(wǎng)頁(yè)設(shè)計(jì)、成都品牌網(wǎng)站建設(shè)、網(wǎng)頁(yè)制作、做網(wǎng)站、建網(wǎng)站。創(chuàng)新互聯(lián)擁有實(shí)力堅(jiān)強(qiáng)的技術(shù)研發(fā)團(tuán)隊(duì)及素養(yǎng)的視覺(jué)設(shè)計(jì)專(zhuān)才。

ArrayBlockingQueue的線程安全是通過(guò)底層的ReentrantLock保證的,因此在元素出入隊(duì)列操作時(shí),無(wú)需額外加鎖。寫(xiě)一段簡(jiǎn)單的代碼舉個(gè)例子,從具體的使用來(lái)說(shuō)明它的線程安全吧

ArrayBlockingQueue queue=new ArrayBlockingQueue(7,
        true, new ArrayList<>(Arrays.asList(new Integer[]{1,2,3,4,5,6,7})));

@AllArgsConstructor
class Task implements Runnable{
    String threadName;
    @Override
    public void run() {
        while(true) {
            try {
                System.out.println(threadName+" take: "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

private void queueTest(){
    new Thread(new Task("Thread 1")).start();
    new Thread(new Task("Thread 2")).start();
}

在代碼中創(chuàng)建隊(duì)列時(shí)就往里放入了7個(gè)元素,然后創(chuàng)建兩個(gè)線程各自從隊(duì)列中取出元素。對(duì)隊(duì)列的操作也非常簡(jiǎn)單,只用到了操作隊(duì)列中出隊(duì)方法take,運(yùn)行結(jié)果如下:

Thread 1 take: 1
Thread 2 take: 2
Thread 1 take: 3
Thread 2 take: 4
Thread 1 take: 5
Thread 2 take: 6
Thread 1 take: 7

可以看到在公平模式下,兩個(gè)線程交替對(duì)隊(duì)列中的元素執(zhí)行出隊(duì)操作,并沒(méi)有出現(xiàn)重復(fù)取出的情況,即保證了多個(gè)線程對(duì)資源競(jìng)爭(zhēng)的互斥訪問(wèn)。它的過(guò)程如下:

如何理解ArrayBlockingQueue的線程安全

面試官:那它的阻塞性呢?

Hydra:好的,還是寫(xiě)段代碼通過(guò)例子來(lái)說(shuō)明

private static void queueTest() throws InterruptedException {
    ArrayBlockingQueue queue=new ArrayBlockingQueue<>(3);
    int size=7;
    Thread putThread=new Thread(()->{
        for (int i = 0; i  {
        for (int i = 0; i < size+1 ; i++) {
            try {
                Thread.sleep(3000);
                System.out.println("TakeThread take: "+queue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    putThread.start();
    Thread.sleep(1000);
    takeThread.start();
}

和第一個(gè)例子中的代碼不同,這次我們創(chuàng)建隊(duì)列時(shí)只指定長(zhǎng)度,并不在初始化時(shí)就往隊(duì)列中放入元素。接下來(lái)創(chuàng)建兩個(gè)線程,一個(gè)線程充當(dāng)生產(chǎn)者,生產(chǎn)產(chǎn)品放入到隊(duì)列中,另一個(gè)線程充當(dāng)消費(fèi)者,消費(fèi)隊(duì)列中的產(chǎn)品。需要注意生產(chǎn)和消費(fèi)的速度是不同的,生產(chǎn)者每一秒生產(chǎn)一個(gè),而消費(fèi)者每三秒才消費(fèi)一個(gè)。執(zhí)行上面的代碼,運(yùn)行結(jié)果如下:

PutThread put: 0 - Size:1
PutThread put: 1 - Size:2
PutThread put: 2 - Size:3
TakeThread take: 0
PutThread put: 3 - Size:3
TakeThread take: 1
PutThread put: 4 - Size:3
TakeThread take: 2
PutThread put: 5 - Size:3
TakeThread take: 3
PutThread put: 6 - Size:3
TakeThread take: 4
TakeThread take: 5
TakeThread take: 6

來(lái)給你畫(huà)個(gè)比較直觀的圖吧:

如何理解ArrayBlockingQueue的線程安全

分析運(yùn)行結(jié)果,能夠在兩個(gè)方面體現(xiàn)出隊(duì)列的阻塞性:

  • 入隊(duì)阻塞:當(dāng)隊(duì)列中的元素個(gè)數(shù)等于隊(duì)列長(zhǎng)度時(shí),會(huì)阻塞向隊(duì)列中放入元素的操作,當(dāng)有出隊(duì)操作取走隊(duì)列中元素,隊(duì)列出現(xiàn)空缺位置后,才會(huì)再進(jìn)行入隊(duì)

  • 出隊(duì)阻塞:當(dāng)隊(duì)列中的元素為空時(shí),執(zhí)行出隊(duì)操作的線程將被阻塞,直到隊(duì)列不為空時(shí)才會(huì)再次執(zhí)行出隊(duì)操作。在上面的代碼的出隊(duì)線程中,我們故意將出隊(duì)的次數(shù)設(shè)為了隊(duì)列中元素?cái)?shù)量加一,因此這個(gè)線程最后會(huì)被一直阻塞,程序?qū)⒁恢眻?zhí)行不會(huì)結(jié)束

面試官:你只會(huì)用puttake方法嗎,能不能講講其他的方法?

Hydra:方法太多了,簡(jiǎn)單概括一下插入和移除相關(guān)的操作吧

如何理解ArrayBlockingQueue的線程安全

面試官:方法記得還挺清楚,看樣子是個(gè)合格的 API caller。下面說(shuō)說(shuō)原理吧,先講一下ArrayBlockingQueue 的結(jié)構(gòu)

Hydra:在ArrayBlockingQueue 中有下面四個(gè)比較重要的屬性

final Object[] items;
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}

在構(gòu)造函數(shù)中對(duì)它們進(jìn)行了初始化:

  • Object[] items:隊(duì)列的底層由數(shù)組組成,并且數(shù)組的長(zhǎng)度在初始化就已經(jīng)固定,之后無(wú)法改變

  • ReentrantLock lock:用對(duì)控制隊(duì)列操作的獨(dú)占鎖,在操作隊(duì)列的元素前需要獲取鎖,保護(hù)競(jìng)爭(zhēng)資源

  • Condition notEmpty:條件對(duì)象,如果有線程從隊(duì)列中獲取元素時(shí)隊(duì)列為空,就會(huì)在此進(jìn)行等待,直到其他線程向隊(duì)列后插入元素才會(huì)被喚醒

  • Condition notFull:如果有線程試圖向隊(duì)列中插入元素,且此時(shí)隊(duì)列為滿(mǎn)時(shí),就會(huì)在這進(jìn)行等待,直到其他線程取出隊(duì)列中的元素才會(huì)被喚醒

Condition是一個(gè)接口,代碼中的notFullnotEmpty實(shí)例化的是AQS的內(nèi)部類(lèi)ConditionObject,它的內(nèi)部是由AQS中的Node組成的等待鏈,ConditionObject中有一個(gè)頭節(jié)點(diǎn)firstWaiter和尾節(jié)點(diǎn)lastWaiter,并且每一個(gè)Node都有指向相鄰節(jié)點(diǎn)的指針。簡(jiǎn)單的來(lái)說(shuō),它的結(jié)構(gòu)是下面這樣的:

如何理解ArrayBlockingQueue的線程安全

至于它的作用先賣(mài)個(gè)關(guān)子,放在后面講。除此之外,還有兩個(gè)int類(lèi)型的屬性takeIndexputIndex,表示獲取元素的索引位置和插入元素的索引位置。假設(shè)一個(gè)長(zhǎng)度為5的隊(duì)列中已經(jīng)有了3個(gè)元素,那么它的結(jié)構(gòu)是這樣的:

如何理解ArrayBlockingQueue的線程安全

面試官:說(shuō)一下隊(duì)列的插入操作吧

Hydra:好的,那我們先說(shuō)addoffer方法,在執(zhí)行add方法時(shí),調(diào)用了其父類(lèi)AbstractQueue中的add方法。add方法則調(diào)用了offer方法,如果添加成功返回true,添加失敗時(shí)拋出異常,看一下源碼:

public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}

public boolean offer(E e) {
    checkNotNull(e);//檢查元素非空
    final ReentrantLock lock = this.lock; //獲取鎖并加鎖
    lock.lock();
    try {
        if (count == items.length)//隊(duì)列已滿(mǎn)
            return false;
        else {
            enqueue(e);//入隊(duì)
            return true;
        }
    } finally {
        lock.unlock();
    }
}

實(shí)際將元素加入隊(duì)列的核心方法enqueue

private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x; 
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

enqueue中,首先將元素放入數(shù)組中下標(biāo)為putIndex的位置,然后對(duì)putIndex自增,并判斷是否已處于隊(duì)列中最后一個(gè)位置,如果putIndex索引位置等于數(shù)組的長(zhǎng)度時(shí),那么將putIndex置為0,即下一次在元素入隊(duì)時(shí),從隊(duì)列頭開(kāi)始放置。

舉個(gè)例子,假設(shè)有一個(gè)長(zhǎng)度為5的隊(duì)列,現(xiàn)在已經(jīng)有4個(gè)元素,我們進(jìn)行下面一系列的操作,來(lái)看一下索引下標(biāo)的變化:

如何理解ArrayBlockingQueue的線程安全

上面這個(gè)例子提前用到了隊(duì)列中元素被移除時(shí)takeIndex會(huì)自增的知識(shí)點(diǎn),通過(guò)這個(gè)例子中索引的變化,可以看出ArrayBlockingQueue就是一個(gè)循環(huán)隊(duì)列,takeIndex就相當(dāng)于隊(duì)列的頭指針,而putIndex相當(dāng)于隊(duì)列的尾指針的下一個(gè)位置索引。并且這里不需要擔(dān)心在隊(duì)列已滿(mǎn)時(shí)還會(huì)繼續(xù)向隊(duì)列中添加元素,因?yàn)樵?code>offer方法中會(huì)首先判斷隊(duì)列是否已滿(mǎn),只有在隊(duì)列不滿(mǎn)時(shí)才會(huì)執(zhí)行enqueue方法。

面試官:這個(gè)過(guò)程我明白了,那enqueue方法里最后的notEmpty.signal()是什么意思?

Hydra:這是一個(gè)喚醒操作,等后面講完它的掛起后再說(shuō)。我還是先把插入操作中的put方講完吧,看一下它的源碼:

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

put方法是一個(gè)阻塞方法,當(dāng)隊(duì)列中元素未滿(mǎn)時(shí),會(huì)直接調(diào)用enqueue方法將元素加入隊(duì)列中。如果隊(duì)列已滿(mǎn),就會(huì)調(diào)用notFull.await()方法將掛起當(dāng)前線程,直到隊(duì)列不滿(mǎn)時(shí)才會(huì)被喚醒,繼續(xù)執(zhí)行插入操作。

當(dāng)隊(duì)列已滿(mǎn),再執(zhí)行put操作時(shí),就會(huì)執(zhí)行下面的流程:

如何理解ArrayBlockingQueue的線程安全

這里提前劇透一下,當(dāng)隊(duì)列中有元素被移除,在調(diào)用dequeue方法中的notFull.signal()時(shí),會(huì)喚醒等待隊(duì)列中的線程,并把對(duì)應(yīng)的元素添加到隊(duì)列中,流程如下:

如何理解ArrayBlockingQueue的線程安全

做一個(gè)總結(jié),在插入元素的幾個(gè)方法中,addoffer以及帶有超時(shí)的offer方法都是非阻塞的,會(huì)立即返回或超時(shí)后立即返回,而put方法是阻塞的,只有當(dāng)隊(duì)列不滿(mǎn)添加成功后才會(huì)被返回。

面試官:講的不錯(cuò),講完插入操作了再講講移除操作吧

Hydra:還是老規(guī)矩,先說(shuō)非阻塞的方法removepoll,父類(lèi)的remove方法還是會(huì)調(diào)用子類(lèi)的poll方法,不同的是remove方法在隊(duì)列為空時(shí)拋出異常,而poll會(huì)直接返回null。這兩個(gè)方法的核心還是調(diào)用的dequeue方法,它的源碼如下:

private E dequeue() {
    final Object[] items = this.items;
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        //更新迭代器中的元素
        itrs.elementDequeued();
    notFull.signal();
    return x;
}

dequeue中,在獲取到數(shù)組下標(biāo)為takeIndex的元素,并將該位置置為null。將takeIndex自增后判斷是否與數(shù)組長(zhǎng)度相等,如果相等還是按之前循環(huán)隊(duì)列的理論,將它的索引置為0,并將隊(duì)列的中的計(jì)數(shù)減1。

有一個(gè)隊(duì)列初始化時(shí)有5個(gè)元素,我們對(duì)齊分別進(jìn)行5次的出隊(duì)操作,查看索引下標(biāo)的變化情況:

如何理解ArrayBlockingQueue的線程安全

然后我們還是結(jié)合take方法來(lái)說(shuō)明線程的掛起和喚醒的操作,與put方法相對(duì),take用于阻塞獲取元素,來(lái)看一下它的源碼:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

take是一個(gè)可以被中斷的阻塞獲取元素的方法,首先判斷隊(duì)列是否為空,如果隊(duì)列不為空那么就調(diào)用dequeue方法移除元素,如果隊(duì)列為空時(shí)就調(diào)用notEmpty.await()就將當(dāng)前線程掛起,直到有其他的線程調(diào)用了enqueue方法,才會(huì)喚醒等待隊(duì)列中被掛起的線程。可以參考下面的圖來(lái)理解:

如何理解ArrayBlockingQueue的線程安全

當(dāng)有其他線程向隊(duì)列中插入元素后:

如何理解ArrayBlockingQueue的線程安全

入隊(duì)的enqueue方法會(huì)調(diào)用notEmpty.signal(),喚醒等待隊(duì)列中firstWaiter指向的節(jié)中的線程,并且該線程會(huì)調(diào)用dequeue完成元素的出隊(duì)操作。到這移除的操作就也分析完了,至于開(kāi)頭為什么說(shuō)ArrayBlockingQueue是線程安全的,看到每個(gè)方法前都通過(guò)全局單例的lock加鎖,相信你也應(yīng)該明白了

“如何理解ArrayBlockingQueue的線程安全”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!


文章標(biāo)題:如何理解ArrayBlockingQueue的線程安全
轉(zhuǎn)載來(lái)源:http://weahome.cn/article/pjhgjh.html

其他資訊

在線咨詢(xún)

微信咨詢(xún)

電話咨詢(xún)

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部