自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

鴻蒙內(nèi)核源碼分析(消息隊(duì)列篇) | 進(jìn)程間如何異步解耦傳遞大數(shù)據(jù)

開發(fā) 前端
文章由鴻蒙社區(qū)產(chǎn)出,想要了解更多內(nèi)容請(qǐng)前往:51CTO和華為官方戰(zhàn)略合作共建的鴻蒙技術(shù)社區(qū)https://harmonyos.51cto.com

[[391696]]

想了解更多內(nèi)容,請(qǐng)?jiān)L問:

51CTO和華為官方合作共建的鴻蒙技術(shù)社區(qū)

https://harmonyos.51cto.com

 基本概念

● 隊(duì)列又稱消息隊(duì)列,是一種常用于任務(wù)間通信的數(shù)據(jù)結(jié)構(gòu)。隊(duì)列接收來自任務(wù)或中斷的不固定長度消息,并根據(jù)不同的接口確定傳遞的消息是否存放在隊(duì)列空間中。

● 任務(wù)能夠從隊(duì)列里面讀取消息,當(dāng)隊(duì)列中的消息為空時(shí),掛起讀取任務(wù);當(dāng)隊(duì)列中有新消息時(shí),掛起的讀取任務(wù)被喚醒并處理新消息。任務(wù)也能夠往隊(duì)列里寫入消息,當(dāng)隊(duì)列已經(jīng)寫滿消息時(shí),掛起寫入任務(wù);當(dāng)隊(duì)列中有空閑消息節(jié)點(diǎn)時(shí),掛起的寫入任務(wù)被喚醒并寫入消息。如果將讀隊(duì)列和寫隊(duì)列的超時(shí)時(shí)間設(shè)置為0,則不會(huì)掛起任務(wù),接口會(huì)直接返回,這就是非阻塞模式。

● 消息隊(duì)列提供了異步處理機(jī)制,允許將一個(gè)消息放入隊(duì)列,但不立即處理。同時(shí)隊(duì)列還有緩沖消息的作用。

● 隊(duì)列用于任務(wù)間通信,可以實(shí)現(xiàn)消息的異步處理。同時(shí)消息的發(fā)送方和接收方不需要彼此聯(lián)系,兩者間是解耦的。

[[391697]]

隊(duì)列特性

● 消息以先進(jìn)先出的方式排隊(duì),支持異步讀寫。

● 讀隊(duì)列和寫隊(duì)列都支持超時(shí)機(jī)制。

● 每讀取一條消息,就會(huì)將該消息節(jié)點(diǎn)設(shè)置為空閑。

● 發(fā)送消息類型由通信雙方約定,可以允許不同長度(不超過隊(duì)列的消息節(jié)點(diǎn)大小)的消息。

● 一個(gè)任務(wù)能夠從任意一個(gè)消息隊(duì)列接收和發(fā)送消息。

● 多個(gè)任務(wù)能夠從同一個(gè)消息隊(duì)列接收和發(fā)送消息。

● 創(chuàng)建隊(duì)列時(shí)所需的隊(duì)列空間,默認(rèn)支持接口內(nèi)系統(tǒng)自行動(dòng)態(tài)申請(qǐng)內(nèi)存的方式,同時(shí)也支持將用戶分配的隊(duì)列空間作為接口入?yún)魅氲姆绞健?/p>

