自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

解密高性能異步I/O:io_uring的魔力與應(yīng)用

系統(tǒng) Linux
io_uring 是一個(gè)Linux內(nèi)核提供的高性能異步 I/O 框架,最初在 Linux 5.1 版本中引入。它的設(shè)計(jì)目標(biāo)是解決傳統(tǒng)的異步 I/O 模型(如epoll或者 POSIX AIO)在大規(guī)模 I/O 操作中效率不高的問題。
在 Linux 系統(tǒng)的發(fā)展歷程中,I/O 操作的效率一直是影響系統(tǒng)性能的關(guān)鍵因素。從早期簡單的阻塞式 I/O,到后來的非阻塞 I/O、I/O 多路復(fù)用,每一次的技術(shù)演進(jìn)都在不斷突破 I/O 性能的瓶頸。而 io_uring 的出現(xiàn),更是為 Linux 異步 I/O 領(lǐng)域帶來了一場革命。

它的高效性和創(chuàng)新性,使得開發(fā)者能夠構(gòu)建出性能更卓越的應(yīng)用程序,無論是在高性能網(wǎng)絡(luò)服務(wù)、數(shù)據(jù)庫系統(tǒng),還是大規(guī)模文件處理等場景中,io_uring 都展現(xiàn)出了巨大的潛力。接下來,讓我們一起深入探索 io_uring 的實(shí)現(xiàn)原理與應(yīng)用案例。

一、io_uring簡介

1.1io_uring概述

io_uring 是一個(gè)Linux內(nèi)核提供的高性能異步 I/O 框架,最初在 Linux 5.1 版本中引入。它的設(shè)計(jì)目標(biāo)是解決傳統(tǒng)的異步 I/O 模型(如epoll或者 POSIX AIO)在大規(guī)模 I/O 操作中效率不高的問題。

在傳統(tǒng)的 Linux I/O 操作中,存在一些性能瓶頸。例如,系統(tǒng)調(diào)用的開銷較大,同步 I/O 操作會導(dǎo)致線程在等待 I/O 完成時(shí)被阻塞,浪費(fèi)了 CPU 資源。隨著對高性能、高并發(fā)服務(wù)器和應(yīng)用程序的需求不斷增加,需要一種更高效的 I/O 處理機(jī)制。io_uring 應(yīng)運(yùn)而生,它是由 Jens Axboe 開發(fā)的,目的是為了解決這些傳統(tǒng) I/O 機(jī)制的效率問題。

*過往IO接口的缺陷

(1)同步IO接口

最原始的文件IO系統(tǒng)調(diào)用就是read,write。read系統(tǒng)調(diào)用從文件描述符所指代的打開文件中讀取數(shù)據(jù)。write系統(tǒng)調(diào)用將數(shù)據(jù)寫入一個(gè)已打開的文件中。在文件特定偏移處的IO是pread,pwrite。調(diào)用時(shí)可以指定位置進(jìn)行文件IO操作,而非始于文件的當(dāng)前偏移處,且他們不會改變文件的當(dāng)前偏移量。

分散輸入和集中輸出(Scatter-Gather IO)是readv, writev,調(diào)用并非只對單個(gè)緩沖區(qū)進(jìn)行讀寫操作,而是一次即可傳輸多個(gè)緩沖區(qū)的數(shù)據(jù),免除了多次系統(tǒng)調(diào)用的開銷,提高文件 I/O 的效率,特別是當(dāng)需要讀寫多個(gè)連續(xù)或非連續(xù)的數(shù)據(jù)塊時(shí)。

該機(jī)制使用一個(gè)數(shù)組iov定義了一組用來傳輸數(shù)據(jù)的緩沖區(qū),一個(gè)整形數(shù)iovcnt指定iov的成員個(gè)數(shù),其中,iov中的每個(gè)成員都是如下形式的數(shù)據(jù)結(jié)構(gòu)。

struct iovec {
   void  *iov_base;    /* Starting address */
   size_t iov_len;     /* Number of bytes to transfer */
};

上述接口在讀寫IO時(shí),系統(tǒng)調(diào)用會阻塞住等待,在數(shù)據(jù)讀取或?qū)懭牒蟛欧祷亟Y(jié)果。同步導(dǎo)致的后果就是在阻塞的同時(shí)無法繼續(xù)執(zhí)行其他的操作,只能等待IO結(jié)果返回。存儲場景中對性能的要求非常高,所以需要異步IO。

(2)異步IO接口:AIO

Linux 的異步 IO(AIO,Asynchronous I/O)是一種高級的文件 IO 模型,允許應(yīng)用程序在發(fā)起 IO 操作后不必等待操作完成,而是可以繼續(xù)執(zhí)行其他任務(wù)。這與傳統(tǒng)的同步 IO 模型不同,后者在 IO 操作完成之前會阻塞應(yīng)用程序的執(zhí)行。

1.2io_uring設(shè)計(jì)思路

(1)解決“系統(tǒng)調(diào)用開銷大”的問題?

針對這個(gè)問題,考慮是否每次都需要系統(tǒng)調(diào)用。如果能將多次系統(tǒng)調(diào)用中的邏輯放到有限次數(shù)中來,就能將消耗降為常數(shù)時(shí)間復(fù)雜度。

(2)解決“拷貝開銷大”的問題?

之所以在提交和完成事件中存在大量的內(nèi)存拷貝,是因?yàn)閼?yīng)用程序和內(nèi)核之間的通信需要拷貝數(shù)據(jù),所以為了避免這個(gè)問題,需要重新考量應(yīng)用與內(nèi)核間的通信方式。我們發(fā)現(xiàn),兩者通信,不是必須要拷貝,通過現(xiàn)有技術(shù),可以讓應(yīng)用與內(nèi)核共享內(nèi)存。

要實(shí)現(xiàn)核外與內(nèi)核的零拷貝,最佳方式就是實(shí)現(xiàn)一塊內(nèi)存映射區(qū)域,兩者共享一段內(nèi)存,核外往這段內(nèi)存寫數(shù)據(jù),然后通知內(nèi)核使用這段內(nèi)存數(shù)據(jù),或者內(nèi)核填寫這段數(shù)據(jù),核外使用這部分?jǐn)?shù)據(jù)。因此,需要一對共享的ring buffer用于應(yīng)用程序和內(nèi)核之間的通信。

  • 一塊用于核外傳遞數(shù)據(jù)給內(nèi)核,一塊是內(nèi)核傳遞數(shù)據(jù)給核外,一方只讀,一方只寫。
  • 提交隊(duì)列SQ(submission queue)中,應(yīng)用是IO提交的生產(chǎn)者,內(nèi)核是消費(fèi)者。
  • 完成隊(duì)列CQ(completion queue)中,內(nèi)核是IO完成的生產(chǎn)者,應(yīng)用是消費(fèi)者。
  • 內(nèi)核控制SQ ring的head和CQ ring的tail,應(yīng)用程序控制SQ ring的tail和CQ ring的head

(3)解決“API不友好”的問題?

問題在于需要多個(gè)系統(tǒng)調(diào)用才能完成,考慮是否可以把多個(gè)系統(tǒng)調(diào)用合而為一。有時(shí)候,將多個(gè)類似的函數(shù)合并并通過參數(shù)區(qū)分不同的行為是更好的選擇,而有時(shí)候可能需要將復(fù)雜的函數(shù)分解為更簡單的部分來進(jìn)行重構(gòu)。

如果發(fā)現(xiàn)函數(shù)中的某一部分代碼可以獨(dú)立出來成為一個(gè)單獨(dú)的函數(shù),可以先進(jìn)行這樣的提煉,然后再考慮是否需要進(jìn)一步使用參數(shù)化方法重構(gòu)。

1.3與其他 I/O 模型的對比

阻塞 I/O:阻塞 I/O 是最基本的 I/O 模型,在這種模型下,當(dāng)應(yīng)用程序調(diào)用 I/O 操作(如 read、write)時(shí),線程會被阻塞,直到 I/O 操作完成。例如,當(dāng)讀取一個(gè)文件時(shí),線程會一直等待,直到數(shù)據(jù)從磁盤讀取到內(nèi)存中。這種模型的優(yōu)點(diǎn)是簡單直觀,易于理解和實(shí)現(xiàn),但缺點(diǎn)也很明顯,在 I/O 操作期間,線程無法執(zhí)行其他任務(wù),這在高并發(fā)場景下會導(dǎo)致大量線程被阻塞,嚴(yán)重降低系統(tǒng)的性能和響應(yīng)速度。例如,在一個(gè)同時(shí)處理多個(gè)客戶端請求的服務(wù)器中,如果使用阻塞 I/O,每個(gè)請求都可能導(dǎo)致線程阻塞,當(dāng)請求數(shù)量較多時(shí),服務(wù)器將無法及時(shí)響應(yīng)其他請求。

