今天就跟大家聊聊有關(guān)如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結(jié)了以下內(nèi)容,希望大家根據(jù)這篇文章可以有所收獲。
創(chuàng)新互聯(lián)公司專注于企業(yè)營銷型網(wǎng)站建設(shè)、網(wǎng)站重做改版、南縣網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5頁面制作、商城網(wǎng)站建設(shè)、集團公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)公司、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為南縣等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
在 java8 中 添加了流Stream,可以讓你以一種聲明的方式處理數(shù)據(jù)。使用起來非常簡單優(yōu)雅。ParallelStream 則是一個并行執(zhí)行的流,采用 ForkJoinPool 并行執(zhí)行任務(wù),提高執(zhí)行速度。
下面我們看看2個簡單的示例:
Arrays.asList(1,2,3,4,5,6) .parallelStream() .forEach((value) -> { String name = Thread.currentThread().getName(); System.out.println("示例1 Thread:" + name + " value:" + value); });
Stream.of(1,2,3,4,5,6) .parallel() .forEach((value) -> { String name = Thread.currentThread().getName(); System.out.println("示例2 Thread:" + name + " value:" + value); });
筆者最近在做一些爬蟲相關(guān)的業(yè)務(wù),其核心工具已開源 mica-http:https://gitee.com/596392912/mica/tree/master/mica-http ,經(jīng)過2個版本的迭代已經(jīng)發(fā)展成了一個強大非賬號爬蟲利器,趕緊來試試吧。
我們采集了大量的代理 ip 用來供爬蟲使用,其中有個定時任務(wù)每 5 分鐘去檢測代理是否失效,代理 ip 檢測比較費時,我們給每個檢測的請求 設(shè)定了 2s 的超時,這樣單線程的話 1000 個 ip 就得消耗半個多小時,當然筆者在校驗的時候采用的 parallel Stream 簡化開發(fā)。
然后發(fā)現(xiàn)效果并不明顯,代理 ip 數(shù)量上來之后 5 分鐘完全檢測不完,導(dǎo)致任務(wù)堆積。明明用了并發(fā)流為什么沒有明顯的提高執(zhí)行速度呢?
下面我們來看看剛剛的“示例”打印出的信息:
示例1 Thread:main value:4 示例1 Thread:ForkJoinPool.commonPool-worker-2 value:1 示例1 Thread:main value:6 示例1 Thread:ForkJoinPool.commonPool-worker-2 value:5 示例1 Thread:main value:3 示例1 Thread:ForkJoinPool.commonPool-worker-1 value:2 示例2 Thread:main value:4 示例2 Thread:ForkJoinPool.commonPool-worker-3 value:3 示例2 Thread:ForkJoinPool.commonPool-worker-2 value:5 示例2 Thread:ForkJoinPool.commonPool-worker-4 value:1 示例2 Thread:ForkJoinPool.commonPool-worker-5 value:2 示例2 Thread:ForkJoinPool.commonPool-worker-1 value:6
我們可以看到 Parallel Stream,默認采用的是一個 ForkJoinPool.commonPool 的線程池,這樣我們就算使用了 Parallel Stream, 整個 jvm 共用一個 common pool 線程池,一不小心就任務(wù)堆積了,在校驗代理 ip 的時候我們還有采集代理等其他的任務(wù)中也大量使用了并發(fā)流, 這樣也就印證了為什么會任務(wù)堆積了。
使用自定義 ForkJoinPool 執(zhí)行速度。示例代碼如下:
// 示例:自定義線程池 ForkJoinPool forkJoinPool = new ForkJoinPool(8); // 這里是從數(shù)據(jù)庫里查出來的一批代理 ip Listrecords = new ArrayList<>(); // 找出失效的代理 ip List needDeleteList = forkJoinPool.submit(() -> records.parallelStream() .map(ProxyList::getIpPort) .filter(IProxyListTask::isFailed) .collect(Collectors.toList()) ).join(); // 刪除失效的代理
整個代碼依然比較優(yōu)雅,在使用自定義的 ForkJoin 線程池之后,執(zhí)行速度有了明顯的提升。以前 5 分鐘執(zhí)行不完的任務(wù)現(xiàn)在 2 分鐘之內(nèi)就能全部執(zhí)行完畢。
java8 的并發(fā)流在大批量數(shù)據(jù)處理時可簡化多線程的使用,在遇到耗時業(yè)務(wù)或者重度使用并發(fā)流不妨根據(jù)業(yè)務(wù)情況采用自定義線程池來提示處理速度。
看完上述內(nèi)容,你們對如何自定義ForkJoinPool提升并行流 ParallelStream執(zhí)行速度有進一步的了解嗎?如果還想了解更多知識或者相關(guān)內(nèi)容,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝大家的支持。