0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

FutureTask是如何通過(guò)阻塞來(lái)獲取到異步線程執(zhí)行結(jié)果的呢?

OSC開(kāi)源社區(qū) ? 來(lái)源:OSCHINA 社區(qū) ? 2023-08-12 14:37 ? 次閱讀

1、FutureTask 對(duì)象介紹

Future 對(duì)象大家都不陌生,是 JDK1.5 提供的接口,是用來(lái)以阻塞的方式獲取線程異步執(zhí)行完的結(jié)果。 在 Java 中想要通過(guò)線程執(zhí)行一個(gè)任務(wù),離不開(kāi) Runnable 與 Callable 這兩個(gè)接口。 Runnable 與 Callable 的區(qū)別在于,Runnable 接口只有一個(gè) run 方法,該方法用來(lái)執(zhí)行邏輯,但是并沒(méi)有返回值;而 Callable 的 call 方法,同樣用來(lái)執(zhí)行業(yè)務(wù)邏輯,但是是有一個(gè)返回值的。

Callable 執(zhí)行任務(wù)過(guò)程中可以通過(guò) FutureTask 獲得任務(wù)的執(zhí)行狀態(tài),并且可以在執(zhí)行完成后通過(guò) Future.get () 方式獲取執(zhí)行結(jié)果。 Future 是一個(gè)接口,而 FutureTask 就是 Future 的實(shí)現(xiàn)類(lèi)。并且 FutureTask 實(shí)現(xiàn)了 RunnableFuture(Runnable + Future),說(shuō)明我們可以創(chuàng)建一個(gè) FutureTask 并直接把它放到線程池執(zhí)行,然后獲取 FutureTask 的執(zhí)行結(jié)果。

2、FutureTask 源碼解析

2.1 主要方法和屬性

那么 FutureTask 是如何通過(guò)阻塞的方式來(lái)獲取到異步線程執(zhí)行的結(jié)果的呢?我們看下 FutureTask 中的屬性。

// FutureTask的狀態(tài)及其常量
privatevolatileint state;
    privatestaticfinalint NEW          =0;
    privatestaticfinalint COMPLETING   =1;
    privatestaticfinalint NORMAL       =2;
    privatestaticfinalint EXCEPTIONAL  =3;
    privatestaticfinalint CANCELLED    =4;
    privatestaticfinalint INTERRUPTING =5;
    privatestaticfinalint INTERRUPTED  =6;
    
    // callable對(duì)象,執(zhí)行完后置空
    privateCallable callable;
    // 要返回的結(jié)果或要引發(fā)的異常來(lái)自 get() 方法
    privateObject outcome;// non-volatile, protected by state reads/writes
    // 執(zhí)行Callable的線程
    privatevolatileThread runner;
    // 等待線程的一個(gè)鏈表結(jié)構(gòu)
    privatevolatileWaitNode waiters;

?FutureTask 中幾個(gè)比較重要的方法。

// 取消任務(wù)的執(zhí)行
booleancancel(boolean mayInterruptIfRunning);
// 返回任務(wù)是否已經(jīng)被取消
booleanisCancelled();
// 返回任務(wù)是否已經(jīng)完成,任務(wù)狀態(tài)不為NEW即為完成
booleanisDone();
// 通過(guò)get方法獲取任務(wù)的執(zhí)行結(jié)果
Vget()throwsInterruptedException,ExecutionException;
// 通過(guò)get方法獲取任務(wù)的執(zhí)行結(jié)果,帶有超時(shí),如果超過(guò)給定時(shí)間則拋出異常
Vget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException;

?2.2 FutureTask 執(zhí)行

當(dāng)我們?cè)诰€程池中執(zhí)行一個(gè) Callable 方法時(shí),其實(shí)是將 Callable 任務(wù)封裝成一個(gè) RunnableFuture 對(duì)象去執(zhí)行,同時(shí)將這個(gè) RunnableFuture 對(duì)象返回,這樣我們就拿到了 FutureTask 的引用,可以隨時(shí)獲取到任務(wù)執(zhí)行的狀態(tài),并且可以在任務(wù)執(zhí)行完成后通過(guò)該對(duì)象獲取執(zhí)行結(jié)果。 以下為 ThreadPoolExecutor 線程池提交一個(gè) callable 方法的源碼。

public Future submit(Callable task){
        if(task ==null)thrownewNullPointerException();
        RunnableFuture ftask =newTaskFor(task);
        execute(ftask);
        return ftask;
    }

protected RunnableFuture newTaskFor(Callable callable){
        returnnewFutureTask(callable);
    }

