自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

告別性能瓶頸:NtyCo 協(xié)程框架實(shí)戰(zhàn)指南

開發(fā) 前端
對于每一個(gè)觸發(fā)的事件,co_eventloop 首先通過指針域 data.ptr 取出保存的 stPollItem_t 結(jié)構(gòu),并將其添加到 pstActiveList 列表中;之后從定時(shí)器輪盤中取出所有已經(jīng)超時(shí)的事件,也將其全部添加到 pstActiveList 中,pstActiveList 中的所有事件都作為活躍事件處理。

協(xié)程不是系統(tǒng)級線程,很多時(shí)候協(xié)程被稱為“輕量級線程”、“微線程”、“纖程(fiber)”等。簡單來說可以認(rèn)為協(xié)程是線程里不同的函數(shù),這些函數(shù)之間可以相互快速切換。

協(xié)程和用戶態(tài)線程非常接近,用戶態(tài)線程之間的切換不需要陷入內(nèi)核,但部分操作系統(tǒng)中用戶態(tài)線程的切換需要內(nèi)核態(tài)線程的輔助。

協(xié)程是編程語言(或者 lib)提供的特性(協(xié)程之間的切換方式與過程可以由編程人員確定),是用戶態(tài)操作。協(xié)程適用于 IO 密集型的任務(wù)。常見提供原生協(xié)程支持的語言有:c++20、golang、python 等,其他語言以庫的形式提供協(xié)程功能,比如 C++20 之前騰訊的 fiber 和 libco等等。

一、協(xié)程(Coroutine)簡介

協(xié)程,又稱微線程,纖程。英文名Coroutine。協(xié)程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua)中得到廣泛應(yīng)用。

子程序,或者稱為函數(shù),在所有語言中都是層級調(diào)用,比如A調(diào)用B,B在執(zhí)行過程中又調(diào)用了C,C執(zhí)行完畢返回,B執(zhí)行完畢返回,最后是A執(zhí)行完畢。所以子程序調(diào)用是通過棧實(shí)現(xiàn)的,一個(gè)線程就是執(zhí)行一個(gè)子程序。

子程序調(diào)用總是一個(gè)入口,一次返回,調(diào)用順序是明確的。而協(xié)程的調(diào)用和子程序不同,協(xié)程看上去也是子程序,但執(zhí)行過程中,在子程序內(nèi)部可中斷,然后轉(zhuǎn)而執(zhí)行別的子程序,在適當(dāng)?shù)臅r(shí)候再返回來接著執(zhí)行(注意,在一個(gè)子程序中中斷,去執(zhí)行其他子程序,不是函數(shù)調(diào)用,有點(diǎn)類似CPU的中斷)。

比如子程序A、B:def A():

print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'

假設(shè)由協(xié)程執(zhí)行,在執(zhí)行A的過程中,可以隨時(shí)中斷,去執(zhí)行B,B也可能在執(zhí)行過程中中斷再去執(zhí)行A,結(jié)果可能是:

1
2
x
y
3
z

但是在A中是沒有調(diào)用B的,所以協(xié)程的調(diào)用比函數(shù)調(diào)用理解起來要難一些。

看起來A、B的執(zhí)行有點(diǎn)像多線程,但協(xié)程的特點(diǎn)在于是一個(gè)線程執(zhí)行,那和多線程比,協(xié)程有何優(yōu)勢?

最大的優(yōu)勢就是協(xié)程極高的執(zhí)行效率。因?yàn)樽映绦蚯袚Q不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數(shù)量越多,協(xié)程的性能優(yōu)勢就越明顯。

第二大優(yōu)勢就是不需要多線程的鎖機(jī)制,因?yàn)橹挥幸粋€(gè)線程,也不存在同時(shí)寫變量沖突,在協(xié)程中控制共享資源不加鎖,只需要判斷狀態(tài)就好了,所以執(zhí)行效率比多線程高很多。

因?yàn)閰f(xié)程是一個(gè)線程執(zhí)行,那怎么利用多核CPU呢?最簡單的方法是多進(jìn)程+協(xié)程,既充分利用多核,又充分發(fā)揮協(xié)程的高效率,可獲得極高的性能。

Python對協(xié)程的支持還非常有限,用在generator中的yield可以一定程度上實(shí)現(xiàn)協(xié)程。雖然支持不完全,但已經(jīng)可以發(fā)揮相當(dāng)大的威力了。

來看例子:

傳統(tǒng)的生產(chǎn)者-消費(fèi)者模型是一個(gè)線程寫消息,一個(gè)線程取消息,通過鎖機(jī)制控制隊(duì)列和等待,但一不小心就可能死鎖。

如果改用協(xié)程,生產(chǎn)者生產(chǎn)消息后,直接通過yield跳轉(zhuǎn)到消費(fèi)者開始執(zhí)行,待消費(fèi)者執(zhí)行完畢后,切換回生產(chǎn)者繼續(xù)生產(chǎn),效率極高:import time

def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
c.next()
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
if __name__=='__main__':
c = consumer()
produce(c)

執(zhí)行結(jié)果:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK=

注意到consumer函數(shù)是一個(gè)generator(生成器),把一個(gè)consumer傳入produce后:

  • 首先調(diào)用c.next()啟動(dòng)生成器;
  • 然后,一旦生產(chǎn)了東西,通過c.send(n)切換到consumer執(zhí)行;
  • consumer通過yield拿到消息,處理,又通過yield把結(jié)果傳回;
  • produce拿到consumer處理的結(jié)果,繼續(xù)生產(chǎn)下一條消息;
  • produce決定不生產(chǎn)了,通過c.close()關(guān)閉consumer,整個(gè)過程結(jié)束。

整個(gè)流程無鎖,由一個(gè)線程執(zhí)行,produce和consumer協(xié)作完成任務(wù),所以稱為“協(xié)程”,而非線程的搶占式多任務(wù)。

二、C/C++協(xié)程

c++做為一個(gè)相對古老的語言,曾經(jīng)是步履蹣跚,直到c++11才奮起直追,但是對新技術(shù)的整體演進(jìn),其實(shí)c++仍然是保守的?,F(xiàn)在c++20的標(biāo)準(zhǔn)雖然已經(jīng)實(shí)現(xiàn)了協(xié)程,但目前能比較好支持c++20的編譯器幾乎都和整體的環(huán)境不太兼容。換句話說,還需要繼續(xù)等待整個(gè)c++的迭代版本,可能到了c++23,整體的環(huán)境就會(huì)跟上去,協(xié)程才會(huì)真正的飛入程序員的“尋常百姓家”。

正如前面提到的,協(xié)程一般來說是不需要鎖的,但是如果協(xié)程的底層操作是跨越線程動(dòng)態(tài)操作,仍然是需要鎖的存在的。這也是為什么要求盡量把協(xié)和的調(diào)度放到一個(gè)線程中去的原因。

首先需要聲明的是,這里不打算花時(shí)間來介紹什么是協(xié)程,以及協(xié)程和線程有什么不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協(xié)程的?,F(xiàn)有的 C++ 協(xié)程庫均基于兩種方案:利用匯編代碼控制協(xié)程上下文的切換,以及利用操作系統(tǒng)提供的 API 來實(shí)現(xiàn)協(xié)程上下文切換。

典型的例如:

  • libco,Boost.context:基于匯編代碼的上下文切換
  • phxrpc:基于 ucontext/Boost.context 的上下文切換
  • libmill:基于 setjump/longjump 的協(xié)程切換

一般而言,基于匯編的上下文切換要比采用系統(tǒng)調(diào)用的切換更加高效,這也是為什么 phxrpc 在使用 Boost.context 時(shí)要比使用 ucontext 性能更好的原因。關(guān)于 phxrpc 和 libmill 具體的協(xié)程實(shí)現(xiàn)方式,以后有時(shí)間再詳細(xì)介紹。

2.1協(xié)程的原理

既然協(xié)程如此厲害,那么它實(shí)現(xiàn)的原理到底是什么呢?協(xié)程最重要的應(yīng)用方式就是把線程在內(nèi)核上的開銷轉(zhuǎn)到了應(yīng)用層的開銷,避開或者屏蔽(對應(yīng)用者)線程操作的難度。那多線程操作的復(fù)雜性在哪兒呢?線程切換的隨機(jī)性和線程Context的跟隨,出入棧的保存和恢復(fù),相關(guān)數(shù)據(jù)的鎖和讀寫控制。這才是多線程的復(fù)雜性,如果再加異步引起的數(shù)據(jù)的非連續(xù)性和事件的非必然性操作,就更加增強(qiáng)了多線程遇到問題的判別和斷點(diǎn)的準(zhǔn)確。

好,既然是這樣,那么上框架,封裝不就得了。

協(xié)程和線程一樣,同樣需要做好兩個(gè)重點(diǎn):第一個(gè)是協(xié)程的調(diào)度;第二是上下文的切換。而這兩點(diǎn)在OS的相關(guān)書籍中的介紹海了去了,這里就不再贅述,原理基本都是一樣的。

如果以協(xié)程的關(guān)系來區(qū)分,協(xié)程也可以劃分為對稱和非對稱協(xié)程兩種。協(xié)程間是平等關(guān)系的,就是對稱的;反之為非對稱的。名字越起越多,但事兒還是那么兩下子,大家自己體會(huì)即可。

