作者:acupt
公司主營業(yè)務(wù):成都網(wǎng)站設(shè)計、網(wǎng)站制作、移動網(wǎng)站開發(fā)等業(yè)務(wù)。幫助企業(yè)客戶真正實現(xiàn)互聯(lián)網(wǎng)宣傳,提高企業(yè)的競爭能力。創(chuàng)新互聯(lián)是一支青春激揚、勤奮敬業(yè)、活力青春激揚、勤奮敬業(yè)、活力澎湃、和諧高效的團隊。公司秉承以“開放、自由、嚴謹、自律”為核心的企業(yè)文化,感謝他們對我們的高要求,感謝他們從不同領(lǐng)域給我們帶來的挑戰(zhàn),讓我們激情的團隊有機會用頭腦與智慧不斷的給客戶帶來驚喜。創(chuàng)新互聯(lián)推出輝南免費做網(wǎng)站回饋大家。
前言
不考慮多線程并發(fā)的情況下,容器類一般使用ArrayList、HashMap等線程不安全的類,效率更高。在并發(fā)場景下,常會用到ConcurrentHashMap、ArrayBlockingQueue等線程安全的容器類,雖然犧牲了一些效率,但卻得到了安全。
上面提到的線程安全容器都在java.util.concurrent包下,這個包下并發(fā)容器不少,今天全部翻出來鼓搗一下。
僅做簡單介紹,后續(xù)再分別深入探索。
并發(fā)容器介紹
ConcurrentHashMap:并發(fā)版HashMap
CopyOnWriteArrayList:并發(fā)版ArrayList
CopyOnWriteArraySet:并發(fā)Set
ConcurrentLinkedQueue:并發(fā)隊列(基于鏈表)
ConcurrentLinkedDeque:并發(fā)隊列(基于雙向鏈表)
ConcurrentSkipListMap:基于跳表的并發(fā)Map
ConcurrentSkipListSet:基于跳表的并發(fā)Set
ArrayBlockingQueue:阻塞隊列(基于數(shù)組)
LinkedBlockingQueue:阻塞隊列(基于鏈表)
LinkedBlockingDeque:阻塞隊列(基于雙向鏈表)
PriorityBlockingQueue:線程安全的優(yōu)先隊列
SynchronousQueue:讀寫成對的隊列
LinkedTransferQueue:基于鏈表的數(shù)據(jù)交換隊列
DelayQueue:延時隊列
1.ConcurrentHashMap 并發(fā)版HashMap
最常見的并發(fā)容器之一,可以用作并發(fā)場景下的緩存。底層依然是哈希表,但在JAVA 8中有了不小的改變,而JAVA 7和JAVA 8都是用的比較多的版本,因此經(jīng)常會將這兩個版本的實現(xiàn)方式做一些比較(比如面試中)。
一個比較大的差異就是,JAVA 7中采用分段鎖來減少鎖的競爭,JAVA 8中放棄了分段鎖,采用CAS(一種樂觀鎖),同時為了防止哈希沖突嚴重時退化成鏈表(沖突時會在該位置生成一個鏈表,哈希值相同的對象就鏈在一起),會在鏈表長度達到閾值(8)后轉(zhuǎn)換成紅黑樹(比起鏈表,樹的查詢效率更穩(wěn)定)。
2.CopyOnWriteArrayList 并發(fā)版ArrayList
并發(fā)版ArrayList,底層結(jié)構(gòu)也是數(shù)組,和ArrayList不同之處在于:當新增和刪除元素時會創(chuàng)建一個新的數(shù)組,在新的數(shù)組中增加或者排除指定對象,最后用新增數(shù)組替換原來的數(shù)組。
適用場景:由于讀操作不加鎖,寫(增、刪、改)操作加鎖,因此適用于讀多寫少的場景。
局限:由于讀的時候不會加鎖(讀的效率高,就和普通ArrayList一樣),讀取的當前副本,因此可能讀取到臟數(shù)據(jù)。如果介意,建議不用。
看看源碼感受下:
public?class?CopyOnWriteArrayList????implements?List ,?RandomAccess,?Cloneable,?java.io.Serializable?{ ????final?transient?ReentrantLock?lock?=?new?ReentrantLock(); ????private?transient?volatile?Object[]?array; ????//?添加元素,有鎖 ????public?boolean?add(E?e)?{ ????????final?ReentrantLock?lock?=?this.lock; ????????lock.lock();?//?修改時加鎖,保證并發(fā)安全 ????????try?{ ????????????Object[]?elements?=?getArray();?//?當前數(shù)組 ????????????int?len?=?elements.length; ????????????Object[]?newElements?=?Arrays.copyOf(elements,?len?+?1);?//?創(chuàng)建一個新數(shù)組,比老的大一個空間 ????????????newElements[len]?=?e;?//?要添加的元素放進新數(shù)組 ????????????setArray(newElements);?//?用新數(shù)組替換原來的數(shù)組 ????????????return?true; ????????}?finally?{ ????????????lock.unlock();?//?解鎖 ????????} ????} ????//?讀元素,不加鎖,因此可能讀取到舊數(shù)據(jù) ????public?E?get(int?index)?{ ????????return?get(getArray(),?index); ????}}
3.CopyOnWriteArraySet 并發(fā)Set
基于CopyOnWriteArrayList實現(xiàn)(內(nèi)含一個CopyOnWriteArrayList成員變量),也就是說底層是一個數(shù)組,意味著每次add都要遍歷整個集合才能知道是否存在,不存在時需要插入(加鎖)。
適用場景:在CopyOnWriteArrayList適用場景下加一個,集合別太大(全部遍歷傷不起)。
4.ConcurrentLinkedQueue 并發(fā)隊列(基于鏈表)
基于鏈表實現(xiàn)的并發(fā)隊列,使用樂觀鎖(CAS)保證線程安全。因為數(shù)據(jù)結(jié)構(gòu)是鏈表,所以理論上是沒有隊列大小限制的,也就是說添加數(shù)據(jù)一定能成功。
5.ConcurrentLinkedDeque 并發(fā)隊列(基于雙向鏈表)
基于雙向鏈表實現(xiàn)的并發(fā)隊列,可以分別對頭尾進行操作,因此除了先進先出(FIFO),也可以先進后出(FILO),當然先進后出的話應(yīng)該叫它棧了。
6.ConcurrentSkipListMap 基于跳表的并發(fā)Map
SkipList即跳表,跳表是一種空間換時間的數(shù)據(jù)結(jié)構(gòu),通過冗余數(shù)據(jù),將鏈表一層一層索引,達到類似二分查找的效果
7.ConcurrentSkipListSet 基于跳表的并發(fā)Set
類似HashSet和HashMap的關(guān)系,ConcurrentSkipListSet里面就是一個ConcurrentSkipListMap,就不細說了。
8.ArrayBlockingQueue 阻塞隊列(基于數(shù)組)
基于數(shù)組實現(xiàn)的可阻塞隊列,構(gòu)造時必須制定數(shù)組大小,往里面放東西時如果數(shù)組滿了便會阻塞直到有位置(也支持直接返回和超時等待),通過一個鎖ReentrantLock保證線程安全。
用offer操作舉個例子:
public?class?ArrayBlockingQueue?extends?AbstractQueue ????????implements?BlockingQueue ,?java.io.Serializable?{ ????/** ?????*?讀寫共用此鎖,線程間通過下面兩個Condition通信 ?????*?這兩個Condition和lock有緊密聯(lián)系(就是lock的方法生成的) ?????*?類似Object的wait/notify ?????*/ ????final?ReentrantLock?lock; ????/**?隊列不為空的信號,取數(shù)據(jù)的線程需要關(guān)注?*/ ????private?final?Condition?notEmpty; ????/**?隊列沒滿的信號,寫數(shù)據(jù)的線程需要關(guān)注?*/ ????private?final?Condition?notFull; ????//?一直阻塞直到有東西可以拿出來 ????public?E?take()?throws?InterruptedException?{ ????????final?ReentrantLock?lock?=?this.lock; ????????lock.lockInterruptibly(); ????????try?{ ????????????while?(count?==?0) ????????????????notEmpty.await(); ????????????return?dequeue(); ????????}?finally?{ ????????????lock.unlock(); ????????} ????} ????//?在尾部插入一個元素,隊列已滿時等待指定時間,如果還是不能插入則返回 ????public?boolean?offer(E?e,?long?timeout,?TimeUnit?unit) ????????throws?InterruptedException?{ ????????checkNotNull(e); ????????long?nanos?=?unit.toNanos(timeout); ????????final?ReentrantLock?lock?=?this.lock; ????????lock.lockInterruptibly();?//?鎖住 ????????try?{ ????????????//?循環(huán)等待直到隊列有空閑 ????????????while?(count?==?items.length)?{ ????????????????if?(nanos?<=?0) ????????????????????return?false;//?等待超時,返回 ????????????????//?暫時放出鎖,等待一段時間(可能被提前喚醒并搶到鎖,所以需要循環(huán)判斷條件) ????????????????//?這段時間可能其他線程取走了元素,這樣就有機會插入了 ????????????????nanos?=?notFull.awaitNanos(nanos); ????????????} ????????????enqueue(e);//插入一個元素 ????????????return?true; ????????}?finally?{ ????????????lock.unlock();?//解鎖 ????????} ????}
乍一看會有點疑惑,讀和寫都是同一個鎖,那要是空的時候正好一個讀線程來了不會一直阻塞嗎?
答案就在notEmpty、notFull里,這兩個出自lock的小東西讓鎖有了類似synchronized + wait + notify的功能。傳送門 → 終于搞懂了sleep/wait/notify/notifyAll
9.LinkedBlockingQueue 阻塞隊列(基于鏈表)
基于鏈表實現(xiàn)的阻塞隊列,想比與不阻塞的ConcurrentLinkedQueue,它多了一個容量限制,如果不設(shè)置默認為int最大值。
10.LinkedBlockingDeque 阻塞隊列(基于雙向鏈表)
類似LinkedBlockingQueue,但提供了雙向鏈表特有的操作。
11.PriorityBlockingQueue 線程安全的優(yōu)先隊列
構(gòu)造時可以傳入一個比較器,可以看做放進去的元素會被排序,然后讀取的時候按順序消費。某些低優(yōu)先級的元素可能長期無法被消費,因為不斷有更高優(yōu)先級的元素進來。
12.SynchronousQueue 數(shù)據(jù)同步交換的隊列
一個虛假的隊列,因為它實際上沒有真正用于存儲元素的空間,每個插入操作都必須有對應(yīng)的取出操作,沒取出時無法繼續(xù)放入。
一個簡單的例子感受一下:
import?java.util.concurrent.*;public?class?Main?{ ????public?static?void?main(String[]?args)?{ ????????SynchronousQueue?queue?=?new?SynchronousQueue<>(); ????????new?Thread(()?->?{ ????????????try?{ ????????????????//?沒有休息,瘋狂寫入 ????????????????for?(int?i?=?0;?;?i++)?{ ????????????????????System.out.println("放入:?"?+?i); ????????????????????queue.put(i); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}).start(); ????????new?Thread(()?->?{ ????????????try?{ ????????????????//?咸魚模式取數(shù)據(jù) ????????????????while?(true)?{ ????????????????????System.out.println("取出:?"?+?queue.take()); ????????????????????Thread.sleep((long)?(Math.random()?*?2000)); ????????????????} ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????}).start(); ????}}/*?輸出: 放入:?0 取出:?0 放入:?1 取出:?1 放入:?2 取出:?2 放入:?3 取出:?3 */
可以看到,寫入的線程沒有任何sleep,可以說是全力往隊列放東西,而讀取的線程又很不積極,讀一個又sleep一會。輸出的結(jié)果卻是讀寫操作成對出現(xiàn)。
JAVA中一個使用場景就是Executors.newCachedThreadPool(),創(chuàng)建一個緩存線程池。
public?static?ExecutorService?newCachedThreadPool()?{ ????return?new?ThreadPoolExecutor( ????????0,?//?核心線程為0,沒用的線程都被無情拋棄 ????????Integer.MAX_VALUE,?//?最大線程數(shù)理論上是無限了,還沒到這個值機器資源就被掏空了 ????????60L,?TimeUnit.SECONDS,?//?閑置線程60秒后銷毀 ????????new?SynchronousQueue());?//?offer時如果沒有空閑線程取出任務(wù),則會失敗,線程池就會新建一個線程 }
13.LinkedTransferQueue 基于鏈表的數(shù)據(jù)交換隊列
實現(xiàn)了接口TransferQueue,通過transfer方法放入元素時,如果發(fā)現(xiàn)有線程在阻塞在取元素,會直接把這個元素給等待線程。如果沒有人等著消費,那么會把這個元素放到隊列尾部,并且此方法阻塞直到有人讀取這個元素。和SynchronousQueue有點像,但比它更強大。
14.DelayQueue 延時隊列
可以使放入隊列的元素在指定的延時后才被消費者取出,元素需要實現(xiàn)Delayed接口。
總結(jié)
上面簡單介紹了JAVA并發(fā)包下的一些容器類,知道有這些東西,遇到合適的場景時就能想起有個現(xiàn)成的東西可以用了。想要知其所以然,后續(xù)還得再深入探索一番。
最后
歡迎大家一起交流,喜歡文章記得點個贊喲,感謝支持!