鴻蒙輕內(nèi)核M核源碼分析系列三數(shù)據(jù)結(jié)構(gòu)-任務(wù)排序鏈表
51CTO和華為官方合作共建的鴻蒙技術(shù)社區(qū)
在鴻蒙輕內(nèi)核源碼分析系列一和系列二,我們分析了雙向循環(huán)鏈表、優(yōu)先級就緒隊(duì)列的源碼。本文會繼續(xù)給讀者介紹鴻蒙輕內(nèi)核源碼中重要的數(shù)據(jù)結(jié)構(gòu):任務(wù)排序鏈表TaskSortLinkAttr。鴻蒙輕內(nèi)核的任務(wù)排序鏈表,用于任務(wù)延遲到期/超時喚醒等業(yè)務(wù)場景,是一個非常重要、非?;A(chǔ)的數(shù)據(jù)結(jié)構(gòu)。
1 任務(wù)排序鏈表
我們先看下任務(wù)排序鏈接的數(shù)據(jù)結(jié)構(gòu)。任務(wù)排序鏈表是一個環(huán)狀的雙向鏈表數(shù)組,任務(wù)排序鏈表屬性結(jié)構(gòu)體TaskSortLinkAttr作為雙向鏈表的頭結(jié)點(diǎn),指向雙向鏈表數(shù)組的第一個元素,還維護(hù)游標(biāo)信息,記錄當(dāng)前的位置信息。我們先看下排序鏈表屬性的結(jié)構(gòu)體的定義。
1.1 任務(wù)排序鏈表屬性結(jié)構(gòu)體定義
在kernel\include\los_task.h頭文件中定義了排序鏈表屬性的結(jié)構(gòu)體TaskSortLinkAttr。該結(jié)構(gòu)體定義了排序鏈表的頭節(jié)點(diǎn)LOS_DL_LIST *sortLink,游標(biāo)UINT16 cursor,還有一個保留字段,暫時沒有使用。
源碼如下:
- typedef struct {
- LOS_DL_LIST *sortLink;
- UINT16 cursor;
- UINT16 reserved;
- } TaskSortLinkAttr;
在文件kernel\src\los_task.c中定義了排序鏈表屬性結(jié)構(gòu)體TaskSortLinkAttr類型的全局變量g_taskSortLink,該全局變量的成員變量sortLink作為排序鏈表的頭結(jié)點(diǎn),指向一個長度為32的環(huán)狀的雙向鏈表數(shù)組,成員變量cursor作為游標(biāo)記錄環(huán)狀數(shù)組的當(dāng)前游標(biāo)位置。源代碼如下。
- LITE_OS_SEC_BSS TaskSortLinkAttr g_taskSortLink;
我們使用示意圖來講述一下。任務(wù)排序鏈表是環(huán)狀雙向鏈表數(shù)組,長度為32,每一個元素是一個雙向鏈表,掛載任務(wù)LosTaskCB的鏈表節(jié)點(diǎn)timerList。任務(wù)LosTaskCB的成員變量idxRollNum記錄數(shù)組的索引和滾動數(shù)。全局變量g_taskSortLink的成員變量cursor記錄當(dāng)前游標(biāo)位置,每過一個Tick,游標(biāo)指向下一個位置,轉(zhuǎn)一輪需要32 ticks。當(dāng)運(yùn)行到的數(shù)組位置,雙向鏈表不為空,則把第一個節(jié)點(diǎn)維護(hù)的滾動數(shù)減1。這樣的數(shù)據(jù)結(jié)構(gòu)類似鐘表表盤,也稱為時間輪。
我們舉個例子來說明,基于時間輪實(shí)現(xiàn)的任務(wù)排序鏈表是如何管理任務(wù)延遲超時的。假如當(dāng)前游標(biāo)cursor為1,當(dāng)一個任務(wù)需要延時72 ticks,72=2*32+8,表示排序索引sortIndex為8,滾動數(shù)rollNum為2。會把任務(wù)插入數(shù)組索引為sortIndex+cursor=9的雙向鏈表位置,索要9處的雙向鏈表維護(hù)節(jié)點(diǎn)的滾動為2。隨著Tick時間的進(jìn)行,從當(dāng)前游標(biāo)位置運(yùn)行到數(shù)組索引位置9,歷時8 ticks。運(yùn)行到9時,如果滾動數(shù)大于0,則把滾動數(shù)減1。等再運(yùn)行2輪,共需要72 ticks,任務(wù)就會延遲到期,可以從排序鏈表移除。每個數(shù)組元素對應(yīng)的雙向鏈表的第一個鏈表節(jié)點(diǎn)的滾動數(shù)表示需要轉(zhuǎn)多少輪,節(jié)點(diǎn)任務(wù)才到期。第二個鏈表節(jié)點(diǎn)的滾動數(shù)需要加上第一個節(jié)點(diǎn)的滾動數(shù),表示第二個節(jié)點(diǎn)需要轉(zhuǎn)的輪數(shù)。依次類推。
示意圖如下:
1.2 任務(wù)排序鏈表宏定義
在OS_TSK_SORTLINK_LEN頭文件中定義了一些和任務(wù)排序鏈表相關(guān)的宏定義。延遲任務(wù)雙向鏈表數(shù)組的長度定義為32,高階bit位位數(shù)為5,低階bit位位數(shù)為27。對于任務(wù)的超時時間,取其高27位作為滾動數(shù),低5位作為數(shù)組索引。
源碼如下:
- /**
- * 延遲任務(wù)雙向鏈表數(shù)組的數(shù)量(桶的數(shù)量):32
- */
- #define OS_TSK_SORTLINK_LEN 32
- /**
- * 高階bit位數(shù)目:5
- */
- #define OS_TSK_HIGH_BITS 5U
- /**
- * 低階bit位數(shù)目:27
- */
- #define OS_TSK_LOW_BITS (32U - OS_TSK_HIGH_BITS)
- /**
- * 滾動數(shù)最大值:0xFFFF FFDF,1111 0111 1111 1111 1111 1111 1101 1111
- */
- #define OS_TSK_MAX_ROLLNUM (0xFFFFFFFFU - OS_TSK_SORTLINK_LEN)
- /**
- * 任務(wù)延遲時間數(shù)的位寬:5
- */
- #define OS_TSK_SORTLINK_LOGLEN 5
- /**
- * 延遲任務(wù)的桶編號的掩碼:31、0001 1111
- */
- #define OS_TSK_SORTLINK_MASK (OS_TSK_SORTLINK_LEN - 1U)
- /**
- * 滾動數(shù)的高階掩碼:1111 1000 0000 0000 0000 0000 0000 0000
- */
- #define OS_TSK_HIGH_BITS_MASK (OS_TSK_SORTLINK_MASK << OS_TSK_LOW_BITS)
- /**
- * 滾動數(shù)的低階掩碼:0000 0111 1111 1111 1111 1111 1111 1111
- */
- #define OS_TSK_LOW_BITS_MASK (~OS_TSK_HIGH_BITS_MASK)
2 任務(wù)排序鏈表操作
我們分析下任務(wù)排序鏈表的操作,包含初始化,插入,刪除,滾動數(shù)更新,獲取下一個到期時間等。
2.1 初始化排序鏈表
在系系統(tǒng)內(nèi)核初始化啟動階段,在函數(shù)UINT32 OsTaskInit(VOID)中初始化任務(wù)排序鏈表。該函數(shù)的調(diào)用關(guān)系如下,main.c:main() --> kernel\src\los_init.c:LOS_KernelInit() --> kernel\src\los_task.c:OsTaskInit()。
初始化排序鏈表函數(shù)的源碼如下:
- LITE_OS_SEC_TEXT_INIT UINT32 OsTaskInit(VOID)
- {
- UINT32 size;
- UINT32 index;
- LOS_DL_LIST *listObject = NULL;
- ......
- ⑴ size = sizeof(LOS_DL_LIST) * OS_TSK_SORTLINK_LEN;
- listObject = (LOS_DL_LIST *)LOS_MemAlloc(m_aucSysMem0, size);
- ⑵ if (listObject == NULL) {
- (VOID)LOS_MemFree(m_aucSysMem0, g_taskCBArray);
- return LOS_ERRNO_TSK_NO_MEMORY;
- }
- ⑶ (VOID)memset_s((VOID *)listObject, size, 0, size);
- ⑷ g_taskSortLink.sortLink = listObject;
- g_taskSortLink.cursor = 0;
- for (index = 0; index < OS_TSK_SORTLINK_LEN; index++, listObject++) {
- ⑸ LOS_ListInit(listObject);
- }
- return LOS_OK;
- }
⑴處代碼計(jì)算需要申請的雙向鏈表的內(nèi)存大小,OS_TSK_SORTLINK_LEN為32,即需要為32個雙向鏈表節(jié)點(diǎn)申請內(nèi)存空間。然后申請內(nèi)存,⑵處申請內(nèi)存失敗時返回相應(yīng)錯誤碼。⑶處初始化申請的內(nèi)存區(qū)域?yàn)?等。⑷處把申請的雙向鏈表節(jié)點(diǎn)賦值給g_taskSortLink的鏈表節(jié)點(diǎn).sortLink,作為排序鏈表的頭節(jié)點(diǎn),游標(biāo).cursor初始化為0。然后⑸處的循環(huán),調(diào)用LOS_ListInit()函數(shù)把雙向鏈表數(shù)組每個元素都初始化為雙向循環(huán)鏈表。
2.2 插入排序鏈表
插入排序鏈表的函數(shù)為OsTaskAdd2TimerList()。在任務(wù)等待互斥鎖/信號量等資源時,都需要調(diào)用該函數(shù)將任務(wù)加入到對應(yīng)的排序鏈表中。該函數(shù)包含兩個入?yún)?,第一個參數(shù)LosTaskCB *taskCB用于指定要延遲的任務(wù),第二個參數(shù)UINT32 timeout指定超時等待時間。
源碼如下:
- LITE_OS_SEC_TEXT VOID OsTaskAdd2TimerList(LosTaskCB *taskCB, UINT32 timeout)
- {
- LosTaskCB *taskDelay = NULL;
- LOS_DL_LIST *listObject = NULL;
- UINT32 sortIndex;
- UINT32 rollNum;
- ⑴ sortIndex = timeout & OS_TSK_SORTLINK_MASK;
- rollNum = (timeout >> OS_TSK_SORTLINK_LOGLEN);
- ⑵ (sortIndex > 0) ? 0 : (rollNum--);
- ⑶ EVALUATE_L(taskCB->idxRollNum, rollNum);
- ⑷ sortIndex = (sortIndex + g_taskSortLink.cursor);
- sortIndex = sortIndex & OS_TSK_SORTLINK_MASK;
- ⑸ EVALUATE_H(taskCB->idxRollNum, sortIndex);
- ⑹ listObject = g_taskSortLink.sortLink + sortIndex;
- ⑺ if (listObject->pstNext == listObject) {
- LOS_ListTailInsert(listObject, &taskCB->timerList);
- } else {
- ⑻ taskDelay = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);
- do {
- ⑼ if (UWROLLNUM(taskDelay->idxRollNum) <= UWROLLNUM(taskCB->idxRollNum)) {
- UWROLLNUMSUB(taskCB->idxRollNum, taskDelay->idxRollNum);
- } else {
- ⑽ UWROLLNUMSUB(taskDelay->idxRollNum, taskCB->idxRollNum);
- break;
- }
- ⑾ taskDelay = LOS_DL_LIST_ENTRY(taskDelay->timerList.pstNext, LosTaskCB, timerList);
- } while (&taskDelay->timerList != (listObject));
- ⑿ LOS_ListTailInsert(&taskDelay->timerList, &taskCB->timerList);
- }
- }
⑴處代碼計(jì)算等待時間timeout的低5位作為數(shù)組索引,高27位作為滾動數(shù)rollNum。這2行代碼數(shù)學(xué)上的意義,就是把等待時間處于32得到的商作為滾動數(shù),余數(shù)作為數(shù)組索引。⑵處代碼,如果余數(shù)為0,可以整除時,滾動數(shù)減1。減1設(shè)計(jì)的原因是,在函數(shù)VOID OsTaskScan(VOID)中,每一個tick到來時,如果滾動數(shù)大于0,滾動數(shù)減1,并繼續(xù)滾動一圈。后文會分析該函數(shù)VOID OsTaskScan(VOID)。
⑶處代碼把滾動數(shù)賦值給任務(wù)taskCB->idxRollNum的低27位。⑷處把數(shù)組索引加上游標(biāo),然后執(zhí)行⑸賦值給任務(wù)taskCB->idxRollNum的高5位。⑹根據(jù)數(shù)組索引獲取雙向鏈表頭結(jié)點(diǎn),⑺如果此處雙向鏈表為空,直接插入鏈表里。如果鏈表不為空,執(zhí)行⑻獲取第一個鏈表節(jié)點(diǎn)對應(yīng)的任務(wù)taskDelay,然后遍歷循環(huán)雙向鏈表,把任務(wù)插入到合適的位置。⑼處如果待插入任務(wù)taskCB的滾動數(shù)大于等于當(dāng)前鏈表節(jié)點(diǎn)對應(yīng)任務(wù)的滾動數(shù),則從待插入任務(wù)taskCB的滾動數(shù)中減去當(dāng)前鏈表節(jié)點(diǎn)對應(yīng)任務(wù)的滾動數(shù),然后執(zhí)行⑾獲取下一個節(jié)點(diǎn)繼續(xù)遍歷。⑽處如果待插入任務(wù)taskCB的滾動數(shù)小于當(dāng)前鏈表節(jié)點(diǎn)對應(yīng)任務(wù)的滾動數(shù),則從當(dāng)前鏈表節(jié)點(diǎn)對應(yīng)任務(wù)的滾動數(shù)中減去待插入任務(wù)taskCB的滾動數(shù),然后跳出循環(huán)。執(zhí)行⑿,完成任務(wù)插入。插入過程,可以結(jié)合上文的示意圖進(jìn)行理解。
2.3 從排序鏈表中刪除
從排序鏈表中刪除的函數(shù)為VOID OsTimerListDelete(LosTaskCB *taskCB)。在任務(wù)恢復(fù)/刪除等場景中,需要調(diào)用該函數(shù)將任務(wù)從任務(wù)排序鏈表中刪除。該函數(shù)包含一個參數(shù)LosTaskCB *taskCB,用于指定要從排序鏈表中刪除的任務(wù)。
源碼如下:
- LITE_OS_SEC_TEXT VOID OsTimerListDelete(LosTaskCB *taskCB)
- {
- LOS_DL_LIST *listObject = NULL;
- LosTaskCB *nextTask = NULL;
- UINT32 sortIndex;
- ⑴ sortIndex = UWSORTINDEX(taskCB->idxRollNum);
- ⑵ listObject = g_taskSortLink.sortLink + sortIndex;
- ⑶ if (listObject != taskCB->timerList.pstNext) {
- ⑷ nextTask = LOS_DL_LIST_ENTRY(taskCB->timerList.pstNext, LosTaskCB, timerList);
- UWROLLNUMADD(nextTask->idxRollNum, taskCB->idxRollNum);
- }
- ⑸ LOS_ListDelete(&taskCB->timerList);
- }
⑴處代碼獲取待從排序鏈表中刪除的任務(wù)對應(yīng)的數(shù)字索引。⑵處代碼獲取排序鏈表的頭節(jié)點(diǎn)listObject。⑶處代碼判斷待刪除節(jié)點(diǎn)是否是最后一個節(jié)點(diǎn),如果不是最后一個節(jié)點(diǎn),執(zhí)行執(zhí)行⑷處代碼獲取待刪除節(jié)點(diǎn)的下一個節(jié)點(diǎn)對應(yīng)的任務(wù)nextTask,在下一個節(jié)點(diǎn)的滾動數(shù)中增加待刪除節(jié)點(diǎn)的滾動數(shù),然后執(zhí)行⑸處代碼執(zhí)行刪除操作。如果是最后一個節(jié)點(diǎn),直接執(zhí)行⑸處代碼刪除該節(jié)點(diǎn)即可。
2.4 獲取下一個超時到期時間
獲取下一個超時到期時間的函數(shù)為OsTaskNextSwitchTimeGet(),我們分析下其代碼。
源碼如下:
- UINT32 OsTaskNextSwitchTimeGet(VOID)
- {
- LosTaskCB *taskCB = NULL;
- UINT32 taskSortLinkTick = LOS_WAIT_FOREVER;
- LOS_DL_LIST *listObject = NULL;
- UINT32 tempTicks;
- UINT32 index;
- ⑴ for (index = 0; index < OS_TSK_SORTLINK_LEN; index++) {
- ⑵ listObject = g_taskSortLink.sortLink + ((g_taskSortLink.cursor + index) % OS_TSK_SORTLINK_LEN);
- ⑶ if (!LOS_ListEmpty(listObject)) {
- ⑷ taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);
- ⑸ tempTicks = (index == 0) ? OS_TSK_SORTLINK_LEN : index;
- ⑹ tempTicks += (UINT32)(UWROLLNUM((UINT32)taskCB->idxRollNum) * OS_TSK_SORTLINK_LEN);
- ⑺ if (taskSortLinkTick > tempTicks) {
- taskSortLinkTick = tempTicks;
- }
- }
- }
- return taskSortLinkTick;
- }
⑴處代碼循環(huán)遍歷雙向鏈表數(shù)組,⑵處代碼從當(dāng)前游標(biāo)位置開始獲取排序鏈表的頭節(jié)點(diǎn)listObject。⑶處代碼判斷排序鏈表是否為空,如果排序鏈表為空,則繼續(xù)遍歷下一個數(shù)組。如果鏈表不為空,⑷處代碼獲取排序鏈表的第一個鏈表節(jié)點(diǎn)對應(yīng)的任務(wù)。⑸處如果遍歷的數(shù)字索引為0,tick數(shù)目使用32,否則使用具體的數(shù)字索引。⑹處獲取任務(wù)的滾動數(shù),計(jì)算出需要的等待時間,加上⑸處計(jì)算出的不足滾動一圈的時間。⑺處計(jì)算出需要等待的最小時間,即下一個最快到期的時間。
3 排序鏈表和Tick時間的關(guān)系
任務(wù)加入到排序鏈表后,時間一個tick一個tick的逝去,排序鏈表中的滾動數(shù)該如何更新呢?
時間每走過一個tick,系統(tǒng)就會調(diào)用Tick中斷的處理函數(shù)OsTickHandler(),該函數(shù)在kernel\src\los_tick.c文件中實(shí)現(xiàn)。下面是該函數(shù)的代碼片段,⑴處代碼分別任務(wù)的超時到期情況。
- LITE_OS_SEC_TEXT VOID OsTickHandler(VOID)
- {
- #if (LOSCFG_BASE_CORE_TICK_HW_TIME == 1)
- platform_tick_handler();
- #endif
- g_ullTickCount++;
- #if (LOSCFG_BASE_CORE_TIMESLICE == 1)
- OsTimesliceCheck();
- #endif
- ⑴ OsTaskScan(); // task timeout scan
- #if (LOSCFG_BASE_CORE_SWTMR == 1)
- (VOID)OsSwtmrScan();
- #endif
- }
詳細(xì)分析下函數(shù)OsTaskScan(),來了解排序鏈表和tick時間的關(guān)系。函數(shù)在kernel\base\los_task.c文件中實(shí)現(xiàn),代碼片段如下:
- LITE_OS_SEC_TEXT VOID OsTaskScan(VOID)
- {
- LosTaskCB *taskCB = NULL;
- BOOL needSchedule = FALSE;
- LOS_DL_LIST *listObject = NULL;
- UINT16 tempStatus;
- UINTPTR intSave;
- intSave = LOS_IntLock();
- ⑴ g_taskSortLink.cursor = (g_taskSortLink.cursor + 1) % OS_TSK_SORTLINK_LEN;
- listObject = g_taskSortLink.sortLink + g_taskSortLink.cursor;
- ⑵ if (listObject->pstNext == listObject) {
- LOS_IntRestore(intSave);
- return;
- }
- ⑶ for (taskCB = LOS_DL_LIST_ENTRY((listObject)->pstNext, LosTaskCB, timerList);
- &taskCB->timerList != (listObject);) {
- tempStatus = taskCB->taskStatus;
- ⑷ if (UWROLLNUM(taskCB->idxRollNum) > 0) {
- UWROLLNUMDEC(taskCB->idxRollNum);
- break;
- }
- ⑸ LOS_ListDelete(&taskCB->timerList);
- ⑹ if (tempStatus & OS_TASK_STATUS_PEND) {
- taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND);
- LOS_ListDelete(&taskCB->pendList);
- taskCB->taskSem = NULL;
- taskCB->taskMux = NULL;
- }
- ⑺ else if (tempStatus & OS_TASK_STATUS_EVENT) {
- taskCB->taskStatus &= ~(OS_TASK_STATUS_EVENT);
- }
- ⑻ else if (tempStatus & OS_TASK_STATUS_PEND_QUEUE) {
- LOS_ListDelete(&taskCB->pendList);
- taskCB->taskStatus &= ~(OS_TASK_STATUS_PEND_QUEUE);
- } else {
- taskCB->taskStatus &= ~(OS_TASK_STATUS_DELAY);
- }
- ⑼ if (!(tempStatus & OS_TASK_STATUS_SUSPEND)) {
- taskCB->taskStatus |= OS_TASK_STATUS_READY;
- OsHookCall(LOS_HOOK_TYPE_MOVEDTASKTOREADYSTATE, taskCB);
- OsPriqueueEnqueue(&taskCB->pendList, taskCB->priority);
- needSchedule = TRUE;
- }
- if (listObject->pstNext == listObject) {
- break;
- }
- taskCB = LOS_DL_LIST_ENTRY(listObject->pstNext, LosTaskCB, timerList);
- }
- LOS_IntRestore(intSave);
- ⑽ if (needSchedule) {
- LOS_Schedule();
- }
- }
⑴處代碼更新全局變量g_taskSortLink的游標(biāo),指向雙向鏈表數(shù)組下一個位置,然后獲取該位置的雙向鏈表頭結(jié)點(diǎn)listObject。⑵如果鏈表為空,則返回。如果雙向鏈表不為空,則執(zhí)行⑶循環(huán)遍歷每一個鏈表節(jié)點(diǎn)。⑷處如果鏈表節(jié)點(diǎn)的滾動數(shù)大于0,則滾動數(shù)減1,說明任務(wù)還需要繼續(xù)等待一輪。如果鏈表節(jié)點(diǎn)的滾動數(shù)等于0,說明任務(wù)超時到期,執(zhí)行⑸從排序鏈表中刪除。接下來需要根據(jù)任務(wù)狀態(tài)分別處理,⑹處如果代碼是阻塞狀態(tài),取消阻塞狀態(tài),并從阻塞鏈表中刪除。⑺處如果任務(wù)阻塞在事件中,取消阻塞狀態(tài)。⑻如果任務(wù)阻塞在隊(duì)列,從阻塞鏈表中刪除,取消阻塞狀態(tài),如果不是上述狀態(tài),取消延遲狀態(tài)OS_TASK_STATUS_DELAY。⑼處如果代碼是掛起狀態(tài),設(shè)置任務(wù)為就緒狀態(tài),加入任務(wù)就緒隊(duì)列,設(shè)置需要重新調(diào)度標(biāo)記。⑽如果設(shè)置需要重新調(diào)度,調(diào)用調(diào)度函數(shù)觸發(fā)任務(wù)調(diào)度。
小結(jié)
掌握鴻蒙輕內(nèi)核的排序鏈表TaskSortLinkAttr這一重要的數(shù)據(jù)結(jié)構(gòu),會給進(jìn)一步學(xué)習(xí)、分析鴻蒙輕內(nèi)核源代碼打下了基礎(chǔ),讓后續(xù)的學(xué)習(xí)更加容易。
51CTO和華為官方合作共建的鴻蒙技術(shù)社區(qū)