0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

如何設(shè)計(jì)一個(gè)線程池?JAVA中的線程池是如何設(shè)計(jì)的?

OSC開源社區(qū) ? 來源:京東云開發(fā)者 ? 2023-11-07 09:12 ? 次閱讀

一、線程

1、什么是線程

線程(thread) 是操作系統(tǒng)能夠進(jìn)行運(yùn)算調(diào)度的最小單位。它被包含在進(jìn)程之中,是進(jìn)程中的實(shí)際 運(yùn)作單位。一條線程指的是進(jìn)程中一個(gè)單一順序的控制流,一個(gè)進(jìn)程中可以并發(fā)多個(gè)線程,每條線 程并行執(zhí)行不同的任務(wù)。

2、如何創(chuàng)建線程

2.1、JAVA 中創(chuàng)建線程

/**
 * 繼承Thread類,重寫run方法
 */
classMyThreadextendsThread{
    @Override
    publicvoidrun(){
        System.out.println("myThread..."+Thread.currentThread().getName());
}}

/**
 * 實(shí)現(xiàn)Runnable接口,實(shí)現(xiàn)run方法 
 */
classMyRunnableimplementsRunnable{
    @Override
    publicvoidrun(){
        System.out.println("MyRunnable..."+Thread.currentThread().getName());
}}

/**
 * 實(shí)現(xiàn)Callable接口,指定返回類型,實(shí)現(xiàn)call方法
 */
classMyCallableimplementsCallable {
    @Override
    publicStringcall()throwsException{
        return"MyCallable..."+Thread.currentThread().getName();
}}

2.2、測(cè)試一下

publicstaticvoidmain(String[] args)throwsException{
    MyThread thread =newMyThread();
    thread.run();//myThread...main
    thread.start();//myThread...Thread-0
    
    MyRunnable myRunnable =newMyRunnable();
    Thread thread1 =newThread(myRunnable);
    myRunnable.run();//MyRunnable...main
    thread1.start();//MyRunnable...Thread-1
    
    MyCallable myCallable =newMyCallable();
    FutureTask futureTask =newFutureTask<>(myCallable);
    Thread thread2 =newThread(futureTask);
    thread2.start();
    System.out.println(myCallable.call());//MyCallable...main
    System.out.println(futureTask.get());//MyCallable...Thread-2

}
2.3、問題

既然我們創(chuàng)建了線程,那為何我們直接調(diào)用方法和我們調(diào)用 start () 方法的結(jié)果不同?new Thread () 是否真實(shí)創(chuàng)建了線程?

2.4、問題分析

我們直接調(diào)用方法,可以看到是執(zhí)行的主線程,而調(diào)用 start () 方法就是開啟了新線程,那說明 new Thread () 并沒有創(chuàng)建線程,而是在 start () 中創(chuàng)建了線程。 那我們看下 Thread 類 start () 方法:

classThreadimplementsRunnable{//Thread類實(shí)現(xiàn)了Runnalbe接口,實(shí)現(xiàn)了run()方法 
    
    privateRunnable target;

    publicsynchronizedvoidstart(){
        ...

        boolean started =false;
        try{
            start0();//可以看到,start()方法真實(shí)的調(diào)用時(shí)start0()方法 
            started =true;
        }finally{
            ...     
        } 
    }
    
    privatenativevoidstart0();//start0()是一個(gè)native方法,由JVM調(diào)用底層操作系統(tǒng),開啟一個(gè)線程,由操作系統(tǒng)過統(tǒng)一調(diào)度 

    @Override
    publicvoidrun(){
        if(target !=null){
             target.run();//操作系統(tǒng)在執(zhí)行新開啟的線程時(shí),回調(diào)Runnable接口的run()方法,執(zhí)行我們預(yù)設(shè)的線程任務(wù)

        } 
     } 
}
2.5、總結(jié)

1.JAVA 不能直接創(chuàng)建線程執(zhí)行任務(wù),而是通過創(chuàng)建 Thread 對(duì)象調(diào)用操作系統(tǒng)開啟線程,在由操作系 統(tǒng)回調(diào) Runnable 接口的 run () 方法執(zhí)行任務(wù);

2.實(shí)現(xiàn) Runnable 的方式,將線程實(shí)際要執(zhí)行的回調(diào)任務(wù)單獨(dú)提出來了,實(shí)現(xiàn)線程的啟動(dòng)與回調(diào)任務(wù) 解耦;

3.實(shí)現(xiàn) Callable 的方式,通過 Future 模式不但將線程的啟動(dòng)與回調(diào)任務(wù)解耦,而且可以在執(zhí)行完成后 獲取到執(zhí)行的結(jié)果;

二、多線程

1、什么是多線程

