0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫(xiě)文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

使用C++11新特性實(shí)現(xiàn)一個(gè)通用的線程池設(shè)計(jì)

CPP開(kāi)發(fā)者 ? 來(lái)源:程序員班吉 ? 2023-12-22 13:58 ? 次閱讀

C++11標(biāo)準(zhǔn)之前,多線程編程只能使用pthread_xxx開(kāi)頭的一組POSIX標(biāo)準(zhǔn)的接口。從C++11標(biāo)準(zhǔn)開(kāi)始,多線程相關(guān)接口封裝在了C++的std命名空間里。

Linux中,即便我們用std命名空間中的接口,也需要使用-lpthread鏈接pthread庫(kù),不難猜出,C++多線程的底層依然還是POSIX標(biāo)準(zhǔn)的接口。你可能會(huì)有疑問(wèn),既然底層還是使用pthread庫(kù),為什么不直接用pthread,而要繞一大圈再封裝一層呢?

在我看來(lái),除了統(tǒng)一C++編程接口之外,C++11標(biāo)準(zhǔn)更多的是在語(yǔ)言層面做了很多優(yōu)化,規(guī)避了原來(lái)C語(yǔ)言中的很多陷阱,比如C++11中的lock_guard、future、promise等技術(shù),將原來(lái)C語(yǔ)言中語(yǔ)法上容易犯錯(cuò)的地方進(jìn)行了簡(jiǎn)化,簡(jiǎn)單來(lái)說(shuō)就是將原來(lái)依賴人的地方交給了編譯器(很多時(shí)候機(jī)器比人更可靠)。比如在C++11標(biāo)準(zhǔn)之前,我們使用mutex是像下面這樣的:

pthread_mutex_lock(mutex)
....
if (condition) {
  ...
} elser if {
  ...
} else {
  ...
}
...
pthread_mutex_unlock(mutex)
相信有mutex使用經(jīng)驗(yàn)的人都或多或少都在這上面踩過(guò)坑,比如少寫(xiě)了一個(gè)unlock,中間異常退出沒(méi)有執(zhí)行到unlock等等各種各樣的情況導(dǎo)致的鎖沒(méi)有被正確釋放。而在C++11標(biāo)準(zhǔn)中,我們只需要使用lock_guard就可以了,比如:
lock_gruard locker(mutex)
...
if (condition) {
  ...
} elser if {
  ...
} else {
  ...
}
...

C++編譯器會(huì)自動(dòng)為我們插入釋放鎖的代碼,這樣我們就不用時(shí)刻擔(dān)心鎖有沒(méi)有正確釋放了。我個(gè)人的感覺(jué)是,從C++11標(biāo)準(zhǔn)開(kāi)始,C++變得不那么可怕了,甚至很多時(shí)候覺(jué)得C++變得好用了。

這篇文章我們?cè)囍褂肅++11標(biāo)準(zhǔn)的新特性來(lái)實(shí)現(xiàn)一個(gè)通用的線程池。首先,我們來(lái)看一下C++11標(biāo)準(zhǔn)中線程是如何創(chuàng)建的,先看代碼:

#include 
#include 


using namespace std;
void threadFunc(int &a) {
    a += 10;
    std::cout << "this is thread fun!" << std::endl;
}


int main() {
    int x = 10;
    std::thread t1(threadFunc, std::ref(x));
    t1.join();


    std::cout << "x=" << x << std::endl;
}

使用std::thread創(chuàng)建一個(gè)t1線程對(duì)象,傳入線程主函數(shù),將x以引用的方式傳入線程主函數(shù),接著調(diào)用join方法等待主函數(shù)threadFunc執(zhí)行完成。上面x最終的結(jié)果將是20。

整個(gè)流程和使用pthread_create其實(shí)區(qū)別不大,只是C++11將線程的創(chuàng)建封裝成了一個(gè)類,使得我們可以用面向?qū)ο缶幊痰姆绞絹?lái)創(chuàng)建多線程。

我們可以思考一下,要實(shí)現(xiàn)一個(gè)線程池,應(yīng)該怎么做?這個(gè)問(wèn)題我們可以倒過(guò)來(lái)想一下,線程池應(yīng)該怎么使用。比如,對(duì)于一個(gè)web服務(wù)器來(lái)講,為了能利用多核的優(yōu)勢(shì),我們可以在主線程里接收到請(qǐng)求之后,將對(duì)這個(gè)請(qǐng)求的具體操作交給worker線程,比如像下面這樣:

04c8b604-a07e-11ee-8b88-92fbcf53809c.png

這個(gè)流程我們?cè)贑語(yǔ)言網(wǎng)絡(luò)編程那個(gè)系列文章中有非常詳細(xì)的說(shuō)明,如果你get不到這個(gè)例子是什么意思,建議你去看一下那個(gè)系列的文章(C語(yǔ)言網(wǎng)絡(luò)編程系列合集)

