線程池的應用
在我認知中,任何網絡服務器都是一個死循環(huán)。這個死循環(huán)長下面這個樣子。
基本上服務器框架都是基于這個架構而不斷開發(fā)拓展的。
這個死循環(huán)總共分為四個步驟,可以涵蓋所有客戶端的需求,然而目前絕大多數(shù)企業(yè)不會用這樣的架構。
問題在于容易產生阻塞。
作為客戶端,我們當然希望訪問服務器的時候,能夠在短時間內收到回復,意味著自己連接上了該服務器。但是上述架構卻很容易產生響應延遲。
當某一個連接的2.3.4時間過長,也許是因為客戶上傳了很大的數(shù)據,也許是因為業(yè)務處理起來比較麻煩,需要計算很多東西,也許是客戶需要下載很大的東西,總之只要2,3,4的時間延遲,意味著下一個循環(huán)處理其它連接的動作也會被無限延遲。
也就是說,系統(tǒng)需要處理一個很慢的客戶端的連接,后面的所有連接,哪怕只是耗時很短的任務,都需要等這個很慢的任務完成才能進行。
所以除了像redis的服務器,數(shù)據庫都是基于hash的key-value結構,業(yè)務處理起來十分快速,才會將1,2,3,4都在一個線程中完成,其它的服務器若要提供千萬乃至億級別的客戶接入量,必須更快地處理客戶的連接,解除1和234之間的耦合,這才引入了多線程,我的主線程只負責1,然后將2,3,4分發(fā)到其它線程中執(zhí)行。
然而,如果服務器選擇這種多線程架構,當我們面臨著巨大的客戶端流量,則勢必需要頻繁地創(chuàng)建和銷毀線程,這個過程十分浪費系統(tǒng)資源,還容易造成系統(tǒng)崩潰,然后老板震驚,被迫畢業(yè),流落街頭,思之令人發(fā)笑。
解決辦法就是線程池。
我們預先創(chuàng)建好一系列線程,就好比后宮佳麗三千,然后皇上(線程池中樞)來了興致(收到任務),就去翻一個妃子(線程池中某個線程)的牌子。妃子(線程)解決完需求后,回到后宮(線程池),等待下一次召喚。
不用創(chuàng)建和銷毀,而是回收利用,所有池式結構都可以看做是一種對資源調度的緩沖,這就是線程池的精髓。
線程池設計
我們手撕線程池,目的還是搞懂基本原理,不弄太多花里胡哨的架構,比如工廠模式之類的。
當前這個版本的線程池是基于互斥鎖和條件變量實現(xiàn)的。
預告(畫餅):無鎖線程池后續(xù)也會手撕。
線程池總體上可以分為三大組件。
- 任務隊列(存還沒有執(zhí)行的任務)
- 執(zhí)行隊列(可以看成就是線程池,存放著可以用來執(zhí)行任務的線程)
- 線程池管理中樞(負責封裝前兩個類,任務的分發(fā),線程池的創(chuàng)建,銷毀,等等。對外提供統(tǒng)一的接口)
其工作流程大概如圖所示
任務隊列節(jié)點數(shù)據結構
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
任務隊列負責存還沒有執(zhí)行的業(yè)務,我們可以將每個業(yè)務都抽象成一個函數(shù),每個函數(shù)自然有可能需要參數(shù)。
所以任務隊列的節(jié)點需要兩個成員:
- taskCallback:函數(shù)回調,執(zhí)行客戶端想要的業(yè)務。
- user_data:函數(shù)參數(shù),包含客戶端的信息,比如socketfd等。
順便提供了一個接口,可以修改回調函數(shù)。
執(zhí)行隊列節(jié)點數(shù)據結構
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
- tid:每個節(jié)點都對應一個線程,所以需要一個id成員來存線程id,
- usable:這個成員非常妙,它代表當前線程是否可用,默認為true,一旦設置為false,則該線程會結束。
- 使用usable可以在最后銷毀線程池的時候,以一種優(yōu)雅的方式結束每個線程,而代替pthread_cancel這種強制銷毀線程的方式,因為你不知道線程中的任務是否處理完,強制銷毀就會使某些業(yè)務中斷。
- pool:這個成員是指向中樞管理(后面會講)的指針,主要是為了在每個線程中通過pool獲取到一個全局(對于所有線程池線程共享)的互斥鎖和條件變量。
- start:是線程池對象執(zhí)行的一個實現(xiàn)線程再回收利用的任務循環(huán)。具體實現(xiàn)代碼也是在后面會講。
線程池管理中樞設計
總體結構:
public:
//-任務隊列和執(zhí)行隊列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構造函數(shù)
ThreadPool(int thread_count):thread_count(thread_count);
//-創(chuàng)建線程池
void createPool();
//-加入任務
void push_task(void(*tcb)(void* arg),int i);
//-利用析構銷毀線程池
~ThreadPool();
};*>*>
關于數(shù)據成員:
- task_queue、exec_queue: 任務隊列和執(zhí)行隊列,我使用deque作為容器實現(xiàn)隊列。
- cont:所有線程共享的條件變量。
- mutex:所有線程共享的互斥鎖。
- thread_count: 線程池創(chuàng)建的時候,初始大小
關于成員方法:
- ThreadPool:構造函數(shù)
- createPool:創(chuàng)建線程池
- push_task: 給服務器主循環(huán)用的,給線程池添加任務。
- ~ ThreadPool: 銷毀線程池,事實上應該單獨定義一個destroyPool的api,我這里為了簡便合并到析構中了。
ExecEle的start函數(shù)實現(xiàn)
現(xiàn)在對于ThreadPool對象有概念以后,可以先將剛剛執(zhí)行隊列節(jié)點ExecEle的start函數(shù)實現(xiàn),其代表了每個線程池的線程始終在跑的循環(huán),在無任務分配的時候阻塞在某個位置。
//-獲得執(zhí)行對象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務隊列為空,等待新任務
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執(zhí)行任務回調
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執(zhí)行對象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}
arg參數(shù)指向的是該線程函數(shù)對應的執(zhí)行元素ExecEle本身的指針,我們定義其為ee。然后進入死循環(huán),通過ee,我們可以獲得線程池中樞對象pool。
通過pool。我們可以獲得任務隊列的情況,當任務隊列為空,則線程進入阻塞狀態(tài),等待任務隊列有任務進來后,通過條件變量通知,再恢復執(zhí)行。
恢復執(zhí)行后,從任務隊列中取出隊首的任務,這個過程需要在mutex的范圍內,保證獨占性。
之后解除互斥鎖,開始執(zhí)行任務的回調。執(zhí)行完進行入下個循環(huán),嘗試再次獲得互斥鎖。
最后說一說usable,當我們銷毀線程池的時候,設置每一個線程的usable為false,那么不會立刻中斷每個線程正在執(zhí)行的回調,而是等回調結束后,在下一次循環(huán)中如果檢測到usable為false后,就會退出整個大循環(huán),并釋放自己的鎖,喚醒線程池其它休眠的線程。退出大循環(huán)后,線程自然而優(yōu)雅地結束。
之后是ThreadPool自己的api實現(xiàn)
構造函數(shù)ThreadPool實現(xiàn):
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
主要為了初始化cont,mutex,thread_count。
創(chuàng)建線程池createPool實現(xiàn):
int ret;
//-初始執(zhí)行隊列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}*>;++i){
通過pthread_create創(chuàng)建thread_count個線程,每個線程執(zhí)行自己的start函數(shù)進入等待任務循環(huán),并阻塞在鎖和條件變量的位置。將exec對象push進執(zhí)行隊列。
添加任務 push_task實現(xiàn):
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執(zhí)行隊列中的一個進行任務
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}
主要功能是構造TaskEle對象并加入到執(zhí)行隊列中。
每個TaskEle可能需要執(zhí)行不同的業(yè)務,所以push_task需要傳入對應業(yè)務的回調tcb(task callback)
i是我加的額外參數(shù),代表主線程中連接的客戶端編號,其意義可以是socketfd。
注意在修改執(zhí)行隊列(push)的時候,需要加鎖保證獨占。
銷毀線程池~ ThreadPool 實現(xiàn):
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務隊列
task_queue.clear();
//-廣播給每個執(zhí)行線程令其退出(執(zhí)行線程破開循環(huán)會free掉堆內存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空執(zhí)行隊列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}();>();++i){
先將所有線程的usable設置為false,之后加鎖,清空任務隊列,并通過條件變量通知所有線程,等所有線程退出后,銷毀執(zhí)行隊列,銷毀鎖和條件變量。
業(yè)務代碼和服務器主循環(huán)
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};
int main(){
//-創(chuàng)建線程池
ThreadPool pool(100);
pool.createPool();
//-創(chuàng)建任務
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}
隨便寫的線程執(zhí)行的業(yè)務,打印一下客戶信息。
主線程創(chuàng)建100大小的線程池,并添加1000個任務(連接)。
完整代碼
//-三個組件:任務隊列,執(zhí)行隊列,線程池(中樞管理)
#include
#include
#include
#include
#include
#include
#include
#include
#include
using namespace std;
//-打印線程錯誤專用,根據err來識別錯誤信息
static inline void ERR_EXIT_THREAD(int err, const char * msg){
fprintf(stderr,"%s:%sn",strerror(err),msg);
exit(EXIT_FAILURE);
}
class ThreadPool;//-聲明
//- 任務隊列元素
class TaskEle{
public:
void (*taskCallback)(void *arg);
string user_data;
void setFunc(void (*tcb)(void *arg)){
taskCallback = tcb;
}
};
//-執(zhí)行隊列元素
class ExecEle{
public:
pthread_t tid;
bool usable = true;
ThreadPool* pool;
static void* start(void* arg);
};
//-線程池
class ThreadPool{
public:
//-任務隊列和執(zhí)行隊列
deque task_queue;
deque exec_queue;
//-條件變量
pthread_cond_t cont;
//-互斥鎖
pthread_mutex_t mutex;
//-線程池大小
int thread_count;
//-構造函數(shù)
ThreadPool(int thread_count):thread_count(thread_count){
//-初始化條件變量和互斥鎖
pthread_cond_init(&cont,NULL);
pthread_mutex_init(&mutex,NULL);
}
void createPool(){
int ret;
//-初始執(zhí)行隊列
for(int i = 0;i ExecEle *ee = new ExecEle;
ee->pool = const_cast(this);
if(ret = pthread_create(&(ee->tid),NULL,ee->start,ee)){
delete ee;
ERR_EXIT_THREAD(ret,"pthread_create");
}
fprintf(stdout,"create thread %dn",i);
exec_queue.push_back(ee);
}
fprintf(stdout,"create pool finish...n");
}
//-加入任務
void push_task(void(*tcb)(void* arg),int i){
TaskEle *te = new TaskEle;
te->setFunc(tcb);
te->user_data = "Task "+to_string(i)+" run in thread ";
//-加鎖
pthread_mutex_lock(&mutex);
task_queue.push_back(te);
//-通知執(zhí)行隊列中的一個進行任務
pthread_cond_signal(&cont);
//-解鎖
pthread_mutex_unlock(&mutex);
}
//-銷毀線程池
~ThreadPool() {
for(int i = 0;i exec_queue[i]->usable = false;
}
pthread_mutex_lock(&mutex);
//-清空任務隊列
task_queue.clear();
//-廣播給每個執(zhí)行線程令其退出(執(zhí)行線程破開循環(huán)會free掉堆內存)
pthread_cond_broadcast(&cont);
pthread_mutex_unlock(&mutex);//-讓其他線程拿到鎖
//-等待所有線程退出
for(int i = 0;i pthread_join(exec_queue[i] -> tid,NULL);
}
//-清空執(zhí)行隊列
exec_queue.clear();
//-銷毀鎖和條件變量
pthread_cond_destroy(&cont);
pthread_mutex_destroy(&mutex);
}
};
void* ExecEle::start(void*arg){
//-獲得執(zhí)行對象
ExecEle *ee = (ExecEle*)arg;
while(true){
//-加鎖
pthread_mutex_lock(&(ee->pool->mutex));
while(ee->pool->task_queue.empty()){//-如果任務隊列為空,等待新任務
if(!ee->usable){
break;
}
pthread_cond_wait(&ee->pool->cont, &ee->pool->mutex);
}
if(!ee -> usable){
pthread_mutex_unlock(&ee -> pool -> mutex);
break;
}
TaskEle *te = ee->pool->task_queue.front();
ee->pool->task_queue.pop_front();
//-解鎖
pthread_mutex_unlock(&(ee->pool -> mutex));
//-執(zhí)行任務回調
te->user_data+=to_string(pthread_self());
te->taskCallback(te);
}
//-刪除線程執(zhí)行對象
delete ee;
fprintf(stdout,"destroy thread %dn",pthread_self());
return NULL;
}
//-線程執(zhí)行的業(yè)務函數(shù)
void execFunc(void* arg){
TaskEle *te =(TaskEle*)arg;
fprintf(stdout, "%sn",te->user_data.c_str());
};
int main(){
//-創(chuàng)建線程池
ThreadPool pool(100);
pool.createPool();
//-創(chuàng)建任務
for(int i =0;i<1000;++i){
pool.push_task(&execFunc,i);
}
exit(EXIT_SUCCESS);
}();>();++i){
*>;++i){
*>*>
-
服務器
+關注
關注
12文章
9251瀏覽量
85739 -
數(shù)據庫
+關注
關注
7文章
3838瀏覽量
64541 -
線程池
+關注
關注
0文章
57瀏覽量
6868
發(fā)布評論請先 登錄
相關推薦
評論