0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

線程池的應用

科技綠洲 ? 來源:Linux開發(fā)架構之路 ? 作者:Linux開發(fā)架構之路 ? 2023-11-10 11:07 ? 次閱讀

線程池的應用

在我認知中,任何網絡服務器都是一個死循環(huán)。這個死循環(huán)長下面這個樣子。

圖片

基本上服務器框架都是基于這個架構而不斷開發(fā)拓展的。

這個死循環(huán)總共分為四個步驟,可以涵蓋所有客戶端的需求,然而目前絕大多數(shù)企業(yè)不會用這樣的架構。

問題在于容易產生阻塞。

作為客戶端,我們當然希望訪問服務器的時候,能夠在短時間內收到回復,意味著自己連接上了該服務器。但是上述架構卻很容易產生響應延遲。

當某一個連接的2.3.4時間過長,也許是因為客戶上傳了很大的數(shù)據,也許是因為業(yè)務處理起來比較麻煩,需要計算很多東西,也許是客戶需要下載很大的東西,總之只要2,3,4的時間延遲,意味著下一個循環(huán)處理其它連接的動作也會被無限延遲。

也就是說,系統(tǒng)需要處理一個很慢的客戶端的連接,后面的所有連接,哪怕只是耗時很短的任務,都需要等這個很慢的任務完成才能進行。

所以除了像redis的服務器,數(shù)據庫都是基于hash的key-value結構,業(yè)務處理起來十分快速,才會將1,2,3,4都在一個線程中完成,其它的服務器若要提供千萬乃至億級別的客戶接入量,必須更快地處理客戶的連接,解除1和234之間的耦合,這才引入了多線程,我的主線程只負責1,然后將2,3,4分發(fā)到其它線程中執(zhí)行。

然而,如果服務器選擇這種多線程架構,當我們面臨著巨大的客戶端流量,則勢必需要頻繁地創(chuàng)建和銷毀線程,這個過程十分浪費系統(tǒng)資源,還容易造成系統(tǒng)崩潰,然后老板震驚,被迫畢業(yè),流落街頭,思之令人發(fā)笑。

解決辦法就是線程池。

我們預先創(chuàng)建好一系列線程,就好比后宮佳麗三千,然后皇上(線程池中樞)來了興致(收到任務),就去翻一個妃子(線程池中某個線程)的牌子。妃子(線程)解決完需求后,回到后宮(線程池),等待下一次召喚。

不用創(chuàng)建和銷毀,而是回收利用,所有池式結構都可以看做是一種對資源調度的緩沖,這就是線程池的精髓。

線程池設計

我們手撕線程池,目的還是搞懂基本原理,不弄太多花里胡哨的架構,比如工廠模式之類的。

當前這個版本的線程池是基于互斥鎖和條件變量實現(xiàn)的。

預告(畫餅):無鎖線程池后續(xù)也會手撕。

