Java中怎么使用BlockingQueue實(shí)現(xiàn)并發(fā),相信很多沒(méi)有經(jīng)驗(yàn)的人對(duì)此束手無(wú)策,為此本文總結(jié)了問(wèn)題出現(xiàn)的原因和解決方法,通過(guò)這篇文章希望你能解決這個(gè)問(wèn)題。
成都創(chuàng)新互聯(lián)公司從2013年創(chuàng)立,先為山東等服務(wù)建站,山東等地企業(yè),進(jìn)行企業(yè)商務(wù)咨詢服務(wù)。為山東企業(yè)網(wǎng)站制作PC+手機(jī)+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問(wèn)題。
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩種附加操作的隊(duì)列。支持附加阻塞的插入和移除操作。
支持阻塞的插入:當(dāng)隊(duì)列滿時(shí),插入操作會(huì)被阻塞,直到隊(duì)列不滿。
支持阻塞的移除:當(dāng)隊(duì)列空時(shí),移除操作會(huì)被阻塞,直到隊(duì)列不空。
阻塞隊(duì)列不可用時(shí),操作處理方式
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時(shí)退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除方法 | remove() | poll() | take() | poll(time, unit) |
檢查方法 | element() | peek() | 無(wú) | 無(wú) |
拋出異常:隊(duì)列滿時(shí),若繼續(xù)插入元素會(huì)拋出IllegalStateException
;當(dāng)隊(duì)列為空時(shí),若獲取元素則會(huì)拋出NoSuchElementException
異常。
返回特殊值:向隊(duì)列插入元素時(shí),會(huì)返回是否插入成功true/false;獲取元素時(shí),成功則返回元素,失敗則返回null。
一直阻塞:當(dāng)阻塞隊(duì)列滿時(shí),若繼續(xù)使用put新增元素時(shí)會(huì)被阻塞,直到隊(duì)列不為空或者響應(yīng)中斷退出;當(dāng)阻塞隊(duì)列為空時(shí),繼續(xù)使用take獲取元素時(shí)會(huì)被阻塞,直到隊(duì)列不為空。
超時(shí)退出:當(dāng)阻塞隊(duì)列滿時(shí),使用offer(e, time, unit)新增元素會(huì)被阻塞至超時(shí)退出;當(dāng)隊(duì)列為空時(shí),使用poll(time, unit)獲取元素時(shí)會(huì)被阻塞至超時(shí)退出。
注意:
阻塞隊(duì)列中不允許插入null
,會(huì)拋出NPE異常。
可以訪問(wèn)阻塞隊(duì)列中的任意元素,調(diào)用remove(Object o)
可以將隊(duì)列之中的特定對(duì)象移除,但會(huì)遍歷全部元素,并不高效。
由數(shù)組構(gòu)成的有界阻塞隊(duì)列,內(nèi)部由數(shù)組final Object[] items
實(shí)現(xiàn)。默認(rèn)情況下不保證線程公平的訪問(wèn)隊(duì)列,所謂公平訪問(wèn)隊(duì)列指阻塞的線程,可以按照阻塞的先后順序訪問(wèn)隊(duì)列。
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 使用公平鎖/非公平鎖 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
隊(duì)列大小初始化后不可修改。參數(shù)fair
控制內(nèi)部ReentrantLock
是否采用公平鎖。
鏈表實(shí)現(xiàn)的有界阻塞隊(duì)列。內(nèi)部結(jié)構(gòu)是單鏈表。默認(rèn)大小為Integer.MAX_VALUE
,可以指定大小。
public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); // 指定隊(duì)列大小 this.capacity = capacity; last = head = new Node(null); } // 單鏈表節(jié)點(diǎn)Node static class Node { E item; Node next; Node(E x) { item = x; } }
支持優(yōu)先級(jí)的無(wú)界阻塞隊(duì)列。默認(rèn)情況下采取自然順序升序排列。也可以自定義compareTo()
方法來(lái)指定元素的排列順序,或者初始化隊(duì)列時(shí),指定構(gòu)造參數(shù)Comparator
來(lái)對(duì)元素進(jìn)行排序。同優(yōu)先級(jí)順序無(wú)法保證。
public PriorityBlockingQueue(int initialCapacity, Comparator super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); // 非公平鎖 this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } // offer方法部分代碼 Comparator super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp);
由offer代碼可以看出,Comparator
的優(yōu)先級(jí)是大于Comparable.compareTo
方法的。
注意:PriorityBlockingQueue
不會(huì)阻塞數(shù)據(jù)生產(chǎn)者(隊(duì)列無(wú)界),只會(huì)在沒(méi)有數(shù)據(jù)時(shí)阻塞消費(fèi)者。生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對(duì)不能快于消費(fèi)者消費(fèi)數(shù)據(jù)的速度,否則將有可能耗盡堆空間。
支持延時(shí)獲取元素的無(wú)界隊(duì)列。隊(duì)列使用PriorityQueue
實(shí)現(xiàn)。隊(duì)列中的元素必須實(shí)現(xiàn)java.util.concurrent.Delayed
接口,在創(chuàng)建元素時(shí)指定多久才能才能從隊(duì)列中取到元素。
DelayQueue非常有用,可以將DelayQueu應(yīng)用在以下應(yīng)用場(chǎng)景。
緩存系統(tǒng)的設(shè)計(jì):用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能獲取到元素時(shí),表示緩存有限期到了。
定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行。比如TimerQueue
就是使用DelayQueue實(shí)現(xiàn)的。
不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)put
操作都必須等待一個(gè)take
操作,反之亦然。
// fair為true,等待線程將以FIFO的順序進(jìn)行訪問(wèn) public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack (); }
將生產(chǎn)者線程處理的數(shù)據(jù)直接傳遞給消費(fèi)者線程。隊(duì)列本身不存儲(chǔ)任何元素,非常適合傳遞性場(chǎng)景。SynchronousQueue
的吞吐量高于ArrayBlockingQueue
和LinkedBlockingQueue
。
利用Lock
鎖的多條件(Condition)阻塞控制。下面簡(jiǎn)單分析下ArrayBlockingQueue
部分代碼。
/** The queued items */ // 數(shù)據(jù)元素?cái)?shù)組 final Object[] items; /** items index for next take, poll, peek or remove */ // 下一個(gè)待獲取元素索引 int takeIndex; /** items index for next put, offer, or add */ // 下一個(gè)待插入元素索引 int putIndex; /** Number of elements in the queue */ // 隊(duì)列中元素個(gè)數(shù) int count; /* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ // 所有訪問(wèn)的主鎖 final ReentrantLock lock; /** Condition for waiting takes */ // 消費(fèi)者監(jiān)視器 private final Condition notEmpty; /** Condition for waiting puts */ // 生產(chǎn)者監(jiān)視器 private final Condition notFull; // public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
// 在隊(duì)列尾部插入元素,若隊(duì)列已滿則等待隊(duì)列非滿。 public void put(E e) throws InterruptedException { // 校驗(yàn)插入元素,為空則拋出NPE checkNotNull(e); final ReentrantLock lock = this.lock; // 1. 嘗試獲取鎖(響應(yīng)中斷) lock.lockInterruptibly(); try { // 2. 當(dāng)隊(duì)列滿時(shí) while (count == items.length) // 2.1 若隊(duì)列滿,則阻塞當(dāng)前線程。等待`notFull.signal()`喚醒。 notFull.await(); // 3. 非滿則執(zhí)行入隊(duì)操作 enqueue(e); } finally { lock.unlock(); } } // 在`putIndex`處放置當(dāng)前元素,只有獲取lock鎖后才會(huì)調(diào)用 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; // 在`putIndex`處放置元素 items[putIndex] = x; // putIndex等于數(shù)組長(zhǎng)度時(shí),重置為0索引。 if (++putIndex == items.length) putIndex = 0; // 數(shù)量加1 count++; // 4. 喚醒一個(gè)等待線程(等待取元素的線程) notEmpty.signal(); }
put總體流程:
獲取lock鎖,拿到鎖后繼續(xù)執(zhí)行,否則自旋競(jìng)爭(zhēng)鎖。
判斷阻塞隊(duì)列是否滿。滿了了則調(diào)用await
阻塞當(dāng)前線程。同時(shí)釋放lock鎖。
如果沒(méi)滿,則調(diào)用enqueue
方法將元素put進(jìn)阻塞隊(duì)列。此時(shí)還有一種可能是:第2步中被阻塞的線程被喚醒且又拿到了lock鎖。
喚醒一個(gè)標(biāo)記為notEmpty(消費(fèi)者)
的線程。
// 從頭部獲取元素,若隊(duì)列為空則等待隊(duì)列非空。 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 1. 獲取鎖 lock.lockInterruptibly(); try { // 2. 當(dāng)隊(duì)列為空時(shí) while (count == 0) // 2.1 當(dāng)隊(duì)列為空時(shí),阻塞當(dāng)前線程。等待`notEmpty.signal()`喚醒。 notEmpty.await(); // 3. 非空則進(jìn)行入隊(duì)操作 return dequeue(); } finally { lock.unlock(); } } // 從`takeIndex`位置獲取當(dāng)前元素,只有獲取到lock鎖后才會(huì)調(diào)用 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") // 從`takeIndex`位置獲取元素,然后清除該位置元素 E x = (E) items[takeIndex]; items[takeIndex] = null; // if (++takeIndex == items.length) takeIndex = 0; // 隊(duì)列元素減1 count--; if (itrs != null) itrs.elementDequeued(); // 4. 喚醒一個(gè)標(biāo)記為notFull(生產(chǎn)者)的線程 notFull.signal(); return x; }
take的整體流程:
獲取lock鎖,拿到鎖則執(zhí)行下一步流程;未拿到則自旋競(jìng)爭(zhēng)鎖。
當(dāng)前隊(duì)列是否為空,若為空則調(diào)用notEmpty.await
阻塞當(dāng)前線程,同時(shí)釋放鎖,等待被喚醒。
若非空,則調(diào)用dequeue
進(jìn)行出隊(duì)操作。此時(shí)還有一種可能:第2步中的阻塞的線程被喚醒并且又拿到了lock鎖。
喚醒一個(gè)被標(biāo)記為notFull(生產(chǎn)者)的線程。
put
和take
操作都需要先獲得鎖,沒(méi)有獲得鎖的線程無(wú)法進(jìn)行操作。
拿到鎖后,并不一定能順利執(zhí)行put
/take
操作,還需要判斷隊(duì)列是否可用(是否滿/空),不可用則會(huì)被阻塞,并釋放鎖。
在2中被阻塞的線程會(huì)被喚醒,但喚醒之后依然需要拿到鎖之后才能繼續(xù)向下執(zhí)行。否則,自旋拿鎖,拿到鎖后再while判斷隊(duì)列是否可用。
看完上述內(nèi)容,你們掌握J(rèn)ava中怎么使用BlockingQueue實(shí)現(xiàn)并發(fā)的方法了嗎?如果還想學(xué)到更多技能或想了解更多相關(guān)內(nèi)容,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!