自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Reactor網(wǎng)絡(luò)模型核心思想探秘

網(wǎng)絡(luò) 網(wǎng)絡(luò)管理
reactor網(wǎng)絡(luò)模型是網(wǎng)絡(luò)編程中非常重要的一種編程思想,本文通過(guò)一個(gè)簡(jiǎn)短的示例試圖講明白reactor網(wǎng)絡(luò)編程模型的核心思想。當(dāng)然,本文的實(shí)現(xiàn)還不是很完善,比如在調(diào)用回調(diào)函數(shù)的時(shí)候還是傳入了fd,我們是否可以不需要這個(gè)參數(shù),徹徹底底地和IO進(jìn)行分離呢?

在網(wǎng)絡(luò)編程系列文章中,我們實(shí)現(xiàn)了一個(gè)基于epoll的網(wǎng)絡(luò)框架,并在此基礎(chǔ)上開(kāi)發(fā)了一個(gè)簡(jiǎn)單的HTTP服務(wù),在那個(gè)系列文章中我們使用了讀、寫(xiě)兩個(gè)buffer將網(wǎng)絡(luò)IO和數(shù)據(jù)的讀寫(xiě)進(jìn)行了分離,它們之間的扭轉(zhuǎn)完全通過(guò)epoll事件通知,如果你認(rèn)真研究過(guò)源碼,會(huì)發(fā)現(xiàn),所有針對(duì)網(wǎng)絡(luò)IO的操作都是由事件觸發(fā)的。這種基于事件觸發(fā)的網(wǎng)絡(luò)模型通常我們叫做Reactor網(wǎng)絡(luò)模型。

由于網(wǎng)絡(luò)編程系列文章中代碼實(shí)現(xiàn)相對(duì)比較復(fù)雜,不太好講清楚。所以,我決定單獨(dú)出幾篇文章對(duì)那個(gè)系列文章進(jìn)行一些拓展,主要涉及到網(wǎng)絡(luò)編程思想和性能測(cè)試。

這篇文章我們通過(guò)實(shí)現(xiàn)一個(gè)簡(jiǎn)單的網(wǎng)絡(luò)框架,來(lái)說(shuō)明Reactor網(wǎng)絡(luò)模型實(shí)現(xiàn)的一般思路,其本質(zhì)思想和x-net項(xiàng)目基本上是一樣的,只是在代碼上做了非常大的精簡(jiǎn),理解起來(lái)會(huì)輕松很多。

首先,我們來(lái)看一段代碼

#include <sys/socket.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>




int main() {
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);


    struct sockaddr_in servaddr;
    memset(&servaddr, 0, sizeof(struct sockaddr_in));


    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(2048);


    if (-1 == bind(sockfd, (struct sockaddr*)&servaddr, sizeof(struct sockaddr))) {
        perror("bind fail");
        return -1;
    }


    listen(sockfd, 10);


    printf("sock-fd:%d\n", sockfd);


    int epfd = epoll_create(1);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);


    struct epoll_event events[1024] = {0};


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN | EPOLLET;
                ev.data.fd = clientfd;
                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                printf("clientfd: %d\n", clientfd);
            } else if (events[i].events & EPOLLIN) {


                char buffer[10] = {0};


                int count = recv(connfd, buffer, 10, 0);
                if (count == 0) {
                    printf("discounnect\n");


                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    close(i);


                    continue;
                }


                send(connfd, buffer, count, 0);
                printf("clientfd: %d, count: %d, buffer: %s\n", connfd, count, buffer);
            }
        }
    }
}

熟悉epoll的人應(yīng)該對(duì)上面的代碼比較熟悉,這段代碼的核心在下面的while主循環(huán),如果是當(dāng)前Server的Socket說(shuō)明有新的連接進(jìn)來(lái),調(diào)用accept拿到客戶(hù)端的fd,將其放在epoll的events中,并注冊(cè)EPOLLIN事件,一般我們理解為可讀事件。

如果不是sockfd,說(shuō)明是客戶(hù)端的fd可讀,我們將數(shù)據(jù)讀出來(lái)再原樣發(fā)送回去。

