真实的国产乱ⅩXXX66竹夫人,五月香六月婷婷激情综合,亚洲日本VA一区二区三区,亚洲精品一区二区三区麻豆

成都創(chuàng)新互聯(lián)網(wǎng)站制作重慶分公司

JDK1.8FutureTask源碼解讀(Future模式)

????在Java中比較常見的兩種創(chuàng)建線程的方法:繼承Thread類和實現(xiàn)Runnable接口。但是這兩種方法有個缺點就是無法獲取線程執(zhí)行后的結(jié)果。所以Java之后提供了Future和Runnable接口,用于實現(xiàn)獲取線程執(zhí)行結(jié)果。下面開始源碼分析:

成都創(chuàng)新互聯(lián)公司自2013年起,先為西烏珠穆沁等服務(wù)建站,西烏珠穆沁等地企業(yè),進行企業(yè)商務(wù)咨詢服務(wù)。為西烏珠穆沁企業(yè)網(wǎng)站制作PC+手機+微官網(wǎng)三網(wǎng)同步一站式服務(wù)解決您的所有建站問題。

1、Callable接口

public interface Callable {
    //返回接口,或者拋出異常
    V call() throws Exception;
}

2、Future接口

public interface Future {

   /***嘗試取消任務(wù),如果任務(wù)已經(jīng)完成、已取消或其他原因無法取消,則失敗。
        ** 1、如果任務(wù)還沒開始執(zhí)行,則該任務(wù)不應(yīng)該運行
        **  2、如果任務(wù)已經(jīng)開始執(zhí)行,由參數(shù)mayInterruptIfRunning來決定執(zhí)行該任務(wù)的線程是否應(yīng)該被中斷,這只是終止任務(wù)的一種嘗試。若mayInterruptIfRunning為true,則會立即中斷執(zhí)行任務(wù)的線程并返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務(wù)執(zhí)行線程。
        ** 3、調(diào)用這個方法后,以后對isDone方法調(diào)用都返回true。
        ** 4、如果這個方法返回true,以后對isCancelled返回true。
        ***/
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 判斷任務(wù)是否被取消了,如果調(diào)用了cance()則返回true
     */
    boolean isCancelled();

    /**
     *如果任務(wù)完成,則返回ture
     *任務(wù)完成包含正常終止、異常、取消任務(wù)。在這些情況下都返回true
     */
    boolean isDone();