線程池總體上可以分為三大組件。

  • 任務隊列(存還沒有執(zhí)行的任務)
  • 執(zhí)行隊列(可以看成就是線程池,存放著可以用來執(zhí)行任務的線程)
  • 線程池管理中樞(負責封裝前兩個類,任務的分發(fā),線程池的創(chuàng)建,銷毀,等等。對外提供統(tǒng)一的接口

其工作流程大概如圖所示

圖片

任務隊列節(jié)點數(shù)據結構

//- 任務隊列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};

任務隊列負責存還沒有執(zhí)行的業(yè)務,我們可以將每個業(yè)務都抽象成一個函數(shù),每個函數(shù)自然有可能需要參數(shù)。

所以任務隊列的節(jié)點需要兩個成員:

  • taskCallback:函數(shù)回調,執(zhí)行客戶端想要的業(yè)務。
  • user_data:函數(shù)參數(shù),包含客戶端的信息,比如socketfd等。

順便提供了一個接口,可以修改回調函數(shù)。

執(zhí)行隊列節(jié)點數(shù)據結構

//-執(zhí)行隊列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
  • tid:每個節(jié)點都對應一個線程,所以需要一個id成員來存線程id,
  • usable:這個成員非常妙,它代表當前線程是否可用,默認為true,一旦設置為false,則該線程會結束。
  • 使用usable可以在最后銷毀線程池的時候,以一種優(yōu)雅的方式結束每個線程,而代替pthread_cancel這種強制銷毀線程的方式,因為你不知道線程中的任務是否處理完,強制銷毀就會使某些業(yè)務中斷。
  • pool:這個成員是指向中樞管理(后面會講)的指針,主要是為了在每個線程中通過pool獲取到一個全局(對于所有線程池線程共享)的互斥鎖和條件變量。
  • start:是線程池對象執(zhí)行的一個實現(xiàn)線程再回收利用的任務循環(huán)。具體實現(xiàn)代碼也是在后面會講。

線程池管理中樞設計

總體結構:

class ThreadPool{
public:
//-任務隊列和執(zhí)行隊列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構造函數(shù)
ThreadPool(int thread_count):thread_count(thread_count);
//-創(chuàng)建線程池
void createPool();
//-加入任務
void push_task(void(*tcb)(void* arg),int i);
//-利用析構銷毀線程池
~ThreadPool();
};*>*>

關于數(shù)據成員:

  • task_queue、exec_queue: 任務隊列和執(zhí)行隊列,我使用deque作為容器實現(xiàn)隊列。
  • cont:所有線程共享的條件變量。
  • mutex:所有線程共享的互斥鎖。
  • thread_count: 線程池創(chuàng)建的時候,初始大小

關于成員方法:

  • ThreadPool:構造函數(shù)
  • createPool:創(chuàng)建線程池
  • push_task: 給服務器主循環(huán)用的,給線程池添加任務。
  • ~ ThreadPool: 銷毀線程池,事實上應該單獨定義一個destroyPool的api,我這里為了簡便合并到析構中了。

ExecEle的start函數(shù)實現(xiàn)

現(xiàn)在對于ThreadPool對象有概念以后,可以先將剛剛執(zhí)行隊列節(jié)點ExecEle的start函數(shù)實現(xiàn),其代表了每個線程池的線程始終在跑的循環(huán),在無任務分配的時候阻塞在某個位置。

void* ExecEle::start(void*arg){
//-獲得執(zhí)行對象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務隊列為空,等待新任務
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執(zhí)行任務回調
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執(zhí)行對象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}

arg參數(shù)指向的是該線程函數(shù)對應的執(zhí)行元素ExecEle本身的指針,我們定義其為ee。然后進入死循環(huán),通過ee,我們可以獲得線程池中樞對象pool。

通過pool。我們可以獲得任務隊列的情況,當任務隊列為空,則線程進入阻塞狀態(tài),等待任務隊列有任務進來后,通過條件變量通知,再恢復執(zhí)行。

恢復執(zhí)行后,從任務隊列中取出隊首的任務,這個過程需要在mutex的范圍內,保證獨占性。

之后解除互斥鎖,開始執(zhí)行任務的回調。執(zhí)行完進行入下個循環(huán),嘗試再次獲得互斥鎖。

最后說一說usable,當我們銷毀線程池的時候,設置每一個線程的usable為false,那么不會立刻中斷每個線程正在執(zhí)行的回調,而是等回調結束后,在下一次循環(huán)中如果檢測到usable為false后,就會退出整個大循環(huán),并釋放自己的鎖,喚醒線程池其它休眠的線程。退出大循環(huán)后,線程自然而優(yōu)雅地結束。

之后是ThreadPool自己的api實現(xiàn)

構造函數(shù)ThreadPool實現(xiàn):

ThreadPool(int thread_count):thread_count(thread_count){
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}

主要為了初始化cont,mutex,thread_count。

創(chuàng)建線程池createPool實現(xiàn):

void createPool(){
int ret;
//-初始執(zhí)行隊列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}*>;++i){

通過pthread_create創(chuàng)建thread_count個線程,每個線程執(zhí)行自己的start函數(shù)進入等待任務循環(huán),并阻塞在鎖和條件變量的位置。將exec對象push進執(zhí)行隊列。

添加任務 push_task實現(xiàn):

void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執(zhí)行隊列中的一個進行任務
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}

主要功能是構造TaskEle對象并加入到執(zhí)行隊列中。

每個TaskEle可能需要執(zhí)行不同的業(yè)務,所以push_task需要傳入對應業(yè)務的回調tcb(task callback)

i是我加的額外參數(shù),代表主線程中連接的客戶端編號,其意義可以是socketfd。

注意在修改執(zhí)行隊列(push)的時候,需要加鎖保證獨占。

銷毀線程池~ ThreadPool 實現(xiàn):

~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務隊列
task_queue.clear();
//-廣播給每個執(zhí)行線程令其退出(執(zhí)行線程破開循環(huán)會free掉堆內存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空執(zhí)行隊列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}();>();++i){

先將所有線程的usable設置為false,之后加鎖,清空任務隊列,并通過條件變量通知所有線程,等所有線程退出后,銷毀執(zhí)行隊列,銷毀鎖和條件變量。

業(yè)務代碼和服務器主循環(huán)

//-線程執(zhí)行的業(yè)務函數(shù)
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};