消息隊(duì)列長什么樣?

  1. #ifndef LOSCFG_BASE_IPC_QUEUE_LIMIT 
  2. #define LOSCFG_BASE_IPC_QUEUE_LIMIT 1024 //隊(duì)列個(gè)數(shù) 
  3. #endif 
  4. LITE_OS_SEC_BSS LosQueueCB *g_allQueue = NULL;//消息隊(duì)列池 
  5. LITE_OS_SEC_BSS STATIC LOS_DL_LIST g_freeQueueList;//空閑隊(duì)列鏈表,管分配的,需要隊(duì)列從這里申請(qǐng) 
  6.  
  7. typedef struct { 
  8.     UINT8 *queueHandle; /**< Pointer to a queue handle */   //指向隊(duì)列句柄的指針 
  9.     UINT16 queueState; /**< Queue state */  //隊(duì)列狀態(tài) 
  10.     UINT16 queueLen; /**< Queue length */   //隊(duì)列中消息總數(shù)的上限值,由創(chuàng)建時(shí)確定,不再改變 
  11.     UINT16 queueSize; /**< Node size */     //消息節(jié)點(diǎn)大小,由創(chuàng)建時(shí)確定,不再改變,即定義了每個(gè)消息長度的上限. 
  12.     UINT32 queueID; /**< queueID */         //隊(duì)列ID 
  13.     UINT16 queueHead; /**< Node head */     //消息頭節(jié)點(diǎn)位置(數(shù)組下標(biāo)) 
  14.     UINT16 queueTail; /**< Node tail */     //消息尾節(jié)點(diǎn)位置(數(shù)組下標(biāo)) 
  15.     UINT16 readWriteableCnt[OS_QUEUE_N_RW]; /**< Count of readable or writable resources, 0:readable, 1:writable */ 
  16.                                             //隊(duì)列中可寫或可讀消息數(shù),0表示可讀,1表示可寫 
  17.     LOS_DL_LIST readWriteList[OS_QUEUE_N_RW]; /**< the linked list to be read or written, 0:readlist, 1:writelist */ 
  18.                                             //掛的都是等待讀/寫消息的任務(wù)鏈表,0表示讀消息的鏈表,1表示寫消息的任務(wù)鏈表 
  19.     LOS_DL_LIST memList; /**< Pointer to the memory linked list */  //@note_why 這里尚未搞明白是啥意思 ,是共享內(nèi)存嗎? 
  20. } LosQueueCB;//讀寫隊(duì)列分離 

解讀

● 和進(jìn)程,線程,定時(shí)器一樣,消息隊(duì)列也由全局統(tǒng)一的消息隊(duì)列池管理,池有多大?默認(rèn)是1024

● 鴻蒙內(nèi)核對(duì)消息的總個(gè)數(shù)有限制,queueLen消息總數(shù)的上限,在創(chuàng)建隊(duì)列的時(shí)候需指定,不能更改.

● 對(duì)每個(gè)消息的長度也有限制, queueSize 規(guī)定了消息的大小,也是在創(chuàng)建的時(shí)候指定.

● 為啥要指定總個(gè)數(shù)和每個(gè)的總大小,是因?yàn)閮?nèi)核一次性會(huì)把隊(duì)列的總內(nèi)存(queueLen*queueSize)申請(qǐng)下來,確保不會(huì)出現(xiàn)后續(xù)使用過程中內(nèi)存不夠的問題出現(xiàn),但同時(shí)也帶來了內(nèi)存的浪費(fèi),因?yàn)楹芸赡艽蟛糠謺r(shí)間隊(duì)列并沒有跑滿.

● 隊(duì)列的讀取由queueHead,queueTail管理,Head表示隊(duì)列中被占用的消息節(jié)點(diǎn)的起始位置。Tail表示被占用的消息節(jié)點(diǎn)的結(jié)束位置,也是空閑消息節(jié)點(diǎn)的起始位置。隊(duì)列剛創(chuàng)建時(shí),Head和Tail均指向隊(duì)列起始位置

● 寫隊(duì)列時(shí),根據(jù)readWriteableCnt[1]判斷隊(duì)列是否可以寫入,不能對(duì)已滿(readWriteableCnt[1]為0)隊(duì)列進(jìn)行寫操作。寫隊(duì)列支持兩種寫入方式:向隊(duì)列尾節(jié)點(diǎn)寫入,也可以向隊(duì)列頭節(jié)點(diǎn)寫入。尾節(jié)點(diǎn)寫入時(shí),根據(jù)Tail找到起始空閑消息節(jié)點(diǎn)作為數(shù)據(jù)寫入對(duì)象,如果Tail已經(jīng)指向隊(duì)列尾部則采用回卷方式。頭節(jié)點(diǎn)寫入時(shí),將Head的前一個(gè)節(jié)點(diǎn)作為數(shù)據(jù)寫入對(duì)象,如果Head指向隊(duì)列起始位置則采用回卷方式。

