基本概念
隊列又稱消息隊列,是一種常用于任務(wù)間通信的數(shù)據(jù)結(jié)構(gòu)。隊列接收來自任務(wù)或中斷的不固定長度消息,并根據(jù)不同的接口確定傳遞的消息是否存放在隊列空間中。
任務(wù)能夠從隊列里面讀取消息,當(dāng)隊列中的消息為空時,掛起讀取任務(wù);當(dāng)隊列中有新消息時,掛起的讀取任務(wù)被喚醒并處理新消息。任務(wù)也能夠往隊列里寫入消息,當(dāng)隊列已經(jīng)寫滿消息時,掛起寫入任務(wù);當(dāng)隊列中有空閑消息節(jié)點時,掛起的寫入任務(wù)被喚醒并寫入消息。如果將讀隊列和寫隊列的超時時間設(shè)置為0,則不會掛起任務(wù),接口會直接返回,這就是非阻塞模式。
消息隊列提供了異步處理機(jī)制,允許將一個消息放入隊列,但不立即處理。同時隊列還有緩沖消息的作用。
隊列用于任務(wù)間通信,可以實現(xiàn)消息的異步處理。同時消息的發(fā)送方和接收方不需要彼此聯(lián)系,兩者間是解耦的。
隊列特性
消息以先進(jìn)先出的方式排隊,支持異步讀寫。
讀隊列和寫隊列都支持超時機(jī)制。
每讀取一條消息,就會將該消息節(jié)點設(shè)置為空閑。
發(fā)送消息類型由通信雙方約定,可以允許不同長度(不超過隊列的消息節(jié)點大?。┑南ⅰ?/p>
一個任務(wù)能夠從任意一個消息隊列接收和發(fā)送消息。
多個任務(wù)能夠從同一個消息隊列接收和發(fā)送消息。
創(chuàng)建隊列時所需的隊列空間,默認(rèn)支持接口內(nèi)系統(tǒng)自行動態(tài)申請內(nèi)存的方式,同時也支持將用戶分配的隊列空間作為接口入?yún)魅氲姆绞健?/p>
消息隊列長什么樣?
#ifndef LOSCFG_BASE_IPC_QUEUE_LIMIT #define LOSCFG_BASE_IPC_QUEUE_LIMIT 1024 //隊列個數(shù) #endif LITE_OS_SEC_BSS LosQueueCB *g_allQueue = NULL;//消息隊列池 LITE_OS_SEC_BSS STATIC LOS_DL_LIST g_freeQueueList;//空閑隊列鏈表,管分配的,需要隊列從這里申請 typedef struct { UINT8 *queueHandle; /**< Pointer to a queue handle */ //指向隊列句柄的指針 UINT16 queueState; /**< Queue state */ //隊列狀態(tài) UINT16 queueLen; /**< Queue length */ //隊列中消息總數(shù)的上限值,由創(chuàng)建時確定,不再改變 UINT16 queueSize; /**< Node size */ //消息節(jié)點大小,由創(chuàng)建時確定,不再改變,即定義了每個消息長度的上限. UINT32 queueID; /**< queueID */ //隊列ID UINT16 queueHead; /**< Node head */ //消息頭節(jié)點位置(數(shù)組下標(biāo)) UINT16 queueTail; /**< Node tail */ //消息尾節(jié)點位置(數(shù)組下標(biāo)) UINT16 readWriteableCnt[OS_QUEUE_N_RW]; /**< Count of readable or writable resources, 0:readable, 1:writable */ //隊列中可寫或可讀消息數(shù),0表示可讀,1表示可寫 LOS_DL_LIST readWriteList[OS_QUEUE_N_RW]; /**< the linked list to be read or written, 0:readlist, 1:writelist */ //掛的都是等待讀/寫消息的任務(wù)鏈表,0表示讀消息的鏈表,1表示寫消息的任務(wù)鏈表 LOS_DL_LIST memList; /**< Pointer to the memory linked list */ //@note_why 這里尚未搞明白是啥意思 ,是共享內(nèi)存嗎? } LosQueueCB;//讀寫隊列分離
解讀
和進(jìn)程,線程,定時器一樣,消息隊列也由全局統(tǒng)一的消息隊列池管理,池有多大?默認(rèn)是1024
鴻蒙內(nèi)核對消息的總個數(shù)有限制,queueLen消息總數(shù)的上限,在創(chuàng)建隊列的時候需指定,不能更改.
對每個消息的長度也有限制,queueSize規(guī)定了消息的大小,也是在創(chuàng)建的時候指定.
為啥要指定總個數(shù)和每個的總大小,是因為內(nèi)核一次性會把隊列的總內(nèi)存(queueLen*queueSize)申請下來,確保不會出現(xiàn)后續(xù)使用過程中內(nèi)存不夠的問題出現(xiàn),但同時也帶來了內(nèi)存的浪費,因為很可能大部分時間隊列并沒有跑滿.
隊列的讀取由queueHead,queueTail管理,Head表示隊列中被占用的消息節(jié)點的起始位置。Tail表示被占用的消息節(jié)點的結(jié)束位置,也是空閑消息節(jié)點的起始位置。隊列剛創(chuàng)建時,Head和Tail均指向隊列起始位置
寫隊列時,根據(jù)readWriteableCnt[1]判斷隊列是否可以寫入,不能對已滿(readWriteableCnt[1]為0)隊列進(jìn)行寫操作。寫隊列支持兩種寫入方式:向隊列尾節(jié)點寫入,也可以向隊列頭節(jié)點寫入。尾節(jié)點寫入時,根據(jù)Tail找到起始空閑消息節(jié)點作為數(shù)據(jù)寫入對象,如果Tail已經(jīng)指向隊列尾部則采用回卷方式。頭節(jié)點寫入時,將Head的前一個節(jié)點作為數(shù)據(jù)寫入對象,如果Head指向隊列起始位置則采用回卷方式。
讀隊列時,根據(jù)readWriteableCnt[0]判斷隊列是否有消息需要讀取,對全部空閑(readWriteableCnt[0]為0)隊列進(jìn)行讀操作會引起任務(wù)掛起。如果隊列可以讀取消息,則根據(jù)Head找到最先寫入隊列的消息節(jié)點進(jìn)行讀取。如果Head已經(jīng)指向隊列尾部則采用回卷方式。
刪除隊列時,根據(jù)隊列ID找到對應(yīng)隊列,把隊列狀態(tài)置為未使用,把隊列控制塊置為初始狀態(tài)。如果是通過系統(tǒng)動態(tài)申請內(nèi)存方式創(chuàng)建的隊列,還會釋放隊列所占內(nèi)存。
留意readWriteList,這又是兩個雙向鏈表, 雙向鏈表是內(nèi)核最重要的結(jié)構(gòu)體,牢牢的寄生在宿主結(jié)構(gòu)體上.readWriteList上掛的是未來讀/寫消息隊列的任務(wù)列表.
初始化隊列
LITE_OS_SEC_TEXT_INIT UINT32 OsQueueInit(VOID)//消息隊列模塊初始化 { LosQueueCB *queueNode = NULL; UINT32 index; UINT32 size; size = LOSCFG_BASE_IPC_QUEUE_LIMIT * sizeof(LosQueueCB);//支持1024個IPC隊列 /* system resident memory, don't free */ g_allQueue = (LosQueueCB *)LOS_MemAlloc(m_aucSysMem0, size);//常駐內(nèi)存 if (g_allQueue == NULL) { return LOS_ERRNO_QUEUE_NO_MEMORY; } (VOID)memset_s(g_allQueue, size, 0, size);//清0 LOS_ListInit(&g_freeQueueList);//初始化空閑鏈表 for (index = 0; index < LOSCFG_BASE_IPC_QUEUE_LIMIT; index++) {//循環(huán)初始化每個消息隊列 queueNode = ((LosQueueCB *)g_allQueue) + index;//一個一個來 queueNode->queueID = index;//這可是 隊列的身份證 LOS_ListTailInsert(&g_freeQueueList, &queueNode->readWriteList[OS_QUEUE_WRITE]);//通過寫節(jié)點掛到空閑隊列鏈表上 }//這里要注意是用 readWriteList 掛到 g_freeQueueList鏈上的,所以要通過 GET_QUEUE_LIST 來找到 LosQueueCB if (OsQueueDbgInitHook() != LOS_OK) {//調(diào)試隊列使用的. return LOS_ERRNO_QUEUE_NO_MEMORY; } return LOS_OK; }
解讀
初始隊列模塊,對幾個全局變量賦值,創(chuàng)建消息隊列池,所有池都是常駐內(nèi)存,關(guān)于池后續(xù)有專門的文章整理,到目前為止已經(jīng)解除到了進(jìn)程池,任務(wù)池,定時器池,隊列池,==
將LOSCFG_BASE_IPC_QUEUE_LIMIT個隊列掛到空閑鏈表g_freeQueueList上,供后續(xù)分配和回收.熟悉內(nèi)核全局資源管理的對這種方式應(yīng)該不會再陌生.
創(chuàng)建隊列
//創(chuàng)建一個隊列,根據(jù)用戶傳入隊列長度和消息節(jié)點大小來開辟相應(yīng)的內(nèi)存空間以供該隊列使用,參數(shù)queueID帶走隊列ID LITE_OS_SEC_TEXT_INIT UINT32 LOS_QueueCreate(CHAR *queueName, UINT16 len, UINT32 *queueID, UINT32 flags, UINT16 maxMsgSize) { LosQueueCB *queueCB = NULL; UINT32 intSave; LOS_DL_LIST *unusedQueue = NULL; UINT8 *queue = NULL; UINT16 msgSize; (VOID)queueName; (VOID)flags; if (queueID == NULL) { return LOS_ERRNO_QUEUE_CREAT_PTR_NULL; } if (maxMsgSize > (OS_NULL_SHORT - sizeof(UINT32))) {// maxMsgSize上限 為啥要減去 sizeof(UINT32) ,因為前面存的是隊列的大小 return LOS_ERRNO_QUEUE_SIZE_TOO_BIG; } if ((len == 0) || (maxMsgSize == 0)) { return LOS_ERRNO_QUEUE_PARA_ISZERO; } msgSize = maxMsgSize + sizeof(UINT32);//總size = 消息體內(nèi)容長度 + 消息大小(UINT32) /* * Memory allocation is time-consuming, to shorten the time of disable interrupt, * move the memory allocation to here. *///內(nèi)存分配非常耗時,為了縮短禁用中斷的時間,將內(nèi)存分配移到此處,用的時候分配隊列內(nèi)存 queue = (UINT8 *)LOS_MemAlloc(m_aucSysMem1, (UINT32)len * msgSize);//從系統(tǒng)內(nèi)存池中分配,由這里提供讀寫隊列的內(nèi)存 if (queue == NULL) {//這里是一次把隊列要用到的所有最大內(nèi)存都申請下來了,能保證不會出現(xiàn)后續(xù)使用過程中內(nèi)存不夠的問題出現(xiàn) return LOS_ERRNO_QUEUE_CREATE_NO_MEMORY;//調(diào)用處有 OsSwtmrInit sys_mbox_new DoMqueueCreate == } SCHEDULER_LOCK(intSave); if (LOS_ListEmpty(&g_freeQueueList)) {//沒有空余的隊列ID的處理,注意軟時鐘定時器是由 g_swtmrCBArray統(tǒng)一管理的,里面有正在使用和可分配空閑的隊列 SCHEDULER_UNLOCK(intSave);//g_freeQueueList是管理可用于分配的隊列鏈表,申請消息隊列的ID需要向它要 OsQueueCheckHook(); (VOID)LOS_MemFree(m_aucSysMem1, queue);//沒有就要釋放 queue申請的內(nèi)存 return LOS_ERRNO_QUEUE_CB_UNAVAILABLE; } unusedQueue = LOS_DL_LIST_FIRST(&g_freeQueueList);//找到一個沒有被使用的隊列 LOS_ListDelete(unusedQueue);//將自己從g_freeQueueList中摘除, unusedQueue只是個 LOS_DL_LIST 結(jié)點. queueCB = GET_QUEUE_LIST(unusedQueue);//通過unusedQueue找到整個消息隊列(LosQueueCB) queueCB->queueLen = len; //隊列中消息的總個數(shù),注意這個一旦創(chuàng)建是不能變的. queueCB->queueSize = msgSize;//消息節(jié)點的大小,注意這個一旦創(chuàng)建也是不能變的. queueCB->queueHandle = queue; //隊列句柄,隊列內(nèi)容存儲區(qū). queueCB->queueState = OS_QUEUE_INUSED; //隊列狀態(tài)使用中 queueCB->readWriteableCnt[OS_QUEUE_READ] = 0;//可讀資源計數(shù),OS_QUEUE_READ(0):可讀. queueCB->readWriteableCnt[OS_QUEUE_WRITE] = len;//可些資源計數(shù) OS_QUEUE_WRITE(1):可寫, 默認(rèn)len可寫. queueCB->queueHead = 0;//隊列頭節(jié)點 queueCB->queueTail = 0;//隊列尾節(jié)點 LOS_ListInit(&queueCB->readWriteList[OS_QUEUE_READ]);//初始化可讀隊列鏈表 LOS_ListInit(&queueCB->readWriteList[OS_QUEUE_WRITE]);//初始化可寫隊列鏈表 LOS_ListInit(&queueCB->memList);// OsQueueDbgUpdateHook(queueCB->queueID, OsCurrTaskGet()->taskEntry);//在創(chuàng)建或刪除隊列調(diào)試信息時更新任務(wù)條目 SCHEDULER_UNLOCK(intSave); *queueID = queueCB->queueID;//帶走隊列ID return LOS_OK; }
解讀
創(chuàng)建和初始化一個LosQueueCB
動態(tài)分配內(nèi)存來保存消息內(nèi)容,LOS_MemAlloc(m_aucSysMem1, (UINT32)len * msgSize);
msgSize = maxMsgSize + sizeof(UINT32);頭四個字節(jié)放消息的長度,但消息最大長度不能超過maxMsgSize
readWriteableCnt記錄讀/寫隊列的數(shù)量,獨立計算
readWriteList掛的是等待讀取隊列的任務(wù)鏈表 將在OsTaskWait(&queueCB->readWriteList[readWrite], timeout, TRUE);中將任務(wù)掛到鏈表上.
關(guān)鍵函數(shù)OsQueueOperate
隊列的讀寫都要經(jīng)過OsQueueOperate
/************************************************ 隊列操作.是讀是寫由operateType定 本函數(shù)是消息隊列最重要的一個函數(shù),可以分析出讀取消息過程中 發(fā)生的細(xì)節(jié),涉及任務(wù)的喚醒和阻塞,阻塞鏈表任務(wù)的相互提醒. ************************************************/ UINT32 OsQueueOperate(UINT32 queueID, UINT32 operateType, VOID *bufferAddr, UINT32 *bufferSize, UINT32 timeout) { LosQueueCB *queueCB = NULL; LosTaskCB *resumedTask = NULL; UINT32 ret; UINT32 readWrite = OS_QUEUE_READ_WRITE_GET(operateType);//獲取讀/寫操作標(biāo)識 UINT32 intSave; SCHEDULER_LOCK(intSave); queueCB = (LosQueueCB *)GET_QUEUE_HANDLE(queueID);//獲取對應(yīng)的隊列控制塊 ret = OsQueueOperateParamCheck(queueCB, queueID, operateType, bufferSize);//參數(shù)檢查 if (ret != LOS_OK) { goto QUEUE_END; } if (queueCB->readWriteableCnt[readWrite] == 0) {//根據(jù)readWriteableCnt判斷隊列是否有消息讀/寫 if (timeout == LOS_NO_WAIT) {//不等待直接退出 ret = OS_QUEUE_IS_READ(operateType) ? LOS_ERRNO_QUEUE_ISEMPTY : LOS_ERRNO_QUEUE_ISFULL; goto QUEUE_END; } if (!OsPreemptableInSched()) {//不支持搶占式調(diào)度 ret = LOS_ERRNO_QUEUE_PEND_IN_LOCK; goto QUEUE_END; } //任務(wù)等待,這里很重要啊,將自己從就緒列表摘除,讓出了CPU并發(fā)起了調(diào)度,并掛在readWriteList[readWrite]上,掛的都等待讀/寫消息的task ret = OsTaskWait(&queueCB->readWriteList[readWrite], timeout, TRUE);//任務(wù)被喚醒后會回到這里執(zhí)行,什么時候會被喚醒?當(dāng)然是有消息的時候! if (ret == LOS_ERRNO_TSK_TIMEOUT) {//喚醒后如果超時了,返回讀/寫消息失敗 ret = LOS_ERRNO_QUEUE_TIMEOUT; goto QUEUE_END;// } } else { queueCB->readWriteableCnt[readWrite]--;//對應(yīng)隊列中計數(shù)器--,說明一條消息只能被讀/寫一次 } OsQueueBufferOperate(queueCB, operateType, bufferAddr, bufferSize);//發(fā)起讀或?qū)戧犃胁僮? if (!LOS_ListEmpty(&queueCB->readWriteList[!readWrite])) {//如果還有任務(wù)在排著隊等待讀/寫入消息(當(dāng)時不能讀/寫的原因有可能當(dāng)時隊列滿了==) resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&queueCB->readWriteList[!readWrite]));//取出要讀/寫消息的任務(wù) OsTaskWake(resumedTask);//喚醒任務(wù)去讀/寫消息啊 SCHEDULER_UNLOCK(intSave); LOS_MpSchedule(OS_MP_CPU_ALL);//讓所有CPU發(fā)出調(diào)度申請,因為很可能那個要讀/寫消息的隊列是由其他CPU執(zhí)行 LOS_Schedule();//申請調(diào)度 return LOS_OK; } else { queueCB->readWriteableCnt[!readWrite]++;//對應(yīng)隊列讀/寫中計數(shù)器++ } QUEUE_END: SCHEDULER_UNLOCK(intSave); return ret; }
解讀
queueID指定操作消息隊列池中哪個消息隊列
operateType表示本次是是讀/寫消息
bufferAddr,bufferSize表示如果讀操作,用buf接走數(shù)據(jù),如果寫操作,將buf寫入隊列.
timeout只用于當(dāng)隊列中沒有讀/寫內(nèi)容時的等待.
當(dāng)讀消息時發(fā)現(xiàn)隊列中沒有可讀的消息,此時timeout決定是否將任務(wù)掛入等待讀隊列任務(wù)鏈表
當(dāng)寫消息時發(fā)現(xiàn)隊列中沒有空間用于寫的消息,此時timeout決定是否將任務(wù)掛入等待寫隊列任務(wù)鏈表
if (!LOS_ListEmpty(&queueCB->readWriteList[!readWrite]))最有意思的是這行代碼.
在一次讀消息完成后會立即喚醒寫隊列任務(wù)鏈表的任務(wù),因為讀完了就有了剩余空間,等待寫隊列的任務(wù)往往是因為沒有空間而進(jìn)入等待狀態(tài).
在一次寫消息完成后會立即喚醒讀隊列任務(wù)鏈表的任務(wù),因為寫完了隊列就有了新消息,等待讀隊列的任務(wù)往往是因為隊列中沒有消息而進(jìn)入等待狀態(tài).
編程實例
創(chuàng)建一個隊列,兩個任務(wù)。任務(wù)1調(diào)用寫隊列接口發(fā)送消息,任務(wù)2通過讀隊列接口接收消息。
通過LOS_TaskCreate創(chuàng)建任務(wù)1和任務(wù)2。
通過LOS_QueueCreate創(chuàng)建一個消息隊列。
在任務(wù)1 send_Entry中發(fā)送消息。
在任務(wù)2 recv_Entry中接收消息。
通過LOS_QueueDelete刪除隊列。
#include "los_task.h" #include "los_queue.h" static UINT32 g_queue; #define BUFFER_LEN 50 VOID send_Entry(VOID) { UINT32 i = 0; UINT32 ret = 0; CHAR abuf[] = "test is message x"; UINT32 len = sizeof(abuf); while (i < 5) { abuf[len -2] = '0' + i; i++; ret = LOS_QueueWriteCopy(g_queue, abuf, len, 0); if(ret != LOS_OK) { dprintf("send message failure, error: %x\n", ret); } LOS_TaskDelay(5); } } VOID recv_Entry(VOID) { UINT32 ret = 0; CHAR readBuf[BUFFER_LEN] = {0}; UINT32 readLen = BUFFER_LEN; while (1) { ret = LOS_QueueReadCopy(g_queue, readBuf, &readLen, 0); if(ret != LOS_OK) { dprintf("recv message failure, error: %x\n", ret); break; } dprintf("recv message: %s\n", readBuf); LOS_TaskDelay(5); } while (LOS_OK != LOS_QueueDelete(g_queue)) { LOS_TaskDelay(1); } dprintf("delete the queue success!\n"); } UINT32 Example_CreateTask(VOID) { UINT32 ret = 0; UINT32 task1, task2; TSK_INIT_PARAM_S initParam; initParam.pfnTaskEntry = (TSK_ENTRY_FUNC)send_Entry; initParam.usTaskPrio = 9; initParam.uwStackSize = LOS_TASK_MIN_STACK_SIZE; initParam.pcName = "sendQueue"; #ifdef LOSCFG_KERNEL_SMP initParam.usCpuAffiMask = CPUID_TO_AFFI_MASK(ArchCurrCpuid()); #endif initParam.uwResved = LOS_TASK_STATUS_DETACHED; LOS_TaskLock(); ret = LOS_TaskCreate(&task1, &initParam); if(ret != LOS_OK) { dprintf("create task1 failed, error: %x\n", ret); return ret; } initParam.pcName = "recvQueue"; initParam.pfnTaskEntry = (TSK_ENTRY_FUNC)recv_Entry; ret = LOS_TaskCreate(&task2, &initParam); if(ret != LOS_OK) { dprintf("create task2 failed, error: %x\n", ret); return ret; } ret = LOS_QueueCreate("queue", 5, &g_queue, 0, BUFFER_LEN); if(ret != LOS_OK) { dprintf("create queue failure, error: %x\n", ret); } dprintf("create the queue success!\n"); LOS_TaskUnlock(); return ret; }
結(jié)果驗證
create the queue success! recv message: test is message 0 recv message: test is message 1 recv message: test is message 2 recv message: test is message 3 recv message: test is message 4 recv message failure, error: 200061d delete the queue success!
總結(jié)
消息隊列解決任務(wù)間大數(shù)據(jù)的傳遞
以一種異步,解耦的方式實現(xiàn)任務(wù)通訊
全局由消息隊列池統(tǒng)一管理
在創(chuàng)建消息隊列時申請內(nèi)存塊存儲消息內(nèi)存.
讀/寫操作統(tǒng)一管理,分開執(zhí)行,A任務(wù)讀/寫完消息后會立即喚醒等待寫/讀的B任務(wù).
編輯:hfy
-
異步處理
+關(guān)注
關(guān)注
0文章
7瀏覽量
6564
發(fā)布評論請先 登錄
相關(guān)推薦
評論