非阻塞 I/O:非阻塞 I/O 允許應(yīng)用程序在 I/O 操作未完成時(shí)立即返回,線程不會被阻塞。應(yīng)用程序可以通過輪詢的方式檢查 I/O 操作的狀態(tài),以確定是否完成。雖然這種模型避免了線程的阻塞,但頻繁的輪詢會消耗大量的 CPU 資源,并且在 I/O 操作較多時(shí),管理和協(xié)調(diào)這些操作會變得非常復(fù)雜。以網(wǎng)絡(luò)編程為例,在非阻塞 I/O 模式下,應(yīng)用程序需要不斷地檢查 socket 是否有數(shù)據(jù)可讀或可寫,這會增加 CPU 的負(fù)擔(dān),降低系統(tǒng)的整體性能。

epoll:epoll 是一種 I/O 多路復(fù)用技術(shù),它允許應(yīng)用程序同時(shí)監(jiān)控多個(gè)文件描述符的事件(如可讀、可寫、異常等)。當(dāng)有事件發(fā)生時(shí),epoll 會通知應(yīng)用程序進(jìn)行處理。epoll 在一定程度上提高了 I/O 操作的效率,特別是在高并發(fā)場景下,它通過減少系統(tǒng)調(diào)用的次數(shù),降低了 CPU 的開銷。然而,epoll 本質(zhì)上還是同步 I/O,它只是提供了一種高效的事件通知機(jī)制,應(yīng)用程序在處理 I/O 事件時(shí),仍然需要進(jìn)行實(shí)際的 I/O 操作,這可能會導(dǎo)致線程阻塞。比如,在一個(gè)使用 epoll 的網(wǎng)絡(luò)服務(wù)器中,當(dāng)有新的連接請求或數(shù)據(jù)到達(dá)時(shí),epoll 會通知應(yīng)用程序,但應(yīng)用程序在讀取或?qū)懭霐?shù)據(jù)時(shí),仍然可能會因?yàn)?I/O 操作的延遲而阻塞線程。

傳統(tǒng) AIO:傳統(tǒng) AIO 雖然提供了異步 I/O 的功能,但存在諸多限制。如前文所述,它只能在 Direct I/O 模式下使用,無法利用頁緩存,這使得數(shù)據(jù)讀寫的效率受到影響。此外,傳統(tǒng) AIO 在 I/O 提交時(shí)可能會出現(xiàn)阻塞,導(dǎo)致其異步性并不完全可靠。在實(shí)際應(yīng)用中,由于這些限制,傳統(tǒng) AIO 的使用場景相對較窄,難以滿足大多數(shù)應(yīng)用程序?qū)Ω咝?I/O 的需求。

相比之下,io_uring 具有明顯的優(yōu)勢。它通過用戶態(tài)和內(nèi)核態(tài)共享提交隊(duì)列(Submission Queue)和完成隊(duì)列(Completion Queue) ,減少了系統(tǒng)調(diào)用的次數(shù)和上下文切換的開銷。在 io_uring 中,應(yīng)用程序只需將 I/O 請求放入提交隊(duì)列,內(nèi)核會在后臺處理這些請求,并將結(jié)果放入完成隊(duì)列,應(yīng)用程序可以隨時(shí)從完成隊(duì)列中獲取結(jié)果,無需頻繁進(jìn)行系統(tǒng)調(diào)用和輪詢。此外,io_uring 支持更多的異步系統(tǒng)調(diào)用,不僅適用于存儲文件的 I/O 操作,還能很好地應(yīng)用于網(wǎng)絡(luò)套接字(network sockets)的 I/O 處理,具有更廣泛的適用性和更高的靈活性。

二、io_uring的實(shí)現(xiàn)原理

io_uring實(shí)現(xiàn)異步I/O的方式其實(shí)是一個(gè)生產(chǎn)者-消費(fèi)者模型:

  • 用戶進(jìn)程生產(chǎn)I/O請求,放入提交隊(duì)列(Submission Queue,簡稱SQ)。
  • 內(nèi)核消費(fèi)SQ中的I/O請求,完成后將結(jié)果放入完成隊(duì)列(Completion Queue,簡稱CQ)。
  • 用戶進(jìn)程從CQ中收割I(lǐng)/O結(jié)果。

SQ和CQ是內(nèi)核初始化io_uring實(shí)例的時(shí)候創(chuàng)建的。為了減少系統(tǒng)調(diào)用和減少用戶進(jìn)程與內(nèi)核之間的數(shù)據(jù)拷貝,io_uring使用mmap的方式讓用戶進(jìn)程和內(nèi)核共享SQ和CQ的內(nèi)存空間。

另外,由于先提交的I/O請求不一定先完成,SQ保存的其實(shí)是一個(gè)數(shù)組索引(數(shù)據(jù)類型 uint32),真正的SQE(Submission Queue Entry)保存在一個(gè)獨(dú)立的數(shù)組(SQ Array)。所以要提交一個(gè)I/O請求,得先在SQ Array中找到一個(gè)空閑的SQE,設(shè)置好之后,將其數(shù)組索引放到SQ中。

用戶進(jìn)程、內(nèi)核、SQ、CQ和SQ Array之間的基本關(guān)系如下:

圖片圖片

2.1核心組件解析

提交隊(duì)列(SQ)與提交隊(duì)列項(xiàng)(SQE):提交隊(duì)列(Submission Queue,簡稱 SQ)是 io_uring 中用于存儲 I/O 請求的隊(duì)列,它是一個(gè)環(huán)形緩沖區(qū),位于用戶態(tài)和內(nèi)核態(tài)共享的內(nèi)存區(qū)域。每個(gè) I/O 請求在提交隊(duì)列中都以提交隊(duì)列項(xiàng)(Submission Queue Entry,簡稱 SQE)的形式存在。SQE 是一個(gè)結(jié)構(gòu)體,它存儲了 I/O 請求的詳細(xì)信息,包括操作類型(如讀、寫、異步連接等)、目標(biāo)文件描述符、緩沖區(qū)地址、操作長度、偏移量等關(guān)鍵信息。

例如,在進(jìn)行文件讀取操作時(shí),SQE 會記錄要讀取的文件描述符、讀取數(shù)據(jù)的緩沖區(qū)地址、讀取的字節(jié)數(shù)以及文件中的偏移量等信息。應(yīng)用程序通過填充 SQE 結(jié)構(gòu)體,并將其添加到 SQ 中,來向內(nèi)核提交 I/O 請求。由于 SQ 是環(huán)形緩沖區(qū),當(dāng)隊(duì)列滿時(shí),新的請求會覆蓋舊的請求,從而保證 I/O 請求的持續(xù)提交。

完成隊(duì)列(CQ)與完成隊(duì)列項(xiàng)(CQE):完成隊(duì)列(Completion Queue,簡稱 CQ)同樣是一個(gè)環(huán)形緩沖區(qū),用于存儲 I/O 請求的完成結(jié)果。當(dāng)內(nèi)核完成一個(gè) I/O 操作后,會將操作的結(jié)果封裝成一個(gè)完成隊(duì)列項(xiàng)(Completion Queue Entry,簡稱 CQE),并將其放入 CQ 中。CQE 結(jié)構(gòu)體包含了 I/O 操作的返回值、狀態(tài)碼、用戶自定義數(shù)據(jù)等信息。

通過這些信息,應(yīng)用程序可以判斷 I/O 操作是否成功,并獲取操作的相關(guān)結(jié)果。比如,在文件讀取操作完成后,CQE 中的返回值會表示實(shí)際讀取的字節(jié)數(shù),狀態(tài)碼則用于指示操作是否成功,若操作失敗,狀態(tài)碼會包含具體的錯(cuò)誤信息。應(yīng)用程序可以通過輪詢 CQ 或者等待特定的事件通知,來獲取完成的 I/O 請求結(jié)果,從而進(jìn)行后續(xù)的處理。

SQ Ring 與 CQ Ring:SQ Ring 和 CQ Ring 分別是提交隊(duì)列和完成隊(duì)列的環(huán)形緩沖區(qū)結(jié)構(gòu)。它們包含了隊(duì)列本身(即 SQ 和 CQ)、頭部索引(head)、尾部索引(tail)以及隊(duì)列大小等關(guān)鍵信息。頭部索引(head)指向隊(duì)列中第一個(gè)待處理的元素,而尾部索引(tail)則指向隊(duì)列中下一個(gè)空閑的位置。當(dāng)應(yīng)用程序向 SQ 提交 I/O 請求時(shí),它會將請求信息填充到 tail 指向的 SQE 中,然后將 tail 指針遞增,指向下一個(gè)空閑位置。

內(nèi)核在處理 I/O 請求時(shí),會從 head 指向的 SQE 中獲取請求信息,處理完成后,將結(jié)果放入 CQ 中。同樣,CQ Ring 通過 head 和 tail 指針來管理完成隊(duì)列,內(nèi)核將完成的 I/O 結(jié)果放入 tail 指向的 CQE 中,并遞增 tail 指針,應(yīng)用程序則從 head 指向的 CQE 中獲取結(jié)果。這種環(huán)形緩沖區(qū)結(jié)構(gòu)以及基于 head 和 tail 指針的操作方式,實(shí)現(xiàn)了用戶態(tài)和內(nèi)核態(tài)之間高效的數(shù)據(jù)交換,減少了鎖的使用和上下文切換的開銷,從而大大提高了 I/O 操作的效率。

