這篇文章將為大家詳細講解有關(guān)如何將普通的Thread多線程改為Java8的parallelStream并發(fā)流,文章內(nèi)容質(zhì)量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關(guān)知識有一定的了解。
創(chuàng)新互聯(lián)公司網(wǎng)站建設(shè)服務(wù)商,為中小企業(yè)提供網(wǎng)站設(shè)計制作、網(wǎng)站制作服務(wù),網(wǎng)站設(shè)計,網(wǎng)站托管、服務(wù)器托管等一站式綜合服務(wù)型公司,專業(yè)打造企業(yè)形象網(wǎng)站,讓您在眾多競爭對手中脫穎而出創(chuàng)新互聯(lián)公司。
Java8的parallelStream
并發(fā)流能達到跟多線程類似的效果,但它也不是什么善茬,為了得到跟上一版本的多線程類似的效果,一改再改,雖然最后改出來了,但是還是存在理解不了的地方。
理論上,你需要先有一個List>
,任意類型的List都行,然后調(diào)用它的.parallelStream()
方法就可以了。
對我這個例子來說,元素的類型不重要,因此選擇了Integer
類型,核心代碼如下:
AtomicInteger atomicInteger = new AtomicInteger(0); return Arrays.asList(new Integer[size]).parallelStream().map(i -> atomicInteger.incrementAndGet());
值得注意的是,第一行用的是AtomicInteger
而不是Integer
,因為Integer
會存在并發(fā)問題
第二行的意思是:新建一個大小為size
的數(shù)組,把數(shù)組轉(zhuǎn)成List
,再把List轉(zhuǎn)成parallelStream
,再把列表中的元素初始化成遞增的整數(shù),最后返回。
簡單來說,我認(rèn)為原因是:因為它的默認(rèn)值適用的場景是CPU密集型
的,而一般的Web項目是IO密集型
的(一般的Web項目都是需要跟數(shù)據(jù)庫打交道的,針對數(shù)據(jù)庫的操作主要就都是IO
,而對CPU的消耗并不高)。
當(dāng)不能使用默認(rèn)值的時候,就需要開發(fā)人員額外去了解parallelStream
的用法,而這些資料還不是特別好找。比如說:parallelStream
默認(rèn)的并發(fā)線程數(shù)是多少?怎么修改默認(rèn)的線程數(shù)?
我最終找到了這篇問答:Custom thread pool in Java 8 parallel stream,供參考。
問題1: 在Java代碼中,怎樣獲取可用的CPU處理器的數(shù)量?代碼如下:(在我的機器8核上結(jié)果是:8)
Runtime.getRuntime().availableProcessors()
問題2: parallelStream默認(rèn)的并發(fā)線程數(shù)是多少?代碼如下:(在我的機器上結(jié)果是:7)
ForkJoinPool.getCommonPoolParallelism()
問題3: 為什么parallelStream默認(rèn)的并發(fā)線程數(shù)要比CPU處理器的數(shù)量少1個?
因為最優(yōu)的策略是每個CPU處理器分配一個線程,然而主線程也算一個線程,所以要占一個名額。
問題4: 那如果電腦比較差,就只有1個CPU要怎么辦?那就不管了,默認(rèn)的并發(fā)線程數(shù)就是1,總不能為零吧。
問題5: 默認(rèn)的并發(fā)線程數(shù)太少了,要怎么修改?如代碼如下:(改成了20)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
問題6: 默認(rèn)的并發(fā)線程數(shù)可以反復(fù)修改嗎?不能。因為java.util.concurrent.ForkJoinPool.common.parallelism
是final
類型的,整個JVM中只允許設(shè)置一次。
執(zhí)行以下代碼:
int a=5; for (;;) { System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "" + a++); System.out.println("ForkJoinPool.getCommonPoolParallelism() : " + ForkJoinPool.getCommonPoolParallelism()); if(a>7)break; } /** result: ForkJoinPool.getCommonPoolParallelism() : 5 ForkJoinPool.getCommonPoolParallelism() : 5 ForkJoinPool.getCommonPoolParallelism() : 5 */
問題7: 既然默認(rèn)的并發(fā)線程數(shù)不能反復(fù)修改,那怎么進行不同線程數(shù)量的并發(fā)測試呢?答案是:引入ForkJoinPool
。用法如下:
new ForkJoinPool(threadCount).submit(() -> { parallelStream.forEach(i -> { // 這里省略提交訂單的代碼 }); }).get();
問題8: java.util.concurrent.ForkJoinPool.common.parallelism
與new ForkJoinPool(threadCount)
之間有什么關(guān)系?答案是:不知道。
這個答案很讓人失望,但是我確實沒有查出來。我這邊測試的結(jié)果是:
1.如果在new ForkJoinPool(threadCount)
之前沒有設(shè)置java.util.concurrent.ForkJoinPool.common.parallelism
的值,那么new ForkJoinPool(threadCount)
的作用就不明顯,即就是說,改變threadCount
的值對性能沒有多大影響。
2.如果在之前設(shè)置了java.util.concurrent.ForkJoinPool.common.parallelism
的值,但是設(shè)置得比較?。ū热?2),則后續(xù)的new ForkJoinPool(threadCount)
的作用也不明顯。
3.只有先把java.util.concurrent.ForkJoinPool.common.parallelism
的值設(shè)置得比較大(比如10000),后續(xù)的new ForkJoinPool(threadCount)
中threadCount
改變之后,才對性能有明顯的影響。
問題9: 如果按問題8中的來修改,把java.util.concurrent.ForkJoinPool.common.parallelism
的值設(shè)置得比較大(比如10000),就意味不再適用于CPU密集型
的操作了,那應(yīng)該怎么辦呢?
答案是:每次都用new ForkJoinPool(threadCount)
,整體放棄使用默認(rèn)的parallelStream
。(那多麻煩啊
你看,隨隨便便就有這么多問題,頓時就不想用了,是不是。
并發(fā)線程數(shù)量”與“每秒能提交的訂單數(shù)量”之間的關(guān)系
這次測試的結(jié)果與上次測試的結(jié)果對比圖如下:(紅色為上一版本使用Thread測試的結(jié)果,黑色為這一版本使用parallelStream測試的結(jié)果)
可以看到差別并不大。
【備注】:不同的機器上的測試結(jié)果會不一樣,以上測試結(jié)果僅供參考。
關(guān)于如何將普通的Thread多線程改為Java8的parallelStream并發(fā)流就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,可以學(xué)到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。