int main(){
//-創(chuàng)建線程池
ThreadPool pool(100);
pool.createPool();
//-創(chuàng)建任務
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}

隨便寫的線程執(zhí)行的業(yè)務,打印一下客戶信息。

主線程創(chuàng)建100大小的線程池,并添加1000個任務(連接)。

完整代碼

// *C++和posix接口實現(xiàn)一個線程池
//-三個組件:任務隊列,執(zhí)行隊列,線程池(中樞管理)

#include
#include
#include
#include
#include
#include
#include
#include
#include

using namespace std;

//-打印線程錯誤專用,根據err來識別錯誤信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%sn",strerror(err),msg);
exit(EXIT_FAILURE);
}

class ThreadPool;//-聲明

//- 任務隊列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};

//-執(zhí)行隊列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};

//-線程池
class ThreadPool{
public:
//-任務隊列和執(zhí)行隊列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構造函數(shù)
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始執(zhí)行隊列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}
//-加入任務
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執(zhí)行隊列中的一個進行任務
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);

}
//-銷毀線程池
~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務隊列
task_queue.clear();
//-廣播給每個執(zhí)行線程令其退出(執(zhí)行線程破開循環(huán)會free掉堆內存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空執(zhí)行隊列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};

void* ExecEle::start(void*arg){
//-獲得執(zhí)行對象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務隊列為空,等待新任務
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執(zhí)行任務回調
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執(zhí)行對象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}


//-線程執(zhí)行的業(yè)務函數(shù)
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};