主線程將請(qǐng)求交給Worker線程這個(gè)好理解,但是你有沒(méi)有想過(guò),它怎么就能交給Worker線程呢?線程創(chuàng)建完之后,對(duì)應(yīng)的主函數(shù)就已經(jīng)運(yùn)行起來(lái)了,對(duì)應(yīng)accept出來(lái)的套接字要怎么傳遞給一個(gè)正在運(yùn)行的函數(shù)呢?

為了說(shuō)明這個(gè)問(wèn)題,我們先來(lái)看一段Golang的代碼,暫時(shí)看不懂沒(méi)關(guān)系,后面我會(huì)解釋

func TestWorker(t *testing.T) {
  msg := make(chan int, 1)
  notify := make(chan struct{}, 0)
  poolSize := 10


  buildWorkers(poolSize, msg, notify)


  // 模擬任務(wù)
  for i := 0; i < poolSize; i++ {
    msg <- i
  }


  for i := 0; i < poolSize; i++ {
    <-notify
  }
}


func buildWorkers(poolSize int, msg <-chan int, notify chan<- struct{}) {
  for i := 0; i < poolSize; i++ {
    go func(i int) {
      if ret := recover(); ret != nil {
        log.Printf("panic: %v", ret)
      }


      log.Println("worker-id:", i)


      for {
        select {
        case m := <-msg:
          fmt.Println("m:", m)
          notify <- struct{}{}
        }
      }
    }(i)
  }
}

buildWorkers方法創(chuàng)建了10個(gè)協(xié)程,每個(gè)協(xié)程里面都有一個(gè)for循環(huán),一開(kāi)始每個(gè)for循環(huán)都阻塞在select語(yǔ)句中的case m := ←msg,如果之前沒(méi)有接觸過(guò)Go語(yǔ)言(這里的select你可以簡(jiǎn)單和Linux中的select技術(shù)類比)。另外還有兩個(gè)通道,一個(gè)是msg和notify,msg用來(lái)傳遞參數(shù),notify用來(lái)通知外面這個(gè)協(xié)程任務(wù)已經(jīng)執(zhí)行完了。

在TestWorker方法中,我們模擬了10個(gè)任務(wù),這10個(gè)任務(wù)不斷的往msg通道中發(fā)送數(shù)據(jù),當(dāng)msg有數(shù)據(jù)之后,我們創(chuàng)建的那10個(gè)協(xié)程就會(huì)爭(zhēng)取從msg通道接收消息,只要接收到消息就說(shuō)明這是執(zhí)行任務(wù)所必需的參數(shù)。執(zhí)行完成之后向notify發(fā)送消息。在TestWorker中我們同樣接收了10次,在沒(méi)有消息的時(shí)候就會(huì)阻塞在<-notify這一行,直到有協(xié)程執(zhí)行完成向notify通道發(fā)送消息,這里就從<-notify返回,進(jìn)入下一次循環(huán)。上面其實(shí)就是一個(gè)非常簡(jiǎn)單的協(xié)程池了,當(dāng)然為了演示,代碼并不是很完整。

運(yùn)行上面的代碼,得到的結(jié)果大概像下面這樣

2023/10/07 21:37:44 worker-id: 9
2023/10/07 21:37:44 worker-id: 4
2023/10/07 21:37:44 worker-id: 8
2023/10/07 21:37:44 m: 2
2023/10/07 21:37:44 m: 0
2023/10/07 21:37:44 worker-id: 5
2023/10/07 21:37:44 m: 3
2023/10/07 21:37:44 worker-id: 2
2023/10/07 21:37:44 worker-id: 1
2023/10/07 21:37:44 m: 5
2023/10/07 21:37:44 m: 4
2023/10/07 21:37:44 worker-id: 7
2023/10/07 21:37:44 m: 6
2023/10/07 21:37:44 worker-id: 0
2023/10/07 21:37:44 m: 7
2023/10/07 21:37:44 m: 1
2023/10/07 21:37:44 worker-id: 6
2023/10/07 21:37:44 m: 8
2023/10/07 21:37:44 m: 9
2023/10/07 21:37:44 worker-id: 3

從上面的結(jié)果來(lái)看,協(xié)程運(yùn)行并不是順序執(zhí)行的,這和多線程是一樣的道理。上面Golang的代碼執(zhí)行的流程我畫(huà)了一張圖,如下:

04db9b2a-a07e-11ee-8b88-92fbcf53809c.png

注意箭頭的方向,所有協(xié)程都不斷的嘗試從channel中接收消息,拿到程序運(yùn)行必要的參數(shù),當(dāng)msg中有數(shù)據(jù)時(shí)從case m := <-msg中蘇醒并執(zhí)行具體的業(yè)務(wù)邏輯,我們知道,在Golang中channel是線程安全的,其內(nèi)部有一把鎖,這把鎖就是mutex,下面是channel底層結(jié)構(gòu)體