2.2系統(tǒng)調(diào)用詳解

io_uring的實(shí)現(xiàn)僅僅使用了三個(gè)syscall:io_uring_setup, io_uring_enter和io_uring_register

這幾個(gè)系統(tǒng)調(diào)用接口都在io_uring.c文件中:

⑴io_uring_setup

io_uring_setup 是用于初始化 io_uring 環(huán)境的系統(tǒng)調(diào)用。在使用 io_uring 進(jìn)行異步 I/O 操作之前,首先需要調(diào)用 io_uring_setup 來創(chuàng)建一個(gè) io_uring 實(shí)例。它接受兩個(gè)參數(shù),第一個(gè)參數(shù)是期望的提交隊(duì)列(SQ)的大小,即隊(duì)列中可以容納的 I/O 請求數(shù)量;第二個(gè)參數(shù)是一個(gè)指向 io_uring_params 結(jié)構(gòu)體的指針,該結(jié)構(gòu)體用于返回 io_uring 實(shí)例的相關(guān)參數(shù),如實(shí)際分配的 SQ 和完成隊(duì)列(CQ)的大小、隊(duì)列的偏移量等信息。

在調(diào)用 io_uring_setup 時(shí),內(nèi)核會為 io_uring 實(shí)例分配所需的內(nèi)存空間,包括 SQ、CQ 以及相關(guān)的控制結(jié)構(gòu)。同時(shí),內(nèi)核還會創(chuàng)建一些內(nèi)部數(shù)據(jù)結(jié)構(gòu),用于管理和調(diào)度 I/O 請求。如果初始化成功,io_uring_setup 會返回一個(gè)文件描述符,這個(gè)文件描述符用于標(biāo)識創(chuàng)建的 io_uring 實(shí)例,后續(xù)的 io_uring 系統(tǒng)調(diào)用(如 io_uring_enter、io_uring_register)將通過這個(gè)文件描述符來操作該 io_uring 實(shí)例。若初始化失敗,函數(shù)將返回一個(gè)負(fù)數(shù),表示相應(yīng)的錯(cuò)誤代碼。

io_uring_setup():

SYSCALL_DEFINE2(io_uring_setup, u32, entries,
                struct io_uring_params __user *, params)                                                                                                                                                           
{
        return io_uring_setup(entries, params);
}
  • 功能:用于初始化和配置 io_uring 。
  • 應(yīng)用用途:在使用 io_uring 之前,首先需要調(diào)用此接口初始化一個(gè) io_uring 環(huán),并設(shè)置其參數(shù)。

⑵io_uring_enter

io_uring_enter 是用于提交和等待 I/O 操作的系統(tǒng)調(diào)用。它的主要作用是將應(yīng)用程序準(zhǔn)備好的 I/O 請求提交給內(nèi)核,并可以選擇等待這些操作完成。io_uring_enter 接受多個(gè)參數(shù),其中包括 io_uring_setup 返回的文件描述符,用于指定要操作的 io_uring 實(shí)例;to_submit 參數(shù)表示要提交的 I/O 請求的數(shù)量,即從提交隊(duì)列(SQ)中取出并提交給內(nèi)核的 SQE 的數(shù)量;min_complete 參數(shù)指定了內(nèi)核在返回之前必須等待完成的 I/O 操作的最小數(shù)量;flags 參數(shù)則用于控制 io_uring_enter 的行為,例如可以設(shè)置是否等待 I/O 操作完成、是否獲取完成的 I/O 事件等。當(dāng)調(diào)用 io_uring_enter 時(shí),如果 to_submit 參數(shù)大于 0,內(nèi)核會從 SQ 中取出相應(yīng)數(shù)量的 SQE,并將這些 I/O 請求提交到內(nèi)核中進(jìn)行處理。

同時(shí),如果設(shè)置了等待 I/O 操作完成的標(biāo)志,內(nèi)核會阻塞等待,直到至少有 min_complete 個(gè) I/O 操作完成,然后將這些完成的操作結(jié)果放入完成隊(duì)列(CQ)中。應(yīng)用程序可以通過檢查 CQ 來獲取這些完成的 I/O 請求的結(jié)果。通過 io_uring_enter,應(yīng)用程序可以靈活地控制 I/O 請求的提交和等待策略,提高 I/O 操作的效率和靈活性。

io_uring_enter():

SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,                                                                                                                                                  
                u32, min_complete, u32, flags, const void __user *, argp,
                size_t, argsz)
  • 功能:用于提交和處理異步 I/O 操作。
  • 應(yīng)用用途:在向 io_uring 環(huán)中提交 I/O 操作后,通過調(diào)用此接口觸發(fā)內(nèi)核處理這些操作,并獲取完成的操作結(jié)果。

⑶io_uring_register

io_uring_register 用于注冊文件描述符或事件文件描述符到 io_uring 實(shí)例中,以便在后續(xù)的 I/O 操作中使用。它接受四個(gè)參數(shù),第一個(gè)參數(shù)是 io_uring_setup 返回的文件描述符,用于指定要注冊到的 io_uring 實(shí)例;第二個(gè)參數(shù) opcode 表示注冊的類型,例如可以是 IORING_REGISTER_FILES(注冊文件描述符集合)、IORING_REGISTER_BUFFERS(注冊內(nèi)存緩沖區(qū))、IORING_REGISTER_EVENTFD(注冊 eventfd 用于通知完成事件)等;

第三個(gè)參數(shù) arg 是一個(gè)指針,根據(jù) opcode 的類型不同,它指向不同的內(nèi)容,如注冊文件描述符時(shí),arg 指向一個(gè)包含文件描述符的數(shù)組;注冊緩沖區(qū)時(shí),arg 指向一個(gè)描述緩沖區(qū)的結(jié)構(gòu)體數(shù)組;第四個(gè)參數(shù) nr_args 表示 arg 所指向的數(shù)組的長度。通過 io_uring_register 注冊文件描述符或緩沖區(qū)等資源后,內(nèi)核在處理 I/O 請求時(shí),可以直接訪問這些預(yù)先注冊的資源,而無需每次都重新設(shè)置相關(guān)信息,從而提高了 I/O 操作的效率。例如,在進(jìn)行大量文件讀寫操作時(shí),預(yù)先注冊文件描述符可以避免每次提交 I/O 請求時(shí)都進(jìn)行文件描述符的查找和驗(yàn)證,減少了系統(tǒng)開銷,提升了 I/O 性能。

io_uring_register():

SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
                void __user *, arg, unsigned int, nr_args)
  • 功能:用于注冊文件描述符、緩沖區(qū)、事件文件描述符等資源到 io_uring 環(huán)中。
  • 應(yīng)用用途:在進(jìn)行 I/O 操作之前,需要將相關(guān)的資源注冊到 io_uring 環(huán)中,以便進(jìn)行后續(xù)的異步 I/O 操作。

2.3工作流程深度剖析

①創(chuàng)建 io_uring 對象

使用 io_uring 進(jìn)行異步 I/O 操作的第一步是創(chuàng)建 io_uring 對象。內(nèi)核提供了io_uring_setup系統(tǒng)調(diào)用來初始化一個(gè)io_uring實(shí)例,創(chuàng)建SQ、CQ和SQ Array,entries參數(shù)表示的是SQ和SQArray的大小,CQ的大小默認(rèn)是2 * entries。params參數(shù)既是輸入?yún)?shù),也是輸出參數(shù)。

該函數(shù)返回一個(gè)file descriptor,并將io_uring支持的功能、以及各個(gè)數(shù)據(jù)結(jié)構(gòu)在fd中的偏移量存入params。用戶根據(jù)偏移量將fd通過mmap內(nèi)存映射得到一塊內(nèi)核用戶共享的內(nèi)存區(qū)域。這塊內(nèi)存區(qū)域中,有io_uring的上下文信息:SQ信息、CQ信息和SQ Array信息。

int io_uring_setup(int entries, struct io_uring_params *params);

這通過調(diào)用 io_uring_setup 系統(tǒng)調(diào)用來完成。在調(diào)用 io_uring_setup 時(shí),用戶需要指定提交隊(duì)列(SQ)的大小,即期望的 I/O 請求隊(duì)列長度。內(nèi)核會根據(jù)這個(gè)請求,為 io_uring 對象分配必要的內(nèi)存空間,包括提交隊(duì)列(SQ)、完成隊(duì)列(CQ)以及相關(guān)的控制結(jié)構(gòu)。內(nèi)核會創(chuàng)建一個(gè) io_ring_ctx 結(jié)構(gòu)體對象,用于管理 io_uring 的上下文信息。