多線程(multithreading),是指從軟件或者硬件上實(shí)現(xiàn)多個(gè)線程并發(fā)執(zhí)行的技術(shù)。同一個(gè)線程只 能處理完一個(gè)任務(wù)在處理下一個(gè)任務(wù),有時(shí)我們需要多個(gè)任務(wù)同時(shí)處理,這時(shí),我們就需要?jiǎng)?chuàng)建多 個(gè)線程來同時(shí)處理任務(wù)。

2、多線程有什么好處

2.1、串行處理

publicstaticvoidmain(String[] args)throwsException{
    System.out.println("start...");
    long start =System.currentTimeMillis();
    for(int i =0; i <5; i++){
        Thread.sleep(2000);//每個(gè)任務(wù)執(zhí)行2秒 
        System.out.println("task done...");//處理執(zhí)行結(jié)果
    }
    long end =System.currentTimeMillis();
    System.out.println("end...,time = "+(end - start));
}
//執(zhí)行結(jié)果
start...
task done...
task done...
task done...
task done...
task done... end...,time =10043

2.2、并行處理

publicstaticvoidmain(String[] args)throwsException{
    System.out.println("start...");
    long start =System.currentTimeMillis();
    List list =newArrayList<>();

    for(int i =0; i <5; i++){
        Callable callable =newCallable(){
            @Override
            publicStringcall()throwsException{
                Thread.sleep(2000);//每個(gè)任務(wù)執(zhí)行2秒 
                return"task done...";
            }

        };
        FutureTask task =newFutureTask(callable);
        list.add(task);
        newThread(task).start();

    }
    
    list.forEach(future ->{
        try{ 
            System.out.println(future.get());//處理執(zhí)行結(jié)果 } catch (Exception e) {
         } 
    });
    
    long end =System.currentTimeMillis();
    System.out.println("end...,time = "+(end - start));

} 
//執(zhí)行結(jié)果
 start...
 task done...
 task done...
 task done...
 task done...
 task done... end...,time =2005
2.3、總結(jié)

1.多線程可以把一個(gè)任務(wù)拆分為幾個(gè)子任務(wù),多個(gè)子任務(wù)可以并發(fā)執(zhí)行,每一個(gè)子任務(wù)就是一個(gè)線程。

2.多線程是為了同步完成多項(xiàng)任務(wù),不是為了提高運(yùn)行效率,而是為了提高資源使用效率來提高系統(tǒng) 的效率。

2.4、多線程的問題

上面示例中我們可以看到,如果每來一個(gè)任務(wù),我們就創(chuàng)建一個(gè)線程,有很多任務(wù)的情況下,我們 會(huì)創(chuàng)建大量的線程,可能會(huì)導(dǎo)致系統(tǒng)資源的耗盡。同時(shí),我們知道線程的執(zhí)行是需要搶占 CPU 資源 的,那如果有太多的線程,就會(huì)導(dǎo)致大量時(shí)間用在線程切換的開銷上。 再有,每來一個(gè)任務(wù)都需要?jiǎng)?chuàng)建一個(gè)線程,而創(chuàng)建一個(gè)線程需要調(diào)用操作系統(tǒng)底層方法,開銷較 大,而線程執(zhí)行完成后就被回收了。在需要大量線程的時(shí)候,創(chuàng)建線程的時(shí)間就花費(fèi)不少了。

三、線程池

1、如何設(shè)計(jì)一個(gè)線程池

由于多線程的開發(fā)存在上述的一些問題,那我們是否可以設(shè)計(jì)一個(gè)東西來避免這些問題呢?當(dāng)然可以!線程池就是為了解決這些問題而生的。那我們?cè)撊绾卧O(shè)計(jì)一個(gè)線程池來解決這些問題呢?或者說,一個(gè)線程池該具備什么樣的功能?

1.1、線程池基本功能

1.多線程會(huì)創(chuàng)建大量的線程耗盡資源,那線程池應(yīng)該對(duì)線程數(shù)量有所限制,可以保證不會(huì)耗盡系統(tǒng)資 源; 2.每次創(chuàng)建新的線程會(huì)增加創(chuàng)建時(shí)的開銷,那線程池應(yīng)該減少線程的創(chuàng)建,盡量復(fù)用已創(chuàng)建好的線 程;

1.2、線程池面臨問題

1.我們知道線程在執(zhí)行完自己的任務(wù)后就會(huì)被回收,那我們?nèi)绾螐?fù)用線程? 2.我們指定了線程的最大數(shù)量,當(dāng)任務(wù)數(shù)超出線程數(shù)時(shí),我們?cè)撊绾翁幚恚?/p>

1.3、創(chuàng)新源于生活

