自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Redis源碼學習之事件驅(qū)動

運維 數(shù)據(jù)庫運維 Redis
Redis基于多路復用技術(shù)實現(xiàn)了一套簡單的事件驅(qū)動庫,代碼在ae.h、ae.c以及ae_epoll.c、ae_evport.c和ae_kqueue.c、ae_select.c這幾個文件中。其中ae表示的是antirez eventloop的意思。

Redis基于多路復用技術(shù)實現(xiàn)了一套簡單的事件驅(qū)動庫,代碼在ae.h、ae.c以及ae_epoll.c、ae_evport.c和ae_kqueue.c、ae_select.c這幾個文件中。其中ae表示的是antirez eventloop的意思。

Redis里面包含兩種事件類型:FileEvent和TimeEvent。

Redis采用IO多路復用技術(shù),所有的事件都是在一個線程中進行處理。Redis的事件驅(qū)動模型可以以以下為代碼進行表示:

  1. int main(int argc,char **argv) 
  2.  
  3.  
  4.     while(true) { 
  5.  
  6.         // 等待事件到來:wait4Event(); 
  7.  
  8.         // 處理事件:processEvent() 
  9.  
  10.     } 
  11.  
  12.  

在一個死循環(huán)中等待事件的到來,然后對事件進行處理,以此往復。這就是一個最經(jīng)典的網(wǎng)絡編程模型。

1.基本數(shù)據(jù)結(jié)構(gòu)

aeEventLoop 

 

aeEventLoop是Redis中事件驅(qū)動模型的核心,封裝了整個事件循環(huán),其中每個字段解釋如下:

  • maxfd:已經(jīng)接受的最大的文件描述符。
  • setsize:當前循環(huán)中所能容納的文件描述符的數(shù)量。
  • timeEventNextId:下一個時間事件的ID.
  • lastTime:上一次被訪問的時間,用來檢測系統(tǒng)時鐘是否被修改。
  • events:指針,指向保存所有注冊的事件的數(shù)組首地址。
  • fired:指針,保存所有已經(jīng)買被觸發(fā)的事件的數(shù)組首地址。
  • timeEventHead:Redis用一個鏈表來存儲所有的時間事件,timeEventHead是指向這個鏈表的首節(jié)點指針。
  • stop:停止整個事件循環(huán)。
  • apiData:指針,指向epoll結(jié)構(gòu)。
  • beforeSleep:函數(shù)指針。每次實現(xiàn)循環(huán)的時候,在阻塞直到時間到來之前,會先調(diào)用這個函數(shù)。

aeFileEvent和aeTimeEvent

這兩個結(jié)構(gòu)分別表示文件事件和時間事件,定義如下

  1. typedef struct aeFileEvent { 
  2.  
  3.     int mask; /* one of AE_(READABLE|WRITABLE) */ 
  4.  
  5.     aeFileProc *rfileProc; // 函數(shù)指針,寫事件處理 
  6.  
  7.     aeFileProc *wfileProc; // 函數(shù)指針,讀事件處理 
  8.  
  9.     void *clientData; // 具體的數(shù)據(jù) 
  10.  
  11. } aeFileEvent;  

其中mask表示文件事件類型掩碼,可以是AE_READABLE表示是可讀事件,AE_WRITABLE為可寫事件。aeFileProc是函數(shù)指針。

  1. /* Time event structure */ 
  2.  
  3. typedef struct aeTimeEvent { 
  4.  
  5.     long long id; // 事件ID 
  6.  
  7.     long when_sec; // 事件觸發(fā)的時間:s 
  8.  
  9.     long when_ms; // 事件觸發(fā)的時間:ms 
  10.  
  11.     aeTimeProc *timeProc; // 函數(shù)指針 
  12.  
  13.     aeEventFinalizerProc *finalizerProc; // 函數(shù)指針:在對應的aeTieEvent節(jié)點被刪除前調(diào)用,可以理解為aeTimeEvent的析構(gòu)函數(shù) 
  14.  
  15.     void *clientData; // 指針,指向具體的數(shù)據(jù) 
  16.  
  17.     struct aeTimeEvent *next; // 指向下一個時間事件指針 
  18.  
  19. } aeTimeEvent;  