同時(shí),還會創(chuàng)建一個(gè) io_urings 結(jié)構(gòu)體對象,該對象包含了 SQ 和 CQ 的具體實(shí)現(xiàn),如隊(duì)列的頭部索引(head)、尾部索引(tail)、隊(duì)列大小等信息。在創(chuàng)建過程中,內(nèi)核會初始化 SQ 和 CQ 的所有隊(duì)列項(xiàng)(SQE 和 CQE),并設(shè)置好相關(guān)的指針和標(biāo)志位。如果用戶在調(diào)用 io_uring_setup 時(shí)設(shè)置了 IORING_SETUP_SQPOLL 標(biāo)志位,內(nèi)核還會創(chuàng)建一個(gè) SQ 線程,用于從 SQ 隊(duì)列中獲取 I/O 請求并提交給內(nèi)核處理。

創(chuàng)建完成后,io_uring_setup 會返回一個(gè)文件描述符,這個(gè)文件描述符是后續(xù)操作 io_uring 對象的關(guān)鍵標(biāo)識,通過它可以進(jìn)行 I/O 請求的提交、注冊文件描述符等操作。

②準(zhǔn)備 I/O 請求

在創(chuàng)建 io_uring 對象后,需要準(zhǔn)備具體的 I/O 請求。這通常通過 io_uring_prep_XXX 系列函數(shù)來完成,這些函數(shù)用于準(zhǔn)備不同類型的 I/O 請求,如 io_uring_prep_read 用于準(zhǔn)備讀取操作,io_uring_prep_write 用于準(zhǔn)備寫入操作,io_uring_prep_accept 用于準(zhǔn)備異步接受連接操作等。

以 io_uring_prep_read 為例,它接受多個(gè)參數(shù),包括指向提交隊(duì)列項(xiàng)(SQE)的指針、目標(biāo)文件描述符、讀取數(shù)據(jù)的緩沖區(qū)地址、讀取的字節(jié)數(shù)以及文件中的偏移量等。函數(shù)會根據(jù)這些參數(shù),將 I/O 請求的相關(guān)信息填充到 SQE 結(jié)構(gòu)體中,包括設(shè)置操作類型(如 IORING_OP_READ)、目標(biāo)文件描述符、緩沖區(qū)地址、數(shù)據(jù)長度、偏移量等字段。

除了基本的 I/O 操作參數(shù)外,還可以設(shè)置一些額外的標(biāo)志位和選項(xiàng),如 I/O 操作的優(yōu)先級、是否使用直接 I/O 等,以滿足不同的應(yīng)用需求。通過這些函數(shù),應(yīng)用程序可以靈活地構(gòu)建各種類型的 I/O 請求,并將其準(zhǔn)備好以便提交到內(nèi)核中進(jìn)行處理。

③提交 I/O 請求

當(dāng) I/O 請求準(zhǔn)備好后,需要將其提交到內(nèi)核中執(zhí)行。這通過調(diào)用 io_uring_submit 函數(shù)(內(nèi)部調(diào)用 io_uring_enter 系統(tǒng)調(diào)用)來實(shí)現(xiàn)。在提交 I/O 請求時(shí),首先應(yīng)用程序會將準(zhǔn)備好的 SQE 添加到提交隊(duì)列(SQ)中。SQ 是一個(gè)環(huán)形緩沖區(qū),應(yīng)用程序通過操作 SQ Ring 中的 tail 指針來將 SQE 放入隊(duì)列。具體來說,應(yīng)用程序會將 tail 指向的 SQE 填充為準(zhǔn)備好的 I/O 請求信息,然后將 tail 指針遞增,指向下一個(gè)空閑的 SQE 位置。在填充 SQE 時(shí),需要注意按照 SQE 結(jié)構(gòu)體的定義,正確設(shè)置各項(xiàng)字段,確保 I/O 請求的信息準(zhǔn)確無誤。

默認(rèn)情況下,使用 io_uring 提交 I/O 請求需要:

  • 從SQ Arrary中找到一個(gè)空閑的SQE;
  • 根據(jù)具體的I/O請求設(shè)置該SQE;
  • 將SQE的數(shù)組索引放到SQ中;
  • 調(diào)用系統(tǒng)調(diào)用io_uring_enter提交SQ中的I/O請求。

圖片圖片

當(dāng)所有要提交的 I/O 請求都添加到 SQ 中后,調(diào)用 io_uring_submit 函數(shù),該函數(shù)會觸發(fā) io_uring_enter 系統(tǒng)調(diào)用,將 SQ 中的 I/O 請求提交給內(nèi)核。內(nèi)核接收到請求后,會從 SQ 中獲取 SQE,并根據(jù) SQE 中的信息執(zhí)行相應(yīng)的 I/O 操作。在這個(gè)過程中,由于 SQ 是用戶態(tài)和內(nèi)核態(tài)共享的內(nèi)存區(qū)域,避免了數(shù)據(jù)的多次拷貝和額外的系統(tǒng)調(diào)用開銷,提高了 I/O 請求提交的效率。

④等待 IO 請求完成

提交 I/O 請求后,應(yīng)用程序可以選擇等待請求完成。等待 I/O 請求完成有兩種主要方式。一種是使用 io_uring_wait_cqe 函數(shù),該函數(shù)會阻塞調(diào)用線程,直到至少有一個(gè) I/O 請求完成,并返回完成的完成隊(duì)列項(xiàng)(CQE)。當(dāng)調(diào)用 io_uring_wait_cqe 時(shí),它會檢查完成隊(duì)列(CQ)中是否有新完成的 I/O 請求。如果沒有,線程會進(jìn)入阻塞狀態(tài),直到內(nèi)核將完成的 I/O 請求結(jié)果放入 CQ 中。一旦有新的 CQE 可用,io_uring_wait_cqe 會返回該 CQE,應(yīng)用程序可以通過 CQE 獲取 I/O 操作的結(jié)果。

另一種方式是使用 io_uring_peek_batch_cqe 函數(shù),它是非阻塞的,用于檢查 CQ 中是否有已經(jīng)完成的 I/O 請求。如果有,它會返回已完成的 CQE 列表,應(yīng)用程序可以根據(jù)返回的 CQE 進(jìn)行相應(yīng)的處理;如果沒有完成的請求,函數(shù)會立即返回,應(yīng)用程序可以繼續(xù)執(zhí)行其他任務(wù),然后在適當(dāng)?shù)臅r(shí)候再次調(diào)用該函數(shù)檢查 CQ。這兩種方式為應(yīng)用程序提供了靈活的等待策略,使其可以根據(jù)自身的業(yè)務(wù)需求和性能要求,選擇合適的方式來處理 I/O 請求的完成事件。

⑤獲取 IO 請求結(jié)果

當(dāng) I/O 請求完成后,應(yīng)用程序需要從完成隊(duì)列(CQ)中獲取結(jié)果。這可以通過 io_uring_peek_cqe 函數(shù)來實(shí)現(xiàn)。io_uring_peek_cqe 函數(shù)用于從 CQ 中獲取一個(gè)完成的 CQE,而不將其從隊(duì)列中移除。應(yīng)用程序獲取到 CQE 后,可以根據(jù) CQE 中的信息來處理完成的 I/O 請求。CQE 中包含了豐富的信息,如 I/O 操作的返回值、狀態(tài)碼、用戶自定義數(shù)據(jù)等。例如,對于文件讀取操作,CQE 中的返回值表示實(shí)際讀取的字節(jié)數(shù),狀態(tài)碼用于指示操作是否成功,若操作失敗,狀態(tài)碼會包含具體的錯(cuò)誤信息。

應(yīng)用程序可以根據(jù)這些信息進(jìn)行相應(yīng)的處理,如讀取數(shù)據(jù)并進(jìn)行后續(xù)的業(yè)務(wù)邏輯處理,或者在操作失敗時(shí)進(jìn)行錯(cuò)誤處理,如記錄錯(cuò)誤日志、重新嘗試 I/O 操作等。在獲取 CQE 后,應(yīng)用程序通常會根據(jù) I/O 操作的類型和結(jié)果,執(zhí)行相應(yīng)的業(yè)務(wù)邏輯,以實(shí)現(xiàn)應(yīng)用程序的功能需求。

⑥釋放 IO 請求結(jié)果

在獲取并處理完 IO 請求結(jié)果后,需要釋放該結(jié)果,以便內(nèi)核可以繼續(xù)使用完成隊(duì)列(CQ)。這通過調(diào)用 io_uring_cqe_seen 函數(shù)來實(shí)現(xiàn)。io_uring_cqe_seen 函數(shù)的作用是標(biāo)記一個(gè)完成的 CQE 已經(jīng)被處理,它會將 CQ Ring 中的 head 指針遞增,指向下一個(gè)未處理的 CQE。通過這種方式,內(nèi)核可以知道哪些 CQE 已經(jīng)被應(yīng)用程序處理,從而可以繼續(xù)向 CQ 中放入新的完成結(jié)果。

