圖解 | Linux進(jìn)程通信之管道實現(xiàn)
本文轉(zhuǎn)載自微信公眾號「Linux內(nèi)核那些事」,作者songsong001 。轉(zhuǎn)載本文請聯(lián)系Linux內(nèi)核那些事公眾號。
處于安全的考慮,不同進(jìn)程之間的內(nèi)存空間是相互隔離的,也就是說 進(jìn)程A 是不能訪問 進(jìn)程B 的內(nèi)存空間,反之亦然。如果不同進(jìn)程間能夠相互訪問和修改對方的內(nèi)存,那么當(dāng)前進(jìn)程的內(nèi)存就有可能被其他進(jìn)程非法修改,從而導(dǎo)致安全隱患。
不同的進(jìn)程就像是大海上孤立的島嶼,它們之間不能直接相互通信,如下圖所示:
但某些場景下,不同進(jìn)程間需要相互通信,比如:進(jìn)程A 負(fù)責(zé)處理用戶的請求,而 進(jìn)程B 負(fù)責(zé)保存處理后的數(shù)據(jù)。那么當(dāng) 進(jìn)程A 處理完請求后,就需要把處理后的數(shù)據(jù)提交給 進(jìn)程B 進(jìn)行存儲。此時,進(jìn)程A 就需要與 進(jìn)程B 進(jìn)行通信。如下圖所示:
由于不同進(jìn)程間是相互隔離的,所以必須借助內(nèi)核來作為橋梁來進(jìn)行相互通信,內(nèi)核相當(dāng)于島嶼之間的輪船,如下圖所示:
內(nèi)核提供多種進(jìn)程間通信的方式,如:共享內(nèi)存,信號,消息隊列 和 管道(pipe) 等。本文主要介紹 管道 的原理與實現(xiàn)。
一、管道的使用
管道 一般用于父子進(jìn)程之間相互通信,一般的用法如下:
- 父進(jìn)程使用 pipe 系統(tǒng)調(diào)用創(chuàng)建一個管道。
- 然后父進(jìn)程使用 fork 系統(tǒng)調(diào)用創(chuàng)建一個子進(jìn)程。
- 由于子進(jìn)程會繼承父進(jìn)程打開的文件句柄,所以父子進(jìn)程可以通過新創(chuàng)建的管道進(jìn)行通信。
其原理如下圖所示:
由于管道分為讀端和寫端,所以需要兩個文件描述符來管理管道:fd[0] 為讀端,fd[1] 為寫端。
下面代碼介紹了怎么使用 pipe 系統(tǒng)調(diào)用來創(chuàng)建一個管道:
- #include <stdio.h>
- #include <unistd.h>
- #include <sys/types.h>
- #include <stdlib.h>
- #include <string.h>
- int main()
- {
- int ret = -1;
- int fd[2]; // 用于管理管道的文件描述符
- pid_t pid;
- char buf[512] = {0};
- char *msg = "hello world";
- // 創(chuàng)建一個管理
- ret = pipe(fd);
- if (-1 == ret) {
- printf("failed to create pipe\n");
- return -1;
- }
- pid = fork(); // 創(chuàng)建子進(jìn)程
- if (0 == pid) { // 子進(jìn)程
- close(fd[0]); // 關(guān)閉管道的讀端
- ret = write(fd[1], msg, strlen(msg)); // 向管道寫端寫入數(shù)據(jù)
- exit(0);
- } else { // 父進(jìn)程
- close(fd[1]); // 關(guān)閉管道的寫端
- ret = read(fd[0], buf, sizeof(buf)); // 從管道的讀端讀取數(shù)據(jù)
- printf("parent read %d bytes data: %s\n", ret, buf);
- }
- return 0;
- }
編譯代碼:
- [root@localhost pipe]# gcc -g pipe.c -o pipe
運行代碼,輸出結(jié)果如下:
- [root@localhost pipe]# ./pipe
- parent read 11 bytes data: hello world
二、管道的實現(xiàn)
每個進(jìn)程的用戶空間都是獨立的,但內(nèi)核空間卻是共用的。所以,進(jìn)程間通信必須由內(nèi)核提供服務(wù)。前面介紹了 管道(pipe) 的使用,接下來將會介紹管道在內(nèi)核中的實現(xiàn)方式。
本文使用 Linux-2.6.23 內(nèi)核作為分析對象。
1. 環(huán)形緩沖區(qū)(Ring Buffer)
在內(nèi)核中,管道 使用了環(huán)形緩沖區(qū)來存儲數(shù)據(jù)。環(huán)形緩沖區(qū)的原理是:把一個緩沖區(qū)當(dāng)成是首尾相連的環(huán),其中通過讀指針和寫指針來記錄讀操作和寫操作位置。如下圖所示:
在 Linux 內(nèi)核中,使用了 16 個內(nèi)存頁作為環(huán)形緩沖區(qū),所以這個環(huán)形緩沖區(qū)的大小為 64KB(16 * 4KB)。
當(dāng)向管道寫數(shù)據(jù)時,從寫指針指向的位置開始寫入,并且將寫指針向前移動。而從管道讀取數(shù)據(jù)時,從讀指針開始讀入,并且將讀指針向前移動。當(dāng)對沒有數(shù)據(jù)可讀的管道進(jìn)行讀操作,將會阻塞當(dāng)前進(jìn)程。而對沒有空閑空間的管道進(jìn)行寫操作,也會阻塞當(dāng)前進(jìn)程。
注意:可以將管道文件描述符設(shè)置為非阻塞,這樣對管道進(jìn)行讀寫操作時,就不會阻塞當(dāng)前進(jìn)程。
2. 管道對象
在 Linux 內(nèi)核中,管道使用 pipe_inode_info 對象來進(jìn)行管理。我們先來看看 pipe_inode_info 對象的定義,如下所示:
- struct pipe_inode_info {
- wait_queue_head_t wait;
- unsigned int nrbufs,
- unsigned int curbuf;
- ...
- unsigned int readers;
- unsigned int writers;
- unsigned int waiting_writers;
- ...
- struct inode *inode;
- struct pipe_buffer bufs[16];
- };
下面介紹一下 pipe_inode_info 對象各個字段的作用:
- wait:等待隊列,用于存儲正在等待管道可讀或者可寫的進(jìn)程。
- bufs:環(huán)形緩沖區(qū),由 16 個 pipe_buffer 對象組成,每個 pipe_buffer 對象擁有一個內(nèi)存頁 ,后面會介紹。
- nrbufs:表示未讀數(shù)據(jù)已經(jīng)占用了環(huán)形緩沖區(qū)的多少個內(nèi)存頁。
- curbuf:表示當(dāng)前正在讀取環(huán)形緩沖區(qū)的哪個內(nèi)存頁中的數(shù)據(jù)。
- readers:表示正在讀取管道的進(jìn)程數(shù)。
- writers:表示正在寫入管道的進(jìn)程數(shù)。
- waiting_writers:表示等待管道可寫的進(jìn)程數(shù)。
- inode:與管道關(guān)聯(lián)的 inode 對象。
由于環(huán)形緩沖區(qū)是由 16 個 pipe_buffer 對象組成,所以下面我們來看看 pipe_buffer 對象的定義:
- struct pipe_buffer {
- struct page *page;
- unsigned int offset;
- unsigned int len;
- ...
- };
下面介紹一下 pipe_buffer 對象各個字段的作用:
- page:指向 pipe_buffer 對象占用的內(nèi)存頁。
- offset:如果進(jìn)程正在讀取當(dāng)前內(nèi)存頁的數(shù)據(jù),那么 offset 指向正在讀取當(dāng)前內(nèi)存頁的偏移量。
- len:表示當(dāng)前內(nèi)存頁擁有未讀數(shù)據(jù)的長度。
- 下圖展示了 pipe_inode_info 對象與 pipe_buffer 對象的關(guān)系:
管道的環(huán)形緩沖區(qū)實現(xiàn)方式與經(jīng)典的環(huán)形緩沖區(qū)實現(xiàn)方式有點區(qū)別,經(jīng)典的環(huán)形緩沖區(qū)一般先申請一塊地址連續(xù)的內(nèi)存塊,然后通過讀指針與寫指針來對讀操作與寫操作進(jìn)行定位。
但為了減少對內(nèi)存的使用,內(nèi)核不會在創(chuàng)建管道時就申請 64K 的內(nèi)存塊,而是在進(jìn)程向管道寫入數(shù)據(jù)時,按需來申請內(nèi)存。
那么當(dāng)進(jìn)程從管道讀取數(shù)據(jù)時,內(nèi)核怎么處理呢?下面我們來看看管道讀操作的實現(xiàn)方式。
3. 讀操作
從 經(jīng)典的環(huán)形緩沖區(qū) 中讀取數(shù)據(jù)時,首先通過讀指針來定位到讀取數(shù)據(jù)的起始地址,然后判斷環(huán)形緩沖區(qū)中是否有數(shù)據(jù)可讀,如果有就從環(huán)形緩沖區(qū)中讀取數(shù)據(jù)到用戶空間的緩沖區(qū)中。如下圖所示:
而 管道的環(huán)形緩沖區(qū) 與 經(jīng)典的環(huán)形緩沖區(qū) 實現(xiàn)稍有不同,管道的環(huán)形緩沖區(qū) 其讀指針是由 pipe_inode_info 對象的 curbuf 字段與 pipe_buffer 對象的 offset 字段組合而成:
- pipe_inode_info 對象的 curbuf 字段表示讀操作要從 bufs 數(shù)組的哪個 pipe_buffer 中讀取數(shù)據(jù)。
- pipe_buffer 對象的 offset 字段表示讀操作要從內(nèi)存頁的哪個位置開始讀取數(shù)據(jù)。
讀取數(shù)據(jù)的過程如下圖所示:
從緩沖區(qū)中讀取到 n 個字節(jié)的數(shù)據(jù)后,會相應(yīng)移動讀指針 n 個字節(jié)的位置(也就是增加 pipe_buffer 對象的 offset 字段),并且減少 n 個字節(jié)的可讀數(shù)據(jù)長度(也就是減少 pipe_buffer 對象的 len 字段)。
當(dāng) pipe_buffer 對象的 len 字段變?yōu)?0 時,表示當(dāng)前 pipe_buffer 沒有可讀數(shù)據(jù),那么將會對 pipe_inode_info 對象的 curbuf 字段移動一個位置,并且其 nrbufs 字段進(jìn)行減一操作。
我們來看看管道讀操作的代碼實現(xiàn),讀操作由 pipe_read 函數(shù)完成。為了突出重點,我們只列出關(guān)鍵代碼,如下所示:
- static ssize_t
- pipe_read(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs,
- loff_t pos)
- {
- ...
- struct pipe_inode_info *pipe;
- // 1. 獲取管道對象
- pipe = inode->i_pipe;
- for (;;) {
- // 2. 獲取管道未讀數(shù)據(jù)占有多少個內(nèi)存頁
- int bufs = pipe->nrbufs;
- if (bufs) {
- // 3. 獲取讀操作應(yīng)該從環(huán)形緩沖區(qū)的哪個內(nèi)存頁處讀取數(shù)據(jù)
- int curbuf = pipe->curbuf;
- struct pipe_buffer *buf = pipe->bufs + curbuf;
- ...
- /* 4. 通過 pipe_buffer 的 offset 字段獲取真正的讀指針,
- * 并且從管道中讀取數(shù)據(jù)到用戶緩沖區(qū).
- */
- error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
- ...
- ret += chars;
- buf->offset += chars; // 增加 pipe_buffer 對象的 offset 字段的值
- buf->len -= chars; // 減少 pipe_buffer 對象的 len 字段的值
- /* 5. 如果當(dāng)前內(nèi)存頁的數(shù)據(jù)已經(jīng)被讀取完畢 */
- if (!buf->len) {
- ...
- curbuf = (curbuf + 1) & (PIPE_BUFFERS - 1);
- pipe->curbuf = curbuf; // 移動 pipe_inode_info 對象的 curbuf 指針
- pipe->nrbufs = --bufs; // 減少 pipe_inode_info 對象的 nrbufs 字段
- do_wakeup = 1;
- }
- total_len -= chars;
- // 6. 如果讀取到用戶期望的數(shù)據(jù)長度, 退出循環(huán)
- if (!total_len)
- break;
- }
- ...
- }
- ...
- return ret;
- }
上面代碼總結(jié)來說分為以下步驟:
- 通過文件 inode 對象來獲取到管道的 pipe_inode_info 對象。
- 通過 pipe_inode_info 對象的 nrbufs 字段獲取管道未讀數(shù)據(jù)占有多少個內(nèi)存頁。
- 通過 pipe_inode_info 對象的 curbuf 字段獲取讀操作應(yīng)該從環(huán)形緩沖區(qū)的哪個內(nèi)存頁處讀取數(shù)據(jù)。
- 通過 pipe_buffer 對象的 offset 字段獲取真正的讀指針, 并且從管道中讀取數(shù)據(jù)到用戶緩沖區(qū)。
- 如果當(dāng)前內(nèi)存頁的數(shù)據(jù)已經(jīng)被讀取完畢,那么移動 pipe_inode_info 對象的 curbuf 指針,并且減少其 nrbufs 字段的值。
- 如果讀取到用戶期望的數(shù)據(jù)長度,退出循環(huán)。
4. 寫操作
分析完管道讀操作的實現(xiàn)后,接下來,我們分析一下管道寫操作的實現(xiàn)。
經(jīng)典的環(huán)形緩沖區(qū) 寫入數(shù)據(jù)時,首先通過寫指針進(jìn)行定位要寫入的內(nèi)存地址,然后判斷環(huán)形緩沖區(qū)的空間是否足夠,足夠就把數(shù)據(jù)寫入到環(huán)形緩沖區(qū)中。如下圖所示:
但 管道的環(huán)形緩沖區(qū) 并沒有保存 寫指針,而是通過 讀指針 計算出來。那么怎么通過讀指針計算出寫指針呢?
其實很簡單,就是:
寫指針 = 讀指針 + 未讀數(shù)據(jù)長度
下面我們來看看,向管道寫入 200 字節(jié)數(shù)據(jù)的過程示意圖,如下所示:
如上圖所示,向管道寫入數(shù)據(jù)時:
- 首先通過 pipe_inode_info 的 curbuf 字段和 nrbufs 字段來定位到,應(yīng)該向哪個 pipe_buffer 寫入數(shù)據(jù)。
- 然后再通過 pipe_buffer 對象的 offset 字段和 len 字段來定位到,應(yīng)該寫入到內(nèi)存頁的哪個位置。
下面我們通過源碼來分析,寫操作是怎么實現(xiàn)的,代碼如下(為了特出重點,代碼有所刪減):
- static ssize_t
- pipe_write(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs,
- loff_t ppos)
- {
- ...
- struct pipe_inode_info *pipe;
- ...
- pipe = inode->i_pipe;
- ...
- chars = total_len & (PAGE_SIZE - 1); /* size of the last buffer */
- // 1. 如果最后寫入的 pipe_buffer 還有空閑的空間
- if (pipe->nrbufs && chars != 0) {
- // 獲取寫入數(shù)據(jù)的位置
- int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) & (PIPE_BUFFERS-1);
- struct pipe_buffer *buf = pipe->bufs + lastbuf;
- const struct pipe_buf_operations *ops = buf->ops;
- int offset = buf->offset + buf->len;
- if (ops->can_merge && offset + chars <= PAGE_SIZE) {
- ...
- error = pipe_iov_copy_from_user(offset + addr, iov, chars, atomic);
- ...
- buf->len += chars;
- total_len -= chars;
- ret = chars;
- // 如果要寫入的數(shù)據(jù)已經(jīng)全部寫入成功, 退出循環(huán)
- if (!total_len)
- goto out;
- }
- }
- // 2. 如果最后寫入的 pipe_buffer 空閑空間不足, 那么申請一個新的內(nèi)存頁來存儲數(shù)據(jù)
- for (;;) {
- int bufs;
- ...
- bufs = pipe->nrbufs;
- if (bufs < PIPE_BUFFERS) {
- int newbuf = (pipe->curbuf + bufs) & (PIPE_BUFFERS-1);
- struct pipe_buffer *buf = pipe->bufs + newbuf;
- ...
- // 申請一個新的內(nèi)存頁
- if (!page) {
- page = alloc_page(GFP_HIGHUSER);
- ...
- }
- ...
- error = pipe_iov_copy_from_user(src, iov, chars, atomic);
- ...
- ret += chars;
- buf->page = page;
- buf->ops = &anon_pipe_buf_ops;
- buf->offset = 0;
- buf->len = chars;
- pipe->nrbufs = ++bufs;
- pipe->tmp_page = NULL;
- // 如果要寫入的數(shù)據(jù)已經(jīng)全部寫入成功, 退出循環(huán)
- total_len -= chars;
- if (!total_len)
- break;
- }
- ...
- }
- out:
- ...
- return ret;
- }
上面代碼有點長,但是邏輯卻很簡單,主要進(jìn)行如下操作:
如果上次寫操作寫入的 pipe_buffer 還有空閑的空間,那么就將數(shù)據(jù)寫入到此 pipe_buffer 中,并且增加其 len 字段的值。
如果上次寫操作寫入的 pipe_buffer 沒有足夠的空閑空間,那么就新申請一個內(nèi)存頁,并且把數(shù)據(jù)保存到新的內(nèi)存頁中,并且增加 pipe_inode_info 的 nrbufs 字段的值。
如果寫入的數(shù)據(jù)已經(jīng)全部寫入成功,那么就退出寫操作。
三、思考一下
管道讀寫操作的實現(xiàn)已經(jīng)分析完畢,現(xiàn)在我們來思考一下以下問題。
1. 為什么父子進(jìn)程可以通過管道來通信?
這是因為父子進(jìn)程通過 pipe 系統(tǒng)調(diào)用打開的管道,在內(nèi)核空間中指向同一個管道對象(pipe_inode_info)。所以父子進(jìn)程共享著同一個管道對象,那么就可以通過這個共享的管道對象進(jìn)行通信。
2. 為什么內(nèi)核要使用 16 個內(nèi)存頁進(jìn)行數(shù)據(jù)存儲?
這是為了減少內(nèi)存使用。
因為使用 pipe 系統(tǒng)調(diào)用打開管道時,并沒有立刻申請內(nèi)存頁,而是當(dāng)有進(jìn)程向管道寫入數(shù)據(jù)時,才會按需申請內(nèi)存頁。當(dāng)內(nèi)存頁的數(shù)據(jù)被讀取完后,內(nèi)核會將此內(nèi)存頁回收,來減少管道對內(nèi)存的使用。