只要能保證上面所說的對上下文數(shù)據(jù)的安全性保證又能夠?qū)崿F(xiàn)協(xié)程在具體線程上的操作(某一個(gè)線程上執(zhí)行的所有協(xié)程是串行的),那么鎖的操作,從理論上講是不需要的(但實(shí)際開發(fā)中,因?yàn)閰f(xié)程的應(yīng)用還是少,所以還需要具體的問題具體分析)。協(xié)程的動(dòng)作集中在應(yīng)用層,而把復(fù)雜的內(nèi)核調(diào)度的線程屏蔽在下層框架上(或者以后會(huì)不會(huì)出現(xiàn)OS進(jìn)行封裝),從而大幅的降低了編程的難度,但卻擁有了線程快速異步調(diào)用的效果。

2.2協(xié)程實(shí)現(xiàn)機(jī)制

協(xié)程的實(shí)現(xiàn)有以下幾種機(jī)制:

①基于匯編的實(shí)現(xiàn):這個(gè)對匯編編程得要求有兩下子,這個(gè)網(wǎng)上也有不少例子,就不再這里搬門弄斧了。

②基于switch-case來實(shí)現(xiàn):這個(gè)其實(shí)更像是一個(gè)C語言的技巧,利用不同的狀態(tài)Case來達(dá)到目的,或者說大家認(rèn)知中的對編程語言的一種內(nèi)卷使用,網(wǎng)上有一個(gè)開源的項(xiàng)目:

https:/github.com/georgeredinger/protothreads

③基于操作系統(tǒng)提供的接口:Linux的ucontext,Windows的Fiber

Fiber可能很多人都不熟悉,這其實(shí)就是微軟原來提供的纖程,有興趣的可以去網(wǎng)上查找一下,有幾年這個(gè)概念炒得還是比較火的。ucontext是Linux上的一種操作,這兩個(gè)都可以當(dāng)作是一種類似特殊的應(yīng)用存在。游戲界的大佬云風(fēng)(《游戲之旅:我的編程感悟》作者)的coroutine就是類似于這種。興趣是編程的動(dòng)力,大家如果對這些有興趣可以看看這本書,雖然其中很多的東西都比較老了,但是整體的思想還是非常有借鑒的。

④基于接口setjmp和 longjmp同時(shí)使用static local 的變量來保存協(xié)程內(nèi)部的數(shù)據(jù)

這兩個(gè)函數(shù)是C語言的一個(gè)非常有意思的應(yīng)用,一般寫C好長時(shí)間的人,都沒接觸過這兩個(gè)API函數(shù),這個(gè)函數(shù)的定義是:

int setjmp(jmp_buf envbuf);
void longjmp(jmp_buf envbuf, int val);

它們兩個(gè)的作用,前者是用來將棧楨(上下文)保存在jmp_buf這個(gè)數(shù)據(jù)結(jié)構(gòu)中,然后可以通過后者 longjmp在指定的位置恢復(fù)出來。這就類似于使用goto語句跳轉(zhuǎn)到任意的地方,然后再把相關(guān)的數(shù)據(jù)恢復(fù)出來??匆幌聜€(gè)《C專家編程》中的例子:

#include <stdio.h>
#include <setjmp.h>

jmp_buf buf;

banana()
{
    printf("in banana() \n");
    longjmp(buf,1);
    printf("you'll never see this,because i longjmp'd");
}

main()
{
    if(setjmp(buf))
        printf("back in main\n");
    else
    {
        printf("first time through\n");
        banana();
    }
}

看完了上述的幾種方法,其實(shí)網(wǎng)上還有幾種實(shí)現(xiàn)的方式,但都是比較刻板,有興趣的可以搜索一下,這里就不提供鏈接了。

協(xié)程的實(shí)現(xiàn),按理說還是OS搞定最好,其實(shí)是框架底層,但C/C++的復(fù)雜性,以及不同的平臺(tái)和不同編譯器、庫之間的長期差異,導(dǎo)致這方面能做好的可能性真心是覺得不會(huì)太大。

三、libco協(xié)程的創(chuàng)建與切換

在介紹 coroutine 的創(chuàng)建之前,我們先來熟悉一下 libco 中用來表示一個(gè) coroutine 的數(shù)據(jù)結(jié)構(gòu),即定義在 co_routine_inner.h 中的 stCoRoutine_t:

struct stCoRoutine_t
{
stCoRoutineEnv_t *env; // 協(xié)程運(yùn)行環(huán)境
pfn_co_routine_t pfn; // 協(xié)程執(zhí)行的邏輯函數(shù)
void *arg; // 函數(shù)參數(shù)
coctx_t ctx; // 保存協(xié)程的下文環(huán)境
...
char cEnableSysHook; // 是否運(yùn)行系統(tǒng) hook,即非侵入式邏輯
char cIsShareStack; // 是否在共享?xiàng)DJ?void *pvEnv;
stStackMem_t* stack_mem; // 協(xié)程運(yùn)行時(shí)的??臻g
char* stack_sp; // 用來保存協(xié)程運(yùn)行時(shí)的棧空間
unsigned int save_size;
char* save_buffer;
};

我們暫時(shí)只需要了解表示協(xié)程的最簡單的幾個(gè)參數(shù),例如協(xié)程運(yùn)行環(huán)境,協(xié)程的上下文環(huán)境,協(xié)程運(yùn)行的函數(shù)以及運(yùn)行時(shí)??臻g。后面的 stack_sp,save_size 和 save_buffer 與 libco 共享?xiàng)DJ较嚓P(guān),有關(guān)共享?xiàng)5膬?nèi)容我們后續(xù)再說。

四、協(xié)程的實(shí)現(xiàn)與原理剖析

4.1協(xié)程的起源

問題:協(xié)程存在的原因?協(xié)程能夠解決哪些問題?

在我們現(xiàn)在CS,BS開發(fā)模式下,服務(wù)器的吞吐量是一個(gè)很重要的參數(shù)。其實(shí)吞吐量是IO處理時(shí)間加上業(yè)務(wù)處理。為了簡單起見,比如,客戶端與服務(wù)器之間是長連接的,客戶端定期給服務(wù)器發(fā)送心跳包數(shù)據(jù)。客戶端發(fā)送一次心跳包到服務(wù)器,服務(wù)器更新該新客戶端狀態(tài)的。心跳包發(fā)送的過程,業(yè)務(wù)處理時(shí)長等于IO讀?。≧ECV系統(tǒng)調(diào)用)加上業(yè)務(wù)處理(更新客戶狀態(tài))。吞吐量等于1s業(yè)務(wù)處理次數(shù)。

業(yè)務(wù)處理(更新客戶端狀態(tài))時(shí)間,業(yè)務(wù)不一樣的,處理時(shí)間不一樣,我們就不做討論。

那如何提升recv的性能。若只有一個(gè)客戶端,recv的性能也沒有必要提升,也不能提升。若在有百萬計(jì)的客戶端長連接的情況,我們該如何提升。以Linux為例,在這里需要介紹一個(gè)“網(wǎng)紅”就是epoll。服務(wù)器使用epoll管理百萬計(jì)的客戶端長連接,代碼框架如下:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);

            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {
            handle(sockfd);
        }
    }
}

對于響應(yīng)式服務(wù)器,所有的客戶端的操作驅(qū)動(dòng)都是來源于這個(gè)大循環(huán)。來源于epoll_wait的反饋結(jié)果。

對于服務(wù)器處理百萬計(jì)的IO。Handle(sockfd)實(shí)現(xiàn)方式有兩種。

第一種,handle(sockfd)函數(shù)內(nèi)部對sockfd進(jìn)行讀寫動(dòng)作。代碼如下

int handle(int sockfd) {
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

handle的io操作(send,recv)與epoll_wait是在同一個(gè)處理流程里面的。這就是IO同步操作。

優(yōu)點(diǎn):

1. sockfd管理方便。

2. 操作邏輯清晰。

缺點(diǎn):

1. 服務(wù)器程序依賴epoll_wait的循環(huán)響應(yīng)速度慢。

2. 程序性能差

第二種,handle(sockfd)函數(shù)內(nèi)部將sockfd的操作,push到線程池中,代碼如下:

int thread_cb(int sockfd) {
    // 此函數(shù)是在線程池創(chuàng)建的線程中運(yùn)行。
    // 與handle不在一個(gè)線程上下文中運(yùn)行
    recv(sockfd, rbuffer, length, 0);
    parser_proto(rbuffer, length);
    send(sockfd, sbuffer, length, 0);
}

int handle(int sockfd) {
    //此函數(shù)在主線程 main_thread 中運(yùn)行
    //在此處之前,確保線程池已經(jīng)啟動(dòng)。
    push_thread(sockfd, thread_cb); //將sockfd放到其他線程中運(yùn)行。
}

Handle函數(shù)是將sockfd處理方式放到另一個(gè)已經(jīng)其他的線程中運(yùn)行,如此做法,將io操作(recv,send)與epoll_wait 不在一個(gè)處理流程里面,使得io操作(recv,send)與epoll_wait實(shí)現(xiàn)解耦。這就叫做IO異步操作。

優(yōu)點(diǎn):

1. 子模塊好規(guī)劃。

2. 程序性能高。

缺點(diǎn):

正因?yàn)樽幽K好規(guī)劃,使得模塊之間的sockfd的管理異常麻煩。每一個(gè)子線程都需要管理好sockfd,避免在IO操作的時(shí)候,sockfd出現(xiàn)關(guān)閉或其他異常。