● 讀隊(duì)列時(shí),根據(jù)readWriteableCnt[0]判斷隊(duì)列是否有消息需要讀取,對(duì)全部空閑(readWriteableCnt[0]為0)隊(duì)列進(jìn)行讀操作會(huì)引起任務(wù)掛起。如果隊(duì)列可以讀取消息,則根據(jù)Head找到最先寫入隊(duì)列的消息節(jié)點(diǎn)進(jìn)行讀取。如果Head已經(jīng)指向隊(duì)列尾部則采用回卷方式。

● 刪除隊(duì)列時(shí),根據(jù)隊(duì)列ID找到對(duì)應(yīng)隊(duì)列,把隊(duì)列狀態(tài)置為未使用,把隊(duì)列控制塊置為初始狀態(tài)。如果是通過系統(tǒng)動(dòng)態(tài)申請(qǐng)內(nèi)存方式創(chuàng)建的隊(duì)列,還會(huì)釋放隊(duì)列所占內(nèi)存。

● 留意readWriteList,這又是兩個(gè)雙向鏈表, 雙向鏈表是內(nèi)核最重要的結(jié)構(gòu)體,牢牢的寄生在宿主結(jié)構(gòu)體上.readWriteList上掛的是未來讀/寫消息隊(duì)列的任務(wù)列表.

初始化隊(duì)列

  1. LITE_OS_SEC_TEXT_INIT UINT32 OsQueueInit(VOID)//消息隊(duì)列模塊初始化 
  2.     LosQueueCB *queueNode = NULL
  3.     UINT32 index
  4.     UINT32 size
  5.  
  6.     size = LOSCFG_BASE_IPC_QUEUE_LIMIT * sizeof(LosQueueCB);//支持1024個(gè)IPC隊(duì)列 
  7.     /* system resident memory, don't free */ 
  8.     g_allQueue = (LosQueueCB *)LOS_MemAlloc(m_aucSysMem0, size);//常駐內(nèi)存 
  9.     if (g_allQueue == NULL) { 
  10.         return LOS_ERRNO_QUEUE_NO_MEMORY; 
  11.     } 
  12.     (VOID)memset_s(g_allQueue, size, 0, size);//清0 
  13.     LOS_ListInit(&g_freeQueueList);//初始化空閑鏈表 
  14.     for (index = 0; index < LOSCFG_BASE_IPC_QUEUE_LIMIT; index++) {//循環(huán)初始化每個(gè)消息隊(duì)列 
  15.         queueNode = ((LosQueueCB *)g_allQueue) + index;//一個(gè)一個(gè)來 
  16.         queueNode->queueID = index;//這可是 隊(duì)列的身份證 
  17.         LOS_ListTailInsert(&g_freeQueueList, &queueNode->readWriteList[OS_QUEUE_WRITE]);//通過寫節(jié)點(diǎn)掛到空閑隊(duì)列鏈表上 
  18.     }//這里要注意是用 readWriteList 掛到 g_freeQueueList鏈上的,所以要通過 GET_QUEUE_LIST 來找到 LosQueueCB 
  19.  
  20.     if (OsQueueDbgInitHook() != LOS_OK) {//調(diào)試隊(duì)列使用的. 
  21.         return LOS_ERRNO_QUEUE_NO_MEMORY; 
  22.     } 
  23.     return LOS_OK; 

解讀

● 初始隊(duì)列模塊,對(duì)幾個(gè)全局變量賦值,創(chuàng)建消息隊(duì)列池,所有池都是常駐內(nèi)存,關(guān)于池后續(xù)有專門的文章整理,到目前為止已經(jīng)解除到了進(jìn)程池,任務(wù)池,定時(shí)器池,隊(duì)列池,==

● 將LOSCFG_BASE_IPC_QUEUE_LIMIT個(gè)隊(duì)列掛到空閑鏈表g_freeQueueList上,供后續(xù)分配和回收.熟悉內(nèi)核全局資源管理的對(duì)這種方式應(yīng)該不會(huì)再陌生.

