問(wèn)題產(chǎn)生
- 無(wú)論是Linux,RTOS,還是Android等開(kāi)發(fā),我們都會(huì)用到多線程編程;但是往往很多人在編程時(shí),都很隨意的創(chuàng)建/銷毀線程的策略來(lái)實(shí)現(xiàn)多線程編程;很明顯這是不合理的做法,線程的創(chuàng)建/銷毀代價(jià)是很高的。那么我們要怎么去設(shè)計(jì)多線程編程呢???答案:對(duì)于長(zhǎng)駐的線程,我們可以創(chuàng)建獨(dú)立的線程去執(zhí)行。但是非長(zhǎng)駐的線程,我們可以通過(guò)線程池的方式來(lái)處理這些線程。
線程池概述
-
線程池,它是一種多線程處理形式,處理過(guò)程中將任務(wù)添加到隊(duì)列,然后在創(chuàng)建線程后自動(dòng)啟動(dòng)這些任務(wù)。線程池線程都是后臺(tái)線程。每個(gè)線程都使用默認(rèn)的堆棧大小,以默認(rèn)的優(yōu)先級(jí)運(yùn)行,并處于多線程單元中。如果某個(gè)線程在托管代碼中空閑(如正在等待某個(gè)事件),則線程池將插入另一個(gè)輔助線程來(lái)使所有處理器保持繁忙。如果所有線程池線程都始終保持繁忙,但隊(duì)列中包含掛起的工作,則線程池將在一段時(shí)間后創(chuàng)建另一個(gè)輔助線程但線程的數(shù)目永遠(yuǎn)不會(huì)超過(guò)最大值。超過(guò)最大值的線程可以排隊(duì),但他們要等到其他線程完成后才啟動(dòng)。
-
在一個(gè)系統(tǒng)中,線程數(shù)過(guò)多會(huì)帶來(lái)調(diào)度開(kāi)銷,進(jìn)而影響緩存局部性和整體性能。而線程池維護(hù)著多個(gè)線程,等待著監(jiān)督管理者分配可并發(fā)執(zhí)行的任務(wù)。這避免了在處理短時(shí)間任務(wù)時(shí)創(chuàng)建與銷毀線程的代價(jià)。線程池不僅能夠保證內(nèi)核的充分利用,還能防止過(guò)分調(diào)度??捎镁€程數(shù)量應(yīng)該取決于可用的并發(fā)處理器、處理器內(nèi)核、內(nèi)存、網(wǎng)絡(luò)sockets等的數(shù)量。線程數(shù)過(guò)多會(huì)導(dǎo)致額外的線程切換開(kāi)銷。
-
線程的創(chuàng)建-銷毀對(duì)系統(tǒng)性能影響很大:
- 創(chuàng)建太多線程,將會(huì)浪費(fèi)一定的資源,有些線程未被充分使用。
- 銷毀太多線程,將導(dǎo)致之后浪費(fèi)時(shí)間再次創(chuàng)建它們。
- 創(chuàng)建線程太慢,將會(huì)導(dǎo)致長(zhǎng)時(shí)間的等待,性能變差。
- 銷毀線程太慢,導(dǎo)致其它線程資源饑餓
-
線程池的應(yīng)用場(chǎng)景:
- 單位時(shí)間內(nèi)處理的任務(wù)頻繁,且任務(wù)時(shí)間較短;
- 對(duì)實(shí)時(shí)性要求較高。如果接收到任務(wù)之后再創(chuàng)建線程,可能無(wú)法滿足實(shí)時(shí)性的要求,此時(shí)必須使用線程池;
- 必須經(jīng)常面對(duì)高突發(fā)性事件。比如 Web 服務(wù)器。如果有足球轉(zhuǎn)播,則服務(wù)器將產(chǎn)生巨大沖擊,此時(shí)使用傳統(tǒng)方法,則必須不停的大量創(chuàng)建、銷毀線程。此時(shí)采用動(dòng)態(tài)線程池可以避免這種情況的發(fā)生。
-
線程池的應(yīng)用例子:
- EventBus:它是Android的一個(gè)事件發(fā)布/訂閱輕量級(jí)框架。其中事件的異步發(fā)布就采用了線程池機(jī)制。
- Samgr:它是OpenHarmony的一個(gè)服務(wù)管理組件,解決多服務(wù)的管理的策略,減低了線程的創(chuàng)建開(kāi)銷。
-
作者最近在開(kāi)發(fā)的過(guò)程中,也遇到多線程編程問(wèn)題,跨平臺(tái),并發(fā)任務(wù)多,執(zhí)行周期短。如果按照以往的反復(fù)的創(chuàng)建/銷毀線程,顯然不是一個(gè)很好的軟件設(shè)計(jì)。我們需要利用線程池的方式來(lái)解決我們問(wèn)題。
TP(Thread Pool)組件
TP組件,又稱線程池組件。是作者編寫一個(gè)多線程管理組件,特點(diǎn):
- 跨平臺(tái):它支持任意的RTOS系統(tǒng),Linux系統(tǒng)。
- 易移植:該組件默認(rèn)支持CMSIS和POSIX接口,其他RTOS可以輕易適配兼容。
- 接口簡(jiǎn)單:用戶操作接口簡(jiǎn)單,只有三個(gè)接口:創(chuàng)建線程池,增加task到線程池,銷毀線程池。
TP原理
- ① 創(chuàng)建一個(gè)線程池,線程池中維護(hù)一個(gè)Task隊(duì)列,用于Task任務(wù);理論上:線程池中線程數(shù)目至少一個(gè),最多無(wú)數(shù)個(gè),但是我們要系統(tǒng)能力決定。
- ② 應(yīng)用層根據(jù)業(yè)務(wù)需求,創(chuàng)建對(duì)應(yīng)Task,Task數(shù)目不限制,根據(jù)系統(tǒng)資源創(chuàng)建。
- ③ 應(yīng)用層創(chuàng)建的Task,會(huì)被掛在Task隊(duì)列中。
- ④ 線程池的空閑線程,會(huì)檢測(cè)Task隊(duì)列中是否為空,如果Task隊(duì)列不為空,則提取一個(gè)Task在線程中執(zhí)行。
TP實(shí)現(xiàn)
適配層實(shí)現(xiàn)
為了實(shí)現(xiàn)跨平臺(tái),需要將差異性接口抽象出來(lái),我們整個(gè)組件需要抽象幾個(gè)內(nèi)容:①日志接口;②內(nèi)存管理接口;③ 線程接口;④互斥量接口;⑤信號(hào)量接口。以CMSIS接口為例的實(shí)現(xiàn):
- 錯(cuò)誤碼:提供了四種錯(cuò)誤碼:無(wú)錯(cuò)誤,錯(cuò)誤,內(nèi)存不足,無(wú)效參數(shù)。
typedefenum{
TP_EOK=0,//Thereisnoerror
TP_ERROR,//Agenericerrorhappens
TP_ENOMEM,//Nomemory
TP_EINVAL,//Invalidargument
}TpErrCode;
- 日志接口適配:
- 需修改宏定義:TP_PRINT;
- 支持三個(gè)等級(jí)日志打印:錯(cuò)誤信息日志,運(yùn)行信息日志,調(diào)試信息日志的打印。并且支持帶顏色。
#defineTP_PRINTprintf
#defineTP_LOGE(...)TP_PRINT("33[31;22m[E/TP](%s:%d)",__FUNCTION__,__LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("33[0mn")
#defineTP_LOGI(...)TP_PRINT("33[32;22m[I/TP](%s:%d)",__FUNCTION__,__LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("33[0mn")
#defineTP_LOGD(...)TP_PRINT("[D/TP](%s:%d)",__FUNCTION__,__LINE__);
TP_PRINT(__VA_ARGS__);
TP_PRINT("n")
- 內(nèi)存接口:只需適配申請(qǐng)內(nèi)存和釋放內(nèi)存宏定義
#defineTP_MALLOCmalloc
#defineTP_FREEfree
- 線程接口:
//tp_def.h
typedefvoid*TpThreadId;
typedefvoid*(*tpThreadFunc)(void*argv);
typedefstruct{
char*name;
uint32_tstackSize;
uint32_tpriority:8;
uint32_treserver:24;
}TpThreadAttr;
TpThreadIdTpThreadCreate(tpThreadFuncfunc,void*argv,constTpThreadAttr*attr);
voidTpThreadDelete(TpThreadIdthread);
- 創(chuàng)建線程: TpThreadId TpThreadCreate(tpThreadFunc func, void *argv, const TpThreadAttr *attr);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
func | 線程入口函數(shù) |
argv | 線程入口函數(shù)參數(shù) |
attr | 線程屬性:線程名,棧空間,優(yōu)先級(jí) |
「返回」 | -- |
NULL | 創(chuàng)建失敗 |
線程句柄 | 創(chuàng)建成功 |
- 刪除線程:void TpThreadDelete(TpThreadId thread);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
thread | 線程句柄 |
- CMSIS適配:
//tp_threa_adapter.c
#include"tp_def.h"
#include"cmsis_os2.h"
TpThreadIdTpThreadCreate(tpThreadFuncfunc,void*argv,constTpThreadAttr*attr)
{
osThreadId_tthread=NULL;
osThreadAttr_ttaskAttr={
.name=attr->name,
.attr_bits=0,
.cb_mem=NULL,
.cb_size=0,
.stack_mem=NULL,
.stack_size=attr->stackSize,
.priority=(osPriority_t)attr->priority,
.tz_module=0,
.reserved=0,
};
thread=osThreadNew((osThreadFunc_t)func,argv,&taskAttr);
return(TpThreadId)thread;
}
voidTpThreadDelete(TpThreadIdthread)
{
if(thread!=NULL){
osThreadTerminate(thread);
}
}
- 互斥量接口:
//tp_def.h
typedefvoid*TpMutexId;
TpMutexIdTpMutexCreate(void);
TpErrCodeTpMutexLock(TpMutexIdmutex);
TpErrCodeTpMutexUnlock(TpMutexIdmutex);
voidTpMutexDelete(TpMutexIdmutex);
- 創(chuàng)建互斥量:TpMutexId TpMutexCreate(void);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
「返回」 | -- |
NULL | 創(chuàng)建失敗 |
互斥量句柄 | 創(chuàng)建成功 |
- 獲取互斥量:TpErrCode TpMutexLock(TpMutexId mutex);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
mutex | 互斥量句柄 |
「返回」 | -- |
TP_EINVAL | mutex無(wú)效參數(shù) |
TP_ERROR | 獲取互斥量失敗 |
TP_EOK | 成功獲取互斥量 |
- 釋放互斥量:TpErrCode TpMutexUnlock(TpMutexId mutex);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
mutex | 互斥量句柄 |
「返回」 | -- |
TP_EINVAL | mutex無(wú)效參數(shù) |
TP_ERROR | 釋放互斥量失敗 |
TP_EOK | 成功釋放互斥量 |
- 刪除互斥量:void TpMutexDelete(TpMutexId mutex);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
mutex | 互斥量句柄 |
- CMSIS適配:
//tp_mutex_adapter.c
#include"tp_def.h"
#include"cmsis_os2.h"
TpMutexIdTpMutexCreate(void)
{
osMutexId_tmutex=NULL;
mutex=osMutexNew(NULL);
return(TpMutexId)mutex;
}
TpErrCodeTpMutexLock(TpMutexIdmutex)
{
if(mutex==NULL){
returnTP_EINVAL;
}
if(osMutexAcquire((osMutexId_t)mutex,osWaitForever)==osOK){
returnTP_EOK;
}
returnTP_ERROR;
}
TpErrCodeTpMutexUnlock(TpMutexIdmutex)
{
if(mutex==NULL){
returnTP_EINVAL;
}
if(osMutexRelease((osMutexId_t)mutex)==osOK){
returnTP_EOK;
}
returnTP_ERROR;
}
voidTpMutexDelete(TpMutexIdmutex)
{
if(mutex==NULL){
return;
}
osMutexDelete(mutex);
}
- 信號(hào)量接口:
//tp_def.h
typedefvoid*TpSemId;
TpSemIdTpSemCreate(uint32_tvalue);
TpErrCodeTpSemAcquire(TpSemIdsem);
TpErrCodeTpSemRelease(TpSemIdsem);
voidTpSemDelete(TpSemIdsem);
- 創(chuàng)建信號(hào)量:TpSemId TpSemCreate(uint32_t value);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
「返回」 | -- |
NULL | 創(chuàng)建失敗 |
信號(hào)量句柄 | 創(chuàng)建成功 |
- 獲取信號(hào)量:TpErrCode TpSemAcquire(TpSemId sem);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
sem | 信號(hào)量句柄 |
「返回」 | -- |
TP_EINVAL | sem無(wú)效參數(shù) |
TP_ERROR | 獲取信號(hào)量失敗 |
TP_EOK | 成功獲取信號(hào)量 |
- 釋放信號(hào)量:TpErrCode TpSemRelease(TpSemId sem);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
sem | 信號(hào)量句柄 |
「返回」 | -- |
TP_EINVAL | 信號(hào)量無(wú)效參數(shù) |
TP_ERROR | 釋放信號(hào)量失敗 |
TP_EOK | 成功釋放信號(hào)量 |
- 刪除信號(hào)量:void TpSemDelete(TpSemId sem);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
sem | 信號(hào)量句柄 |
- CMSIS適配:
//tp_sem_adapter.c
#include"tp_def.h"
#include"cmsis_os2.h"
TpSemIdTpSemCreate(uint32_tvalue)
{
osSemaphoreId_tsem=NULL;
sem=osSemaphoreNew(1,value,NULL);
return(TpSemId)sem;
}
TpErrCodeTpSemAcquire(TpSemIdsem)
{
if(sem==NULL){
returnTP_EINVAL;
}
if(osSemaphoreAcquire((osSemaphoreId_t)sem,osWaitForever)!=osOK){
returnTP_ERROR;
}
returnTP_EOK;
}
TpErrCodeTpSemRelease(TpSemIdsem)
{
if(sem==NULL){
returnTP_EINVAL;
}
if(osSemaphoreRelease((osSemaphoreId_t)sem)!=osOK){
returnTP_ERROR;
}
returnTP_EOK;
}
voidTpSemDelete(TpSemIdsem)
{
if(sem==NULL){
return;
}
osSemaphoreDelete((osSemaphoreId_t)sem);
}
核心層實(shí)現(xiàn)
tp的提供的接口非常精簡(jiǎn):創(chuàng)建線程池,增加任務(wù)到線程池,銷毀線程池。
- 創(chuàng)建線程池:
- 接口描述:TpErrCode TpCreate(Tp *pool, const char *name, uint32_t stackSize, uint8_t threadNum);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
pool | 線程池句柄 |
name | 線程池中線程名字 |
stackSize | 線程池中線程的棧大小 |
theadNum | 線程池中線程數(shù)目 |
「返回」 | -- |
TP_EINVAL | pool無(wú)效參數(shù) |
TP_ERROR | 創(chuàng)建失敗 |
TP_NOMEM | 內(nèi)存不足 |
TP_EOK | 創(chuàng)建成功 |
-
接口實(shí)現(xiàn):
- ①創(chuàng)建task隊(duì)列增刪互斥量:管理task隊(duì)列的增加及釋放的互斥關(guān)系,保證增加和釋放為同步策略。
- ②創(chuàng)建task隊(duì)列狀態(tài)信號(hào)量:當(dāng)task隊(duì)列非空則釋放信號(hào)量,線程池中的線程可以從task隊(duì)列中獲取task執(zhí)行。
- ③創(chuàng)建線程池中線程:根據(jù)threadNum參數(shù),創(chuàng)建對(duì)應(yīng)的線程數(shù)目。
TpErrCodeTpCreate(Tp*pool,constchar*name,
uint32_tstackSize,uint8_tthreadNum)
{
intindex=0;
if(pool==NULL){
TP_LOGE("ThreadpoolhandleisNULL");
returnTP_EINVAL;
}
//①
if((pool->queueLock=TpMutexCreate())==NULL){
TP_LOGE("Createthreadpoolmutexfailed");
returnTP_ERROR;
}
//②
if((pool->queueReady=TpSemCreate(0))==NULL){
TP_LOGE("Createthreadpoolsemfailed");
returnTP_ERROR;
}
pool->taskQueue=NULL;
pool->threadNum=threadNum;
pool->waitTaskNum=0;
pool->threads=(TpThreadInfo*)TP_MALLOC(threadNum*sizeof(TpThreadInfo));
if(pool->threads==NULL){
TP_LOGE("Mallocthreadpoolinfomemoryfailed");
returnTP_ENOMEM;
}
//③
for(index=0;indexthreads[index].attr.name=(char*)TP_MALLOC(TP_THREAD_NAME_LEN);
if(pool->threads[index].attr.name==NULL){
TP_LOGE("Mallocthreadnamememoryfailed");
returnTP_ENOMEM;
}
snprintf(pool->threads[index].attr.name,TP_THREAD_NAME_LEN,"%s%d",name,index);
pool->threads[index].attr.stackSize=stackSize;
pool->threads[index].attr.priority=TP_THREAD_PRIORITY;
pool->threads[index].threadId=TpThreadCreate(TpThreadHandler,pool,&pool->threads[index].attr);
}
returnTP_EOK;
}
- 增加任務(wù)到線程池:
- 接口描述:TpErrCode TpAddTask(Tp *pool, taskHandle handle, void *argv);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
pool | 線程池句柄 |
handle | 線程池中線程名字 |
argv | 線程池中線程的棧大小 |
「返回」 | -- |
TP_EINVAL | pool無(wú)效參數(shù) |
TP_NOMEM | 內(nèi)存不足 |
TP_EOK | 增加task成功 |
-
接口實(shí)現(xiàn):
- ① 創(chuàng)建一個(gè)task句柄,并將注冊(cè)task函數(shù)和函數(shù)的入?yún)ⅰ?/li>
- ② 獲取task隊(duì)列互斥量,避免增加隊(duì)列成員時(shí),在釋放隊(duì)列成員。
- ③ 釋放task信號(hào)量,通知線程池中的線程可以從task隊(duì)列中獲取task執(zhí)行
TpErrCodeTpAddTask(Tp*pool,taskHandlehandle,void*argv)
{
TpTask*newTask=NULL;
TpTask*taskLIst=NULL;
if(pool==NULL){
TP_LOGE("ThreadpoolhandleisNULL");
returnTP_EINVAL;
}
//①
newTask=(TpTask*)TP_MALLOC(sizeof(TpTask));
if(newTask==NULL){
TP_LOGE("Mallocnewtaskhandlememoryfailed");
returnTP_ENOMEM;
}
newTask->handle=handle;
newTask->argv=argv;
newTask->next=NULL;
//②
TpMutexLock(pool->queueLock);
taskLIst=pool->taskQueue;
if(taskLIst==NULL){
pool->taskQueue=newTask;
}
else{
while(taskLIst->next!=NULL){
taskLIst=taskLIst->next;
}
taskLIst->next=newTask;
}
pool->waitTaskNum++;
TpMutexUnlock(pool->queueLock);
//③
TpSemRelease(pool->queueReady);
returnTP_EOK;
}
- 銷毀線程池
- 接口描述:TpErrCode TpDestroy(Tp *pool);
「參數(shù)」 | 「說(shuō)明」 |
---|---|
pool | 線程池句柄 |
「返回」 | -- |
TP_EINVAL | pool無(wú)效參數(shù) |
TP_EOK | 銷毀成功 |
-
接口實(shí)現(xiàn):
- ① 刪除線程池中所有線程。
- ② 刪除task隊(duì)列互斥量,task狀態(tài)信號(hào)量。
- ③ 刪除線程池的Task隊(duì)列。
TpErrCodeTpDestroy(Tp*pool)
{
intindex=0;
TpTask*head=NULL;
if(pool==NULL){
TP_LOGE("ThreadpoolhandleisNULL");
returnTP_EINVAL;
}
//①
for(index=0;indexthreadNum;index++){
TpThreadDelete(pool->threads[index].threadId);
pool->threads[index].threadId=NULL;
TP_FREE(pool->threads[index].attr.name);
pool->threads[index].attr.name=NULL;
}
//②
TpMutexDelete(pool->queueLock);
pool->queueLock=NULL;
TpSemDelete(pool->queueReady);
pool->queueReady=NULL;
TP_FREE(pool->threads);
pool->threads=NULL;
//③
while(pool->taskQueue!=NULL){
head=pool->taskQueue;
pool->taskQueue=pool->taskQueue->next;
TP_FREE(head);
}
pool=NULL;
returnTP_EOK;
}
- 線程池中線程函數(shù)
- 接口描述:static void *TpThreadHandler(void *argv)
「參數(shù)」 | 「說(shuō)明」 |
---|---|
argv | 線程池參數(shù) |
-
接口實(shí)現(xiàn):
- ① 獲取task隊(duì)列互斥量,避免增加隊(duì)列成員時(shí),在釋放隊(duì)列成員。
- ② 當(dāng)task隊(duì)列為空時(shí),將阻塞在獲取信號(hào)量,等待用戶增加task時(shí)釋放信號(hào)量。
- ③ 當(dāng)task隊(duì)列不為空,則從task隊(duì)列中獲取task,并執(zhí)行。
- ④ 當(dāng)task執(zhí)行完,會(huì)將對(duì)應(yīng)的task句柄刪除。
staticvoid*TpThreadHandler(void*argv)
{
Tp*pool=(Tp*)argv;
TpTask*task=NULL;
while(1){
//①
TpMutexLock(pool->queueLock);
//②
while(pool->waitTaskNum==0){
TpMutexUnlock(pool->queueLock);
TpSemAcquire(pool->queueReady);
TpMutexLock(pool->queueLock);
}
//③
task=pool->taskQueue;
pool->waitTaskNum--;
pool->taskQueue=task->next;
TpMutexUnlock(pool->queueLock);
task->handle(task->argv);
//④
TP_FREE(task);
task=NULL;
}
}
TP應(yīng)用
- 測(cè)試?yán)蹋?/li>
- 創(chuàng)建一個(gè)線程池,線程池中包含3個(gè)線程,線程的名字為tp,棧為1024byte。
- 在線程池中創(chuàng)建6個(gè)task,其中,task參數(shù)為taskId。
#include"tp_manage.h"
Tppool;
voidTestTaskHandle(void*argv)
{
printf("%s--taskId:%drn",__FUNCTION__,(uint32_t)argv);
}
intmain(void)
{
//①
TpCreate(&pool,"tp",1024,3);
//②
TpAddTask(&pool,TestTaskHandle,(void*)1);
TpAddTask(&pool,TestTaskHandle,(void*)2);
TpAddTask(&pool,TestTaskHandle,(void*)3);
TpAddTask(&pool,TestTaskHandle,(void*)4);
TpAddTask(&pool,TestTaskHandle,(void*)5);
TpAddTask(&pool,TestTaskHandle,(void*)6);
return0;
}
- RTOS中的CMSIS運(yùn)行效果:
- Linux中POSIX接口運(yùn)行效果:
總結(jié)
- 線程池是多線程的一個(gè)編程方式,它避免了線程的創(chuàng)建和銷毀的開(kāi)銷,提高了系統(tǒng)的性能。
- 增加到線程池中的任務(wù)是非長(zhǎng)駐的,不能存在死循環(huán),否則她會(huì)一直持有線程池中的某一個(gè)線程。
-
TP線程池組件的開(kāi)發(fā)倉(cāng)庫(kù)鏈接:
歡迎關(guān)注微信公眾號(hào)『Rice嵌入式開(kāi)發(fā)技術(shù)分享』
-
Linux
+關(guān)注
關(guān)注
87文章
11304瀏覽量
209521 -
RTOS
+關(guān)注
關(guān)注
22文章
813瀏覽量
119643 -
組件
+關(guān)注
關(guān)注
1文章
512瀏覽量
17828 -
線程
+關(guān)注
關(guān)注
0文章
504瀏覽量
19684
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論