上面的代碼存在的主要問(wèn)題在于,套接字的accept和讀寫(xiě)操作我們是直接寫(xiě)在主循環(huán)里了,這將會(huì)讓代碼的邏輯變得難以琢磨。

對(duì)于一個(gè)套接字,最直接的操作就是讀和寫(xiě)。所以,最容易想到的就是將讀和寫(xiě)分離開(kāi)。為了實(shí)現(xiàn)讀和寫(xiě)分離我們封裝兩個(gè)回調(diào)函數(shù),如下:

int recv_callback(int fd, char *buffer, int size);
int send_callback(int fd, char *buffer, int size);

你可以想一下,這兩個(gè)函數(shù)應(yīng)該怎么寫(xiě)?下面是根據(jù)原有的邏輯將讀和寫(xiě)封裝在了recv_callback和send_callback兩個(gè)函數(shù)中,代碼如下:

int recv_callback(int fd, char *buffer, int size) {
    int count = recv(fd, buffer, size, 0);


    send_callback(fd, buffer, count, 0);


    return count;
}
int send_callback(int fd, char *buffer, int size) {
    int count = send(fd, buffer, size, 0);


    return count;
}

然后,在主循環(huán)中就可以這樣使用

int main() {


    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                ...
            } else if (events[i].events & EPOLLIN) {
                char buffer[10] = {0};


                int count = recv_callback(fd, buffer, 10);
                if (count == 0) {
                    printf("disconnect\\n");
                    epoll_ctl(epfd, EPOLL_CTL_DEL, connfd, NULL);
                    clise(i);
                    continue;
                }
            }
        }
    } 
}

雖然我們將讀和寫(xiě)拆成了兩個(gè)方法,但讀和寫(xiě)并沒(méi)有分離開(kāi),我們?cè)趓ecv_callback中每次收到數(shù)據(jù)之后調(diào)用send_callback將數(shù)據(jù)原樣又發(fā)回去,在這里我們希望recv_callback和send_callback各管各的互不干擾,比如像下面這樣

int recv_callback(int fd, char *buffer, int size) {
    int count = recv(fd, buffer, size, 0);


    return count;
}
int send_callback(int fd, char *buffer, int size) {
    int count = send(fd, buffer, size, 0);


    return count;
}

但這樣明顯也是有問(wèn)題的,在recv_callback中讀完了之后,如何發(fā)送數(shù)據(jù)呢?這里,我們可以想一下,圍繞著一個(gè)套接字都有哪些部分呢?是不是可以設(shè)計(jì)出一個(gè)類(lèi)似字典的結(jié)構(gòu),這個(gè)字典的key對(duì)應(yīng)的就是套接字,而value對(duì)應(yīng)的就是圍繞套接字相關(guān)的各個(gè)組件。

我們將recv_callback和send_callback放在了一個(gè)conn_channel結(jié)構(gòu)體中,并且設(shè)計(jì)了兩個(gè)buffer,一個(gè)用來(lái)讀數(shù)據(jù),另一個(gè)用來(lái)發(fā)數(shù)據(jù),conn_channel便是這個(gè)字典對(duì)應(yīng)的value,代碼如下:

#define BUF_LEN   1024


typedef int(*callback)(int fd);


struct conn_channel {
    int fd;


    callback recv_call;
    callback send_call;


    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};

其中,fd表示的是當(dāng)前客戶(hù)端套接字。然后我們定義一個(gè)數(shù)組來(lái)表示套接字到套接字value的映射關(guān)系,代碼如下:

struct conn_channel conn_map[1024] = {0};

這樣,我們?cè)谥餮h(huán)中,就可以像下面這樣,往conn_map中添加對(duì)應(yīng)的套接字了,代碼如下:

int main() {
    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;
                ev.data.fd = clientaddr;


                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                conn_map[clientfd].fd = clientfd;
                conn_map[clientfd].rlen = 0;
                conn_map[clientfd].wlen = 0;
                conn_map[clientfd].recv_call = recv_callback;
                conn_map[clientfd].send_call = send_callback;
                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


                printf("clientfd:%d\\n", clientfd);
            } else if (events[i].events & EPOLLIN) {
                ...
            }
        }
    } 
}

