一篇帶你了解io_uring和Node.js
前言:io_uring是大神Jens Axboe開發(fā)的異步IO框架,在Linux內(nèi)核5.1引入。本文介紹什么是異步框架和io_uring的一些基礎(chǔ)內(nèi)容,最后介紹Node.js(Libuv)中,之前有人提但至今還沒(méi)有合并的一個(gè)關(guān)于io_uring的pr。
1 io_uring介紹
在io_uring之前,Linux沒(méi)有成熟的異步IO能力,什么是異步IO呢?回想我們讀取資源的過(guò)程,我們可以以阻塞或非阻塞的模式調(diào)用read、readv,也可以通過(guò)epoll監(jiān)聽(tīng)文件描述符和事件的方式,在回調(diào)里調(diào)用read系列函數(shù)進(jìn)行讀取,這些API有個(gè)共同的地方是,不管是主動(dòng)探還是被動(dòng)探測(cè)資源是否可讀,當(dāng)可讀的時(shí)候,都需要進(jìn)程自己去執(zhí)行讀操作。而io_uring強(qiáng)大的地方是,進(jìn)程不需要再自己主動(dòng)執(zhí)行讀操作,而是內(nèi)核讀完后通知進(jìn)程,相比epoll,io_uring又進(jìn)了一步,類似的能力是windows的IOCP。
2 io_uring基本使用
2.1 初始化
io_uring和epoll一樣,API不多,但是io_uring比epoll復(fù)雜得多。我們首先需要調(diào)用io_uring_setup初始化io_uring,拿到一個(gè)fd。
- int ring_fd;
- unsigned *sring_tail, *sring_mask, *sring_array, *cring_head, *cring_tail, *cring_mask;
- struct io_uring_sqe *sqes;
- struct io_uring_cqe *cqes;
- char buff[BLOCK_SZ];
- off_t offset;
- struct io_uring_params p;
- void *sq_ptr, *cq_ptr;
- memset(&p, 0, sizeof(p));
- // 拿到io_uring對(duì)應(yīng)的fd
- int ring_fd = io_uring_setup(QUEUE_DEPTH, &p);
- int sring_sz = p.sq_off.array + p.sq_entries * sizeof(unsigned);
- int cring_sz = p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe);
- // 映射ring_fd到mmap返回的地址,我們可以以操作返回地址的方式操作ring_fd,達(dá)到用戶和內(nèi)核共享數(shù)據(jù)的目的
- cq_ptr = sq_ptr = mmap(0, sring_sz, PROT_READ | PROT_WRITE,
- MAP_SHARED | MAP_POPULATE,
- ring_fd, IORING_OFF_SQ_RING);
- sqes = mmap(0, p.sq_entries * sizeof(struct io_uring_sqe),
- PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
- ring_fd, IORING_OFF_SQES);
- // 保存任務(wù)隊(duì)列和完成隊(duì)列的地址,后續(xù)提交任務(wù)和獲取完成任務(wù)節(jié)點(diǎn)時(shí)需要用
- sring_tail = sq_ptr + p.sq_off.tail;
- sring_mask = sq_ptr + p.sq_off.ring_mask;
- sring_array = sq_ptr + p.sq_off.array;
- cring_head = cq_ptr + p.cq_off.head;
- cring_tail = cq_ptr + p.cq_off.tail;
- cring_mask = cq_ptr + p.cq_off.ring_mask;
- cqes = cq_ptr + p.cq_off.cqes;
io_uring不僅實(shí)現(xiàn)非常復(fù)雜,就連使用也非常復(fù)雜,但是目前只需要大致了解原理就好了。上面的代碼主要目的有以下幾個(gè)。
1 出生后io_uring并拿到一個(gè)io_uring實(shí)例對(duì)應(yīng)的fd。
2 通過(guò)mmap映射io_uring對(duì)應(yīng)的fd到一個(gè)內(nèi)存地址,后續(xù)我們就可以通過(guò)操作內(nèi)存地址的方式和內(nèi)核通信。
3 保存任務(wù)隊(duì)列和完成隊(duì)列的地址信息,后續(xù)需要用到。
2.2 提交任務(wù)
我們看到io_uring底層維護(hù)了任務(wù)隊(duì)列(sq)和完成隊(duì)列兩個(gè)隊(duì)列(cq)。對(duì)應(yīng)的節(jié)點(diǎn)叫sqe和cqe。當(dāng)我們需要操作一個(gè)資源的時(shí)候,就可以獲取一個(gè)seq,并且填充字段,然后提交給內(nèi)核,我們看一下sqe的核心字段。
- struct io_uring_sqe {
- __u8 opcode; /* 操作類型,比如讀、寫 */
- __s32 fd; /* 資源對(duì)應(yīng)的fd */
- __u64 off; /* 資源的偏移(操作的起點(diǎn)) */
- __u64 addr; /* 保存數(shù)據(jù)的內(nèi)存首地址 */
- __u32 len; /* 數(shù)據(jù)長(zhǎng)度 */
- __u64 user_data; /* 用戶定義的字段,通常用于關(guān)聯(lián)請(qǐng)求和響應(yīng) */
- __u8 flags; /* 標(biāo)記 */
- ...
- };
io_uring_sqe的核心字段都比較好理解,構(gòu)造了一個(gè)請(qǐng)求后,就插入到內(nèi)核的請(qǐng)求任務(wù)隊(duì)列。接著調(diào)用io_uring_enter通知內(nèi)核,有需要處理的任務(wù),我們可以在調(diào)用io_uring_enter的時(shí)候設(shè)置等待多少個(gè)請(qǐng)求完成后再返回。另外內(nèi)核處理poll的模式,這時(shí)候內(nèi)核會(huì)開啟內(nèi)核線程去檢測(cè)任務(wù)是否完成,不需要進(jìn)程調(diào)用io_uring_enter。下面是我們發(fā)送一個(gè)讀取請(qǐng)求的邏輯。
- unsigned index, tail;
- tail = *sring_tail;
- // 拿到請(qǐng)求隊(duì)列的一個(gè)空閑位置,是一個(gè)環(huán),需要做回環(huán)處理
- index = tail & *sring_mask;
- struct io_uring_sqe *sqe = &sqes[index];
- // 初始化請(qǐng)求結(jié)構(gòu)體
- sqe->opcode = op;
- // 讀取的fd
- sqe->fd = fd;
- // 讀取的數(shù)據(jù)保存到buff
- // 可以通過(guò)關(guān)聯(lián)buff,等到響應(yīng)的時(shí)候能找到對(duì)應(yīng)的請(qǐng)求上下文
- sqe->addr = (unsigned long) buff;
- sqe->user_data = (unsigned long long) buff;
- memset(buff, 0, sizeof(buff));
- sqe->len = BLOCK_SZ;
- sqe->off = offset;
- // 插入請(qǐng)求隊(duì)列
- sring_array[index] = index;
- // 更新索引
- tail++;
- // 通知內(nèi)核有任務(wù)需要處理,并等待有一個(gè)任務(wù)完成后再返回
- io_uring_smp_store_release(sring_tail, tail);
- int ret = io_uring_enter(ring_fd, 1,1, IORING_ENTER_GETEVENTS);
2.3 任務(wù)完成
當(dāng)任務(wù)完成的時(shí)候,io_uring_enter就會(huì)返回。但是這里有個(gè)問(wèn)題,請(qǐng)求任務(wù)和響應(yīng)不是對(duì)應(yīng)的,內(nèi)核不保證任務(wù)完成的順序,內(nèi)核只是告訴我們哪些任務(wù)完成了,我們可以通過(guò)user_data關(guān)聯(lián)請(qǐng)求和響應(yīng),類似rpc通信里的seq一樣。user_data字段在請(qǐng)求里設(shè)置,響應(yīng)里會(huì)返回,從而請(qǐng)求方知道這個(gè)響應(yīng)對(duì)應(yīng)的是哪個(gè)請(qǐng)求。響應(yīng)對(duì)應(yīng)的結(jié)構(gòu)體比較簡(jiǎn)單。
- struct io_uring_cqe {
- /* 用戶定義字段,通常用于關(guān)聯(lián)請(qǐng)求和響應(yīng) */
- __u64 user_data;
- /* 系統(tǒng)調(diào)用的返回碼,比如read */
- __s32 res;
- // 暫時(shí)沒(méi)用到
- __u32 flags;
- };
我們這里假設(shè)請(qǐng)求和響應(yīng)是串行的,所以不需要用到user_data字段關(guān)聯(lián)請(qǐng)求和響應(yīng)。從前面代碼我們可以看到,我們把數(shù)據(jù)讀取到buff變量里。我們看看內(nèi)核返回后我們的處理邏輯。
- struct io_uring_cqe *cqe;
- unsigned head, reaped = 0;
- // 拿到完成隊(duì)列隊(duì)頭節(jié)點(diǎn),可消費(fèi)buff里面存儲(chǔ)的數(shù)據(jù)
- head = io_uring_smp_load_acquire(cring_head);
- cqe = &cqes[head & (*cring_mask)];
- // 更新頭索引
- head++;
- io_uring_smp_store_release(cring_head, head);
這就是io_uring一個(gè)讀取操作的大致過(guò)程,我們看到用戶層面的邏輯還是挺復(fù)雜的,作者也想到了,所以又封裝了Liburing庫(kù)簡(jiǎn)化使用。
3 Liburing的使用
那么我們到底怎么使用它呢,我們回想epoll的使用。
- // 創(chuàng)建epoll 實(shí)例
- int epollfd = epoll_create();
- // 封裝fd和訂閱事件
- struct epoll_event event;
- event.events = EPOLLIN;
- event.data.fd = listenFd;
- // 注冊(cè)到epoll
- epoll_ctl(epollfd, EPOLL_CTL_ADD, listenFd, &event);
- // 等待事件觸發(fā)
- int num = epoll_wait(epollfd, events, MAX_EVENTS, -1);
- for (i = 0; i < num; ++i) {
- // 處理事件,比如讀寫
- }
接著我們看看基于Liburing的o_uring的使用。
- // 拿到一個(gè)請(qǐng)求結(jié)構(gòu)體
- struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
- // 設(shè)置fd和數(shù)據(jù)地址
- io_uring_prep_recv(sqe, fd, data, len, 0);
- // 通知內(nèi)核有任務(wù)處理
- io_uring_submit(&ring);
- // 等待事件完成
- io_uring_submit_and_wait(&ring, 1);
- // 獲取完成的任務(wù)
- int nums = io_uring_peek_batch_cqe(&ring, cqes, sizeof(cqes) / sizeof(cqes[0]));
- for (i = 0; i < nums; ++i) {
- // 處理完成的任務(wù)
- struct io_uring_cqe *cqe = cqes[i];
- }
我們看到基于Liburing的使用簡(jiǎn)單了很多,有點(diǎn)epoll的風(fēng)格了。io_uring就介紹到這里,io_uring的細(xì)節(jié)比較多,實(shí)現(xiàn)也比較復(fù)雜,代碼量也達(dá)到了近1萬(wàn)行(epoll是2500左右),關(guān)于io_uring網(wǎng)上有非常多講解得非常好的文章,大家可以自行閱讀。
4 Node.js中的io_uring
最后介紹一下之前看到的一個(gè)Node.js的pr(https://github.com/libuv/libuv/pull/2322),這個(gè)pr引入了io_uring。雖然不是取代epoll對(duì)Libuv的核心進(jìn)行重構(gòu),但是依然值得探討。該pr涉及了150+文件,不過(guò)大部分是Liburing的代碼,我們只關(guān)注核心改動(dòng)。首先Libuv初始化的時(shí)候做了一個(gè)處理。
- // loop里做了修改
- struct loop {
- ...
- // int backend_fd; 改成下面的聯(lián)合體
- union {
- int fd;
- void* data;
- } backend;
- }
- // 定義一個(gè)使用io_uring時(shí)的結(jié)構(gòu)體
- struct uv__backend_data_io_uring {
- // io_uring的fd
- int fd;
- // 等待io_uring處理的任務(wù)個(gè)數(shù)
- int32_t pending;
- // io_uring相關(guān)結(jié)構(gòu)體
- struct io_uring ring;
- // 用于epoll中監(jiān)聽(tīng)io_uring是否有事件觸發(fā)
- uv_poll_t poll_handle;
- };
- // 分配一個(gè)uv__backend_data_io_uring結(jié)構(gòu)體
- backend_data = uv__malloc(sizeof(*backend_data));
- ring = &backend_data->ring;
- // 初始化io_uring
- rc = io_uring_queue_init(IOURING_SQ_SIZE, ring, 0);
- // epoll的fd
- backend_data->fd = fd;
- // 初始化
- uv__handle_init(loop, &backend_data->poll_handle, UV_POLL);
- backend_data->poll_handle.flags |= UV_HANDLE_INTERNAL;
- // 初始化poll_handle的io觀察者,fd是io_uring的fd,回調(diào)是uv__io_uring_done。
- uv__io_init(&backend_data->poll_handle.io_watcher,
- uv__io_uring_done,
- ring->ring_fd);
- loop->flags |= UV_LOOP_USE_IOURING;
- loop->backend.data = backend_data;
我們看到初始化時(shí)對(duì)io_uring進(jìn)行了初始化并且初始化了一個(gè)io觀察者。接下來(lái)我們看在哪里使用。
- int uv_fs_read(uv_loop_t* loop, uv_fs_t* req,
- uv_file file,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- int64_t off,
- uv_fs_cb cb) {
- int rc;
- INIT(READ);
- req->file = file;
- req->nbufs = nbufs;
- req->bufs = req->bufsml;
- memcpy(req->bufs, bufs, nbufs * sizeof(*bufs));
- req->off = off;
- /*
- 優(yōu)先調(diào)用uv__platform_fs_read,不支持則降級(jí)到原來(lái)線程池的方案
- static int uv__fs_retry_with_threadpool(int rc) {
- return rc == UV_ENOSYS || rc == UV_ENOTSUP || rc == UV_ENOMEM;
- }
- */
- rc = uv__platform_fs_read(loop, req, file, bufs, nbufs, off, cb);
- if (!uv__fs_retry_with_threadpool(rc))
- return rc;
- // 走到這說(shuō)明使用降級(jí)方案
- POST;
- }
uv_fs_read函數(shù)是讀取文件內(nèi)容時(shí)執(zhí)行的函數(shù),之前時(shí)候給線程池提交一個(gè)任務(wù),修改后,加了個(gè)前置的邏輯uvplatform_fs_read。我們看看uvplatform_fs_read。
- int uv__platform_fs_read(uv_loop_t* loop,
- uv_fs_t* req,
- uv_os_fd_t file,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- int64_t off,
- uv_fs_cb cb) {
- return uv__io_uring_fs_work(IORING_OP_READV,
- loop,
- req,
- file,
- bufs,
- nbufs,
- off,
- cb);}int uv__io_uring_fs_work(uint8_t opcode,
- uv_loop_t* loop,
- uv_fs_t* req,
- uv_os_fd_t file,
- const uv_buf_t bufs[],
- unsigned int nbufs,
- int64_t off,
- uv_fs_cb cb) {
- struct uv__backend_data_io_uring* backend_data;
- struct io_uring_sqe* sqe;
- int submitted;
- uint32_t incr_val;
- uv_poll_t* handle;
- backend_data = loop->backend.data;
- incr_val = (uint32_t)backend_data->pending + 1;
- // 獲取一個(gè)請(qǐng)求結(jié)構(gòu)體
- sqe = io_uring_get_sqe(&backend_data->ring);
- // 初始化請(qǐng)求
- sqe->opcode = opcode;
- sqe->fd = file;
- sqe->off = off;
- sqe->addr = (uint64_t)req->bufs;
- sqe->len = nbufs;
- // 管理req上下文,任務(wù)完成時(shí)會(huì)用到
- sqe->user_data = (uint64_t)req;
- // 提交給內(nèi)核,非阻塞式調(diào)用,返回提交任務(wù)的個(gè)數(shù)
- submitted = io_uring_submit(&backend_data->ring);
- // 提交成功
- if (submitted == 1) {
- req->priv.fs_req_engine |= UV__ENGINE_IOURING;
- // 提交的時(shí)是第一個(gè)任務(wù),則注冊(cè)io觀察者的等待可讀事件
- if (backend_data->pending++ == 0) {
- handle = &backend_data->poll_handle;
- uv__io_start(loop, &handle->io_watcher, POLLIN);
- uv__handle_start(handle);
- }
- return 0;
- }
- return UV__ERR(errno);
- }
我們看到上面的代碼會(huì)給內(nèi)核提交一個(gè)任務(wù),但是不會(huì)等待內(nèi)核返回,并在提交第一個(gè)任務(wù)的時(shí)候給epoll注冊(cè)一個(gè)等待可讀事件。我們看看io_uring的poll接口的實(shí)現(xiàn)(epoll原理可參考之前的文章)。
- static __poll_t io_uring_poll(struct file *file, poll_table *wait){
- struct io_ring_ctx *ctx = file->private_data;
- __poll_t mask = 0;
- poll_wait(file, &ctx->cq_wait, wait);
- smp_rmb();
- // 提交隊(duì)列沒(méi)滿則可寫
- if (READ_ONCE(ctx->rings->sq.tail) - ctx->cached_sq_head !=
- ctx->rings->sq_ring_entries)
- mask |= EPOLLOUT | EPOLLWRNORM;
- // 完成隊(duì)列非空則可讀
- if (io_cqring_events(ctx, false))
- mask |= EPOLLIN | EPOLLRDNORM;
- return mask;}static unsigned io_cqring_events(struct io_ring_ctx *ctx, bool noflush){
- struct io_rings *rings = ctx->rings;
- smp_rmb();
- // 完成隊(duì)列非空則可讀
- return ctx->cached_cq_tail - READ_ONCE(rings->cq.head);
- }
所以當(dāng)io_uring有任務(wù)完成,即完成隊(duì)列非空的時(shí)候,就會(huì)在Libuv的poll io被檢測(cè)到,從而執(zhí)行回調(diào)。
- void uv__io_uring_done(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
- uv_poll_t* handle;
- struct io_uring* ring;
- struct uv__backend_data_io_uring* backend_data;
- struct io_uring_cqe* cqe;
- uv_fs_t* req;
- int finished1;
- handle = container_of(w, uv_poll_t, io_watcher);
- backend_data = loop->backend.data;
- ring = &backend_data->ring;
- finished1 = 0;
- while (1) {
- // 獲取完成節(jié)點(diǎn)
- io_uring_peek_cqe(ring, &cqe);
- // 全部任務(wù)完成則注銷事件
- if (--backend_data->pending == 0)
- uv_poll_stop(handle);
- // 獲取響應(yīng)對(duì)應(yīng)的請(qǐng)求上下文
- req = (void*) (uintptr_t) cqe->user_data;
- if (req->result == 0)
- req->result = cqe->res;
- io_uring_cq_advance(ring, 1);
- // 執(zhí)行回調(diào)
- req->cb(req);
- }
- }
至此我們看到了這個(gè)pr的邏輯,主要是為文件io引入了io_uring,文件io因?yàn)榧嫒菪詥?wèn)題,在Libuv中使用線程池實(shí)現(xiàn)的,而io_uring支持普通文件,自然可以用于在Linux新版本上替換掉線程池方案。
后記:io_uring既強(qiáng)大又復(fù)雜。一切都交給內(nèi)核來(lái)處理,完成后通知我們,我們不僅不需要再手動(dòng)執(zhí)行read,同時(shí)也減少了系統(tǒng)調(diào)用的成本,尤其需要多次read的時(shí)候??雌饋?lái)是一個(gè)很棒的事情,io_uring---Linux上真正的異步IO。但其中所蘊(yùn)含的知識(shí)遠(yuǎn)不止于此,有空再更。