先假設(shè)一個(gè)場(chǎng)景:假設(shè)我們是一個(gè)物流公司的管理人員,要配送的貨物就是我們的任務(wù),貨車就是 我們配送工具,我們當(dāng)然不能有多少貨物就準(zhǔn)備多少貨車。那當(dāng)顧客源源不斷的將貨物交給我們配 送,我們?cè)撊绾喂芾聿拍茏尮窘?jīng)營(yíng)的最好呢? 1.最開始貨物來的時(shí)候,我們還沒有貨車,每批要運(yùn)輸?shù)呢浳镂覀兌家?gòu)買一輛車來運(yùn)輸; 2.當(dāng)貨車運(yùn)輸完成后,暫時(shí)還沒有下一批貨物到達(dá),那貨車就在倉(cāng)庫(kù)停著,等有貨物來了立馬就可以 運(yùn)輸; 3.當(dāng)我們有了一定數(shù)量的車后,我們認(rèn)為已經(jīng)夠用了,那后面就不再買車了,這時(shí)要是由新的貨物來 了,我們就會(huì)讓貨物先放倉(cāng)庫(kù),等有車回來在配送; 4.當(dāng) 618 大促來襲,要配送的貨物太多,車都在路上,倉(cāng)庫(kù)也都放滿了,那怎么辦呢?我們就選擇臨 時(shí)租一些車來幫忙配送,提高配送的效率; 5.但是貨物還是太多,我們?cè)黾恿伺R時(shí)的貨車,依舊配送不過來,那這時(shí)我們就沒辦法了,只能讓發(fā) 貨的客戶排隊(duì)等候或者干脆不接受了; 6.大促圓滿完成后,累計(jì)的貨物已經(jīng)配送完成了,為了降低成本,我們就將臨時(shí)租的車都還了;

1.4、技術(shù)源于創(chuàng)新

基于上述場(chǎng)景,物流公司就是我們的線程池、貨物就是我們的線程任務(wù)、貨車就是我們的線程。我 們?nèi)绾卧O(shè)計(jì)公司的管理貨車的流程,就應(yīng)該如何設(shè)計(jì)線程池管理線程的流程。 1.當(dāng)任務(wù)進(jìn)來我們還沒有線程時(shí),我們就該創(chuàng)建線程執(zhí)行任務(wù); 2.當(dāng)線程任務(wù)執(zhí)行完成后,線程不釋放,等著下一個(gè)任務(wù)進(jìn)來后接著執(zhí)行; 3.當(dāng)創(chuàng)建的線程數(shù)量達(dá)到一定量后,新來的任務(wù)我們存起來等待空閑線程執(zhí)行,這就要求線程池有個(gè) 存任務(wù)的容器; 4.當(dāng)容器存滿后,我們需要增加一些臨時(shí)的線程來提高處理效率; 5.當(dāng)增加臨時(shí)線程后依舊處理不了的任務(wù),那就應(yīng)該將此任務(wù)拒絕; 6.當(dāng)所有任務(wù)執(zhí)行完成后,就應(yīng)該將臨時(shí)的線程釋放掉,以免增加不必要的開銷;

2、線程池具體分析

上文中,我們講了該如何設(shè)計(jì)一個(gè)線程池,下面我們看看大神是如何設(shè)計(jì)的;

2.1、 JAVA 中的線程池是如何設(shè)計(jì)的

2.1.1、 線程池設(shè)計(jì)

看下線程池中的屬性,了解線程池的設(shè)計(jì)。