在上面的代碼中,每當(dāng)accept出來(lái)一個(gè)客戶(hù)端的套接字,我們就將它放到conn_map中,設(shè)置好讀寫(xiě)buffer和回調(diào)函數(shù)。但如果你細(xì)心點(diǎn)會(huì)發(fā)現(xiàn),recv_callback、send_callback和conn_channel中的回調(diào)函數(shù)簽名是不一樣的。所以,我們要調(diào)整一下這兩個(gè)函數(shù)的實(shí)現(xiàn),調(diào)整之后代碼如下:

int recv_callback(int fd) {
    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
    // do something


    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
    conn_map[fd].wlen = conn_map[fd].rlen;
    conn_map[fd].rlen = 0;


    return count;
}
int send_callback(int fd) {
    int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);


    return count;
}

因?yàn)橛辛薱onn_map,所以原來(lái)傳進(jìn)來(lái)的buffer和size都不需要了,在conn_channel中已經(jīng)有記錄了。所以只需要一個(gè)fd參數(shù)就可以了。我們?cè)趓ecv_callback中模擬了回復(fù)消息,強(qiáng)行將讀到的數(shù)據(jù)寫(xiě)到了wbuffer中。這里補(bǔ)充一下,conn_channel中的rbuffer是用來(lái)從套接字中讀數(shù)據(jù)的,wbuffer表示的是將要發(fā)送到套接字的數(shù)據(jù)。

你可以試著把上面的代碼跑起來(lái),然后你會(huì)發(fā)現(xiàn),并沒(méi)有按我們的預(yù)期執(zhí)行,send_callback中的send似乎沒(méi)有起作用。這是因?yàn)槲覀冎皇菍?shù)據(jù)從rbuffer寫(xiě)到了wbuffer中,而send_callback并沒(méi)有機(jī)會(huì)調(diào)用。你可以想一想send_callback放在哪里調(diào)用比較合適呢?

在上面的例子中,顯然放在主循環(huán)中執(zhí)行比較合適,在epoll中,EPOLLOUT表示可寫(xiě)事件,我們可以利用這個(gè)事件。在recv_callback執(zhí)行完之后我們注冊(cè)一個(gè)EPOLLOUT事件,然后在主循環(huán)中我們?nèi)ケO(jiān)聽(tīng)EPOLLOUT事件。這樣,當(dāng)recv_callback將rbuffer的數(shù)據(jù)復(fù)制到wbuffer中之后,send_callback通過(guò)EPOLLOUT事件就可以在主循環(huán)中得以執(zhí)行。

為了實(shí)現(xiàn)上面的效果我們要修改兩個(gè)地方,一個(gè)是recv_callback中我們要注冊(cè)一下EPOLLOUT事件,代碼如下:

int recv_callback(int fd) {
    int count = recv(fd, conn_map[fd].rbuf + conn_map[fd].rlen, BUF_LEN - conn_map[fd].rlen, 0);
    // do something


    memcpy(conn_map[fd].wbuf, conn_map[fd].rbuf, conn_map[fd].rlen);
    conn_map[fd].wlen = conn_map[fd].rlen;
    conn_map[fd].rlen = 0;


    struct epoll_event ev;
    ev.events = EPOLLOUT;
    ev.data.fd = fd;


    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);


    return count;
}

我們?cè)趓buf拷貝到wbuf之后,給當(dāng)前fd注冊(cè)了EPOLLOUT事件,然后我們?cè)谥餮h(huán)中要處理EPOLLOUT事件,代碼如下:

int main() {
    ...


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;


            if (events[i].events & EPOLLIN && sockfd == connfd) {
                struct sockaddr_in clientaddr;
                socklen_t len = sizeof(clientaddr);


                int clientfd = accept(sockfd, (struct sockaddr*)&clientaddr, &len);


                ev.events = EPOLLIN;
                ev.data.fd = clientaddr;


                epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


                conn_map[clientfd].fd = clientfd;
                conn_map[clientfd].rlen = 0;
                conn_map[clientfd].wlen = 0;
                conn_map[clientfd].recv_call = recv_callback;
                conn_map[clientfd].send_call = send_callback;
                memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
                memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


                printf("clientfd:%d\\n", clientfd);
            } else if (events[i].events & EPOLLIN) {
                int count = conn_map[connfd].recv_call(connfd);
                printf("recv-count:%d\\n", count);
            } else if (events[i].events & EPOLLOUT) { // 處理EPOLLOUT事件
                int count  = conn_map[connfd].send_call(connfd);
                printf("send-count:%d\\n", count);
            }
        }
    } 
}