上文有提到IO同步操作,程序響應(yīng)慢,IO異步操作,程序響應(yīng)快。

下面來對比一下IO同步操作與IO異步操作。

代碼如下:

https:/github.com/wangbojing/c1000k_test/blob/master/server_mulport_epoll.c

在這份代碼的486行,#if 1, 打開的時(shí)候,為IO異步操作。關(guān)閉的時(shí)候,為IO同步操作。

接下來把我測試接入量的結(jié)果粘貼出來。

  • IO異步操作,每1000個(gè)連接接入的服務(wù)器響應(yīng)時(shí)間(900ms左右)。
  • IO同步操作,每1000個(gè)連接接入的服務(wù)器響應(yīng)時(shí)間(6500ms左右)。
  • IO異步操作與IO同步操作

對比項(xiàng)

  • IO同步操作
  • IO異步操作

Sockfd管理

  • 管理方便
  • 多個(gè)線程共同管理

代碼邏輯

  • 程序整體邏輯清晰
  • 子模塊邏輯清晰

程序性能

  • 響應(yīng)時(shí)間長,性能差
  • 響應(yīng)時(shí)間短,性能好

有沒有一種方式,有異步性能,同步的代碼邏輯。來方便編程人員對IO操作的組件呢?有,采用一種輕量級的協(xié)程來實(shí)現(xiàn)。在每次send或者recv之前進(jìn)行切換,再由調(diào)度器來處理epoll_wait的流程。

就是采用了基于這樣的思考,寫了NtyCo,實(shí)現(xiàn)了一個(gè)IO異步操作與協(xié)程結(jié)合的組件。

4.2協(xié)程的案例

問題:協(xié)程如何使用?與線程使用有何區(qū)別?

在做網(wǎng)絡(luò)IO編程的時(shí)候,有一個(gè)非常理想的情況,就是每次accept返回的時(shí)候,就為新來的客戶端分配一個(gè)線程,這樣一個(gè)客戶端對應(yīng)一個(gè)線程。就不會(huì)有多個(gè)線程共用一個(gè)sockfd。每請求每線程的方式,并且代碼邏輯非常易讀。但是這只是理想,線程創(chuàng)建代價(jià),調(diào)度代價(jià)就呵呵了。

先來看一下每請求每線程的代碼如下:

while(1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int clientfd = accept(sockfd, (struct sockaddr*)&remote, &len);

    pthread_t thread_id;
    pthread_create(&thread_id, NULL, client_cb, &clientfd);

}

這樣的做法,寫完放到生產(chǎn)環(huán)境下面,如果你的老板不打死你,你來找我。我來幫你老板,為民除害。

如果我們有協(xié)程,我們就可以這樣實(shí)現(xiàn)。參考代碼如下:

https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

while (1) {
    socklen_t len = sizeof(struct sockaddr_in);
    int cli_fd = nty_accept(fd, (struct sockaddr*)&remote, &len);

    nty_coroutine *read_co;
    nty_coroutine_create(&read_co, server_reader, &cli_fd);
}

這樣的代碼是完全可以放在生成環(huán)境下面的。如果你的老板要打死你,你來找我,我?guī)湍惆涯憷习宕蛩溃瑸槊癯Α?/span>

線程的API思維來使用協(xié)程,函數(shù)調(diào)用的性能來測試協(xié)程。

NtyCo封裝出來了若干接口,一類是協(xié)程本身的,二類是posix的異步封裝

協(xié)程API:while

1. 協(xié)程創(chuàng)建

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)

2. 協(xié)程調(diào)度器的運(yùn)行

void nty_schedule_run(void)

POSIX異步封裝API:

int nty_socket(int domain, int type, int protocol)
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len)
int nty_recv(int fd, void *buf, int length)
int nty_send(int fd, const void *buf, int length)
int nty_close(int fd)

4.3協(xié)程的實(shí)現(xiàn)之工作流程

問題:協(xié)程內(nèi)部是如何工作呢?

先來看一下協(xié)程服務(wù)器案例的代碼, 代碼參考:https://github.com/wangbojing/NtyCo/blob/master/nty_server_test.c

分別討論三個(gè)協(xié)程的比較晦澀的工作流程。第一個(gè)協(xié)程的創(chuàng)建;第二個(gè)IO異步操作;第三個(gè)協(xié)程子過程回調(diào)

(1)創(chuàng)建協(xié)程

當(dāng)我們需要異步調(diào)用的時(shí)候,我們會(huì)創(chuàng)建一個(gè)協(xié)程。比如accept返回一個(gè)新的sockfd,創(chuàng)建一個(gè)客戶端處理的子過程。再比如需要監(jiān)聽多個(gè)端口的時(shí)候,創(chuàng)建一個(gè)server的子過程,這樣多個(gè)端口同時(shí)工作的,是符合微服務(wù)的架構(gòu)的。

創(chuàng)建協(xié)程的時(shí)候,進(jìn)行了如何的工作?創(chuàng)建API如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
  • 參數(shù)1:nty_coroutine **new_co,需要傳入空的協(xié)程的對象,這個(gè)對象是由內(nèi)部創(chuàng)建的,并且在函數(shù)返回的時(shí)候,會(huì)返回一個(gè)內(nèi)部創(chuàng)建的協(xié)程對象。
  • 參數(shù)2:proc_coroutine func,協(xié)程的子過程。當(dāng)協(xié)程被調(diào)度的時(shí)候,就會(huì)執(zhí)行該函數(shù)。
  • 參數(shù)3:void *arg,需要傳入到新協(xié)程中的參數(shù)。

協(xié)程不存在親屬關(guān)系,都是一致的調(diào)度關(guān)系,接受調(diào)度器的調(diào)度。調(diào)用create API就會(huì)創(chuàng)建一個(gè)新協(xié)程,新協(xié)程就會(huì)加入到調(diào)度器的就緒隊(duì)列中。

創(chuàng)建的協(xié)程具體步驟會(huì)在《協(xié)程的實(shí)現(xiàn)之原語操作》來描述。

(2)實(shí)現(xiàn)IO異步操作

大部分的朋友會(huì)關(guān)心IO異步操作如何實(shí)現(xiàn),在send與recv調(diào)用的時(shí)候,如何實(shí)現(xiàn)異步操作的。

先來看一下一段代碼:

while (1) {
    int nready = epoll_wait(epfd, events, EVENT_SIZE, -1);

    for (i = 0;i < nready;i ++) {

        int sockfd = events[i].data.fd;
        if (sockfd == listenfd) {
            int connfd = accept(listenfd, xxx, xxxx);

            setnonblock(connfd);

            ev.events = EPOLLIN | EPOLLET;
            ev.data.fd = connfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev);

        } else {

            epoll_ctl(epfd, EPOLL_CTL_DEL, sockfd, NULL);
            recv(sockfd, buffer, length, 0);

            //parser_proto(buffer, length);

            send(sockfd, buffer, length, 0);
            epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, NULL);
        }
    }
}

在進(jìn)行IO操作(recv,send)之前,先執(zhí)行了 epoll_ctl的del操作,將相應(yīng)的sockfd從epfd中刪除掉,在執(zhí)行完IO操作(recv,send)再進(jìn)行epoll_ctl的add的動(dòng)作。這段代碼看起來似乎好像沒有什么作用。

如果是在多個(gè)上下文中,這樣的做法就很有意義了。能夠保證sockfd只在一個(gè)上下文中能夠操作IO的。不會(huì)出現(xiàn)在多個(gè)上下文同時(shí)對一個(gè)IO進(jìn)行操作的。協(xié)程的IO異步操作正式是采用此模式進(jìn)行的。

把單一協(xié)程的工作與調(diào)度器的工作的劃分清楚,先引入兩個(gè)原語操作 resume,yield會(huì)在《協(xié)程的實(shí)現(xiàn)之原語操作》來講解協(xié)程所有原語操作的實(shí)現(xiàn),yield就是讓出運(yùn)行,resume就是恢復(fù)運(yùn)行。

調(diào)度器與協(xié)程的上下文切換如下圖所示:

圖片圖片

在協(xié)程的上下文IO異步操作(nty_recv,nty_send)函數(shù),步驟如下:

1. 將sockfd 添加到epoll管理中。

2. 進(jìn)行上下文環(huán)境切換,由協(xié)程上下文yield到調(diào)度器的上下文。

3. 調(diào)度器獲取下一個(gè)協(xié)程上下文。Resume新的協(xié)程

(3)回調(diào)協(xié)程的子過程

在create協(xié)程后,何時(shí)回調(diào)子過程?何種方式回調(diào)子過程?

首先來回顧一下x86_64寄存器的相關(guān)知識。匯編與寄存器相關(guān)知識還會(huì)在《協(xié)程的實(shí)現(xiàn)之切換》繼續(xù)深入探討的。x86_64 的寄存器有16個(gè)64位寄存器,分別是:

%rax, %rbx,%rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12, %r13, %r14, %r15。
  • %rax 作為函數(shù)返回值使用的。
  • %rsp 棧指針寄存器,指向棧頂
  • %rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。。。
  • %rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲(chǔ),遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改
  • %r10, %r11 用作數(shù)據(jù)存儲(chǔ),就是使用前要先保存原值

