[TOC]
創(chuàng)新互聯(lián)建站主要從事成都網(wǎng)站建設(shè)、成都做網(wǎng)站、網(wǎng)頁設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)連平,10余年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):028-86922220
ArrayBlockingQueue 是一個(gè)用數(shù)組實(shí)現(xiàn)的有界隊(duì)列;此隊(duì)列按照先進(jìn)先出(FIFO)的規(guī)則對(duì)元素進(jìn)行排序;默認(rèn)情況下不保證線程公平的訪問隊(duì)列,所謂公平訪問隊(duì)列是指阻塞的線程,可以按照阻塞的先后順序的訪問隊(duì)列,即先阻塞的線程先訪問隊(duì)列;非公平性是對(duì)先等待的線程是非公平的,當(dāng)隊(duì)列可用時(shí),阻塞的線程都可以爭(zhēng)奪訪問隊(duì)列的資格,有可能先阻塞的線程最后才訪問;為了保證公平性,通常會(huì)降低吞吐量。
/** The queued items */
// 記錄數(shù)據(jù)的數(shù)組
final Object[] items;
/** items index for next take, poll, peek or remove */
// 索引用于 take,poll,peek,remove 等方法
int takeIndex;
/** items index for next put, offer, or add */
// 索引用于 put,offer,or add 等方法
int putIndex;
/** Number of elements in the queue */
// 總數(shù)
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
// 隊(duì)列的鎖
final ReentrantLock lock;
/** Condition for waiting takes */
// 用于讓線程等待,消費(fèi)時(shí)隊(duì)列為空
private final Condition notEmpty;
/** Condition for waiting puts */
// 用于讓線程等待,生產(chǎn)時(shí)隊(duì)列滿
private final Condition notFull;
我們看下兩個(gè)構(gòu)造,其實(shí)也就是一個(gè),注意沒有無參構(gòu)造,初始化時(shí)必須要給出容量。
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
// 初始化一個(gè)ArrayBlockingQueue
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
// 初始化一個(gè)數(shù)組
this.items = new Object[capacity];
// 初始化一個(gè)鎖
lock = new ReentrantLock(fair);
// 用來存放消費(fèi)者的阻塞線程
notEmpty = lock.newCondition();
// 用來存放生產(chǎn)者的線程
notFull = lock.newCondition();
}
可以看出add調(diào)用的是offer方法,詳情請(qǐng)看offer方法。
public boolean add(E e) {
// 調(diào)用父類的方法
return super.add(e);
}
// 父類 AbstractQueue 的add方法
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
注意:add 插入失敗會(huì)拋異常。
// offer加入元素
public boolean offer(E e) {
// 不能為null
checkNotNull(e);
// 獲取鎖
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果數(shù)組滿了,返回false
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
// enqueue
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
// 獲取數(shù)組
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 喚醒消費(fèi)阻塞的隊(duì)列
notEmpty.signal();
}
注意:offer還有一個(gè)重載方法,帶有超時(shí)時(shí)間的插入,支持中斷offer(E e, long timeout, TimeUnit unit)。
public void put(E e) throws InterruptedException {
// 不能為null
checkNotNull(e);
// 獲取鎖
final ReentrantLock lock = this.lock;
// 支持中斷
lock.lockInterruptibly();
try {
// 等于數(shù)組的容量
while (count == items.length)
// 等待
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
注意:put和前面的offer要區(qū)別,offer方法隊(duì)列滿是返回false,put方法是讓線程等待,根據(jù)自己的場(chǎng)景用合適的方法。
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
注意:poll也有一個(gè)重載方法,帶有超時(shí)和中斷poll(long timeout, TimeUnit unit)。
// 消費(fèi)
public E take() throws InterruptedException {
// 獲取鎖
final ReentrantLock lock = this.lock;
// 支持中斷
lock.lockInterruptibly();
try {
// 隊(duì)列為空
while (count == 0)
// 阻塞
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
注意:take和poll也是一對(duì)方法,poll隊(duì)列為空返回null,take是讓線程等待,直到喚醒。
// 獲取隊(duì)尾的元素 不刪除
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
// 統(tǒng)計(jì)個(gè)數(shù) size是準(zhǔn)確值
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue 是有界的,所以我們?cè)诔跏蓟侨萘恳O(shè)計(jì)好,因?yàn)樗遣豢梢詳U(kuò)容的,還有我覺得這個(gè)隊(duì)列適合一些穩(wěn)定并發(fā)量的系統(tǒng),如果并發(fā)量突然變大,導(dǎo)致隊(duì)列滿,會(huì)造成大量的線程等待,影響系統(tǒng)的響應(yīng);我們通過閱讀源碼也發(fā)現(xiàn)隊(duì)列的源碼是很輕量的,使用起來也很簡單,讓人很好理解;使用這個(gè)隊(duì)列一定要注意put,offer,take,poll這兩組方法,根據(jù)自己的業(yè)務(wù)場(chǎng)景選擇是直接返回(響應(yīng)速度快)還是阻塞線程。
參考:《Java 并發(fā)編程的藝術(shù)》