自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Redis 如何高效實(shí)現(xiàn)定時(shí)任務(wù)

網(wǎng)絡(luò) Redis
Redis通過(guò)單線程結(jié)合非阻塞事件輪詢機(jī)制實(shí)現(xiàn)高效的網(wǎng)絡(luò)IO和時(shí)間事件處理,這篇文章我們將從源碼的角度深入分析一下redis時(shí)間事件的設(shè)計(jì)與實(shí)現(xiàn)。

Redis通過(guò)單線程結(jié)合非阻塞事件輪詢機(jī)制實(shí)現(xiàn)高效的網(wǎng)絡(luò)IO和時(shí)間事件處理,這篇文章我們將從源碼的角度深入分析一下redis時(shí)間事件的設(shè)計(jì)與實(shí)現(xiàn)。

詳解redis中的時(shí)間事件

時(shí)間事件的定義

時(shí)間事件可以是單次到期執(zhí)行銷毀,也可以是定時(shí)任務(wù),對(duì)此redis對(duì)于時(shí)間事件統(tǒng)一封裝為aeTimeEvent對(duì)象,通過(guò)id來(lái)唯一標(biāo)識(shí)一個(gè)事件,結(jié)合when_sec和when_ms記錄任務(wù)到期執(zhí)行的秒和分,而執(zhí)行時(shí)間事件的函數(shù)也是交由timeProc指針?biāo)赶虻暮瘮?shù)執(zhí)行。 我們以一個(gè)redis定時(shí)執(zhí)行的任務(wù)為例,如下所示,該結(jié)果通過(guò)when_sec和when_ms記錄秒之前的時(shí)間和毫秒的時(shí)間,一旦這個(gè)時(shí)間到了就會(huì)執(zhí)行timeProc這個(gè)函數(shù)指針?biāo)赶虻姆椒╯erverCron,該函數(shù)會(huì)定期執(zhí)行各種任務(wù),這一點(diǎn)筆者會(huì)在后文展開(kāi):

對(duì)應(yīng)的我們給出時(shí)間事件的代碼描述,即位于ae.h這個(gè)頭文件中的aeTimeEvent 結(jié)構(gòu)體,這就是對(duì)時(shí)間事件的封裝結(jié)構(gòu)體,可以看到它除了筆者上述提到的核心字段以外,還有一個(gè)next指針用于連接下一個(gè)注冊(cè)的時(shí)間事件:

//時(shí)間事件
typedef struct aeTimeEvent {
    //時(shí)間事件的id全局遞增
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    //時(shí)間到達(dá)的時(shí)間
    long when_ms; /* milliseconds */
    //對(duì)應(yīng)時(shí)間時(shí)間的處理器
    aeTimeProc *timeProc;
    //......
    //連接下一個(gè)時(shí)間時(shí)間
    struct aeTimeEvent *next;
} aeTimeEvent;

上文提到redis的時(shí)間事件是以鏈表的形式關(guān)聯(lián)起來(lái),這里我們也給出時(shí)間時(shí)間統(tǒng)一管理對(duì)象,即時(shí)間輪詢器aeEventLoop ,它通過(guò)timeEventHead記錄第一個(gè)時(shí)間時(shí)間而后續(xù)的時(shí)間時(shí)間統(tǒng)一用時(shí)間時(shí)間的next指針進(jìn)行管理:

對(duì)應(yīng)我們也給出這段時(shí)間代碼的定義,即位于ae.h中aeEventLoop 的定義:

typedef struct aeEventLoop {
   //......
    //管理時(shí)間事件的列表
    aeTimeEvent *timeEventHead;
   //......
} aeEventLoop;

注冊(cè)時(shí)間事件

redis在服務(wù)器初始化階段,會(huì)注冊(cè)一個(gè)定時(shí)的時(shí)間事件,大約每1毫秒觸發(fā)一次,該事件主要做的是:

  • 更新redis全局時(shí)鐘,該時(shí)鐘用于全局變量獲取時(shí)間用的。
  • 隨機(jī)抽取redis內(nèi)存數(shù)據(jù)庫(kù)中的樣本刪除過(guò)期的鍵值對(duì)。
  • 如果檢查到aof重寫(xiě)完成,則進(jìn)行刷盤(pán)操作。
  • 如果發(fā)現(xiàn)當(dāng)前aof大小過(guò)大,則fork子進(jìn)程進(jìn)行aof重寫(xiě)操作。
  • ......。