    /**
     * 線程阻塞,直到任務(wù)完成,返回結(jié)果
     * 如果任務(wù)被取消,則引發(fā)CancellationException
     * 如果當(dāng)前線程被中斷,則引發(fā)InterruptedException
     * 當(dāng)任務(wù)在執(zhí)行的過程中出現(xiàn)異常,則拋出ExecutionException
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 線程阻塞一定時間等待任務(wù)完成,并返回任務(wù)執(zhí)行結(jié)果,如果則超時則拋出TimeoutException
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

3、FutureTask
????Future只是一個接口,不能直接用來創(chuàng)建對象,其實現(xiàn)類是FutureTask,JDK1.8修改了FutureTask的實現(xiàn),JKD1.8不再依賴AQS來實現(xiàn),而是通過一個volatile變量state以及CAS操作來實現(xiàn)。FutureTask結(jié)構(gòu)如下所示:
JDK1.8  FutureTask源碼解讀(Future模式)

public class FutureTask implements RunnableFuture {
    /*
     * 當(dāng)前任務(wù)運行狀態(tài)
     * NEW -> COMPLETING -> NORMAL(正常結(jié)束,返回結(jié)果)
     * NEW -> COMPLETING -> EXCEPTIONAL(返回異常結(jié)果)
     * NEW -> CANCELLED(任務(wù)被取消,無結(jié)果)
     * NEW -> INTERRUPTING -> INTERRUPTED(任務(wù)被打斷,無結(jié)果)
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** 將要被執(zhí)行的任務(wù) */
    private Callable callable;
    /** 存放執(zhí)行結(jié)果,用于get()方法獲取結(jié)果,也可能用于get()方法拋出異常 */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** 執(zhí)行任務(wù)Callable的線程; */
    private volatile Thread runner;
    /** 棧結(jié)構(gòu)的等待隊列,該節(jié)點是棧中最頂層的節(jié)點 */
    private volatile WaitNode waiters;
為了后面更好的分析FutureTask的實現(xiàn),這里有必要解釋下各個狀態(tài)。
NEW:表示是個新的任務(wù)或者還沒被執(zhí)行完的任務(wù)。這是初始狀態(tài)。
COMPLETING:任務(wù)已經(jīng)執(zhí)行完成或者執(zhí)行任務(wù)的時候發(fā)生異常,但是任務(wù)執(zhí)行結(jié)果或者異常原因還沒有保存到outcome字段(outcome字段用來保存任務(wù)執(zhí)行結(jié)果,如果發(fā)生異常,則用來保存異常原因)的時候,狀態(tài)會從NEW變更到COMPLETING。但是這個狀態(tài)會時間會比較短,屬于中間狀態(tài)。
NORMAL:任務(wù)已經(jīng)執(zhí)行完成并且任務(wù)執(zhí)行結(jié)果已經(jīng)保存到outcome字段,狀態(tài)會從COMPLETING轉(zhuǎn)換到NORMAL。這是一個最終態(tài)。
EXCEPTIONAL:任務(wù)執(zhí)行發(fā)生異常并且異常原因已經(jīng)保存到outcome字段中后,狀態(tài)會從COMPLETING轉(zhuǎn)換到EXCEPTIONAL。這是一個最終態(tài)。
CANCELLED:任務(wù)還沒開始執(zhí)行或者已經(jīng)開始執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(false)方法取消任務(wù)且不中斷任務(wù)執(zhí)行線程,這個時候狀態(tài)會從NEW轉(zhuǎn)化為CANCELLED狀態(tài)。這是一個最終態(tài)。
INTERRUPTING: 任務(wù)還沒開始執(zhí)行或者已經(jīng)執(zhí)行但是還沒有執(zhí)行完成的時候,用戶調(diào)用了cancel(true)方法取消任務(wù)并且要中斷任務(wù)執(zhí)行線程但是還沒有中斷任務(wù)執(zhí)行線程之前,狀態(tài)會從NEW轉(zhuǎn)化為INTERRUPTING。這是一個中間狀態(tài)。
INTERRUPTED:調(diào)用interrupt()中斷任務(wù)執(zhí)行線程之后狀態(tài)會從INTERRUPTING轉(zhuǎn)換到INTERRUPTED。這是一個最終態(tài)。
有一點需要注意的是,所有值大于COMPLETING的狀態(tài)都表示任務(wù)已經(jīng)執(zhí)行完成(任務(wù)正常執(zhí)行完成,任務(wù)執(zhí)行異?;蛘呷蝿?wù)被取消)。

3.1、FutureTask構(gòu)造方法

    // Callable 構(gòu)造方法
public FutureTask(Callable callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

/**
 * runnable 構(gòu)造函數(shù)
 */
public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

3.2、get()方法阻塞隊列

    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }

3.3、run方法解析

//Executor調(diào)用執(zhí)行任務(wù)
//
    public void run() {
        //狀態(tài)如果不是NEW,說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返返回,當(dāng)然如果執(zhí)行任務(wù)的線程runner不為null,說明任務(wù)正在執(zhí)行
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        //執(zhí)行任務(wù)
        try {
            Callable c = callable;
                        //判斷任務(wù)是否為null,狀態(tài)是否為NEW
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call(); //暫時存放任務(wù)執(zhí)行結(jié)果
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;  //執(zhí)行失敗
                    //通過CAS算法設(shè)置返回值(COMPLETING)和狀態(tài)值(EXCEPTIONAL)
                    setException(ex);
                }
                  //執(zhí)行成功通過CAS(UNSAFE)設(shè)置返回值(COMPLETING)和狀態(tài)值(NORMAL)
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
                        //將任務(wù)runner設(shè)置為null,避免發(fā)生并發(fā)調(diào)用run()方法
            runner = null;  
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
                        //須重新讀取任務(wù)狀態(tài),避免不可達(dá)(泄漏)的中斷
            int s = state;
                        //確保cancle(ture)操作時,運行中的任務(wù)能接收到中斷指令
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

3.4、setException(Throwable t)解析

//發(fā)生異常時,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(EXCEPTIONAL)
    protected void setException(Throwable t) {
        //調(diào)用UNSAFE類封裝的CAS算法,設(shè)置值
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        //喚醒因等待返回值而阻塞的線程
            finishCompletion();
        }
    }

