這篇文章主要介紹“Java線程池工作原理和使用方法是什么”的相關(guān)知識(shí),小編通過實(shí)際案例向大家展示操作過程,操作方法簡(jiǎn)單快捷,實(shí)用性強(qiáng),希望這篇“Java線程池工作原理和使用方法是什么”文章能幫助大家解決問題。
10年積累的成都網(wǎng)站建設(shè)、網(wǎng)站建設(shè)經(jīng)驗(yàn),可以快速應(yīng)對(duì)客戶對(duì)網(wǎng)站的新想法和需求。提供各種問題對(duì)應(yīng)的解決方案。讓選擇我們的客戶得到更好、更有力的網(wǎng)絡(luò)服務(wù)。我雖然不認(rèn)識(shí)你,你也不認(rèn)識(shí)我。但先網(wǎng)站設(shè)計(jì)后付款的網(wǎng)站建設(shè)流程,更有中寧免費(fèi)網(wǎng)站建設(shè)讓你可以放心的選擇與我們合作。
使用線程池通常由以下兩個(gè)原因:
頻繁創(chuàng)建銷毀線程需要消耗系統(tǒng)資源,使用線程池可以復(fù)用線程。
使用線程池可以更容易管理線程,線程池可以動(dòng)態(tài)管理線程個(gè)數(shù)、具有阻塞隊(duì)列、定時(shí)周期執(zhí)行任務(wù)、環(huán)境隔離等。
/**
* @author 一燈架構(gòu)
* @apiNote 線程池示例
**/
public class ThreadPoolDemo {
public static void main(String[] args) {
// 1. 創(chuàng)建線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,
3,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy());
// 2. 往線程池中提交3個(gè)任務(wù)
for (int i = 0; i < 3; i++) {
threadPoolExecutor.execute(() -> {
System.out.println(Thread.currentThread().getName() + " 關(guān)注公眾號(hào):一燈架構(gòu)");
});
}
// 3. 關(guān)閉線程池
threadPoolExecutor.shutdown();
}
}
輸出結(jié)果:
pool-1-thread-2 關(guān)注公眾號(hào):一燈架構(gòu)
pool-1-thread-1 關(guān)注公眾號(hào):一燈架構(gòu)
pool-1-thread-3 關(guān)注公眾號(hào):一燈架構(gòu)
線程池的使用非常簡(jiǎn)單:
調(diào)用new ThreadPoolExecutor()構(gòu)造方法,指定核心參數(shù),創(chuàng)建線程池。
調(diào)用execute()方法提交Runnable任務(wù)
使用結(jié)束后,調(diào)用shutdown()方法,關(guān)閉線程池。
再看一下線程池構(gòu)造方法中核心參數(shù)的作用。
線程池共有七大核心參數(shù):
參數(shù)名稱 | 參數(shù)含義 |
---|---|
int corePoolSize | 核心線程數(shù) |
int maximumPoolSize | 最大線程數(shù) |
long keepAliveTime | 線程存活時(shí)間 |
TimeUnit unit | 時(shí)間單位 |
BlockingQueue workQueue | 阻塞隊(duì)列 |
ThreadFactory threadFactory | 線程創(chuàng)建工廠 |
RejectedExecutionHandler handler | 拒絕策略 |
corePoolSize 核心線程數(shù)
當(dāng)往線程池中提交任務(wù),會(huì)創(chuàng)建線程去處理任務(wù),直到線程數(shù)達(dá)到corePoolSize,才會(huì)往阻塞隊(duì)列中添加任務(wù)。默認(rèn)情況下,空閑的核心線程并不會(huì)被回收,除非配置了allowCoreThreadTimeOut=true。
maximumPoolSize 最大線程數(shù)
當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize,阻塞隊(duì)列又滿了之后,才會(huì)繼續(xù)創(chuàng)建線程,直到達(dá)到maximumPoolSize,另外空閑的非核心線程會(huì)被回收。
keepAliveTime 線程存活時(shí)間
非核心線程的空閑時(shí)間達(dá)到了keepAliveTime,將會(huì)被回收。
TimeUnit 時(shí)間單位
線程存活時(shí)間的單位,默認(rèn)是TimeUnit.MILLISECONDS(毫秒),可選擇的有:
TimeUnit.NANOSECONDS(納秒) TimeUnit.MICROSECONDS(微秒) TimeUnit.MILLISECONDS(毫秒) TimeUnit.SECONDS(秒) TimeUnit.MINUTES(分鐘) TimeUnit.HOURS(小時(shí)) TimeUnit.DAYS(天)
workQueue 阻塞隊(duì)列
當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize,再提交的任務(wù)就會(huì)放到阻塞隊(duì)列的等待,默認(rèn)使用的是LinkedBlockingQueue,可選擇的有:
LinkedBlockingQueue(基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列)
ArrayBlockingQueue(基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列)
SynchronousQueue(只有一個(gè)元素的阻塞隊(duì)列)
PriorityBlockingQueue(實(shí)現(xiàn)了優(yōu)先級(jí)的阻塞隊(duì)列)
DelayQueue(實(shí)現(xiàn)了延遲功能的阻塞隊(duì)列)
threadFactory 線程創(chuàng)建工廠
用來創(chuàng)建線程的工廠,默認(rèn)的是Executors.defaultThreadFactory(),可選擇的還有Executors.privilegedThreadFactory()實(shí)現(xiàn)了線程優(yōu)先級(jí)。當(dāng)然也可以自定義線程創(chuàng)建工廠,創(chuàng)建線程的時(shí)候最好指定線程名稱,便于排查問題。
RejectedExecutionHandler 拒絕策略
當(dāng)線程池中的線程數(shù)達(dá)到maximumPoolSize,阻塞隊(duì)列也滿了之后,再往線程池中提交任務(wù),就會(huì)觸發(fā)執(zhí)行拒絕策略,默認(rèn)的是AbortPolicy(直接終止,拋出異常),可選擇的有:
AbortPolicy(直接終止,拋出異常)
DiscardPolicy(默默丟棄,不拋出異常)
DiscardOldestPolicy(丟棄隊(duì)列中最舊的任務(wù),執(zhí)行當(dāng)前任務(wù))
CallerRunsPolicy(返回給調(diào)用者執(zhí)行)
線程池的工作原理,簡(jiǎn)單理解如下:
當(dāng)往線程池中提交任務(wù)的時(shí)候,會(huì)先判斷線程池中線程數(shù)是否核心線程數(shù),如果小于,會(huì)創(chuàng)建核心線程并執(zhí)行任務(wù)。
如果線程數(shù)大于核心線程數(shù),會(huì)判斷阻塞隊(duì)列是否已滿,如果沒有滿,會(huì)把任務(wù)添加到阻塞隊(duì)列中等待調(diào)度執(zhí)行。
如果阻塞隊(duì)列已滿,會(huì)判斷線程數(shù)是否小于最大線程數(shù),如果小于,會(huì)繼續(xù)創(chuàng)建最大線程數(shù)并執(zhí)行任務(wù)。
如果線程數(shù)大于最大線程數(shù),會(huì)執(zhí)行拒絕策略,然后結(jié)束。
public class ThreadPoolExecutor extends AbstractExecutorService {
// 線程池的控制狀態(tài),Integer長度是32位,前3位用來存儲(chǔ)線程池狀態(tài),后29位用來存儲(chǔ)線程數(shù)量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 線程個(gè)數(shù)所占的位數(shù)
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池的最大容量,2^29-1,約5億個(gè)線程
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 獨(dú)占鎖,用來控制多線程下的并發(fā)操作
private final ReentrantLock mainLock = new ReentrantLock();
// 工作線程的集合
private final HashSet
線程池的控制狀態(tài)ctl用來存儲(chǔ)線程池狀態(tài)和線程個(gè)數(shù),前3位用來存儲(chǔ)線程池狀態(tài),后29位用來存儲(chǔ)線程數(shù)量。
設(shè)計(jì)者多聰明,用一個(gè)變量存儲(chǔ)了兩塊內(nèi)容。
線程池共有5種狀態(tài):
狀態(tài)名稱 | 狀態(tài)含義 | 狀態(tài)作用 |
---|---|---|
RUNNING | 運(yùn)行中 | 線程池創(chuàng)建后默認(rèn)狀態(tài),接收新任務(wù),并處理阻塞隊(duì)列中的任務(wù)。 |
SHUTDOWN | 已關(guān)閉 | 調(diào)用shutdown方法后處于該狀態(tài),不再接收新任務(wù),處理阻塞隊(duì)列中任務(wù)。 |
STOP | 已停止 | 調(diào)用shutdownNow方法后處于該狀態(tài),不再新任務(wù),并中斷所有線程,丟棄阻塞隊(duì)列中所有任務(wù)。 |
TIDYING | 處理中 | 所有任務(wù)已完成,所有工作線程都已回收,等待調(diào)用terminated方法。 |
TERMINATED | 已終止 | 調(diào)用terminated方法后處于該狀態(tài),線程池的最終狀態(tài)。 |
看一下往線程池中提交任務(wù)的源碼,這是線程池的核心邏輯:
// 往線程池中提交任務(wù)
public void execute(Runnable command) {
// 1. 判斷提交的任務(wù)是否為null
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 2. 判斷線程數(shù)是否小于核心線程數(shù)
if (workerCountOf(c) < corePoolSize) {
// 3. 把任務(wù)包裝成worker,添加到worker集合中
if (addWorker(command, true))
return;
c = ctl.get();
}
// 4. 判斷如果線程數(shù)不小于corePoolSize,并且可以添加到阻塞隊(duì)列
if (isRunning(c) && workQueue.offer(command)) {
// 5. 重新檢查線程池狀態(tài),如果線程池不是運(yùn)行狀態(tài),就移除剛才添加的任務(wù),并執(zhí)行拒絕策略
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
// 6. 判斷如果線程數(shù)是0,就創(chuàng)建非核心線程(任務(wù)是null,會(huì)從阻塞隊(duì)列中拉取任務(wù))
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 7. 如果添加阻塞隊(duì)列失敗,就創(chuàng)建一個(gè)Worker
else if (!addWorker(command, false))
// 8. 如果創(chuàng)建Worker失敗說明已經(jīng)達(dá)到最大線程數(shù)了,則執(zhí)行拒絕策略
reject(command);
}
execute方法的邏輯也很簡(jiǎn)單,最終就是調(diào)用addWorker方法,把任務(wù)添加到worker集合中,再看一下addWorker方法的源碼:
// 添加worker
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 檢查是否允許提交任務(wù)
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
// 2. 使用死循環(huán)保證添加線程成功
for (; ; ) {
int wc = workerCountOf(c);
// 3. 校驗(yàn)線程數(shù)是否超過容量限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 4. 使用CAS修改線程數(shù)
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get();
// 5. 如果線程池狀態(tài)變了,則從頭再來
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 6. 把任務(wù)和新線程包裝成一個(gè)worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 7. 加鎖,控制并發(fā)
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 8. 再次校驗(yàn)線程池狀態(tài)是否異常
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 9. 如果線程已經(jīng)啟動(dòng),就拋出異常
if (t.isAlive())
throw new IllegalThreadStateException();
// 10. 添加到worker集合中
workers.add(w);
int s = workers.size();
// 11. 記錄線程數(shù)歷史峰值
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 12. 啟動(dòng)線程
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
方法雖然很長,但是邏輯很清晰。就是把任務(wù)和線程包裝成worker,添加到worker集合,并啟動(dòng)線程。
再看一下worker類的結(jié)構(gòu):
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
// 工作線程
final Thread thread;
// 任務(wù)
Runnable firstTask;
// 創(chuàng)建worker,并創(chuàng)建一個(gè)新線程(用來執(zhí)行任務(wù))
Worker(Runnable firstTask) {
setState(-1);
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
}
再看一下run方法的源碼:
// 線程執(zhí)行入口
public void run() {
runWorker(this);
}
// 線程運(yùn)行核心方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// 1. 如果當(dāng)前worker中任務(wù)是null,就從阻塞隊(duì)列中獲取任務(wù)
while (task != null || (task = getTask()) != null) {
// 加鎖,保證thread不被其他線程中斷(除非線程池被中斷)
w.lock();
// 2. 校驗(yàn)線程池狀態(tài),是否需要中斷當(dāng)前線程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 3. 執(zhí)行run方法
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 解鎖
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 4. 從worker集合刪除當(dāng)前worker
processWorkerExit(w, completedAbruptly);
}
}
runWorker方法邏輯也很簡(jiǎn)單,就是不斷從阻塞隊(duì)列中拉取任務(wù)并執(zhí)行。
再看一下從阻塞隊(duì)列中拉取任務(wù)的邏輯:
// 從阻塞隊(duì)列中拉取任務(wù)
private Runnable getTask() {
boolean timedOut = false;
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 1. 如果線程池已經(jīng)停了,或者阻塞隊(duì)列是空,就回收當(dāng)前線程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 2. 再次判斷是否需要回收線程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 3. 從阻塞隊(duì)列中拉取任務(wù)
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
關(guān)于“Java線程池工作原理和使用方法是什么”的內(nèi)容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業(yè)相關(guān)的知識(shí),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,小編每天都會(huì)為大家更新不同的知識(shí)點(diǎn)。