// src/runtime/chan.go:33
type hchan struct {
  ...
  lock mutex
}

channel除了能保證線程安全,還能保證順序性,也就是發(fā)送方最先發(fā)送的,在接收方一定也是最先收到的。這不就是一個(gè)加了鎖的隊(duì)列嗎?我們可以試著想一下在C++中是不是也可以實(shí)現(xiàn)類似的效果呢?不難想到,我們可以使用一個(gè)隊(duì)列在各個(gè)線程之間傳遞數(shù)據(jù),像下面這樣:

04e27f1c-a07e-11ee-8b88-92fbcf53809c.png

主線程accept出來(lái)的套接字,只管往隊(duì)列里面丟就可以了,我們創(chuàng)建的一堆worker線程,不斷的嘗試從隊(duì)列里面pop數(shù)據(jù)。這樣,我們就解決了線程之間的交互問(wèn)題。

下面,我們就參照上面Golang的代碼,先把這個(gè)框架給搭出來(lái),然后再在這個(gè)基礎(chǔ)之上去完善代碼,最后實(shí)現(xiàn)一個(gè)準(zhǔn)生產(chǎn)的線程池。

我們先參照上面Golang的代碼,實(shí)現(xiàn)相似邏輯,代碼如下:

#include 
#include 
#include 


void threadFunc(std::queue& q) {
    while (1) {
        if (!q.empty()) {
            int param = q.front();
            q.pop();
            std::cout << "param:" << param << std::endl;
        }
    }
}


void jobDispatch(std::queue& q) {
    for (int i = 0; i < 1000; i++) {
        q.push(i);
    }
}


int main() {
   std::queue q1;


   std::vector ths;
   for (int i = 0; i < 10; i++) {
       ths.emplace_back(threadFunc, std::ref(q1));
   }


   jobDispatch(q1);


   for (auto& th: ths) {
       th.join();
   }


   return 0;
}

上面的代碼盡可能的還原了Golang的邏輯,我們來(lái)分析一下這段代碼。在main函數(shù)中,創(chuàng)建了一個(gè)隊(duì)列q1,這個(gè)隊(duì)列用來(lái)向線程池傳遞參數(shù),接著創(chuàng)建了10個(gè)線程保存在了vector中,將q1以引用的形式傳入線程池主函數(shù)(注意:這里傳引用必須使用std::ref包裝一下),再接著調(diào)用jobDispatch模擬任務(wù)分配然后每個(gè)線程調(diào)用join等待結(jié)束。

接著我們來(lái)看線程池主函數(shù)threadFunc,這個(gè)函數(shù)接收一個(gè)隊(duì)列q作為參數(shù),這里的q就是我們?cè)趧?chuàng)建線程池的時(shí)候傳進(jìn)來(lái)的q1,然后是一個(gè)死循環(huán),在這個(gè)循環(huán)里面我們不斷的判斷隊(duì)列是否為空,如果不為空就從隊(duì)列取出一個(gè)元素出來(lái)。最后,分配任務(wù)的函數(shù)jobDispatch向隊(duì)列q1里面push了1000個(gè)元素,來(lái)模擬1000個(gè)任務(wù)。

上面的代碼當(dāng)然是有問(wèn)題的,有興趣的可以把這段代碼拷貝下來(lái)把自己跑一下,你會(huì)發(fā)現(xiàn)雖然代碼能跑,但是結(jié)果完全看不懂。

首先,第一個(gè)問(wèn)題就是queue不是線程安全的。所以,這個(gè)隊(duì)列得有一把鎖,比如:

std::mutex mtx;


void threadFunc(std::queue& q) {
    while (true) {
        if (!q.empty()) {
            std::lock_guard ltx(mtx);
            int param = q.front();
            q.pop();
            std::cout << "param:" << param << std::endl;
        }
    }
}

我們?cè)趖hreadFund函數(shù)中的出隊(duì)列之前加了一把鎖。這把鎖是全局的,每個(gè)線程都要先拿到這把鎖之后才能從隊(duì)列里拿到數(shù)據(jù),你可以把這段代碼替換之后再運(yùn)行一下,這次的結(jié)果應(yīng)該是正確的了。