?2.3 run 方法介紹

RunnableFuture 其實(shí)也是一個(gè)可以執(zhí)行的 runnable,我們看下他的 run 方法。其主要流程就是執(zhí)行 call 方法,正常執(zhí)行完畢后將 result 結(jié)果賦值到 outcome 屬性上。

publicvoidrun(){
        if(state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null,Thread.currentThread()))
            return;
        try{
            // 將callable賦值到本地變量
            Callable c = callable;
            // 判斷callable不為空并且FutureTask的狀態(tài)必須為新創(chuàng)建
            if(c !=null&& state == NEW){
                V result;
                boolean ran;
                try{
                    // 執(zhí)行call方法(用戶(hù)自己實(shí)現(xiàn)的call邏輯),并獲取到result結(jié)果
                    result = c.call();
                    ran =true;
                }catch(Throwable ex){
                    result =null;
                    ran =false;
                    // 如果執(zhí)行過(guò)程出現(xiàn)異常,則將異常對(duì)象賦值到outcome上
                    setException(ex);
                }
                // 如果正常執(zhí)行完畢,則將result賦值到outcome屬性上
                if(ran)
                    set(result);
            }
        }finally{
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner =null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if(s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

?以下邏輯為正常執(zhí)行完成后賦值的邏輯。

// 如果任務(wù)沒(méi)有被取消,將future執(zhí)行完的返回值賦值給result結(jié)果
// FutureTask任務(wù)的執(zhí)行狀態(tài)是通過(guò)CAS的方式進(jìn)行賦值的,并且由此可知,COMPLETING其實(shí)是一個(gè)瞬時(shí)狀態(tài)
// 當(dāng)將線程執(zhí)行結(jié)果賦值給outcome后,狀態(tài)會(huì)修改為對(duì)應(yīng)的NORMAL,即正常結(jié)束
protectedvoidset(V v){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final state
            finishCompletion();
        }
    }

?以下為執(zhí)行異常時(shí)賦值邏輯,直接將 Throwable 對(duì)象賦值到 outcome 屬性上。

protectedvoidsetException(Throwable t){
        if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)){
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final state
            finishCompletion();
        }
    }

?無(wú)論是正常執(zhí)行還是異常執(zhí)行,最終都會(huì)調(diào)用一個(gè) finishCompletion 方法,用來(lái)做工作的收尾工作。

2.4 get 方法介紹

Future 的 get 方法有兩個(gè)重載的方法,一個(gè)是 get () 獲取結(jié)果,一個(gè)是 get (long, TimeUnit) 帶有超時(shí)時(shí)間的獲取結(jié)果,我們看下 FutureTask 中的這兩個(gè)方法是如何實(shí)現(xiàn)的。

// 不帶有超時(shí)時(shí)間,一直阻塞直到獲取結(jié)果
publicVget()throwsInterruptedException,ExecutionException{
        int s = state;
        if(s <= COMPLETING)
            // 等待結(jié)果完成,帶有超時(shí)的get方法也是調(diào)用的awaitDone方法
            s =awaitDone(false,0L);
        // 返回結(jié)果
        returnreport(s);
    }

// 帶有超時(shí)時(shí)間的獲取結(jié)果,如果超過(guò)時(shí)間還沒(méi)有獲取到結(jié)果則拋出異常
publicVget(long timeout,TimeUnit unit)
        throwsInterruptedException,ExecutionException,TimeoutException{
        if(unit ==null)
            thrownewNullPointerException();
        int s = state;
        // 如果任務(wù)未中斷,調(diào)用awaitDone方法等待任務(wù)結(jié)果
        if(s <= COMPLETING &&
            (s =awaitDone(true, unit.toNanos(timeout)))<= COMPLETING)
            thrownewTimeoutException();
        // 返回結(jié)果
        returnreport(s);
    }

?我們主要看下 awaitDone 方法的執(zhí)行邏輯。此方法會(huì)通過(guò) for 循環(huán)的方式一直阻塞等待任務(wù)執(zhí)行完成。如果帶有超時(shí)時(shí)間,則超過(guò)截止時(shí)間后會(huì)直接返回。

