池技術(shù)是性能優(yōu)化的重要手段:連接池,線程池已經(jīng)是開(kāi)發(fā)中的標(biāo)配了。面試中這個(gè)知識(shí)點(diǎn)也是高頻問(wèn)題。抽空學(xué)習(xí)了Java的ThreadPoolExecutor, 把學(xué)習(xí)的思路記錄一下。
專業(yè)成都網(wǎng)站建設(shè)公司,做排名好的好網(wǎng)站,排在同行前面,為您帶來(lái)客戶和效益!創(chuàng)新互聯(lián)建站為您提供成都網(wǎng)站建設(shè),五站合一網(wǎng)站設(shè)計(jì)制作,服務(wù)好的網(wǎng)站設(shè)計(jì)公司,成都網(wǎng)站設(shè)計(jì)、做網(wǎng)站負(fù)責(zé)任的成都網(wǎng)站制作公司!
由于線程的創(chuàng)建和銷毀都是系統(tǒng)層面的操作,涉及到系統(tǒng)資源的占用和回收,所以創(chuàng)建線程是一個(gè)重量級(jí)的操作。為了提升性能,就引入了線程池;即線程復(fù)用。Java不僅提供了線程池,還提供了線程池的操作工具類。 我們由淺入深了解一下。
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadDemo {
static class Worker implements Runnable{
public void run(){
System.out.println("run work "+Thread.currentThread().getName() );
}
}
public static void main(String[] args) {
Worker w1 = new Worker();
ExecutorService service = Executors.newFixedThreadPool(10);
service.submit(w1);
service.shutdown();
}
}
看Executors的源碼,發(fā)現(xiàn)其使用的是ThreadPoolExecutor。 研究一下ThreadPoolExecutor, 發(fā)現(xiàn)其默認(rèn)的參數(shù)Executors.defaultThreadFactory(), defaultHandler
。線程池的創(chuàng)建工廠默認(rèn)如下:
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
也就是自定義了一下線程的名字,將線程歸到了同一個(gè)組。
線程池的defaultHandler
如下:
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always.
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
也就是說(shuō),當(dāng)提交的任務(wù)超過(guò)線程池的容量,那么就會(huì)拋出RejectedExecutionException
異常。 但是使用Executors會(huì)發(fā)現(xiàn),并沒(méi)有拋出異常。這是因?yàn)镋xecutors創(chuàng)建BlockingQueue
時(shí)沒(méi)有指定隊(duì)列的容量。
換言之,線程池能容納的任務(wù)數(shù)量最多為maximumPoolSize
+ queueSize
。 比如線程池如下new ThreadPoolExecutor(10, 11, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(5));
則最大任務(wù)數(shù)量為16個(gè),超過(guò)16個(gè)就會(huì)拋出異常。
線程池中線程數(shù)量有多少呢?先運(yùn)行如下的代碼:
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
public class ThreadDemo {
static class Worker implements Runnable{
public void run(){
try {
Thread.sleep(1000);
System.out.println("done work "+Thread.currentThread().getName() );
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
Worker w1 = new Worker();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 4,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue(5));
for(int i=0;i<9;i++) {
executor.submit(w1);
}
executor.shutdown();
}
}
可以發(fā)現(xiàn)最多開(kāi)啟了4個(gè)線程。 這4個(gè)線程就對(duì)應(yīng)了4個(gè)Worker的實(shí)例。
看worker的源碼可以發(fā)現(xiàn),它兼?zhèn)銩QS和Runnable兩個(gè)特性。 我們只關(guān)注它Runnable的特性。
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
在這個(gè)線程中不斷從隊(duì)列中獲取任務(wù),然后執(zhí)行。 worker中反復(fù)出現(xiàn)的ctl
又是什么呢?
ctl是兩個(gè)變量組合,一個(gè)32位的int, 高3位用于控制線程池的狀態(tài),低29位用于記錄線程池啟動(dòng)線程的數(shù)量。
所以有這么幾個(gè)方法
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
整個(gè)線程池的核心,就worker
和ctl
的理解。 有點(diǎn)復(fù)雜,主要是集中了:
1. AQS
2. BlockingQueue
這也是為什么我建議先學(xué)AQS,后學(xué)線程池的實(shí)現(xiàn)原理。