以NtyCo的實(shí)現(xiàn)為例,來分析這個(gè)過程。CPU有一個(gè)非常重要的寄存器叫做EIP,用來存儲(chǔ)CPU運(yùn)行下一條指令的地址。我們可以把回調(diào)函數(shù)的地址存儲(chǔ)到EIP中,將相應(yīng)的參數(shù)存儲(chǔ)到相應(yīng)的參數(shù)寄存器中。實(shí)現(xiàn)子過程調(diào)用的邏輯代碼如下:

void _exec(nty_coroutine *co) {
    co->func(co->arg); //子過程的回調(diào)函數(shù)
}

void nty_coroutine_init(nty_coroutine *co) {
    //ctx 就是協(xié)程的上下文
    co->ctx.edi = (void*)co; //設(shè)置參數(shù)
    co->ctx.eip = (void*)_exec; //設(shè)置回調(diào)函數(shù)入口
    //當(dāng)實(shí)現(xiàn)上下文切換的時(shí)候,就會(huì)執(zhí)行入口函數(shù)_exec , _exec 調(diào)用子過程func
}

4.4協(xié)程的實(shí)現(xiàn)之原語操作

問題:協(xié)程的內(nèi)部原語操作有哪些?分別如何實(shí)現(xiàn)的?

協(xié)程的核心原語操作:create, resume, yield。協(xié)程的原語操作有create怎么沒有exit?以NtyCo為例,協(xié)程一旦創(chuàng)建就不能有用戶自己銷毀,必須得以子過程執(zhí)行結(jié)束,就會(huì)自動(dòng)銷毀協(xié)程的上下文數(shù)據(jù)。以_exec執(zhí)行入口函數(shù)返回而銷毀協(xié)程的上下文與相關(guān)信息。co->func(co->arg) 是子過程,若用戶需要長久運(yùn)行協(xié)程,就必須要在func函數(shù)里面寫入循環(huán)等操作。所以NtyCo里面沒有實(shí)現(xiàn)exit的原語操作。

create:創(chuàng)建一個(gè)協(xié)程。

1. 調(diào)度器是否存在,不存在也創(chuàng)建。調(diào)度器作為全局的單例。將調(diào)度器的實(shí)例存儲(chǔ)在線程的私有空間pthread_setspecific。

2. 分配一個(gè)coroutine的內(nèi)存空間,分別設(shè)置coroutine的數(shù)據(jù)項(xiàng),??臻g,棧大小,初始狀態(tài),創(chuàng)建時(shí)間,子過程回調(diào)函數(shù),子過程的調(diào)用參數(shù)。

  • 3. 將新分配協(xié)程添加到就緒隊(duì)列 ready_queue中

實(shí)現(xiàn)代碼如下:

int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg) {

    assert(pthread_once(&sched_key_once, nty_coroutine_sched_key_creator) == 0);
    nty_schedule *sched = nty_coroutine_get_sched();

    if (sched == NULL) {
        nty_schedule_create(0);

        sched = nty_coroutine_get_sched();
        if (sched == NULL) {
            printf("Failed to create schedulern");
            return -1;
        }
    }

    nty_coroutine *co = calloc(1, sizeof(nty_coroutine));
    if (co == NULL) {
        printf("Failed to allocate memory for new coroutinen");
        return -2;
    }

    //
    int ret = posix_memalign(&co->stack, getpagesize(), sched->stack_size);
    if (ret) {
        printf("Failed to allocate stack for new coroutinen");
        free(co);
        return -3;
    }

    co->sched = sched;
    co->stack_size = sched->stack_size;
    co->status = BIT(NTY_COROUTINE_STATUS_NEW); //
    co->id = sched->spawned_coroutines ++;
co->func = func;

    co->fd = -1;
co->events = 0;

    co->arg = arg;
    co->birth = nty_coroutine_usec_now();
    *new_co = co;

    TAILQ_INSERT_TAIL(&co->sched->ready, co, ready_next);

    return 0;
}

yield:讓出CPU。

void nty_coroutine_yield(nty_coroutine *co)

參數(shù):當(dāng)前運(yùn)行的協(xié)程實(shí)例

調(diào)用后該函數(shù)不會(huì)立即返回,而是切換到最近執(zhí)行resume的上下文。該函數(shù)返回是在執(zhí)行resume的時(shí)候,會(huì)有調(diào)度器統(tǒng)一選擇resume的,然后再次調(diào)用yield的。resume與yield是兩個(gè)可逆過程的原子操作。

resume:恢復(fù)協(xié)程的運(yùn)行權(quán)

int nty_coroutine_resume(nty_coroutine *co)

參數(shù):需要恢復(fù)運(yùn)行的協(xié)程實(shí)例

調(diào)用后該函數(shù)也不會(huì)立即返回,而是切換到運(yùn)行協(xié)程實(shí)例的yield的位置。返回是在等協(xié)程相應(yīng)事務(wù)處理完成后,主動(dòng)yield會(huì)返回到resume的地方。

4.5協(xié)程的實(shí)現(xiàn)之切換

問題:協(xié)程的上下文如何切換?切換代碼如何實(shí)現(xiàn)?

首先來回顧一下x86_64寄存器的相關(guān)知識。x86_64 的寄存器有16個(gè)64位寄存器,分別是:

%rax, %rbx, %rcx, %esi, %edi, %rbp, %rsp, %r8, %r9, %r10, %r11, %r12,%r13, %r14, %r15。

%rax 作為函數(shù)返回值使用的。%rsp 棧指針寄存器,指向棧頂

%rdi, %rsi, %rdx, %rcx, %r8, %r9 用作函數(shù)參數(shù),依次對應(yīng)第1參數(shù),第2參數(shù)。

%rbx, %rbp, %r12, %r13, %r14, %r15 用作數(shù)據(jù)存儲(chǔ),遵循調(diào)用者使用規(guī)則,換句話說,就是隨便用。調(diào)用子函數(shù)之前要備份它,以防它被修改

%r10, %r11 用作數(shù)據(jù)存儲(chǔ),就是使用前要先保存原值。

上下文切換,就是將CPU的寄存器暫時(shí)保存,再將即將運(yùn)行的協(xié)程的上下文寄存器,分別mov到相對應(yīng)的寄存器上。此時(shí)上下文完成切換。如下圖所示:

切換_switch函數(shù)定義:

int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
  • 參數(shù)1:即將運(yùn)行協(xié)程的上下文,寄存器列表
  • 參數(shù)2:正在運(yùn)行協(xié)程的上下文,寄存器列表

我們nty_cpu_ctx結(jié)構(gòu)體的定義,為了兼容x86,結(jié)構(gòu)體項(xiàng)命令采用的是x86的寄存器名字命名。