創(chuàng)建隊(duì)列

  1. //創(chuàng)建一個(gè)隊(duì)列,根據(jù)用戶傳入隊(duì)列長度和消息節(jié)點(diǎn)大小來開辟相應(yīng)的內(nèi)存空間以供該隊(duì)列使用,參數(shù)queueID帶走隊(duì)列ID 
  2. LITE_OS_SEC_TEXT_INIT UINT32 LOS_QueueCreate(CHAR *queueName, UINT16 len, UINT32 *queueID, 
  3.                                              UINT32 flags, UINT16 maxMsgSize) 
  4.     LosQueueCB *queueCB = NULL
  5.     UINT32 intSave; 
  6.     LOS_DL_LIST *unusedQueue = NULL
  7.     UINT8 *queue = NULL
  8.     UINT16 msgSize; 
  9.  
  10.     (VOID)queueName; 
  11.     (VOID)flags; 
  12.  
  13.     if (queueID == NULL) { 
  14.         return LOS_ERRNO_QUEUE_CREAT_PTR_NULL; 
  15.     } 
  16.  
  17.     if (maxMsgSize > (OS_NULL_SHORT - sizeof(UINT32))) {// maxMsgSize上限 為啥要減去 sizeof(UINT32) ,因?yàn)榍懊娲娴氖顷?duì)列的大小 
  18.         return LOS_ERRNO_QUEUE_SIZE_TOO_BIG; 
  19.     } 
  20.  
  21.     if ((len == 0) || (maxMsgSize == 0)) { 
  22.         return LOS_ERRNO_QUEUE_PARA_ISZERO; 
  23.     } 
  24.  
  25.     msgSize = maxMsgSize + sizeof(UINT32);//總size = 消息體內(nèi)容長度 + 消息大小(UINT32)  
  26.     /* 
  27.      * Memory allocation is time-consuming, to shorten the time of disable interrupt, 
  28.      * move the memory allocation to here. 
  29.      *///內(nèi)存分配非常耗時(shí),為了縮短禁用中斷的時(shí)間,將內(nèi)存分配移到此處,用的時(shí)候分配隊(duì)列內(nèi)存 
  30.     queue = (UINT8 *)LOS_MemAlloc(m_aucSysMem1, (UINT32)len * msgSize);//從系統(tǒng)內(nèi)存池中分配,由這里提供讀寫隊(duì)列的內(nèi)存 
  31.     if (queue == NULL) {//這里是一次把隊(duì)列要用到的所有最大內(nèi)存都申請(qǐng)下來了,能保證不會(huì)出現(xiàn)后續(xù)使用過程中內(nèi)存不夠的問題出現(xiàn) 
  32.         return LOS_ERRNO_QUEUE_CREATE_NO_MEMORY;//調(diào)用處有 OsSwtmrInit sys_mbox_new DoMqueueCreate == 
  33.     } 
  34.  
  35.     SCHEDULER_LOCK(intSave); 
  36.     if (LOS_ListEmpty(&g_freeQueueList)) {//沒有空余的隊(duì)列ID的處理,注意軟時(shí)鐘定時(shí)器是由 g_swtmrCBArray統(tǒng)一管理的,里面有正在使用和可分配空閑的隊(duì)列 
  37.         SCHEDULER_UNLOCK(intSave);//g_freeQueueList是管理可用于分配的隊(duì)列鏈表,申請(qǐng)消息隊(duì)列的ID需要向它要 
  38.         OsQueueCheckHook(); 
  39.         (VOID)LOS_MemFree(m_aucSysMem1, queue);//沒有就要釋放 queue申請(qǐng)的內(nèi)存 
  40.         return LOS_ERRNO_QUEUE_CB_UNAVAILABLE; 
  41.     } 
  42.  
  43.     unusedQueue = LOS_DL_LIST_FIRST(&g_freeQueueList);//找到一個(gè)沒有被使用的隊(duì)列 
  44.     LOS_ListDelete(unusedQueue);//將自己從g_freeQueueList中摘除, unusedQueue只是個(gè) LOS_DL_LIST 結(jié)點(diǎn). 
  45.     queueCB = GET_QUEUE_LIST(unusedQueue);//通過unusedQueue找到整個(gè)消息隊(duì)列(LosQueueCB) 
  46.     queueCB->queueLen = len;    //隊(duì)列中消息的總個(gè)數(shù),注意這個(gè)一旦創(chuàng)建是不能變的. 
  47.     queueCB->queueSize = msgSize;//消息節(jié)點(diǎn)的大小,注意這個(gè)一旦創(chuàng)建也是不能變的. 
  48.     queueCB->queueHandle = queue;   //隊(duì)列句柄,隊(duì)列內(nèi)容存儲(chǔ)區(qū).  
  49.     queueCB->queueState = OS_QUEUE_INUSED;  //隊(duì)列狀態(tài)使用中 
  50.     queueCB->readWriteableCnt[OS_QUEUE_READ] = 0;//可讀資源計(jì)數(shù),OS_QUEUE_READ(0):可讀. 
  51.     queueCB->readWriteableCnt[OS_QUEUE_WRITE] = len;//可些資源計(jì)數(shù) OS_QUEUE_WRITE(1):可寫, 默認(rèn)len可寫. 
  52.     queueCB->queueHead = 0;//隊(duì)列頭節(jié)點(diǎn) 
  53.     queueCB->queueTail = 0;//隊(duì)列尾節(jié)點(diǎn) 
  54.     LOS_ListInit(&queueCB->readWriteList[OS_QUEUE_READ]);//初始化可讀隊(duì)列鏈表 
  55.     LOS_ListInit(&queueCB->readWriteList[OS_QUEUE_WRITE]);//初始化可寫隊(duì)列鏈表 
  56.     LOS_ListInit(&queueCB->memList);// 
  57.  
  58.     OsQueueDbgUpdateHook(queueCB->queueID, OsCurrTaskGet()->taskEntry);//在創(chuàng)建或刪除隊(duì)列調(diào)試信息時(shí)更新任務(wù)條目 
  59.     SCHEDULER_UNLOCK(intSave); 
  60.  
  61.     *queueID = queueCB->queueID;//帶走隊(duì)列ID 
  62.     return LOS_OK; 

