一、簡(jiǎn)介
我們提供的服務(wù)有:做網(wǎng)站、成都做網(wǎng)站、微信公眾號(hào)開(kāi)發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、碌曲ssl等。為上1000家企事業(yè)單位解決了網(wǎng)站和推廣的問(wèn)題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的碌曲網(wǎng)站制作公司
前面的一篇文章我們介紹了ConcurrentHashMap1.7版本版本的源碼介紹,我們知道1.7版本的ConcurrentHashMap采用的是分段鎖的思想,提高了鎖的數(shù)量,提高了并發(fā)的特性,但是也有其局限性,例如就是并發(fā)的數(shù)量也就是鎖的數(shù)量是不可改變的等;我們今天要介紹的1.8版本的ConcurrentHashMap其實(shí)也是采用了多鎖的思想,不過(guò)在1.8中沒(méi)有了segments這些東西了,每次鎖住的數(shù)組中的一個(gè)元素或者桶(其實(shí)也就是數(shù)組或者樹(shù)的頭結(jié)點(diǎn)),然后鎖也和1.7發(fā)生變了,使用的是Synchronized鎖,1.8中的鎖是隨著數(shù)組的長(zhǎng)度發(fā)生變化的,提升了并發(fā)的數(shù)量的靈活性,還有就是1.8的數(shù)據(jù)結(jié)構(gòu)也發(fā)生了一些變化,采用的是數(shù)組+鏈表+紅黑樹(shù)(鏈表到達(dá)閾值會(huì)樹(shù)化),結(jié)構(gòu)如下圖所示:
二、基本成員
先介紹一些基本成員,只有了解了這些成員的概念后,才能去更好的去理解方法。
ConcurrentHashMap的一些成員變量
/** node數(shù)組的最大容量 2^30 */
private static final int MAXIMUM_CAPACITY = 1 << 30;
/** 默認(rèn)初始化值16,必須是2的冥 */
private static final int DEFAULT_CAPACITY = 16;
/** 虛擬機(jī)限制的最大數(shù)組長(zhǎng)度,在ArrayList中有說(shuō)過(guò),jdk1.8新引入的,需要與toArrar()相關(guān)方法關(guān)聯(lián) */
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
/** 并發(fā)數(shù)量,1.7遺留,兼容以前版本 */
private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
/** 負(fù)載因子,兼容以前版本,構(gòu)造方法中指定的參數(shù)是不會(huì)被用作loadFactor的,為了計(jì)算方便,統(tǒng)一使用 n - (n >> 2) 代替浮點(diǎn)乘法 *0.75 */
private static final float LOAD_FACTOR = 0.75f;
/** 鏈表轉(zhuǎn)紅黑樹(shù),閾值>=8 */
static final int TREEIFY_THRESHOLD = 8;
/** 樹(shù)轉(zhuǎn)鏈表閥值,小于等于6(tranfer時(shí),lc、hc=0兩個(gè)計(jì)數(shù)器分別++記錄原bin、新binTreeNode數(shù)量,
* <=UNTREEIFY_THRESHOLD 則untreeify(lo))
*/
static final int UNTREEIFY_THRESHOLD = 6;
/** 鏈表轉(zhuǎn)紅黑樹(shù)的閾值,64(map容量小于64時(shí),鏈表轉(zhuǎn)紅黑樹(shù)時(shí)先進(jìn)行擴(kuò)容) */
static final int MIN_TREEIFY_CAPACITY = 64;
/** 下面這三個(gè)和多線程協(xié)助擴(kuò)容有關(guān) */
/** // 擴(kuò)容操作中,transfer這個(gè)步驟是允許多線程的,這個(gè)常量表示一個(gè)線程執(zhí)行transfer時(shí),最少要對(duì)連續(xù)的16個(gè)hash桶進(jìn)行transfer
// (不足16就按16算,多控制下正負(fù)號(hào)就行)
private static final int MIN_TRANSFER_STRIDE = 16;
/** 生成sizeCtl所使用的bit位數(shù) */
private static int RESIZE_STAMP_BITS = 16;
/** 參與擴(kuò)容的最大線程數(shù) */
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
/** 移位量,把生成戳移位后保存在sizeCtl中當(dāng)做擴(kuò)容線程計(jì)數(shù)的基數(shù),相反方向移位后能夠反解出生成
戳 */
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
/*
* Encodings for Node hash fields. See above for explanation.
*/
static final int MOVED = -1; // 表示正在轉(zhuǎn)移
static final int TREEBIN = -2; // 表示已經(jīng)轉(zhuǎn)換為樹(shù)
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
/** 可用處理器數(shù)量 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/** 用于存放node數(shù)組 */
transient volatile Node[] table;
/**
* baseCount為并發(fā)低時(shí),直接使用cas設(shè)置成功的值
* 并發(fā)高,cas競(jìng)爭(zhēng)失敗,把值放在counterCells數(shù)組里面的counterCell里面
* 所以map.size = baseCount + (每個(gè)counterCell里面的值累加)
*/
private transient volatile long baseCount;
/**
* 控制標(biāo)識(shí)符,用來(lái)控制table的初始化和擴(kuò)容的操作,不同的值有不同的含義
* 當(dāng)為負(fù)數(shù)時(shí):-1代表正在初始化,-N就代表在擴(kuò)容,-N-RS-2就代表有多少個(gè)線程在協(xié)助擴(kuò)容
* 當(dāng)為0時(shí):代表當(dāng)時(shí)的table還沒(méi)有被初始化
* 當(dāng)為正數(shù)時(shí):表示初始化或者下一次進(jìn)行擴(kuò)容的大小
*/
private transient volatile int sizeCtl;
/**
* 通過(guò)cas實(shí)現(xiàn)的鎖,0 無(wú)鎖,1 有鎖
*/
private transient volatile int cellsBusy;
/**
* counterCells數(shù)組,具體的值在每個(gè)counterCell里面
*/
private transient volatile CounterCell[] counterCells;
Node,內(nèi)部類,主要用于存儲(chǔ)鍵值,有ForwardingNode、ReservationNode、TreeNode和TreeBin四個(gè)子類,具體在后面代碼用到的時(shí)候講。
static class Node implements Entry {
final int hash;
final K key;
// val和next 在擴(kuò)容時(shí)可能發(fā)生變化,加上volatile關(guān)鍵字,提供可見(jiàn)性與重排序
volatile V val;
volatile Node next;
// 不允許修改val
public final V setValue(V value) {
throw new UnsupportedOperationException();
}
三、主要方法
上面介紹了一些基本成員,主要介紹一些常用方法,只有紅黑樹(shù)相關(guān)方法占時(shí)不講(還沒(méi)有搞明白紅黑樹(shù),嘿嘿)。
①、構(gòu)造方法
**
* 無(wú)參構(gòu)造
*/
public ConcurrentHashMap() {
}
/**
* 指定初始化大小的構(gòu)造,不能小于0
* @param initialCapacity
*/
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// cap必須是2的n次方,
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
②、初始化方法(采用延遲初始化,在put方法里面)
private final Node[] initTable() {
Node[] tab; int sc;
while ((tab = table) == null || tab.length == 0) { // 空的table 才能初始化
if ((sc = sizeCtl) < 0) // 表示其它線程正在初始化或者擴(kuò)容
// 當(dāng)前線程把執(zhí)行權(quán)交給其它線程(擁有相同優(yōu)先級(jí)的線程),然后變成可運(yùn)行狀態(tài)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { // 原子操作表示把SIZECTL設(shè)置為-1,正在初始化
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化大小
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n]; // 初始化
table = tab = nt;
sc = n - (n >>> 2); // 下一次擴(kuò)容閾值 n*0,75
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
③、put方法(重要,其中的擴(kuò)容方法比較難理解)
分析:
1)、我們可以通過(guò)源碼判斷key和value不允許為null。
2)、需要判斷table有沒(méi)有初始化,沒(méi)有調(diào)用initTable初始化,然后接著循環(huán)。
3)、判斷key的hash(調(diào)用spread方法)的位置有沒(méi)有值,證明是第一個(gè),使用cas設(shè)置,為什么cas,可能不止一個(gè)線程。
4)、判斷當(dāng)前線程的hash是不是MOVED,其實(shí)就是節(jié)點(diǎn)是不是ForwardingNode節(jié)點(diǎn),F(xiàn)orwardingNode代表正在擴(kuò)容,至于為什么會(huì)是ForwardingNode,這個(gè)在擴(kuò)容的方法里面再講,如果是ForwardingNode節(jié)點(diǎn)就協(xié)助擴(kuò)容,也就是當(dāng)前也去擴(kuò)容,然后擴(kuò)容完畢,在執(zhí)行循環(huán),協(xié)助擴(kuò)容執(zhí)行helpTransfer方法。
5)、如果不是擴(kuò)容、table也初始化了和hash位置也有值了,那證明當(dāng)前hash的位置是鏈表或者樹(shù),接下來(lái)鎖住這個(gè)節(jié)點(diǎn),進(jìn)行鏈表或者樹(shù)的節(jié)點(diǎn)的追加,如果存在相同的key,就替換,最后釋放鎖。
6)、判斷鏈表的節(jié)點(diǎn)數(shù),有沒(méi)有大于等于8,滿足就樹(shù)化,調(diào)用treeifyBin方法,這個(gè)方法會(huì)在樹(shù)化前判斷大于等于64嗎,沒(méi)有就擴(kuò)容,調(diào)用tryPresize方法,有就樹(shù)化。
7)、修改節(jié)點(diǎn)的數(shù)量,調(diào)用addCount方法。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
// 不允許key或者value 為null
if (key == null || value == null) throw new NullPointerException();
// 獲取hash
int hash = spread(key.hashCode());
int binCount = 0;
// 遍歷table,死循環(huán),直到插入成功
for (Node[] tab = table;;) {
Node f; int n, i, fh;
if (tab == null || (n = tab.length) == 0) // table 還沒(méi)有初始化
tab = initTable(); // 初始化
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 當(dāng)前位置為空 ,直接插入
if (casTabAt(tab, i, null, // 使用cas來(lái)進(jìn)行設(shè)置
new Node(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED) // 如果在進(jìn)行擴(kuò)容,則先進(jìn)行協(xié)助擴(kuò)容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
// 沒(méi)有在擴(kuò)容,頭結(jié)點(diǎn)也不是空,
// 鎖住鏈表或者樹(shù)的頭節(jié)點(diǎn)
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { // 普通Node的hash值為key的hash值大于零,而ForwardingNode的是-1,TreeBin是-2
binCount = 1;
// 遍歷鏈表
for (Node e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) { // 找到了相同的key
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value; // 替換value 結(jié)束循環(huán)
break;
}
Node pred = e;
if ((e = e.next) == null) { // 找到最后一個(gè)節(jié)點(diǎn)
pred.next = new Node(hash, key,
value, null); // 把當(dāng)前節(jié)點(diǎn)設(shè)置為最后一個(gè)節(jié)點(diǎn)的next
break;
}
}
}
else if (f instanceof TreeBin) { // 如果是樹(shù)結(jié)構(gòu)
Node p;
binCount = 2;
if ((p = ((TreeBin)f).putTreeVal(hash, key,
value)) != null) { // 樹(shù)節(jié)點(diǎn)插入,存在就替換
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) // 如果鏈表大于等于8,樹(shù)化
treeifyBin(tab, i); // 樹(shù)化
if (oldVal != null) // 證明存在相同的key,是替換return舊值
return oldVal;
break;
}
}
}
addCount(1L, binCount); //數(shù)量加1
return null;
}
注:上面的put方法用到了initTable、helpTransfer、treeifyBin、tryPresize和addCount方法,接下來(lái)我們按照程序的流程講解下這個(gè)幾個(gè)方法。
initTable方法,這個(gè)在前文講過(guò)了。
helpTransfer方法幫助擴(kuò)容
分析:
1)、這里得講一下resizeStamp方法Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)),其實(shí)這個(gè)方法就是獲取table的length的二進(jìn)制的最高位的前面0的個(gè)數(shù),然后|上2^15,舉個(gè)例子吧,假如現(xiàn)在的length為16,那么二進(jìn)制是多少了16是2^4,所以二進(jìn)制10000,所以其實(shí)就是27 | 2 ^ 15,這里還得順便講一下MAXIMUM_CAPACITY = 1 << 30(為什么是2^30了),因?yàn)檫@是正數(shù)的最大值,那么再給這個(gè)數(shù)加上一些值會(huì)發(fā)生什么了(其實(shí)這可能就是為什么要去二進(jìn)制最高位前面0的個(gè)數(shù)的原因)。
2)、怎么判斷的擴(kuò)容已經(jīng)開(kāi)始了,我們知道sizectl為-1是代表正在初始化,大于0表示已經(jīng)初始化,如下方法也判斷了size小于0,那么什么時(shí)候sizectl還會(huì)為負(fù)數(shù)了,其實(shí)開(kāi)始擴(kuò)容的時(shí)候(參考addCount和tryPresize方法,方方法里面都有(rs << RESIZE_STAMP_SHIFT) + 2,這里rs就是上面的resizeStamp的返回值,其實(shí)就是左移16位,int的正數(shù)的最大值是2^30,再給他加值會(huì)變成負(fù)數(shù),對(duì)rs右移在累加顯然已經(jīng)大于了2^30,所以他是負(fù)數(shù)),由此判斷出擴(kuò)容已經(jīng)開(kāi)始。
3)、什么時(shí)候協(xié)助擴(kuò)容了,當(dāng)前是擴(kuò)容開(kāi)始了,但是還沒(méi)結(jié)束,所以下面的滿足sc小于的里面的第一if就是判斷擴(kuò)容有沒(méi)有完成,第二if就是使用cas加入?yún)f(xié)助擴(kuò)容的過(guò)程。
4)、transfer方法,我們?cè)诤竺嬖斀狻?/p>
/**
* 幫助擴(kuò)容
*/
final Node[] helpTransfer(Node[] tab, Node f) {
Node[] nextTab; int sc;
// 原table不等于空,當(dāng)前節(jié)點(diǎn)必須是fwd節(jié)點(diǎn),nextTab已經(jīng)初始化
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode)f).nextTable) != null) {
// 其實(shí)就是去tab.length二進(jìn)制最高位前面有多少個(gè)0,然后 | 1 << 15
int rs = resizeStamp(tab.length);
// nextTab和成員變量一樣,table也一樣,sizeCtl<0,表示在擴(kuò)容
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// sc >>> RESIZE_STAMP_SHIFT) != rs 說(shuō)明擴(kuò)容完畢或者有其它協(xié)助擴(kuò)容者
// sc == rs + 1 表示只剩下最后一個(gè)擴(kuò)容線程了,其它都擴(kuò)容完畢了
// transferIndex <= 0 擴(kuò)容結(jié)束了
// sc == rs + MAX_RESIZERS 到達(dá)最大值
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 當(dāng)前線程參加擴(kuò)容,sc+1
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
transfer(tab, nextTab);
break;
}
}
return nextTab;
}
return table;
}
treeifyBin方法,在table的length小于64時(shí)會(huì)調(diào)用tryPresize 先進(jìn)行擴(kuò)容,調(diào)用tryPresize方法,在下文會(huì)進(jìn)行解釋。
private final void treeifyBin(Node[] tab, int index) {
Node b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 當(dāng)前table的length小于64,就擴(kuò)容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode hd = null, tl = null;
for (Node e = b; e != null; e = e.next) {
TreeNode p =
new TreeNode(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin(hd));
}
}
}
}
}
tryPresize 方法,這里的邏輯和helpTransfer都差不多,至于transfer方法主菜我們?cè)诤竺嫔?/strong>。
private final void tryPresize(int size) {
// 計(jì)算擴(kuò)容的size
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
int sc;
// 證明table已經(jīng)初始了或者還沒(méi)有初始化
while ((sc = sizeCtl) >= 0) {
Node[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {// 證明還沒(méi)有初始化,需要初始化
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 大于最大容量返回
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 已經(jīng)初始化,并且沒(méi)有大于最大容量
else if (tab == table) {
int rs = resizeStamp(n);
// 判斷是否需要協(xié)助擴(kuò)容
if (sc < 0) {
Node[] nt;
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 開(kāi)始擴(kuò)容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}
transfer擴(kuò)容方法,比較難理解的一個(gè)方法
分析:
1)、看stride這個(gè)參數(shù)其實(shí)就是算每個(gè)線程處理的數(shù)量,和CPU有關(guān),最小是16.
2)、初始化一個(gè)原來(lái)二倍的新table就是 nextTable,然后這個(gè)過(guò)程可能會(huì)出錯(cuò),n<<1可能為負(fù)數(shù),設(shè)置nextTable和transferIndex,其中transferIndex就是原table的長(zhǎng)度。
3)、初始化一個(gè)ForwardingNode節(jié)點(diǎn)在后面會(huì)用到。
4)、死循環(huán)for,這個(gè)循環(huán)就是為每個(gè)線程分配任務(wù),然后每個(gè)線程處理各自的任務(wù),倒敘分配,舉個(gè)例子,加入table.length=32,現(xiàn)在的stride為16,第一個(gè)線程其實(shí)就是32到16(不包含32,因?yàn)槭撬饕?,第二個(gè)線程就是0-15,參考這一段代碼((U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex,nextBound = (nextIndex > stride ?nextIndex - stride : 0)))),然后遍歷每個(gè)段,處理節(jié)點(diǎn),知道處理完成,具體邏輯參考代碼注釋。
/**
* 擴(kuò)容方法
*/
private final void transfer(Node[] tab, Node[] nextTab) {
int n = tab.length, stride;
// n >>> 3(也就是除以8) / cpu個(gè)數(shù),每個(gè)cpu的每個(gè)線程負(fù)責(zé)的遷移的數(shù)量
// 這樣的目的是為了每個(gè)cpu處理的桶一樣多,避免出現(xiàn)任務(wù)轉(zhuǎn)移不均勻的現(xiàn)象,如果桶少的話,默認(rèn)一個(gè)cpu(一個(gè)線程)處理16個(gè)桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 擴(kuò)容table 沒(méi)有初始化
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node[] nt = (Node[])new Node,?>[n << 1]; // 初始化原來(lái)的length兩倍的table
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 初始化失敗,使用integer的最大值
sizeCtl = Integer.MAX_VALUE;
return; // 結(jié)束
}
// 更新成員變量
nextTable = nextTab;
// 更新轉(zhuǎn)移下標(biāo),就是運(yùn)來(lái)的table的length
transferIndex = n;
}
// 新table的length
int nextn = nextTab.length;
// 創(chuàng)建一個(gè)fwd節(jié)點(diǎn),用于占位.當(dāng)別的節(jié)點(diǎn)發(fā)現(xiàn)這個(gè)槽位中有fwd節(jié)點(diǎn)時(shí),則跳過(guò)這個(gè)節(jié)點(diǎn)
// 它的hash為MOVED
ForwardingNode fwd = new ForwardingNode(nextTab);
// 首次推進(jìn)為true,如果為true說(shuō)明需要再次推進(jìn)一個(gè)目標(biāo)(i--),反之如果是false,那么就不能推進(jìn)下標(biāo),需要將當(dāng)前的下標(biāo)處理完畢
boolean advance = true;
// 完成狀態(tài),如果為true,就結(jié)束方法
boolean finishing = false; // to ensure sweep before committing nextTab
// 死循環(huán),因?yàn)槭堑怪闅v,所以i是點(diǎn)前線程的最大位置(i---),bound是邊界,也就是區(qū)間里面的最小值
for (int i = 0, bound = 0;;) {
Node f; int fh;
// 如果當(dāng)前線程可以向后推進(jìn),這個(gè)循環(huán)就是控制i遞減.同時(shí)每個(gè)線程都會(huì)進(jìn)入這里取得自己需要轉(zhuǎn)移的桶的下標(biāo)區(qū)間
// 1. true
while (advance) {
int nextIndex, nextBound;
// 1. -1 >= 0,false
if (--i >= bound || finishing)
advance = false;
//transferIndex <= 0 說(shuō)明已經(jīng)沒(méi)有需要遷移的桶了
// 1.nextIndex = 16 <= 0
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
//更新 transferIndex
//為當(dāng)前線程分配任務(wù),處理的桶結(jié)點(diǎn)區(qū)間為(nextBound,nextIndex)
// 1.16 > 16 ? 16 -16 : 0 區(qū)間 16 到0
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound; // 0
i = nextIndex - 1;// 15
advance = false;
}
}
// i = 15 nextn = 32
// i < 0 ,表示數(shù)據(jù)遷移已經(jīng)完成
// i >= n 和 i + n >= nextn 表示最后一個(gè)線程也執(zhí)行完成了,擴(kuò)容完成了
// 第二個(gè)if里面的i=n
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) { // 完成擴(kuò)容
nextTable = null; // 刪除成員變量
table = nextTab; // 更新table
sizeCtl = (n << 1) - (n >>> 1); // 更新閾值
return;
}
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // 表示一個(gè)線程退出擴(kuò)容
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) // 說(shuō)明還有其他線程正在擴(kuò)容
return; // 當(dāng)前線程結(jié)束
// 當(dāng)前線程為最后一個(gè)線程,負(fù)責(zé)在檢查一個(gè)整個(gè)隊(duì)列
finishing = advance = true; //
i = n; // recheck before commit
}
}
// 待遷移桶為null,用cas把當(dāng)前節(jié)點(diǎn)設(shè)置為ForwardingNode節(jié)點(diǎn),表示已經(jīng)處理
else if ((f = tabAt(tab, i)) == null) // 第一個(gè)線程 獲取i處的數(shù)據(jù)為null,
advance = casTabAt(tab, i, null, fwd);// 設(shè)置當(dāng)前節(jié)點(diǎn)為 fwd 節(jié)點(diǎn)
else if ((fh = f.hash) == MOVED) // 如果當(dāng)前節(jié)點(diǎn)為 MOVED,說(shuō)明已經(jīng)處理過(guò)了,直接跳過(guò)
advance = true; // already processed
else {
// 節(jié)點(diǎn)不為空,鎖住i位置的頭結(jié)點(diǎn)
synchronized (f) {
if (tabAt(tab, i) == f) {
Node ln, hn;
if (fh >= 0) { // 表示是鏈表
int runBit = fh & n; // fn表示f.hash & n ,表示獲取原來(lái)table的位置
Node lastRun = f; // lastRun = f
for (Node p = f.next; p != null; p = p.next) {
int b = p.hash & n; // 獲取節(jié)點(diǎn)的位置
if (b != runBit) { // 如果這個(gè)節(jié)點(diǎn)和上一個(gè)節(jié)點(diǎn)的位置不一樣,記錄節(jié)點(diǎn)和位置
runBit = b; // 當(dāng)前節(jié)點(diǎn)的位置
lastRun = p; // 當(dāng)前節(jié)點(diǎn)
}
}
// 不管runBit有沒(méi)有發(fā)生變化,只可能是0或者n,
// ln表示的不變化的節(jié)點(diǎn)
// hn表示的是變化節(jié)點(diǎn)的位置
if (runBit == 0) { // 如果是0,那么ln=lastRun就是位置沒(méi)有變的這條鏈 hn=null變化鏈需要遍歷重組
ln = lastRun;
hn = null;
}
else { // 如果當(dāng)前節(jié)點(diǎn)不是0,hn=lastRun這個(gè)變化鏈,ln=null沒(méi)有變化的鏈需要遍歷重組
hn = lastRun;
ln = null;
}
for (Node p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node(ph, pk, pv, ln);
else
hn = new Node(ph, pk, pv, hn);
}
// 原來(lái)位置
setTabAt(nextTab, i, ln);
// 變化位置
setTabAt(nextTab, i + n, hn);
// 原來(lái)table的位置設(shè)置fwd節(jié)點(diǎn),表示擴(kuò)容
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
TreeBin t = (TreeBin)f;
TreeNode lo = null, loTail = null;
TreeNode hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode p = new TreeNode
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
addCount主要是用于修改map的size,和擴(kuò)容用的,這里我們只看后半部分?jǐn)U容部分,修改count部分在map的size方法講解
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 嘗試使用cas更新baseCount失敗
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 在counterCells沒(méi)有初始化,或者嘗試cas更新當(dāng)前線程的CounterCell失敗時(shí)
// 調(diào)用fullAddCount更新
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
// check >= 0,新加入一個(gè)值
if (check >= 0) {
Node[] tab, nt; int n, sc;
// s代表了 現(xiàn)在map的數(shù)據(jù)量
// sc= 12 ,證明剛剛初始化,沒(méi)有進(jìn)行擴(kuò)容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && // 當(dāng)前容量大于sc,table已經(jīng)有值,table的cap小于最大cap
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n); // 這一步不好理解,
// Integer.numberOfLeadingZeros(n) 其實(shí)就是最高位前面有多少個(gè)0,n代表table的長(zhǎng)度
// | (1 << (RESIZE_STAMP_BITS - 1)) 2^15 二進(jìn)制16位,第16位1,其余15位0
// 其實(shí)就是相加
// 表示正在擴(kuò)容
if (sc < 0) {
// sc >>> RESIZE_STAMP_SHIFT // 擴(kuò)容結(jié)束
// 只有最后一個(gè)線程在擴(kuò)容
// sc == rs + MAX_RESIZERS 達(dá)到最大數(shù)
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// sc 加1,表示有一個(gè)線程,協(xié)助參加了擴(kuò)容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 表示沒(méi)有正在進(jìn)行擴(kuò)容,開(kāi)始擴(kuò)容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
④、get方法(get方法的邏輯相對(duì)就要簡(jiǎn)單點(diǎn)了,請(qǐng)看代碼注釋)
// get方法
public V get(Object key) {
Node[] tab; Node e, p; int n, eh; K ek;
// 獲取hash值
int h = spread(key.hashCode());
// table不為null,table已經(jīng)初始化,通過(guò)hash查找的node不為nul
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) { // hash相等
if ((ek = e.key) == key || (ek != null && key.equals(ek))) // 找到了相同的key
return e.val; // 返回當(dāng)前e的value
}
else if (eh < 0) // hash小于0,說(shuō)明是特殊節(jié)點(diǎn)(TreeBin或ForwardingNode)調(diào)用find
return (p = e.find(h, key)) != null ? p.val : null;
// 不是上面的情況,那就是鏈表了,遍歷鏈表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
⑤、size方法(和1.7的處理方式截然不同)
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
CounterCell[]這個(gè)數(shù)組就是記錄的map的count,這里就不得不講一下addCount方法的前半部分,只有理解了每次添加一個(gè)元素,count是怎么處理的,才能明白為什么要有這個(gè)數(shù)組
CounterCell 內(nèi)部類
static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
addCount方法,只看前半部分,我們可以看出其實(shí)修改map的count,先是使用cas修改basecount,然后可能存在多個(gè)線程同時(shí)修改,所以會(huì)失敗,失敗就用CounterCell[]數(shù)組處理,調(diào)用fullAddCount方法。
/**
*
* @param x 1L
* @param check 默認(rèn)值是0,等于0時(shí),代表插入為null
* 不等于0時(shí),check等于2代表了樹(shù),其它代表了鏈表
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 嘗試使用cas更新baseCount失敗
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 在counterCells沒(méi)有初始化,或者嘗試cas更新當(dāng)前線程的CounterCell失敗時(shí)
// 調(diào)用fullAddCount更新
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
}
}
fullAddCount,主要用來(lái)記錄競(jìng)爭(zhēng)導(dǎo)致的basecount修改失敗的這些操作,其實(shí)主要就是把這些失敗的次數(shù)記錄在CounterCell[]數(shù)組里面,然后在統(tǒng)計(jì)size時(shí),就是basecount+CounterCell[]里面的次數(shù)。
分析:CounterCell[]可以看做是一個(gè)map,因?yàn)楹枚嗵幚矸椒ê蚼ap類似,我們先來(lái)看下數(shù)組的長(zhǎng)度,默認(rèn)是2,其實(shí)是可以擴(kuò)容的,每次擴(kuò)容2倍(擴(kuò)容不能超過(guò)cpu的數(shù)量),然后怎么插入值了,和map類似都需要確定位置,那么怎么確定位置了,map是通過(guò)key的hashcode,而這個(gè)數(shù)組是通過(guò)一個(gè)并發(fā)隨機(jī)數(shù)ThreadLocalRandom來(lái)說(shuō)生成一個(gè)隨機(jī)數(shù),然后通過(guò)這隨機(jī)數(shù)&數(shù)組的長(zhǎng)度減一確定位置,是不是很map一樣,還有就是他沒(méi)有鏈表這個(gè)概念,那沖突了怎么辦了,其實(shí)就是累加,這個(gè)數(shù)組還有鎖的概念就是cellsBusy,因?yàn)檫@里可能也是多個(gè)線程來(lái)執(zhí)行,等于零就表示沒(méi)有鎖,等于一就表示有鎖,在插入新值、擴(kuò)容和創(chuàng)建數(shù)組這些操作都需要獲取鎖,具體的方法的概念就到這里,具體的邏輯參考代碼注釋。
/**
*
* @param x 需要更新的值
* @param wasUncontended 是否發(fā)生競(jìng)爭(zhēng)
*/
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 初始化一個(gè)隨機(jī)值
// ThreadLocalRandom是JDK 7之后提供并發(fā)產(chǎn)生隨機(jī)數(shù),能夠解決多個(gè)線程發(fā)生的競(jìng)爭(zhēng)爭(zhēng)奪。
if ((h = ThreadLocalRandom.getProbe()) == 0) {
// 為當(dāng)前線程初始化一個(gè)隨機(jī)值
ThreadLocalRandom.localInit(); // force initialization
// 獲取這個(gè)值
h = ThreadLocalRandom.getProbe();
// 由于重新生成了probe,未沖突標(biāo)志位設(shè)置為true
wasUncontended = true;
}
// 沖突標(biāo)志位,決定了擴(kuò)容還是不擴(kuò)容
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// counterCells數(shù)組已經(jīng)被初始化了
if ((as = counterCells) != null && (n = as.length) > 0) {
// 求在counterCells中的位置,與hash一樣求%,因?yàn)閏ounterCells數(shù)組長(zhǎng)度是2的冥
if ((a = as[(n - 1) & h]) == null) { // 當(dāng)前位置沒(méi)有CounterCell
if (cellsBusy == 0) {// Try to attach new Cell
// 創(chuàng)建新的CounterCell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 && // cellsBusy=0還沒(méi)有加鎖,使用cas進(jìn)行加鎖,cellsBusy設(shè)置為1
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r; // 放進(jìn)數(shù)組
created = true;
}
} finally {
cellsBusy = 0;
}
// 操作成功,退出死循環(huán)
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 在調(diào)用fullAddCount之前就發(fā)生了競(jìng)爭(zhēng)
// 然后wasUncontended=true,未發(fā)生競(jìng)爭(zhēng),然后重新循環(huán)更新
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 當(dāng)前位置的CounterCell不為空,進(jìn)行累加
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 數(shù)組被擴(kuò)容了
// 數(shù)組大于了cpu數(shù)量
// 設(shè)置沖突標(biāo)志, collide = false,防止擴(kuò)容
else if (counterCells != as || n >= NCPU)
collide = false;
// 設(shè)置沖突標(biāo)志,重新執(zhí)行循環(huán)
// 如果下次循環(huán)執(zhí)行到該分支,并且沖突標(biāo)志仍然為true
// 那么會(huì)跳過(guò)該分支,到下一個(gè)分支進(jìn)行擴(kuò)容// At max size or stale
// 這個(gè)位置決定了擴(kuò)容還是不擴(kuò)容 false就不擴(kuò)容,true就擴(kuò)容
else if (!collide)
collide = true;
// 擴(kuò)容,CAS設(shè)置cellsBusy值
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
// 容量擴(kuò)大一倍
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 為當(dāng)前線程重新計(jì)算probe
h = ThreadLocalRandom.advanceProbe(h);
}
// 證明數(shù)組為空
// 獲取鎖,初始化數(shù)組
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
if (counterCells == as) { // 沒(méi)有被其它線程初始化
// 初始化,默認(rèn)長(zhǎng)度2
CounterCell[] rs = new CounterCell[2];
// 創(chuàng)建新的CounterCell(x),位置為rs[h&(2-1)]
rs[h & 1] = new CounterCell(x);
// 賦值給成員變量counterCells
counterCells = rs;
// 初始化成功
init = true;
}
} finally {
// 釋放鎖
cellsBusy = 0;
}
if (init)
// 結(jié)束循環(huán)
break;
}
// 證明在CounterCell上也存在競(jìng)爭(zhēng),那么嘗試對(duì)baseCount進(jìn)行更新
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
四、總結(jié)
本文主要基于jdk1.8介紹了ConcurrentHashMap的一部分常用方法,主要講了get、put和size者三個(gè)常用方法,其中比較難理解的是put方法,其中在擴(kuò)容和協(xié)助擴(kuò)容作者的設(shè)計(jì)讓人眼前一亮,大師就是大師,絕對(duì)值得你去一探究竟,還有就是size也采用了分治的思想(不知道這個(gè)詞合適不,個(gè)人理解),就是統(tǒng)計(jì)累加count時(shí),沒(méi)有競(jìng)爭(zhēng)的單獨(dú)處理,有競(jìng)爭(zhēng)的單獨(dú)處理,而沒(méi)有采用自旋,極大的提升了效率;1.8和1.7 的區(qū)別很大,首先數(shù)據(jù)結(jié)構(gòu)發(fā)生變化,其次鎖也發(fā)生了變化、擴(kuò)容側(cè)率和size的統(tǒng)計(jì)等;最后附上學(xué)習(xí)學(xué)習(xí)Map和ConcurrentHashMap的一點(diǎn)小建議,如果想從1.7和1.8兩個(gè)版本看,建議從這個(gè)方向(jdk版本一樣,數(shù)據(jù)結(jié)構(gòu)基本沒(méi)有發(fā)生變化)HashMap 1.7>>ConcurrentHashMap 1.7>>HashMap 1.8>>[ConcurrentHashMap 1.8]()
參考 《Java 并發(fā)編程的藝術(shù)》