typedef struct _nty_cpu_ctx {
void *esp; //
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;

_switch返回后,執(zhí)行即將運(yùn)行協(xié)程的上下文。是實(shí)現(xiàn)上下文的切換

_switch的實(shí)現(xiàn)代碼:

0: __asm__ (
1: "    .text                                  n"
2: "       .p2align 4,,15                                   n"
3: ".globl _switch                                          n"
4: ".globl __switch                                         n"
5: "_switch:                                                n"
6: "__switch:                                               n"
7: "       movq %rsp, 0(%rsi)      # save stack_pointer     n"
8: "       movq %rbp, 8(%rsi)      # save frame_pointer     n"
9: "       movq (%rsp), %rax       # save insn_pointer      n"
10: "       movq %rax, 16(%rsi)                              n"
11: "       movq %rbx, 24(%rsi)     # save rbx,r12-r15       n"
12: "       movq %r12, 32(%rsi)                              n"
13: "       movq %r13, 40(%rsi)                              n"
14: "       movq %r14, 48(%rsi)                              n"
15: "       movq %r15, 56(%rsi)                              n"
16: "       movq 56(%rdi), %r15                              n"
17: "       movq 48(%rdi), %r14                              n"
18: "       movq 40(%rdi), %r13     # restore rbx,r12-r15    n"
19: "       movq 32(%rdi), %r12                              n"
20: "       movq 24(%rdi), %rbx                              n"
21: "       movq 8(%rdi), %rbp      # restore frame_pointer  n"
22: "       movq 0(%rdi), %rsp      # restore stack_pointer  n"
23: "       movq 16(%rdi), %rax     # restore insn_pointer   n"
24: "       movq %rax, (%rsp)                                n"
25: "       ret                                              n"
26: );

按照x86_64的寄存器定義,%rdi保存第一個(gè)參數(shù)的值,即new_ctx的值,%rsi保存第二個(gè)參數(shù)的值,即保存cur_ctx的值。X86_64每個(gè)寄存器是64bit,8byte。

Movq %rsp, 0(%rsi) 保存在棧指針到cur_ctx實(shí)例的rsp項(xiàng)

Movq %rbp, 8(%rsi)

Movq (%rsp), %rax #將棧頂?shù)刂防锩娴闹荡鎯?chǔ)到rax寄存器中。Ret后出棧,執(zhí)行棧頂

Movq %rbp, 8(%rsi) #后續(xù)的指令都是用來保存CPU的寄存器到new_ctx的每一項(xiàng)中

Movq 8(%rdi), %rbp #將new_ctx的值

Movq 16(%rdi), %rax #將指令指針rip的值存儲(chǔ)到rax中

Movq %rax, (%rsp) # 將存儲(chǔ)的rip值的rax寄存器賦值給棧指針的地址的值。

Ret # 出棧,回到棧指針,執(zhí)行rip指向的指令。

上下文環(huán)境的切換完成。

4.6協(xié)程的實(shí)現(xiàn)之定義

問題:協(xié)程如何定義? 調(diào)度器如何定義?

先來一道設(shè)計(jì)題:

設(shè)計(jì)一個(gè)協(xié)程的運(yùn)行體R與運(yùn)行體調(diào)度器S的結(jié)構(gòu)體

  • 1. 運(yùn)行體R:包含運(yùn)行狀態(tài){就緒,睡眠,等待},運(yùn)行體回調(diào)函數(shù),回調(diào)參數(shù),棧指針,棧大小,當(dāng)前運(yùn)行體
  • 2. 調(diào)度器S:包含執(zhí)行集合{就緒,睡眠,等待}

這道設(shè)計(jì)題拆分兩個(gè)個(gè)問題,一個(gè)運(yùn)行體如何高效地在多種狀態(tài)集合更換。調(diào)度器與運(yùn)行體的功能界限。

運(yùn)行體如何高效地在多種狀態(tài)集合更換

新創(chuàng)建的協(xié)程,創(chuàng)建完成后,加入到就緒集合,等待調(diào)度器的調(diào)度;協(xié)程在運(yùn)行完成后,進(jìn)行IO操作,此時(shí)IO并未準(zhǔn)備好,進(jìn)入等待狀態(tài)集合;IO準(zhǔn)備就緒,協(xié)程開始運(yùn)行,后續(xù)進(jìn)行sleep操作,此時(shí)進(jìn)入到睡眠狀態(tài)集合。

就緒(ready),睡眠(sleep),等待(wait)集合該采用如何數(shù)據(jù)結(jié)構(gòu)來存儲(chǔ)?

就緒(ready)集合并不沒有設(shè)置優(yōu)先級的選型,所有在協(xié)程優(yōu)先級一致,所以可以使用隊(duì)列來存儲(chǔ)就緒的協(xié)程,簡稱為就緒隊(duì)列(ready_queue)。

睡眠(sleep)集合需要按照睡眠時(shí)長進(jìn)行排序,采用紅黑樹來存儲(chǔ),簡稱睡眠樹(sleep_tree)紅黑樹在工程實(shí)用為<key, value>, key為睡眠時(shí)長,value為對應(yīng)的協(xié)程結(jié)點(diǎn)。

等待(wait)集合,其功能是在等待IO準(zhǔn)備就緒,等待IO也是有時(shí)長的,所以等待(wait)集合采用紅黑樹的來存儲(chǔ),簡稱等待樹(wait_tree),此處借鑒nginx的設(shè)計(jì)。

Coroutine就是協(xié)程的相應(yīng)屬性,status表示協(xié)程的運(yùn)行狀態(tài)。sleep與wait兩顆紅黑樹,ready使用的隊(duì)列,比如某協(xié)程調(diào)用sleep函數(shù),加入睡眠樹(sleep_tree),status |= S即可。比如某協(xié)程在等待樹(wait_tree)中,而IO準(zhǔn)備就緒放入ready隊(duì)列中,只需要移出等待樹(wait_tree),狀態(tài)更改status &= ~W即可。有一個(gè)前提條件就是不管何種運(yùn)行狀態(tài)的協(xié)程,都在就緒隊(duì)列中,只是同時(shí)包含有其他的運(yùn)行狀態(tài)。

(2)調(diào)度器與協(xié)程的功能界限

每一協(xié)程都需要使用的而且可能會(huì)不同屬性的,就是協(xié)程屬性。每一協(xié)程都需要的而且數(shù)據(jù)一致的,就是調(diào)度器的屬性。比如棧大小的數(shù)值,每個(gè)協(xié)程都一樣的后不做更改可以作為調(diào)度器的屬性,如果每個(gè)協(xié)程大小不一致,則可以作為協(xié)程的屬性。

用來管理所有協(xié)程的屬性,作為調(diào)度器的屬性。比如epoll用來管理每一個(gè)協(xié)程對應(yīng)的IO,是需要作為調(diào)度器屬性。

按照前面幾章的描述,定義一個(gè)協(xié)程結(jié)構(gòu)體需要多少域,我們描述了每一個(gè)協(xié)程有自己的上下文環(huán)境,需要保存CPU的寄存器ctx;需要有子過程的回調(diào)函數(shù)func;需要有子過程回調(diào)函數(shù)的參數(shù) arg;需要定義自己的棧空間 stack;需要有自己棧空間的大小 stack_size;需要定義協(xié)程的創(chuàng)建時(shí)間 birth;需要定義協(xié)程當(dāng)前的運(yùn)行狀態(tài) status;需要定當(dāng)前運(yùn)行狀態(tài)的結(jié)點(diǎn)(ready_next, wait_node, sleep_node);需要定義協(xié)程id;需要定義調(diào)度器的全局對象 sched。

協(xié)程的核心結(jié)構(gòu)體如下:

typedef struct _nty_coroutine {

    nty_cpu_ctx ctx;
    proc_coroutine func;
    void *arg;
    size_t stack_size;

    nty_coroutine_status status;
    nty_schedule *sched;

    uint64_t birth;
    uint64_t id;

    void *stack;

    RB_ENTRY(_nty_coroutine) sleep_node;
    RB_ENTRY(_nty_coroutine) wait_node;

    TAILQ_ENTRY(_nty_coroutine) ready_next;
    TAILQ_ENTRY(_nty_coroutine) defer_next;

} nty_coroutine;

調(diào)度器是管理所有協(xié)程運(yùn)行的組件,協(xié)程與調(diào)度器的運(yùn)行關(guān)系。

調(diào)度器的屬性,需要有保存CPU的寄存器上下文 ctx,可以從協(xié)程運(yùn)行狀態(tài)yield到調(diào)度器運(yùn)行的。從協(xié)程到調(diào)度器用yield,從調(diào)度器到協(xié)程用resume以下為協(xié)程的定義。

typedef struct _nty_coroutine_queue nty_coroutine_queue;

typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;

typedef struct _nty_schedule {
    uint64_t birth;
nty_cpu_ctx ctx;

    struct _nty_coroutine *curr_thread;
    int page_size;

    int poller_fd;
    int eventfd;
    struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
    int nevents;

    int num_new_events;

    nty_coroutine_queue ready;
    nty_coroutine_rbtree_sleep sleeping;
    nty_coroutine_rbtree_wait waiting;

} nty_schedule;

4.7協(xié)程的實(shí)現(xiàn)之調(diào)度器

問題:協(xié)程如何被調(diào)度?

調(diào)度器的實(shí)現(xiàn),有兩種方案,一種是生產(chǎn)者消費(fèi)者模式,另一種多狀態(tài)運(yùn)行。

(1)生產(chǎn)者消費(fèi)者模式

邏輯代碼如下:

while (1) {

        //遍歷睡眠集合,將滿足條件的加入到ready
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            TAILQ_ADD(&sched->ready, expired);
        }

        //遍歷等待集合,將滿足添加的加入到ready
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            TAILQ_ADD(&sched->ready, wait);
        }

        // 使用resume回復(fù)ready的協(xié)程運(yùn)行權(quán)
        while (!TAILQ_EMPTY(&sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

(2)多狀態(tài)運(yùn)行

實(shí)現(xiàn)邏輯代碼如下:

while (1) {

        //遍歷睡眠集合,使用resume恢復(fù)expired的協(xié)程運(yùn)行權(quán)
        nty_coroutine *expired = NULL;
        while ((expired = sleep_tree_expired(sched)) != ) {
            resume(expired);
        }

        //遍歷等待集合,使用resume恢復(fù)wait的協(xié)程運(yùn)行權(quán)
        nty_coroutine *wait = NULL;
        int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
        for (i = 0;i < nready;i ++) {
            wait = wait_tree_search(events[i].data.fd);
            resume(wait);
        }

        // 使用resume恢復(fù)ready的協(xié)程運(yùn)行權(quán)
        while (!TAILQ_EMPTY(sched->ready)) {
            nty_coroutine *ready = TAILQ_POP(sched->ready);
            resume(ready);
        }
    }

4.8協(xié)程性能測試

測試環(huán)境:4臺(tái)VMWare 虛擬機(jī)

  • 1臺(tái)服務(wù)器 6G內(nèi)存,4核CPU
  • 3臺(tái)客戶端 2G內(nèi)存,2核CPU

操作系統(tǒng):ubuntu 14.04

服務(wù)器端測試代碼:https://github.com/wangbojing/NtyCo

客戶端測試代碼:https://github.com/wangbojing/c1000k_test/blob/master/client_mutlport_epoll.c

按照每一個(gè)連接啟動(dòng)一個(gè)協(xié)程來測試。每一個(gè)協(xié)程??臻g 4096byte

6G內(nèi)存 –> 測試協(xié)程數(shù)量100W無異常。并且能夠正常收發(fā)數(shù)據(jù)。

五、協(xié)程創(chuàng)建與運(yùn)行

由于多個(gè)協(xié)程運(yùn)行于一個(gè)線程內(nèi)部的,因此當(dāng)創(chuàng)建線程中的第一個(gè)協(xié)程時(shí),需要初始化該協(xié)程所在的環(huán)境 stCoRoutineEnv_t,這個(gè)環(huán)境是線程用來管理協(xié)程的,通過該環(huán)境,線程可以得知當(dāng)前一共創(chuàng)建了多少個(gè)協(xié)程,當(dāng)前正在運(yùn)行哪一個(gè)協(xié)程,當(dāng)前應(yīng)當(dāng)如何調(diào)度協(xié)程:

struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ]; // 記錄當(dāng)前創(chuàng)建的協(xié)程
int iCallStackSize; // 記錄當(dāng)前一共創(chuàng)建了多少個(gè)協(xié)程
stCoEpoll_t *pEpoll; // 該線程的協(xié)程調(diào)度器
// 在使用共享?xiàng)DJ娇截悧?nèi)存時(shí)記錄相應(yīng)的 coroutine
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};