解讀

● 創(chuàng)建和初始化一個(gè)LosQueueCB

● 動(dòng)態(tài)分配內(nèi)存來保存消息內(nèi)容,LOS_MemAlloc(m_aucSysMem1, (UINT32)len * msgSize);

● msgSize = maxMsgSize + sizeof(UINT32);頭四個(gè)字節(jié)放消息的長度,但消息最大長度不能超過maxMsgSize

● readWriteableCnt記錄讀/寫隊(duì)列的數(shù)量,獨(dú)立計(jì)算

● readWriteList掛的是等待讀取隊(duì)列的任務(wù)鏈表 將在OsTaskWait(&queueCB->readWriteList[readWrite], timeout, TRUE);中將任務(wù)掛到鏈表上.

關(guān)鍵函數(shù)OsQueueOperate

隊(duì)列的讀寫都要經(jīng)過 OsQueueOperate

  1. /************************************************ 
  2. 隊(duì)列操作.是讀是寫由operateType定 
  3. 本函數(shù)是消息隊(duì)列最重要的一個(gè)函數(shù),可以分析出讀取消息過程中 
  4. 發(fā)生的細(xì)節(jié),涉及任務(wù)的喚醒和阻塞,阻塞鏈表任務(wù)的相互提醒. 
  5. ************************************************/ 
  6. UINT32 OsQueueOperate(UINT32 queueID, UINT32 operateType, VOID *bufferAddr, UINT32 *bufferSize, UINT32 timeout) 
  7.     LosQueueCB *queueCB = NULL
  8.     LosTaskCB *resumedTask = NULL
  9.     UINT32 ret; 
  10.     UINT32 readWrite = OS_QUEUE_READ_WRITE_GET(operateType);//獲取讀/寫操作標(biāo)識(shí) 
  11.     UINT32 intSave; 
  12.  
  13.     SCHEDULER_LOCK(intSave); 
  14.     queueCB = (LosQueueCB *)GET_QUEUE_HANDLE(queueID);//獲取對(duì)應(yīng)的隊(duì)列控制塊 
  15.     ret = OsQueueOperateParamCheck(queueCB, queueID, operateType, bufferSize);//參數(shù)檢查 
  16.     if (ret != LOS_OK) { 
  17.         goto QUEUE_END; 
  18.     } 
  19.  
  20.     if (queueCB->readWriteableCnt[readWrite] == 0) {//根據(jù)readWriteableCnt判斷隊(duì)列是否有消息讀/寫 
  21.         if (timeout == LOS_NO_WAIT) {//不等待直接退出 
  22.             ret = OS_QUEUE_IS_READ(operateType) ? LOS_ERRNO_QUEUE_ISEMPTY : LOS_ERRNO_QUEUE_ISFULL; 
  23.             goto QUEUE_END; 
  24.         } 
  25.  
  26.         if (!OsPreemptableInSched()) {//不支持搶占式調(diào)度 
  27.             ret = LOS_ERRNO_QUEUE_PEND_IN_LOCK; 
  28.             goto QUEUE_END; 
  29.         } 
  30.         //任務(wù)等待,這里很重要啊,將自己從就緒列表摘除,讓出了CPU并發(fā)起了調(diào)度,并掛在readWriteList[readWrite]上,掛的都等待讀/寫消息的task 
  31.         ret = OsTaskWait(&queueCB->readWriteList[readWrite], timeout, TRUE);//任務(wù)被喚醒后會(huì)回到這里執(zhí)行,什么時(shí)候會(huì)被喚醒?當(dāng)然是有消息的時(shí)候! 
  32.         if (ret == LOS_ERRNO_TSK_TIMEOUT) {//喚醒后如果超時(shí)了,返回讀/寫消息失敗 
  33.             ret = LOS_ERRNO_QUEUE_TIMEOUT; 
  34.             goto QUEUE_END;// 
  35.         } 
  36.     } else { 
  37.         queueCB->readWriteableCnt[readWrite]--;//對(duì)應(yīng)隊(duì)列中計(jì)數(shù)器--,說明一條消息只能被讀/寫一次 
  38.     } 
  39.  
  40.     OsQueueBufferOperate(queueCB, operateType, bufferAddr, bufferSize);//發(fā)起讀或?qū)戧?duì)列操作 
  41.  
  42.     if (!LOS_ListEmpty(&queueCB->readWriteList[!readWrite])) {//如果還有任務(wù)在排著隊(duì)等待讀/寫入消息(當(dāng)時(shí)不能讀/寫的原因有可能當(dāng)時(shí)隊(duì)列滿了==) 
  43.         resumedTask = OS_TCB_FROM_PENDLIST(LOS_DL_LIST_FIRST(&queueCB->readWriteList[!readWrite]));//取出要讀/寫消息的任務(wù) 
  44.         OsTaskWake(resumedTask);//喚醒任務(wù)去讀/寫消息啊 
  45.         SCHEDULER_UNLOCK(intSave); 
  46.         LOS_MpSchedule(OS_MP_CPU_ALL);//讓所有CPU發(fā)出調(diào)度申請(qǐng),因?yàn)楹芸赡苣莻€(gè)要讀/寫消息的隊(duì)列是由其他CPU執(zhí)行 
  47.         LOS_Schedule();//申請(qǐng)調(diào)度 
  48.         return LOS_OK; 
  49.     } else { 
  50.         queueCB->readWriteableCnt[!readWrite]++;//對(duì)應(yīng)隊(duì)列讀/寫中計(jì)數(shù)器++ 
  51.     } 
  52.  
  53. QUEUE_END: 
  54.     SCHEDULER_UNLOCK(intSave); 
  55.     return ret; 