aeFiredEvent

aeFiredEvent結(jié)構(gòu)表示一個已經(jīng)被觸發(fā)的事件,結(jié)果如下:

  1. /* A fired event */ 
  2.  
  3. typedef struct aeFiredEvent { 
  4.  
  5.     int fd; // 事件被觸發(fā)的文件描述符 
  6.  
  7.     int mask; // 被觸發(fā)事件的掩碼,表示被觸發(fā)事件的類型 
  8.  
  9. } aeFiredEvent;  

fd表示事件發(fā)生在哪個文件描述符上面,mask用來表示具體事件的類型。

aeApiState

Redis底層采用IO多路復用技術(shù)實現(xiàn)高并發(fā),具體實現(xiàn)可以采用kqueue、select、epoll等技術(shù)。對于Linux來說,epoll的性能要優(yōu)于select,所以以epoll為例來進行分析。

  1. typedef struct aeApiState { 
  2.  
  3.     int epfd; 
  4.  
  5.     struct epoll_event *events; 
  6.  
  7. } aeApiState;  

aeApiState封裝了跟epoll相關(guān)的數(shù)據(jù),epfd保存epoll_create()返回的文件描述符。

具體實現(xiàn)細節(jié)

事件循環(huán)啟動:aeMain()

事件驅(qū)動的啟動代碼位于ae.c的aeMain()函數(shù)中,代碼如下:

 

從aeMain()方法中可以看到,整個事件驅(qū)動是在一個while()循環(huán)中不停地執(zhí)行aeProcessEvents()方法,在這個方法中執(zhí)行從客戶端發(fā)送過來的請求。

初始化:aeCreateEventLoop()

aeEventLoop的初始化是在aeCreateEventLoop()方法中進行的,這個方法是在server.c中的initServer()中調(diào)用的。實現(xiàn)如下:

 

在這個方法中主要就是給aeEventLoop對象分配內(nèi)存然后并進行初始化。其中關(guān)鍵的地方有:

1、調(diào)用aeApiCreate()初始化epoll相關(guān)的數(shù)據(jù)。aeApiCreate()實現(xiàn)如下:

 

在aeApiCreate()方法中主要完成以下三件事:

  1. 分配aeApiState結(jié)構(gòu)需要的內(nèi)存。
  2. 調(diào)用epoll_create()方法生成epoll的文件描述符,并保存在aeApiState.epfd字段中。
  3. 把第一步分配的aeApiState的內(nèi)存地址保存在EventLoop->apidata字段中。

2、初始化events中的mask字段為為AE_NONE。

生成fileEvent:aeCreateFileEvent()

Redis使用aeCreateFileEvent()來生成fileEvent,代碼如下:

 

aeCreateFileEvent()方法主要做了以下三件事:

  1. 檢查新增的fd是否超過所能容納最大值。
  2. 調(diào)用aeApiAddEvent()方法把對應的fd以mask模式添加到epoll監(jiān)聽器中。
  3. 設置相應的字段值。

其中最關(guān)鍵的步驟是第二步,aeApiAddEvent()方法如下:

 

生成timeEvent:aeCreateTimeEvent()

aeCreateTimeEvent()方法主要是用來生成timeEvent節(jié)點,其實現(xiàn)比較簡單,代碼如下所示:

 

處理timeEevnt:processTimeEvents()

