一文搞懂epoll:高效I/O多路復(fù)用的核心技術(shù)
在 Linux 系統(tǒng)中,處理 I/O 操作有多種方式,像我們熟知的 select 和 poll 等。在連接數(shù)較少的情況下,它們或許還能應(yīng)付自如,但一旦面對大量的并發(fā)連接,它們的性能就會大打折扣,就像小馬拉大車一樣,顯得力不從心。然而,有一個技術(shù)卻能在這種高并發(fā)的場景下脫穎而出,它就是 epoll。
epoll 作為一種高效的 I/O 多路復(fù)用技術(shù),與傳統(tǒng)的 select 和 poll 相比,具有許多獨特的優(yōu)勢和強(qiáng)大的性能。它就像是一把專門為高并發(fā)場景打造的利器,能夠讓我們的程序在處理大量連接時更加高效、穩(wěn)定。那么,epoll 到底是如何做到的呢?讓我們一起深入理解 epoll,探尋它的奧秘吧。
一、epoll簡介
epoll是Linux內(nèi)核為處理大批量文件描述符而作了改進(jìn)的poll,是Linux下多路復(fù)用IO接口select/poll的增強(qiáng)版本,它能顯著提高程序在大量并發(fā)連接中只有少量活躍的情況下的系統(tǒng)CPU利用率。另一點原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被內(nèi)核IO事件異步喚醒而加入Ready隊列的描述符集合就行了。
epoll除了提供select/poll那種IO事件的水平觸發(fā)(Level Triggered)外,還提供了邊緣觸發(fā)(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態(tài),減少epoll_wait/epoll_pwait的調(diào)用,提高應(yīng)用程序效率。
1.1 epoll 初印象
epoll 可是 Linux 下多路復(fù)用 I/O 接口的 “超級增強(qiáng)版”,專為應(yīng)對高并發(fā)而生。與傳統(tǒng)的 select 和 poll 相比,那優(yōu)勢可不是一星半點。select 在處理大量并發(fā)連接時,就像個沒頭蒼蠅,每次都得把所有文件描述符集合一股腦從用戶態(tài)拷貝到內(nèi)核態(tài),開銷巨大,而且還得在內(nèi)核里線性遍歷這些描述符,看看哪個 “有事”,效率低得感人,關(guān)鍵它還有個致命弱點,默認(rèn)最多只能處理 1024 個文件描述符,稍微多點連接就應(yīng)付不來。
poll 雖說在一些方面改進(jìn)了 select,比如不需要計算最大文件描述符加一的大小,對大批文件描述符處理速度稍快,基于鏈表存儲沒了最大連接數(shù)限制,但本質(zhì)上還是得遍歷所有描述符找就緒的,大量無謂的遍歷讓它在高并發(fā)下也力不從心。
epoll 就不一樣了,它采用全新的設(shè)計理念。當(dāng)創(chuàng)建一個 epoll 實例后,在內(nèi)核中有個精心構(gòu)建的數(shù)據(jù)結(jié)構(gòu),像是用紅黑樹來高效管理所有要監(jiān)聽的文件描述符,添加、刪除操作那叫一個快,時間復(fù)雜度僅 O (log n);還有個就緒列表,通常用雙向鏈表實現(xiàn),專門存放已經(jīng)就緒、有事件發(fā)生的文件描述符。
當(dāng)調(diào)用 epoll_wait 時,壓根不用像 select、poll 那樣大海撈針般遍歷所有描述符,只需瞅瞅這個就緒列表就行,輕松定位到 “有事” 的連接,大大節(jié)省了 CPU 時間。就好比在一個大型倉庫里找?guī)准囟ㄎ锲?,select 和 poll 是逐個貨架、逐件貨物查看,epoll 則是有個智能清單,直接指引到目標(biāo)貨物所在貨架,效率高下立判。這使得 epoll 在面對海量并發(fā)連接時,系統(tǒng)資源開銷小,響應(yīng)迅速,成為眾多高性能網(wǎng)絡(luò)應(yīng)用的堅實后盾。
epoll除了提供select/poll那種IO事件的電平觸發(fā) (Level Triggered)外,還提供了邊沿觸發(fā)(Edge Triggered),這就使得用戶空間程序有可能緩存IO狀態(tài),減少epoll_wait/epoll_pwait的調(diào)用,提高應(yīng)用程序效率。Linux2.6內(nèi)核中對/dev/epoll設(shè)備的訪問的封裝(system epoll)。這個使我們開發(fā)網(wǎng)絡(luò)應(yīng)用程序更加簡單,并且更加高效。
1.2 為什么要使用epoll?
同樣,我們在linux系統(tǒng)下,影響效率的依然是I/O操作,linux提供給我們select/poll/epoll等多路復(fù)用I/O方式(kqueue暫時沒研究過),為什么我們對epoll情有獨鐘呢?原因如下:
⑴文件描述符數(shù)量的對比
epoll并沒有fd(文件描述符)的上限,它只跟系統(tǒng)內(nèi)存有關(guān),我的2G的ubuntu下查看是20480個,輕松支持20W個fd??墒褂萌缦旅畈榭矗?/p>
cat /proc/sys/fs/file-max
再來看select/poll,有一個限定的fd的數(shù)量,linux/posix_types.h頭文件中
#define __FD_SETSIZE 1024
⑵效率對比
當(dāng)然了,你可以修改上述值,然后重新編譯內(nèi)核,然后再次寫代碼,這也是沒問題的,不過我先說說select/poll的機(jī)制,估計你馬上會作廢上面修改枚舉值的想法。
select/poll會因為監(jiān)聽fd的數(shù)量而導(dǎo)致效率低下,因為它是輪詢所有fd,有數(shù)據(jù)就處理,沒數(shù)據(jù)就跳過,所以fd的數(shù)量會降低效率;而epoll只處理就緒的fd,它有一個就緒設(shè)備的隊列,每次只輪詢該隊列的數(shù)據(jù),然后進(jìn)行處理。
⑶內(nèi)存處理方式對比
不管是哪種I/O機(jī)制,都無法避免fd在操作過程中拷貝的問題,而epoll使用了mmap(是指文件/對象的內(nèi)存映射,被映射到多個內(nèi)存頁上),所以同一塊內(nèi)存就可以避免這個問題。
btw:TCP/IP協(xié)議棧使用內(nèi)存池管理sk_buff結(jié)構(gòu),你還可以通過修改內(nèi)存池pool的大小,畢竟linux支持各種微調(diào)內(nèi)核。
二、epoll核心原理
2.1 epoll的工作方式
epoll分為兩種工作方式LT和ET:
LT(level triggered) 是默認(rèn)/缺省的工作方式,同時支持 block和no_block socket。這種工作方式下,內(nèi)核會通知你一個fd是否就緒,然后才可以對這個就緒的fd進(jìn)行I/O操作。就算你沒有任何操作,系統(tǒng)還是會繼續(xù)提示fd已經(jīng)就緒,不過這種工作方式出錯會比較小,傳統(tǒng)的select/poll就是這種工作方式的代表。
ET(edge-triggered) 是高速工作方式,僅支持no_block socket,這種工作方式下,當(dāng)fd從未就緒變?yōu)榫途w時,內(nèi)核會通知fd已經(jīng)就緒,并且內(nèi)核認(rèn)為你知道該fd已經(jīng)就緒,不會再次通知了,除非因為某些操作導(dǎo)致fd就緒狀態(tài)發(fā)生變化。如果一直不對這個fd進(jìn)行I/O操作,導(dǎo)致fd變?yōu)槲淳途w時,內(nèi)核同樣不會發(fā)送更多的通知,因為only once。所以這種方式下,出錯率比較高,需要增加一些檢測程序。
LT可以理解為水平觸發(fā),只要有數(shù)據(jù)可以讀,不管怎樣都會通知。而ET為邊緣觸發(fā),只有狀態(tài)發(fā)生變化時才會通知,可以理解為電平變化。
2.2 如何使用epoll?
使用epoll很簡單,只需要:
#include <sys/epoll.h>
有三個關(guān)鍵函數(shù):
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_events* event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
當(dāng)然了,不要忘記關(guān)閉函數(shù)。
epoll和select
epoll 和 select 的主要區(qū)別是:
epoll 監(jiān)聽的 fd(file descriptor)集合是常駐內(nèi)核的,它有 3 個系統(tǒng)調(diào)用 (epoll_create, epoll_wait, epoll_ctl),通過 epoll_wait
select 只有一個系統(tǒng)調(diào)用,每次要監(jiān)聽都要將其從用戶態(tài)傳到內(nèi)核,有事件時返回整個集合。
從性能上看,如果 fd 集合很大,用戶態(tài)和內(nèi)核態(tài)之間數(shù)據(jù)復(fù)制的花銷是很大的,所以 select 一般限制 fd 集合最大1024。
從使用上看,epoll 返回的是可用的 fd 子集,select 返回的是全部,哪些可用需要用戶遍歷判斷。
盡管如此,epoll 的性能并不必然比 select 高,對于 fd 數(shù)量較少并且 fd IO 都非常繁忙的情況 select 在性能上有優(yōu)勢。
2.3 epoll原理
⑴為什么要 I/O 多路復(fù)用
epoll 是一個優(yōu)秀的 I/O 多路復(fù)用方式。所以,在講解 epoll 之前,我們先來看一下為什么需要 I/O 多路復(fù)用。
阻塞 OR 非阻塞
我們知道,對于 linux 來說,I/O 設(shè)備為特殊的文件,讀寫和文件是差不多的,但是 I/O 設(shè)備因為讀寫與內(nèi)存讀寫相比,速度差距非常大。與 cpu 讀寫速度更是沒法比,所以相比于對內(nèi)存的讀寫,I/O 操作總是拖后腿的那個。網(wǎng)絡(luò) I/O 更是如此,我們很多時候不知道網(wǎng)絡(luò) I/O 什么時候到來,就好比我們點了一份外賣,不知道外賣小哥們什么時候送過來,這個時候有兩個處理辦法:
- 第一個是我們可以先去睡覺,外賣小哥送到樓下了自然會給我們打電話,這個時候我們在醒來取外賣就可以了。
- 第二個是我們可以每隔一段時間就給外賣小哥打個電話,這樣就能實時掌握外賣的動態(tài)信息了。
第一種方式對應(yīng)的就是阻塞的 I/O 處理方式,進(jìn)程在進(jìn)行 I/O 操作的時候,進(jìn)入睡眠,如果有 I/O 時間到達(dá),就喚醒這個進(jìn)程。第二種方式對應(yīng)的是非阻塞輪詢的方式,進(jìn)程在進(jìn)行 I/O 操作后,每隔一段時間向內(nèi)核詢問是否有 I/O 事件到達(dá),如果有就立刻處理。
線程池OR輪詢
在現(xiàn)實中,我們當(dāng)然選擇第一種方式,但是在計算機(jī)中,情況就要復(fù)雜一些。我們知道,在 linux 中,不管是線程還是進(jìn)程都會占用一定的資源,也就是說,系統(tǒng)總的線程和進(jìn)程數(shù)是一定的。如果有許多的線程或者進(jìn)程被掛起,無疑是白白消耗了系統(tǒng)的資源。而且,線程或者進(jìn)程的切換也是需要一定的成本的,需要上下文切換,如果頻繁的進(jìn)行上下文切換,系統(tǒng)會損失很大的性能。一個網(wǎng)絡(luò)服務(wù)器經(jīng)常需要連接成千上萬個客戶端,而它能創(chuàng)建的線程可能之后幾百個,線程耗光就不能對外提供服務(wù)了。這些都是我們在選擇 I/O 機(jī)制的時候需要考慮的。這種阻塞的 I/O 模式下,一個線程只能處理一個流的 I/O 事件,這是問題的根源。
這個時候我們首先想到的是采用線程池的方式限制同時訪問的線程數(shù),這樣就能夠解決線程不足的問題了。但是這又會有第二個問題了,多余的任務(wù)會通過隊列的方式存儲在內(nèi)存只能夠,這樣很容易在客戶端過多的情況下出現(xiàn)內(nèi)存不足的情況。
還有一種方式是采用輪詢的方式,我們只要不停的把所有流從頭到尾問一遍,又從頭開始。這樣就可以處理多個流了。
代理
采用輪詢的方式雖然能夠處理多個 I/O 事件,但是也有一個明顯的缺點,那就是會導(dǎo)致 CPU 空轉(zhuǎn)。試想一下,如果所有的流中都沒有數(shù)據(jù),那么 CPU 時間就被白白的浪費了。
為了避免CPU空轉(zhuǎn),可以引進(jìn)了一個代理。這個代理比較厲害,可以同時觀察許多流的I/O事件,在空閑的時候,會把當(dāng)前線程阻塞掉,當(dāng)有一個或多個流有I/O事件時,就從阻塞態(tài)中醒來,于是我們的程序就會輪詢一遍所有的流。這就是 select 與 poll 所做的事情,可見,采用 I/O 復(fù)用極大的提高了系統(tǒng)的效率。
(2)核心原理大揭秘
紅黑樹 —— 精準(zhǔn)管理的 “魔法樹”
紅黑樹在 epoll 里可是扮演著 “大管家” 的關(guān)鍵角色,它專門負(fù)責(zé)存儲和管理海量的文件描述符。這棵樹有著獨特的 “魔力”,它是一種自平衡的二叉搜索樹,意味著無論插入、刪除還是查找操作,時間復(fù)雜度都能穩(wěn)穩(wěn)地保持在 O (log n)。
想象一下,在高并發(fā)場景下,每秒有成千上萬個新連接涌入,每個連接對應(yīng)一個文件描述符。要是沒有紅黑樹,查找一個特定的文件描述符就如同大海撈針,效率極其低下。而有了紅黑樹,就好比給每個文件描述符都安排了一個專屬的智能導(dǎo)航。當(dāng)需要添加新連接(即新文件描述符)時,它能快速指引插入位置;要關(guān)閉某個連接刪除對應(yīng)描述符,也能迅速定位并移除,絲毫不亂。舉個例子,在大型在線游戲服務(wù)器里,同時在線玩家眾多,網(wǎng)絡(luò)連接頻繁變動,紅黑樹就能高效管理這些連接,確保游戲運行順暢,玩家操作即時響應(yīng),不會因連接管理混亂而卡頓。
就緒鏈表 —— 即時響應(yīng)的 “情報站”
就緒鏈表就像是 epoll 的 “情報收集站”。當(dāng)某個被監(jiān)聽的文件描述符狀態(tài)發(fā)生變化,比如有數(shù)據(jù)可讀、可寫,內(nèi)核立馬知曉,并通過回調(diào)機(jī)制,閃電般地將這個就緒的文件描述符添加到就緒鏈表中。這個鏈表通常是用雙向鏈表實現(xiàn),插入和刪除操作那叫一個快,時間復(fù)雜度僅 O (1)。
打個比方,這就好比快遞驛站收到了你的包裹(數(shù)據(jù)就緒),立馬把你的取件碼(文件描述符)放到一個專門的 “待取貨架”(就緒鏈表)上,你一來就能快速拿到包裹。對于應(yīng)用程序而言,當(dāng)調(diào)用 epoll_wait 時,根本不用費時費力去遍歷所有文件描述符,直接到這個 “情報站”—— 就緒鏈表瞅一眼,就能瞬間獲取所有已就緒的文件描述符,第一時間進(jìn)行數(shù)據(jù)讀寫操作,大大提升了響應(yīng)速度,讓數(shù)據(jù)處理快如閃電。
mmap—— 高效傳輸?shù)?“隱形橋梁”
mmap 堪稱 epoll 實現(xiàn)高效的幕后英雄,它搭建起了內(nèi)核空間與用戶空間的 “隱形橋梁”—— 共享內(nèi)存。在傳統(tǒng)的 I/O 操作里,數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶緩沖區(qū),這個過程就像搬運工來回搬貨,費時費力,還增加系統(tǒng)開銷。
但有了 mmap 就不一樣了,它直接讓內(nèi)核空間和用戶空間共享同一塊內(nèi)存區(qū)域,數(shù)據(jù)來了,雙方都能直接訪問,減少了數(shù)據(jù)拷貝次數(shù)。這就好比圖書館有個公共書架,管理員(內(nèi)核)和讀者(用戶程序)都能直接在上面取放書籍(數(shù)據(jù)),無需來回搬運。像是視頻流處理應(yīng)用,大量視頻數(shù)據(jù)頻繁傳輸,mmap 使得數(shù)據(jù)能快速從內(nèi)核流向用戶空間,減少傳輸延遲,讓視頻播放流暢無卡頓,極大提升了系統(tǒng)整體性能。
2.4 epoll優(yōu)缺點
select 與 poll 的缺陷
上文中我們發(fā)現(xiàn),實現(xiàn)一個代理來幫助我們處理 I/O 時間能夠極大的提高工作效率,select 與 poll 就是這樣的代理。但是它們也不是完美的,從上文中我們可以發(fā)現(xiàn),我們能夠從 select 中知道是只是有 I/O 事件發(fā)生了。但是我們不知道那一個事件發(fā)生,每一個 I/O 事件發(fā)生的時候,都需要輪詢所有的流,這樣的時間復(fù)雜度 O(N)。但是很多情況下,發(fā)生 I/O 時間的只是少數(shù)的幾個。通過輪詢所有的找出少數(shù)的幾個發(fā)生 I/O 的流顯然效率非常低下,因此 select 和 epoll 通常只能處理幾千個并發(fā)連接。
epoll 的優(yōu)勢
select的缺點之一就是在網(wǎng)絡(luò)IO流到來的時候,線程會輪詢監(jiān)控文件數(shù)組,并且是線性掃描,還有最大值的限制。相比select,epoll則無需如此。服務(wù)器主線程創(chuàng)建了epoll對象,并且注冊socket和文件事件即可。當(dāng)數(shù)據(jù)抵達(dá)的時候,也就是對于事件發(fā)生,則會調(diào)用此前注冊的那個io文件。
先看一個python的epoll例子,采用了網(wǎng)絡(luò)上一段著名的code:
import socket
import select
EOL1 = b'\n\n'
EOL2 = b'\n\r\n'
response = b'HTTP/1.0 200 OK\r\nDate: Mon, 1 Jan 1996 01:01:01 GMT\r\n'
response += b'Content-Type: text/plain\r\nContent-Length: 13\r\n\r\n'
response += b'Hello, world!'
# 創(chuàng)建套接字對象并綁定監(jiān)聽端口
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
serversocket.bind(('0.0.0.0', 8080))
serversocket.listen(1)
serversocket.setblocking(0)
# 創(chuàng)建epoll對象,并注冊socket對象的 epoll可讀事件
epoll = select.epoll()
epoll.register(serversocket.fileno(), select.EPOLLIN)
try:
connections = {}
requests = {}
responses = {}
while True:
# 主循環(huán),epoll的系統(tǒng)調(diào)用,一旦有網(wǎng)絡(luò)IO事件發(fā)生,poll調(diào)用返回。這是和select系統(tǒng)調(diào)用的關(guān)鍵區(qū)別
events = epoll.poll(1)
# 通過事件通知獲得監(jiān)聽的文件描述符,進(jìn)而處理
for fileno, event in events:
# 注冊監(jiān)聽的socket對象可讀,獲取連接,并注冊連接的可讀事件
if fileno == serversocket.fileno():
connection, address = serversocket.accept()
connection.setblocking(0)
epoll.register(connection.fileno(), select.EPOLLIN)
connections[connection.fileno()] = connection
requests[connection.fileno()] = b''
responses[connection.fileno()] = response
elif event & select.EPOLLIN:
# 連接對象可讀,處理客戶端發(fā)生的信息,并注冊連接對象可寫
requests[fileno] += connections[fileno].recv(1024)
if EOL1 in requests[fileno] or EOL2 in requests[fileno]:
epoll.modify(fileno, select.EPOLLOUT)
print('-' * 40 + '\n' + requests[fileno].decode()[:-2])
elif event & select.EPOLLOUT:
# 連接對象可寫事件發(fā)生,發(fā)送數(shù)據(jù)到客戶端
byteswritten = connections[fileno].send(responses[fileno])
responses[fileno] = responses[fileno][byteswritten:]
if len(responses[fileno]) == 0:
epoll.modify(fileno, 0)
connections[fileno].shutdown(socket.SHUT_RDWR)
elif event & select.EPOLLHUP:
epoll.unregister(fileno)
connections[fileno].close()
del connections[fileno]
finally:
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()
可見epoll使用也很簡單,并沒有過多復(fù)雜的邏輯,當(dāng)然主要是在系統(tǒng)層面封裝的好。至于Epoll的原理,也不是三言兩語可以解釋清楚,作為開發(fā)者,先學(xué)會如何使用API。
epoll與tornado
既然epoll是一種高性能的網(wǎng)絡(luò)io模型,很多web框架也采取epoll模型。大名鼎鼎tornado是python框架中一個高性能的異步框架,其底層也是來者epoll的IO模型。
當(dāng)然,tornado是跨平臺的,因此他的網(wǎng)絡(luò)io,在linux下是epoll,unix下則是kqueue。幸好tornado都做了封裝,對于開發(fā)者及其友好,下面看一個tornado寫的回顯例子。
import errno
import functools
import tornado.ioloop
import socket
def handle_connection(connection, address):
""" 處理請求,返回數(shù)據(jù)給客戶端 """
data = connection.recv(2014)
print data
connection.send(data)
def connection_ready(sock, fd, events):
""" 事件回調(diào)函數(shù),主要用于socket可讀事件,用于獲取socket的鏈接 """
while True:
try:
connection, address = sock.accept()
except socket.error as e:
if e.args[0] not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
connection.setblocking(0)
handle_connection(connection, address)
if __name__ == '__main__':
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setblocking(0)
sock.bind(("", 5000))
sock.listen(128)
# 使用tornado封裝好的epoll接口,即IOLoop對象
io_loop = tornado.ioloop.IOLoop.current()
callback = functools.partial(connection_ready, sock)
# io_loop對象注冊網(wǎng)絡(luò)io文件描述符和回調(diào)函數(shù)與io事件的綁定
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()
上面的代碼來者tornado的模塊IOLoop源碼的文檔,很簡明的介紹了在tornado中如何使用網(wǎng)絡(luò)IO。當(dāng)然具體的封裝實現(xiàn),可以參考tornado源碼獲知,在此不做介紹了。
說了這么多,總算引出了我們的主人公 epoll 了。不同于忙輪詢和無差別輪詢,epoll 會把哪個流發(fā)生了怎樣的 I/O 事件通知我們。此時我們對這些流的操作都是有意義的。(復(fù)雜度降低到了O(k),k為產(chǎn)生 I/O 事件的流的個數(shù)。
三、epoll實現(xiàn)原理
3.1 epoll 操作
epoll 在 linux 內(nèi)核中申請了一個簡易的文件系統(tǒng),把原先的一個 select 或者 poll 調(diào)用分為了三個部分:調(diào)用 epoll_create 建立一個 epoll 對象(在 epoll 文件系統(tǒng)中給這個句柄分配資源)、調(diào)用 epoll_ctl 向 epoll 對象中添加連接的套接字、調(diào)用 epoll_wait 收集發(fā)生事件的連接。
這樣只需要在進(jìn)程啟動的時候建立一個 epoll 對象,并在需要的時候向它添加或者刪除連接就可以了,因此,在實際收集的時候,epoll_wait 的效率會非常高,因為調(diào)用的時候只是傳遞了發(fā)生 IO 事件的連接。
3.2 epoll 實現(xiàn)
我們以 linux 內(nèi)核 2.6 為例,說明一下 epoll 是如何高效的處理事件的。當(dāng)某一個進(jìn)程調(diào)用 epoll_create 方法的時候,Linux 內(nèi)核會創(chuàng)建一個 eventpoll 結(jié)構(gòu)體,這個結(jié)構(gòu)體中有兩個重要的成員。
- 第一個是 rb_root rbr,這是紅黑樹的根節(jié)點,存儲著所有添加到 epoll 中的事件,也就是這個 epoll 監(jiān)控的事件。
- 第二個是 list_head rdllist 這是一個雙向鏈表,保存著將要通過 epoll_wait 返回給用戶的、滿足條件的事件。
每一個 epoll 對象都有一個獨立的 eventpoll 結(jié)構(gòu)體,這個結(jié)構(gòu)體會在內(nèi)核空間中創(chuàng)造獨立的內(nèi)存,用于存儲使用 epoll_ctl 方法向 epoll 對象中添加進(jìn)來的事件。這些事件都會掛到 rbr 紅黑樹中,這樣就能夠高效的識別重復(fù)添加的節(jié)點。
所有添加到 epoll 中的事件都會與設(shè)備(如網(wǎng)卡等)驅(qū)動程序建立回調(diào)關(guān)系,也就是說,相應(yīng)的事件發(fā)生時會調(diào)用這里的方法。這個回調(diào)方法在內(nèi)核中叫做 ep_poll_callback,它把這樣的事件放到 rdllist 雙向鏈表中。在 epoll 中,對于每一個事件都會建立一個 epitem 結(jié)構(gòu)體。
當(dāng)調(diào)用 epoll_wait 檢查是否有發(fā)生事件的連接時,只需要檢查 eventpoll 對象中的 rdllist 雙向鏈表中是否有 epitem 元素,如果 rdllist 鏈表不為空,則把這里的事件復(fù)制到用戶態(tài)內(nèi)存中的同時,將事件數(shù)量返回給用戶。通過這種方法,epoll_wait 的效率非常高。epoll-ctl 在向 epoll 對象中添加、修改、刪除事件時,從 rbr 紅黑樹中查找事件也非???。這樣,epoll 就能夠輕易的處理百萬級的并發(fā)連接。
⑴pollable
首先,linux 的 file 有個 pollable 的概念,只有 pollable 的 file 才可以加入到 epoll 和 select 中。一個 file 是 pollable 的當(dāng)且僅當(dāng)其定義了 file->f_op->poll。file->f_op->poll 的形式如下:
__poll_t poll(struct file *fp, poll_table *wait)
不同類型的 file 實現(xiàn)不同,但做的事情都差不多:
- 通過 fp 拿到其對應(yīng)的 waitqueue
- 通過 wait 拿到外部設(shè)置的 callback[[1]]
- 執(zhí)行 callback(fp, waitqueue, wait),在 callback 中會將另外一個 callback2[[2]] 注冊到 waitqueue[[3]]中,此后 fp 有觸發(fā)事件就會調(diào)用 callback2
waitqueue 是事件驅(qū)動的,與驅(qū)動程序密切相關(guān),簡單來說 poll 函數(shù)在 file 的觸發(fā)隊列中注冊了個 callback, 有事件發(fā)生時就調(diào)用callback。感興趣可以根據(jù)文后 [[4]] 的提示看看 socket 的 poll 實現(xiàn)
了解了 pollable 我們看看 epoll 的三個系統(tǒng)調(diào)用 epoll_create,,epoll_ctl,,epoll_wait:
①epoll_create:開啟 epoll 之門
epoll_create 宛如一把神奇的鑰匙,用來開啟 epoll 的大門。它的使命是創(chuàng)建一個 epoll 實例,函數(shù)原型為 “int epoll_create (int size);”,這里的 size 參數(shù)在早期 Linux 內(nèi)核版本里,像是給內(nèi)核的一個 “小提示”,暗示預(yù)計要監(jiān)聽的文件描述符數(shù)量,好讓內(nèi)核提前規(guī)劃資源。不過在后續(xù)版本,特別是 Linux 2.6.8 及以后,內(nèi)核變得更加智能,這個參數(shù)就沒那么關(guān)鍵了,只要傳入大于 0 的值就行,通常我們就簡單傳個 1。函數(shù)執(zhí)行成功,會返回一個文件描述符(fd),這可是后續(xù)操作 epoll 實例的關(guān)鍵 “入口”,有了它,才能進(jìn)行添加、刪除監(jiān)聽事件等一系列操作,要是創(chuàng)建失敗,就會返回 -1,同時 errno 會被設(shè)置成相應(yīng)錯誤碼,告訴你問題出在哪,就像出門找不到鑰匙,后續(xù)啥事都干不了,所以得小心檢查錯誤。
②epoll_ctl:掌控監(jiān)控大權(quán)
epoll_ctl 則是掌控監(jiān)控大權(quán)的 “指揮官”,負(fù)責(zé)向 epoll 實例添加、修改或刪除文件描述符,原型是 “int epoll_ctl (int epfd, int op, int fd, struct epoll_event *event);”。其中 epfd 就是前面 epoll_create 返回的 epoll 實例文件描述符,op 指明操作類型,有三個 “指令” 可選:EPOLL_CTL_ADD 如同招募新兵,把新的 fd 及其關(guān)注事件注冊到 epoll 實例中;EPOLL_CTL_MOD 類似給士兵換崗,修改已注冊 fd 的監(jiān)聽事件;EPOLL_CTL_DEL 則是讓士兵退役,從 epoll 實例里移除指定 fd。fd 就是要操作的文件描述符,而 event 是個關(guān)鍵的結(jié)構(gòu)體指針,結(jié)構(gòu)體里的 events 成員能指定要監(jiān)聽的事件類型,像 EPOLLIN(可讀)、EPOLLOUT(可寫)、EPOLLERR(出錯)等,data 成員可以存放些自定義數(shù)據(jù),方便識別 fd,比如存?zhèn)€指向結(jié)構(gòu)體的指針,里面包含 fd 相關(guān)的業(yè)務(wù)信息。操作成功返回 0,失敗返回 -1,還會設(shè)置 errno,讓你知曉 “指揮” 哪里出了岔子。
epoll_ctl 的主要操作在 ep_insert, 它做了以下事情:
- 初始化一個 epitem,里面包含 fd,監(jiān)聽的事件,就緒鏈表,關(guān)聯(lián)的 epoll_fd 等信息
- 調(diào)用 ep_item_poll(epitem, ep_ptable_queue_proc[[1]])。ep_item_poll 會調(diào)用 vfs_poll, vfs_poll 會調(diào)用上面說的 file->f_op->poll 將 ep_poll_callback[[2]] 注冊到 waitqueue
- 調(diào)用 ep_rbtree_insert(eventpoll, epitem) 將 epitem 插入 evenpoll 對象的紅黑樹,方便后續(xù)查找
ep_poll_callback
在了解 epoll_wait 之前我們還需要知道 ep_poll_callback 做了哪些操作
- ep_poll_callback 被調(diào)用,說明 epoll 中某個 file 有了新事件
- eventpoll 對象有一個 rdllist 字段,用鏈表存著當(dāng)前就緒的所有 epitem
- ep_poll_callback 被調(diào)用的時候?qū)?file 對應(yīng)的 epitem 加到 rdllist 里(不重復(fù))
- 如果當(dāng)前用戶正在 epoll_wait 阻塞狀態(tài) ep_poll_callback 還會通過 wake_up_locked 將 epoll_wait 喚醒
③epoll_wait:等待事件降臨
epoll_wait 就像個耐心的 “守望者”,阻塞等待文件描述符上的事件發(fā)生。函數(shù)定義為 “int epoll_wait (int epfd, struct epoll_event *events, int maxevents, int timeout);”,epfd 還是那個熟悉的 epoll 實例文件描述符,events 是個傳出參數(shù),是個數(shù)組,用來存放就緒事件的詳細(xì)信息,就像個 “收件箱”,內(nèi)核把發(fā)生的事件詳情投遞進(jìn)來。maxevents 規(guī)定了這個 “收件箱” 的最大容量,也就是最多能接收多少個就緒事件,要注意不能超過創(chuàng)建 epoll 實例時的 size 值。timeout 是超時時間,單位毫秒,-1 表示永遠(yuǎn)等待,直到有事件發(fā)生;0 則是急性子,立馬返回,不管有沒有事件;正數(shù)就設(shè)定個等待期限。當(dāng)有事件就緒或者超時,函數(shù)就會返回就緒事件的數(shù)量,如果返回 -1,那就是遇到錯誤,errno 會記錄錯誤原因,等待結(jié)束后,就能從 events 數(shù)組里依次取出就緒事件,按業(yè)務(wù)需求處理,開啟數(shù)據(jù)的讀寫之旅。epoll_wait 主要做了以下操作:
- 檢查 rdllist,如果不為空則去到 7,如果為空則去到 2
- 設(shè)置 timeout
- 開始無限循環(huán)
- 設(shè)置線程狀態(tài)為 TASK_INTERRUPTIBLE [參看 Sleeping in the Kernal](Kernel Korner - Sleeping in the Kernel)
- 檢查 rdllist 如果不為空去到 7, 否則去到 6
- 調(diào)用 schedule_hrtimeout_range 睡到 timeout,中途有可能被 ep_poll_callback 喚醒回到 4,如果真的 timeout 則 break 去到 7
- 設(shè)置線程狀態(tài)為 TASK_RUNNING,rdllist如果不為空時退出循環(huán),否則繼續(xù)循環(huán)
- 調(diào)用 ep_send_events 將 rdllist 返回給用戶態(tài)
epoll 的原理基本上就這些,還有很多細(xì)節(jié)如紅黑樹在哪里用,怎樣實現(xiàn) level-triggered 和 edge-triggered... 我還沒看。
PS. 普通文件不是 pollable 的,詳情請看 epoll_does_not_work_with_file
3.3 epoll 工作模式
①水平觸發(fā)(LT):持續(xù)通知的 “貼心管家”
水平觸發(fā)(LT)可是 epoll 的默認(rèn)工作模式,就像一位貼心管家,時刻關(guān)注著文件描述符的狀態(tài)。當(dāng)某個文件描述符處于就緒狀態(tài),比如有數(shù)據(jù)可讀或者可寫,內(nèi)核就會通知應(yīng)用程序。要是應(yīng)用程序這次沒處理完數(shù)據(jù),或者沒來得及處理,別擔(dān)心,下次調(diào)用 epoll_wait 時,內(nèi)核依舊會不厭其煩地再次通知,直到數(shù)據(jù)被處理完或者緩沖區(qū)里沒數(shù)據(jù)可讀、可寫了為止。
舉個例子,在處理 HTTP 報文時,數(shù)據(jù)可能是一段段陸續(xù)到達(dá)的。使用 LT 模式,只要緩沖區(qū)還有沒讀完的報文片段,每次 epoll_wait 都會把對應(yīng)的文件描述符事件返回,讓應(yīng)用程序可以分次從容地解析報文,不用擔(dān)心錯過任何數(shù)據(jù),大大降低了編程復(fù)雜度,對新手程序員那是相當(dāng)友好,就像有個老師在旁邊,不停提醒你還有作業(yè)沒做完呢。
②邊緣觸發(fā)(ET):高效靈敏的 “情報員”
邊緣觸發(fā)(ET)模式則像一位高效靈敏的情報員,奉行 “只報新事” 原則。只有在文件描述符的狀態(tài)發(fā)生改變時,比如從無數(shù)據(jù)變?yōu)橛袛?shù)據(jù)可讀,或者從不可寫變?yōu)榭蓪?,?nèi)核才會觸發(fā)事件通知應(yīng)用程序。一旦通知了,它就默認(rèn)你知曉此事,后續(xù)除非狀態(tài)再次改變,否則不會重復(fù)通知。這意味著應(yīng)用程序得打起十二分精神,在收到通知后,必須立刻、馬上處理數(shù)據(jù),而且要盡可能把當(dāng)前就緒的數(shù)據(jù)一次性處理完。
比如說讀取大型文件,使用 ET 模式,一旦檢測到文件描述符可讀,就得趕緊用 while 循環(huán)一股腦把數(shù)據(jù)全讀完,不然下次 epoll_wait 可不會再提醒你還有剩余數(shù)據(jù)。要是讀數(shù)據(jù)時遇到 EAGAIN 或 EWOULDBLOCK 錯誤,那就說明這次數(shù)據(jù)真讀完了。這種模式雖然編程難度稍高,需要精細(xì)處理數(shù)據(jù),但減少了不必要的喚醒次數(shù),系統(tǒng)開銷小,在追求極致性能的場景下,那可是 “利器”,能讓數(shù)據(jù)如閃電般高效流轉(zhuǎn)。
當(dāng)然,在 LT 模式下開發(fā)基于 epoll 的應(yīng)用要簡單一些,不太容易出錯,而在 ET 模式下事件發(fā)生時,如果沒有徹底地將緩沖區(qū)的數(shù)據(jù)處理完,則會導(dǎo)致緩沖區(qū)的用戶請求得不到響應(yīng)。注意,默認(rèn)情況下 Nginx 采用 ET 模式使用 epoll 的。
四、epoll內(nèi)核源碼詳解
網(wǎng)上很多博客說epoll使用了共享內(nèi)存,這個是完全錯誤的 ,可以閱讀源碼,會發(fā)現(xiàn)完全沒有使用共享內(nèi)存的任何api,而是 使用了copy_from_user跟__put_user進(jìn)行內(nèi)核跟用戶虛擬空間數(shù)據(jù)交互。
/*
* fs/eventpoll.c (Efficient event retrieval implementation)
* Copyright (C) 2001,...,2009 Davide Libenzi
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* Davide Libenzi <davidel@xmailserver.org>
*
*/
/*
* 在深入了解epoll的實現(xiàn)之前, 先來了解內(nèi)核的3個方面.
* 1. 等待隊列 waitqueue
* 我們簡單解釋一下等待隊列:
* 隊列頭(wait_queue_head_t)往往是資源生產(chǎn)者,
* 隊列成員(wait_queue_t)往往是資源消費者,
* 當(dāng)頭的資源ready后, 會逐個執(zhí)行每個成員指定的回調(diào)函數(shù),
* 來通知它們資源已經(jīng)ready了, 等待隊列大致就這個意思.
* 2. 內(nèi)核的poll機(jī)制
* 被Poll的fd, 必須在實現(xiàn)上支持內(nèi)核的Poll技術(shù),
* 比如fd是某個字符設(shè)備,或者是個socket, 它必須實現(xiàn)
* file_operations中的poll操作, 給自己分配有一個等待隊列頭.
* 主動poll fd的某個進(jìn)程必須分配一個等待隊列成員, 添加到
* fd的對待隊列里面去, 并指定資源ready時的回調(diào)函數(shù).
* 用socket做例子, 它必須有實現(xiàn)一個poll操作, 這個Poll是
* 發(fā)起輪詢的代碼必須主動調(diào)用的, 該函數(shù)中必須調(diào)用poll_wait(),
* poll_wait會將發(fā)起者作為等待隊列成員加入到socket的等待隊列中去.
* 這樣socket發(fā)生狀態(tài)變化時可以通過隊列頭逐個通知所有關(guān)心它的進(jìn)程.
* 這一點必須很清楚的理解, 否則會想不明白epoll是如何
* 得知fd的狀態(tài)發(fā)生變化的.
* 3. epollfd本身也是個fd, 所以它本身也可以被epoll,
* 可以猜測一下它是不是可以無限嵌套epoll下去...
*
* epoll基本上就是使用了上面的1,2點來完成.
* 可見epoll本身并沒有給內(nèi)核引入什么特別復(fù)雜或者高深的技術(shù),
* 只不過是已有功能的重新組合, 達(dá)到了超過select的效果.
*/
/*
* 相關(guān)的其它內(nèi)核知識:
* 1. fd我們知道是文件描述符, 在內(nèi)核態(tài), 與之對應(yīng)的是struct file結(jié)構(gòu),
* 可以看作是內(nèi)核態(tài)的文件描述符.
* 2. spinlock, 自旋鎖, 必須要非常小心使用的鎖,
* 尤其是調(diào)用spin_lock_irqsave()的時候, 中斷關(guān)閉, 不會發(fā)生進(jìn)程調(diào)度,
* 被保護(hù)的資源其它CPU也無法訪問. 這個鎖是很強(qiáng)力的, 所以只能鎖一些
* 非常輕量級的操作.
* 3. 引用計數(shù)在內(nèi)核中是非常重要的概念,
* 內(nèi)核代碼里面經(jīng)常有些release, free釋放資源的函數(shù)幾乎不加任何鎖,
* 這是因為這些函數(shù)往往是在對象的引用計數(shù)變成0時被調(diào)用,
* 既然沒有進(jìn)程在使用在這些對象, 自然也不需要加鎖.
* struct file 是持有引用計數(shù)的.
*/
/* --- epoll相關(guān)的數(shù)據(jù)結(jié)構(gòu) --- */
/*
* This structure is stored inside the "private_data" member of the file
* structure and rapresent the main data sructure for the eventpoll
* interface.
*/
/* 每創(chuàng)建一個epollfd, 內(nèi)核就會分配一個eventpoll與之對應(yīng), 可以說是
* 內(nèi)核態(tài)的epollfd. */
struct eventpoll {
/* Protect the this structure access */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
/* 添加, 修改或者刪除監(jiān)聽fd的時候, 以及epoll_wait返回, 向用戶空間
* 傳遞數(shù)據(jù)時都會持有這個互斥鎖, 所以在用戶空間可以放心的在多個線程
* 中同時執(zhí)行epoll相關(guān)的操作, 內(nèi)核級已經(jīng)做了保護(hù). */
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
/* 調(diào)用epoll_wait()時, 我們就是"睡"在了這個等待隊列上... */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
/* 這個用于epollfd本事被poll的時候... */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
/* 所有已經(jīng)ready的epitem都在這個鏈表里面 */
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
/* 所有要監(jiān)聽的epitem都在這里 */
struct rb_root rbr;
/*
這是一個單鏈表鏈接著所有的struct epitem當(dāng)event轉(zhuǎn)移到用戶空間時
*/
* This is a single linked list that chains all the "struct epitem" that
* happened while transfering ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* The user that created the eventpoll descriptor */
/* 這里保存了一些用戶變量, 比如fd監(jiān)聽數(shù)量的最大值等等 */
struct user_struct *user;
};
/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
*/
/* epitem 表示一個被監(jiān)聽的fd */
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
/* rb_node, 當(dāng)使用epoll_ctl()將一批fds加入到某個epollfd時, 內(nèi)核會分配
* 一批的epitem與fds們對應(yīng), 而且它們以rb_tree的形式組織起來, tree的root
* 保存在epollfd, 也就是struct eventpoll中.
* 在這里使用rb_tree的原因我認(rèn)為是提高查找,插入以及刪除的速度.
* rb_tree對以上3個操作都具有O(lgN)的時間復(fù)雜度 */
struct rb_node rbn;
/* List header used to link this structure to the eventpoll ready list */
/* 鏈表節(jié)點, 所有已經(jīng)ready的epitem都會被鏈到eventpoll的rdllist中 */
struct list_head rdllink;
/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
/* 這個在代碼中再解釋... */
struct epitem *next;
/* The file descriptor information this item refers to */
/* epitem對應(yīng)的fd和struct file */
struct epoll_filefd ffd;
/* Number of active wait queue attached to poll operations */
int nwait;
/* List containing poll wait queues */
struct list_head pwqlist;
/* The "container" of this item */
/* 當(dāng)前epitem屬于哪個eventpoll */
struct eventpoll *ep;
/* List header used to link this item to the "struct file" items list */
struct list_head fllink;
/* The structure that describe the interested events and the source fd */
/* 當(dāng)前的epitem關(guān)系哪些events, 這個數(shù)據(jù)是調(diào)用epoll_ctl時從用戶態(tài)傳遞過來 */
struct epoll_event event;
};
struct epoll_filefd {
struct file *file;
int fd;
};
/* poll所用到的鉤子Wait structure used by the poll hooks */
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
struct list_head llink;
/* The "base" pointer is set to the container "struct epitem" */
struct epitem *base;
/*
* Wait queue item that will be linked to the target file wait
* queue head.
*/
wait_queue_t wait;
/* The wait queue head that linked the "wait" wait queue item */
wait_queue_head_t *whead;
};
/* Wrapper struct used by poll queueing */
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};
/* Used by the ep_send_events() function as callback private data */
struct ep_send_events_data {
int maxevents;
struct epoll_event __user *events;
};
/* --- 代碼注釋 --- */
/* 你沒看錯, 這就是epoll_create()的真身, 基本啥也不干直接調(diào)用epoll_create1了,
* 另外你也可以發(fā)現(xiàn), size這個參數(shù)其實是沒有任何用處的... */
SYSCALL_DEFINE1(epoll_create, int, size)
{
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}
/* 這才是真正的epoll_create啊~~ */
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
int error;
struct eventpoll *ep = NULL;//主描述符
/* Check the EPOLL_* constant for consistency. */
/* 這句沒啥用處... */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
/* 對于epoll來講, 目前唯一有效的FLAG就是CLOEXEC */
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
/* 分配一個struct eventpoll, 分配和初始化細(xì)節(jié)我們隨后深聊~ */
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
/* 這里是創(chuàng)建一個匿名fd, 說起來就話長了...長話短說:
* epollfd本身并不存在一個真正的文件與之對應(yīng), 所以內(nèi)核需要創(chuàng)建一個
* "虛擬"的文件, 并為之分配真正的struct file結(jié)構(gòu), 而且有真正的fd.
* 這里2個參數(shù)比較關(guān)鍵:
* eventpoll_fops, fops就是file operations, 就是當(dāng)你對這個文件(這里是虛擬的)進(jìn)行操作(比如讀)時,
* fops里面的函數(shù)指針指向真正的操作實現(xiàn), 類似C++里面虛函數(shù)和子類的概念.
* epoll只實現(xiàn)了poll和release(就是close)操作, 其它文件系統(tǒng)操作都有VFS全權(quán)處理了.
* ep, ep就是struct epollevent, 它會作為一個私有數(shù)據(jù)保存在struct file的private指針里面.
* 其實說白了, 就是為了能通過fd找到struct file, 通過struct file能找到eventpoll結(jié)構(gòu).
* 如果懂一點Linux下字符設(shè)備驅(qū)動開發(fā), 這里應(yīng)該是很好理解的,
* 推薦閱讀 <Linux device driver 3rd>
*/
error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (error < 0)
ep_free(ep);
return error;
}
/*
* 創(chuàng)建好epollfd后, 接下來我們要往里面添加fd咯
* 來看epoll_ctl
* epfd 就是epollfd
* op ADD,MOD,DEL
* fd 需要監(jiān)聽的描述符
* event 我們關(guān)心的events
*/
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
struct epoll_event __user *, event)
{
int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
/*
* 錯誤處理以及從用戶空間將epoll_event結(jié)構(gòu)copy到內(nèi)核空間.
*/
if (ep_op_has_event(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;
/* Get the "struct file *" for the eventpoll file */
/* 取得struct file結(jié)構(gòu), epfd既然是真正的fd, 那么內(nèi)核空間
* 就會有與之對于的一個struct file結(jié)構(gòu)
* 這個結(jié)構(gòu)在epoll_create1()中, 由函數(shù)anon_inode_getfd()分配 */
error = -EBADF;
file = fget(epfd);
if (!file)
goto error_return;
/* Get the "struct file *" for the target file */
/* 我們需要監(jiān)聽的fd, 它當(dāng)然也有個struct file結(jié)構(gòu), 上下2個不要搞混了哦 */
tfile = fget(fd);
if (!tfile)
goto error_fput;
/* The target file descriptor must support poll */
error = -EPERM;
/* 如果監(jiān)聽的文件不支持poll, 那就沒轍了.
* 你知道什么情況下, 文件會不支持poll嗎?
*/
if (!tfile->f_op || !tfile->f_op->poll)
goto error_tgt_fput;
/*
* We have to check that the file structure underneath the file descriptor
* the user passed to us _is_ an eventpoll file. And also we do not permit
* adding an epoll file descriptor inside itself.
*/
error = -EINVAL;
/* epoll不能自己監(jiān)聽自己... */
if (file == tfile || !is_file_epoll(file))
goto error_tgt_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
/* 取到我們的eventpoll結(jié)構(gòu), 來自與epoll_create1()中的分配 */
ep = file->private_data;
/* 接下來的操作有可能修改數(shù)據(jù)結(jié)構(gòu)內(nèi)容, 鎖之~ */
mutex_lock(&ep->mtx);
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
/* 對于每一個監(jiān)聽的fd, 內(nèi)核都有分配一個epitem結(jié)構(gòu),
* 而且我們也知道, epoll是不允許重復(fù)添加fd的,
* 所以我們首先查找該fd是不是已經(jīng)存在了.
* ep_find()其實就是RBTREE查找, 跟C++STL的map差不多一回事, O(lgn)的時間復(fù)雜度.
*/
epi = ep_find(ep, tfile, fd);
error = -EINVAL;
switch (op) {
/* 首先我們關(guān)心添加 */
case EPOLL_CTL_ADD:
if (!epi) {
/* 之前的find沒有找到有效的epitem, 證明是第一次插入, 接受!
* 這里我們可以知道, POLLERR和POLLHUP事件內(nèi)核總是會關(guān)心的
* */
epds.events |= POLLERR | POLLHUP;
/* rbtree插入, 詳情見ep_insert()的分析
* 其實我覺得這里有insert的話, 之前的find應(yīng)該
* 是可以省掉的... */
error = ep_insert(ep, &epds, tfile, fd);
} else
/* 找到了!? 重復(fù)添加! */
error = -EEXIST;
break;
/* 刪除和修改操作都比較簡單 */
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
mutex_unlock(&ep->mtx);
error_tgt_fput:
fput(tfile);
error_fput:
fput(file);
error_return:
return error;
}
/* 分配一個eventpoll結(jié)構(gòu) */
static int ep_alloc(struct eventpoll **pep)
{
int error;
struct user_struct *user;
struct eventpoll *ep;
/* 獲取當(dāng)前用戶的一些信息, 比如是不是root啦, 最大監(jiān)聽fd數(shù)目啦 */
user = get_current_user();
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
if (unlikely(!ep))
goto free_uid;
/* 這些都是初始化啦 */
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq);//初始化自己睡在的等待隊列
init_waitqueue_head(&ep->poll_wait);//初始化
INIT_LIST_HEAD(&ep->rdllist);//初始化就緒鏈表
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
return 0;
free_uid:
free_uid(user);
return error;
}
/*
* Must be called with "mtx" held.
*/
/*
* ep_insert()在epoll_ctl()中被調(diào)用, 完成往epollfd里面添加一個監(jiān)聽fd的工作
* tfile是fd在內(nèi)核態(tài)的struct file結(jié)構(gòu)
*/
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
{
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;
/* 查看是否達(dá)到當(dāng)前用戶的最大監(jiān)聽數(shù) */
if (unlikely(atomic_read(&ep->user->epoll_watches) >=
max_user_watches))
return -ENOSPC;
/* 從著名的slab中分配一個epitem */
if (!(epi = kmem_***_alloc(epi_***, GFP_KERNEL)))
return -ENOMEM;
/* Item initialization follow here ... */
/* 這些都是相關(guān)成員的初始化... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
/* 這里保存了我們需要監(jiān)聽的文件fd和它的file結(jié)構(gòu) */
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
/* 這個指針的初值不是NULL哦... */
epi->next = EP_UNACTIVE_PTR;
/* Initialize the poll table using the queue callback */
/* 好, 我們終于要進(jìn)入到poll的正題了 */
epq.epi = epi;
/* 初始化一個poll_table
* 其實就是指定調(diào)用poll_wait(注意不是epoll_wait!!!)時的回調(diào)函數(shù),和我們關(guān)心哪些events,
* ep_ptable_queue_proc()就是我們的回調(diào)啦, 初值是所有event都關(guān)心 */
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
/* 這一部很關(guān)鍵, 也比較難懂, 完全是內(nèi)核的poll機(jī)制導(dǎo)致的...
* 首先, f_op->poll()一般來說只是個wrapper, 它會調(diào)用真正的poll實現(xiàn),
* 拿UDP的socket來舉例, 這里就是這樣的調(diào)用流程: f_op->poll(), sock_poll(),
* udp_poll(), datagram_poll(), sock_poll_wait(), 最后調(diào)用到我們上面指定的
* ep_ptable_queue_proc()這個回調(diào)函數(shù)...(好深的調(diào)用路徑...).
* 完成這一步, 我們的epitem就跟這個socket關(guān)聯(lián)起來了, 當(dāng)它有狀態(tài)變化時,
* 會通過ep_poll_callback()來通知.
* 最后, 這個函數(shù)還會查詢當(dāng)前的fd是不是已經(jīng)有啥event已經(jīng)ready了, 有的話
* 會將event返回. */
revents = tfile->f_op->poll(tfile, &epq.pt);
/*
* We have to check if something went wrong during the poll wait queue
* install process. Namely an allocation for a wait queue failed due
* high memory pressure.
*/
error = -ENOMEM;
if (epi->nwait < 0)
goto error_unregister;
/* Add the current item to the list of active epoll hook for this file */
/* 這個就是每個文件會將所有監(jiān)聽自己的epitem鏈起來 */
spin_lock(&tfile->f_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);
/*
* Add the current item to the RB tree. All RB tree operations are
* protected by "mtx", and ep_insert() is called with "mtx" held.
*/
/* 都搞定后, 將epitem插入到對應(yīng)的eventpoll中去 */
ep_rbtree_insert(ep, epi);
/* We have to drop the new item inside our item list to keep track of it */
spin_lock_irqsave(&ep->lock, flags);
/* If the file is already "ready" we drop it inside the ready list */
/* 到達(dá)這里后, 如果我們監(jiān)聽的fd已經(jīng)有事件發(fā)生, 那就要處理一下 */
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
/* 將當(dāng)前的epitem加入到ready list中去 */
list_add_tail(&epi->rdllink, &ep->rdllist);
/* Notify waiting tasks that events are available */
/* 誰在epoll_wait, 就喚醒它... */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
/* 誰在epoll當(dāng)前的epollfd, 也喚醒它... */
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
atomic_inc(&ep->user->epoll_watches);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 0;
error_unregister:
ep_unregister_pollwait(ep, epi);
/*
* We need to do this because an event could have been arrived on some
* allocated wait queue. Note that we don't care about the ep->ovflist
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);
kmem_***_free(epi_***, epi);
return error;
}
/*
* This is the callback that is used to add our wait queue to the
* target file wakeup lists.
*/
/*
* 該函數(shù)在調(diào)用f_op->poll()時會被調(diào)用.
* 也就是epoll主動poll某個fd時, 用來將epitem與指定的fd關(guān)聯(lián)起來的.
* 關(guān)聯(lián)的辦法就是使用等待隊列(waitqueue)
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_***_alloc(pwq_***, GFP_KERNEL))) {
/* 初始化等待隊列, 指定ep_poll_callback為喚醒時的回調(diào)函數(shù),
* 當(dāng)我們監(jiān)聽的fd發(fā)生狀態(tài)改變時, 也就是隊列頭被喚醒時,
* 指定的回調(diào)函數(shù)將會被調(diào)用. */
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
/* 將剛分配的等待隊列成員加入到頭中, 頭是由fd持有的 */
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
/* nwait記錄了當(dāng)前epitem加入到了多少個等待隊列中,
* 我認(rèn)為這個值最大也只會是1... */
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}
/*
* This is the callback that is passed to the wait queue wakeup
* machanism. It is called by the stored file descriptors when they
* have events to report.
*/
/*
* 這個是關(guān)鍵性的回調(diào)函數(shù), 當(dāng)我們監(jiān)聽的fd發(fā)生狀態(tài)改變時, 它會被調(diào)用.
* 參數(shù)key被當(dāng)作一個unsigned long整數(shù)使用, 攜帶的是events.
*/
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);//從等待隊列獲取epitem.需要知道哪個進(jìn)程掛載到這個設(shè)備
struct eventpoll *ep = epi->ep;//獲取
spin_lock_irqsave(&ep->lock, flags);
/*
* If the event mask does not contain any poll(2) event, we consider the
* descriptor to be disabled. This condition is likely the effect of the
* EPOLLONESHOT bit that disables the descriptor when an event is received,
* until the next EPOLL_CTL_MOD will be issued.
*/
if (!(epi->event.events & ~EP_PRIVATE_BITS))
goto out_unlock;
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
/* 沒有我們關(guān)心的event... */
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;
/*
* If we are trasfering events to userspace, we can hold no locks
* (because we're accessing user memory, and because of linux f_op->poll()
* semantics). All the events that happens during that period of time are
* chained in ep->ovflist and requeued later on.
*/
/*
* 這里看起來可能有點費解, 其實干的事情比較簡單:
* 如果該callback被調(diào)用的同時, epoll_wait()已經(jīng)返回了,
* 也就是說, 此刻應(yīng)用程序有可能已經(jīng)在循環(huán)獲取events,
* 這種情況下, 內(nèi)核將此刻發(fā)生event的epitem用一個單獨的鏈表
* 鏈起來, 不發(fā)給應(yīng)用程序, 也不丟棄, 而是在下一次epoll_wait
* 時返回給用戶.
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
}
goto out_unlock;
}
/* If this file is already in the ready list we exit soon */
/* 將當(dāng)前的epitem放入ready list */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
/* 喚醒epoll_wait... */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
/* 如果epollfd也在被poll, 那就喚醒隊列里面的所有成員. */
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return 1;
}
/*
* Implement the event wait interface for the eventpoll file. It is the kernel
* part of the user space epoll_wait(2).
*/
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
int, maxevents, int, timeout)
{
int error;
struct file *file;
struct eventpoll *ep;
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;
/* Verify that the area passed by the user is writeable */
/* 這個地方有必要說明一下:
* 內(nèi)核對應(yīng)用程序采取的策略是"絕對不信任",
* 所以內(nèi)核跟應(yīng)用程序之間的數(shù)據(jù)交互大都是copy, 不允許(也時候也是不能...)指針引用.
* epoll_wait()需要內(nèi)核返回數(shù)據(jù)給用戶空間, 內(nèi)存由用戶程序提供,
* 所以內(nèi)核會用一些手段來驗證這一段內(nèi)存空間是不是有效的.
*/
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
error = -EFAULT;
goto error_return;
}
/* Get the "struct file *" for the eventpoll file */
error = -EBADF;
/* 獲取epollfd的struct file, epollfd也是文件嘛 */
file = fget(epfd);
if (!file)
goto error_return;
/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
error = -EINVAL;
/* 檢查一下它是不是一個真正的epollfd... */
if (!is_file_epoll(file))
goto error_fput;
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
/* 獲取eventpoll結(jié)構(gòu) */
ep = file->private_data;
/* Time to fish for events ... */
/* OK, 睡覺, 等待事件到來~~ */
error = ep_poll(ep, events, maxevents, timeout);
error_fput:
fput(file);
error_return:
return error;
}
/* 這個函數(shù)真正將執(zhí)行epoll_wait的進(jìn)程帶入睡眠狀態(tài)... */
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;//等待隊列
/*
* Calculate the timeout by checking for the "infinite" value (-1)
* and the overflow condition. The passed timeout is in milliseconds,
* that why (t * HZ) / 1000.
*/
/* 計算睡覺時間, 毫秒要轉(zhuǎn)換為HZ */
jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
spin_lock_irqsave(&ep->lock, flags);
res = 0;
/* 如果ready list不為空, 就不睡了, 直接干活... */
if (list_empty(&ep->rdllist)) {
/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
/* OK, 初始化一個等待隊列, 準(zhǔn)備直接把自己掛起,
* 注意current是一個宏, 代表當(dāng)前進(jìn)程 */
init_waitqueue_entry(&wait, current);//初始化等待隊列,wait表示當(dāng)前進(jìn)程
__add_wait_queue_exclusive(&ep->wq, &wait);//掛載到ep結(jié)構(gòu)的等待隊列
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
/* 將當(dāng)前進(jìn)程設(shè)置位睡眠, 但是可以被信號喚醒的狀態(tài),
* 注意這個設(shè)置是"將來時", 我們此刻還沒睡! */
set_current_state(TASK_INTERRUPTIBLE);
/* 如果這個時候, ready list里面有成員了,
* 或者睡眠時間已經(jīng)過了, 就直接不睡了... */
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
/* 如果有信號產(chǎn)生, 也起床... */
if (signal_pending(current)) {
res = -EINTR;
break;
}
/* 啥事都沒有,解鎖, 睡覺... */
spin_unlock_irqrestore(&ep->lock, flags);
/* jtimeout這個時間后, 會被喚醒,
* ep_poll_callback()如果此時被調(diào)用,
* 那么我們就會直接被喚醒, 不用等時間了...
* 再次強(qiáng)調(diào)一下ep_poll_callback()的調(diào)用時機(jī)是由被監(jiān)聽的fd
* 的具體實現(xiàn), 比如socket或者某個設(shè)備驅(qū)動來決定的,
* 因為等待隊列頭是他們持有的, epoll和當(dāng)前進(jìn)程
* 只是單純的等待...
**/
jtimeout = schedule_timeout(jtimeout);//睡覺
spin_lock_irqsave(&ep->lock, flags);
}
__remove_wait_queue(&ep->wq, &wait);
/* OK 我們醒來了... */
set_current_state(TASK_RUNNING);
}
/* Is it worth to try to dig for events ? */
eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
/* 如果一切正常, 有event發(fā)生, 就開始準(zhǔn)備數(shù)據(jù)copy給用戶空間了... */
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
/* 這個簡單, 我們直奔下一個... */
static int ep_send_events(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
struct ep_send_events_data esed;
esed.maxevents = maxevents;
esed.events = events;
return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}
/**
* ep_scan_ready_list - Scans the ready list in a way that makes possible for
* the scan code, to call f_op->poll(). Also allows for
* O(NumReady) performance.
*
* @ep: Pointer to the epoll private data structure.
* @sproc: Pointer to the scan callback.
* @priv: Private opaque data passed to the @sproc callback.
*
* Returns: The same integer error code returned by the @sproc callback.
*/
static int ep_scan_ready_list(struct eventpoll *ep,
int (*sproc)(struct eventpoll *,
struct list_head *, void *),
void *priv)
{
int error, pwake = 0;
unsigned long flags;
struct epitem *epi, *nepi;
LIST_HEAD(txlist);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() and epoll_ctl().
*/
mutex_lock(&ep->mtx);
/*
* Steal the ready list, and re-init the original one to the
* empty list. Also, set ep->ovflist to NULL so that events
* happening while looping w/out locks, are not lost. We cannot
* have the poll callback to queue directly on ep->rdllist,
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
spin_lock_irqsave(&ep->lock, flags);
/* 這一步要注意, 首先, 所有監(jiān)聽到events的epitem都鏈到rdllist上了,
* 但是這一步之后, 所有的epitem都轉(zhuǎn)移到了txlist上, 而rdllist被清空了,
* 要注意哦, rdllist已經(jīng)被清空了! */
list_splice_init(&ep->rdllist, &txlist);
/* ovflist, 在ep_poll_callback()里面我解釋過, 此時此刻我們不希望
* 有新的event加入到ready list中了, 保存后下次再處理... */
ep->ovflist = NULL;
spin_unlock_irqrestore(&ep->lock, flags);
/*
* Now call the callback function.
*/
/* 在這個回調(diào)函數(shù)里面處理每個epitem
* sproc 就是 ep_send_events_proc, 下面會注釋到. */
error = (*sproc)(ep, &txlist, priv);
spin_lock_irqsave(&ep->lock, flags);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* We re-insert them inside the main ready-list here.
*/
/* 現(xiàn)在我們來處理ovflist, 這些epitem都是我們在傳遞數(shù)據(jù)給用戶空間時
* 監(jiān)聽到了事件. */
for (nepi = ep->ovflist; (epi = nepi) != NULL;
nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
/*
* We need to check if the item is already in the list.
* During the "sproc" callback execution time, items are
* queued into ->ovflist but the "txlist" might already
* contain them, and the list_splice() below takes care of them.
*/
/* 將這些直接放入readylist */
if (!ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
}
/*
* We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
* releasing the lock, events will be queued in the normal way inside
* ep->rdllist.
*/
ep->ovflist = EP_UNACTIVE_PTR;
/*
* Quickly re-inject items left on "txlist".
*/
/* 上一次沒有處理完的epitem, 重新插入到ready list */
list_splice(&txlist, &ep->rdllist);
/* ready list不為空, 直接喚醒... */
if (!list_empty(&ep->rdllist)) {
/*
* Wake up (if active) both the eventpoll wait list and
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irqrestore(&ep->lock, flags);
mutex_unlock(&ep->mtx);
/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);
return error;
}
/* 該函數(shù)作為callbakc在ep_scan_ready_list()中被調(diào)用
* head是一個鏈表, 包含了已經(jīng)ready的epitem,
* 這個不是eventpoll里面的ready list, 而是上面函數(shù)中的txlist.
*/
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
void *priv)
{
struct ep_send_events_data *esed = priv;
int eventcnt;
unsigned int revents;
struct epitem *epi;
struct epoll_event __user *uevent;
/*
* We can loop without lock because we are passed a task private list.
* Items cannot vanish during the loop because ep_scan_ready_list() is
* holding "mtx" during this call.
*/
/* 掃描整個鏈表... */
for (eventcnt = 0, uevent = esed->events;
!list_empty(head) && eventcnt < esed->maxevents;) {
/* 取出第一個成員 */
epi = list_first_entry(head, struct epitem, rdllink);
/* 然后從鏈表里面移除 */
list_del_init(&epi->rdllink);
/* 讀取events,
* 注意events我們ep_poll_callback()里面已經(jīng)取過一次了, 為啥還要再取?
* 1. 我們當(dāng)然希望能拿到此刻的最新數(shù)據(jù), events是會變的~
* 2. 不是所有的poll實現(xiàn), 都通過等待隊列傳遞了events, 有可能某些驅(qū)動壓根沒傳
* 必須主動去讀取. */
revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
epi->event.events;
if (revents) {
/* 將當(dāng)前的事件和用戶傳入的數(shù)據(jù)都copy給用戶空間,
* 就是epoll_wait()后應(yīng)用程序能讀到的那一堆數(shù)據(jù). */
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;
if (epi->event.events & EPOLLONESHOT)
epi->event.events &= EP_PRIVATE_BITS;
else if (!(epi->event.events & EPOLLET)) {
/* 嘿嘿, EPOLLET和非ET的區(qū)別就在這一步之差呀~
* 如果是ET, epitem是不會再進(jìn)入到readly list,
* 除非fd再次發(fā)生了狀態(tài)改變, ep_poll_callback被調(diào)用.
* 如果是非ET, 不管你還有沒有有效的事件或者數(shù)據(jù),
* 都會被重新插入到ready list, 再下一次epoll_wait
* 時, 會立即返回, 并通知給用戶空間. 當(dāng)然如果這個
* 被監(jiān)聽的fds確實沒事件也沒數(shù)據(jù)了, epoll_wait會返回一個0,
* 空轉(zhuǎn)一次.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
}
}
}
return eventcnt;
}
/* ep_free在epollfd被close時調(diào)用,
* 釋放一些資源而已, 比較簡單 */
static void ep_free(struct eventpoll *ep)
{
struct rb_node *rbp;
struct epitem *epi;
/* We need to release all tasks waiting for these file */
if (waitqueue_active(&ep->poll_wait))
ep_poll_safewake(&ep->poll_wait);
/*
* We need to lock this because we could be hit by
* eventpoll_release_file() while we're freeing the "struct eventpoll".
* We do not need to hold "ep->mtx" here because the epoll file
* is on the way to be removed and no one has references to it
* anymore. The only hit might come from eventpoll_release_file() but
* holding "epmutex" is sufficent here.
*/
mutex_lock(&epmutex);
/*
* Walks through the whole tree by unregistering poll callbacks.
*/
for (rbp = rb_first(&ep->rbr); rbp; rbp = rb_next(rbp)) {
epi = rb_entry(rbp, struct epitem, rbn);
ep_unregister_pollwait(ep, epi);
}
/*
* Walks through the whole tree by freeing each "struct epitem". At this
* point we are sure no poll callbacks will be lingering around, and also by
* holding "epmutex" we can be sure that no file cleanup code will hit
* us during this operation. So we can avoid the lock on "ep->lock".
*/
/* 之所以在關(guān)閉epollfd之前不需要調(diào)用epoll_ctl移除已經(jīng)添加的fd,
* 是因為這里已經(jīng)做了... */
while ((rbp = rb_first(&ep->rbr)) != NULL) {
epi = rb_entry(rbp, struct epitem, rbn);
ep_remove(ep, epi);
}
mutex_unlock(&epmutex);
mutex_destroy(&ep->mtx);
free_uid(ep->user);
kfree(ep);
}
/* File callbacks that implement the eventpoll file behaviour */
static const struct file_operations eventpoll_fops = {
.release = ep_eventpoll_release,
.poll = ep_eventpoll_poll
};
/* Fast test to see if the file is an evenpoll file */
static inline int is_file_epoll(struct file *f)
{
return f->f_op == &eventpoll_fops;
}
/* OK, eventpoll我認(rèn)為比較重要的函數(shù)都注釋完了... */
4.1 epoll_create
從slab緩存中創(chuàng)建一個eventpoll對象,并且創(chuàng)建一個匿名的fd跟fd對應(yīng)的file對象, 而eventpoll對象保存在struct file結(jié)構(gòu)的private指針中,并且返回, 該fd對應(yīng)的file operations只是實現(xiàn)了poll跟release操作。
創(chuàng)建eventpoll對象的初始化操作,獲取當(dāng)前用戶信息,是不是root,最大監(jiān)聽fd數(shù)目等并且保存到eventpoll對象中 初始化等待隊列,初始化就緒鏈表,初始化紅黑樹的頭結(jié)點。
4.2 epoll_ctl操作
將epoll_event結(jié)構(gòu)拷貝到內(nèi)核空間中,并且判斷加入的fd是否支持poll結(jié)構(gòu)(epoll,poll,selectI/O多路復(fù)用必須支持poll操作),并且從epfd->file->privatedata獲取event_poll對象,根據(jù)op區(qū)分是添加刪除還是修改, 首先在eventpoll結(jié)構(gòu)中的紅黑樹查找是否已經(jīng)存在了相對應(yīng)的fd,沒找到就支持插入操作,否則報重復(fù)的錯誤,相對應(yīng)的修改,刪除比較簡單就不啰嗦了。
插入操作時,會創(chuàng)建一個與fd對應(yīng)的epitem結(jié)構(gòu),并且初始化相關(guān)成員,比如保存監(jiān)聽的fd跟file結(jié)構(gòu)之類的,重要的是指定了調(diào)用poll_wait時的回調(diào)函數(shù)用于數(shù)據(jù)就緒時喚醒進(jìn)程,(其內(nèi)部,初始化設(shè)備的等待隊列,將該進(jìn)程注冊到等待隊列)完成這一步, 我們的epitem就跟這個socket關(guān)聯(lián)起來了, 當(dāng)它有狀態(tài)變化時, 會通過ep_poll_callback()來通知,最后調(diào)用加入的fd的file operation->poll函數(shù)(最后會調(diào)用poll_wait操作)用于完成注冊操作,最后將epitem結(jié)構(gòu)添加到紅黑樹中。
4.3 epoll_wait操作
計算睡眠時間(如果有),判斷eventpoll對象的鏈表是否為空,不為空那就干活不睡明.并且初始化一個等待隊列,把自己掛上去,設(shè)置自己的進(jìn)程狀態(tài),為可睡眠狀態(tài).判斷是否有信號到來(有的話直接被中斷醒來,),如果啥事都沒有那就調(diào)用schedule_timeout進(jìn)行睡眠,如果超時或者被喚醒,首先從自己初始化的等待隊列刪除,然后開始拷貝資源給用戶空間了,拷貝資源則是先把就緒事件鏈表轉(zhuǎn)移到中間鏈表,然后挨個遍歷拷貝到用戶空間, 并且挨個判斷其是否為水平觸發(fā),是的話再次插入到就緒鏈表。
五、epoll使用實例:TCP服務(wù)器處理多個客戶端請求
5.1 epoll創(chuàng)建
int epoll_create(int size); //監(jiān)聽個數(shù)
5.2 epoll事件設(shè)置
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
第一個參數(shù)epfd是epoll_create()的返回值。
第二個參數(shù)op表示動作,用三個宏來表示:
EPOLL_CTL_ADD:注冊新的fd到epfd中;
EPOLL_CTL_MOD:修改已經(jīng)注冊的fd的監(jiān)聽事件;
EPOLL_CTL_DEL:從epfd中刪除一個fd;
第三個參數(shù)是需要監(jiān)聽的fd。
第四個參數(shù)是告訴內(nèi)核需要監(jiān)聽什么事。
struct epoll_event結(jié)構(gòu)如下:
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下幾個宏的集合:
- EPOLLIN :表示對應(yīng)的文件描述符可以讀(包括對端SOCKET正常關(guān)閉);
- EPOLLOUT:表示對應(yīng)的文件描述符可以寫;
- EPOLLPRI:表示對應(yīng)的文件描述符有緊急的數(shù)據(jù)可讀(這里應(yīng)該表示有帶外數(shù)據(jù)到來);
- EPOLLERR:表示對應(yīng)的文件描述符發(fā)生錯誤;
- EPOLLHUP:表示對應(yīng)的文件描述符被掛斷;
- EPOLLET:將EPOLL設(shè)為邊緣觸發(fā)(Edge Triggered)模式,這是相對于水平觸發(fā)(Level Triggered)來說的。
- EPOLLONESHOT:只監(jiān)聽一次事件,當(dāng)監(jiān)聽完這次事件之后,如果還需要繼續(xù)監(jiān)聽這個socket的話,需要再次把這個socket加入到EPOLL隊列里
5.3 epoll監(jiān)聽
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
- 等待事件的產(chǎn)生,類似于select()調(diào)用。
- 參數(shù)events用來從內(nèi)核得到事件的集合,
- maxevents告之內(nèi)核這個events有多大,這個maxevents的值不能大于創(chuàng)建epoll_create()時的size,
- 參數(shù)timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。
- 該函數(shù)返回需要處理的事件數(shù)目,如返回0表示已超時。
5.4 編程實例測試
本次測試在上篇Unix域socket通信代碼的基礎(chǔ)上進(jìn)行修改,只使用TCP方式的socket通信進(jìn)行測試。上篇的測試代碼,服務(wù)端接收到一個客戶端的連接后,就僅對該客戶端進(jìn)行服務(wù),沒有再接收其它客戶端的處理邏輯,本篇要實現(xiàn)的,就是一個服務(wù)端,能夠接收多個客戶端的數(shù)據(jù)。
編程之前,先來看下要實現(xiàn)的程序結(jié)構(gòu),其中黃色的部分為本篇在上篇例程的基礎(chǔ)上,需要增加的部分:
圖片
。
①為socket服務(wù)端增加epoll監(jiān)聽功能,TCP服務(wù)端的代碼修改后如下,主要的修改在listen之后,創(chuàng)建一個epoll,然后把服務(wù)端的socketfd加入epoll進(jìn)行監(jiān)聽:
當(dāng)有新的客戶端請求連接時,服務(wù)端的socketfd會收到事件,進(jìn)而epoll會收到服務(wù)端socketfd的EPOLLIN事件,此時可以讓服務(wù)端接受客戶端的請求,并把創(chuàng)建的客戶端fd也加入到epoll進(jìn)行監(jiān)聽
當(dāng)客戶端連接成功并被epoll監(jiān)聽后,客戶端再發(fā)消息過來,epoll就會收到對應(yīng)客戶端fd的EPOLLIN事件,此時可以讓服務(wù)端讀取客戶端的消息
#define LISTEN_MAX 5
#define EPOLL_FDSIZE LISTEN_MAX
#define EPOLL_EVENTS 20
#define CLIENT_NUM 3
void EpollAddEvent(int epollfd, int fd, int event)
{
PRINT("epollfd:%d add fd:%d(event:%d)\n", epollfd, fd, event);
struct epoll_event ev;
ev.events = event;
ev.data.fd = fd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &ev);
}
void TcpServerThread()
{
//------------socket
int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sockfd < 0)
{
PRINT("create socket fail\n");
return;
}
PRINT("create socketfd:%d\n", sockfd);
struct sockaddr_un addr;
memset (&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strcpy(addr.sun_path, UNIX_TCP_SOCKET_ADDR);
//------------bind
if (bind(sockfd, (struct sockaddr *)&addr, sizeof(addr)))
{
PRINT("bind fail\n");
return;
}
PRINT("bind ok\n");
//------------listen
if (listen(sockfd, LISTEN_MAX))
{
PRINT("listen fail\n");
return;
}
PRINT("listen ok\n");
//------------epoll---------------
int epollfd = epoll_create(EPOLL_FDSIZE);
if (epollfd < 0)
{
PRINT("epoll create fail\n");
return;
}
PRINT("epoll create fd:%d\n", epollfd);
EpollAddEvent(epollfd, sockfd, EPOLLIN);
struct epoll_event events[EPOLL_EVENTS];
while(1)
{
PRINT("epoll wait...\n");
int num = epoll_wait(epollfd, events, EPOLL_EVENTS, -1);
PRINT("epoll wait done, num:%d\n", num);
for (int i = 0;i < num;i++)
{
int fd = events[i].data.fd;
if (EPOLLIN == events[i].events)
{
//接受客戶端的連接請求
if (fd == sockfd)
{
//------------accept
int clientfd = accept(sockfd, NULL, NULL);
if (clientfd == -1)
{
PRINT("accpet error\n");
}
else
{
PRINT("=====> accept new clientfd:%d\n", clientfd);
EpollAddEvent(epollfd, clientfd, EPOLLIN);
}
}
//讀取客戶端發(fā)來的數(shù)據(jù)
else
{
char buf[BUF_SIZE] = {0};
//------------recv
size_t size = recv(fd, buf, BUF_SIZE, 0);
//size = read(clientfd, buf, BUF_SIZE);
if (size > 0)
{
PRINT("recv from clientfd:%d, msg:%s\n", fd, buf);
}
}
}
}
}
PRINT("end\n");
}
⑵啟動多個客戶端進(jìn)行測試,修改主程序,創(chuàng)建多個客戶端線程,產(chǎn)生多個客戶端,去連接同一個服務(wù)端,來測試epoll監(jiān)聽多個事件的功能。
int main()
{
unlink(UNIX_TCP_SOCKET_ADDR);
//創(chuàng)建一個服務(wù)端
thread thServer(TcpServerThread);
//創(chuàng)建多個客戶端
thread thClinet[CLIENT_NUM];
for (int i=0; i<CLIENT_NUM; i++)
{
thClinet[i] = thread(TcpClientThread);
sleep(1);
}
while(1)
{
sleep(5);
}
}
本例中,CLIENT_NUM為3,使用3個客戶端來測試epoll功能。
⑶測試結(jié)果,在Ubuntu上編譯運行,程序運行時的打印如下:
[TcpServerThread] create socketfd:3
[TcpServerThread] bind ok
[TcpClientThread] create socketfd:4
[TcpServerThread] listen ok
[TcpServerThread] epoll create fd:5
[EpollAddEvent] epollfd:5 add fd:3(event:1)
[TcpServerThread] epoll wait...
[TcpClientThread] create socketfd:6
[TcpClientThread] connect ok
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] =====> accept new clientfd:7
[EpollAddEvent] epollfd:5 add fd:7(event:1)
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)1
[TcpServerThread] epoll wait...
[TcpClientThread] create socketfd:8
[TcpClientThread] connect ok
[TcpServerThread] epoll wait done, num:2
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)2
[TcpServerThread] =====> accept new clientfd:9
[EpollAddEvent] epollfd:5 add fd:9(event:1)
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)3
[TcpServerThread] epoll wait...
[TcpClientThread] connect ok
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] =====> accept new clientfd:10
[EpollAddEvent] epollfd:5 add fd:10(event:1)
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)5
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)6
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)4
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)7
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)8
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)9
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:1
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)10
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:2
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)12
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)11
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)14
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)13
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)15
[TcpServerThread] epoll wait...
[TcpServerThread] epoll wait done, num:3
[TcpServerThread] recv from clientfd:10, msg:helloTCP(fd:8)16
[TcpServerThread] recv from clientfd:7, msg:helloTCP(fd:4)17
[TcpServerThread] recv from clientfd:9, msg:helloTCP(fd:6)18
[TcpServerThread] epoll wait...
對結(jié)果標(biāo)注一下,更容易理解程序運行過程:
可以看到,服務(wù)端依次接受了3個客戶端的連接請求,然后可以接收3個客戶端發(fā)來的數(shù)據(jù)。