可能你覺(jué)得奇怪,我們使用lock_guard創(chuàng)建了一個(gè)ltx對(duì)象,但是并沒(méi)有地方去釋放這把鎖,如果你有這樣的疑問(wèn)應(yīng)該是對(duì)C++11還不是很熟悉,在C++11標(biāo)準(zhǔn)中,因?yàn)橛辛薘AII的緣故,一個(gè)對(duì)象被銷毀時(shí)一定會(huì)執(zhí)行析構(gòu)函數(shù),就算是運(yùn)行過(guò)程中對(duì)象產(chǎn)生異常析構(gòu)函數(shù)也會(huì)執(zhí)行,在C++中這叫棧展開(kāi)。有了這個(gè)特性之后,lock_guard就不難理解了,其構(gòu)造函數(shù)其實(shí)就是調(diào)用了mutex的lock方法,然后把這個(gè)mutex保存在成員變量中,當(dāng)對(duì)象銷毀時(shí)析構(gòu)函數(shù)中調(diào)用unlock。所以,有了這個(gè)機(jī)制之后,我們就不用到處寫(xiě)unlock了,這也是我覺(jué)得C++更好用了的原因之一。

在C++中同樣遵循大括號(hào)作用域,在上面的代碼中,lock_guard是在if語(yǔ)句中的,當(dāng)if語(yǔ)句執(zhí)行完之后,ltx就被銷毀了,所以當(dāng)循環(huán)進(jìn)入到下一次的時(shí)候?qū)嶋H上鎖已經(jīng)被釋放了。

這樣我們就解決了隊(duì)列的線程安全問(wèn)題,但眼尖的你一定看出來(lái)其實(shí)還有一個(gè)問(wèn)題,threadFunc函數(shù)中的死循環(huán)一直在空轉(zhuǎn),這顯然是有問(wèn)題的。解決這個(gè)問(wèn)題最容易想到的就是每次循環(huán)都sleep一下,但這顯然也是有問(wèn)題的,對(duì)于一個(gè)有實(shí)時(shí)要求的系統(tǒng)是不能接受的。

所以,我們迫切需要一種機(jī)制,讓threadFunc沒(méi)事干的時(shí)候就停在那等通知,想想看什么技術(shù)可以實(shí)現(xiàn)?對(duì),就是cond,在C++11中條件變量也被封裝在了std::命名空間中。下面我們就使用cond來(lái)改造一下,相關(guān)代碼如下:

std::mutex mtx;
std::condition_variable cond;   // v2


void threadFunc(std::queue& q) {
    while (true) {
        std::unique_lock ltx(mtx);       // v2
        cond.wait(ltx, [q]() { return !q.empty();}); // v2


        int param = q.front();
        q.pop();
        std::cout << "param:" << param << std::endl;
    }
}


void jobDispatch(std::queue& q) {
    for (int i = 0; i < 1000; i++) {
        q.push(i);
    }
    cond.notify_all();  // v2
}

修改后的代碼我在后面都加了注釋(v2), 首先我們定義了一個(gè)全局的條件變量cond,然后在threadFunc中調(diào)用cond的wait方法等待通知。然后在jobDispatch中往隊(duì)列里面寫(xiě)完數(shù)據(jù)之后調(diào)用notify_all方法通知所有等待的線程。這樣,當(dāng)隊(duì)列中沒(méi)有數(shù)據(jù)的時(shí)候線程池中的線程就會(huì)進(jìn)入睡眠,直到notify_all被調(diào)用。這里你可以想一下,上面notify_all還可以進(jìn)一步優(yōu)化嗎?

當(dāng)然,上面還作了一個(gè)調(diào)整,就是將原來(lái)的lock_guard換了unique_lock,這個(gè)改動(dòng)是必須的,我們知道cond調(diào)用wait的時(shí)候會(huì)嘗試釋放鎖,可lock_guard里面沒(méi)有釋放鎖的方法,而unique_lock是有unlock方法的。也就是說(shuō),unique_lock創(chuàng)建的ltx對(duì)象可以手動(dòng)調(diào)用unlock方法釋放鎖。

好了,到這里其實(shí)我們已經(jīng)寫(xiě)出一個(gè)簡(jiǎn)單的線程池了,這個(gè)線程池通過(guò)一個(gè)隊(duì)列傳遞參數(shù),使用mutex解決線程安全問(wèn)題,cond解決性能問(wèn)題??雌饋?lái)已經(jīng)和上面Golang的邏輯非常接近了。如果你使用Golang寫(xiě)過(guò)代碼,并且上面C++的代碼你也嘗試寫(xiě)出來(lái)了,你就會(huì)驚嘆于Golang簡(jiǎn)單了。好了,這里不吹Golang了,我們繼續(xù)回到C++上來(lái)。

當(dāng)然,到這里還遠(yuǎn)遠(yuǎn)沒(méi)完呢,C++的看家本領(lǐng)是啥?對(duì),是面向?qū)ο缶幊?。上面的代碼很顯然沒(méi)有面向?qū)ο蟮奈兜?。下面我們就使用面向?qū)ο蟮乃枷雭?lái)實(shí)現(xiàn)一個(gè)線程池。這里直接給出代碼

#include 
#include 
#include 
#include 