Redis在processTimeEvents()方法中來處理所有的timeEvent,實現(xiàn)如下:

  1. static int processTimeEvents(aeEventLoop *eventLoop) { 
  2.  
  3.     int processed = 0; 
  4.  
  5.     aeTimeEvent *te, *prev; 
  6.  
  7.     long long maxId; 
  8.  
  9.     time_t now = time(NULL); 
  10.  
  11.     /** 
  12.  
  13.      * 如果系統(tǒng)時間被調(diào)整到將來某段時間然后又被設置回正確的時間, 
  14.  
  15.      * 這種情況下鏈表中的timeEvent有可能會被隨機的延遲執(zhí)行,因 
  16.  
  17.      * 此在這個情況下把所有的timeEvent的觸發(fā)時間設置為0表示及執(zhí)行 
  18.  
  19.      */ 
  20.  
  21.     if (now < eventLoop->lastTime) { 
  22.  
  23.         te = eventLoop->timeEventHead; 
  24.  
  25.         while(te) { 
  26.  
  27.             te->when_sec = 0; 
  28.  
  29.             te = te->next
  30.  
  31.         } 
  32.  
  33.     } 
  34.  
  35.     eventLoop->lastTime = now; // 設置上次運行時間為now 
  36.  
  37.   
  38.  
  39.     prev = NULL
  40.  
  41.     te = eventLoop->timeEventHead; 
  42.  
  43.     maxId = eventLoop->timeEventNextId-1; 
  44.  
  45.     while(te) { 
  46.  
  47.         long now_sec, now_ms; 
  48.  
  49.         long long id; 
  50.  
  51.         /** 
  52.  
  53.          * 刪除已經(jīng)被標志位 刪除 的時間事件 
  54.  
  55.          */ 
  56.  
  57.         if (te->id == AE_DELETED_EVENT_ID) { 
  58.  
  59.             aeTimeEvent *next = te->next
  60.  
  61.             if (prev == NULL
  62.  
  63.                 eventLoop->timeEventHead = te->next
  64.  
  65.             else 
  66.  
  67.                 prev->next = te->next
  68.  
  69.             if (te->finalizerProc) 
  70.  
  71.                 // 在時間事件節(jié)點被刪除前調(diào)用finlizerProce()方法 
  72.  
  73.                 te->finalizerProc(eventLoop, te->clientData); 
  74.  
  75.             zfree(te); 
  76.  
  77.             te = next
  78.  
  79.             continue
  80.  
  81.         } 
  82.  
  83.         if (te->id > maxId) { 
  84.  
  85.             /** 
  86.  
  87.              * te->id > maxId 表示當前te指向的timeEvent為當前循環(huán)中新添加的, 
  88.  
  89.              * 對于新添加的節(jié)點在本次循環(huán)中不作處理。 
  90.  
  91.              * PS:為什么會出現(xiàn)這種情況呢?有可能是在timeProc()里面會注冊新的timeEvent節(jié)點? 
  92.  
  93.              * 對于當前的Redis版本中不會出現(xiàn)te->id > maxId這種情況 
  94.  
  95.              */ 
  96.  
  97.             te = te->next
  98.  
  99.             continue
  100.  
  101.         } 
  102.  
  103.         aeGetTime(&now_sec, &now_ms); 
  104.  
  105.         if (now_sec > te->when_sec || 
  106.  
  107.             (now_sec == te->when_sec && now_ms >= te->when_ms)) 
  108.  
  109.         { 
  110.  
  111.             // 如果當前時間已經(jīng)超過了對應的timeEvent節(jié)點設置的觸發(fā)時間, 
  112.  
  113.             // 則調(diào)用timeProc()方法執(zhí)行對應的任務 
  114.  
  115.             int retval; 
  116.  
  117.   
  118.  
  119.             id = te->id; 
  120.  
  121.             retval = te->timeProc(eventLoop, id, te->clientData); 
  122.  
  123.             processed++; 
  124.  
  125.             if (retval != AE_NOMORE) { 
  126.  
  127.                 // 要執(zhí)行多次,則計算下次執(zhí)行時間 
  128.  
  129.                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); 
  130.  
  131.             } else { 
  132.  
  133.                 // 如果只需要執(zhí)行一次,則把id設置為-1,再下次循環(huán)中刪除 
  134.  
  135.                 te->id = AE_DELETED_EVENT_ID; 
  136.  
  137.             } 
  138.  
  139.         } 
  140.  
  141.         prev = te; 
  142.  
  143.         te = te->next
  144.  
  145.     } 
  146.  
  147.     return processed; 
  148.  
  149.  

在這個方法中會

  1. 判斷系統(tǒng)時間有沒有調(diào)整過,如果調(diào)整過,則會把timeEvent鏈表中的所有的timeEvent的觸發(fā)時間設置為0,表示立即執(zhí)行。
  2. 對timeEvent鏈表進行遍歷,對于每個timeEvent節(jié)點,如果有:
    • 返回為AE_NOMORE,表示當前timeEvent節(jié)點屬于一次性事件,標記該節(jié)點ID為AE_DELETED_EVENT_ID,表示刪除節(jié)點,該節(jié)點將會在下一輪的循環(huán)中被刪除。
    • 返回不是AE_NOMORE,表示當前timeEvent節(jié)點屬于周期性事件,需要多次執(zhí)行,調(diào)用aeAddMillisecondsToNow()方法設置下次被執(zhí)行時間。
    • 如果已經(jīng)被標記為刪除(AE_DELETED_EVENT_ID),則立即釋放對應節(jié)點內(nèi)存,遍歷下個節(jié)點。
      • 如果id大于maxId,則表示當前節(jié)點為本次循環(huán)中新增節(jié)點,咋本次循環(huán)中不錯處理,繼續(xù)下個節(jié)點。
      • 如果當前節(jié)點的觸發(fā)時間大于當前時間,則調(diào)用對應節(jié)點的timeProc()方法執(zhí)行任務。根據(jù)timeProc()方法的返回,又分為兩種情況:

處理所有事件:aeProcessEvents()

Redis中所有的事件,包括timeEvent和fileEvent都是在aeProcessEvents()方法中進行處理的,剛方法實現(xiàn)如下:

  1. /* Process every pending time event, then every pending file event 
  2.  
  3. * (that may be registered by time event callbacks just processed). 
  4.  
  5. * Without special flags the function sleeps until some file event 
  6.  
  7. * fires, or when the next time event occurs (if any). 
  8.  
  9.  
  10. * If flags is 0, the function does nothing and returns
  11.  
  12. * if flags has AE_ALL_EVENTS setall the kind of events are processed. 
  13.  
  14. * if flags has AE_FILE_EVENTS set, file events are processed. 
  15.  
  16. * if flags has AE_TIME_EVENTS settime events are processed. 
  17.  
  18. * if flags has AE_DONT_WAIT set the function returns ASAP until all 
  19.  
  20. * the events that's possible to process without to wait are processed. 
  21.  
  22.  
  23. * The function returns the number of events processed. */ 
  24.  
  25. int aeProcessEvents(aeEventLoop *eventLoop, int flags) 
  26.  
  27.  
  28.     int processed = 0, numevents; 
  29.  
  30.     /** 
  31.  
  32.      * 既沒有時間事件也沒有文件事件,則直接返回 
  33.  
  34.      */ 
  35.  
  36.     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 
  37.  
  38.     /** 
  39.  
  40.      * -1 == eventloop->maxfd 表示還么有任何aeFileEvent被添加到epoll     
  41.  
  42.      * 事件循環(huán)中進行監(jiān)聽 
  43.  
  44.      */ 
  45.  
  46.     if (eventLoop->maxfd != -1 || 
  47.  
  48.         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { 
  49.  
  50.         int j; 
  51.  
  52.         aeTimeEvent *shortest = NULL
  53.  
  54.         struct timeval tv, *tvp; 
  55.  
  56.   
  57.  
  58.         /** 
  59.  
  60.          * 如果有aeFileEvent需要處理,就先要從所有待處理的 
  61.  
  62.          * aeTimeEvent事件中找到最近的將要被執(zhí)行的aeTimeEvent節(jié)點 
  63.  
  64.          * 并結(jié)算該節(jié)點觸發(fā)時間 
  65.  
  66.          */ 
  67.  
  68.         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) 
  69.  
  70.             shortest = aeSearchNearestTimer(eventLoop); 
  71.  
  72.         if (shortest) { 
  73.  
  74.             long now_sec, now_ms; 
  75.  
  76.   
  77.  
  78.             aeGetTime(&now_sec, &now_ms); 
  79.  
  80.             tvp = &tv; 
  81.  
  82.   
  83.  
  84.             /* How many milliseconds we need to wait for the next 
  85.  
  86.              * time event to fire? */ 
  87.  
  88.             // 計算epoll_wait()需要等待的時間 
  89.  
  90.             long long ms = 
  91.  
  92.                 (shortest->when_sec - now_sec)*1000 + 
  93.  
  94.                 shortest->when_ms - now_ms; 
  95.  
  96.   
  97.  
  98.             if (ms > 0) { 
  99.  
  100.                 tvp->tv_sec = ms/1000; 
  101.  
  102.                 tvp->tv_usec = (ms % 1000)*1000; 
  103.  
  104.             } else { 
  105.  
  106.                 tvp->tv_sec = 0; 
  107.  
  108.                 tvp->tv_usec = 0; 
  109.  
  110.             } 
  111.  
  112.         } else { 
  113.  
  114.             // 如果flags設置了AE_DONT_WAIT,則設置epoll_wait()等待時間為0, 
  115.  
  116.             // 即立刻從epoll中返回 
  117.  
  118.             if (flags & AE_DONT_WAIT) { 
  119.  
  120.                 tv.tv_sec = tv.tv_usec = 0; 
  121.  
  122.                 tvp = &tv; 
  123.  
  124.             } else { 
  125.  
  126.                 /* Otherwise we can block */ 
  127.  
  128.                 tvp = NULL; /* wait forever */ 
  129.  
  130.             } 
  131.  
  132.         } 
  133.  
  134.   
  135.  
  136.         // 調(diào)用aeApiPoll()進行阻塞等待事件的到來,等待時間為tvp 
  137.  
  138.         numevents = aeApiPoll(eventLoop, tvp); 
  139.  
  140.         for (j = 0; j < numevents; j++) { 
  141.  
  142.             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; 
  143.  
  144.             int mask = eventLoop->fired[j].mask; 
  145.  
  146.             int fd = eventLoop->fired[j].fd; 
  147.  
  148.             int rfired = 0; 
  149.  
  150.             /* note the fe->mask & mask & ... code: maybe an already processed 
  151.  
  152.              * event removed an element that fired and we still didn't 
  153.  
  154.              * processed, so we check if the event is still valid. */ 
  155.  
  156.             // fe->mask && mask 的目的是確保對應事件時候還有效 
  157.  
  158.             if (fe->mask & mask & AE_READABLE) { 
  159.  
  160.                 rfired = 1; 
  161.  
  162.                 fe->rfileProc(eventLoop,fd,fe->clientData,mask); 
  163.  
  164.             } 
  165.  
  166.             if (fe->mask & mask & AE_WRITABLE) { 
  167.  
  168.                 if (!rfired || fe->wfileProc != fe->rfileProc) 
  169.  
  170.                     fe->wfileProc(eventLoop,fd,fe->clientData,mask); 
  171.  
  172.             } 
  173.  
  174.             processed++; 
  175.  
  176.         } 
  177.  
  178.     } 
  179.  
  180.     /* Check time events */ 
  181.  
  182.     if (flags & AE_TIME_EVENTS) 
  183.  
  184.         // 處理aeTimeEvent 
  185.  
  186.         processed += processTimeEvents(eventLoop); 
  187.  
  188.   
  189.  
  190.     return processed; /* return the number of processed file/time events */ 
  191.  
  192.  

該方法的入?yún)lag表示要處理哪些事件,可以取以下幾個值 :

  • AE_ALL_EVENTS:timeEvent和fileEvent都會處理。
  • AE_FILE_EVENTS:只處理fileEvent。
  • AE_TIME_EVENTS:只處理timeEvent。
  • AE_DONT_WAIT:要么立馬返回,要么處理完那些不需要等待的事件之后再立馬返回。