上述代碼表明 libco 允許一個(gè)線程內(nèi)最多創(chuàng)建 128 個(gè)協(xié)程,其中 pCallStack[iCallStackSize-1] 也就是棧頂?shù)膮f(xié)程表示當(dāng)前正在運(yùn)行的協(xié)程。當(dāng)調(diào)用函數(shù) co_create 時(shí),首先檢查當(dāng)前線程中的 coroutine env 結(jié)構(gòu)是否創(chuàng)建。這里 libco 對于每個(gè)線程內(nèi)的 stCoRoutineEnv_t 并沒有使用 thread-local 的方式(例如gcc 內(nèi)置的 __thread,phxrpc采用這種方式)來管理,而是預(yù)先定義了一個(gè)大的數(shù)組,并通過對應(yīng)的 PID 來獲取其協(xié)程環(huán)境。

static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return g_arrCoEnvPerThread[ GetPid() ];
}

初始化 stCoRoutineEnv_t 時(shí)主要完成以下幾步:

為 stCoRoutineEnv_t 申請空間并且進(jìn)行初始化,設(shè)置協(xié)程調(diào)度器 pEpoll。

創(chuàng)建一個(gè)空的 coroutine,初始化其上下文環(huán)境( 有關(guān) coctx 在后文詳細(xì)介紹 ),將其加入到該線程的協(xié)程環(huán)境中進(jìn)行管理,并且設(shè)置其為 main coroutine。這個(gè) main coroutine 用來運(yùn)行該線程主邏輯。

當(dāng)初始化完成協(xié)程環(huán)境之后,調(diào)用函數(shù) co_create_env 來創(chuàng)建具體的協(xié)程,該函數(shù)初始化一個(gè)協(xié)程結(jié)構(gòu) stCoRoutine_t,設(shè)置該結(jié)構(gòu)中的各項(xiàng)字段,例如運(yùn)行的函數(shù) pfn,運(yùn)行時(shí)的棧地址等等。需要說明的就是,如果使用了非共享?xiàng)DJ剑瑒t需要為該協(xié)程單獨(dú)申請??臻g,否則從共享?xiàng)V猩暾埧臻g。??臻g表示如下:

struct stStackMem_t
{
stCoRoutine_t* occupy_co; // 使用該棧的協(xié)程
int stack_size; // 棧大小
char* stack_bp; // 棧的指針,棧從高地址向低地址增長
char* stack_buffer; // 棧底
};

使用 co_create 創(chuàng)建完一個(gè)協(xié)程之后,將調(diào)用 co_resume 來將該協(xié)程激活運(yùn)行:

void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
// 獲取當(dāng)前正在運(yùn)行的協(xié)程的結(jié)構(gòu)
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart )
{
// 為將要運(yùn)行的 co 布置上下文環(huán)境
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co; // 設(shè)置co為運(yùn)行的線程
co_swap( lpCurrRoutine, co );
}

函數(shù) co_swap 的作用類似于 Unix 提供的函數(shù) swapcontext:將當(dāng)前正在運(yùn)行的 coroutine 的上下文以及狀態(tài)保存到結(jié)構(gòu) lpCurrRoutine 中,并且將 co 設(shè)置成為要運(yùn)行的協(xié)程,從而實(shí)現(xiàn)協(xié)程的切換。co_swap 具體完成三項(xiàng)工作:

記錄當(dāng)前協(xié)程 curr 的運(yùn)行棧的棧頂指針,通過 char c; curr_stack_sp=&c 實(shí)現(xiàn),當(dāng)下次切換回 curr時(shí),可以從該棧頂指針指向的位置繼續(xù),執(zhí)行完 curr 后可以順利釋放該棧。

處理共享?xiàng)O嚓P(guān)的操作,并且調(diào)用函數(shù) coctx_swap 來完成上下文環(huán)境的切換。注意執(zhí)行完 coctx_swap之后,執(zhí)行流程將跳到新的 coroutine 也就是 pending_co 中運(yùn)行,后續(xù)的代碼需要等下次切換回 curr 時(shí)才會(huì)執(zhí)行。

當(dāng)下次切換回 curr 時(shí),處理共享?xiàng)O嚓P(guān)的操作。

對應(yīng)于 co_resume 函數(shù),協(xié)程主動(dòng)讓出執(zhí)行權(quán)則調(diào)用 co_yield 函數(shù)。co_yield 函數(shù)調(diào)用了 co_yield_env,將當(dāng)前協(xié)程與當(dāng)前線程中記錄的其他協(xié)程進(jìn)行切換:

void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}

前面我們已經(jīng)提到過,pCallStack 棧頂所指向的即為當(dāng)前正在運(yùn)行的協(xié)程所對應(yīng)的結(jié)構(gòu),因此該函數(shù)將 curr 取出來,并將當(dāng)前正運(yùn)行的協(xié)程上下文保存到該結(jié)構(gòu)上,并切換到協(xié)程 last 上執(zhí)行。接下來我們以 32-bit 的系統(tǒng)為例來分析 libco 是如何實(shí)現(xiàn)協(xié)程運(yùn)行環(huán)境的切換的。

六、協(xié)程上下文的創(chuàng)建與切換

libco 使用結(jié)構(gòu) struct coctx_t 來表示一個(gè)協(xié)程的上下文環(huán)境:

struct coctx_t
{

if defined(__i386__)
void *regs[ 8 ];

else
void *regs[ 14 ];

endif
size_t ss_size;
char *ss_sp;
};

圖片圖片

結(jié)合上圖,我們需要知道關(guān)鍵的幾點(diǎn):

函數(shù)調(diào)用棧是調(diào)用者和被調(diào)用者共同負(fù)責(zé)布置的。Caller 將其參數(shù)從右向左反向壓棧,再將調(diào)用后的返回地址壓棧,然后將執(zhí)行流程交給 Callee。

典型的編譯器會(huì)將 Callee 函數(shù)匯編成為以 push %ebp; move %ebp, %esp; sub $esp N; 這種形式開頭的匯編代碼。這幾句代碼主要目的是為了方便 Callee 利用 ebp 來訪問調(diào)用者提供的參數(shù)以及自身的局部變量(如下圖)。

當(dāng)調(diào)用過程完成清除了局部變量以后,會(huì)執(zhí)行 pop %ebp; ret,這樣指令會(huì)跳轉(zhuǎn)到 RA 也就是返回地址上面執(zhí)行。這一點(diǎn)也是實(shí)現(xiàn)協(xié)程切換的關(guān)鍵:我們只需要將指定協(xié)程的函數(shù)指針地址保存到 RA 中,當(dāng)調(diào)用完 coctx_swap 之后,會(huì)自動(dòng)跳轉(zhuǎn)到該協(xié)程的函數(shù)起始地址開始運(yùn)行。

了解了這些,我們就來看一下協(xié)程上下文環(huán)境的初始化函數(shù) coctx_make:

int coctx_make( coctx_t ctx, coctx_pfn_t pfn, const void s, const void *s1 )
{
char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);
sp = (char*)((unsigned long)sp & -16L);
coctx_param_t param = (coctx_param_t)sp ;
param->s1 = s;
param->s2 = s1;
memset(ctx->regs, 0, sizeof(ctx->regs));
ctx->regs[ kESP ] = (char)(sp) - sizeof(void);
ctx->regs[ kEIP ] = (char*)pfn;
return 0;
}

這段代碼應(yīng)該比較好理解,首先為函數(shù) coctx_pfn_t 預(yù)留 2 個(gè)參數(shù)的??臻g并對其到 16 字節(jié),之后將實(shí)參設(shè)置到預(yù)留的棧上空間中。最后在 ctx 結(jié)構(gòu)中填入相應(yīng)的,其中記錄 reg[kEIP] 返回地址為函數(shù)指針 pfn,記錄 reg[kESP] 為獲得的棧頂指針 sp 減去一個(gè)指針長度,這個(gè)減去的空間是為返回地址 RA 預(yù)留的。當(dāng)調(diào)用 coctx_swap 時(shí),reg[kEIP] 會(huì)被放到返回地址 RA 的位置,待 coctx_swap 執(zhí)行結(jié)束,自然會(huì)跳轉(zhuǎn)到函數(shù) pfn 處執(zhí)行。

