如何進(jìn)行ThreadPoolExecutor 源碼解析,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
創(chuàng)新互聯(lián)主要從事成都做網(wǎng)站、成都網(wǎng)站制作、網(wǎng)頁設(shè)計、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)萬載,十載網(wǎng)站建設(shè)經(jīng)驗,價格優(yōu)惠、服務(wù)專業(yè),歡迎來電咨詢建站服務(wù):18982081108
線程是CPU 調(diào)度的最小操作單位,線程模型分為KLT 模型和ULT 模型,JVM 使用的是KLT 模型。
線程的狀態(tài) :NEW,RUNNABLE,BLOCKED,TERMINATED
在執(zhí)行大量異步運(yùn)算的時候,線程池用優(yōu)化系統(tǒng)性能,減少線程的反復(fù)創(chuàng)建所帶來的的系統(tǒng)開銷
提供了一種限制和管理資源的方法
corePoolSize :線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到 阻塞隊列中,等待被執(zhí)行;如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會 提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize :線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize;
keepAliveTime :線程池維護(hù)線程所允許的空閑時間。當(dāng)線程池中的線程數(shù)量大corePoolSize的時 候,如果這時沒有新的任務(wù)提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待 的時間超過了keepAliveTime;
unit: keepAliveTime的單位;
workQueue: 用來保存等待被執(zhí)行的任務(wù)的阻塞隊列,且任務(wù)必須實現(xiàn)Runable接口,在JDK中提供了如下阻塞隊列:
ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù);
LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene; -
SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQuene;
priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列;
threadFactory:它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認(rèn)使用 Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認(rèn)ThreadFactory來創(chuàng)建線程 時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護(hù)線程,同時也設(shè) 置了線程的名稱。
handler: 線程池的飽和策略,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必 須采取一種策略處理該任務(wù),線程池提供了4種策略:
AbortPolicy:直接拋出異常,默認(rèn)策略;
CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
DiscardPolicy:直接丟棄任務(wù);
上面的4種策略都是ThreadPoolExecutor的內(nèi)部類。當(dāng)然也可以根據(jù)應(yīng)用場景實現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如 記錄日志或持久化存儲不能處理的任務(wù)。
NEW
RUNNABLE
WATING
BLOCKED
TIMED_WATING
TERMINATED
ctl 是對線程池的運(yùn)行狀態(tài)和線程池中有效線程的數(shù)量進(jìn)行控制的一個字段, 它包含兩 部分的信息: 線程池的運(yùn)行狀態(tài) (runState) 和線程池內(nèi)有效線程的數(shù)量 (workerCount),這 里可以看到,使用了Integer類型來保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位減1(29個1),這個常 量表示workerCount的上限值,大約是5億。
runState 主要提供線程池生命周期的控制,主要值包括:
RUNNING
(1) 狀態(tài)說明:線程池處在RUNNING狀態(tài)時,能夠接收新任務(wù),以及對已添加的任務(wù)進(jìn)行 處理。
(2) 狀態(tài)切換:線程池的初始化狀態(tài)是RUNNING。換句話說,線程池被一旦被創(chuàng)建,就處 于RUNNING狀態(tài),并且線程池中的任務(wù)數(shù)為0!
SHUTDOWN
(1) 狀態(tài)說明:線程池處在SHUTDOWN狀態(tài)時,不接收新任務(wù),但能處理已添加的任務(wù)。
(2) 狀態(tài)切換:調(diào)用線程池的shutdown()接口時,線程池由RUNNING -> -SHUTDOWN。
STOP
(1) 狀態(tài)說明:線程池處在STOP狀態(tài)時,不接收新任務(wù),不處理已添加的任務(wù),并且會中 斷正在處理的任務(wù)。
(2) 狀態(tài)切換:調(diào)用線程池的shutdownNow()接口時,線程池由(RUNNING or SHUTDOWN ) -> STOP。
TIDYING
(1) 狀態(tài)說明:當(dāng)所有的任務(wù)已終止,ctl記錄的”任務(wù)數(shù)量”為0,線程池會變?yōu)門IDYING 狀態(tài)。當(dāng)線程池變?yōu)門IDYING狀態(tài)時,會執(zhí)行鉤子函數(shù)terminated()。terminated()在 ThreadPoolExecutor類中是空的,若用戶想在線程池變?yōu)門IDYING時,進(jìn)行相應(yīng)的處理; 可以通過重載terminated()函數(shù)來實現(xiàn)。
(2) 狀態(tài)切換:當(dāng)線程池在SHUTDOWN狀態(tài)下,阻塞隊列為空并且線程池中執(zhí)行的任務(wù)也 為空時,就會由 SHUTDOWN -> TIDYING。 當(dāng)線程池在STOP狀態(tài)下,線程池中執(zhí)行的 任務(wù)為空時,就會由STOP -> TIDYING。
TERMINATED
(1) 狀態(tài)說明:線程池徹底終止,就變成TERMINATED狀態(tài)。
(2) 狀態(tài)切換:線程池處在TIDYING狀態(tài)時,執(zhí)行完terminated()之后,就會由 TIDYING - > TERMINATED。 進(jìn)入TERMINATED的條件如下: 線程池不是RUNNING狀態(tài); 線程池狀態(tài)不是TIDYING狀態(tài)或TERMINATED狀態(tài); 如果線程池狀態(tài)是SHUTDOWN并且workerQueue為空; workerCount為0,設(shè)置TIDYING狀態(tài)成功。
ctl相關(guān) API
runStateOf():獲取運(yùn)行狀態(tài);
workerCountOf():獲取活動線程數(shù);
ctlOf():獲取運(yùn)行狀態(tài)和活動線程數(shù)的值。
execute(Runnable command):執(zhí)行Ruannable類型的任務(wù)
submit(task):可用來提交Callable或Runnable任務(wù),并返回代表此任務(wù)的Future 對象
shutdown():在完成已提交的任務(wù)后封閉辦事,不再接管新任務(wù),
shutdownNow():停止所有正在履行的任務(wù)并封閉辦事。
isTerminated():測試是否所有任務(wù)都履行完畢了。
isShutdown():測試是否該ExecutorService已被關(guān)閉。
ThreadPoolExecutor 默認(rèn)線程池 ScheduledThreadPoolExecutor 定時線程池
public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務(wù)總數(shù)
public long getCompletedTaskCount() //已完成的任務(wù)數(shù)
public int getPoolSize() //線程池當(dāng)前的線程數(shù)
public int getActiveCount() //線程池中正在執(zhí)行任務(wù)的線程數(shù)量
//在將來的某個時間執(zhí)行給定的任務(wù)。任務(wù)可以是新起一個新線程或者復(fù)用現(xiàn)有池線程中的線程去執(zhí)行 public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * 執(zhí)行過程還是分為 3 步: * * 1.執(zhí)行任務(wù): * 如果小于核心線程數(shù),嘗試創(chuàng)建一個新線程來執(zhí)行給定的任務(wù)。 * 方法 addWorker() 就是真正的創(chuàng)建一個新線程來執(zhí)行任務(wù)的方法。 * addWorker()方法會對 runState 和 workerCount進(jìn)行原子檢查。 * addWorker()方法會返回一個 boolean 值,通過返回 false 值來防止在不應(yīng)該添加線程的情況下發(fā)出錯誤警報 * * * 2.添加到阻塞隊列: * 未能滿足條件執(zhí)行完步驟 1 則添加到阻塞隊列。 * 如果任務(wù)可以成功排隊,會再次進(jìn)行檢查,檢查是否應(yīng)該添加線程(因為現(xiàn)有線程自上次檢查后就死了), * 或者自進(jìn)入此方法以來該池已關(guān)閉。因此,需要重新檢查狀態(tài),并在停止的情況下在必要時回滾隊列,如果沒有,則啟動一個新線程。 * * 3.拒絕任務(wù): * 如果無法將任務(wù)添加至阻塞隊列,最大線程數(shù)也未達(dá)到最大會嘗試添加一個新的線程。如果失敗,說明線程池已關(guān)閉或處于飽和狀態(tài),因此拒絕該任務(wù)。 */ //clt記錄著runState和workerCount int c = ctl.get(); /* * workerCountOf方法取出低29位的值,表示當(dāng)前活動的線程數(shù); * 如果當(dāng)前活動線程數(shù)小于corePoolSize,則新建一個線程放入線程池中;并把任務(wù)添加到該線程中。 */ if (workerCountOf(c) < corePoolSize) { /* * addWorker中的第二個參數(shù)表示限制添加線程的數(shù)量是根據(jù)corePoolSize來判斷還是maximumPoolSize來判斷 * 如果為true,根據(jù)corePoolSize來判斷; * 如果為false,則根據(jù)maximumPoolSize來判斷 */ if (addWorker(command, true)) return; //如果添加失敗,則重新獲取ctl值 c = ctl.get(); } //執(zhí)行到此處說明從核心線程里給當(dāng)前任務(wù)分配線程失敗 //如果當(dāng)前線程池是運(yùn)行狀態(tài)并且任務(wù)添加到隊列成功 if (isRunning(c) && workQueue.offer(command)) { //重新獲取ctl值。即使添加隊列成功也要再次檢查,如果不是運(yùn)行狀態(tài),由于之前已經(jīng)把任務(wù)添加到workerQueue 中了,所以要移除該任務(wù),執(zhí)行過后通過handler使用拒絕策略對該任務(wù)進(jìn)行處理,整個方法返回 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); /* * 獲取線程池中的有效線程數(shù),如果數(shù)量是0,則執(zhí)行addWorker方法 這里傳入的參數(shù)表示: * 第一個參數(shù)為null,表示在線程池中創(chuàng)建一個線程,但不去啟動; * 第二個參數(shù)為false,將線程池的有限線程數(shù)量的上限設(shè)置為maximumPolSize,添加線程時根據(jù)maximumPoolSize來判斷; * 如果判斷workerCount大于0,則直接返回,在workQueue中新增的comman 會在將來的某個時刻被執(zhí)行。 */ //因為 任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中 獲取任務(wù)。 else if (workerCountOf(recheck) == 0) //執(zhí)行到這里說明任務(wù)已經(jīng)添加到阻塞隊列里了,最大線程數(shù)也未飽和,則創(chuàng)建一個新的線程去阻塞隊列里拿任務(wù) //這步操作也就是創(chuàng)建一個線程,但并沒有傳入任務(wù),因為任務(wù)已經(jīng)被添加到workQueue中了,所以worker在執(zhí)行的時候,會直接從workQueue中 獲取任務(wù)。 //為什么要這樣做呢?是為了保證線程池在RUNNING狀態(tài)下必須要有一個線程來執(zhí)行任務(wù)。 addWorker(null, false); } /* * 如果執(zhí)行到這里,有兩種情況: * 1. 線程池已經(jīng)不是RUNNING狀態(tài); * 2. 線程池是RUNNING狀態(tài),但workerCount >= corePoolSize并且workQueue已滿。 * 這時,再次調(diào)用addWorker方法,但第二個參數(shù)傳入為false,將線程池的 有限線程數(shù)量的上限設(shè)置為maximumPoolSize; * 如果失敗則拒絕該任務(wù) */ else if (!addWorker(command, false)) reject(command); }
/** * 檢查是否可以根據(jù)當(dāng)前線程池的狀態(tài)添加一個新的工作線程去執(zhí)行任務(wù)。 * addWorker(runnable,true)表示從核心工作線程數(shù)中分配線程執(zhí)行傳進(jìn)來的任務(wù); * addWorker(null,false)表示從最大線程數(shù)中分配線程執(zhí)行阻塞隊列中的任務(wù)。 * 線程池如果停止或者關(guān)閉則直接返回 false,如果線程池創(chuàng)建新線程失敗同樣也會返回 false。 * 如果創(chuàng)建線程失敗,或者線程工廠返回 null,或者執(zhí)行當(dāng)前 addWorker()的線程拋出異常,(注意是當(dāng)前線程拋出異常,當(dāng)前線程拋出異常只與當(dāng)前任務(wù)有關(guān),并不影響其他任務(wù)的執(zhí)行),線程池的相關(guān)屬性會立即回滾 */ private boolean addWorker(Runnable firstTask, boolean core) { retry: // 外層 for 循環(huán)就是為了能給任務(wù)分配線程做準(zhǔn)備,判斷狀態(tài)--> 原子遞增 workerCount // 直到線程池狀態(tài)不符合條件返回 false ,或者自增成功跳出 for 循環(huán) // 同樣的,getTask()從阻塞隊列中獲取任務(wù)的時候也是這么個邏輯,先對 workerCount 原子遞減,再去執(zhí)行任務(wù) for (;;) { //可以看到,每一步操作都會對線程池的狀態(tài)參數(shù)做判斷 int c = ctl.get(); int rs = runStateOf(c); //也是對線程池狀態(tài),隊列狀態(tài)做檢查 /** * 這里的狀態(tài)判斷也很好理解: * 線程池狀態(tài)為SHUTDOWN,不會再接受新的任務(wù)了,返回 false * 想城池狀態(tài)不為SHUTDOWN,傳進(jìn)來的任務(wù)為空,并且阻塞隊列里也沒任務(wù),那還執(zhí)行個錘子任務(wù),同樣返回 false */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //前面已經(jīng)判斷過滿足為任務(wù)分配一個線程去執(zhí)行任務(wù) //這個 for 循環(huán)就是為了創(chuàng)建任務(wù)做準(zhǔn)備,先去原子性的遞增 workerCount,workerCount 遞增成功了才會去真正的為任務(wù)分配線程去執(zhí)行 for (;;) { //當(dāng)前工作線程數(shù) int wc = workerCountOf(c); //當(dāng)前工作線程數(shù)大于corePoolSize 或者 maximumPoolSize (跟誰比較就是根據(jù)傳進(jìn)來的參數(shù) core 判斷), //說明也沒有分配的線程可以執(zhí)行任務(wù)了,返回 false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 執(zhí)行到這里說明滿足條件了,可以分配出來線程去執(zhí)行任務(wù)了 * 嘗試增加workerCount,如果成功,則跳出第一個for循環(huán) * 這里是進(jìn)行 CAS 自增 ctl 的 workerCount(先把數(shù)量自增,再跳出 for 循環(huán)創(chuàng)建新的線程去執(zhí)行任務(wù)) * 該方法內(nèi)部也是調(diào)用了原子類 AtomicInteger.compareAndSet()方法,保證原子遞增 */ if (compareAndIncrementWorkerCount(c)) break retry; //如果嘗試添加新的工作線程失敗則會繼續(xù)判斷當(dāng)前線程池的狀態(tài),狀態(tài)滿足繼續(xù)嘗試為當(dāng)前線程分配工作線程 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //跳出 for 循環(huán)之后說明線程池的工作線程數(shù) workerCount 已經(jīng)調(diào)節(jié)過了,接下來要做到就是真正的分配線程,執(zhí)行任務(wù) boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //Worker 對象是個內(nèi)部類,其實就是用threatFactory 生成一個新的線程 //繼承 AQS 類,實現(xiàn)Runable 接口,重寫 run()方法,重寫的 run()方法也很重要,后面會講 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //加鎖保證同步 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //還是先進(jìn)行一通的線程池狀態(tài)檢查 int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //這個 workers 是個 HashSet,線程池也是通過維護(hù)這個 workers 控制任務(wù)的執(zhí)行 workers.add(w); int s = workers.size(); if (s > largestPoolSize) //largestPoolSize記錄著線程池中出現(xiàn)過的最大線程數(shù)量 largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //終于,調(diào)用線程的 start() 方法 t.start(); workerStarted = true; } } } finally { //如果創(chuàng)建線程失敗,就要回滾線程池的狀態(tài)了 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
回頭再看 Worker 類,線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護(hù)的其實就是一組 Worker對象(HashSet)。
Worker類繼承了AQS,并實現(xiàn)了Runnable接口,注意其中的firstTask和thread屬 性:firstTask用它來保存?zhèn)魅氲娜蝿?wù);thread是在調(diào)用構(gòu)造方法時通過ThreadFactory來創(chuàng) 建的線程,是用來處理任務(wù)的線程。
Worker繼承了AQS,使用AQS來實現(xiàn)獨占鎖的功能。為什么不使用ReentrantLock來 實現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
lock方法一旦獲取了獨占鎖,表示當(dāng)前線程正在執(zhí)行任務(wù)中;
如果正在執(zhí)行任務(wù),則不應(yīng)該中斷線程;
如果該線程現(xiàn)在不是獨占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務(wù), 這時可以對該線程進(jìn)行中斷;
線程池在執(zhí)行shutdown方法或tryTerminate方法時會調(diào)用interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程 池中的線程是否是空閑狀態(tài);
之所以設(shè)置為不可重入,是因為我們不希望任務(wù)在調(diào)用像setCorePoolSize這樣的 線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果 在任務(wù)中調(diào)用了如setCorePoolSize這類線程池控制的方法,會中斷正在運(yùn)行的線程。 所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。 此外,在構(gòu)造方法中執(zhí)行了setState(-1);,把state變量設(shè)置為-1,為什么這么做呢? 是因為AQS中默認(rèn)的state是0,如果剛創(chuàng)建了一個Worker對象,還沒有執(zhí)行任務(wù)時,這時就不應(yīng)該被中斷,看一下tryAquire方法:
tryAcquire(int unused) 方法
/** * 用于判斷線程是否空閑以及是否可以被中斷 */ protected boolean tryAcquire(int unused) { //cas 修改狀態(tài),不可重入 if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
tryAcquire方法是根據(jù)state是否是0來判斷的,所以,將state設(shè)置為-1是 為了禁止在執(zhí)行任務(wù)前對線程進(jìn)行中斷。
正因為如此,在runWorker方法中會先調(diào)用Worker對象的unlock方法將state設(shè)置為 0。
runWorker(Worker w)方法
/** * Worker 類實現(xiàn) Runnable 接口,重寫的 run()方法 */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; //允許中斷 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //while 循環(huán)就是不斷的去執(zhí)行任務(wù),當(dāng)自己的任務(wù)(firstTask)執(zhí)行完之后依然會從阻塞隊列里拿任務(wù)去執(zhí)行,就這樣的操作保證了線程的重用 //task 為空則從阻塞隊列中獲取任務(wù) while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),如果不是的話,則要保證當(dāng)前線程不是中斷狀態(tài) * 這里為什么要這么做呢?考慮在執(zhí)行該if語句期間可能也執(zhí)行了shutdownNow方法,shutdownNow方法會 把狀態(tài)設(shè)置為STOP, * 回顧一下STOP狀態(tài):不能接受新任務(wù),也不處理隊列中的任務(wù),會中斷正在處理任務(wù)的線程。在線程池處于 RUNNING 或 SHUTDOWN 狀態(tài)時, * 調(diào)用 shutdownNow() 方法會使線程池進(jìn)入到STOP狀態(tài)。 * STOP狀態(tài)要中斷線程池中的所有線程,而這里使用Thread.interrupted()來判斷是否中斷是為了確保在 RUNNING或者SHUTDOWN狀態(tài)時線程是非中斷狀態(tài)的, * 因為Thread.interrupted()方法會重置中斷的狀態(tài)。 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
總結(jié)一下runWorker方法的執(zhí)行過程:
while循環(huán)不斷地通過getTask()方法獲取任務(wù);
getTask()方法從阻塞隊列中取任務(wù);
如果線程池正在停止,那么要保證當(dāng)前線程是中斷狀態(tài),否則要保證當(dāng)前線程不是 中斷狀態(tài);
調(diào)用task.run()執(zhí)行任務(wù);
如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;
runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。
getTask()方法
/** * 從阻塞隊列中獲取任務(wù),返回值是 Runnable * 線程池狀態(tài)不滿足執(zhí)行條件時直接返回 null */ private Runnable getTask() { //timeOut變量的值表示上次從阻塞隊列中取任務(wù)時是否超時 boolean timedOut = false; // Did the last poll() time out? //這里兩個 for 循環(huán)操作和 addWorker() 方法里的兩個 for 循環(huán)操作思想一樣 for (;;) { int c = ctl.get(); int rs = runStateOf(c); //仍然檢查線程池狀態(tài) // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); //timed變量用于判斷是否需要進(jìn)行超時控制 //allowCoreThreadTimeOut默認(rèn)是false,也就是核心線程不允許進(jìn)行超時 //wc > corePoolSize,表示當(dāng)前線程池中的線程數(shù)量大于核心線程數(shù)量 //對于超過核心線程數(shù)量的這些線程,需要進(jìn)行超時控制 // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //和 addWorker() 里流程一樣,也是先對線程池中 workerCount 進(jìn)行控制,再進(jìn)行后面的執(zhí)行任務(wù)操作 //滿足條件則 workerCount 數(shù)量減一 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //根據(jù)timed來判斷,如果為true,則通過阻塞隊列的poll方法進(jìn)行超時控制,如果在keepAliveTime時間內(nèi)沒有獲取到任務(wù),則返回null //否則通過take方法,如果這時隊列為空,則take方法會阻塞直到隊列不為空 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果 r == null,說明已經(jīng)超時,timedOut設(shè)置為true timedOut = true; } catch (InterruptedException retry) { // 如果獲取任務(wù)時當(dāng)前線程發(fā)生了中斷,則設(shè)置timedOut為false并返回 timedOut = false; } } } processWorkerExit() /** * getTask方法返回null時,在runWorker方法中會跳出while循環(huán),然后會執(zhí)行 processWorkerExit方法。 * 做線程池的善后工作 */ private void processWorkerExit(Worker w, boolean completedAbruptly) { //如果completedAbruptly值為true,則說明線程執(zhí)行時出現(xiàn)了異常,需要將workerCount減1 //如果線程執(zhí)行時沒有出現(xiàn)異常,說明在getTask()方法中已經(jīng)已經(jīng)對workerCount減1了,這里就不需要再減了 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統(tǒng)計完成的任務(wù)數(shù) completedTaskCount += w.completedTasks; // 從workers中移除,也就表示著從線程池中移除了一個工作線程 // workers 是前面提到的 HashSet,線程池就是通過維護(hù)這個 worker()來保證線程池運(yùn)作的 workers.remove(w); } finally { mainLock.unlock(); } // 根據(jù)線程池狀態(tài)進(jìn)行判斷是否結(jié)束線程池 tryTerminate(); /** * 當(dāng)線程池是RUNNING或SHUTDOWN狀態(tài)時,如果worker是異常結(jié)束,那么會直接addWorker; * 如果allowCoreThreadTimeOut=true,并且等待隊列有任務(wù),至少保留一個worker * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize */ int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit執(zhí)行完之后,工作線程被銷毀,以上就是整個工作線程的生 命周期,從execute方法開始,Worker使用ThreadFactory創(chuàng)建新的工作線程, runWorker通過getTask獲取任務(wù),然后執(zhí)行任務(wù),如果getTask返回null,進(jìn)入 processWorkerExit方法,整個線程結(jié)束
線程池如何實現(xiàn)線程重用的?
就是在重寫的 run()方法里,通過 while 循環(huán),執(zhí)行完 firstTask 之后依然從阻塞隊列里獲取任務(wù)去執(zhí)行。
線程超時怎么處理?
當(dāng)前面任務(wù)拋出異常,后面的線程還會執(zhí)行嗎? 答案是會。也是 while 循環(huán)里找答案,當(dāng)前線程拋出異常只會對當(dāng)前線程產(chǎn)生影響,對線程池里其他任務(wù)不會有影響。
什么時候會銷毀?
是runWorker方法執(zhí)行完之后,也就是Worker中的run方法執(zhí)行完,由JVM自動回收。
阻塞隊列選取?在JDK中提供了如下阻塞隊列:
ArrayBlockingQueue:基于數(shù)組結(jié)構(gòu)的有界阻塞隊列,按FIFO排序任務(wù);
LinkedBlockingQuene:基于鏈表結(jié)構(gòu)的阻塞隊列,按FIFO排序任務(wù),吞吐量通常要高于ArrayBlockingQuene;
SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQuene;
PriorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列;
丟棄策略選?。烤€程池提供了4種策略:
AbortPolicy:直接拋出異常,默認(rèn)策略;
CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
DiscardPolicy:直接丟棄任務(wù);
線程數(shù)如何設(shè)置?
一般設(shè)法是會根據(jù)我們?nèi)蝿?wù)的類型去設(shè)置,簡單分為: CPU 密集型 :CPU 核數(shù) + 1 IO 密集型:2*CPU 核數(shù) + 1
《Java并發(fā)編程實戰(zhàn)》中最原始的公式是這樣的: Nthreads=Ncpu?Ucpu?(1+WC)Nthreads=Ncpu?Ucpu?(1+CW);
Ncpu代表CPU的個數(shù),
Ucpu代表CPU利用率的期望值(0
WCCW仍然是等待時間與計算時間的比例。
上面提供的公式相當(dāng)于目標(biāo)CPU利用率為100%。 通常系統(tǒng)中不止一個線程池,所以實際配置線程數(shù)應(yīng)該將目標(biāo)CPU利用率計算進(jìn)去。
關(guān)于如何進(jìn)行ThreadPoolExecutor 源碼解析問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。