publicclassThreadPoolExecutorextendsAbstractExecutorService{

    //線程池的打包控制狀態(tài),用高3位來表示線程池的運(yùn)行狀態(tài),低29位來表示線程池中工作線程的數(shù)量 
    privatefinalAtomicInteger ctl =newAtomicInteger(ctlOf(RUNNING,0)); 
    
    //值為29,用來表示偏移量
     privatestaticfinalint COUNT_BITS =Integer.SIZE -3; 

    //線程池的最大容量
     privatestaticfinalint CAPACITY =(1<< COUNT_BITS)-1; 

    //線程池的運(yùn)行狀態(tài),總共有5個(gè)狀態(tài),用高3位來表示 
    privatestaticfinalint RUNNING =-1<< COUNT_BITS;//接受新任務(wù)并處理阻塞隊(duì)列中的任務(wù) 

    privatestaticfinalint SHUTDOWN =0<< COUNT_BITS;//不接受新任務(wù)但會(huì)處理阻塞隊(duì)列中的任務(wù)  

    privatestaticfinalint STOP =1<< COUNT_BITS;//不會(huì)接受新任務(wù),也不會(huì)處理阻塞隊(duì)列中的任務(wù),并且中斷正在運(yùn)行的任務(wù)

    privatestaticfinalint TIDYING =2<< COUNT_BITS;//所有任務(wù)都已終止, 工作線程數(shù)量為0,即將要執(zhí)行terminated()鉤子方法 

    privatestaticfinalint TERMINATED =3<< COUNT_BITS;// terminated()方法已經(jīng)執(zhí)行結(jié)束

    //任務(wù)緩存隊(duì)列,用來存放等待執(zhí)行的任務(wù)
    privatefinalBlockingQueue workQueue; 

    //全局鎖,對(duì)線程池狀態(tài)等屬性修改時(shí)需要使用這個(gè)鎖
    privatefinalReentrantLock mainLock =newReentrantLock(); 

    //線程池中工作線程的集合,訪問和修改需要持有全局鎖
    privatefinalHashSet workers =newHashSet(); 

    // 終止條件
    privatefinalCondition termination = mainLock.newCondition(); 

    //線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù) 
    privateint largestPoolSize; 
    
    //已完成任務(wù)的數(shù)量
    privatelong completedTaskCount; 
    
    //線程工廠
    privatevolatileThreadFactory threadFactory; 
    
    //任務(wù)拒絕策略
    privatevolatileRejectedExecutionHandler handler; 

    //線程存活時(shí)間
    privatevolatilelong keepAliveTime; 

    //是否允許核心線程超時(shí)
    privatevolatileboolean allowCoreThreadTimeOut; 

    //核心池大小,若allowCoreThreadTimeOut被設(shè)置,核心線程全部空閑超時(shí)被回收的情況下會(huì)為0 
    privatevolatileint corePoolSize; 

    //最大池大小,不得超過CAPACITY
    privatevolatileint maximumPoolSize; 
    
    //默認(rèn)的任務(wù)拒絕策略
    privatestaticfinalRejectedExecutionHandler defaultHandler =newAbortPolicy();

    //運(yùn)行權(quán)限相關(guān)
    privatestaticfinalRuntimePermission shutdownPerm = 
        newRuntimePermission("modifyThread");

    ... 
}
小結(jié)一下:以上線程池的設(shè)計(jì)可以看出,線程池的功能還是很完善的。 1.提供了線程創(chuàng)建、數(shù)量及存活時(shí)間等的管理; 2.提供了線程池狀態(tài)流轉(zhuǎn)的管理; 3.提供了任務(wù)緩存的各種容器; 4.提供了多余任務(wù)的處理機(jī)制; 5.提供了簡(jiǎn)單的統(tǒng)計(jì)功能;

2.1.2、線程池構(gòu)造函數(shù)

//構(gòu)造函數(shù)
 publicThreadPoolExecutor(int corePoolSize,//核心線程數(shù) 
                           int maximumPoolSize,//最大允許線程數(shù) 
                           long keepAliveTime,//線程存活時(shí)間 
                           TimeUnit unit,//存活時(shí)間單位 
                           BlockingQueue workQueue,//任務(wù)緩存隊(duì)列
                           ThreadFactory threadFactory,//線程工廠 
                           RejectedExecutionHandler handler){//拒絕策略 
    if(corePoolSize <0||
        maximumPoolSize <=0||
        maximumPoolSize < corePoolSize ||
        keepAliveTime <0)
        thrownewIllegalArgumentException();
        
    if(workQueue ==null|| threadFactory ==null|| handler ==null)
        thrownewNullPointerException();
        
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
小結(jié)一下: 1.構(gòu)造函數(shù)告訴了我們可以怎樣去適用線程池,線程池的哪些特性是我們可以控制的;

2.1.3、線程池執(zhí)行

2.1.3.1、提交任務(wù)方法

?public void execute(Runnable command);

?Future submit(Runnable task);

?Future submit(Runnable task, T result);

?Future submit(Callable task);

publicFuture submit(Runnable task){
        if(task ==null)thrownewNullPointerException();
        RunnableFuture ftask =newTaskFor(task,null);
        execute(ftask);
        return ftask;
}

可以看到 submit 方法的底層調(diào)用的也是 execute 方法,所以我們這里只分析 execute 方法;

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();
        //第一步:創(chuàng)建核心線程
        if (workerCountOf(c) < corePoolSize) {  //worker數(shù)量小于corePoolSize
            if (addWorker(command, true))       //創(chuàng)建worker
                return;
            c = ctl.get();
        }
        //第二步:加入緩存隊(duì)列
        if (isRunning(c) && workQueue.offer(command)) { //線程池處于RUNNING狀態(tài),將任務(wù)加入workQueue任務(wù)緩存隊(duì)列
            int recheck = ctl.get();    
            if (! isRunning(recheck) && remove(command))    //雙重檢查,若線程池狀態(tài)關(guān)閉了,移除任務(wù)
                reject(command);
            else if (workerCountOf(recheck) == 0)       //線程池狀態(tài)正常,但是沒有線程了,創(chuàng)建worker
                addWorker(null, false);
        }
        //第三步:創(chuàng)建臨時(shí)線程
        else if (!addWorker(command, false))
            reject(command);
    }