coctx_swap(ctx1, ctx2) 在 coctx_swap.S 中實(shí)現(xiàn)。這里可以看到,該函數(shù)并沒有使用 push %ebp; move %ebp, %esp; sub $esp N; 開頭,因此??臻g分布中不會(huì)出現(xiàn) ebp 的位置。coctx_swap 函數(shù)主要分為兩段,其首先將當(dāng)前的上下文環(huán)境保存到 ctx1 結(jié)構(gòu)中:

leal 4(%esp), %eax // eax = old_esp + 4
movl 4(%esp), %esp // 將 esp 的值設(shè)為 &ctx1(即ctx1的地址)
leal 32(%esp), %esp // esp = (char*)&ctx1 + 32
pushl %eax // ctx1->regs[EAX] = %eax
pushl %ebp // ctx1->regs[EBP] = %ebp
pushl %esi // ctx1->regs[ESI] = %esi
pushl %edi // ctx1->regs[EDI] = %edi
pushl %edx // ctx1->regs[EDX] = %edx
pushl %ecx // ctx1->regs[ECX] = %ecx
pushl %ebx // ctx1->regs[EBX] = %ebx
pushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp

這里需要注意指令 leal 和 movl 的區(qū)別。leal 將 eax 的值設(shè)置成為 esp 的值加 4,而 movl 將 esp 的值設(shè)為 esp+4 所指向的內(nèi)存上的值,也就是參數(shù) ctx1 的地址。之后該函數(shù)將 ctx2 中記錄的上下文恢復(fù)到 CPU 寄存器中,并跳轉(zhuǎn)到其函數(shù)地址處運(yùn)行:

movl 4(%eax), %esp // 將 esp 的值設(shè)為 &ctx2(即ctx2的地址)
popl %eax // %eax = ctx1->regs[EIP],也就是 &pfn
popl %ebx // %ebx = ctx1->regs[EBP]
popl %ecx // %ecx = ctx1->regs[ECX]
popl %edx // %edx = ctx1->regs[EDX]
popl %edi // %edi = ctx1->regs[EDI]
popl %esi // %esi = ctx1->regs[ESI]
popl %ebp // %ebp = ctx1->regs[EBP]
popl %esp // %esp = ctx1->regs[ESP],即(char)(sp) - sizeof(void)
pushl %eax // RA = %eax = &pfn,注意此時(shí)esp已經(jīng)指向了新的esp
xorl %eax, %eax // reset eax
ret

上面的代碼看起來可能有些繞:

首先 line 1 將 esp 設(shè)置為參數(shù) ctx2 的地址,后續(xù)的 popl 操作均在 ctx2 的內(nèi)存空間上執(zhí)行。

line 2-9 將 ctx2->regs[] 中的內(nèi)容恢復(fù)到相應(yīng)的寄存器中。還記得在前面 coctx_make 中設(shè)置了 regs[EIP] 和 regs[ESP] 嗎?這里剛好就對應(yīng)恢復(fù)了相應(yīng)的值。

當(dāng)執(zhí)行完 line 9 之后,esp 已經(jīng)指向了 ctx2 中新的棧頂指針,由于在 coctx_make 中預(yù)留了一個(gè)指針長度的 RA 空間,line 10 剛好將新的函數(shù)指針 &pfn 設(shè)置到該 RA 上。

最后執(zhí)行 ret 指令時(shí),函數(shù)流程將跳到 pfn 處執(zhí)行。這樣,整個(gè)協(xié)程上下文的切換就完成了。

七、如何使用libco

我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應(yīng)用程序如何使用 libco 來編寫服務(wù)端程序。在 example_echosvr.cpp 的 main 函數(shù)中,主要執(zhí)行如下幾步:

創(chuàng)建 socket,監(jiān)聽在本機(jī)的 1024 端口,并設(shè)置為非阻塞;

主線程使用函數(shù) readwrite_coroutine 創(chuàng)建多個(gè)讀寫協(xié)程,調(diào)用 co_resume 啟動(dòng)協(xié)程運(yùn)行直到其掛起。這里我們忽略掉無關(guān)的多進(jìn)程 fork 的過程;

主線程繼續(xù)創(chuàng)建 socket 接收協(xié)程 accpet_co,同樣調(diào)用 co_resume 啟動(dòng)協(xié)程直到其掛起;

主線程調(diào)用函數(shù) co_eventloop 實(shí)現(xiàn)事件的監(jiān)聽和協(xié)程的循環(huán)切換;

函數(shù) readwrite_coroutine 在外層循環(huán)中將新創(chuàng)建的讀寫協(xié)程都加入到隊(duì)列 g_readwrite 中,此時(shí)這些讀寫協(xié)程都沒有具體與某個(gè) socket 連接對應(yīng),可以將隊(duì)列 g_readwrite 看成一個(gè) coroutine pool。當(dāng)加入到隊(duì)列中之后,調(diào)用函數(shù) co_yield_ct 函數(shù)讓出 CPU,此時(shí)控制權(quán)回到主線程。

主線程中的函數(shù) co_eventloop 監(jiān)聽網(wǎng)絡(luò)事件,將來自于客戶端新進(jìn)的連接交由協(xié)程 accept_co 處理,關(guān)于 co_eventloop 如何喚醒 accept_co 的細(xì)節(jié)我們將在后續(xù)介紹。accept_co 調(diào)用函數(shù) accept_routine 接收新連接,該函數(shù)的流程如下:

檢查隊(duì)列 g_readwrite 是否有空閑的讀寫 coroutine,如果沒有,調(diào)用函數(shù) poll 將該協(xié)程加入到 Epoll 管理的定時(shí)器隊(duì)列中,也就是 sleep(1000) 的作用;

調(diào)用 co_accept 來接收新連接,如果接收連接失敗,那么調(diào)用 co_poll 將服務(wù)端的 listen_fd 加入到 Epoll 中來觸發(fā)下一次連接事件;

對于成功的連接,從 g_readwrite 中取出一個(gè)讀寫協(xié)程來負(fù)責(zé)處理讀寫;

再次回到函數(shù) readwrite_coroutine 中,該函數(shù)會(huì)調(diào)用 co_poll 將新建立的連接的 fd 加入到 Epoll 監(jiān)聽中,并將控制流程返回到 main 協(xié)程;當(dāng)有讀或者寫事件發(fā)生時(shí),Epoll 會(huì)喚醒對應(yīng)的 coroutine ,繼續(xù)執(zhí)行 read 函數(shù)以及 write 函數(shù)。

上面的過程大致說明了控制流程是如何在不同的協(xié)程中切換,接下來我們介紹具體的實(shí)現(xiàn)細(xì)節(jié),即如何通過 Epoll 來管理協(xié)程,以及如何對系統(tǒng)函數(shù)進(jìn)行改造以滿足 libco 的調(diào)用。

八、通過Epoll管理和喚醒協(xié)程

Epoll監(jiān)聽FD

協(xié)程可以通過函數(shù) co_poll 來將 fd 交由 Epoll 管理,待 Epoll 的相應(yīng)的事件觸發(fā)時(shí),再切換回來執(zhí)行 read 或者 write 操作,從而實(shí)現(xiàn)由 Epoll 管理協(xié)程的功能。co_poll 函數(shù)原型如下:

int co_poll(stCoEpoll_t *ctx, struct pollfd fds[],
nfds_t nfds, int timeout_ms)

stCoEpoll_t 是為 libco 定制的 Epoll 相關(guān)數(shù)據(jù)結(jié)構(gòu),fds 是 pollfd 結(jié)構(gòu)的文件句柄,nfds 為 fds 數(shù)組的長度,最后一個(gè)參數(shù)表示定時(shí)器時(shí)間,也就是在 timeout 毫秒之后觸發(fā)處理這些文件句柄。這里可以看到,co_poll 能夠同時(shí)將多個(gè)文件句柄同時(shí)加入到 Epoll 管理中。我們先看 stCoEpoll_t 結(jié)構(gòu):

struct stCoEpoll_t
{
int iEpollFd; // Epoll 主 FD
static const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監(jiān)聽的句柄總數(shù)
struct stTimeout_t *pTimeout; // 時(shí)間輪定時(shí)器
struct stTimeoutItemLink_t *pstTimeoutList; // 已經(jīng)超時(shí)的時(shí)間
struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件
co_epoll_res *result; // Epoll 返回的事件結(jié)果
};

以 stTimeout_ 開頭的數(shù)據(jù)結(jié)構(gòu)與 libco 的定時(shí)器管理有關(guān),我們在后面介紹。co_epoll_res 是對 Epoll 事件數(shù)據(jù)結(jié)構(gòu)的封裝,也就是每次觸發(fā) Epoll 事件時(shí)的返回結(jié)果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這里也保留了 kevent 數(shù)據(jù)結(jié)構(gòu)。