class TPool {
public:
    TPool(): m_thread_size(1), m_terminate(false) {};
    ~TPool() { stop(); }
    // 線程池初始化
    bool init(size_t size);
    // 停止所有線程
    void stop();
    // 啟動(dòng)線程池的各個(gè)線程
    void start();
    // 任務(wù)執(zhí)行入口
    template 
    auto exec(F&& f, A&&... args)->std::future;
    // 等待所有線程執(zhí)行完成
    bool waitDone();


private:
    // 每個(gè)任務(wù)都是一個(gè)struct結(jié)構(gòu)體,方便未來(lái)擴(kuò)展
    struct Task {
        Task() {}
        std::function m_func;
    };
    // Task未來(lái)在隊(duì)列中以智能指針的形式傳遞
    typedef std::shared_ptr TaskFunc;


private:
    // 嘗試從任務(wù)隊(duì)列獲取一個(gè)任務(wù)
    bool get(TaskFunc &t);
    // 線程主函數(shù)
    bool run();


private:
    // 任務(wù)隊(duì)列,將Task直接放到隊(duì)列中
    std::queue m_tasks;
    // 線程池
    std::vector m_threads;
    // 鎖
    std::mutex m_mutex;
    // 用于線程之間通知的條件變量
    std::condition_variable m_cond;
    // 線程池大小
    size_t m_thread_size;
    // 標(biāo)記線程是否結(jié)束
    bool m_terminate;
    // 用來(lái)記錄狀態(tài)的原子變量
    std::atomic m_atomic{0};
};

我們定義了一個(gè)TPool類,這個(gè)類里面包含幾個(gè)部分,我們從下往上看。第一個(gè)部分是線程池管理相關(guān)的各種資源,每一個(gè)我都寫(xiě)了注釋。第二部分是任務(wù)相關(guān)的操作,這部分不對(duì)外開(kāi)放。第三部分是任務(wù)定義,使用struct聲明了一個(gè)Task,其中有一個(gè)空的構(gòu)造函數(shù),還聲明了一個(gè)m_func,這是最終task執(zhí)行的入口。最后一部分是線程池對(duì)外開(kāi)放的各種接口。用戶使用線程的大致流程如下:

04e99b1c-a07e-11ee-8b88-92fbcf53809c.png

這里我將線程的初始化和線程的啟動(dòng)分成了兩步,是希望在使用的時(shí)候精確知道是哪一步出了問(wèn)題,如果你覺(jué)得這樣太繁瑣,可以適當(dāng)減少步驟,比如將init和start方法進(jìn)行合并。

下面我們就來(lái)詳細(xì)講一下線程各個(gè)方法的實(shí)現(xiàn)。首先是init和start方法,init方法用來(lái)初始化線程,代碼如下:

bool init(size_t size) {
    unique_lock lock(m_mutex);
    if (!m_threads.empty()) {
        return false;
    }
    m_thread_size = size;
    return true;
}

傳入一個(gè)size,表示線程池的大小,上來(lái)就加鎖,這是為了防止在多線程的情景下執(zhí)行init,這個(gè)方法實(shí)際上只做了一件事,就是設(shè)置線程池的大小。

初始化完了之后,調(diào)用start方法啟動(dòng)線程池,start方法的代碼如下:

bool start() {
    unique_lock lock(m_mutex);
    if (!m_threads.empty()) {
        return false;
    }


    for (size_t i = 0; i < m_thread_size; i++) {
        m_threads.push_back(new thread(&TPool::run, this));
    }
    return true;
}
同樣,為了防止多線程語(yǔ)境,上來(lái)也是加了一把鎖,如果m_threads不為空說(shuō)明已經(jīng)初始化過(guò)了,直接就返回了。接著就創(chuàng)建線程放到m_threads這個(gè)vector中,線程的主函數(shù)是當(dāng)前類的run方法。這樣,所有線程的主函數(shù)都跑起來(lái)了,接下來(lái)我們看一下線程主函數(shù)的代碼,如下:
void run() {
    while(!m_terminate) {
        TaskFunc task;
        bool ok = get(task);
        if (ok) {
            ++m_atomic;


            task->m_func();


            --m_atomic;


            unique_lock lock(m_mutex);
            if (m_atomic == 0 && m_tasks.empty()) { // 是否所有任務(wù)都執(zhí)行完成
                m_cond.notify_all();
            }
        }
    }
}

不出所料,run方法里其實(shí)就是一個(gè)死循環(huán),這個(gè)循環(huán)上來(lái)就判斷是否結(jié)束了,如果已經(jīng)結(jié)束就退出當(dāng)前循環(huán),run方法返回,當(dāng)前線程結(jié)束。

