線程池(Thread Pool) 是并行執(zhí)行任務(wù)收集的實(shí)用工具。隨著 CPU 引入適合于應(yīng)用程序并行化的多核體系結(jié)構(gòu),線程池的作用正日益顯現(xiàn)。通過(guò) ThreadPoolExecutor類及其他輔助類,Java 5 引入了這一框架,作為新的并發(fā)支持部分。
創(chuàng)新互聯(lián)公司成立于2013年,是專業(yè)互聯(lián)網(wǎng)技術(shù)服務(wù)公司,擁有項(xiàng)目成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站網(wǎng)站策劃,項(xiàng)目實(shí)施與項(xiàng)目整合能力。我們以讓每一個(gè)夢(mèng)想脫穎而出為使命,1280元長(zhǎng)汀做網(wǎng)站,已為上家服務(wù),為長(zhǎng)汀各地企業(yè)和個(gè)人服務(wù),聯(lián)系電話:028-86922220
ThreadPoolExecutor框架靈活且功能強(qiáng)大,它支持特定于用戶的配置并提供了相關(guān)的掛鉤(hook)和飽和策略來(lái)處理滿隊(duì)列
Java線程池會(huì)將提交的任務(wù)先置于工作隊(duì)列中,在從工作隊(duì)列中獲取(SynchronousQueue直接由生產(chǎn)者提交給工作線程)。那么工作隊(duì)列就有兩種實(shí)現(xiàn)策略:無(wú)界隊(duì)列和有界隊(duì)列。無(wú)界隊(duì)列不存在飽和的問(wèn)題,但是其問(wèn)題是當(dāng)請(qǐng)求持續(xù)高負(fù)載的話,任務(wù)會(huì)無(wú)腦的加入工作隊(duì)列,那么很可能導(dǎo)致內(nèi)存等資源溢出或者耗盡。而有界隊(duì)列不會(huì)帶來(lái)高負(fù)載導(dǎo)致的內(nèi)存耗盡的問(wèn)題,但是有引發(fā)工作隊(duì)列已滿情況下,新提交的任務(wù)如何管理的難題,這就是線程池工作隊(duì)列飽和策略要解決的問(wèn)題。
飽和策略分為:Abort 策略, CallerRuns 策略,Discard策略,DiscardOlds策略。
為了更好的理解,我編寫(xiě)一個(gè)小的例子。
package concurrency.pool; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class SaturationPolicy { /** * 線程池工作隊(duì)列已滿時(shí),在不同飽和策略下表現(xiàn) * @param handler 線程池工作隊(duì)列飽和策略 */ public static void policy(RejectedExecutionHandler handler){ //基本線程2個(gè),最大線程數(shù)為3,工作隊(duì)列容量為5 ThreadPoolExecutor exec = new ThreadPoolExecutor(2,3,0l, TimeUnit.MILLISECONDS,new LinkedBlockingDeque<>(5)); if (handler != null){ exec.setRejectedExecutionHandler(handler); //設(shè)置飽和策略 } for (int i = 0; i < 10; i++) { exec.submit(new Task()); //提交任務(wù) } exec.shutdown(); } public static void main(String[] args) { // policy(new ThreadPoolExecutor.AbortPolicy()); // policy((new ThreadPoolExecutor.CallerRunsPolicy())); // policy(new ThreadPoolExecutor.DiscardPolicy()); // policy(new ThreadPoolExecutor.DiscardOldestPolicy()); } //自定義任務(wù) static class Task implements Runnable { private static int count = 0; private int id = 0; //任務(wù)標(biāo)識(shí) public Task() { id = ++count; } @Override public void run() { try { TimeUnit.SECONDS.sleep(3); //休眠3秒 } catch (InterruptedException e) { System.err.println("線程被中斷" + e.getMessage()); } System.out.println(" 任務(wù):" + id + "\t 工作線程: "+ Thread.currentThread().getName() + " 執(zhí)行完畢"); } } }
當(dāng)工作隊(duì)列滿了,不同策略的處理方式為:
1.Abort策略:默認(rèn)策略,新任務(wù)提交時(shí)直接拋出未檢查的異常RejectedExecutionException,該異常可由調(diào)用者捕獲。
在主函數(shù)中添加如下代碼:
policy(new ThreadPoolExecutor.AbortPolicy());
運(yùn)行結(jié)果為:
程序拋出了RejectedExecutionException,并且一共運(yùn)行了8個(gè)任務(wù)(線程池開(kāi)始能運(yùn)行3個(gè)任務(wù),工作隊(duì)列中存儲(chǔ)5個(gè)隊(duì)列)。當(dāng)工作隊(duì)列滿了的時(shí)候,直接拋出了異常,而且JVM一直不退出(我現(xiàn)在也不知道什么原因)。我們可以看到執(zhí)行任務(wù)的線程全是線程池中的線程。
2.CallerRuns策略:為調(diào)節(jié)機(jī)制,既不拋棄任務(wù)也不拋出異常,而是將某些任務(wù)回退到調(diào)用者。不會(huì)在線程池的線程中執(zhí)行新的任務(wù),而是在調(diào)用exector的線程中運(yùn)行新的任務(wù)。
在主函數(shù)運(yùn)行:
policy((new ThreadPoolExecutor.CallerRunsPolicy()));
運(yùn)行結(jié)果
所有的任務(wù)都被運(yùn)行,且有2(10 - 3 -5)個(gè)任務(wù)是在main線程中執(zhí)行成功的,8個(gè)任務(wù)在線程池中的線程執(zhí)行的。
3.Discard策略:新提交的任務(wù)被拋棄。
在main函數(shù)中運(yùn)行
policy(new ThreadPoolExecutor.DiscardPolicy());
通過(guò)上面的結(jié)果可以顯示:沒(méi)有異常拋出,后面提交的2個(gè)新任務(wù)被拋棄,只處理了前8(3+5)個(gè)任務(wù),JVM退出。
4.DiscardOldest策略:隊(duì)列的是“隊(duì)頭”的任務(wù),然后嘗試提交新的任務(wù)。(不適合工作隊(duì)列為優(yōu)先隊(duì)列場(chǎng)景)
在main函數(shù)中運(yùn)行如下方法
policy(new ThreadPoolExecutor.DiscardOldestPolicy());
運(yùn)行結(jié)果:一共運(yùn)行8個(gè)任務(wù),程序結(jié)束,后面添加的任務(wù)9,任務(wù)10被執(zhí)行了,而前面的任務(wù)3,任務(wù)4被丟棄。
總結(jié)
以上就是本文關(guān)于java線程池工作隊(duì)列飽和策略代碼示例的全部?jī)?nèi)容,希望對(duì)大家有所幫助。如有不足之處,歡迎留言指出。感謝朋友們對(duì)本站的支持。