這篇文章主要介紹“java線程池的狀態(tài)有幾種”,在日常操作中,相信很多人在java線程池的狀態(tài)有幾種問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java線程池的狀態(tài)有幾種”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
巴南網(wǎng)站建設公司創(chuàng)新互聯(lián)建站,巴南網(wǎng)站設計制作,有大型網(wǎng)站制作公司豐富經(jīng)驗。已為巴南1000+提供企業(yè)網(wǎng)站建設服務。企業(yè)網(wǎng)站搭建\成都外貿網(wǎng)站制作要多少錢,請找那個售后服務好的巴南做網(wǎng)站的公司定做!
// 高3位存放狀態(tài) 低29位存線程數(shù)量 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; RUNNING = ‐1 << COUNT_BITS; //高3位為111 SHUTDOWN = 0 << COUNT_BITS; //高3位為000 STOP = 1 << COUNT_BITS; //高3位為001 TIDYING = 2 << COUNT_BITS; //高3位為010 TERMINATED = 3 << COUNT_BITS; //高3位為011
RUNNING 初始化以后就是這種狀態(tài) SHUTDOWN 不再接收新任務 把已有任務完成后就變狀態(tài) STOP 不接收新任務,不處理已添加的任務,并且會中斷正在處理的任務。 TIDYING(收拾/整理) 沒有任務沒有線程 有鉤子方法 terminated() TERMINATED 線程池的具體實現(xiàn) ThreadPoolExecutor 默認線程池 ScheduledThreadPoolExecutor 定時線程池 提交任務
public void execute() //提交任務無返回值 public Future> submit() //任務執(zhí)行完成后有返回值
###參數(shù)解釋
線程池維護線程所允許的空閑時間。當線程池中的線程數(shù)量大于corePoolSize的時 候,如果這時沒有新的任務提交,核心線程外的線程不會立即銷毀,而是會等待,直到等待 的時間超過了keepAliveTime; unit keepAliveTime的單位;
了如下阻塞隊列: 1、ArrayBlockingQueue:基于數(shù)組結構的有界阻塞隊列,按FIFO排序任務; 2、LinkedBlockingQuene:基于鏈表結構的阻塞隊列,按FIFO排序任務,吞 吐量通常要高于ArrayBlockingQuene; 3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到 另一個線程調用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于 LinkedBlockingQuene; 4、priorityBlockingQuene:具有優(yōu)先級的無界阻塞隊列; threadFactory 它是ThreadFactory類型的變量,用來創(chuàng)建新線程。默認使用 Executors.defaultThreadFactory() 來創(chuàng)建線程。使用默認的ThreadFactory來創(chuàng)建線程 時,會使新創(chuàng)建的線程具有相同的NORM_PRIORITY優(yōu)先級并且是非守護線程,同時也設 置了線程的名稱。
線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務,必 須采取一種策略處理該任務,線程池提供了4種策略: 1、AbortPolicy:直接拋出異常,默認策略; 2、CallerRunsPolicy:用調用者所在的線程來執(zhí)行任務; 3、DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,并執(zhí)行當前任務; 4、DiscardPolicy:直接丟棄任務; 上面的4種策略都是ThreadPoolExecutor的內部類。 當然也可以根據(jù)應用場景實現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如 記錄日志或持久化存儲不能處理的任務。
public long getTaskCount() //線程池已執(zhí)行與未執(zhí)行的任務總數(shù) public long getCompletedTaskCount() //已完成的任務數(shù) public int getPoolSize() //線程池當前的線程數(shù) public int getActiveCount() //線程池中正在執(zhí)行任務的線程數(shù)量
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /** * 記錄著線程池狀態(tài)和線程數(shù)量 */ int c = ctl.get(); /** * 如果當前線程數(shù)小于核心線程數(shù) 創(chuàng)建線程 */ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
如果在執(zhí)行execute方法一直處于running狀態(tài) 執(zhí)行流程如下
如果wokeCount < corePoolSize 創(chuàng)建一個新的線程來執(zhí)行任務
如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添 加到該阻塞隊列中;
如 果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則創(chuàng)建并啟動一個線程來執(zhí)行新 提交的任務;
如果workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿, 則根 據(jù)拒絕策略來處理該任務, 默認的處理方式是直接拋異常。 注意一下addWorker(null,false) 創(chuàng)建線程但是沒有傳入任務,因為之前已經(jīng)把任務放到隊里里面了
firstTask參數(shù) 用 于指定新增的線程執(zhí)行的第一個任務
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) // 如果設置成功 跳出第一個循環(huán) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 如果狀態(tài)變了 回到的第一個循環(huán) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 用任務創(chuàng)建了worker對象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); // 放入線程不安全的set里 int s = workers.size(); if (s > largestPoolSize) // 設置最大的池數(shù)量 largestPoolSize = s; // 標記添加worker成功 workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { // 開始執(zhí)行任務 因為thread是利用woker創(chuàng)建的 所以t.start實際就是調用的worker的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組 Worker對象.
Worker類繼承了AQS,并實現(xiàn)了Runnable接口,注意其中的firstTask和thread屬 性:firstTask用它來保存?zhèn)魅氲娜蝿?thread是在調用構造方法時通過ThreadFactory來創(chuàng) 建的線程,是用來處理任務的線程。
在調用構造方法時,需要把任務傳入,這里通過 getThreadFactory().newThread(this); 來 新 建 一 個 線 程 , newThread 方 法 傳 入 的 參 數(shù) 是 this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在 啟動的時候會調用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現(xiàn)獨占鎖的功能。為什么不使用ReentrantLock來 實現(xiàn)呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的:
lock方法一旦獲取了獨占鎖,表示當前線程正在執(zhí)行任務中;
如果正在執(zhí)行任務,則不應該中斷線程;
如果該線程現(xiàn)在不是獨占鎖的狀態(tài),也就是空閑的狀態(tài),說明它沒有在處理任務, 這時可以對該線程進行中斷;
線程池在執(zhí)行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers 方法來中斷空閑的線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程 池中的線程是否是空閑狀態(tài);
之所以設置為不可重入,是因為我們不希望任務在調用像setCorePoolSize這樣的 線程池控制方法時重新獲取鎖。如果使用ReentrantLock,它是可重入的,這樣如果 在任務中調用了如setCorePoolSize這類線程池控制的方法,會中斷正在運行的線 程。 所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否可以被中斷。 此外,在構造方法中執(zhí)行了setState(-1);,把state變量設置為-1,為什么這么做呢? 是因為AQS中默認的state是0,如果剛創(chuàng)建了一個Worker對象,還沒有執(zhí)行任務時,這時 就不應該被中斷,看一下tryAquire方法:
// tryAcquire方法是根據(jù)state是否是0來判斷的,所以, 將state設置為-1是 為了禁止在執(zhí)行任務前對線程進行中斷。 //正因為如此,在runWorker方法中會先調用Worker對象的unlock方法將state設置為 0。 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
在Worker類中的run方法調用了runWorker方法來執(zhí)行任務,runWorker方法的代碼如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts 允許被打斷 // 是否異常退出循環(huán) boolean completedAbruptly = true; try { // 如果task是null 從隊列里拿數(shù)據(jù) while (task != null || (task = getTask()) != null) { // 為什么不用ReentrantLock 因為要做到不可重入 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt /** * 如果線程池正在停止,那么要保證當前線程是中斷狀態(tài); * 如果不是的話,則要保證當前線程不是中斷狀態(tài); * Thread.interrupted() 獲得當前的中斷狀態(tài) 復位 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 鉤子方法 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 鉤子方法 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // gettask結果是null 就會走這 runWorker 結束 銷毀線程 processWorkerExit(w, completedAbruptly); } }
總結一下runWorker方法的執(zhí)行過程:
while循環(huán)不斷地通過getTask()方法獲取任務;
getTask()方法從阻塞隊列中取任務;
如果線程池正在停止,那么要保證當前線程是中斷狀態(tài),否則要保證當前線程不是 中斷狀態(tài);
調用task.run()執(zhí)行任務;
如果task為null則跳出循環(huán),執(zhí)行processWorkerExit()方法;
runWorker方法執(zhí)行完畢,也代表著Worker中的run方法執(zhí)行完畢,銷毀線程。 這里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor類中是空的,留給 子類來實現(xiàn)。
private Runnable getTask() { // 判斷從阻塞隊列里獲取任務是否超時 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果線程池是非run狀態(tài) // 如果>= stop 說明線程正在停止 或者隊列是空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 減少任務數(shù) decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
到此,關于“java線程池的狀態(tài)有幾種”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
新聞名稱:java線程池的狀態(tài)有幾種
網(wǎng)址分享:http://weahome.cn/article/godieh.html