// timed:是否需要超時(shí)獲取
// nanos:超時(shí)時(shí)間單位納秒
privateintawaitDone(boolean timed,long nanos)
        throwsInterruptedException{
        finallong deadline = timed ?System.nanoTime()+ nanos :0L;
        WaitNode q =null;
        boolean queued =false;
        // 此方法會(huì)一直for循環(huán)判斷任務(wù)狀態(tài)是否已經(jīng)完成,是Future.get阻塞的原因
        for(;;){
            if(Thread.interrupted()){
                removeWaiter(q);
                thrownewInterruptedException();
            }

            int s = state;
            // 任務(wù)狀態(tài)大于COMPLETING,則表明任務(wù)結(jié)束,直接返回
            if(s > COMPLETING){
                if(q !=null)
                    q.thread =null;
                return s;
            }
            elseif(s == COMPLETING)// cannot time out yet
                // Thread.yield() 方法,使當(dāng)前線程由執(zhí)行狀態(tài),變成為就緒狀態(tài),讓出cpu時(shí)間,在下一個(gè)線程執(zhí)行時(shí)候,此線程有可能被執(zhí)行,也有可能沒(méi)有被執(zhí)行。
                // COMPLETING狀態(tài)為瞬時(shí)狀態(tài),任務(wù)執(zhí)行完成,要么是正常結(jié)束,要么異常結(jié)束,后續(xù)會(huì)被置為NORMAL或者EXCEPTIONAL
                Thread.yield();
            elseif(q ==null)
                // 每調(diào)用一次get方法,都會(huì)創(chuàng)建一個(gè)WaitNode等待節(jié)點(diǎn)
                q =newWaitNode();
            elseif(!queued)
                // 將該等待節(jié)點(diǎn)添加到鏈表結(jié)構(gòu)waiters中,q.next = waiters 即在waiters的頭部插入
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            // 如果方法帶有超時(shí)判斷,則判斷當(dāng)前時(shí)間是否已經(jīng)超過(guò)了截止時(shí)間,如果超過(guò)了及截止日期,則退出循環(huán)直接返回當(dāng)前狀態(tài),此時(shí)任務(wù)狀態(tài)一定是NEW
            elseif(timed){
                nanos = deadline -System.nanoTime();
                if(nanos <=0L){
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

?我們?cè)诳聪?report 方法,在調(diào)用 get 方法時(shí)是如何返回結(jié)果的。

這里首先獲取 outcome 的值,并判斷任務(wù)是否已經(jīng)執(zhí)行完成,如果執(zhí)行完成,則將 outcome 對(duì)象強(qiáng)轉(zhuǎn)成泛型指定的類(lèi)型;如果任務(wù)被取消了,則拋出一個(gè) CancellationException 異常;如果都不是,則說(shuō)明任務(wù)在執(zhí)行過(guò)程中發(fā)生了異常,此時(shí)任務(wù)狀態(tài)位 EXCEPTIONAL,此時(shí)的 outcome 即為 Throwable 對(duì)象,所以將 outcome 強(qiáng)轉(zhuǎn)為 Throwable 并拋出異常。

由此可以知道,我們將一個(gè) FutureTask 任務(wù) submit 到線程池中執(zhí)行的時(shí)候,如果發(fā)生了異常,是會(huì)在調(diào)用 get 方法的時(shí)候拋出的。
privateVreport(int s)throwsExecutionException{
        Object x = outcome;
        if(s == NORMAL)
            return(V)x;
        if(s >= CANCELLED)
            thrownewCancellationException();
        thrownewExecutionException((Throwable)x);
    }

?2.5 cancel 方法介紹

cancel 方法用于取消正在運(yùn)行的任務(wù),如果任務(wù)取消成功,則返回 TRUE,如果取消失敗則返回 FALSE。

// mayInterruptIfRunning:允許中斷正在運(yùn)行的任務(wù)
publicbooleancancel(boolean mayInterruptIfRunning){
        // mayInterruptIfRunning如果為true則將狀態(tài)置為INTERRUPTING,如果未false則將狀態(tài)置為CANCELLED
        if(!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            returnfalse;
        // 如果狀態(tài)修改成功后,判斷是否允許中斷線程,如果允許,則調(diào)用Thread的interrupt方法中斷
        try{// in case call to interrupt throws exception
            if(mayInterruptIfRunning){
                try{
                    Thread t = runner;
                    if(t !=null)
                        t.interrupt();
                }finally{// final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        }finally{
            // 取消后的收尾工作
            finishCompletion();
        }
        returntrue;
    }

?2.6 isDone/isCancelled 方法介紹

isDone 方法用于判斷 FutureTask 是否已經(jīng)完成;isCancelled 方法用來(lái)判斷 FutureTask 是否已經(jīng)取消,這兩個(gè)方法都是通過(guò)狀態(tài)位來(lái)判斷的。

publicbooleanisCancelled(){
        return state >= CANCELLED;
    }

    publicbooleanisDone(){
        return state != NEW;
    }

?2.7 finishCompletion 方法介紹

我們看下 finishCompletion 方法都做了哪些工作。

// 刪除所有等待線程并發(fā)出信號(hào),最后執(zhí)行done方法
privatevoidfinishCompletion(){
        // assert state > COMPLETING;
        for(WaitNode q;(q = waiters)!=null;){
            if(UNSAFE.compareAndSwapObject(this, waitersOffset, q,null)){
                for(;;){
                    Thread t = q.thread;
                    if(t !=null){
                        q.thread =null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if(next ==null)
                        break;
                    q.next =null;// unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable =null;// to reduce footprint
    }

?我們看到 done 方法是一個(gè)受保護(hù)的空方法,此處沒(méi)有任何邏輯,由其子類(lèi)去根據(jù)自己的業(yè)務(wù)去實(shí)現(xiàn)相應(yīng)的邏輯。例如:java.util.concurrent.ExecutorCompletionService.QueueingFuture。
protectedvoiddone(){}

3、總結(jié)

通過(guò)源碼解讀可以了解到 Future 的原理:

第一步:主線程將任務(wù)封裝成一個(gè) Callable 對(duì)象,通過(guò) submit 方法提交到線程池去執(zhí)行。

第二步:線程池執(zhí)行任務(wù)的 run 方法,主線程則可以繼續(xù)執(zhí)行其他邏輯。

第三步:線程池中方法執(zhí)行完成后將結(jié)果賦值到 outcome 屬性上,并修改任務(wù)狀態(tài)。

第四步:主線程在需要拿到異步任務(wù)結(jié)果的時(shí)候,主動(dòng)調(diào)用 fugure.get () 方法來(lái)獲取結(jié)果。

第五步:如果異步線程在執(zhí)行過(guò)程中發(fā)生異常,則會(huì)在調(diào)用 future.get () 方法的時(shí)候拋出來(lái)。 以上就是對(duì)于 FutureTask 的分析,我們可以了解 FutureTask 任務(wù)執(zhí)行的方式以及 Future.get 已阻塞的方式獲取線程執(zhí)行的結(jié)果原理,并且從代碼中可以了解 FutureTask 的任務(wù)執(zhí)行狀態(tài)以及狀態(tài)的變化過(guò)程。





審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫(xiě)或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • 狀態(tài)機(jī)
    +關(guān)注

    關(guān)注

    2

    文章

    492

    瀏覽量

    27577
  • 線程池
    +關(guān)注

    關(guān)注

    0

    文章

    57

    瀏覽量

    6866
  • for循環(huán)
    +關(guān)注

    關(guān)注

    0

    文章

    61

    瀏覽量

    2512

原文標(biāo)題:并發(fā)編程 - FutureTask 解析

文章出處:【微信號(hào):OSC開(kāi)源社區(qū),微信公眾號(hào):OSC開(kāi)源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    鴻蒙原生應(yīng)用開(kāi)發(fā)-ArkTS語(yǔ)言基礎(chǔ)類(lèi)庫(kù)多線程I/O密集型任務(wù)開(kāi)發(fā)

    使用異步并發(fā)可以解決單次I/O任務(wù)阻塞的問(wèn)題,但是如果遇到I/O密集型任務(wù),同樣會(huì)阻塞線程中其它任務(wù)的執(zhí)行,這時(shí)需要使用多
    發(fā)表于 03-21 14:57

    Java線程阻塞方法大全

    如果線程是因?yàn)檎{(diào)用了wait()、sleep()或者join()方法而導(dǎo)致的阻塞,可以中斷線程,并且通過(guò)拋出InterruptedException
    發(fā)表于 04-02 15:42

    Java的線程喚醒與阻塞規(guī)則

    如果線程是因?yàn)檎{(diào)用了wait()、sleep()或者join()方法而導(dǎo)致的阻塞,可以中斷線程,并且通過(guò)拋出InterruptedException
    發(fā)表于 07-06 15:11

    同步與異步阻塞與非阻塞的區(qū)別是什么

    同步與異步,阻塞與非阻塞的區(qū)別
    發(fā)表于 01-26 06:12

    labview怎么終止‘執(zhí)行系統(tǒng)命令’并獲取所有結(jié)果

    現(xiàn)在有一個(gè)應(yīng)用要一邊運(yùn)動(dòng)一邊通過(guò)執(zhí)行系統(tǒng)命令’來(lái)獲取一系列數(shù)據(jù),獲取數(shù)據(jù)的指令是一直以0.1秒的速率讀取數(shù)據(jù),我可以在運(yùn)動(dòng)開(kāi)始的時(shí)候開(kāi)啟‘
    發(fā)表于 04-07 10:31

    如何使用多線程異步操作等并發(fā)設(shè)計(jì)方法來(lái)最大化程序的性能

    很多朋友往往會(huì)使用線程來(lái)執(zhí)行耗時(shí)較長(zhǎng)的I/O操作。這樣在只有少數(shù)幾個(gè)并發(fā)操作的時(shí)候還無(wú)傷大雅,如果需要處理大量的并發(fā)操作時(shí)就不合適了。  異步調(diào)用與多
    發(fā)表于 08-23 16:31

    A線程如何在線程本身識(shí)別變量是否改變

    阻塞獲取可以解決但是這個(gè)B線程是別人代碼寫(xiě)的。不好修改不想再增加一個(gè)線程去循環(huán)讀取變量X是否改變,再釋放信號(hào)量需求A線程如何在
    發(fā)表于 11-02 11:02

    異步調(diào)用子vi問(wèn)題

    我試了異步調(diào)用子vi,現(xiàn)在的問(wèn)題是子vi是一個(gè)循環(huán),但是我在主程序獲取子vi的結(jié)果時(shí),只有子vi結(jié)束了才能獲取且只能獲取到循環(huán)最后一次的
    發(fā)表于 11-11 10:34

    是否有函數(shù)或者功能可以實(shí)現(xiàn)A線程阻塞變量的值?

    阻塞獲取可以解決但是這個(gè)B線程是別人代碼寫(xiě)的。不好修改不想再增加一個(gè)線程去循環(huán)讀取變量X是否改變,再釋放信號(hào)量需求A線程如何在
    發(fā)表于 02-01 16:25

    Runloop是怎樣進(jìn)行線程?;?/a>

    ,它會(huì)向一個(gè) C語(yǔ)言程序那樣在運(yùn)行完所有代碼后退出線程。 而網(wǎng)絡(luò)請(qǐng)求是異步的,這導(dǎo)致獲取到請(qǐng)求數(shù)據(jù)時(shí),線程已經(jīng)退出,代理方法沒(méi)有機(jī)會(huì)執(zhí)行。
    發(fā)表于 09-26 10:34 ?0次下載

    沒(méi)有互聯(lián)網(wǎng),如何本地獲取到LoRaWAN的終端數(shù)據(jù)?

    一般情況下,我們可以通過(guò)連接TTN,來(lái)獲取到LoRaWAN的終端數(shù)據(jù)。 但是,如果沒(méi)有互聯(lián)網(wǎng),那么,我們也就無(wú)法通過(guò)連接TTN來(lái)
    發(fā)表于 04-10 16:38 ?987次閱讀

    詳解同步異步阻塞阻塞

    同步、異步分別指的是一種通訊方式,當(dāng) cpu 不需要執(zhí)行線程上下文切換就能完成任務(wù),此時(shí)便認(rèn)為這種通訊方式是同步的,相對(duì)的如果存在cpu 上下文切換,這種方式便是異步
    的頭像 發(fā)表于 05-03 17:53 ?4868次閱讀
    詳解同步<b class='flag-5'>異步</b>和<b class='flag-5'>阻塞</b>非<b class='flag-5'>阻塞</b>

    使用匿名管道技術(shù)獲取CMD命令的執(zhí)行結(jié)果

    遠(yuǎn)程 CMD 是指惡意程序接收到控制端發(fā)送的 CMD 指令后,在本地執(zhí)行 CMD 命令,并將執(zhí)行結(jié)果回傳至控制端。本文將演示使用匿名管道技術(shù)獲取 CMD 命令的
    的頭像 發(fā)表于 04-03 18:04 ?4044次閱讀

    CompletableFuture異步線程是真的優(yōu)雅

    雖然 Future 以及相關(guān)使用方法提供了異步執(zhí)行任務(wù)的能力,但是對(duì)于結(jié)果獲取卻是很不方便,我們必須使用Future.get()的方式阻塞
    的頭像 發(fā)表于 08-07 15:40 ?700次閱讀
    CompletableFuture<b class='flag-5'>異步</b>多<b class='flag-5'>線程</b>是真的優(yōu)雅

    verilog同步和異步的區(qū)別 verilog阻塞賦值和非阻塞賦值的區(qū)別

    Verilog中同步和異步的區(qū)別,以及阻塞賦值和非阻塞賦值的區(qū)別。 一、Verilog中同步和異步的區(qū)別 同步傳輸和異步傳輸是指數(shù)據(jù)在電路中
    的頭像 發(fā)表于 02-22 15:33 ?1773次閱讀