本篇內(nèi)容介紹了“并發(fā)計(jì)數(shù)類(lèi)LongAdder怎么用”的有關(guān)知識(shí),在實(shí)際案例的操作過(guò)程中,不少人都會(huì)遇到這樣的困境,接下來(lái)就讓小編帶領(lǐng)大家學(xué)習(xí)一下如何處理這些情況吧!希望大家仔細(xì)閱讀,能夠?qū)W有所成!
創(chuàng)新互聯(lián)建站主要從事成都網(wǎng)站制作、成都網(wǎng)站建設(shè)、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)商州,十多年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專(zhuān)業(yè),歡迎來(lái)電咨詢建站服務(wù):18980820575
AtomicLong是通過(guò)CAS(即Compare And Swap)原理來(lái)完成原子遞增遞減操作,在并發(fā)情況下不會(huì)出現(xiàn)線程不安全結(jié)果。AtomicLong中的value是使用volatile修飾,并發(fā)下各個(gè)線程對(duì)value最新值均可見(jiàn)。我們以incrementAndGet()方法來(lái)深入。
public final long incrementAndGet() {
return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
}
這里是調(diào)用了unsafe的方法
public final long getAndAddLong(Object var1, long var2, long var4) {
long var6;
do {
var6 = this.getLongVolatile(var1, var2);
} while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
return var6;
}
方法中this.compareAndSwapLong()有4個(gè)參數(shù),var1是需要修改的類(lèi)對(duì)象,var2是需要修改的字段的內(nèi)存地址,var6是修改前字段的值,var6+var4是修改后字段的值。compareAndSwapLong只有該字段實(shí)際值和var6值相當(dāng)?shù)臅r(shí)候,才可以成功設(shè)置其為var6+var4。
再繼續(xù)往深一層去看
public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
這里Unsafe.compareAndSwapLong是native方法,底層通過(guò)JNI(Java Native Interface)來(lái)完成調(diào)用,實(shí)際就是借助C來(lái)調(diào)用CPU指令來(lái)完成。
實(shí)現(xiàn)中使用了do-while循環(huán),如果CAS失敗,則會(huì)繼續(xù)重試,直到成功為止。并發(fā)特別高的時(shí)候,雖然這里可能會(huì)有很多次循環(huán),但是還是可以保證線程安全的。不過(guò)如果自旋CAS操作長(zhǎng)時(shí)間不成功,競(jìng)爭(zhēng)較大,會(huì)帶CPU帶來(lái)極大的開(kāi)銷(xiāo),占用更多的執(zhí)行資源,可能會(huì)影響其他主業(yè)務(wù)的計(jì)算等。
LongAdder怎么優(yōu)化AtomicLong
Doug Lea在jdk1.5的時(shí)候就針對(duì)HashMap進(jìn)行了線程安全和并發(fā)性能的優(yōu)化,推出了分段鎖實(shí)現(xiàn)的ConcurrentHashMap。一般Java面試,基本上離不開(kāi)ConcurrentHashMap這個(gè)網(wǎng)紅問(wèn)題。另外在ForkJoinPool中,Doug Lea在其工作竊取算法上對(duì)WorkQueue使用了細(xì)粒度鎖來(lái)較少并發(fā)的競(jìng)爭(zhēng),更多細(xì)節(jié)可參考我的原創(chuàng)文章ForkJoin使用和原理剖析。如果已經(jīng)對(duì)ConcurrentHashMap有了較為深刻的理解,那么現(xiàn)在來(lái)看LongAdder的實(shí)現(xiàn)就會(huì)相對(duì)簡(jiǎn)單了。
來(lái)看LongAdder的increase()方法實(shí)現(xiàn),
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//第一個(gè)if進(jìn)行了兩個(gè)判斷,(1)如果cells不為空,則直接進(jìn)入第二個(gè)if語(yǔ)句中。(2)同樣會(huì)先使用cas指令來(lái)嘗試add,如果成功則直接返回。如果失敗則說(shuō)明存在競(jìng)爭(zhēng),需要重新add
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
這里用到了Cell類(lèi)對(duì)象,Cell對(duì)象是LongAdder高并發(fā)實(shí)現(xiàn)的關(guān)鍵。在casBase沖突嚴(yán)重的時(shí)候,就會(huì)去創(chuàng)建Cell對(duì)象并添加到cells中,下面會(huì)詳細(xì)分析。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
//提供CAS方法修改當(dāng)前Cell對(duì)象上的value
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
而這一句a = as[getProbe() & m]其實(shí)就是通過(guò)getProbe()拿到當(dāng)前Thread的threadLocalRandomProbe的probe Hash值。這個(gè)值其實(shí)是一個(gè)隨機(jī)值,這個(gè)隨機(jī)值由當(dāng)前線程ThreadLocalRandom.current()產(chǎn)生。不用Rondom的原因是因?yàn)檫@里已經(jīng)是高并發(fā)了,多線程情況下Rondom會(huì)極大可能得到同一個(gè)隨機(jī)值。因此這里使用threadLocalRandomProbe在高并發(fā)時(shí)會(huì)更加隨機(jī),減少?zèng)_突。更多ThreadLocalRandom信息想要深入了解可關(guān)注這篇文章并發(fā)包中ThreadLocalRandom類(lèi)原理淺嘗。拿到as數(shù)組中當(dāng)前線程的Cell對(duì)象,然后再進(jìn)行CAS的更新操作,我們?cè)谠创a上進(jìn)行分析。longAccumulate()是在父類(lèi)Striped64.java中。
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
if ((h = getProbe()) == 0) {
//如果當(dāng)前線程的隨機(jī)數(shù)為0,則初始化隨機(jī)數(shù)
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
for (;;) {
Cell[] as; Cell a; int n; long v;
//如果當(dāng)前cells數(shù)組不為空
if ((as = cells) != null && (n = as.length) > 0) {
//如果線程隨機(jī)數(shù)對(duì)應(yīng)的cells對(duì)應(yīng)數(shù)組下標(biāo)的Cell元素不為空,
if ((a = as[(n - 1) & h]) == null) {
//當(dāng)使用到LongAdder的Cell數(shù)組相關(guān)的操作時(shí),需要先獲取全局的cellsBusy的鎖,才可以進(jìn)行相關(guān)操作。如果當(dāng)前有其他線程的使用,則放棄這一步,繼續(xù)for循環(huán)重試。
if (cellsBusy == 0) { // Try to attach new Cell
//Cell的初始值是x,創(chuàng)建完畢則說(shuō)明已經(jīng)加上
Cell r = new Cell(x); // Optimistically create
//casCellsBusy獲取鎖,cellsBusy通過(guò)CAS方式獲取鎖,當(dāng)成功設(shè)置cellsBusy為1時(shí),則獲取到鎖。
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
//finally里面釋放鎖
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//如果a不為空,則對(duì)a進(jìn)行cas增x操作,成功則返回
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
//cells的長(zhǎng)度n已經(jīng)大于CPU數(shù)量,則繼續(xù)擴(kuò)容沒(méi)有意義,因此直接標(biāo)記為不沖突
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
//到這一步則說(shuō)明a不為空但是a上進(jìn)行CAS操作也有多個(gè)線程在競(jìng)爭(zhēng),因此需要擴(kuò)容cells數(shù)組,其長(zhǎng)度為原長(zhǎng)度的2倍
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//繼續(xù)使用新的隨機(jī)數(shù),避免在同一個(gè)Cell上競(jìng)爭(zhēng)
h = advanceProbe(h);
}
//如果cells為空,則需要先創(chuàng)建Cell數(shù)組。初始長(zhǎng)度為2.(個(gè)人理解這個(gè)if放在前面會(huì)比較好一點(diǎn),哈哈)
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//如果在a上競(jìng)爭(zhēng)失敗,且擴(kuò)容競(jìng)爭(zhēng)也失敗了,則在casBase上嘗試增加數(shù)量
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
最后是求LongAdder的總數(shù),這一步就非常簡(jiǎn)單了,把base的值和所有cells上的value值加起來(lái)就是總數(shù)了。
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
思考
源碼中Cell數(shù)組的會(huì)控制在不超過(guò)NCPU的兩倍,原因是LongAdder其實(shí)在底層是依賴(lài)CPU的CAS指令來(lái)操作,如果多出太多,即使在代碼層面沒(méi)有競(jìng)爭(zhēng),在底層CPU的競(jìng)爭(zhēng)會(huì)更多,所以這里會(huì)有一個(gè)數(shù)量的限制。所以在LongAdder的設(shè)計(jì)中,由于使用到CAS指令操作,瓶頸在于CPU上。
YY一下,那么有沒(méi)有方式可以突破這個(gè)瓶頸呢?我個(gè)人覺(jué)得是有的,但是有前提條件,應(yīng)用場(chǎng)景極其有限?;赥hreadLocal的設(shè)計(jì),假設(shè)統(tǒng)計(jì)只在一個(gè)固定的線程池中進(jìn)行,假設(shè)線程池中的線程不會(huì)銷(xiāo)毀(異常補(bǔ)充線程的就暫時(shí)不管了),則可以認(rèn)為線程數(shù)量是固定且不變的,那么統(tǒng)計(jì)則可以依賴(lài)于只在當(dāng)前線程中進(jìn)行,那么即使是高并發(fā),就轉(zhuǎn)化為T(mén)hreadLocal這種單線程操作了,完全可以擺脫CAS的CPU指令操作的限制,那么性能將極大提升。
“并發(fā)計(jì)數(shù)類(lèi)LongAdder怎么用”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí)可以關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編將為大家輸出更多高質(zhì)量的實(shí)用文章!