[TOC]
讓客戶滿意是我們工作的目標(biāo),不斷超越客戶的期望值來自于我們對(duì)這個(gè)行業(yè)的熱愛。我們立志把好的技術(shù)通過有效、簡(jiǎn)單的方式提供給客戶,將通過不懈努力成為客戶在信息化領(lǐng)域值得信任、有價(jià)值的長(zhǎng)期合作伙伴,公司提供的服務(wù)項(xiàng)目有:域名與空間、網(wǎng)絡(luò)空間、營(yíng)銷軟件、網(wǎng)站建設(shè)、鄂城網(wǎng)站維護(hù)、網(wǎng)站推廣。
ConcurrentlinkedQueue 還是一個(gè)基于鏈表的,×××的,線程安全的單端隊(duì)列,它采用先進(jìn)先出(FIFO)的規(guī)則對(duì)節(jié)點(diǎn)進(jìn)行排序,當(dāng)我們加入一個(gè)元素時(shí),它會(huì)插入隊(duì)列的尾部,當(dāng)我們獲取元素時(shí),會(huì)從隊(duì)列的首部獲取元素。它沒有使用鎖來保證線程安全,使用的是“wait-free”算法來保證整個(gè)隊(duì)列的線程安全。
// 存儲(chǔ)的數(shù)據(jù)
volatile E item;
// 下一個(gè)節(jié)點(diǎn)引用
volatile Node next;
/**
* Constructs a new node. Uses relaxed write because item can
* only be seen after publication via casNext.
*/
// 構(gòu)造一個(gè)node節(jié)點(diǎn)
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
// 修改節(jié)點(diǎn)的item
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// 懶修改節(jié)點(diǎn)的next
void lazySetNext(Node val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
// cas修改節(jié)點(diǎn)的next節(jié)點(diǎn)
boolean casNext(Node cmp, Node val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
private transient volatile Node head;
private transient volatile Node tail;
public ConcurrentLinkedQueue() {
head = tail = new Node(null);
}
構(gòu)造節(jié)點(diǎn)是其實(shí)就是構(gòu)造了一個(gè)node的item為null的節(jié)點(diǎn),然后head和tail指向這個(gè)節(jié)點(diǎn),如下圖所示:
public boolean add(E e) {
return offer(e);
}
我們可以看出其實(shí)調(diào)用的是offer方法,具體參考o(jì)ffer方法的講解。
源碼解析:
public boolean offer(E e) {
// 入隊(duì)元素不能為null
checkNotNull(e);
// 創(chuàng)建新的節(jié)點(diǎn)
final Node newNode = new Node(e);
// 死循環(huán),設(shè)置節(jié)點(diǎn)
// p獲取尾節(jié)點(diǎn)
for (Node t = tail, p = t;;) {
// q是p的next節(jié)點(diǎn)
Node q = p.next;
// 獲取尾節(jié)點(diǎn)的next節(jié)點(diǎn)
// 尾節(jié)點(diǎn)沒有下一個(gè)節(jié)點(diǎn)
if (q == null) {
// p is last node
// 這一步說明p是尾節(jié)點(diǎn),新的節(jié)點(diǎn)設(shè)置為尾節(jié)點(diǎn)的next節(jié)點(diǎn)
if (p.casNext(null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
// 設(shè)置尾節(jié)點(diǎn),當(dāng)之前的尾節(jié)點(diǎn)和現(xiàn)在插入的節(jié)點(diǎn)之間有一個(gè)節(jié)點(diǎn)時(shí)
// 并不是每一次都cas設(shè)置尾節(jié)點(diǎn)(優(yōu)化手段,是怎么想到這種優(yōu)化的??)
if (p != t) // hop two nodes at a time
// cas設(shè)置尾節(jié)點(diǎn),可能會(huì)失敗
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
// 多線程操作時(shí)候,由于poll時(shí)候會(huì)把舊的head變?yōu)樽砸?,然后將head的next設(shè)置為新的head
// 所以這里需要重新找新的head
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
else
// 尋找尾節(jié)點(diǎn)
// Check for tail updates after two hops.
// p!=t
p = (p != t && t != (t = tail)) ? t : q;
}
}
分析插入過程,我們插入使用3個(gè)線程來調(diào)用offer 方法,ThreadA,ThreadB同時(shí)運(yùn)行,ThreadC最后插入,分析下offer方法的流程。
第一步,隊(duì)列屬于初始化狀態(tài),ThreadA,ThreadB同時(shí)調(diào)用offer方法;創(chuàng)建節(jié)點(diǎn),死循環(huán)設(shè)置節(jié)點(diǎn),獲取尾節(jié)點(diǎn)的next節(jié)點(diǎn),此時(shí)q== null,兩個(gè)線程都同時(shí)可能看見,然后cas設(shè)置尾節(jié)點(diǎn)的next節(jié)點(diǎn)(隊(duì)列狀態(tài)如圖A所示),我們假設(shè)是ThreadA線程cas設(shè)置成功了,然后p==t此時(shí)的尾節(jié)點(diǎn)其實(shí)沒有發(fā)生變化;此時(shí)我們來看ThreadB由于A成功了,所以ThreadB cas失敗了,重新循環(huán),此時(shí)q != null了,p == q顯然不等于,再看下一個(gè)else判斷p!=t,此時(shí)顯然p == t,所以才是p = q,然后再次循環(huán),此時(shí)的q==null,我們假設(shè)沒有線程來和ThreadB競(jìng)爭(zhēng),所以cas設(shè)置成功,然后p!=t嗎,顯然滿足所以設(shè)置尾節(jié)點(diǎn),此時(shí)的設(shè)置尾節(jié)點(diǎn)的節(jié)點(diǎn)和之前的尾節(jié)點(diǎn)之間剛剛好有一個(gè)節(jié)點(diǎn)(如圖B所示)。
第二步,ThreadC插入,此時(shí)的尾節(jié)點(diǎn)是ThreadB插入的節(jié)點(diǎn)假設(shè)是B,獲取B的next節(jié)點(diǎn),q == null,然后cas設(shè)置節(jié)點(diǎn),完成,p==t,所以不用更新尾節(jié)點(diǎn)(如圖C所示)。
注意:不會(huì)刪除元素,要和poll方法區(qū)別
public E peek() {
restartFromHead:
for (;;) {
for (Node h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
// 更新頭結(jié)點(diǎn)
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
public E poll() {
restartFromHead:
// 循環(huán)
for (;;) {
// 獲取頭結(jié)點(diǎn)
for (Node h = head, p = h, q;;) {
// 獲取節(jié)點(diǎn)的內(nèi)容
E item = p.item;
// item不為null ,使用cas設(shè)置item為空
if (item != null && p.casItem(item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
// 更新頭結(jié)點(diǎn),和尾節(jié)點(diǎn)一樣,不是每次都更新
// 頭結(jié)點(diǎn)item為null是,下個(gè)節(jié)點(diǎn)就必須更新頭結(jié)點(diǎn)
// 頭結(jié)點(diǎn)item不為null時(shí),規(guī)則和更新尾節(jié)點(diǎn)一樣
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// 那么獲取p節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),如果p節(jié)點(diǎn)的下一節(jié)點(diǎn)為null,則表明隊(duì)列已經(jīng)空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p == q,說明別的線程調(diào)用了updateHead,
// 自己的next 指向了自己,重新循環(huán),獲取最新的頭結(jié)點(diǎn)
else if (p == q)
continue restartFromHead;
// 如果下一個(gè)元素不為空,則將頭節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)設(shè)置成頭節(jié)點(diǎn)
else
p = q;
}
}
}
final void updateHead(Node h, Node p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
分析我們按照offer時(shí)候元素來執(zhí)行poll方法,ThreadD和ThreadE同時(shí)執(zhí)行來分析下隊(duì)列的變化(主要分析p==q的產(chǎn)生)。
初始狀態(tài)(如圖C所示)
第一步,ThreadD和ThreadE執(zhí)行poll操作,item等于null,所以執(zhí)行執(zhí)行下面的操作(q = p.next) == null不等于,p == q不等于,所以p = q,其實(shí)就是上圖的ThreadA插入的節(jié)點(diǎn),此時(shí)的item已經(jīng)不為null了,所以執(zhí)行cas設(shè)置item為null的操作,假設(shè)ThreadD執(zhí)行成功了,那么此時(shí)p!=h就滿足了,所以此時(shí)要更新頭結(jié)點(diǎn)調(diào)用updateHead,這個(gè)方法會(huì)更新頭結(jié)點(diǎn),并且把原來的頭節(jié)點(diǎn)的next設(shè)置為自己)(如圖D所示);接下我們分析ThreadE,cas失敗了需要重新執(zhí)行,此時(shí)的item已經(jīng)不為null,所以執(zhí)行執(zhí)行下面的操作(q = p.next) == null不等于,p == q這使其實(shí)已經(jīng)是等于了,因?yàn)門hreadD改變了了以前頭結(jié)點(diǎn)的next節(jié)點(diǎn)為自己,所以需要重新遍歷,獲取最新的頭結(jié)點(diǎn),此時(shí)的頭結(jié)點(diǎn)其實(shí)就是ThreadA插入的節(jié)點(diǎn),然后item為null,接著執(zhí)行下面的判斷,最終p就是p.next節(jié)點(diǎn)也就是ThreadB節(jié)點(diǎn),然后cas設(shè)置item為null,由于p=p.next,所以p發(fā)生了變化,所以需要設(shè)置ThreadB為頭結(jié)點(diǎn)(如圖E所示)。
看到上面的執(zhí)行流程可能就有人有疑問了,這不是每次都更新頭結(jié)點(diǎn)嗎,沒有優(yōu)化啊,只看poll方法確實(shí)是這樣,那什么時(shí)候會(huì)產(chǎn)生不是每次都更新頭節(jié)點(diǎn)了,那就是當(dāng)頭節(jié)點(diǎn)的item不為null的時(shí)候,但是如果按初始化的狀況來看,頭結(jié)點(diǎn)的item一直是null,但是當(dāng)我看了peek方法之后才發(fā)現(xiàn),peek可以改變這個(gè)情況,可以設(shè)置item不為null的頭結(jié)點(diǎn),其實(shí)我們可以在poll方法前調(diào)用下peek方法,其實(shí)就啟動(dòng)了優(yōu)化策略,不是每次更新頭結(jié)點(diǎn),不知道作者是不是這么設(shè)計(jì)的,反正就是牛皮。
public int size() {
int count = 0;
for (Node p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
我們可以發(fā)現(xiàn)size沒有加鎖,就是遍歷了整個(gè)隊(duì)列,但是遍歷的同時(shí)可能在發(fā)生poll或者offer,所以size不是特別的精確,用的時(shí)候要注意。
ConcurrentLinkedQueue是×××的隊(duì)列,所以使用時(shí)一定要注意內(nèi)存溢出的問題,還有在執(zhí)行size方法時(shí)一定要注意這個(gè)是不準(zhǔn)確的值;在學(xué)poll和offer方法時(shí),一定要理解更新head和tail節(jié)點(diǎn)的時(shí)機(jī),這種優(yōu)化手段值得我們?nèi)W(xué)習(xí),我覺得這就是學(xué)習(xí)源碼的作用,就是學(xué)習(xí)作者的源碼思想。
參考《Java 并發(fā)編程的藝術(shù)》