在釋放 IO 請求結(jié)果時(shí),需要注意確保已經(jīng)完成了對 CQE 中信息的處理,避免在釋放后再次訪問已釋放的 CQE。同時(shí),及時(shí)釋放 CQE 也有助于提高系統(tǒng)的性能和資源利用率,避免 CQ 隊(duì)列被占用過多而影響后續(xù) I/O 請求結(jié)果的存儲和處理。通過正確地釋放 IO 請求結(jié)果,保證了 io_uring 的工作流程能夠持續(xù)高效地運(yùn)行,為應(yīng)用程序提供穩(wěn)定的異步 I/O 服務(wù)。

三、io_uring案例分析

3.1簡單文件讀寫案例

⑴代碼實(shí)現(xiàn)

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include <sys/ioctl.h>
#include <linux/io_uring.h>

int main() {
    struct io_uring ring;
    struct io_uring_sqe *sqe;
    struct io_uring_cqe *cqe;
    int fd, ret;
    // 打開文件
    fd = open("example.txt", O_RDONLY);
    if (fd < 0) {
        perror("Failed to open file");
        return 1;
    }
    // 初始化io_uring
    io_uring_queue_init(8, &ring, 0);
    // 獲取一個(gè)提交隊(duì)列條目
    sqe = io_uring_get_sqe(&ring);
    if (!sqe) {
        fprintf(stderr, "Could not get sqe\n");
        return 1;
    }
    // 準(zhǔn)備異步讀操作
    char *buf = malloc(1024);
    io_uring_prep_read(sqe, fd, buf, 1024, 0);
    // 提交請求
    io_uring_submit(&ring);
    // 等待完成
    ret = io_uring_wait_cqe(&ring, &cqe);
    if (ret < 0) {
        perror("io_uring_wait_cqe");
        return 1;
    }
    // 檢查結(jié)果
    if (cqe->res < 0) {
        fprintf(stderr, "Async read failed: %s\n", strerror(-cqe->res));
    } else {
        printf("Read %d bytes: %s\n", cqe->res, buf);
    }
    // 釋放資源
    io_uring_cqe_seen(&ring, cqe);
    io_uring_queue_exit(&ring);
    close(fd);
    free(buf);
    return 0;
}

代碼解讀

文件打開:fd = open("example.txt", O_RDONLY); 這行代碼使用 open 函數(shù)打開名為 example.txt 的文件,以只讀模式(O_RDONLY)打開。如果打開失敗,open 函數(shù)會返回一個(gè)負(fù)數(shù),并通過 perror 函數(shù)打印錯(cuò)誤信息,然后程序返回錯(cuò)誤代碼 1。

io_uring 初始化:io_uring_queue_init(8, &ring, 0); 這行代碼用于初始化 io_uring 實(shí)例。其中,第一個(gè)參數(shù) 8 表示提交隊(duì)列(SQ)和完成隊(duì)列(CQ)的大小,即隊(duì)列中可以容納的 I/O 請求數(shù)量;第二個(gè)參數(shù) &ring 是指向 io_uring 結(jié)構(gòu)體的指針,用于存儲初始化后的 io_uring 實(shí)例;第三個(gè)參數(shù) 0 表示使用默認(rèn)的初始化標(biāo)志。

獲取提交隊(duì)列條目:sqe = io_uring_get_sqe(&ring); 從 io_uring 的提交隊(duì)列中獲取一個(gè)提交隊(duì)列項(xiàng)(SQE)。如果獲取失敗,io_uring_get_sqe 函數(shù)會返回 NULL,程序會打印錯(cuò)誤信息并返回錯(cuò)誤代碼 1。

準(zhǔn)備異步讀操作:

char *buf = malloc(1024); //分配 1024 字節(jié)的內(nèi)存空間,用于存儲讀取的文件數(shù)據(jù)。

io_uring_prep_read(sqe, fd, buf, 1024, 0); 使用 io_uring_prep_read 函數(shù)準(zhǔn)備一個(gè)異步讀操作。它接受五個(gè)參數(shù),第一個(gè)參數(shù) sqe 是之前獲取的提交隊(duì)列項(xiàng);第二個(gè)參數(shù) fd 是要讀取的文件描述符;第三個(gè)參數(shù) buf 是用于存儲讀取數(shù)據(jù)的緩沖區(qū);第四個(gè)參數(shù) 1024 表示要讀取的字節(jié)數(shù);第五個(gè)參數(shù) 0 表示從文件的起始位置開始讀取。

提交請求:io_uring_submit(&ring); 將準(zhǔn)備好的 I/O 請求提交到內(nèi)核中執(zhí)行。這個(gè)函數(shù)會觸發(fā) io_uring_enter 系統(tǒng)調(diào)用,將提交隊(duì)列中的請求提交給內(nèi)核。

等待完成:ret = io_uring_wait_cqe(&ring, &cqe); 等待 I/O 操作完成。這個(gè)函數(shù)會阻塞調(diào)用線程,直到至少有一個(gè) I/O 請求完成,并返回完成的完成隊(duì)列項(xiàng)(CQE)。如果等待過程中出現(xiàn)錯(cuò)誤,io_uring_wait_cqe 函數(shù)會返回一個(gè)負(fù)數(shù),程序會通過 perror 函數(shù)打印錯(cuò)誤信息并返回錯(cuò)誤代碼 1。

檢查結(jié)果:

if (cqe->res < 0) 檢查 I/O 操作的結(jié)果。如果 cqe->res 小于 0,表示操作失敗,通過 fprintf 函數(shù)打印錯(cuò)誤信息。

else 分支表示操作成功,打印實(shí)際讀取的字節(jié)數(shù)和讀取到的數(shù)據(jù)。

釋放資源:

io_uring_cqe_seen(&ring, cqe); /* 知內(nèi)核已經(jīng)處理完一個(gè)完成事件,
           釋放相關(guān)資源。這通過將完成隊(duì)列的頭部指針遞增來實(shí)現(xiàn),以便內(nèi)核可以繼續(xù)使用完成隊(duì)列。*/

io_uring_queue_exit(&ring); 釋放 io_uring 實(shí)例所占用的資源,包括提交隊(duì)列和完成隊(duì)列等。

close(fd); 關(guān)閉之前打開的文件。

free(buf); 釋放之前分配的內(nèi)存緩沖區(qū)。

3.2網(wǎng)絡(luò)編程案例(TCP 服務(wù)器)

⑴代碼實(shí)現(xiàn)

#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <liburing.h>

#define ENTRIES_LENGTH 4096
#define MAX_CONNECTIONS 1024
#define BUFFER_LENGTH 1024

char buf_table[MAX_CONNECTIONS][BUFFER_LENGTH] = {0};

enum {
    READ,
    WRITE,
    ACCEPT,
};

struct conninfo {
    int connfd;
    int type;
};

void set_read_event(struct io_uring *ring, int fd, void *buf, size_t len, int flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_recv(sqe, fd, buf, len, flags);
    struct conninfo ci = {.connfd = fd,.type = READ};
    memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}

void set_write_event(struct io_uring *ring, int fd, const void *buf, size_t len, int flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_send(sqe, fd, buf, len, flags);
    struct conninfo ci = {.connfd = fd,.type = WRITE};
    memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}

void set_accept_event(struct io_uring *ring, int fd, struct sockaddr *cliaddr, socklen_t *clilen, unsigned flags) {
    struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
    io_uring_prep_accept(sqe, fd, cliaddr, clilen, flags);
    struct conninfo ci = {.connfd = fd,.type = ACCEPT};
    memcpy(&sqe->user_data, &ci, sizeof(struct conninfo));
}

int main() {
    int listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd == -1) return -1;
    struct sockaddr_in servaddr, clientaddr;
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(9999);
    if (-1 == bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) {
        return -2;
    }
    listen(listenfd, 10);
    struct io_uring_params params;
    memset(?ms, 0, sizeof(params));
    struct io_uring ring;
    memset(&ring, 0, sizeof(ring));
    /*初始化params 和 ring*/
    io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ?ms);
    socklen_t clilen = sizeof(clientaddr);
    set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0);
    while (1) {
        struct io_uring_cqe *cqe;
        io_uring_submit(&ring);
        int ret = io_uring_wait_cqe(&ring, &cqe);
        struct io_uring_cqe *cqes[10];
        int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10);
        unsigned count = 0;
        for (int i = 0; i < cqecount; i++) {
            cqe = cqes[i];
            count++;
            struct conninfo ci;
            memcpy(&ci, &cqe->user_data, sizeof(ci));
            if (ci.type == ACCEPT) {
                int connfd = cqe->res;
                char *buffer = buf_table[connfd];
                set_read_event(&ring, connfd, buffer, 1024, 0);
                set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0);
            } else if (ci.type == READ) {
                int bytes_read = cqe->res;
                if (bytes_read == 0) {
                    close(ci.connfd);
                } else if (bytes_read < 0) {
                    close(ci.connfd);
                    printf("client %d disconnected!\n", ci.connfd);
                } else {
                    char *buffer = buf_table[ci.connfd];
                    set_write_event(&ring, ci.connfd, buffer, bytes_read, 0);
                }
            } else if (ci.type == WRITE) {
                char *buffer = buf_table[ci.connfd];
                set_read_event(&ring, ci.connfd, buffer, 1024, 0);
            }
        }
        io_uring_cq_advance(&ring, count);
    }
    return 0;
}