小結(jié)一下:execute () 方法主要功能:

1.核心線程數(shù)量不足就創(chuàng)建核心線程;

2.核心線程滿了就加入緩存隊(duì)列;

3.緩存隊(duì)列滿了就增加非核心線程;

4.非核心線程也滿了就拒絕任務(wù);

2.1.3.2、創(chuàng)建線程
privatebooleanaddWorker(Runnable firstTask,boolean core){
        retry:
        for(;;){
            int c = ctl.get();
            int rs =runStateOf(c);

            //等價(jià)于:rs>=SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
            //線程池已關(guān)閉,并且無需執(zhí)行緩存隊(duì)列中的任務(wù),則不創(chuàng)建
            if(rs >= SHUTDOWN &&
                !(rs == SHUTDOWN &&
                   firstTask ==null&&
                   ! workQueue.isEmpty()))
                returnfalse;

            for(;;){
                int wc =workerCountOf(c);
                if(wc >= CAPACITY ||
                    wc >=(core ? corePoolSize : maximumPoolSize))
                    returnfalse;
                if(compareAndIncrementWorkerCount(c))//CAS增加線程數(shù)
                    break retry;
                c = ctl.get();// Re-read ctl
                if(runStateOf(c)!= rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        //上面的流程走完,就可以真實(shí)開始創(chuàng)建線程了
        boolean workerStarted =false;
        boolean workerAdded =false;
        Worker w =null;
        try{
            w =newWorker(firstTask);//這里創(chuàng)建了線程
            finalThread t = w.thread;
            if(t !=null){
                finalReentrantLock mainLock =this.mainLock;
                mainLock.lock();
                try{
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs =runStateOf(ctl.get());

                    if(rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask ==null)){
                        if(t.isAlive())// precheck that t is startable
                            thrownewIllegalThreadStateException();
                        workers.add(w);//這里將線程加入到線程池中
                        int s = workers.size();
                        if(s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded =true;
                    }
                }finally{
                    mainLock.unlock();
                }
                if(workerAdded){
                    t.start();//添加成功,啟動(dòng)線程
                    workerStarted =true;
                }
            }
        }finally{
            if(! workerStarted)
                addWorkerFailed(w);//添加線程失敗操作
        }
        return workerStarted;
    }
小結(jié):addWorker () 方法主要功能;

1.增加線程數(shù);

2.創(chuàng)建線程 Worker 實(shí)例加入線程池;

3.加入完成開啟線程;

4.啟動(dòng)失敗則回滾增加流程;

2.1.3.3、工作線程的實(shí)現(xiàn)
privatefinalclassWorker//Worker類是ThreadPoolExecutor的內(nèi)部類
        extendsAbstractQueuedSynchronizer  
        implementsRunnable
    {
        
        finalThread thread;//持有實(shí)際線程
        Runnable firstTask;//worker所對(duì)應(yīng)的第一個(gè)任務(wù),可能為空
        volatilelong completedTasks;//記錄執(zhí)行任務(wù)數(shù)

        Worker(Runnable firstTask){
            setState(-1);// inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread =getThreadFactory().newThread(this);
        }
        
        publicvoidrun(){
            runWorker(this);//當(dāng)前線程調(diào)用ThreadPoolExecutor中的runWorker方法,在這里實(shí)現(xiàn)的線程復(fù)用
        }

        ...繼承AQS,實(shí)現(xiàn)了不可重入鎖...
    }

小結(jié):工作線程 Worker 類主要功能;

1.此類持有一個(gè)工作線程,不斷處理拿到的新任務(wù),持有的線程即為可復(fù)用的線程;

2.此類可看作一個(gè)適配類,在 run () 方法中真實(shí)調(diào)用 runWorker () 方法不斷獲取新任務(wù),完成線程復(fù)用;

2.1.3.4、線程的復(fù)用

finalvoidrunWorker(Worker w){//ThreadPoolExecutor中的runWorker方法,在這里實(shí)現(xiàn)的線程復(fù)用
        Thread wt =Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask =null;
        w.unlock();// allow interrupts
        boolean completedAbruptly =true;//標(biāo)識(shí)線程是否異常終止
        try{
            while(task !=null||(task =getTask())!=null){//這里會(huì)不斷從任務(wù)隊(duì)列獲取任務(wù)并執(zhí)行
                w.lock();
                
                //線程是否需要中斷
                if((runStateAtLeast(ctl.get(), STOP)||    
                     (Thread.interrupted()&&
                      runStateAtLeast(ctl.get(), STOP)))&&
                    !wt.isInterrupted())
                    wt.interrupt();
                try{
                    beforeExecute(wt, task);//執(zhí)行任務(wù)前的Hook方法,可自定義
                    Throwable thrown =null;
                    try{
                        task.run();//執(zhí)行實(shí)際的任務(wù)
                    }catch(RuntimeException x){
                        thrown = x;throw x;
                    }catch(Error x){
                        thrown = x;throw x;
                    }catch(Throwable x){
                        thrown = x;thrownewError(x);
                    }finally{
                        afterExecute(task, thrown);//執(zhí)行任務(wù)后的Hook方法,可自定義
                    }
                }finally{
                    task =null;//執(zhí)行完成后,將當(dāng)前線程中的任務(wù)制空,準(zhǔn)備執(zhí)行下一個(gè)任務(wù)
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly =false;
        }finally{
            processWorkerExit(w, completedAbruptly);//線程執(zhí)行完成后的清理工作
        }
    }
小結(jié):runWorker () 方法主要功能;

1.循環(huán)從緩存隊(duì)列中獲取新的任務(wù),直到?jīng)]有任務(wù)為止;

2.使用 worker 持有的線程真實(shí)執(zhí)行任務(wù);

3.任務(wù)都執(zhí)行完成后的清理工作;

2.1.3.5、隊(duì)列中獲取待執(zhí)行任務(wù)
privateRunnablegetTask(){
        boolean timedOut =false;//標(biāo)識(shí)當(dāng)前線程是否超時(shí)未能獲取到task對(duì)象

        for(;;){
            int c = ctl.get();
            int rs =runStateOf(c);

            // Check if queue empty only if necessary.
            if(rs >= SHUTDOWN &&(rs >= STOP || workQueue.isEmpty())){
                decrementWorkerCount();
                returnnull;
            }

            int wc =workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if((wc > maximumPoolSize ||(timed && timedOut))
                &&(wc >1|| workQueue.isEmpty())){
                if(compareAndDecrementWorkerCount(c))//若線程存活時(shí)間超時(shí),則CAS減去線程數(shù)量
                    returnnull;
                continue;
            }

            try{
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)://允許超時(shí)回收則阻塞等待
                    workQueue.take();//不允許則直接獲取,沒有就返回null
                if(r !=null)
                    return r;
                timedOut =true;
            }catch(InterruptedException retry){
                timedOut =false;
            }
        }
    }
小結(jié):getTask () 方法主要功能;

1.實(shí)際在緩存隊(duì)列中獲取待執(zhí)行的任務(wù);

2.在這里管理線程是否要阻塞等待,控制線程的數(shù)量;

2.1.3.6、清理工作
privatevoidprocessWorkerExit(Worker w,boolean completedAbruptly){
        if(completedAbruptly)// If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        finalReentrantLock mainLock =this.mainLock;
        mainLock.lock();
        try{
            completedTaskCount += w.completedTasks;
            workers.remove(w);//移除執(zhí)行完成的線程
        }finally{
            mainLock.unlock();
        }

        tryTerminate();//每次回收完一個(gè)線程后都嘗試終止線程池

        int c = ctl.get();
        if(runStateLessThan(c, STOP)){//到這里說明線程池沒有終止
            if(!completedAbruptly){
                int min = allowCoreThreadTimeOut ?0: corePoolSize;
                if(min ==0&&! workQueue.isEmpty())
                    min =1;
                if(workerCountOf(c)>= min)
                    return;// replacement not needed
            }
            addWorker(null,false);//異常終止線程的話,需要在常見一個(gè)線程
        }
    }

小結(jié):processWorkerExit () 方法主要功能;

1.真實(shí)完成線程池線程的回收;

2.調(diào)用嘗試終止線程池;

3.保證線程池正常運(yùn)行;

2.1.3.7、嘗試終止線程池

finalvoidtryTerminate(){
        for(;;){
            int c = ctl.get();
            
            //若線程池正在執(zhí)行、線程池已終止、線程池還需要執(zhí)行緩存隊(duì)列中的任務(wù)時(shí),返回
            if(isRunning(c)||
                runStateAtLeast(c, TIDYING)||
                (runStateOf(c)== SHUTDOWN &&! workQueue.isEmpty()))
                return;
                
            //執(zhí)行到這里,線程池為SHUTDOWN且無待執(zhí)行任務(wù) 或 STOP 狀態(tài)
            if(workerCountOf(c)!=0){
                interruptIdleWorkers(ONLY_ONE);//只中斷一個(gè)線程
                return;
            }

            //執(zhí)行到這里,線程池已經(jīng)沒有可用線程了,可以終止了
            finalReentrantLock mainLock =this.mainLock;
            mainLock.lock();
            try{
                if(ctl.compareAndSet(c,ctlOf(TIDYING,0))){//CAS設(shè)置線程池終止
                    try{
                        terminated();//執(zhí)行鉤子方法
                    }finally{
                        ctl.set(ctlOf(TERMINATED,0));//這里將線程池設(shè)為終態(tài)
                        termination.signalAll();
                    }
                    return;
                }
            }finally{
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }
小結(jié):tryTerminate () 方法主要功能;

1.實(shí)際嘗試終止線程池;

2.終止成功則調(diào)用鉤子方法,并且將線程池置為終態(tài)。

2.2、JAVA 線程池總結(jié)

以上通過對(duì) JAVA 線程池的具體分析我們可以看出,雖然流程看似復(fù)雜,但其實(shí)有很多內(nèi)容都是狀態(tài)重復(fù)校驗(yàn)、線程安全的保證等內(nèi)容,其主要的功能與我們前面所提出的設(shè)計(jì)功能一致,只是額外增加了一些擴(kuò)展,下面我們簡(jiǎn)單整理下線程池的功能;

2.2.1、主要功能

1.線程數(shù)量及存活時(shí)間的管理;

2.待處理任務(wù)的存儲(chǔ)功能;

3.線程復(fù)用機(jī)制功能;

4.任務(wù)超量的拒絕功能;

2.2.2、擴(kuò)展功能

1.簡(jiǎn)單的執(zhí)行結(jié)果統(tǒng)計(jì)功能;

2.提供線程執(zhí)行異常處理機(jī)制;

3.執(zhí)行前后處理流程自定義;

4.提供線程創(chuàng)建方式的自定義;

2.2.3、流程總結(jié)

以上通過對(duì) JAVA 線程池任務(wù)提交流程的分析我們可以看出,線程池執(zhí)行的簡(jiǎn)單流程如下圖所示;

42b18592-7c96-11ee-939d-92fbcf53809c.png

2.3、JAVA 線程池使用

線程池基本使用驗(yàn)證上述流程:

publicstaticvoidmain(String[] args)throwsException{
        
        //創(chuàng)建線程池
       ThreadPoolExecutor threadPoolExecutor =newThreadPoolExecutor(
               5,10,100,TimeUnit.SECONDS,newArrayBlockingQueue(5));
        
        //加入4個(gè)任務(wù),小于核心線程,應(yīng)該只有4個(gè)核心線程,隊(duì)列為0
        for(int i =0; i <4; i++){
            threadPoolExecutor.submit(newMyRunnable());
        }
        System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 4
        System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 0
        
        //再加4個(gè)任務(wù),超過核心線程,但是沒有超過核心線程 + 緩存隊(duì)列容量,應(yīng)該5個(gè)核心線程,隊(duì)列為3
        for(int i =0; i <4; i++){
            threadPoolExecutor.submit(newMyRunnable());
        }
        System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 5
        System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 3
        
        //再加4個(gè)任務(wù),隊(duì)列滿了,應(yīng)該5個(gè)熱核心線程,隊(duì)列5個(gè),非核心線程2個(gè)
        for(int i =0; i <4; i++){
            threadPoolExecutor.submit(newMyRunnable());
        }
        System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 7
        System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 5
        
        //再加4個(gè)任務(wù),核心線程滿了,應(yīng)該5個(gè)熱核心線程,隊(duì)列5個(gè),非核心線程5個(gè),最后一個(gè)拒絕
        for(int i =0; i <4; i++){
            try{
                threadPoolExecutor.submit(newMyRunnable());
            }catch(Exception e){
                e.printStackTrace();//java.util.concurrent.RejectedExecutionException
            }
        }
        System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 10
        System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 5
        System.out.println(threadPoolExecutor.getTaskCount());//共執(zhí)行15個(gè)任務(wù)
        
        //執(zhí)行完成,休眠15秒,非核心線程釋放,應(yīng)該5個(gè)核心線程,隊(duì)列為0
        Thread.sleep(1500);
        System.out.println("worker count = "+ threadPoolExecutor.getPoolSize());//worker count = 5
        System.out.println("queue size = "+ threadPoolExecutor.getQueue().size());//queue size = 0
        
        //關(guān)閉線程池
        threadPoolExecutor.shutdown();
    }






審核編輯:劉清

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • cpu
    cpu
    +關(guān)注

    關(guān)注

    68

    文章

    10870

    瀏覽量

    211899
  • JAVA
    +關(guān)注

    關(guān)注

    19

    文章

    2969

    瀏覽量

    104789
  • 線程池
    +關(guān)注

    關(guān)注

    0

    文章

    57

    瀏覽量

    6853

原文標(biāo)題:深入淺出線程池

文章出處:【微信號(hào):OSC開源社區(qū),微信公眾號(hào):OSC開源社區(qū)】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    Java線程包括哪些

    線程是用來統(tǒng)管理線程的,在 Java 創(chuàng)建和銷毀線程
    的頭像 發(fā)表于 10-11 15:33 ?821次閱讀
    <b class='flag-5'>Java</b><b class='flag-5'>中</b>的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實(shí)現(xiàn)的

    線程的概念是什么?線程是如何實(shí)現(xiàn)的?
    發(fā)表于 02-28 06:20

    java自帶的線程方法

    二、原理分析 從上面使用線程的例子來看,最主要就是兩步,構(gòu)造ThreadPoolExecutor對(duì)象,然后每來個(gè)任務(wù),就調(diào)用ThreadPoolExecutor對(duì)象的execute
    發(fā)表于 09-27 11:06 ?0次下載

    如何正確關(guān)閉線程

    前言本章分為兩個(gè)議題 如何正確關(guān)閉線程 shutdown 和 shutdownNow 的區(qū)別 項(xiàng)目環(huán)境jdk 1.8 github 地址:https://github.com
    的頭像 發(fā)表于 09-29 14:41 ?9932次閱讀

    基于Nacos的簡(jiǎn)單動(dòng)態(tài)化線程實(shí)現(xiàn)

    本文以Nacos作為服務(wù)配置中心,以修改線程核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)個(gè)簡(jiǎn)單的動(dòng)態(tài)化
    發(fā)表于 01-06 14:14 ?868次閱讀

    線程線程

    線程通常用于服務(wù)器應(yīng)用程序。 每個(gè)傳入請(qǐng)求都將分配給線程池中的個(gè)線程,因此可以異步處理請(qǐng)求,
    的頭像 發(fā)表于 02-28 09:53 ?800次閱讀
    多<b class='flag-5'>線程</b>之<b class='flag-5'>線程</b><b class='flag-5'>池</b>

    Java線程核心原理

    看過Java線程源碼的小伙伴都知道,在Java線程池中最核心的類就是ThreadPoolExecutor,
    的頭像 發(fā)表于 04-21 10:24 ?858次閱讀

    細(xì)數(shù)線程的10個(gè)

    JDK開發(fā)者提供了線程的實(shí)現(xiàn)類,我們基于Executors組件,就可以快速創(chuàng)建個(gè)線程 。
    的頭像 發(fā)表于 06-16 10:11 ?729次閱讀
    細(xì)數(shù)<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10<b class='flag-5'>個(gè)</b>坑

    線程線程怎么釋放

    線程分組看,pool名開頭線程占616條,而且waiting狀態(tài)也是616條,這個(gè)點(diǎn)就非??梢闪耍覕喽ň褪沁@個(gè)pool開頭線程導(dǎo)致的問題。我們先排查為何這個(gè)
    發(fā)表于 07-31 10:49 ?2301次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的<b class='flag-5'>線程</b>怎么釋放

    線程的兩個(gè)思考

    今天還是說一下線程的兩個(gè)思考。 池子 我們常用的線程, JDK的ThreadPoolExecutor. CompletableFutur
    的頭像 發(fā)表于 09-30 11:21 ?3106次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的兩<b class='flag-5'>個(gè)</b>思考

    Spring 的線程應(yīng)用

    我們?cè)谌粘i_發(fā),經(jīng)常跟多線程打交道,Spring 為我們提供了個(gè)線程方便我們開發(fā),它就是
    的頭像 發(fā)表于 10-13 10:47 ?626次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應(yīng)用

    線程基本概念與原理

    、17、20等的新特性,簡(jiǎn)化了多線程編程的實(shí)現(xiàn)。 提高性能與資源利用率 線程主要解決兩個(gè)問題:線程創(chuàng)建與銷毀的開銷以及
    的頭像 發(fā)表于 11-10 10:24 ?541次閱讀

    線程的基本概念

    ? 呃呃,我這么問就很奇怪,因?yàn)?b class='flag-5'>線程是什么我都沒說,怎么會(huì)知道為什么會(huì)有線程呢?所以我打算帶大家去思考
    的頭像 發(fā)表于 11-10 16:37 ?529次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    線程的創(chuàng)建方式有幾種

    線程種用于管理和調(diào)度線程的技術(shù),能夠有效地提高系統(tǒng)的性能和資源利用率。它通過預(yù)先創(chuàng)建線程
    的頭像 發(fā)表于 12-04 16:52 ?874次閱讀

    什么是動(dòng)態(tài)線程?動(dòng)態(tài)線程的簡(jiǎn)單實(shí)現(xiàn)思路

    因此,動(dòng)態(tài)可監(jiān)控線程種針對(duì)以上痛點(diǎn)開發(fā)的線程管理工具。主要可實(shí)現(xiàn)功能有:提供對(duì) Spring 應(yīng)用內(nèi)
    的頭像 發(fā)表于 02-28 10:42 ?648次閱讀