aeProcessEvents()方法會做下面幾件事:

  1. 判斷傳入的flag的值,如果既不包含AE_TIME_EVENTS也不包含AE_FILE_EVENTS則直接返回。
  2. 計算如果有aeFileEvent事件需要進行處理,則先計算epoll_wait()方法需要阻塞等待的時間,計算方式如下:
    • 先從aeTimeEvent事件鏈表中找到最近的需要被觸發(fā)的aeTimeEvent節(jié)點并計算需要被觸發(fā)的時間,該被觸發(fā)時間則為epoll_wait()需要等待的時間。
    • 如果沒有找到最近的aeTimeEvent節(jié)點,表示沒有aeTimeEvent節(jié)點被加入鏈表,則判斷傳入的flags是否包含AE_DONT_WAIT選項,則設置epoll_wait()需要等待時間為0,表示立即返回。
    • 如果沒有設置AE_DONT_WAIT,則設置需要等待時間為NULL,表示epoll_wait()一直阻塞等待知道有fileEvent事件到來。
  3. 調(diào)用aeApiPoll()方法阻塞等待事件的到來,阻塞時間為第二步中計算的時間。aeApiPoll()實現(xiàn)見文末:
    • aeApiPoll()會做下面幾件事:
      • 根據(jù)傳入的tvp計算需要阻塞的時間,然后調(diào)用epoll_wait()進行阻塞等待。
      • 有事件到來之后先計算對應事件的類型。
      • 把事件發(fā)生的fd以及對應的類型mask拷貝到fired數(shù)組中。
  4. 從aeApiPoll()方法返回之后,所有事件已經(jīng)就緒了的fd以及對應事件的類型mask已經(jīng)保存在eventLoop->fired[]數(shù)組中。依次遍歷fired數(shù)組,根據(jù)mask類型,執(zhí)行對應的frileProc()或者wfileProce()方法。
  5. 如果傳入的flags中有AE_TIME_EVENTS,則調(diào)用processTimeEvents()執(zhí)行所有已經(jīng)到時間了的timeEvent。

 