解讀

● queueID 指定操作消息隊(duì)列池中哪個(gè)消息隊(duì)列

● operateType 表示本次是是讀/寫消息

● bufferAddr,bufferSize表示如果讀操作,用buf接走數(shù)據(jù),如果寫操作,將buf寫入隊(duì)列.

● timeout只用于當(dāng)隊(duì)列中沒有讀/寫內(nèi)容時(shí)的等待.

◊ 當(dāng)讀消息時(shí)發(fā)現(xiàn)隊(duì)列中沒有可讀的消息,此時(shí)timeout決定是否將任務(wù)掛入等待讀隊(duì)列任務(wù)鏈表

◊ 當(dāng)寫消息時(shí)發(fā)現(xiàn)隊(duì)列中沒有空間用于寫的消息,此時(shí)timeout決定是否將任務(wù)掛入等待寫隊(duì)列任務(wù)鏈表

● if (!LOS_ListEmpty(&queueCB->readWriteList[!readWrite]))最有意思的是這行代碼.

◊ 在一次讀消息完成后會(huì)立即喚醒寫隊(duì)列任務(wù)鏈表的任務(wù),因?yàn)樽x完了就有了剩余空間,等待寫隊(duì)列的任務(wù)往往是因?yàn)闆]有空間而進(jìn)入等待狀態(tài).