⑵代碼解讀

創(chuàng)建監(jiān)聽套接字:int listenfd = socket(AF_INET, SOCK_STREAM, 0); 使用 socket 函數(shù)創(chuàng)建一個(gè) TCP 套接字,AF_INET 表示使用 IPv4 協(xié)議,SOCK_STREAM 表示使用流式套接字(即 TCP 協(xié)議),0 表示默認(rèn)協(xié)議。如果創(chuàng)建失敗,socket 函數(shù)會返回 -1,程序返回 -1。

綁定地址和端口:

填充服務(wù)器地址結(jié)構(gòu)體 servaddr,包括地址族(AF_INET)、IP 地址(INADDR_ANY 表示綁定到所有可用的網(wǎng)絡(luò)接口)和端口號(9999)。

if (-1 == bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr))) 使用 bind 函數(shù)將創(chuàng)建的套接字綁定到指定的地址和端口。如果綁定失敗,bind 函數(shù)返回 -1,程序返回 -2。

監(jiān)聽連接:listen(listenfd, 10); 使用 listen 函數(shù)開始監(jiān)聽套接字,第二個(gè)參數(shù) 10 表示最大連接數(shù),即允許同時(shí)存在的未處理連接請求的最大數(shù)量。

初始化 io_uring:

struct io_uring_params params; 和 struct io_uring ring; 分別定義了 io_uring 的參數(shù)結(jié)構(gòu)體和實(shí)例結(jié)構(gòu)體。 memset(&params, 0, sizeof(params)); 和 memset(&ring, 0, sizeof(ring)); 初始化這兩個(gè)結(jié)構(gòu)體的內(nèi)容為 0。 io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params); 使用 io_uring_queue_init_params 函數(shù)初始化 io_uring 實(shí)例,ENTRIES_LENGTH 表示提交隊(duì)列和完成隊(duì)列的大小,&ring 是指向 io_uring 實(shí)例的指針,&params 是指向參數(shù)結(jié)構(gòu)體的指針。

設(shè)置接受連接事件:set_accept_event(&ring, listenfd, (struct sockaddr *)&clientaddr, &clilen, 0); 調(diào)用 set_accept_event 函數(shù)設(shè)置一個(gè)接受連接的異步事件。在這個(gè)函數(shù)中,首先獲取一個(gè)提交隊(duì)列項(xiàng)(SQE),然后使用 io_uring_prep_accept 函數(shù)準(zhǔn)備接受連接的請求,將相關(guān)信息(如監(jiān)聽套接字、客戶端地址、地址長度等)填充到 SQE 中,并將自定義的連接信息結(jié)構(gòu)體 conninfo 復(fù)制到 SQE 的用戶數(shù)據(jù)區(qū)域,用于標(biāo)識該請求的類型和相關(guān)連接信息。

事件循環(huán)處理:

while (1) 進(jìn)入一個(gè)無限循環(huán),用于持續(xù)處理 I/O 事件。

io_uring_submit(&ring); 提交準(zhǔn)備好的 I/O 請求到內(nèi)核。

int ret = io_uring_wait_cqe(&ring, &cqe); 等待 I/O 操作完成,獲取完成的完成隊(duì)列項(xiàng)(CQE)。

struct io_uring_cqe *cqes[10]; 和 int cqecount = io_uring_peek_batch_cqe(&ring, cqes, 10); 使用 io_uring_peek_batch_cqe 函數(shù)嘗試批量獲取完成的 CQE,最多獲取 10 個(gè)。

遍歷獲取到的 CQE:

struct conninfo ci; 和 memcpy(&ci, &cqe->user_data, sizeof(ci)); 從 CQE 的用戶數(shù)據(jù)區(qū)域復(fù)制之前設(shè)置的連接信息結(jié)構(gòu)體 conninfo。

根據(jù)連接信息中的類型(ci.type)進(jìn)行不同的處理:

如果是 ACCEPT 類型,表示有新的連接請求被接受。獲取新的連接描述符 connfd,設(shè)置讀取事件,準(zhǔn)備從新連接中讀取數(shù)據(jù),并再次設(shè)置接受連接事件,以便繼續(xù)接受新的連接請求。 如果是 READ 類型,表示有數(shù)據(jù)可讀。根據(jù)讀取的字節(jié)數(shù)進(jìn)行處理,如果讀取到的字節(jié)數(shù)為 0,表示客戶端斷開連接,關(guān)閉連接;如果讀取失?。ㄗ止?jié)數(shù)小于 0),也關(guān)閉連接并打印斷開連接的信息;如果讀取成功,設(shè)置寫入事件,將讀取到的數(shù)據(jù)回顯給客戶端。 如果是 WRITE 類型,表示數(shù)據(jù)寫入完成,設(shè)置讀取事件,準(zhǔn)備從客戶端讀取下一次的數(shù)據(jù)。 io_uring_cq_advance(&ring, count); 告知內(nèi)核已經(jīng)處理完 count 個(gè)完成事件,通過將完成隊(duì)列的頭部指針遞增 count 個(gè)位置,以便內(nèi)核可以繼續(xù)使用完成隊(duì)列。

3.3性能對比測試

⑴測試環(huán)境與方法

測試環(huán)境搭建:在一臺配備 Intel (R) Xeon (R) CPU E5 - 2682 v4 @ 2.50GHz 處理器、16GB 內(nèi)存、運(yùn)行 Linux 5.10 內(nèi)核的服務(wù)器上進(jìn)行測試。使用的存儲設(shè)備為 NVMe SSD,以確保 I/O 性能不受磁盤性能的過多限制。測試機(jī)器的網(wǎng)絡(luò)配置為千兆以太網(wǎng),以保證網(wǎng)絡(luò)傳輸?shù)姆€(wěn)定性。

⑵測試方法設(shè)計(jì)

針對文件讀寫場景,使用 fio 工具進(jìn)行測試。分別設(shè)置不同的 I/O 模式,包括阻塞 I/O、非阻塞 I/O、epoll 以及 io_uring。對于每種模式,進(jìn)行多次測試,每次測試設(shè)置不同的文件大?。ㄈ?1MB、10MB、100MB)和 I/O 操作類型(如隨機(jī)讀、順序讀、隨機(jī)寫、順序?qū)懀?。在每次測試中,fio 工具會按照設(shè)定的參數(shù)進(jìn)行 I/O 操作,并記錄操作的時(shí)間、吞吐量等性能指標(biāo)。例如,在隨機(jī)讀測試中,fio 會隨機(jī)讀取文件中的數(shù)據(jù)塊,并統(tǒng)計(jì)單位時(shí)間內(nèi)讀取的數(shù)據(jù)量。

在網(wǎng)絡(luò)編程場景下,搭建一個(gè)簡單的 echo 服務(wù)器模型,分別使用 epoll 和 io_uring 實(shí)現(xiàn)??蛻舳送ㄟ^多線程模擬大量并發(fā)連接,向服務(wù)器發(fā)送數(shù)據(jù)并接收服務(wù)器回顯的數(shù)據(jù)。在測試過程中,逐漸增加并發(fā)連接數(shù),從 100 個(gè)連接開始,每次增加 100 個(gè),直到達(dá)到 1000 個(gè)連接。使用 iperf 等工具測量不同并發(fā)連接數(shù)下的 QPS(每秒查詢率)、延遲等性能指標(biāo)。iperf 工具會在客戶端和服務(wù)器之間建立 TCP 連接,發(fā)送一定量的數(shù)據(jù),并記錄數(shù)據(jù)傳輸?shù)乃俾?、延遲等信息。

⑶測試結(jié)果分析

文件讀寫性能:在小文件(1MB)讀寫測試中,阻塞 I/O 由于線程阻塞等待 I/O 操作完成,導(dǎo)致其吞吐量最低,平均吞吐量約為 50MB/s。非阻塞 I/O 雖然避免了線程阻塞,但頻繁的輪詢使得 CPU 利用率較高,且由于 I/O 操作的碎片化,其吞吐量也不高,平均約為 80MB/s。epoll 在處理多個(gè)文件描述符的 I/O 事件時(shí),通過高效的事件通知機(jī)制,提高了 I/O 操作的效率,平均吞吐量達(dá)到 120MB/s。

四、io_uring的應(yīng)用場景及未來發(fā)展

4.1適用場景探討