對(duì)應(yīng)我們給出時(shí)間事件注冊(cè)的源碼段,即redis初始化時(shí)調(diào)用的方法initServer中的aeCreateTimeEvent,可以看到它將定時(shí)任務(wù)封裝為時(shí)間事件timeEvent,并設(shè)置時(shí)間間隔為1毫秒一次:

void initServer(void) {
     //......

    /* Create the serverCron() time event, that's our main way to process
     * background operations. */
    //創(chuàng)建時(shí)間事件注冊(cè)到eventLoop->timeEventHead中
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        redisPanic("Can't create the serverCron time event.");
        exit(1);
    }
 //......
}

輪詢處理時(shí)間事件

redis每次處理完所有用戶的請(qǐng)求之后,都會(huì)調(diào)用一次時(shí)間時(shí)間處理函數(shù)processTimeEvents,輪詢并處理就緒的時(shí)間事件,由此保證盡可能準(zhǔn)時(shí)執(zhí)行時(shí)間事件,如果事件時(shí)間非定時(shí)任務(wù)則執(zhí)行完成直接刪除,反之設(shè)置下一次執(zhí)行時(shí)間。這些步驟全部完成之后,返回本次處理的時(shí)間事件數(shù):

我們給出處理時(shí)間循環(huán)的入口aeMain,可以看到該函數(shù)就是redis核心函數(shù)所在,它會(huì)循環(huán)調(diào)用aeProcessEvents處理各種事件:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //處理各種事件    
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

不如aeProcessEvents可以看到該函數(shù)執(zhí)行完所有用戶請(qǐng)求之后調(diào)用processTimeEvents方法獲取并執(zhí)行就緒的時(shí)間事件:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    //......

  //處理就緒的客戶端事件
        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;

     /* note the fe->mask & mask & ... code: maybe an already processed
             * event removed an element that fired and we still didn't
             * processed, so we check if the event is still valid. */
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
   
    //上述核心網(wǎng)絡(luò)IO事件完成后處理時(shí)間事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

最后我們就可以看到處理時(shí)間事件的核心代碼段,其內(nèi)部會(huì)從timeEventHead開(kāi)始輪詢就緒的時(shí)間事件,比對(duì)當(dāng)前時(shí)間是否大于或者等于到期時(shí)間,如果是則執(zhí)行當(dāng)前時(shí)間事件,再判斷這個(gè)事件是否是定時(shí)事件,如果是則更新下次執(zhí)行時(shí)間,反之刪除,最后累加本次處理的時(shí)間時(shí)間數(shù):

static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);

   //......
    if (now < eventLoop->lastTime) {
     //從時(shí)間事件頭開(kāi)始
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    //循環(huán)處理到期的時(shí)間事件
    while(te) {
        long now_sec, now_ms;
        long long id;

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        //如果現(xiàn)在的事件大于到達(dá)時(shí)間
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            //調(diào)用時(shí)間時(shí)間函數(shù)處理該事件
            retval = te->timeProc(eventLoop, id, te->clientData);
            //更新處理數(shù)
            processed++;
           //.....
            if (retval != AE_NOMORE) {//如果事件類型不是AE_NOMORE則說(shuō)明是定時(shí)事件更新周期,反之刪除
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                aeDeleteTimeEvent(eventLoop, id);
            }
            te = eventLoop->timeEventHead;
        } else {
            te = te->next;
        }
    }
    return processed;
}

redis對(duì)于時(shí)間事件實(shí)現(xiàn)上的優(yōu)化

因?yàn)闀r(shí)間事件有些要求定期執(zhí)行,所以redis為了保證時(shí)間執(zhí)行的實(shí)時(shí)性,做了如下兩個(gè)優(yōu)化:

  • 對(duì)于比較耗時(shí)的時(shí)間事件,例如AOF重寫(xiě),通過(guò)fork子進(jìn)程異步完成:
  • 對(duì)于返回給客戶端套接字的內(nèi)容,如果長(zhǎng)度超過(guò)預(yù)設(shè)的值,會(huì)主動(dòng)讓出線程執(zhí)行權(quán),避免時(shí)間時(shí)間饑餓。