如果沒(méi)有結(jié)束,就調(diào)用get方法從任務(wù)隊(duì)列里取一個(gè)任務(wù)出來(lái)執(zhí)行,這里使用一個(gè)原子變量來(lái)判斷最后是不是所有任務(wù)都執(zhí)行完成,這個(gè)原子變量不是必須的,你可以根據(jù)你自己的場(chǎng)景做相應(yīng)的修改。取到任務(wù)之后,就會(huì)調(diào)用任務(wù)的m_func方法,還記得這個(gè)方法嗎?它定義在Task結(jié)構(gòu)體中。最后會(huì)判斷是否所有任務(wù)都結(jié)束了,如果已經(jīng)結(jié)束了會(huì)通知其它線程。

這里我們來(lái)看一下get方法是怎么獲取任務(wù)的,get方法的代碼如下:

bool get(TaskFunc &t) {
    unique_lock lock(m_mutex);
    if (m_tasks.empty()) {
        m_cond.wait(lock, [this]{return m_terminate || !m_tasks.empty();});
    }


    if (m_terminate)
        return false;


    t = std::move(m_tasks.front());
    m_tasks.pop();
    return true;
}

上來(lái)首先加了一把鎖,如果任務(wù)隊(duì)列沒(méi)有任務(wù)可以執(zhí)行,使用條件變量m_cond調(diào)用wait方法等待。

然后,如果此時(shí)線程已經(jīng)被結(jié)束掉了,直接返回false,如果沒(méi)有結(jié)束,就從隊(duì)列中取出一個(gè)任務(wù),賦值給傳進(jìn)來(lái)的t。注意,這里使用的是參數(shù)傳值的方式。這樣就實(shí)現(xiàn)了任務(wù)的傳遞,當(dāng)沒(méi)有任務(wù)的時(shí)候m_cond.wait會(huì)讓當(dāng)前進(jìn)程進(jìn)入睡眠,等待通知。

接下來(lái),我們看一下任務(wù)是如何被投遞到任務(wù)隊(duì)列中的,用來(lái)投遞任務(wù)的方法是exec,代碼如下:

template 
auto exec(F&& f, A&&... args)->future {
    using retType = decltype(f(args...));
    auto task = make_shared>(bind(std::forward(f), std::forward(args)...));
    TaskFunc fPtr = make_shared();
    fPtr->m_func = [task](){
        (*task)();
    };


    unique_lock lock(m_mutex);
    m_tasks.push(fPtr);
    m_cond.notify_one();


    return task->get_future();
}

exec方法稍微有一點(diǎn)復(fù)雜,知識(shí)點(diǎn)非常密集,我們簡(jiǎn)單過(guò)一下邏輯。首先,我們將exec方法聲明成了模板函數(shù),有兩個(gè)參數(shù),F(xiàn)表示任務(wù)最終執(zhí)行的方法,A是一個(gè)可變參數(shù),實(shí)際就是傳給函數(shù)f的參數(shù),返回值只有在運(yùn)行的時(shí)候才會(huì)知道,所以這里使用了自動(dòng)類型推導(dǎo),并且配合了decltype關(guān)鍵字,->future這句的意思是最終會(huì)返回一個(gè)future,這個(gè)future的類型是由decltype推導(dǎo)出f函數(shù)返回值的類型,這里有點(diǎn)繞,如果看不明白的話還是得去看一下future和decltype是怎么回事。

進(jìn)入函數(shù)內(nèi)部,我們一行一行講,首先是

using retType = decltype(f(args...));
decltype用于查詢表達(dá)的類型,這里的語(yǔ)義表達(dá)的就是f(args…)這個(gè)表達(dá)式最終返回的類型。接著,下一行是創(chuàng)建一個(gè)task,這個(gè)task是一個(gè)智能指針

autotask=make_shared>(bind(std::forward(f),std::forward(args)...));

首先,最外層make_shared是創(chuàng)建一個(gè)智能指針這沒(méi)什么可說(shuō)的。這里的std::packaged_task會(huì)根據(jù)前面推導(dǎo)出的類型創(chuàng)建出一個(gè)future對(duì)象,后面的bind是將這個(gè)函數(shù)和后面的可變參數(shù)綁定起來(lái)。這樣在函數(shù)調(diào)用的時(shí)候就可以獲取到參數(shù)了。

接著是創(chuàng)建Task類型的智能指針,并將剛剛創(chuàng)建好的函數(shù)放到Task結(jié)構(gòu)中的m_func中

TaskFunc fPtr = make_shared();
fPtr->m_func = [task](){
    (*task)();
};

上面用了一個(gè)Lambda表達(dá)式創(chuàng)建一個(gè)函數(shù),并將這個(gè)函數(shù)賦值給了m_func,最終任務(wù)執(zhí)行的其實(shí)就是這個(gè)Lambda表達(dá)式函數(shù),在這個(gè)函數(shù)中才最終調(diào)用傳進(jìn)來(lái)的方法。此時(shí),fPtr實(shí)際上就是一個(gè)Task對(duì)象,我們?cè)陬愔兄孛闪薚askFunc。接著將這個(gè)Task放到隊(duì)列中,注意要加鎖。最后將future對(duì)象返回出去。這意味著我們調(diào)用exec方法之后可以得到一個(gè)future對(duì)象。