數(shù)據(jù)庫系統(tǒng):在數(shù)據(jù)庫系統(tǒng)中,大量的數(shù)據(jù)讀寫操作對 I/O 性能要求極高。io_uring 的高效異步 I/O 特性能夠顯著提升數(shù)據(jù)庫的性能。以關(guān)系型數(shù)據(jù)庫 MySQL 為例,在處理大量并發(fā)查詢和更新操作時(shí),傳統(tǒng)的 I/O 模型會導(dǎo)致線程頻繁阻塞和上下文切換,從而降低系統(tǒng)的響應(yīng)速度。而使用 io_uring,MySQL 可以將 I/O 請求異步提交到內(nèi)核,內(nèi)核在后臺處理這些請求,當(dāng)請求完成時(shí),通過完成隊(duì)列通知 MySQL。這樣,MySQL 的線程在 I/O 操作期間可以繼續(xù)執(zhí)行其他任務(wù),如查詢優(yōu)化、事務(wù)處理等,大大提高了系統(tǒng)的并發(fā)處理能力。

同時(shí),io_uring 支持直接 I/O 模式,這對于數(shù)據(jù)庫系統(tǒng)來說非常重要,因?yàn)閿?shù)據(jù)庫通常需要直接訪問存儲設(shè)備以提高數(shù)據(jù)讀寫的效率,避免了操作系統(tǒng)頁緩存帶來的額外開銷。此外,io_uring 的批量提交和處理能力,使得數(shù)據(jù)庫在進(jìn)行大規(guī)模數(shù)據(jù)導(dǎo)入、導(dǎo)出等操作時(shí),能夠一次性提交多個(gè) I/O 請求,減少系統(tǒng)調(diào)用次數(shù),進(jìn)一步提升了 I/O 性能。

網(wǎng)絡(luò)服務(wù)器:在網(wǎng)絡(luò)服務(wù)器領(lǐng)域,io_uring 同樣展現(xiàn)出了巨大的優(yōu)勢。以 Nginx 服務(wù)器為例,傳統(tǒng)的基于 epoll 的 I/O 模型在處理高并發(fā)連接時(shí),雖然通過事件驅(qū)動(dòng)機(jī)制提高了 I/O 的效率,但在 I/O 操作過程中,仍然存在一定的上下文切換開銷。而 io_uring 通過用戶態(tài)和內(nèi)核態(tài)共享的提交隊(duì)列和完成隊(duì)列,減少了系統(tǒng)調(diào)用和上下文切換的次數(shù),使得 Nginx 在處理大量并發(fā)連接時(shí),能夠更加高效地進(jìn)行數(shù)據(jù)的讀寫操作。

例如,當(dāng)有大量客戶端同時(shí)請求 Nginx 服務(wù)器時(shí),Nginx 可以使用 io_uring 將這些請求的 I/O 操作異步提交到內(nèi)核,內(nèi)核在后臺處理這些請求,并將完成結(jié)果放入完成隊(duì)列。Nginx 可以隨時(shí)從完成隊(duì)列中獲取完成的 I/O 操作結(jié)果,進(jìn)行相應(yīng)的處理,如返回響應(yīng)數(shù)據(jù)給客戶端。這種方式大大提高了 Nginx 的并發(fā)處理能力,降低了延遲,提升了服務(wù)器的性能和響應(yīng)速度。同時(shí),io_uring 支持網(wǎng)絡(luò)套接字的異步操作,使得 Nginx 在處理網(wǎng)絡(luò)連接的建立、斷開以及數(shù)據(jù)傳輸?shù)炔僮鲿r(shí),能夠更加靈活和高效。

文件存儲系統(tǒng):在文件存儲系統(tǒng)中,io_uring 的應(yīng)用可以有效提升文件的讀寫性能和系統(tǒng)的整體效率。以 Ceph 分布式文件系統(tǒng)為例,它需要處理大量的文件讀寫請求,并且要保證數(shù)據(jù)的一致性和可靠性。使用 io_uring 后,Ceph 可以將文件讀寫請求異步提交到內(nèi)核,利用內(nèi)核的高效 I/O 處理能力來完成這些請求。

在文件讀取時(shí),io_uring 可以提前將文件數(shù)據(jù)預(yù)讀到內(nèi)存中,當(dāng)應(yīng)用程序請求數(shù)據(jù)時(shí),能夠快速從內(nèi)存中獲取,減少了磁盤 I/O 的等待時(shí)間。在文件寫入時(shí),io_uring 可以將數(shù)據(jù)異步寫入磁盤,同時(shí)應(yīng)用程序可以繼續(xù)執(zhí)行其他任務(wù),提高了系統(tǒng)的并發(fā)性能。此外,io_uring 的零拷貝特性在文件存儲系統(tǒng)中也具有重要意義,它減少了數(shù)據(jù)在內(nèi)存中的拷貝次數(shù),提高了數(shù)據(jù)傳輸?shù)男?,降低?CPU 的開銷。這對于大規(guī)模文件存儲系統(tǒng)來說,能夠顯著提升系統(tǒng)的性能和可擴(kuò)展性,更好地滿足用戶對文件存儲和訪問的需求。

4.2未來發(fā)展趨勢展望

內(nèi)核支持的增強(qiáng):隨著 Linux 內(nèi)核的不斷發(fā)展,對 io_uring 的支持有望進(jìn)一步增強(qiáng)。未來的內(nèi)核版本可能會優(yōu)化 io_uring 的實(shí)現(xiàn),減少其在高并發(fā)場景下的鎖競爭和資源爭用問題,從而進(jìn)一步提升其性能。在多線程同時(shí)訪問提交隊(duì)列和完成隊(duì)列時(shí),內(nèi)核可能會采用更高效的無鎖數(shù)據(jù)結(jié)構(gòu)或優(yōu)化的鎖機(jī)制,以確保多個(gè)線程能夠高效地進(jìn)行 I/O 請求的提交和結(jié)果的獲取。此外,內(nèi)核可能會增加對更多設(shè)備和文件系統(tǒng)的支持,使 io_uring 能夠更好地應(yīng)用于各種硬件平臺和存儲設(shè)備。

例如,對于新型的存儲設(shè)備,如基于 3D XPoint 技術(shù)的非易失性內(nèi)存,內(nèi)核可能會優(yōu)化 io_uring 的驅(qū)動(dòng)程序,充分發(fā)揮這些設(shè)備的高性能優(yōu)勢。同時(shí),對于不同的文件系統(tǒng),如 ext4、XFS、Btrfs 等,內(nèi)核可能會針對 io_uring 進(jìn)行特定的優(yōu)化,提高其在不同文件系統(tǒng)上的兼容性和性能表現(xiàn)。

應(yīng)用領(lǐng)域的拓展:io_uring 在未來有望拓展到更多的應(yīng)用領(lǐng)域。隨著物聯(lián)網(wǎng)(IoT)的快速發(fā)展,大量的物聯(lián)網(wǎng)設(shè)備需要進(jìn)行高效的數(shù)據(jù)傳輸和處理。io_uring 可以應(yīng)用于物聯(lián)網(wǎng)網(wǎng)關(guān)和邊緣計(jì)算設(shè)備中,提高這些設(shè)備在處理大量傳感器數(shù)據(jù)和設(shè)備通信時(shí)的 I/O 性能。在智能工廠中,物聯(lián)網(wǎng)網(wǎng)關(guān)需要實(shí)時(shí)采集和處理大量的生產(chǎn)設(shè)備數(shù)據(jù),使用 io_uring 可以實(shí)現(xiàn)高效的異步 I/O 操作,確保數(shù)據(jù)的及時(shí)傳輸和處理,提高生產(chǎn)效率和設(shè)備的智能化管理水平。

此外,在大數(shù)據(jù)處理和人工智能領(lǐng)域,io_uring 也具有廣闊的應(yīng)用前景。大數(shù)據(jù)處理框架如 Hadoop、Spark 等,在處理大規(guī)模數(shù)據(jù)集時(shí),需要進(jìn)行大量的文件讀寫和網(wǎng)絡(luò)傳輸操作,io_uring 可以提高這些框架的數(shù)據(jù)處理速度和效率。在人工智能訓(xùn)練和推理過程中,需要頻繁地讀取和寫入模型數(shù)據(jù)和訓(xùn)練樣本,io_uring 的高效 I/O 特性可以加速這些操作,提升人工智能系統(tǒng)的性能和響應(yīng)速度。

五、io_uring代碼實(shí)踐

#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
 
#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2
 
struct conn_info
{
  int fd;
  int event;
};
 
int init_server(unsigned short port)
{
 
  int sockfd = socket(AF_INET, SOCK_STREAM, 0);
  struct sockaddr_in serveraddr;
  memset(&serveraddr, 0, sizeof(struct sockaddr_in));
  serveraddr.sin_family = AF_INET;
  serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
  serveraddr.sin_port = htons(port);
 
  if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
  {
    perror("bind");
    return -1;
  }
 
  listen(sockfd, 10);
 
  return sockfd;
}
 