3.5、set(V v)方法解析

//任務(wù)正常完成,將返回值設(shè)置到outcome(=COMPLETING)中,并更新任務(wù)狀態(tài)(=NORMAL)
    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

3.6、 finishCompletion解析

//移除所有等待線程并發(fā)出信號,調(diào)用done(),以及將任務(wù)callable清空
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                            //循環(huán)喚醒阻塞線程,直到阻塞隊列為空
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                                        //一直到阻塞隊列為空,跳出循環(huán)
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc   方便gc在適當(dāng)?shù)臅r候回收
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

3.7、handlePossibleCancellationInterrupt 方法解析

private void handlePossibleCancellationInterrupt(int s) {
        // It is possible for our interrupter to stall before getting a
        // chance to interrupt us.  Let's spin-wait patiently.
        //自旋等待cancle(true)結(jié)束(中斷結(jié)束)
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield(); // wait out pending interrupt

        // assert state == INTERRUPTED;

        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }

3.8、cancle方法解析

//取消任務(wù)執(zhí)行
    public boolean cancel(boolean mayInterruptIfRunning) {
        //對NEW狀態(tài)的任務(wù)進行中斷,并根據(jù)參數(shù)設(shè)置state
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                    //任務(wù)已完成(已發(fā)出中斷或已取消)
            return false;       
            //中斷線程
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//cancel(true)
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                                //通過CAS算法,更新狀態(tài)
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            //喚醒阻塞線程
            finishCompletion();
        }
        return true;
    }

3.9 get方法解析

    /**
         * 獲取執(zhí)行結(jié)果
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
                //假如任務(wù)還沒有執(zhí)行完,則則塞線程,直至任務(wù)執(zhí)行完成(結(jié)果已經(jīng)存放到對應(yīng)變量中)
            s = awaitDone(false, 0L);
                //返回結(jié)果
        return report(s);
    }

    /**
         * 獲取任務(wù)執(zhí)行結(jié)果,指定時間結(jié)束,則超時返回,不再阻塞
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

3.10 awaitDone解析

/**
     * Awaits completion or aborts on interrupt or timeout.
     * 如英文注釋:等待任務(wù)執(zhí)行完畢或任務(wù)中斷或任務(wù)超時
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
                //循環(huán)等待
        for (;;) {
                //線程已經(jīng)中斷,則移除等待任務(wù)
            if (Thread.interrupted()) {
                removeWaiter(q);
                                //移除當(dāng)前任務(wù)后,拋出中斷異常
                throw new InterruptedException();
            }

            //任務(wù)已經(jīng)完成,則返回任務(wù)狀態(tài),并對當(dāng)前任務(wù)清場處理
            int s = state;
            if (s > COMPLETING) {
                if (q != null) //任務(wù)不為空,則將執(zhí)行線程設(shè)為null,避免并發(fā)下重復(fù)執(zhí)行
                    q.thread = null;
                return s;
            }
                        //設(shè)置結(jié)果,很快就能完成,自旋等待
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();  //任務(wù)提前退出
                    //正在執(zhí)行或者還沒開始,則構(gòu)建新的節(jié)點
            else if (q == null)
                q = new WaitNode();
                    //判斷是否入隊,新節(jié)點一般在下一次循環(huán)入隊列阻塞
            else if (!queued)
                                //沒有入隊列,設(shè)置q.next=waiters,并將waiters設(shè)為q
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
                //假如有超時限制,則判斷是否超時
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                                //超時則將任務(wù)節(jié)點從阻塞隊列中移除,并返回狀態(tài)
                    removeWaiter(q);
                    return state;
                }
                                //阻塞調(diào)用get方法的線程,有超時限制
                LockSupport.parkNanos(this, nanos);
            }
            else
                        //阻塞調(diào)用get方法的線程,無超時限制
                LockSupport.park(this);
        }
    }

3.11 removeWaiter方法解析

//移除任務(wù)節(jié)點
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }

分享文章:JDK1.8FutureTask源碼解讀(Future模式)
本文來源:http://weahome.cn/article/jeipec.html

其他資訊

在線咨詢

微信咨詢

電話咨詢

028-86922220(工作日)

18980820575(7×24)

提交需求

返回頂部