exec方法是整個(gè)線程池中最復(fù)雜的部分了,涉及到很多C++的知識(shí),后面有時(shí)間我會(huì)專門(mén)開(kāi)幾篇文章單獨(dú)深入的去剖析這部分內(nèi)容。

最后,我們來(lái)看一下其它的幾個(gè)方法,首先是線程的停止,如下:

void stop() {
    {
        unique_lock lock(m_mutex);
        m_terminate = true;
        m_cond.notify_all();
    }


    for (auto & m_thread : m_threads) {
        if (m_thread->joinable()) {
            m_thread->join();
        }
        delete m_thread;
        m_thread = nullptr;
    }


    unique_lock lock(m_mutex);
    m_threads.clear();
}

這里我們使用了一對(duì)大括號(hào)將部分代碼包起來(lái)了,這種用法其實(shí)是為了讓鎖更早的釋放,unique_lock出了大括號(hào)就會(huì)被銷毀,從而調(diào)用析構(gòu)函數(shù)進(jìn)而釋放鎖。接著是等待各個(gè)線程結(jié)束,其實(shí)就是將m_terminate置為true,run里面的死循環(huán)跳出循環(huán),線程主函數(shù)返回。然后是清除各種資源。

最后我們實(shí)際用一下這個(gè)線程池,代碼如下:

#include "thread-pool.hpp"
#include 


using namespace std;


void threadFunc(int a) {
   cout << "a=" << a << endl;
}


class A {
public:
    A() = default;
    int run(int a, int b) {
        return a + b;
    }
};


int main() {
    TPool p1;
    p1.init(10);
    p1.start();
    p1.exec(threadFunc, 100);
    p1.exec(threadFunc, 200);


    A a1;
    auto fu1 = p1.exec(std::bind(&A::run, &a1, std::_1, std::_2), 10, 20);
    int ret = fu1.get();
    std::cout << "res:" << ret << std::endl;


    p1.waitDone();
    return 0;
}

可以看到,除了使用方法外,我們還可以使用一個(gè)類方法作為線程的主函數(shù),當(dāng)然,主函數(shù)是一個(gè)模板函數(shù),你可以傳任意的類型,好,到這里我整個(gè)線程池就實(shí)現(xiàn)完了。

總結(jié)

這篇文章我們使用C++11新特性實(shí)現(xiàn)了一個(gè)通用的線程池,我們先是使用Golang寫(xiě)了一個(gè)簡(jiǎn)單的協(xié)程池,然后順著相同的思路通過(guò)隊(duì)列傳參的形式實(shí)現(xiàn)了一個(gè)初級(jí)版本,但還沒(méi)有結(jié)束,因?yàn)镃++是支持面向?qū)ο缶幊痰?,所以我們又使用面向?qū)ο蟮姆绞綄?shí)現(xiàn)了最終的版本。

當(dāng)然,上面只是線程池實(shí)現(xiàn)的其中一種方式。并且很多C++相關(guān)的新特性也沒(méi)有提到,比如thread_local,這部分內(nèi)容還是需要你自己去探索了。






審核編輯:劉清

  • Linux系統(tǒng)
    +關(guān)注

    關(guān)注

    4

    文章

    593

    瀏覽量

    27397
  • C語(yǔ)言
    +關(guān)注

    關(guān)注

    180

    文章

    7604

    瀏覽量

    136839
  • 線程池
    +關(guān)注

    關(guān)注

    0

    文章

    57

    瀏覽量

    6847
  • for循環(huán)
    +關(guān)注

    關(guān)注

    0

    文章

    61

    瀏覽量

    2503

原文標(biāo)題:新特性深度探索:實(shí)現(xiàn)一個(gè)通用線程池

文章出處:【微信號(hào):CPP開(kāi)發(fā)者,微信公眾號(hào):CPP開(kāi)發(fā)者】歡迎添加關(guān)注!文章轉(zhuǎn)載請(qǐng)注明出處。

