聊聊 Libuv 最近引入的 io_uring
io_uring 是 Linux 下高性能的異步 IO 框架,網(wǎng)上很多相關(guān)資料,我之前也初步分析了一下它的實(shí)現(xiàn),有興趣的可以查看 https://zhuanlan.zhihu.com/p/387620810。
Libuv 中最近加入了對(duì) io_uring 的支持,那么為什么要把它引入 Libuv 呢?因?yàn)?epoll 不支持普通文件的 Poll 能力,所以在 Libuv 中,異步文件 IO 操作需要通過線程池來實(shí)現(xiàn),具體來說就是當(dāng)用戶發(fā)起一個(gè)異步文件 IO 操作時(shí),Libuv 會(huì)把這個(gè)操作放到線程池中,當(dāng)子線程處理這個(gè)任務(wù)時(shí),會(huì)執(zhí)行一個(gè)阻塞式的系統(tǒng)調(diào)用,這個(gè)系統(tǒng)調(diào)用會(huì)引起線程阻塞,從而導(dǎo)致這個(gè)線程被消耗掉了,當(dāng) IO 操作完成后,子線程就會(huì)被喚醒,子線程再通過主線程去執(zhí)行用戶的回調(diào)。在 Libuv 早期的實(shí)現(xiàn)中,如果執(zhí)行比較慢的任務(wù)過多就會(huì)把線程池中的線程消耗完,從而導(dǎo)致執(zhí)行比較快的 IO 操作需要等待很長時(shí)間,一個(gè)例子就是 DNS 解析會(huì)阻塞文件 IO 任務(wù)。而 io_uring 可以支持普通文件 IO(當(dāng)然能力不僅于此),不再需要借助線程池的能力,目前 Libuv 中部分異步文件 IO 操作已經(jīng)替換成 io_uring(需要通過環(huán)境變量開啟),下面來看看它的實(shí)現(xiàn)。
原生 io_uring 的使用比較復(fù)雜,通常需要借助 liburing 庫,但是 Libuv 中可能為了減少對(duì)第三方庫的依賴,實(shí)現(xiàn)上使用原生的方式。
io_uring 初始化
在 Libuv 初始化時(shí)會(huì)進(jìn)行 io_uring 的初始化。
uv__iou_init(loop->backend_fd, &lfields->iou, 64, UV__IORING_SETUP_SQPOLL);
lfields->iou 為 io_uring 核心結(jié)構(gòu)體,UVIORING_SETUP_SQPOLL 設(shè)置內(nèi)核創(chuàng)建線程輪詢是否有任務(wù)需要處理(用戶層設(shè)置),接著看看 uviou_init。
static void uv__iou_init(int epollfd,
struct uv__iou* iou,
uint32_t entries,
uint32_t flags) {
struct uv__io_uring_params params;
struct epoll_event e;
size_t cqlen;
size_t sqlen;
size_t maxlen;
size_t sqelen;
uint32_t i;
char* sq;
char* sqe;
int ringfd;
memset(?ms, 0, sizeof(params));
params.flags = flags;
// UV__IORING_SETUP_SQPOLL 模式下,設(shè)置多久沒有任務(wù)提交則內(nèi)核線程進(jìn)入 sleep 狀態(tài)
if (flags & UV__IORING_SETUP_SQPOLL)
params.sq_thread_idle = 10; /* milliseconds /
// 調(diào)用系統(tǒng)調(diào)用初始化 io_uring
ringfd = uv__io_uring_setup(entries, ?ms);
// 映射到內(nèi)核發(fā)送 / 完成隊(duì)列的內(nèi)存,用戶層和內(nèi)核可以共同操作這個(gè)隊(duì)列
sq = mmap(0,
maxlen,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
ringfd,
0); /
sqe = mmap(0,
sqelen,
PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE,
ringfd,
0x10000000ull); /* IORING_OFF_SQES */
memset(&e, 0, sizeof(e));
e.events = POLLIN;
e.data.fd = ringfd;
// 注冊(cè)等待可讀事件,io_uring 中有任務(wù)完成后就會(huì)通過 epoll
epoll_ctl(epollfd, EPOLL_CTL_ADD, ringfd, &e);
// 初始化 io_uring 結(jié)構(gòu)體
iou->sqhead = (uint32_t*) (sq + params.sq_off.head);
iou->sqtail = (uint32_t*) (sq + params.sq_off.tail);
iou->sqmask = (uint32_t) (sq + params.sq_off.ring_mask);
iou->sqarray = (uint32_t*) (sq + params.sq_off.array);
iou->sqflags = (uint32_t*) (sq + params.sq_off.flags);
iou->cqhead = (uint32_t*) (sq + params.cq_off.head);
iou->cqtail = (uint32_t*) (sq + params.cq_off.tail);
iou->cqmask = (uint32_t) (sq + params.cq_off.ring_mask);
iou->sq = sq;
iou->cqe = sq + params.cq_off.cqes;
iou->sqe = sqe;
iou->sqlen = sqlen;
iou->cqlen = cqlen;
iou->maxlen = maxlen;
iou->sqelen = sqelen;
iou->ringfd = ringfd;
iou->in_flight = 0;
iou->flags = 0;
}
uv__iou_init 完成了 io_uring 的初始化,并且把 io_uring 對(duì)應(yīng)的 fd 注冊(cè)到 epoll,當(dāng) io_uring 有任務(wù)完成時(shí),就可以通過 epoll 感知到。接著就可以使用 io_uring 了。
提交異步任務(wù)
下面看一個(gè)異步文件 IO 的操作。
int uv_fs_open(uv_loop_t* loop,
uv_fs_t* req,
const char* path,
int flags,
int mode,
uv_fs_cb cb) {
INIT(OPEN);
PATH;
req->flags = flags;
req->mode = mode;
if (cb != NULL)
if (uv__iou_fs_open(loop, req))
return 0;
POST;
}
uv_fs_open 可以以異步的方式打開一個(gè)文件,之前時(shí)通過線程池實(shí)現(xiàn)的,加入 io_uring 后,就會(huì)多了一層攔截,來看看 uv__iou_fs_open。
int uv__iou_fs_open(uv_loop_t* loop, uv_fs_t* req) {
struct uv__io_uring_sqe* sqe;
struct uv__iou* iou;
// 獲取 io_uring 結(jié)構(gòu)體
iou = &uv__get_internal_fields(loop)->iou;
// 獲取一個(gè)任務(wù)節(jié)點(diǎn),任務(wù)節(jié)點(diǎn)會(huì)和 req 互相關(guān)聯(lián),回調(diào)時(shí)會(huì)用到
sqe = uv__iou_get_sqe(iou, loop, req);
// 設(shè)置操作上下文
sqe->addr = (uintptr_t) req->path;
sqe->fd = AT_FDCWD;
sqe->len = req->mode;
// 設(shè)置操作類型
sqe->opcode = UV__IORING_OP_OPENAT;
sqe->open_flags = req->flags | O_CLOEXEC;
// 提交任務(wù)
uv__iou_submit(iou);
return 1;
}
uviou_fs_open 中有兩個(gè)核心邏輯 uviou_get_sqe 和 uviou_submit,首先來看 uviou_get_sqe。
static struct uv__io_uring_sqe* uv__iou_get_sqe(struct uv__iou* iou,
uv_loop_t* loop,
uv_fs_t* req) {
struct uv__io_uring_sqe* sqe;
uint32_t head;
uint32_t tail;
uint32_t mask;
uint32_t slot;
if (iou->ringfd == -1)
return NULL;
head = atomic_load_explicit((_Atomic uint32_t*) iou->sqhead,
memory_order_acquire);
tail = *iou->sqtail;
mask = iou->sqmask;
slot = tail & mask;
sqe = iou->sqe;
// 從請(qǐng)求隊(duì)列中獲取一個(gè)節(jié)點(diǎn)
sqe = &sqe[slot];
memset(sqe, 0, sizeof(*sqe));
// 任務(wù)節(jié)點(diǎn)關(guān)聯(lián)到 req,回調(diào)時(shí)需要使用
sqe->user_data = (uintptr_t) req;
req->work_req.loop = loop;
req->work_req.work = NULL;
req->work_req.done = NULL;
uv__queue_init(&req->work_req.wq);
uv__req_register(loop, req);
iou->in_flight++;
return sqe;
}
uviou_get_sqe 主要是從任務(wù)隊(duì)列中獲取一個(gè)空閑節(jié)點(diǎn)并關(guān)聯(lián)上請(qǐng)求上下文結(jié)構(gòu)體,uviou_get_sqe 的調(diào)用方需要設(shè)置操作上下文,比如操作類型,操作的 fd 等。通過 uviou_get_sqe 獲取任務(wù)節(jié)點(diǎn)并設(shè)置了操作上下文后,這個(gè)任務(wù)就會(huì)自動(dòng)被操作系統(tǒng)感知。因?yàn)?Libuv 是使用了 UVIORING_SETUP_SQPOLL 模式,所以還需要判斷這時(shí)候內(nèi)核輪訓(xùn)線程是否處于睡眠狀態(tài),這就是 uv__iou_submit 的邏輯。
static void uv__iou_submit(struct uv__iou* iou) {
uint32_t flags;
atomic_store_explicit((_Atomic uint32_t*) iou->sqtail,
*iou->sqtail + 1,
memory_order_release);
flags = atomic_load_explicit((_Atomic uint32_t*) iou->sqflags,
memory_order_acquire);
// 判斷內(nèi)核線程是否處于睡眠狀態(tài)
if (flags & UV__IORING_SQ_NEED_WAKEUP)
// 喚醒內(nèi)核線程,說明有任務(wù)需要處理
if (uv__io_uring_enter(iou->ringfd, 0, 0, UV__IORING_ENTER_SQ_WAKEUP))
if (errno != EOWNERDEAD) /* Kernel bug. Harmless, ignore. /
perror("libuv: io_uring_enter(wakeup)"); /
這樣就完成了任務(wù)的提交。
任務(wù)完成
任務(wù)完成后,io_uring 對(duì)應(yīng)的 fd 就會(huì)變成可讀,從而 epoll 就會(huì)感知到,來看看 epoll 的處理。下面是 epoll 處理就緒 fd 時(shí)的一段邏輯。
if(fd == iou->ringfd) {
uv__poll_io_uring(loop, iou);
have_iou_events = 1;
continue;
}
如果是 io_uring 的 fd 可讀,則執(zhí)行 uv__poll_io_uring。
static void uv__poll_io_uring(uv_loop_t* loop, struct uv__iou* iou) {
struct uv__io_uring_cqe* cqe;
struct uv__io_uring_cqe* e;
uv_fs_t* req;
uint32_t head;
uint32_t tail;
uint32_t mask;
uint32_t i;
uint32_t flags;
int nevents;
int rc;
// 完成隊(duì)列頭/尾節(jié)點(diǎn)
head = iou->cqhead;
tail = atomic_load_explicit((_Atomic uint32_t) iou->cqtail,
memory_order_acquire);
mask = iou->cqmask;
cqe = iou->cqe;
nevents = 0;
// 遍歷完成隊(duì)列
for (i = head; i != tail; i++) {
e = &cqe[i & mask];
// 拿到操作關(guān)聯(lián)的請(qǐng)求結(jié)構(gòu)體
req = (uv_fs_t*) (uintptr_t) e->user_data;
uv__req_unregister(loop, req);
iou->in_flight--;
// 操作返回值,表示操作是否成功
req->result = e->res;
// 執(zhí)行回調(diào)
req->cb(req);
}
uv__poll_io_uring 的邏輯很簡單,就是遍歷完成隊(duì)列,然后拿到對(duì)應(yīng)的請(qǐng)求上下文結(jié)構(gòu)體,最后執(zhí)行它的回調(diào)。
現(xiàn)代軟件中大多數(shù)使用的 IO 模型是 epoll,隨著 io_uring 的發(fā)展和成熟,io_uring 將會(huì)出現(xiàn)在更多的軟件中,之前我也體驗(yàn)了一下 io_uring,有興趣的可以體驗(yàn)下 https://github.com/theanarkh/nodejs_io_uring。