◊ 在一次寫消息完成后會(huì)立即喚醒讀隊(duì)列任務(wù)鏈表的任務(wù),因?yàn)閷懲炅岁?duì)列就有了新消息,等待讀隊(duì)列的任務(wù)往往是因?yàn)殛?duì)列中沒有消息而進(jìn)入等待狀態(tài).

編程實(shí)例

創(chuàng)建一個(gè)隊(duì)列,兩個(gè)任務(wù)。任務(wù)1調(diào)用寫隊(duì)列接口發(fā)送消息,任務(wù)2通過讀隊(duì)列接口接收消息。

● 通過LOS_TaskCreate創(chuàng)建任務(wù)1和任務(wù)2。

● 通過LOS_QueueCreate創(chuàng)建一個(gè)消息隊(duì)列。

● 在任務(wù)1 send_Entry中發(fā)送消息。

● 在任務(wù)2 recv_Entry中接收消息。

● 通過LOS_QueueDelete刪除隊(duì)列。

  1. #include "los_task.h" 
  2. #include "los_queue.h" 
  3.  
  4. static UINT32 g_queue; 
  5. #define BUFFER_LEN 50 
  6.  
  7. VOID send_Entry(VOID) 
  8.     UINT32 i = 0; 
  9.     UINT32 ret = 0; 
  10.     CHAR abuf[] = "test is message x"
  11.     UINT32 len = sizeof(abuf); 
  12.  
  13.     while (i < 5) { 
  14.         abuf[len -2] = '0' + i; 
  15.         i++; 
  16.  
  17.         ret = LOS_QueueWriteCopy(g_queue, abuf, len, 0); 
  18.         if(ret != LOS_OK) { 
  19.             dprintf("send message failure, error: %x\n", ret); 
  20.         } 
  21.  
  22.         LOS_TaskDelay(5); 
  23.     } 
  24.  
  25. VOID recv_Entry(VOID) 
  26.     UINT32 ret = 0; 
  27.     CHAR readBuf[BUFFER_LEN] = {0}; 
  28.     UINT32 readLen = BUFFER_LEN; 
  29.  
  30.     while (1) { 
  31.         ret = LOS_QueueReadCopy(g_queue, readBuf, &readLen, 0); 
  32.         if(ret != LOS_OK) { 
  33.             dprintf("recv message failure, error: %x\n", ret); 
  34.             break; 
  35.         } 
  36.  
  37.         dprintf("recv message: %s\n", readBuf); 
  38.         LOS_TaskDelay(5); 
  39.     } 
  40.  
  41.     while (LOS_OK != LOS_QueueDelete(g_queue)) { 
  42.         LOS_TaskDelay(1); 
  43.     } 
  44.  
  45.     dprintf("delete the queue success!\n"); 
  46.  
  47. UINT32 Example_CreateTask(VOID) 
  48.     UINT32 ret = 0;  
  49.     UINT32 task1, task2; 
  50.     TSK_INIT_PARAM_S initParam; 
  51.  
  52.     initParam.pfnTaskEntry = (TSK_ENTRY_FUNC)send_Entry; 
  53.     initParam.usTaskPrio = 9; 
  54.     initParam.uwStackSize = LOS_TASK_MIN_STACK_SIZE; 
  55.     initParam.pcName = "sendQueue"
  56. #ifdef LOSCFG_KERNEL_SMP 
  57.     initParam.usCpuAffiMask = CPUID_TO_AFFI_MASK(ArchCurrCpuid()); 
  58. #endif 
  59.     initParam.uwResved = LOS_TASK_STATUS_DETACHED; 
  60.  
  61.     LOS_TaskLock(); 
  62.     ret = LOS_TaskCreate(&task1, &initParam); 
  63.     if(ret != LOS_OK) { 
  64.         dprintf("create task1 failed, error: %x\n", ret); 
  65.         return ret; 
  66.     } 
  67.  
  68.     initParam.pcName = "recvQueue"
  69.     initParam.pfnTaskEntry = (TSK_ENTRY_FUNC)recv_Entry; 
  70.     ret = LOS_TaskCreate(&task2, &initParam); 
  71.     if(ret != LOS_OK) { 
  72.         dprintf("create task2 failed, error: %x\n", ret); 
  73.         return ret; 
  74.     } 
  75.  
  76.     ret = LOS_QueueCreate("queue", 5, &g_queue, 0, BUFFER_LEN); 
  77.     if(ret != LOS_OK) { 
  78.         dprintf("create queue failure, error: %x\n", ret); 
  79.     } 
  80.  
  81.     dprintf("create the queue success!\n"); 
  82.     LOS_TaskUnlock(); 
  83.     return ret; 

