在上一章中,我們介紹了阻塞隊(duì)列BlockingQueue,下面我們介紹它的常用實(shí)現(xiàn)類ArrayBlockingQueue。
作為一家“創(chuàng)意+整合+營(yíng)銷”的成都網(wǎng)站建設(shè)機(jī)構(gòu),我們?cè)跇I(yè)內(nèi)良好的客戶口碑。成都創(chuàng)新互聯(lián)公司提供從前期的網(wǎng)站品牌分析策劃、網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)、網(wǎng)站制作、創(chuàng)意表現(xiàn)、網(wǎng)頁(yè)制作、系統(tǒng)開發(fā)以及后續(xù)網(wǎng)站營(yíng)銷運(yùn)營(yíng)等一系列服務(wù),幫助企業(yè)打造創(chuàng)新的互聯(lián)網(wǎng)品牌經(jīng)營(yíng)模式與有效的網(wǎng)絡(luò)營(yíng)銷方法,創(chuàng)造更大的價(jià)值。
一. 用數(shù)組來(lái)實(shí)現(xiàn)隊(duì)列
因?yàn)殛?duì)列這種數(shù)據(jù)結(jié)構(gòu)的特殊要求,所以它天然適合用鏈表的方式來(lái)實(shí)現(xiàn),用兩個(gè)變量分別記錄鏈表頭和鏈表尾,當(dāng)刪除或插入隊(duì)列時(shí),只要改變鏈表頭或鏈表尾就可以了,而且鏈表使用引用的方式鏈接的,所以它的容量幾乎是無(wú)限的。
那么怎么使用數(shù)組來(lái)實(shí)現(xiàn)隊(duì)列,我們需要四個(gè)變量:Object[] array來(lái)存儲(chǔ)隊(duì)列中元素,headIndex和tailIndex分別記錄隊(duì)列頭和隊(duì)列尾,count記錄隊(duì)列的個(gè)數(shù)。
這里用了一個(gè)很巧妙的方式,我們知道當(dāng)向隊(duì)列中插入一個(gè)元素,那么就占用了數(shù)組的一個(gè)位置,當(dāng)刪除一個(gè)元素的時(shí)候,那么其實(shí)數(shù)組的這個(gè)位置就空閑出來(lái)了,表示這個(gè)位置又可以插入新元素了。
所以我們插入新元素前,必須檢查隊(duì)列是否已滿,刪除元素之前,必須檢查隊(duì)列是否為空。
二. ArrayBlockingQueue中重要成員變量
/** 儲(chǔ)存隊(duì)列的中元素 */ final Object[] items; /** 隊(duì)列頭的位置 */ int takeIndex; /** 隊(duì)列尾的位置 */ int putIndex; /** 當(dāng)前隊(duì)列擁有的元素個(gè)數(shù) */ int count; /** 用來(lái)保證多線程操作共享變量的安全問(wèn)題 */ final ReentrantLock lock; /** 當(dāng)隊(duì)列為空時(shí),就會(huì)調(diào)用notEmpty的wait方法,讓當(dāng)前線程等待 */ private final Condition notEmpty; /** 當(dāng)隊(duì)列為滿時(shí),就會(huì)調(diào)用notFull的wait方法,讓當(dāng)前線程等待 */ private final Condition notFull;
就是多了lock、notEmpty、notFull變量來(lái)實(shí)現(xiàn)多線程安全和線程等待條件的,它們?nèi)齻€(gè)是怎么操作的呢?
2.1 lock的作用
因?yàn)锳rrayBlockingQueue是在多線程下操作的,所以修改items、takeIndex、putIndex和count這些成員變量時(shí),必須要考慮多線程安全問(wèn)題,所以這里使用lock獨(dú)占鎖,來(lái)保證并發(fā)操作的安全。
2.2 notEmpty與notFull的作用
因?yàn)樽枞?duì)列必須實(shí)現(xiàn),當(dāng)隊(duì)列為空或隊(duì)列已滿的時(shí)候,隊(duì)列的讀取或插入操作要等待。所以我們想到了并發(fā)框架下的Condition對(duì)象,使用它來(lái)控制。
在AQS中,我們介紹了這個(gè)類的作用:
三. 添加元素方法
3.1 add(E e)與offer(E e)方法:
// 調(diào)用AbstractQueue父類中的方法。 public boolean add(E e) { // 通過(guò)調(diào)用offer來(lái)時(shí)實(shí)現(xiàn) if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } //向隊(duì)列末尾新添加元素。返回true表示添加成功,false表示添加失敗,不會(huì)拋出異常 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lock(); try { // 隊(duì)列已滿,添加元素失敗,返回false。 if (count == items.length) return false; else { // 調(diào)用enqueue方法將元素插入隊(duì)列中 enqueue(e); return true; } } finally { lock.unlock(); } }
add方法是調(diào)用offer方法實(shí)現(xiàn)的。在offer方法中,必須先判斷隊(duì)列是否已滿,如果已滿就直接返回false,而不會(huì)阻塞當(dāng)前線程。如果不滿就調(diào)用enqueue方法將元素插入隊(duì)列中。
注意:這里使用lock.lock()是保證同一時(shí)間只有一個(gè)線程修改成員變量,防止出現(xiàn)并發(fā)操作問(wèn)題。雖然它也會(huì)阻塞當(dāng)前線程,但是它并不是條件等待,只是因?yàn)殒i被其他線程持有,而ArrayBlockingQueue中方法操作時(shí)間都不長(zhǎng),這里相當(dāng)于不阻塞線程。
3.2 put方法
// 向隊(duì)列末尾新添加元素,如果隊(duì)列已滿,當(dāng)前線程就等待。響應(yīng)中斷異常 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lockInterruptibly(); try { // 隊(duì)列已滿,則調(diào)用notFull.await()方法,讓當(dāng)前線程等待,直到隊(duì)列不是滿的 while (count == items.length) notFull.await(); // 調(diào)用enqueue方法將元素插入隊(duì)列中 enqueue(e); } finally { lock.unlock(); } }
與offer方法大體流程一樣,只是當(dāng)隊(duì)列已滿的時(shí)候,會(huì)調(diào)用notFull.await()方法,讓當(dāng)前線程阻塞等待,直到隊(duì)列被別的線程移除了元素,隊(duì)列不滿的時(shí)候,會(huì)喚醒這個(gè)等待線程。
3.3 offer(E e, long timeout, TimeUnit unit)方法
/** * 向隊(duì)列末尾新添加元素,如果隊(duì)列中沒(méi)有可用空間,當(dāng)前線程就等待, * 如果等待時(shí)間超過(guò)timeout了,那么返回false,表示添加失敗 */ public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); // 計(jì)算一共最多等待的時(shí)間值nanos long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lockInterruptibly(); try { // 隊(duì)列已滿 while (count == items.length) { // nanos <= 0表示最大等待時(shí)間已到,那么不用再等待了,返回false,表示添加失敗。 if (nanos <= 0) return false; // 調(diào)用notFull.awaitNanos(nanos)方法,超時(shí)nanos時(shí)間會(huì)被自動(dòng)喚醒, // 如果被提前喚醒,那么返回剩余的時(shí)間 nanos = notFull.awaitNanos(nanos); } // 調(diào)用enqueue方法將元素插入隊(duì)列中 enqueue(e); return true; } finally { lock.unlock(); } }
與put的方法大體流程一樣,只不過(guò)是調(diào)用notFull.awaitNanos(nanos)方法,讓當(dāng)前線程等待,并設(shè)置等待時(shí)間。
四. 刪除元素方法
4.1 remove()和poll()方法:
// 調(diào)用AbstractQueue父類中的方法。 public E remove() { // 通過(guò)調(diào)用poll來(lái)時(shí)實(shí)現(xiàn) E x = poll(); if (x != null) return x; else throw new NoSuchElementException(); } // 刪除隊(duì)列第一個(gè)元素(即隊(duì)列頭),并返回它。如果隊(duì)列是空的,它不會(huì)拋出異常,而是會(huì)返回null。 public E poll() { final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lock(); try { // 如果count == 0,列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素 return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } }
remove方法是調(diào)用poll()方法實(shí)現(xiàn)的。在 poll()方法中,如果列表為空,就返回null,否則調(diào)用dequeue方法,返回列表頭元素。
4.2 take()方法
/** * 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。響應(yīng)中斷異常 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lockInterruptibly(); try { // 如果隊(duì)列為空,就調(diào)用notEmpty.await()方法,讓當(dāng)前線程等待。 // 直到有別的線程向隊(duì)列中插入元素,那么這個(gè)線程會(huì)被喚醒。 while (count == 0) notEmpty.await(); // 調(diào)用dequeue方法,返回列表頭元素 return dequeue(); } finally { lock.unlock(); } }
take()方法當(dāng)隊(duì)列為空的時(shí)候,當(dāng)前線程必須等待,直到有別的線程向隊(duì)列中插入新元素,那么這個(gè)線程會(huì)被喚醒。
4.3 poll(long timeout, TimeUnit unit)方法
/** * 返回并移除隊(duì)列第一個(gè)元素,如果隊(duì)列是空的,就前線程就等待。 * 如果等待時(shí)間超過(guò)timeout了,那么返回false,表示獲取元素失敗 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { // 計(jì)算一共最多等待的時(shí)間值nanos long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lockInterruptibly(); try { // 隊(duì)列為空 while (count == 0) { // nanos <= 0表示最大等待時(shí)間已到,那么不用再等待了,返回null。 if (nanos <= 0) return null; // 調(diào)用notEmpty.awaitNanos(nanos)方法讓檔期線程等待,并設(shè)置超時(shí)時(shí)間。 nanos = notEmpty.awaitNanos(nanos); } // 調(diào)用dequeue方法,返回列表頭元素 return dequeue(); } finally { lock.unlock(); } }
與take()方法流程一樣,只不過(guò)調(diào)用awaitNanos(nanos)方法,讓當(dāng)前線程等待,并設(shè)置等待時(shí)間。
五. 查看元素的方法
5.1 element()與peek() 方法
// 調(diào)用AbstractQueue父類中的方法。 public E element() { E x = peek(); if (x != null) return x; else throw new NoSuchElementException(); } // 查看隊(duì)列頭元素 public E peek() { final ReentrantLock lock = this.lock; // 使用lock來(lái)保證,多線程修改成員屬性的安全 lock.lock(); try { // 返回當(dāng)前隊(duì)列頭的元素 return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } }
六. 其他重要方法
6.1 enqueue和dequeue方法
// 將元素x插入隊(duì)列尾 private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; //當(dāng)前putIndex位置元素一定是null final Object[] items = this.items; items[putIndex] = x; // 如果putIndex等于items.length,那么將putIndex重新設(shè)置為0 if (++putIndex == items.length) putIndex = 0; // 隊(duì)列數(shù)量加一 count++; // 因?yàn)椴迦胍粋€(gè)元素,那么當(dāng)前隊(duì)列肯定不為空,那么喚醒在notEmpty條件下等待的一個(gè)線程 notEmpty.signal(); } // 刪除隊(duì)列頭的元素,返回它 private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; // 得到當(dāng)前隊(duì)列頭的元素 @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; // 將當(dāng)前隊(duì)列頭位置設(shè)置為null items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; // 隊(duì)列數(shù)量減一 count--; if (itrs != null) itrs.elementDequeued(); // 因?yàn)閯h除了一個(gè)元素,那么隊(duì)列肯定不滿了,那么喚醒在notFull條件下等待的一個(gè)線程 notFull.signal(); return x; }
這兩個(gè)方法分別代表,向隊(duì)列中插入元素和從隊(duì)列中刪除元素。而且它們要喚醒等待的線程。當(dāng)插入元素后,那么隊(duì)列一定不為空,那么就要喚醒在notEmpty條件下等待的線程。當(dāng)刪除元素后,那么隊(duì)列一定不滿,那么就要喚醒在notFull條件下等待的線程。
6.2 remove(Object o)方法
// 刪除隊(duì)列中對(duì)象o元素,最多刪除一條 public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; // 使用lock來(lái)保證,多線程修改成員屬性的安全 final ReentrantLock lock = this.lock; lock.lock(); try { // 當(dāng)隊(duì)列中有值的時(shí)候,才進(jìn)行刪除。 if (count > 0) { // 隊(duì)列尾下一個(gè)位置 final int putIndex = this.putIndex; // 隊(duì)列頭的位置 int i = takeIndex; do { // 當(dāng)前位置元素與被刪除元素相同 if (o.equals(items[i])) { // 刪除i位置元素 removeAt(i); // 返回true return true; } if (++i == items.length) i = 0; // 當(dāng)i==putIndex表示遍歷完所有元素 } while (i != putIndex); } return false; } finally { lock.unlock(); } }
從隊(duì)列中刪除指定對(duì)象o,那么就要遍歷隊(duì)列,刪除第一個(gè)與對(duì)象o相同的元素,如果隊(duì)列中沒(méi)有對(duì)象o元素,那么返回false刪除失敗。
這里有兩點(diǎn)需要注意:
如何遍歷隊(duì)列,就是從隊(duì)列頭遍歷到隊(duì)列尾。就要靠takeIndex和putIndex兩個(gè)變量了。
為什么Object[] items = this.items;這句代碼沒(méi)有放到同步鎖lock代碼塊內(nèi)。items是成員變量,那么多線程操作的時(shí)候,不會(huì)有并發(fā)問(wèn)題么?
這個(gè)是因?yàn)閕tems是個(gè)引用變量,不是基本數(shù)據(jù)類型,而且我們對(duì)隊(duì)列的插入和刪除操作,都是針對(duì)這一個(gè)items數(shù)組,沒(méi)有改變數(shù)組的引用,所以在lock代碼中,items會(huì)得到其他線程對(duì)它最新的修改。但是如果這里將int putIndex = this.putIndex;方法lock代碼塊外面,就會(huì)產(chǎn)生問(wèn)題。
removeAt(final int removeIndex)方法
// 刪除隊(duì)列removeIndex位置的元素 void removeAt(final int removeIndex) { // assert lock.getHoldCount() == 1; // assert items[removeIndex] != null; // assert removeIndex >= 0 && removeIndex < items.length; final Object[] items = this.items; // 表示刪除元素是列表頭,就容易多了,與dequeue方法流程差不多 if (removeIndex == takeIndex) { // 移除removeIndex位置元素 items[takeIndex] = null; // 到了數(shù)組末尾,就要轉(zhuǎn)到數(shù)組頭位置 if (++takeIndex == items.length) takeIndex = 0; // 隊(duì)列數(shù)量減一 count--; if (itrs != null) itrs.elementDequeued(); } else { // an "interior" remove final int putIndex = this.putIndex; for (int i = removeIndex;;) { int next = i + 1; if (next == items.length) next = 0; // 還沒(méi)有到隊(duì)列尾,那么就將后一個(gè)位置元素覆蓋前一個(gè)位置的元素 if (next != putIndex) { items[i] = items[next]; i = next; } else { // 將隊(duì)列尾元素置位null items[i] = null; // 重新設(shè)置putIndex的值 this.putIndex = i; break; } } // 隊(duì)列數(shù)量減一 count--; if (itrs != null) itrs.removedAt(removeIndex); } // 因?yàn)閯h除了一個(gè)元素,那么隊(duì)列肯定不滿了,那么喚醒在notFull條件下等待的一個(gè)線程 notFull.signal(); }
在隊(duì)列中刪除指定位置的元素。需要注意的是刪除之后的數(shù)組還能保持隊(duì)列形式,分為兩種情況:
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持創(chuàng)新互聯(lián)。