對(duì)應(yīng)的我們給出第一點(diǎn)時(shí)間時(shí)間對(duì)于aof重寫(xiě)的核心代碼段,可以看到serverCron內(nèi)部判斷如果當(dāng)前沒(méi)有rdb和aof子進(jìn)程,且需要進(jìn)行aof重寫(xiě)則調(diào)用rewriteAppendOnlyFileBackground函數(shù)fork子進(jìn)程進(jìn)行aof重寫(xiě):

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
  
   //......

    /* Start a scheduled AOF rewrite if this was requested by the user while
     * a BGSAVE was in progress. */
    //aof_rewrite_scheduled設(shè)置為1,且沒(méi)有其他持久化子進(jìn)程則進(jìn)行aof重寫(xiě),通過(guò)異步避免耗時(shí)
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&
        server.aof_rewrite_scheduled)
    {
        rewriteAppendOnlyFileBackground();
    }

  //......
}

//fork子進(jìn)程進(jìn)行aof重寫(xiě)
int rewriteAppendOnlyFileBackground(void) {
   //......
    if ((childpid = fork()) == 0) {//fork子進(jìn)程進(jìn)行aof重寫(xiě)
        char tmpfile[256];

        /* Child */
        closeListeningSockets(0);
        redisSetProcTitle("redis-aof-rewrite");
        //生成一個(gè)tmp文件
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {//重寫(xiě)aof
            size_t private_dirty = zmalloc_get_private_dirty();

           //......
            exitFromChild(0);
        } else {
            exitFromChild(1);
        }
    } else {
        //......
    }
    return REDIS_OK; /* unreached */
}

而回復(fù)給客戶端結(jié)果的處理器sendReplyToClient內(nèi)部也有一段,判斷如果寫(xiě)入數(shù)totwritten 大于REDIS_MAX_WRITE_PER_EVENT (宏定義為64M),則直接中止寫(xiě)入,break退出等到下一次循環(huán)處理,避免因?yàn)檫@個(gè)處理導(dǎo)致其他時(shí)間事件饑餓而導(dǎo)致事件執(zhí)行延期:

void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    //......

    while(c->bufpos > 0 || listLength(c->reply)) {
     //......
        //對(duì)于文件事件數(shù)據(jù)寫(xiě)入超長(zhǎng)會(huì)讓出執(zhí)行權(quán)讓時(shí)間事件能夠盡可能的執(zhí)行
        server.stat_net_output_bytes += totwritten;
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory)) break;
    }
     //......
}
責(zé)任編輯:趙寧寧 來(lái)源: 寫(xiě)代碼的SharkChili
相關(guān)推薦

2023-11-07 07:47:35

Topic線程PUSH

2017-03-13 09:12:00

TCP數(shù)據(jù)結(jié)構(gòu)請(qǐng)求包

2024-12-27 08:24:55

2020-12-21 07:31:23

實(shí)現(xiàn)單機(jī)JDK

2024-02-28 09:54:07

線程池配置

2023-12-19 08:09:06

Python定時(shí)任務(wù)Cron表達(dá)式

2024-09-09 08:11:12

2024-05-13 09:49:30

.NETQuartz庫(kù)Cron表達(dá)式

2023-11-16 09:30:27

系統(tǒng)任務(wù)

2021-11-22 12:35:40

Python命令定時(shí)任務(wù)

2024-02-26 11:12:33

定時(shí)任務(wù)線程

2024-01-31 08:38:57

Python定時(shí)任務(wù)函數(shù)

2024-01-22 08:53:00

策略任務(wù)RocketMQ

2024-05-31 13:07:29

.NET Core定時(shí)任務(wù)編程

2023-08-08 08:35:28

web框架Hosting模塊

2009-10-28 10:05:29

Ubuntucrontab定時(shí)任務(wù)

2012-02-07 13:31:14

SpringJava

2010-03-10 15:47:58

crontab定時(shí)任務(wù)

2024-10-15 16:41:35

2020-08-05 07:37:29

任務(wù)系統(tǒng)定時(shí)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)