收藏 人收藏

    評(píng)論

    相關(guān)推薦

    C語(yǔ)言線程實(shí)現(xiàn)方案

    這是個(gè)簡(jiǎn)單小巧的C語(yǔ)言線程實(shí)現(xiàn),在 Github 上有 1.1K 的 star,很適合用來(lái)學(xué)
    的頭像 發(fā)表于 01-29 16:43 ?1540次閱讀

    Java中的線程包括哪些

    java.util.concurrent 包來(lái)實(shí)現(xiàn)的,最主要的就是 ThreadPoolExecutor 類。 Executor: 代表線程的接口,有
    的頭像 發(fā)表于 10-11 15:33 ?817次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實(shí)現(xiàn)

    線程的概念是什么?線程是如何實(shí)現(xiàn)的?
    發(fā)表于 02-28 06:20

    《深入理解C++11C++11特性解析與應(yīng)用的詳細(xì)電子教材免費(fèi)下載

    國(guó)內(nèi)首本全面深入解讀 C++11 新標(biāo)準(zhǔn)的專著,由 C++ 標(biāo)準(zhǔn)委員會(huì)代表和 IBM XL 編譯器中國(guó)開(kāi)發(fā)團(tuán)隊(duì)共同撰寫(xiě)。不僅詳細(xì)闡述了 C++11 標(biāo)準(zhǔn)的設(shè)計(jì)原則,而且系統(tǒng)地講解了 C++11
    發(fā)表于 08-27 08:00 ?0次下載

    基于Nacos的簡(jiǎn)單動(dòng)態(tài)化線程實(shí)現(xiàn)

    本文以Nacos作為服務(wù)配置中心,以修改線程核心線程數(shù)、最大線程數(shù)為例,實(shí)現(xiàn)
    發(fā)表于 01-06 14:14 ?863次閱讀

    如何用C++實(shí)現(xiàn)個(gè)線程呢?

    C++線程種多線程管理模型,把線程分成任務(wù)執(zhí)行和線程
    發(fā)表于 06-08 14:53 ?1779次閱讀
    如何用<b class='flag-5'>C</b>++<b class='flag-5'>實(shí)現(xiàn)</b><b class='flag-5'>一</b><b class='flag-5'>個(gè)</b><b class='flag-5'>線程</b><b class='flag-5'>池</b>呢?

    細(xì)數(shù)線程的10個(gè)

    JDK開(kāi)發(fā)者提供了線程實(shí)現(xiàn)類,我們基于Executors組件,就可以快速創(chuàng)建個(gè)線程
    的頭像 發(fā)表于 06-16 10:11 ?728次閱讀
    細(xì)數(shù)<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10<b class='flag-5'>個(gè)</b>坑

    線程的兩個(gè)思考

    今天還是說(shuō)一下線程的兩個(gè)思考。 池子 我們常用的線程, JDK的ThreadPoolExecutor. CompletableFutur
    的頭像 發(fā)表于 09-30 11:21 ?3106次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的兩<b class='flag-5'>個(gè)</b>思考

    Spring 的線程應(yīng)用

    我們?cè)谌粘i_(kāi)發(fā)中,經(jīng)常跟多線程打交道,Spring 為我們提供了個(gè)線程方便我們開(kāi)發(fā),它就是 ThreadPoolTaskExecutor
    的頭像 發(fā)表于 10-13 10:47 ?623次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應(yīng)用

    線程基本概念與原理

    、17、20等的新特性,簡(jiǎn)化了多線程編程的實(shí)現(xiàn)。 提高性能與資源利用率 線程主要解決兩個(gè)問(wèn)題:
    的頭像 發(fā)表于 11-10 10:24 ?537次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來(lái)的目的就是:提高程序執(zhí)行效
    的頭像 發(fā)表于 11-10 16:37 ?527次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    如何用C++11實(shí)現(xiàn)自旋鎖

    下面我會(huì)分析下自旋鎖,并代碼實(shí)現(xiàn)自旋鎖和互斥鎖的性能對(duì)比,以及利用C++11實(shí)現(xiàn)自旋鎖。 :自旋鎖(spin lock) 自旋鎖是
    的頭像 發(fā)表于 11-11 16:48 ?1442次閱讀
    如何用<b class='flag-5'>C++11</b><b class='flag-5'>實(shí)現(xiàn)</b>自旋鎖

    基于C++11線程實(shí)現(xiàn)

    C++11 加入了線程庫(kù),從此告別了標(biāo)準(zhǔn)庫(kù)不支持并發(fā)的歷史。然而 c++ 對(duì)于多線程的支持還是比較低級(jí),稍微高級(jí)點(diǎn)的用法都需要自己去
    的頭像 發(fā)表于 11-13 15:29 ?768次閱讀

    線程的創(chuàng)建方式有幾種

    線程種用于管理和調(diào)度線程的技術(shù),能夠有效地提高系統(tǒng)的性能和資源利用率。它通過(guò)預(yù)先創(chuàng)建線程
    的頭像 發(fā)表于 12-04 16:52 ?869次閱讀

    什么是動(dòng)態(tài)線程?動(dòng)態(tài)線程的簡(jiǎn)單實(shí)現(xiàn)思路

    因此,動(dòng)態(tài)可監(jiān)控線程種針對(duì)以上痛點(diǎn)開(kāi)發(fā)的線程管理工具。主要可實(shí)現(xiàn)功能有:提供對(duì) Sprin
    的頭像 發(fā)表于 02-28 10:42 ?645次閱讀