????在Java中比較常見的兩種創(chuàng)建線程的方法:繼承Thread類和實現(xiàn)Runnable接口。但是這兩種方法有個缺點就是無法獲取線程執(zhí)行后的結(jié)果。所以Java之后提供了Future和Runnable接口,用于實現(xiàn)獲取線程執(zhí)行結(jié)果。下面開始源碼分析:
成都創(chuàng)新互聯(lián)公司自2013年起,先為西烏珠穆沁等服務(wù)建站,西烏珠穆沁等地企業(yè),進行企業(yè)商務(wù)咨詢服務(wù)。為西烏珠穆沁企業(yè)網(wǎng)站制作PC+手機+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。
1、Callable接口
public interface Callable {
//返回接口,或者拋出異常
V call() throws Exception;
}
2、Future接口
public interface Future {
/***嘗試取消任務(wù),如果任務(wù)已經(jīng)完成、已取消或其他原因無法取消,則失敗。
** 1、如果任務(wù)還沒開始執(zhí)行,則該任務(wù)不應(yīng)該運行
** 2、如果任務(wù)已經(jīng)開始執(zhí)行,由參數(shù)mayInterruptIfRunning來決定執(zhí)行該任務(wù)的線程是否應(yīng)該被中斷,這只是終止任務(wù)的一種嘗試。若mayInterruptIfRunning為true,則會立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務(wù)執(zhí)行線程。
** 3、調(diào)用這個方法后,以后對isDone方法調(diào)用都返回true。
** 4、如果這個方法返回true,以后對isCancelled返回true。
***/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判斷任務(wù)是否被取消了,如果調(diào)用了cance()則返回true
*/
boolean isCancelled();
/**
*如果任務(wù)完成,則返回ture
*任務(wù)完成包含正常終止、異常、取消任務(wù)。在這些情況下都返回true
*/
boolean isDone();
/**
* 線程阻塞,直到任務(wù)完成,返回結(jié)果
* 如果任務(wù)被取消,則引發(fā)CancellationException
* 如果當(dāng)前線程被中斷,則引發(fā)InterruptedException
* 當(dāng)任務(wù)在執(zhí)行的過程中出現(xiàn)異常,則拋出ExecutionException
*/
V get() throws InterruptedException, ExecutionException;
/**
* 線程阻塞一定時間等待任務(wù)完成,并返回任務(wù)執(zhí)行結(jié)果,如果則超時則拋出TimeoutException
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3、FutureTask
????Future只是一個接口,不能直接用來創(chuàng)建對象,其實現(xiàn)類是FutureTask,JDK1.8修改了FutureTask的實現(xiàn),JKD1.8不再依賴AQS來實現(xiàn),而是通過一個volatile變量state以及CAS操作來實現(xiàn)。FutureTask結(jié)構(gòu)如下所示:
public class FutureTask implements RunnableFuture {
/*
* 當(dāng)前任務(wù)運行狀態(tài)
* NEW -> COMPLETING -> NORMAL(正常結(jié)束,返回結(jié)果)
* NEW -> COMPLETING -> EXCEPTIONAL(返回異常結(jié)果)
* NEW -> CANCELLED(任務(wù)被取消,無結(jié)果)
* NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷,無結(jié)果)
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 將要被執(zhí)行的任務(wù) */
private Callable callable;
/** 存放執(zhí)行結(jié)果,用于get()方法獲取結(jié)果,也可能用于get()方法拋出異常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執(zhí)行任務(wù)Callable的線程; */
private volatile Thread runner;
/** 棧結(jié)構(gòu)的等待隊列,該節(jié)點是棧中最頂層的節(jié)點 */
private volatile WaitNode waiters;
為了后面更好的分析FutureTask的實現(xiàn),這里有必要解釋下各個狀態(tài)。
NEW:表示是個新的任務(wù)或者還沒被執(zhí)行完的任務(wù)。這是初始狀態(tài)。
COMPLETING:任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務(wù)執(zhí)行結(jié)果,如果發(fā)生異常,則用來保存異常原因)的時候,狀態(tài)會從NEW變更到COMPLETING。但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。
NORMAL:任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會從COMPLETING轉(zhuǎn)換到NORMAL。這是一個最終態(tài)。
EXCEPTIONAL:任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會從COMPLETING轉(zhuǎn)換到EXCEPTIONAL。這是一個最終態(tài)。
CANCELLED:任務(wù)還沒開始執(zhí)行或者已經(jīng)開始執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程,這個時候狀態(tài)會從NEW轉(zhuǎn)化為CANCELLED狀態(tài)。這是一個最終態(tài)。
INTERRUPTING: 任務(wù)還沒開始執(zhí)行或者已經(jīng)執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒有中斷任務(wù)執(zhí)行線程之前,狀態(tài)會從NEW轉(zhuǎn)化為INTERRUPTING。這是一個中間狀態(tài)。
INTERRUPTED:調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會從INTERRUPTING轉(zhuǎn)換到INTERRUPTED。這是一個最終態(tài)。
有一點需要注意的是,所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成,任務(wù)執(zhí)行異?;蛘呷蝿?wù)被取消)。
3.1、FutureTask構(gòu)造方法
// Callable 構(gòu)造方法
public FutureTask(Callable callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
/**
* runnable 構(gòu)造函數(shù)
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
3.2、get()方法阻塞隊列
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
3.3、run方法解析
//Executor調(diào)用執(zhí)行任務(wù)
//
public void run() {
//狀態(tài)如果不是NEW,說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返返回,當(dāng)然如果執(zhí)行任務(wù)的線程runner不為null,說明任務(wù)正在執(zhí)行
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
//執(zhí)行任務(wù)
try {
Callable c = callable;
//判斷任務(wù)是否為null,狀態(tài)是否為NEW
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call(); //暫時存放任務(wù)執(zhí)行結(jié)果
ran = true;
} catch (Throwable ex) {
result = null;
ran = false; //執(zhí)行失敗
//通過CAS算法設(shè)置返回值(COMPLETING)和狀態(tài)值(EXCEPTIONAL)
setException(ex);
}
//執(zhí)行成功通過CAS(UNSAFE)設(shè)置返回值(COMPLETING)和狀態(tài)值(NORMAL)
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
//將任務(wù)runner設(shè)置為null,避免發(fā)生并發(fā)調(diào)用run()方法
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
//須重新讀取任務(wù)狀態(tài),避免不可達(dá)(泄漏)的中斷
int s = state;
//確保cancle(ture)操作時,運行中的任務(wù)能接收到中斷指令
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
3.4、setException(Throwable t)解析
//發(fā)生異常時,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(EXCEPTIONAL)
protected void setException(Throwable t) {
//調(diào)用UNSAFE類封裝的CAS算法,設(shè)置值
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
//喚醒因等待返回值而阻塞的線程
finishCompletion();
}
}
3.5、set(V v)方法解析
//任務(wù)正常完成,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(=NORMAL)
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
3.6、 finishCompletion解析
//移除所有等待線程并發(fā)出信號,調(diào)用done(),以及將任務(wù)callable清空
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
//循環(huán)喚醒阻塞線程,直到阻塞隊列為空
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
//一直到阻塞隊列為空,跳出循環(huán)
if (next == null)
break;
q.next = null; // unlink to help gc 方便gc在適當(dāng)?shù)臅r候回收
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
3.7、handlePossibleCancellationInterrupt 方法解析
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
//自旋等待cancle(true)結(jié)束(中斷結(jié)束)
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
3.8、cancle方法解析
//取消任務(wù)執(zhí)行
public boolean cancel(boolean mayInterruptIfRunning) {
//對NEW狀態(tài)的任務(wù)進行中斷,并根據(jù)參數(shù)設(shè)置state
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
//任務(wù)已完成(已發(fā)出中斷或已取消)
return false;
//中斷線程
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {//cancel(true)
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
//通過CAS算法,更新狀態(tài)
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//喚醒阻塞線程
finishCompletion();
}
return true;
}
3.9 get方法解析
/**
* 獲取執(zhí)行結(jié)果
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//假如任務(wù)還沒有執(zhí)行完,則則塞線程,直至任務(wù)執(zhí)行完成(結(jié)果已經(jīng)存放到對應(yīng)變量中)
s = awaitDone(false, 0L);
//返回結(jié)果
return report(s);
}
/**
* 獲取任務(wù)執(zhí)行結(jié)果,指定時間結(jié)束,則超時返回,不再阻塞
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
3.10 awaitDone解析
/**
* Awaits completion or aborts on interrupt or timeout.
* 如英文注釋:等待任務(wù)執(zhí)行完畢或任務(wù)中斷或任務(wù)超時
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//循環(huán)等待
for (;;) {
//線程已經(jīng)中斷,則移除等待任務(wù)
if (Thread.interrupted()) {
removeWaiter(q);
//移除當(dāng)前任務(wù)后,拋出中斷異常
throw new InterruptedException();
}
//任務(wù)已經(jīng)完成,則返回任務(wù)狀態(tài),并對當(dāng)前任務(wù)清場處理
int s = state;
if (s > COMPLETING) {
if (q != null) //任務(wù)不為空,則將執(zhí)行線程設(shè)為null,避免并發(fā)下重復(fù)執(zhí)行
q.thread = null;
return s;
}
//設(shè)置結(jié)果,很快就能完成,自旋等待
else if (s == COMPLETING) // cannot time out yet
Thread.yield(); //任務(wù)提前退出
//正在執(zhí)行或者還沒開始,則構(gòu)建新的節(jié)點
else if (q == null)
q = new WaitNode();
//判斷是否入隊,新節(jié)點一般在下一次循環(huán)入隊列阻塞
else if (!queued)
//沒有入隊列,設(shè)置q.next=waiters,并將waiters設(shè)為q
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
//假如有超時限制,則判斷是否超時
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//超時則將任務(wù)節(jié)點從阻塞隊列中移除,并返回狀態(tài)
removeWaiter(q);
return state;
}
//阻塞調(diào)用get方法的線程,有超時限制
LockSupport.parkNanos(this, nanos);
}
else
//阻塞調(diào)用get方法的線程,無超時限制
LockSupport.park(this);
}
}
3.11 removeWaiter方法解析
//移除任務(wù)節(jié)點
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}