要注意的是,epfd是在main函數(shù)中定義的,而我們?cè)趓ecv_callback中有使用,所以我們可以暫時(shí)將epfd聲明成一個(gè)全局變量,放在外面。

上面的代碼有一個(gè)問(wèn)題,EPOLLOUT事件觸發(fā)之后你會(huì)發(fā)現(xiàn)再向當(dāng)前fd發(fā)送數(shù)據(jù),就沒(méi)響應(yīng)了,這是因?yàn)閑poll事件被我們修改了,為了解決這個(gè)問(wèn)題我們可以在send_callback執(zhí)行完之后再設(shè)置回去,如下:

int send_callback(int fd) {
    int count = send(fd, conn_map[fd].wbuffer, conn_map[fd].wlen, 0);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = fd;


    epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);


    return count;
}

這樣,我們就將IO操作給屏蔽了,在主循環(huán)中我們只關(guān)注事件,不同的事件調(diào)用不同的回調(diào)函數(shù)。在對(duì)應(yīng)的回調(diào)函數(shù)中只做自己該做的,做完之后注冊(cè)事件通知其它的回調(diào)函數(shù)。

但是,上面的代碼還不夠優(yōu)雅,對(duì)于accept和讀事件來(lái)講在epoll中都是EPOLLIN事件,這兩個(gè)是不是可以合并在一起處理呢?答案是可以的,首先,我們要將accept相關(guān)的邏輯給拆出來(lái),拆解之后的代碼如下:

int accept_callback(int fd) {
    struct sockaddr_in clientaddr;
    socklen_t len = sizeof(clientaddr);


    int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);


    ev.events = EPOLLIN;
    ev.data.fd = clientaddr;


    epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &ev);


    conn_map[clientfd].fd = clientfd;
    conn_map[clientfd].rlen = 0;
    conn_map[clientfd].wlen = 0;
    conn_map[clientfd].recv_call = recv_callback;
    conn_map[clientfd].send_call = send_callback;
    memset(conn_map[clientfd].rbuf, 0, BUF_LEN);
    memset(conn_map[clientfd].wbuf, 0, BUF_LEN);


    return clientfd;
}

我們發(fā)現(xiàn),accept_callback和recv_callback以及send_callback的簽名是一樣的,這樣我們可以在conn_channel用一個(gè)union,將accept_callback也放到conn_channel中來(lái)。如下:

struct conn_channel {
    int fd;


    union {
        callback accept_call;
        callback recv_call;
    } call_t;
    callback send_call;


    char wbuf[BUF_LEN];
    int wlen;
    char rbuf[BUF_LEN];
    int rlen;
};

在主循環(huán)中,我們就可以先給sockfd注冊(cè)好accept回調(diào)函數(shù),然后我們只需要在主循環(huán)中保留兩個(gè)邏輯就可以了,代碼如下:

int main() {
    int sockfd = create_serv(9000);
    if (sockfd == -1) {
        perror("create-server-fail");
        return -1;
    }


    make_nonblocking(sockfd);


    epfd = epoll_create1(1);


    struct epoll_event ev;
    ev.events = EPOLLIN;
    ev.data.fd = sockfd;


    epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);


    struct epoll_event events[1024] = {0}; 


    conn_map[sockfd].rlen = 0;
    conn_map[sockfd].wlen = 0;
    conn_map[sockfd].fd = sockfd;
    conn_map[sockfd].call_t.accept_call = accept_callback;
    conn_map[sodkfd].send_call = send_callback;
    memset(conn_map[sockfd].rbuf, 0, BUF_LEN);
    memset(conn_map[sockfd].wbuf, 0, BUF_LEN);


    while(1) {
        int nready = epoll_wait(epfd, events, 1024, -1);


        int i = 0;
        for (i = 0; i < nready; i++) {
            int connfd = events[i].data.fd;
            if (events[i].events & EPOLLIN) {
                int count = conn_map[connfd].call_t.recv_call(connfd);
                printf("recv-count:%d\\n", count);
            } else if (events[i].events & EPOLLOUT) {
                int count  = conn_map[connfd].send_call(connfd);
                printf("send-count:%d\\n", count);
            }
        }
    } 
}