#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024
 
int set_event_recv(struct io_uring *ring, int sockfd,
           void *buf, size_t len, int flags)
{
 
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
 
  struct conn_info accept_info = {
    .fd = sockfd,
    .event = EVENT_READ,
  };
 
  io_uring_prep_recv(sqe, sockfd, buf, len, flags);
  memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
 
int set_event_send(struct io_uring *ring, int sockfd,
           void *buf, size_t len, int flags)
{
 
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
 
  struct conn_info accept_info = {
    .fd = sockfd,
    .event = EVENT_WRITE,
  };
 
  io_uring_prep_send(sqe, sockfd, buf, len, flags);
  memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
 
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,
           socklen_t *addrlen, int flags)
{
 
  struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
 
  struct conn_info accept_info = {
    .fd = sockfd,
    .event = EVENT_ACCEPT,
  };
 
  io_uring_prep_accept(sqe, sockfd, (struct sockaddr *)addr, addrlen, flags);
  memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}
 
int main(int argc, char *argv[])
{
 
  unsigned short port = 9999;
  int sockfd = init_server(port);
 
  struct io_uring_params params;
  memset(?ms, 0, sizeof(params));
 
  struct io_uring ring;
  io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ?ms);
 
#if 0
  struct sockaddr_in clientaddr;  
  socklen_t len = sizeof(clientaddr);
  accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#else
 
  struct sockaddr_in clientaddr;
  socklen_t len = sizeof(clientaddr);
  set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
 
#endif
 
  char buffer[BUFFER_LENGTH] = {0};
 
  while (1)
  {
 
    io_uring_submit(&ring);
 
    struct io_uring_cqe *cqe;
    io_uring_wait_cqe(&ring, &cqe);
 
    struct io_uring_cqe *cqes[128];
    int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait
 
    int i = 0;
    for (i = 0; i < nready; i++)
    {
 
      struct io_uring_cqe *entries = cqes[i];
      struct conn_info result;
      memcpy(&result, &entries->user_data, sizeof(struct conn_info));
 
      if (result.event == EVENT_ACCEPT)
      {
 
        set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
        // printf("set_event_accept\n"); //
 
        int connfd = entries->res;
 
        set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
      }
      else if (result.event == EVENT_READ)
      { //
 
        int ret = entries->res;
        // printf("set_event_recv ret: %d, %s\n", ret, buffer); //
 
        if (ret == 0)
        {
          close(result.fd);
        }
        else if (ret > 0)
        {
          set_event_send(&ring, result.fd, buffer, ret, 0);
        }
      }
      else if (result.event == EVENT_WRITE)
      {
        //
 
        int ret = entries->res;
        // printf("set_event_send ret: %d, %s\n", ret, buffer);
 
        set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
      }
    }
 
    io_uring_cq_advance(&ring, nready);
  }
}

5.1服務(wù)器初始化

int init_server(unsigned short port)
{
	int sockfd = socket(AF_INET, SOCK_STREAM, 0);
	struct sockaddr_in serveraddr;
	memset(&serveraddr, 0, sizeof(struct sockaddr_in));
	serveraddr.sin_family = AF_INET;
	serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
	serveraddr.sin_port = htons(port);
 
	if (-1 == bind(sockfd, (struct sockaddr *)&serveraddr, sizeof(struct sockaddr)))
	{
		perror("bind");
		return -1;
	}
 
	listen(sockfd, 10);
	return sockfd;
}

該函數(shù)初始化了一個(gè) TCP 服務(wù)器套接字,用于監(jiān)聽客戶端連接請求。socket、bind 和 listen 是常規(guī)的服務(wù)器初始化步驟,將服務(wù)器綁定到指定的端口,并使其開始監(jiān)聽客戶端連接。

5.2io_uring 環(huán)境初始化

struct io_uring_params params;
memset(?ms, 0, sizeof(params));
 
struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ?ms);

io_uring_queue_init_params 函數(shù)初始化了一個(gè) io_uring 實(shí)例,這個(gè)實(shí)例將用于管理所有的異步 I/O 操作,ENTRIES_LENGTH 定義了提交隊(duì)列和完成隊(duì)列的大小,表示可以同時(shí)處理的最大 I/O 操作數(shù)量。

5.3設(shè)置 accept 事件

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);

set_event_accept 函數(shù)將一個(gè) accept 操作添加到 io_uring 的提交隊(duì)列中。這個(gè)操作用于接受客戶端連接請求。這一步是服務(wù)器啟動(dòng)時(shí)的初始操作,它告訴 io_uring 開始監(jiān)聽并處理客戶端連接。

5.4主循環(huán):提交操作和處理完成事件

while (1)
{
	io_uring_submit(&ring);
	struct io_uring_cqe *cqe;
	io_uring_wait_cqe(&ring, &cqe);
 
	struct io_uring_cqe *cqes[128];
	int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);
  • io_uring_submit:將之前添加到提交隊(duì)列中的所有操作提交給內(nèi)核,由內(nèi)核異步執(zhí)行這些操作。
  • io_uring_wait_cqe:等待至少一個(gè)操作完成,這是一個(gè)阻塞調(diào)用。
  • io_uring_peek_batch_cqe:批量獲取已經(jīng)完成的操作結(jié)果,nready 表示完成的操作數(shù)量。

5.5處理完成的事件

for (i = 0; i < nready; i++)
{
	struct io_uring_cqe *entries = cqes[i];
	struct conn_info result;
	memcpy(&result, &entries->user_data, sizeof(struct conn_info));
 
	if (result.event == EVENT_ACCEPT)
	{
		set_event_accept(&ring, sockfd, (struct sockaddr *)&clientaddr, &len, 0);
		int connfd = entries->res;
		set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
	}
	else if (result.event == EVENT_READ)
	{
		int ret = entries->res;
		if (ret == 0)
		{
			close(result.fd);
		}
		else if (ret > 0)
		{
			set_event_send(&ring, result.fd, buffer, ret, 0);
		}
	}
	else if (result.event == EVENT_WRITE)
	{
		int ret = entries->res;
		set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
	}
}
  • EVENT_ACCEPT:處理 accept 事件。當(dāng)一個(gè)新的客戶端連接到來時(shí),io_uring 完成隊(duì)列會返回 EVENT_ACCEPT 事件,表示一個(gè)新的連接已經(jīng)建立。此時(shí),服務(wù)器會:重新設(shè)置 accept 事件,繼續(xù)監(jiān)聽新的客戶端連接。獲取新連接的文件描述符 connfd,并設(shè)置一個(gè) recv 事件來準(zhǔn)備接收數(shù)據(jù)。
  • EVENT_READ:處理 recv 事件。當(dāng)從客戶端接收到數(shù)據(jù)時(shí),io_uring 返回 EVENT_READ 事件。如果接收到的數(shù)據(jù)長度大于0,則會設(shè)置一個(gè) send 事件來將數(shù)據(jù)發(fā)送回客戶端。如果 ret == 0,說明客戶端關(guān)閉了連接,則關(guān)閉文件描述符。
  • EVENT_WRITE:處理 send 事件。當(dāng)數(shù)據(jù)成功發(fā)送給客戶端后,io_uring 返回 EVENT_WRITE 事件。此時(shí),服務(wù)器會再次設(shè)置一個(gè) recv 事件,準(zhǔn)備接收更多數(shù)據(jù)。

5.6完成隊(duì)列的推進(jìn)

io_uring_cq_advance(&ring, nready);

這個(gè)函數(shù)通知 io_uring,你已經(jīng)處理完了 nready 個(gè)完成隊(duì)列條目(CQE)。io_uring 可以釋放這些 CQE 供后續(xù)操作使用。

總結(jié)

io_uring 的作用:在這個(gè)示例中,io_uring 被用來高效地處理網(wǎng)絡(luò) I/O 操作。通過異步提交和處理 accept、recv、send 操作,服務(wù)器能夠高效處理多個(gè)并發(fā)連接,而無需阻塞等待每個(gè)I/O操作完成。

異步模型:io_uring 提供了一種低延遲、高并發(fā)的異步 I/O 處理方式。操作在提交后由內(nèi)核異步執(zhí)行,完成后再由應(yīng)用程序查詢并處理結(jié)果。這種方式大大減少了系統(tǒng)調(diào)用的開銷,提高了程序的并發(fā)處理能力。

關(guān)鍵點(diǎn):

  • 提交操作:使用 io_uring_prep_* 函數(shù)準(zhǔn)備操作,并提交給內(nèi)核處理。
  • 等待完成:使用 io_uring_wait_cqe 等方法等待操作完成,并獲取結(jié)果。
  • 處理結(jié)果:根據(jù)完成隊(duì)列中的事件類型(如 EVENT_ACCEPT、EVENT_READ、EVENT_WRITE)進(jìn)行相應(yīng)的處理和后續(xù)操作。
責(zé)任編輯:武曉燕 來源: 深度Linux
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號