```clike
struct co_epoll_res
{
int size;
struct epoll_event *events; // for linux epoll
struct kevent *eventlist; // for Unix or MacOs kqueue
};

co_poll 實(shí)際是對函數(shù) co_poll_inner 的封裝。我們將 co_epoll_inner 函數(shù)的結(jié)構(gòu)分為上下兩半段。在上半段中,調(diào)用 co_poll 的協(xié)程 CC 將其需要監(jiān)聽的句柄數(shù)組 fds 都加入到 Epoll 管理中,并通過函數(shù) co_yield_env 讓出 CPU;當(dāng) main 協(xié)程的事件循環(huán) co_eventloop 中觸發(fā)了 CC 對應(yīng)的監(jiān)聽事件時(shí),會(huì)恢復(fù) CC的執(zhí)行。此時(shí),CC 將開始執(zhí)行下半段,即將上半段添加的句柄 fds 從 epoll 中移除,清理殘留的數(shù)據(jù)結(jié)構(gòu),下面的流程圖簡要說明了控制流的轉(zhuǎn)移過程:

圖片圖片

有了上面的基本概念,我們來看具體的實(shí)現(xiàn)細(xì)節(jié)。co_poll 首先在內(nèi)部將傳入的文件句柄數(shù)組 fds 轉(zhuǎn)化為數(shù)據(jù)結(jié)構(gòu) stPoll_t,這一步主要是為了方便后續(xù)處理。該結(jié)構(gòu)記錄了 iEpollFd,ndfs,fds 數(shù)組,以及該協(xié)程需要執(zhí)行的函數(shù)和參數(shù)。有兩點(diǎn)需要說明的是:

1、對于每一個(gè) fd,為其申請一個(gè) stPollItem_t 來管理對應(yīng) Epoll 事件以及記錄回調(diào)參數(shù)。libco 在此做了一個(gè)小的優(yōu)化,對于長度小于 2 的 fds 數(shù)組,直接在棧上定義相應(yīng)的 stPollItem_t 數(shù)組,否則從堆中申請內(nèi)存。這也是一種比較常見的優(yōu)化,畢竟從堆中申請內(nèi)存比較耗時(shí);

2、函數(shù)指針 OnPollProcessEvent 封裝了協(xié)程的切換過程。當(dāng)傳入指定的 stPollItem_t 結(jié)構(gòu)時(shí),即可喚醒對應(yīng)于該結(jié)構(gòu)的 coroutine,將控制權(quán)交由其執(zhí)行;

co_poll 的第二步,也是最關(guān)鍵的一步,就是將 fd 數(shù)組全部加入到 Epoll 中進(jìn)行監(jiān)聽。協(xié)程 CC 會(huì)將每一個(gè) epoll_event 的 data.ptr 域設(shè)置為對應(yīng)的 stPollItem_t 結(jié)構(gòu)。這樣當(dāng)事件觸發(fā)時(shí),可以直接從對應(yīng)的 ptr中取出 stPollItem_t 結(jié)構(gòu),然后喚醒指定協(xié)程。

如果本次操作提供了 Timeout 參數(shù),co_poll 還會(huì)將協(xié)程 CC 本次操作對應(yīng)的 stPoll_t 加入到定時(shí)器隊(duì)列中。這表明在 Timeout 定時(shí)觸發(fā)之后,也會(huì)喚醒協(xié)程 CC 的執(zhí)行。當(dāng)整個(gè)上半段都完成后,co_poll 立即調(diào)用 co_yield_env 讓出 CPU,執(zhí)行流程跳轉(zhuǎn)回到 main 協(xié)程中。

從上面的流程圖中也可以看出,當(dāng)執(zhí)行流程再次跳回時(shí),表明協(xié)程 CC 添加的讀寫等監(jiān)聽事件已經(jīng)觸發(fā),即可以執(zhí)行相應(yīng)的讀寫操作了。此時(shí) CC 首先將其在上半段中添加的監(jiān)聽事件從 Epoll 中刪除,清理殘留的數(shù)據(jù)結(jié)構(gòu),然后調(diào)用讀寫邏輯。

九、定時(shí)器實(shí)現(xiàn)

協(xié)程 CC 在將一組 fds 加入 Epoll 的同時(shí),還能為其設(shè)置一個(gè)超時(shí)時(shí)間。在超時(shí)時(shí)間到期時(shí),也會(huì)再次喚醒 CC 來執(zhí)行。libco 使用 Timing-Wheel 來實(shí)現(xiàn)定時(shí)器。關(guān)于 Timing-Wheel 算法,可以參考,其優(yōu)勢是 O(1) 的插入和刪除復(fù)雜度,缺點(diǎn)是只有有限的長度,在某些場合下不能滿足需求。

回過去看 stCoEpoll_t 結(jié)構(gòu),其中 pTimeout 代表時(shí)間輪,通過函數(shù) AllocateTimeout 初始化為一個(gè)固定大?。?0 1000)的數(shù)組。根據(jù) Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時(shí)事件。而實(shí)際上,在添加定時(shí)器時(shí),libco 要求定時(shí)時(shí)間不超過 40s。成員 pstTimeoutList 記錄在 co_eventloop 中發(fā)生超時(shí)的事件,而 pstActiveList 記錄當(dāng)前活躍的事件,包括超時(shí)事件。這兩個(gè)結(jié)構(gòu)都將在 co_eventloop 中進(jìn)行處理。

下面我們簡要分析一下加入定時(shí)器的實(shí)現(xiàn):

int AddTimeout( stTimeout_t apTimeout, stTimeoutItem_t apItem,
unsigned long long allNow )
{
if( apTimeout->ullStart == 0 ) // 初始化時(shí)間輪的基準(zhǔn)時(shí)間
{
apTimeout->ullStart = allNow;
apTimeout->llStartIdx = 0; // 當(dāng)前時(shí)間輪指針指向數(shù)組0
}
// 1. 當(dāng)前時(shí)間不可能小于時(shí)間輪的基準(zhǔn)時(shí)間
// 2. 加入的定時(shí)器的超時(shí)時(shí)間不能小于當(dāng)前時(shí)間
if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow )
{
return __LINE__;
}
int diff = apItem->ullExpireTime - apTimeout->ullStart;
if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時(shí)間輪的大小
{
return __LINE__;
}
// 插入到時(shí)間輪盤的指定位置
AddTail( apTimeout->pItems +
(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );
return 0;
}

定時(shí)器的超時(shí)檢查在函數(shù) co_eventloop 中執(zhí)行。

十、Epoll事件循環(huán)

main 協(xié)程通過調(diào)用函數(shù) co_eventloop 來監(jiān)聽 Epoll 事件,并在相應(yīng)的事件觸發(fā)時(shí)切換到指定的協(xié)程執(zhí)行。有關(guān) co_eventloop 與 應(yīng)用協(xié)程的交互過程在上一節(jié)的流程圖中已經(jīng)比較清楚了,下面我們主要介紹一下 co_eventloop 函數(shù)的實(shí)現(xiàn):

上文中也提到,通過 epoll_wait 返回的事件都保存在 stCoEpoll_t 結(jié)構(gòu)的 co_epoll_res 中。因此 co_eventloop 首先為 co_epoll_res 申請空間,之后通過一個(gè)無限循環(huán)來監(jiān)聽所有 coroutine 添加的所有事件:

for(;;)
{
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
...
}

對于每一個(gè)觸發(fā)的事件,co_eventloop 首先通過指針域 data.ptr 取出保存的 stPollItem_t 結(jié)構(gòu),并將其添加到 pstActiveList 列表中;之后從定時(shí)器輪盤中取出所有已經(jīng)超時(shí)的事件,也將其全部添加到 pstActiveList 中,pstActiveList 中的所有事件都作為活躍事件處理。

對于每一個(gè)活躍事件,co_eventloop 將通過調(diào)用對應(yīng)的 pfnProcess 也就是上圖中的OnPollProcessEvent 函數(shù)來切換到該事件對應(yīng)的 coroutine,將流程跳轉(zhuǎn)到該 coroutine 處執(zhí)行。

最后 co_eventloop 在調(diào)用時(shí)也提供一個(gè)額外的參數(shù)來供調(diào)用者傳入一個(gè)函數(shù)指針 pfn。該函數(shù)將會(huì)在每次循環(huán)完成之后執(zhí)行;當(dāng)該函數(shù)返回 -1 時(shí),將會(huì)終止整個(gè)事件循環(huán)。用戶可以利用該函數(shù)來控制 main 協(xié)程的終止或者完成一些統(tǒng)計(jì)需求。

責(zé)任編輯:武曉燕 來源: 深度Linux
相關(guān)推薦

2025-01-26 00:00:15

PHP協(xié)程控制權(quán)

2024-10-18 10:27:50

PHP框架webma

2024-11-25 07:00:00

2021-09-16 09:59:13

PythonJavaScript代碼

2020-07-07 09:19:28

Android 協(xié)程開發(fā)

2019-03-01 08:57:47

iOScoobjc協(xié)程

2024-09-25 08:28:45

2023-11-01 11:27:10

Linux協(xié)程

2020-04-23 09:33:32

Android 協(xié)程開發(fā)

2023-11-17 11:36:59

協(xié)程纖程操作系統(tǒng)

2025-02-08 09:13:40

2023-10-24 19:37:34

協(xié)程Java

2021-12-09 06:41:56

Python協(xié)程多并發(fā)

2021-06-15 07:10:14

JavaScript異步編程

2024-12-04 15:49:29

2022-09-06 20:30:48

協(xié)程Context主線程

2016-10-28 17:39:47

phpgolangcoroutine

2017-05-02 11:38:00

PHP協(xié)程實(shí)現(xiàn)過程

2020-11-29 17:03:08

進(jìn)程線程協(xié)程

2023-08-08 07:18:17

協(xié)程管道函數(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號