你可以想一下,我們注冊(cè)的是call_t.accept_call,但在調(diào)用的時(shí)候確是call_t.recv_call,為什么這樣可行?

我們?cè)诰W(wǎng)絡(luò)編程系列文章中,單獨(dú)為accept抽象出了一個(gè)對(duì)象,你可以對(duì)比一下這兩種實(shí)現(xiàn)方式,看看它們有什么區(qū)別?在系列文件中我們?yōu)槭裁匆獑为?dú)抽象出一個(gè)accepter對(duì)象呢?

可以看到,最后主循環(huán)中的邏輯,只有兩個(gè)分支,這兩個(gè)分支代表了兩種事件,這種通過(guò)事件驅(qū)動(dòng)的網(wǎng)絡(luò)模型便是Reactor網(wǎng)絡(luò)模型。本文為了容易理解,將代碼進(jìn)行了精簡(jiǎn)。在實(shí)際的工程中我們還要考慮諸多情況。比如,上面的代碼只支持epoll,我們是不是可以將事件驅(qū)動(dòng)相關(guān)的代碼抽象成單獨(dú)的組件,讓其可以支持其它的事件模型。

本文雖然代碼簡(jiǎn)單,但Reactor網(wǎng)絡(luò)模型的實(shí)現(xiàn)基本上都逃脫不了這個(gè)套路,只是在此基礎(chǔ)上可能會(huì)將各個(gè)部分進(jìn)行單獨(dú)的封裝,比如我們?cè)诰W(wǎng)絡(luò)編程系列文章中就將channel和map進(jìn)行了抽象,讓它能適配各種場(chǎng)景。

總結(jié)

reactor網(wǎng)絡(luò)模型是網(wǎng)絡(luò)編程中非常重要的一種編程思想,本文通過(guò)一個(gè)簡(jiǎn)短的示例試圖講明白reactor網(wǎng)絡(luò)編程模型的核心思想。當(dāng)然,本文的實(shí)現(xiàn)還不是很完善,比如在調(diào)用回調(diào)函數(shù)的時(shí)候還是傳入了fd,我們是否可以不需要這個(gè)參數(shù),徹徹底底地和IO進(jìn)行分離呢?

責(zé)任編輯:武曉燕 來(lái)源: 程序員班吉
相關(guān)推薦

2022-05-12 09:00:50

動(dòng)態(tài)規(guī)劃算法項(xiàng)目

2015-08-26 09:18:16

云原生云原生機(jī)制云原生框架

2022-05-09 09:03:04

SQL數(shù)據(jù)流數(shù)據(jù)

2020-12-01 07:08:23

Linux網(wǎng)絡(luò)I

2024-05-31 08:10:58

Netty線(xiàn)程模型多路復(fù)用模型

2025-03-21 00:00:05

Reactor設(shè)計(jì)模式I/O 機(jī)制

2023-12-06 09:33:54

Reactor網(wǎng)絡(luò)

2010-08-18 10:13:55

IntentAndroid

2010-03-12 17:09:18

2010-01-27 17:38:58

Windows Emb

2024-04-18 09:34:28

Reactor項(xiàng)目異步編程

2022-01-04 11:11:32

Redis單線(xiàn)程Reactor

2023-11-30 11:39:52

Rust生態(tài)框架

2010-08-27 10:41:41

iPhone核心應(yīng)用程序

2023-02-01 18:31:03

陳峰 數(shù)倉(cāng)寶貝庫(kù)

2018-07-20 14:30:15

2024-02-27 22:31:00

Feign動(dòng)態(tài)代理核心

2022-09-29 15:39:10

服務(wù)器NettyReactor

2010-01-06 15:34:53

軟交換體系

2024-08-16 21:30:00

IO網(wǎng)絡(luò)網(wǎng)絡(luò)通信
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)