int main(){
//-創(chuàng)建線程池
ThreadPool pool(100);
pool.createPool();
//-創(chuàng)建任務
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}();>();++i){
*>;++i){
*>*>
聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 服務器
    +關注

    關注

    12

    文章

    9251

    瀏覽量

    85739
  • 數(shù)據庫
    +關注

    關注

    7

    文章

    3838

    瀏覽量

    64541
  • 線程池
    +關注

    關注

    0

    文章

    57

    瀏覽量

    6868
收藏 人收藏

    評論

    相關推薦

    Java中的線程包括哪些

    線程是用來統(tǒng)一管理線程的,在 Java 中創(chuàng)建和銷毀線程都是一件消耗資源的事情,線程可以重復
    的頭像 發(fā)表于 10-11 15:33 ?834次閱讀
    Java中的<b class='flag-5'>線程</b><b class='flag-5'>池</b>包括哪些

    線程是如何實現(xiàn)的

    線程的概念是什么?線程是如何實現(xiàn)的?
    發(fā)表于 02-28 06:20

    基于線程技術集群接入點的應用研究

    本文在深入研究高級線程技術的基礎上,分析、研究了固定線程數(shù)目的線程線程數(shù)目動態(tài)變化的
    發(fā)表于 01-22 14:21 ?5次下載

    如何正確關閉線程

    前言本章分為兩個議題 如何正確關閉線程 shutdown 和 shutdownNow 的區(qū)別 項目環(huán)境jdk 1.8 github 地址:https://github.com
    的頭像 發(fā)表于 09-29 14:41 ?9960次閱讀

    基于Nacos的簡單動態(tài)化線程實現(xiàn)

    本文以Nacos作為服務配置中心,以修改線程核心線程數(shù)、最大線程數(shù)為例,實現(xiàn)一個簡單的動態(tài)化線程
    發(fā)表于 01-06 14:14 ?874次閱讀

    Java線程核心原理

    看過Java線程源碼的小伙伴都知道,在Java線程池中最核心的類就是ThreadPoolExecutor,
    的頭像 發(fā)表于 04-21 10:24 ?878次閱讀

    細數(shù)線程的10個坑

    JDK開發(fā)者提供了線程的實現(xiàn)類,我們基于Executors組件,就可以快速創(chuàng)建一個線程 。
    的頭像 發(fā)表于 06-16 10:11 ?736次閱讀
    細數(shù)<b class='flag-5'>線程</b><b class='flag-5'>池</b>的10個坑

    線程的兩個思考

    今天還是說一下線程的兩個思考。 池子 我們常用的線程, JDK的ThreadPoolExecutor. CompletableFutures 默認使用了
    的頭像 發(fā)表于 09-30 11:21 ?3118次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的兩個思考

    Spring 的線程應用

    我們在日常開發(fā)中,經常跟多線程打交道,Spring 為我們提供了一個線程方便我們開發(fā),它就是 ThreadPoolTaskExecutor ,接下來我們就來聊聊 Spring 的線程
    的頭像 發(fā)表于 10-13 10:47 ?637次閱讀
    Spring 的<b class='flag-5'>線程</b><b class='flag-5'>池</b>應用

    線程基本概念與原理

    一、線程基本概念與原理 1.1 線程概念及優(yōu)勢 C++線程簡介
    的頭像 發(fā)表于 11-10 10:24 ?550次閱讀

    線程的基本概念

    線程的基本概念 不管線程是什么東西!但是我們必須知道線程被搞出來的目的就是:提高程序執(zhí)行效
    的頭像 發(fā)表于 11-10 16:37 ?544次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>的基本概念

    線程三大核心參數(shù)的含義 線程核心線程數(shù)制定策略

    以上考點作為線程面試幾乎必問的內容,大部分人應該都是如數(shù)家珍,張口就來,但是懂了面試八股文真的就不一定在實際運用中真的就會把線程用好 。
    的頭像 發(fā)表于 12-01 10:20 ?1100次閱讀
    <b class='flag-5'>線程</b><b class='flag-5'>池</b>三大核心參數(shù)的含義 <b class='flag-5'>線程</b><b class='flag-5'>池</b>核心<b class='flag-5'>線程</b>數(shù)制定策略

    線程七大核心參數(shù)執(zhí)行順序

    線程是一種用于管理和調度線程執(zhí)行的技術,通過將任務分配到線程池中的線程進行處理,可以有效地控制并發(fā)線程
    的頭像 發(fā)表于 12-04 16:45 ?1107次閱讀

    線程的創(chuàng)建方式有幾種

    線程是一種用于管理和調度線程的技術,能夠有效地提高系統(tǒng)的性能和資源利用率。它通過預先創(chuàng)建一組線程并維護一個工作隊列,將任務提交給線程
    的頭像 發(fā)表于 12-04 16:52 ?901次閱讀

    什么是動態(tài)線程?動態(tài)線程的簡單實現(xiàn)思路

    因此,動態(tài)可監(jiān)控線程一種針對以上痛點開發(fā)的線程管理工具。主要可實現(xiàn)功能有:提供對 Spring 應用內線程
    的頭像 發(fā)表于 02-28 10:42 ?672次閱讀