本系列

責任編輯:龐桂玉 來源: 數(shù)據(jù)庫開發(fā)
相關(guān)推薦

2015-11-06 13:59:01

JavaScript事件處理

2023-05-31 08:39:04

redis事件驅(qū)動

2011-04-25 09:22:44

C#事件

2017-04-10 13:30:47

Redis數(shù)據(jù)庫命令

2011-08-11 13:53:45

JavaScript

2011-04-26 08:56:31

C#

2017-06-07 14:58:39

Redis源碼學習Redis事務

2021-08-16 07:26:42

服務訂閱機制

2011-08-02 17:58:09

iPhone開發(fā) 事件

2020-12-23 07:37:17

瀏覽器HTML DOM0

2009-08-10 15:26:46

ASP.NET組件編程

2009-08-06 13:08:23

ASP.NET控件開發(fā)

2023-02-07 07:25:36

Spring事件驅(qū)動

2009-06-22 09:23:18

事件監(jiān)聽器

2019-04-19 21:06:23

事件驅(qū)動架構(gòu)VANTIQ

2009-06-25 14:05:08

Ajax JSF

2023-07-12 08:30:52

服務架構(gòu)事件驅(qū)動架構(gòu)

2009-08-14 18:00:22

C#Windows應用

2009-02-11 11:14:31

事務管理事務開始Spring

2009-02-11 13:08:29

事務提交事務管理Spring
點贊
收藏

51CTO技術(shù)棧公眾號