結(jié)果驗(yàn)證

  1. create the queue success! 
  2. recv message: test is message 0 
  3. recv message: test is message 1 
  4. recv message: test is message 2 
  5. recv message: test is message 3 
  6. recv message: test is message 4 
  7. recv message failure, error: 200061d 
  8. delete the queue success! 

總結(jié)

● 消息隊(duì)列解決任務(wù)間大數(shù)據(jù)的傳遞

● 以一種異步,解耦的方式實(shí)現(xiàn)任務(wù)通訊

● 全局由消息隊(duì)列池統(tǒng)一管理

● 在創(chuàng)建消息隊(duì)列時(shí)申請(qǐng)內(nèi)存塊存儲(chǔ)消息內(nèi)存.

● 讀/寫操作統(tǒng)一管理,分開執(zhí)行,A任務(wù) 讀/寫完消息后會(huì)立即喚醒等待寫/讀的B任務(wù).

想了解更多內(nèi)容,請(qǐng)?jiān)L問:

51CTO和華為官方合作共建的鴻蒙技術(shù)社區(qū)

https://harmonyos.51cto.com

 

責(zé)任編輯:jianghua 來源: 鴻蒙社區(qū)
相關(guān)推薦

2021-06-01 09:38:19

消息隊(duì)列核心系統(tǒng)下游系統(tǒng)

2017-06-19 13:36:12

Linux進(jìn)程消息隊(duì)列

2021-06-04 14:15:10

鴻蒙HarmonyOS應(yīng)用

2021-04-08 09:32:17

鴻蒙HarmonyOS應(yīng)用

2021-03-11 11:14:39

鴻蒙HarmonyOS應(yīng)用

2021-09-22 14:36:32

鴻蒙HarmonyOS應(yīng)用

2024-07-10 17:51:47

2021-04-12 18:14:56

鴻蒙HarmonyOS應(yīng)用開發(fā)

2022-09-02 08:23:12

軟件開發(fā)解耦架構(gòu)

2024-06-11 00:00:05

RabbitMQAMQP協(xié)議

2022-03-11 20:23:14

鴻蒙源碼分析進(jìn)程管理

2022-03-31 16:26:49

鴻蒙源碼分析進(jìn)程管理

2022-03-03 18:28:28

Harmony進(jìn)程任務(wù)管理模塊

2019-05-13 10:00:41

Linux進(jìn)程間通信命令

2021-01-22 09:47:22

鴻蒙HarmonyOS應(yīng)用開發(fā)

2021-03-15 15:18:16

鴻蒙HarmonyOS應(yīng)用

2021-05-12 09:45:20

鴻蒙HarmonyOS應(yīng)用

2009-12-11 09:42:54

Linux內(nèi)核源碼進(jìn)程調(diào)度

2009-12-11 09:47:23

Linux內(nèi)核源碼進(jìn)程調(diào)度

2024-03-22 12:10:39

Redis消息隊(duì)列數(shù)據(jù)庫
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)