Linux 網(wǎng)絡(luò)編程:從 Socket API 到極簡(jiǎn) Redis 發(fā)布/訂閱 sub/pub 服務(wù)的實(shí)現(xiàn)
引言
本文旨在系統(tǒng)性地闡述 Linux 環(huán)境下的網(wǎng)絡(luò)編程基礎(chǔ),重點(diǎn)關(guān)注 Socket 應(yīng)用程序接口(API)的原理與應(yīng)用。通過(guò)循序漸進(jìn)的方式,結(jié)合具體的 C 語(yǔ)言代碼示例,我們將剖析核心系統(tǒng)調(diào)用的機(jī)制,并最終構(gòu)建一個(gè)基于傳輸控制協(xié)議(TCP)的簡(jiǎn)化版發(fā)布/訂閱(Publish/Subscribe, Pub/Sub)服務(wù)器模型。
在進(jìn)行深入探討之前,本文假設(shè)讀者已具備以下先驗(yàn)知識(shí):
- C 語(yǔ)言編程能力 :熟悉 C 語(yǔ)言的核心語(yǔ)法、指針操作及內(nèi)存管理機(jī)制。
- Linux 操作系統(tǒng)基礎(chǔ) :了解 Linux 基本命令行操作及 C 程序的編譯流程(例如,使用 GCC 工具鏈)。
- 計(jì)算機(jī)網(wǎng)絡(luò)基礎(chǔ) :對(duì) OSI 參考模型或 TCP/IP 協(xié)議棧有概念性認(rèn)識(shí),理解 TCP 與 UDP 的核心差異(面向連接與無(wú)連接、可靠性保證機(jī)制等),并掌握 IP 地址和端口號(hào)在網(wǎng)絡(luò)通信中的作用。
網(wǎng)絡(luò)編程的本質(zhì)是實(shí)現(xiàn)進(jìn)程間通信(Inter-Process Communication, IPC),特別是跨越主機(jī)邊界的分布式通信。在 Linux 及其他類(lèi) Unix 操作系統(tǒng)中,Socket(套接字)接口是實(shí)現(xiàn)網(wǎng)絡(luò)通信的標(biāo)準(zhǔn)范式。它提供了一套抽象的 API,允許應(yīng)用程序?qū)⒕W(wǎng)絡(luò)通信視為一種特殊的文件 I/O 操作,從而簡(jiǎn)化了網(wǎng)絡(luò)數(shù)據(jù)收發(fā)的復(fù)雜性。
本文的實(shí)踐目標(biāo)是構(gòu)建一個(gè)功能類(lèi)似于 Redis 服務(wù)器 PUBLISH 和 SUBSCRIBE 命令的簡(jiǎn)化服務(wù)實(shí)例:
- 服務(wù)器進(jìn)程監(jiān)聽(tīng)一個(gè)預(yù)定義的 TCP 端口。
- 允許多個(gè)客戶(hù)端并發(fā)連接至服務(wù)器。
- 客戶(hù)端通過(guò)發(fā)送特定格式的命令(如 SUB <topic_name>)訂閱感興趣的主題。
- 客戶(hù)端通過(guò)發(fā)送特定格式的命令(如 PUB <topic_name> <message_data>)向指定主題發(fā)布消息。
- 服務(wù)器負(fù)責(zé)將發(fā)布的消息轉(zhuǎn)發(fā)給所有訂閱了對(duì)應(yīng)主題的客戶(hù)端(通常不包括發(fā)布者自身)。
接下來(lái),我們將逐步解析相關(guān)的系統(tǒng)調(diào)用和編程技術(shù)。
Linux 上的 Socket API
在 Linux 操作系統(tǒng)的設(shè)計(jì)哲學(xué)中,“一切皆文件”是一個(gè)核心概念。網(wǎng)絡(luò)連接在內(nèi)核層面被抽象為一種特殊的文件類(lèi)型,并通過(guò)文件描述符(File Descriptor)進(jìn)行管理。socket() 系統(tǒng)調(diào)用是創(chuàng)建此類(lèi)“網(wǎng)絡(luò)文件”的入口點(diǎn),用于在網(wǎng)絡(luò)通信的參與方創(chuàng)建一個(gè)通信端點(diǎn)(Endpoint)。
socket() 系統(tǒng)調(diào)用詳解
socket() 的主要功能是在內(nèi)核中創(chuàng)建一個(gè)新的、未連接的套接字,并返回一個(gè)與之關(guān)聯(lián)的文件描述符,供用戶(hù)空間程序使用。其函數(shù)原型定義于 <sys/socket.h> 頭文件中:
#include <sys/types.h>
#include <sys/socket.h>
int socket(int domain, int type, int protocol);
參數(shù)說(shuō)明 :
- domain (地址族 Address Family - AF): 指定套接字使用的協(xié)議族。常用取值包括:
a.AF_INET: 用于 IPv4 網(wǎng)絡(luò)協(xié)議通信。本文主要采用此地址族。
b.AF_INET6: 用于 IPv6 網(wǎng)絡(luò)協(xié)議通信。
c.AF_UNIX (或 AF_LOCAL): 用于同一主機(jī)內(nèi)部的進(jìn)程間通信,依賴(lài)本地文件系統(tǒng)路徑而非網(wǎng)絡(luò)地址。
- type (套接字類(lèi)型 Socket Type): 定義套接字的通信語(yǔ)義。關(guān)鍵取值有:
- SOCK_STREAM: 提供面向連接、可靠的、基于字節(jié)流的傳輸服務(wù)。TCP 協(xié)議即為此類(lèi)型,保證數(shù)據(jù)傳輸?shù)捻樞蛐院涂煽啃裕ㄍㄟ^(guò)序列號(hào)、確認(rèn)、重傳等機(jī)制)。
- SOCK_DGRAM: 提供無(wú)連接、不可靠的數(shù)據(jù)報(bào)服務(wù)。UDP 協(xié)議屬此類(lèi),數(shù)據(jù)包傳輸可能發(fā)生丟失、重復(fù)或亂序。
- protocol (協(xié)議 Protocol): 通常設(shè)置為 0,表示由系統(tǒng)根據(jù)指定的 domain 和 type 自動(dòng)選擇默認(rèn)協(xié)議。例如,AF_INET 與 SOCK_STREAM 組合通常默認(rèn)選用 IPPROTO_TCP;AF_INET 與 SOCK_DGRAM 組合則默認(rèn)選用 IPPROTO_UDP。亦可顯式指定協(xié)議常量(如 IPPROTO_TCP)。
內(nèi)核操作與數(shù)據(jù)結(jié)構(gòu) :
當(dāng)應(yīng)用程序調(diào)用 socket() 時(shí),會(huì)觸發(fā)一次系統(tǒng)調(diào)用,進(jìn)入內(nèi)核態(tài)執(zhí)行:
- 資源分配 :內(nèi)核網(wǎng)絡(luò)協(xié)議棧為新的套接字分配必要的內(nèi)存資源,創(chuàng)建一個(gè)內(nèi)部的 struct socket 或類(lèi)似的核心數(shù)據(jù)結(jié)構(gòu)。此結(jié)構(gòu)包含了套接字的狀態(tài)信息(如初始狀態(tài)、類(lèi)型、協(xié)議族)、發(fā)送和接收緩沖區(qū)、指向特定協(xié)議(如 TCP、UDP)處理函數(shù)的指針、以及等待隊(duì)列等。
- 文件描述符關(guān)聯(lián) :內(nèi)核在當(dāng)前進(jìn)程的文件描述符表中找到一個(gè)未使用的條目,并將該條目指向一個(gè)代表該套接字的內(nèi)核文件對(duì)象(struct file)。這個(gè)文件描述符(一個(gè)非負(fù)整數(shù))是用戶(hù)空間程序操作該套接字的句柄。
- 返回 :系統(tǒng)調(diào)用返回新分配的文件描述符給應(yīng)用程序。若創(chuàng)建失敗(如資源不足、權(quán)限問(wèn)題),則返回 -1,并設(shè)置全局變量 errno 以指示具體錯(cuò)誤代碼。
返回值 :
- 成功:返回一個(gè)新的文件描述符(非負(fù)整數(shù))。
- 失?。悍祷?-1,并設(shè)置 errno。
示例:創(chuàng)建 IPv4 TCP 套接字
#include <stdio.h> // 標(biāo)準(zhǔn)輸入輸出
#include <stdlib.h> // 標(biāo)準(zhǔn)庫(kù)函數(shù),如 exit
#include <sys/socket.h> // 套接字核心函數(shù)和數(shù)據(jù)結(jié)構(gòu)
#include <errno.h> // 錯(cuò)誤碼 errno
#include <unistd.h> // close 函數(shù)
int main() {
int sockfd;
// 創(chuàng)建一個(gè)用于 IPv4 TCP 通信的套接字
// AF_INET: 指定使用 IPv4 協(xié)議族
// SOCK_STREAM: 指定使用面向連接的字節(jié)流服務(wù) (TCP)
// 0: 讓內(nèi)核自動(dòng)選擇合適的協(xié)議 (對(duì)于 AF_INET 和 SOCK_STREAM,通常是 IPPROTO_TCP)
sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 檢查 socket() 調(diào)用是否成功
if (sockfd == -1) {
perror("socket 創(chuàng)建失敗"); // perror 會(huì)根據(jù)當(dāng)前的 errno 值打印錯(cuò)誤信息
exit(EXIT_FAILURE); // 異常退出程序
}
printf("套接字創(chuàng)建成功! 文件描述符: %d\n", sockfd);
// 在實(shí)際應(yīng)用中,套接字使用完畢后應(yīng)顯式關(guān)閉
// close(sockfd);
return 0;
}
編譯與執(zhí)行 :
gcc create_socket_example.c -o create_socket_example
./create_socket_example
套接字創(chuàng)建成功! 文件描述符: 4
輸出將提示套接字創(chuàng)建成功,并顯示其文件描述符。
sockaddr_in 結(jié)構(gòu)體與地址表示
僅創(chuàng)建套接字不足以進(jìn)行通信,服務(wù)器端需要將其綁定到具體的本地網(wǎng)絡(luò)地址(IP 地址和端口號(hào))。在 IPv4 環(huán)境下,此地址信息通過(guò) struct sockaddr_in 結(jié)構(gòu)體(定義于 <netinet/in.h>)來(lái)承載:
#include <netinet/in.h>
struct sockaddr_in {
sa_family_t sin_family; /* 地址族: 固定為 AF_INET */
in_port_t sin_port; /* 端口號(hào) (網(wǎng)絡(luò)字節(jié)序) */
struct in_addr sin_addr; /* IPv4 地址 (網(wǎng)絡(luò)字節(jié)序) */
// char sin_zero[8]; /* 填充字節(jié),通常不直接使用,應(yīng)清零 */
};
/* IPv4 地址結(jié)構(gòu) */
struct in_addr {
uint32_t s_addr; /* 32位IPv4地址 (網(wǎng)絡(luò)字節(jié)序) */
};
關(guān)鍵字段解析 :
- sin_family : 地址族,對(duì)于 IPv4,必須設(shè)置為 AF_INET,與 socket() 調(diào)用中的 domain 參數(shù)保持一致。
- sin_port : 端口號(hào)。 注意 :該字段必須存儲(chǔ)為 網(wǎng)絡(luò)字節(jié)序 (Network Byte Order,即大端序 Big-Endian)。應(yīng)用程序需使用 htons() (Host to Network Short) 函數(shù)將主機(jī)字節(jié)序(Host Byte Order)的端口號(hào)轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序。例如,htons(8080)。
- sin_addr.s_addr : 32位 IPv4 地址。 注意 : 同樣必須是 網(wǎng)絡(luò)字節(jié)序 。
服務(wù)器端執(zhí)行 bind() 時(shí),若希望監(jiān)聽(tīng)本機(jī)所有可用的網(wǎng)絡(luò)接口,應(yīng)將此字段設(shè)置為 htonl(INADDR_ANY)。INADDR_ANY 是一個(gè)特殊常量(通常為 0),htonl() (Host to Network Long) 用于將其轉(zhuǎn)換為主機(jī)字節(jié)序到網(wǎng)絡(luò)字節(jié)序。
若需綁定到特定 IP 地址(如服務(wù)器僅監(jiān)聽(tīng)某塊網(wǎng)卡,或客戶(hù)端執(zhí)行 connect() 時(shí)指定目標(biāo)服務(wù)器 IP),可使用 inet_pton() (Presentation to Network) 函數(shù)將點(diǎn)分十進(jìn)制表示的 IP 地址字符串(如 "192.168.1.100")轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序的 32 位整數(shù),并存入 s_addr。
字節(jié)序轉(zhuǎn)換函數(shù) (通常定義于 <arpa/inet.h>):
- htons(): 主機(jī)字節(jié)序到網(wǎng)絡(luò)字節(jié)序(16位,用于端口號(hào))。
- htonl(): 主機(jī)字節(jié)序到網(wǎng)絡(luò)字節(jié)序(32位,用于 IPv4 地址)。
- ntohs(): 網(wǎng)絡(luò)字節(jié)序到主機(jī)字節(jié)序(16位)。
- ntohl(): 網(wǎng)絡(luò)字節(jié)序到主機(jī)字節(jié)序(32位)。
- inet_pton(AF_INET, "ip_string", &addr_struct->sin_addr): 將點(diǎn)分十進(jìn)制 IP 字符串轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序二進(jìn)制形式。
- inet_ntop(AF_INET, &addr_struct->sin_addr, buffer, buffer_size): 將網(wǎng)絡(luò)字節(jié)序二進(jìn)制 IP 地址轉(zhuǎn)換為點(diǎn)分十進(jìn)制字符串。
bind(), listen(), accept() - 服務(wù)器端核心調(diào)用
對(duì)于 TCP 服務(wù)器而言,創(chuàng)建套接字后,必須執(zhí)行一系列步驟來(lái)準(zhǔn)備接收客戶(hù)端連接:
- bind() : 將套接字與一個(gè)本地 IP 地址和端口號(hào)關(guān)聯(lián)起來(lái),定義服務(wù)的監(jiān)聽(tīng)地址。
- listen() : 將套接字設(shè)置為監(jiān)聽(tīng)模式,使其能夠接受外來(lái)的連接請(qǐng)求,并配置連接請(qǐng)求隊(duì)列。
- accept() : 從已完成三次握手的連接隊(duì)列中接受一個(gè)連接,并為此連接創(chuàng)建一個(gè)新的專(zhuān)用套接字。
1. bind() 系統(tǒng)調(diào)用
bind() 用于將 socket() 創(chuàng)建的套接字文件描述符 sockfd 與 my_addr 指定的本地地址(IP 和端口)進(jìn)行綁定。
#include <sys/socket.h>
int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
- sockfd: socket() 返回的文件描述符。
- addr: 指向包含待綁定地址信息的 sockaddr 結(jié)構(gòu)體的指針。對(duì)于 IPv4,實(shí)際傳遞的是已填充好的 struct sockaddr_in 結(jié)構(gòu)體的地址,需強(qiáng)制類(lèi)型轉(zhuǎn)換為 (struct sockaddr *)。
- addrlen: addr 指向的結(jié)構(gòu)體的大小,對(duì)于 struct sockaddr_in,通常為 sizeof(struct sockaddr_in)。
內(nèi)核操作 :
調(diào)用 bind() 進(jìn)入內(nèi)核態(tài)后:
- 地址復(fù)制與校驗(yàn) :內(nèi)核將用戶(hù)空間傳入的 sockaddr 結(jié)構(gòu)體復(fù)制到內(nèi)核內(nèi)存。
- 狀態(tài)檢查 :檢查 sockfd 對(duì)應(yīng)的套接字是否有效且未被綁定。
- 地址可用性檢查 :檢查指定的 IP 地址和端口號(hào)是否可用。對(duì)于端口號(hào),檢查是否已被其他套接字綁定(除非設(shè)置了 SO_REUSEADDR 等選項(xiàng));對(duì)于 IP 地址,檢查是否是分配給本機(jī)的有效地址(或 INADDR_ANY)。
- 權(quán)限檢查 :檢查進(jìn)程是否有權(quán)限綁定到指定端口(通常,綁定到 1024 以下的端口需要超級(jí)用戶(hù)權(quán)限)。
- 綁定操作 :如果所有檢查通過(guò),內(nèi)核將該地址信息與內(nèi)部的套接字結(jié)構(gòu)關(guān)聯(lián)起來(lái)。
返回值 :成功返回 0;失敗返回 -1,并設(shè)置 errno。常見(jiàn)錯(cuò)誤包括 EADDRINUSE(地址已在使用)、EACCES(權(quán)限不足)、EINVAL(sockfd 無(wú)效或已綁定)。
示例:綁定套接字到本地 8080 端口
#include <stdio.h>
#include <stdlib.h>
#include <string.h> // 用于 memset
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h> // 用于 htons, htonl
#include <unistd.h> // 用于 close
#define PORT 8080
int main() {
int server_fd;
struct sockaddr_in address;
int opt = 1; // 用于 setsockopt
// 創(chuàng)建套接字
if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket 創(chuàng)建失敗");
exit(EXIT_FAILURE);
}
// 可選: 設(shè)置套接字選項(xiàng),允許地址重用,便于服務(wù)器快速重啟
// SO_REUSEADDR 允許重用本地地址 (IP+端口),尤其是在 TIME_WAIT 狀態(tài)下的端口
// SO_REUSEPORT (需要內(nèi)核支持) 允許多個(gè)進(jìn)程綁定到同一 IP 和端口
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt 失敗");
close(server_fd);
exit(EXIT_FAILURE);
}
// 準(zhǔn)備 sockaddr_in 結(jié)構(gòu)體
memset(&address, 0, sizeof(address)); // 推薦清零結(jié)構(gòu)體
address.sin_family = AF_INET;
address.sin_addr.s_addr = htonl(INADDR_ANY); // 監(jiān)聽(tīng)所有網(wǎng)絡(luò)接口
address.sin_port = htons(PORT); // 監(jiān)聽(tīng)指定端口 (轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序)
// 將套接字綁定到指定的地址和端口
if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind 失敗");
close(server_fd);
exit(EXIT_FAILURE);
}
printf("套接字成功綁定到端口 %d\n", PORT);
// ... 后續(xù)步驟: listen() 和 accept() ...
close(server_fd); // 完成后關(guān)閉監(jiān)聽(tīng)套接字
return 0;
}
2. listen() 系統(tǒng)調(diào)用
listen() 用于將一個(gè)已綁定的流式套接字(SOCK_STREAM 或 SOCK_SEQPACKET)轉(zhuǎn)換為被動(dòng)監(jiān)聽(tīng)狀態(tài),使其能夠接受傳入的連接請(qǐng)求。
#include <sys/socket.h>
int listen(int sockfd, int backlog);
- sockfd: 已成功 bind() 的套接字文件描述符。
- backlog: 指定內(nèi)核為此監(jiān)聽(tīng)套接字維護(hù)的 已完成連接隊(duì)列 (Completed Connection Queue, 或稱(chēng) Accept Queue)的最大長(zhǎng)度。
內(nèi)核操作與連接隊(duì)列 :
當(dāng)調(diào)用 listen() 時(shí):
- 狀態(tài)轉(zhuǎn)換 :內(nèi)核將 sockfd 對(duì)應(yīng)的內(nèi)部套接字結(jié)構(gòu)的狀態(tài)從 CLOSED(或 BOUND)修改為 LISTEN。
- 隊(duì)列初始化 :內(nèi)核為該監(jiān)聽(tīng)套接字關(guān)聯(lián)并初始化兩個(gè)重要的隊(duì)列:
a.未完成連接隊(duì)列 (Incomplete Connection Queue / SYN Queue) :存儲(chǔ)收到的 SYN 包,但尚未完成三次握手的連接請(qǐng)求(處于 SYN_RCVD 狀態(tài))。此隊(duì)列的大小通常由系統(tǒng)參數(shù)(如 net.ipv4.tcp_max_syn_backlog)控制,backlog 參數(shù)對(duì)其影響有限或間接。
b.已完成連接隊(duì)列 (Completed Connection Queue / Accept Queue) :存儲(chǔ)已經(jīng)完成 TCP 三次握手,等待被應(yīng)用程序通過(guò) accept() 提取的連接(這些連接在內(nèi)核中已是 ESTABLISHED 狀態(tài),但從服務(wù)器監(jiān)聽(tīng)角度看是在等待 accept)。backlog 參數(shù)主要限制的是這個(gè)隊(duì)列的大小。當(dāng)此隊(duì)列滿(mǎn)時(shí),內(nèi)核可能會(huì)拒絕新的已完成握手的連接(例如,不響應(yīng) ACK,或發(fā)送 RST)。
返回值 :成功返回 0;失敗返回 -1,并設(shè)置 errno。
示例(續(xù)上) :
// ... bind() 成功后 ...
// 開(kāi)始監(jiān)聽(tīng)傳入連接
// backlog 設(shè)置為 10,意味著最多允許 10 個(gè)已完成三次握手的連接在隊(duì)列中等待 accept()
if (listen(server_fd, 10) < 0) {
perror("listen 失敗");
close(server_fd);
exit(EXIT_FAILURE);
}
printf("服務(wù)器正在端口 %d 上監(jiān)聽(tīng)...\n", PORT);
// ... 下一步: accept() ...
3. accept() 系統(tǒng)調(diào)用
accept() 從監(jiān)聽(tīng)套接字 sockfd 的已完成連接隊(duì)列中取出一個(gè)連接請(qǐng)求,為該連接創(chuàng)建一個(gè) 新的 、已連接的套接字,并返回這個(gè)新套接字的文件描述符。
#include <sys/socket.h>
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
- sockfd: 處于監(jiān)聽(tīng)狀態(tài)的套接字文件描述符。
- addr: (可選)指向 sockaddr 結(jié)構(gòu)體的指針,用于接收發(fā)起連接的客戶(hù)端的地址信息。若不關(guān)心客戶(hù)端地址,可傳遞 NULL。
- addrlen: (可選)指向 socklen_t 類(lèi)型變量的指針。這是一個(gè) 值-結(jié)果 參數(shù):
調(diào)用前,*addrlen 必須初始化為 addr 指向的緩沖區(qū)的實(shí)際大小。
調(diào)用成功后,內(nèi)核會(huì)將客戶(hù)端地址結(jié)構(gòu)的實(shí)際大小寫(xiě)入 *addrlen。若傳入 addr 為 NULL,則 addrlen 也應(yīng)為 NULL。
內(nèi)核操作 :
- 檢查隊(duì)列 :內(nèi)核檢查與 sockfd 關(guān)聯(lián)的已完成連接隊(duì)列。
- 阻塞/非阻塞行為 :
如果隊(duì)列為空且 sockfd 是阻塞模式(默認(rèn)),accept() 調(diào)用將使進(jìn)程 睡眠 ,直到隊(duì)列中出現(xiàn)新的已完成連接。
如果隊(duì)列為空且 sockfd 是非阻塞模式,accept() 立即返回 -1,并將 errno 設(shè)置為 EAGAIN 或 EWOULDBLOCK。
- 提取連接 :如果隊(duì)列非空,內(nèi)核從中取出一個(gè)連接。
- 創(chuàng)建新套接字 :內(nèi)核為這個(gè)被接受的連接創(chuàng)建一個(gè) 全新的 內(nèi)部套接字結(jié)構(gòu)和對(duì)應(yīng)的文件對(duì)象。這個(gè)新套接字的狀態(tài)被設(shè)為 ESTABLISHED,并關(guān)聯(lián)了客戶(hù)端的地址信息。
- 返回新描述符 :內(nèi)核在進(jìn)程的文件描述符表中分配一個(gè)新的文件描述符,指向這個(gè)新創(chuàng)建的已連接套接字的文件對(duì)象,并將此描述符返回給應(yīng)用程序。
- 填充地址信息 :如果 addr 和 addrlen 參數(shù)有效,內(nèi)核將客戶(hù)端的地址信息復(fù)制到 addr 指向的緩沖區(qū),并更新 *addrlen。
關(guān)鍵點(diǎn) :accept() 返回的是一個(gè) 新的文件描述符 ,代表與特定客戶(hù)端的通信通道。后續(xù)與該客戶(hù)端的數(shù)據(jù)收發(fā)(send(), recv() 等)必須使用這個(gè) 新描述符 ,而非原來(lái)的監(jiān)聽(tīng)描述符 sockfd。監(jiān)聽(tīng)描述符 sockfd 保持不變,繼續(xù)用于接受后續(xù)的連接請(qǐng)求。
返回值 :
- 成功:返回一個(gè)新的、非負(fù)的已連接套接字文件描述符。
- 失?。悍祷?-1,并設(shè)置 errno。
示例(續(xù)上,接受單個(gè)連接) :
// ... listen() 成功后 ...
int new_socket;
struct sockaddr_in client_address;
socklen_t addrlen = sizeof(client_address); // 注意類(lèi)型是 socklen_t
char client_ip[INET_ADDRSTRLEN]; // 用于存儲(chǔ)客戶(hù)端 IP 字符串
printf("等待連接...\n");
// 接受一個(gè)傳入連接
// 默認(rèn)情況下,accept() 會(huì)阻塞,直到有客戶(hù)端連接進(jìn)來(lái)
if ((new_socket = accept(server_fd, (struct sockaddr *)&client_address, &addrlen)) < 0) {
perror("accept 失敗");
close(server_fd);
exit(EXIT_FAILURE);
}
// 將客戶(hù)端的 IP 地址從網(wǎng)絡(luò)字節(jié)序轉(zhuǎn)換為點(diǎn)分十進(jìn)制字符串以便打印
inet_ntop(AF_INET, &client_address.sin_addr, client_ip, INET_ADDRSTRLEN);
printf("接受來(lái)自 %s:%d 的連接\n", client_ip, ntohs(client_address.sin_port));
printf("用于通信的新套接字描述符: %d\n", new_socket);
// 現(xiàn)在可以使用 new_socket 與該客戶(hù)端進(jìn)行數(shù)據(jù)交換
// 例如: send(new_socket, "歡迎!\n", 7, 0);
// 例如: recv(new_socket, buffer, 1024, 0);
// 完成與該客戶(hù)端的通信后,關(guān)閉連接套接字
close(new_socket);
// 服務(wù)器最終關(guān)閉時(shí),關(guān)閉監(jiān)聽(tīng)套接字
close(server_fd);
上述示例僅能處理一次連接。為實(shí)現(xiàn)并發(fā)處理多個(gè)客戶(hù)端,需引入循環(huán)結(jié)構(gòu),并結(jié)合多進(jìn)程/多線(xiàn)程模型或 I/O 多路復(fù)用技術(shù)。對(duì)于需要高效處理大量并發(fā)連接的場(chǎng)景(如我們的 Pub/Sub 服務(wù)器),I/O 多路復(fù)用是更常用的方案。
select() - I/O 多路復(fù)用機(jī)制
服務(wù)器程序通常需要同時(shí)關(guān)注多個(gè)事件源:監(jiān)聽(tīng)套接字上的新連接請(qǐng)求,以及多個(gè)已連接客戶(hù)端套接字上的數(shù)據(jù)到達(dá)。若使用阻塞式的 accept() 和 recv(),程序執(zhí)行流會(huì)在單一調(diào)用點(diǎn)暫停,無(wú)法及時(shí)響應(yīng)其他事件。I/O 多路復(fù)用技術(shù)解決了這個(gè)問(wèn)題,它允許進(jìn)程同時(shí)監(jiān)視多個(gè)文件描述符,并在其中任何一個(gè)或多個(gè)描述符準(zhǔn)備好進(jìn)行 I/O 操作(可讀、可寫(xiě)或異常)時(shí)獲得通知。select() 是 POSIX 標(biāo)準(zhǔn)中定義的一種經(jīng)典 I/O 多路復(fù)用機(jī)制。
#include <sys/select.h>
#include <sys/time.h> // 對(duì)于 struct timeval
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
- nfds: 被監(jiān)聽(tīng)的文件描述符的數(shù)量,其值應(yīng)為所有被監(jiān)聽(tīng)文件描述符中的最大值 加 1 。例如,若監(jiān)聽(tīng) fd 3, 5, 8,則 nfds 應(yīng)為 9。
- readfds: 指向 fd_set 結(jié)構(gòu)體的指針,用于指定需要監(jiān)聽(tīng) 可讀 事件的文件描述符集合。對(duì)于監(jiān)聽(tīng)套接字,可讀意味著有新連接待 accept();對(duì)于已連接套接字,可讀意味著有數(shù)據(jù)到達(dá)、連接已關(guān)閉(收到 FIN)或發(fā)生錯(cuò)誤。
- writefds: 指向 fd_set 的指針,用于指定需要監(jiān)聽(tīng) 可寫(xiě) 事件的文件描述符集合。通常表示套接字的發(fā)送緩沖區(qū)有可用空間。
- exceptfds: 指向 fd_set 的指針,用于指定需要監(jiān)聽(tīng) 異常 條件的文件描述符集合(如 TCP 帶外數(shù)據(jù))。
- timeout: 指向 struct timeval 結(jié)構(gòu)體的指針,用于設(shè)定 select() 的最大等待時(shí)間。
struct timeval { time_t tv_sec; suseconds_t tv_usec; }; (秒和微秒)
若 timeout 為 NULL,select() 將無(wú)限期阻塞,直到至少有一個(gè)描述符就緒。
若 timeout 指向的結(jié)構(gòu)體中 tv_sec 和 tv_usec 均為 0,select() 執(zhí)行非阻塞檢查,立即返回。
若 timeout 指向的結(jié)構(gòu)體包含正值,select() 最多等待指定時(shí)間。超時(shí)前有描述符就緒則返回;超時(shí)則返回 0。
fd_set 相關(guān)宏定義 :
- FD_ZERO(fd_set *set): 清空(初始化)一個(gè) fd_set 集合。 每次調(diào)用 select() 前,對(duì)工作集合必須執(zhí)行此操作或從主集合復(fù)制 。
- FD_SET(int fd, fd_set *set): 將文件描述符 fd 添加到集合 set 中。
- FD_CLR(int fd, fd_set *set): 將文件描述符 fd 從集合 set 中移除。
- FD_ISSET(int fd, fd_set *set): 在 select() 返回后 ,用于檢查文件描述符 fd 是否仍在就緒集合 set 中。
重要特性 :select() 調(diào)用會(huì) 修改 傳入的 fd_set 集合(readfds, writefds, exceptfds),將其中未就緒的文件描述符移除。因此,應(yīng)用程序通常需要維護(hù)一個(gè) 主集合 (master set)記錄所有需要關(guān)心的文件描述符,在每次循環(huán)調(diào)用 select() 之前,將主集合的內(nèi)容 復(fù)制 到一個(gè) 工作集合 (working set),然后將工作集合傳遞給 select()。
返回值 :
- 成功:返回三個(gè)集合中總共就緒的文件描述符數(shù)量。
- 超時(shí):返回 0。
- 失?。悍祷?-1,并設(shè)置 errno。
使用 select() 的服務(wù)器模式 :
- 初始化監(jiān)聽(tīng)套接字 (socket, bind, listen)。
- 初始化主文件描述符集合 master_fds:FD_ZERO(&master_fds),然后 FD_SET(listener_fd, &master_fds)。
- 維護(hù)當(dāng)前最大文件描述符 max_fd,初始值為 listener_fd。
- 進(jìn)入主事件循環(huán): a. 創(chuàng)建臨時(shí)工作集合 read_fds,將其初始化為主集合:read_fds = master_fds。 b. 調(diào)用 select(max_fd + 1, &read_fds, NULL, NULL, NULL)(此處示例僅關(guān)心讀事件,阻塞等待)。 c. 檢查 select 返回值。若為 -1,處理錯(cuò)誤(需注意 EINTR 信號(hào)中斷)。 d. 遍歷從 0 到 max_fd 的所有文件描述符 i: i. 使用 FD_ISSET(i, &read_fds) 判斷描述符 i 是否在就緒的讀集合中。 ii. 若 i 是監(jiān)聽(tīng)套接字 (listener_fd) 且 FD_ISSET 為真: * 調(diào)用 accept() 接受新連接,得到 new_fd。 * 將 new_fd 添加到主集合 master_fds:FD_SET(new_fd, &master_fds)。 * 更新 max_fd:if (new_fd > max_fd) max_fd = new_fd;。 iii. 若 i 是已連接客戶(hù)端套接字且 FD_ISSET 為真: * 調(diào)用 recv(i, buffer, ...) 或 read(i, buffer, ...) 嘗試讀取數(shù)據(jù)。 * 處理連接狀態(tài) : * 返回值 > 0 : 成功讀取 valread 字節(jié)數(shù)據(jù)。處理接收到的數(shù)據(jù)(解析命令 SUB/PUB 等)。 * 返回值 0 : 表示對(duì)端執(zhí)行了有序關(guān)閉(發(fā)送了 FIN 包),所有數(shù)據(jù)已接收完畢。這是 TCP 半關(guān)閉 狀態(tài)的體現(xiàn)(對(duì)方關(guān)閉了寫(xiě),本方可以繼續(xù)寫(xiě),但通常也應(yīng)準(zhǔn)備關(guān)閉)。應(yīng)用程序應(yīng): * close(i) 關(guān)閉本端的套接字連接。 * 從 master_fds 中移除 i:FD_CLR(i, &master_fds)。 * 清理與該客戶(hù)端相關(guān)的應(yīng)用層資源(如訂閱信息)。 * 返回值 -1 : 發(fā)生錯(cuò)誤。檢查 errno: * 若 errno 為 EAGAIN 或 EWOULDBLOCK(非阻塞模式下),表示暫時(shí)無(wú)數(shù)據(jù)可讀,不是錯(cuò)誤。 * 若 errno 為 ECONNRESET,表示對(duì)端發(fā)送了 RST 包(連接異常中斷)。 * 其他錯(cuò)誤(如 ETIMEDOUT, ENOTCONN 等)。 * 對(duì)于實(shí)際錯(cuò)誤,同樣需要 close(i),從 master_fds 移除 i,并清理資源。
關(guān)于 shutdown() 和半關(guān)閉
recv/read 返回 0 表明對(duì)端關(guān)閉了其發(fā)送通道。如果本端應(yīng)用還想發(fā)送數(shù)據(jù),理論上可以(只要對(duì)端接收緩沖區(qū)未滿(mǎn)且未完全關(guān)閉連接),但這通常不符合應(yīng)用邏輯。更常見(jiàn)的是,接收到 0 后本端也應(yīng)關(guān)閉連接。
有時(shí)應(yīng)用可能需要主動(dòng)進(jìn)行 半關(guān)閉 ,即關(guān)閉自己的發(fā)送通道,但仍保持接收通道打開(kāi),以接收對(duì)端可能還未發(fā)送完的數(shù)據(jù)。這可以通過(guò) shutdown() 系統(tǒng)調(diào)用實(shí)現(xiàn):
#include <sys/socket.h>
int shutdown(int sockfd, int how);
- sockfd: 要操作的套接字。
- how: 指定關(guān)閉方式:
SHUT_RD: 關(guān)閉接收通道(之后不能再?gòu)拇颂捉幼纸邮諗?shù)據(jù))。
SHUT_WR: 關(guān)閉發(fā)送通道(之后不能再?gòu)拇颂捉幼职l(fā)送數(shù)據(jù))。這會(huì)向?qū)Χ税l(fā)送一個(gè) FIN 包。
SHUT_RDWR: 同時(shí)關(guān)閉接收和發(fā)送通道(等同于 close() 的一部分效果,但不釋放文件描述符)。
使用 shutdown(sockfd, SHUT_WR) 后,對(duì)端 recv 會(huì)在接收完所有已在途的數(shù)據(jù)后返回 0。而本端仍可調(diào)用 recv 接收數(shù)據(jù),直到對(duì)端也關(guān)閉其發(fā)送通道(發(fā)送 FIN)。
close() 調(diào)用則會(huì)同時(shí)關(guān)閉讀寫(xiě)兩個(gè)方向(如果引用計(jì)數(shù)為零,還會(huì)釋放文件描述符和相關(guān)內(nèi)核資源)。
在我們的簡(jiǎn)單 Pub/Sub 服務(wù)器中,我們直接使用 close() 處理連接終止,這隱含了雙向關(guān)閉。
極簡(jiǎn) Pub/Sub 服務(wù)器 C 代碼實(shí)現(xiàn)
以下是使用 select() 實(shí)現(xiàn)的極簡(jiǎn) Pub/Sub 服務(wù)器的 C 代碼,注釋已更新為中文。請(qǐng)注意,此實(shí)現(xiàn)為了教學(xué)目的保持簡(jiǎn)潔,省略了許多生產(chǎn)環(huán)境中必要的健壯性設(shè)計(jì)(如完善的錯(cuò)誤處理、動(dòng)態(tài)資源管理、優(yōu)化的 max_fd 更新策略等)。
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>
#define PORT 8080 // 服務(wù)器監(jiān)聽(tīng)端口
#define MAX_CLIENTS 30 // 最大并發(fā)客戶(hù)端數(shù)量
#define MAX_TOPICS 10 // 最大主題數(shù)量
#define MAX_SUBS_PER_TOPIC 10 // 每個(gè)主題最大訂閱者數(shù)量
#define BUFFER_SIZE 1024 // 接收緩沖區(qū)大小
// 主題訂閱數(shù)據(jù)結(jié)構(gòu)
typedef struct {
char name[50]; // 主題名稱(chēng)
int subscribers[MAX_SUBS_PER_TOPIC]; // 訂閱該主題的客戶(hù)端套接字描述符數(shù)組
int sub_count; // 當(dāng)前訂閱者數(shù)量
} Topic;
Topic topics[MAX_TOPICS]; // 全局主題數(shù)組
int topic_count = 0; // 當(dāng)前主題數(shù)量
// 輔助函數(shù):查找或創(chuàng)建主題
int find_or_create_topic(const char* topic_name) {
// 查找現(xiàn)有主題
for (int i = 0; i < topic_count; ++i) {
if (strcmp(topics[i].name, topic_name) == 0) {
return i; // 找到,返回索引
}
}
// 如果未找到且還有空間,則創(chuàng)建新主題
if (topic_count < MAX_TOPICS) {
strncpy(topics[topic_count].name, topic_name, sizeof(topics[topic_count].name) - 1);
topics[topic_count].name[sizeof(topics[topic_count].name) - 1] = '\0'; // 確保空字符結(jié)尾
topics[topic_count].sub_count = 0; // 初始化訂閱者數(shù)量為0
return topic_count++; // 返回新主題的索引,并增加主題計(jì)數(shù)
}
return -1; // 主題數(shù)組已滿(mǎn),無(wú)法創(chuàng)建
}
// 輔助函數(shù):將客戶(hù)端添加到主題的訂閱列表
void add_subscriber(int topic_index, int client_socket) {
if (topic_index < 0 || topic_index >= topic_count) return; // 無(wú)效的主題索引
Topic* topic = &topics[topic_index];
if (topic->sub_count < MAX_SUBS_PER_TOPIC) {
// 檢查是否已訂閱,避免重復(fù)添加
for(int i = 0; i < topic->sub_count; ++i) {
if (topic->subscribers[i] == client_socket) return; // 已存在,直接返回
}
// 添加新的訂閱者
topic->subscribers[topic->sub_count++] = client_socket;
printf("客戶(hù)端 %d 訂閱了主題 '%s'\n", client_socket, topic->name);
} else {
// 主題訂閱已滿(mǎn)
printf("主題 '%s' 訂閱已滿(mǎn), 無(wú)法添加客戶(hù)端 %d\n", topic->name, client_socket);
// 可選: 向客戶(hù)端發(fā)送錯(cuò)誤消息
// send(client_socket, "ERR topic full\n", 15, 0);
}
}
// 輔助函數(shù):從所有主題中移除指定客戶(hù)端的訂閱
void remove_subscriber(int client_socket) {
printf("正在移除客戶(hù)端 %d 的所有訂閱。\n", client_socket);
for (int i = 0; i < topic_count; ++i) { // 遍歷所有主題
int found_idx = -1;
// 在當(dāng)前主題的訂閱者列表中查找該客戶(hù)端
for (int j = 0; j < topics[i].sub_count; ++j) {
if (topics[i].subscribers[j] == client_socket) {
found_idx = j;
break;
}
}
// 如果找到該客戶(hù)端
if (found_idx != -1) {
// 將后續(xù)訂閱者向前移動(dòng)一位,覆蓋掉要移除的客戶(hù)端
for (int k = found_idx; k < topics[i].sub_count - 1; ++k) {
topics[i].subscribers[k] = topics[i].subscribers[k + 1];
}
topics[i].sub_count--; // 減少訂閱者計(jì)數(shù)
printf("已將客戶(hù)端 %d 從主題 '%s' 中移除\n", client_socket, topics[i].name);
}
}
}
// 輔助函數(shù):向指定主題發(fā)布消息
void publish_message(int topic_index, const char* message, int publisher_socket) {
if (topic_index < 0 || topic_index >= topic_count) return; // 無(wú)效的主題索引
Topic* topic = &topics[topic_index];
char full_message[BUFFER_SIZE + 100]; // 預(yù)留足夠空間構(gòu)造 "MSG <topic> <data>\n" 格式的消息
// 構(gòu)造完整的消息格式
snprintf(full_message, sizeof(full_message), "MSG %s %s", topic->name, message);
// 確保消息以換行符結(jié)束 (如果原始消息沒(méi)有的話(huà))
if (full_message[strlen(full_message)-1] != '\n') {
strncat(full_message, "\n", sizeof(full_message) - strlen(full_message) - 1);
}
printf("向主題 '%s' 發(fā)布消息: %s", topic->name, message); // 假設(shè)原始 message 可能已包含換行符
// 遍歷該主題的所有訂閱者
for (int i = 0; i < topic->sub_count; ++i) {
int subscriber_socket = topic->subscribers[i];
// 不將消息發(fā)回給發(fā)布者自己
if (subscriber_socket != publisher_socket) {
// 發(fā)送消息給訂閱者
if (send(subscriber_socket, full_message, strlen(full_message), 0) < 0) {
// 發(fā)送失敗,可能需要處理連接斷開(kāi)等問(wèn)題
perror("向訂閱者發(fā)送消息失敗");
// 注意: 健壯的實(shí)現(xiàn)應(yīng)在此處檢測(cè)到錯(cuò)誤后,可能也需要關(guān)閉并清理該訂閱者的連接
} else {
printf(" -> 已發(fā)送至客戶(hù)端 %d\n", subscriber_socket);
}
}
}
}
int main() {
int listener_fd; // 監(jiān)聽(tīng)套接字描述符
int new_socket; // 新接受的客戶(hù)端連接套接字描述符
int client_sockets[MAX_CLIENTS]; // 存儲(chǔ)客戶(hù)端套接字描述符的數(shù)組
int max_clients = MAX_CLIENTS; // 最大客戶(hù)端數(shù) (冗余變量,可直接用宏)
int activity; // select() 的返回值
int i, valread, sd; // 循環(huán)變量, read() 返回值, 當(dāng)前處理的套接字描述符
int max_sd; // select() 需要的最大文件描述符值 + 1
struct sockaddr_in address; // 服務(wù)器地址結(jié)構(gòu)
char buffer[BUFFER_SIZE + 1]; // 數(shù)據(jù)接收緩沖區(qū) (+1 用于空字符結(jié)尾)
// 文件描述符集合
fd_set readfds;
// 初始化客戶(hù)端套接字?jǐn)?shù)組,全部置 0 (0 不是有效的文件描述符)
for (i = 0; i < max_clients; i++) {
client_sockets[i] = 0;
}
// 創(chuàng)建主監(jiān)聽(tīng)套接字
if ((listener_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
perror("socket 創(chuàng)建失敗");
exit(EXIT_FAILURE);
}
// 設(shè)置監(jiān)聽(tīng)套接字選項(xiàng),允許地址重用
int opt = 1;
if (setsockopt(listener_fd, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0) {
perror("setsockopt 失敗");
exit(EXIT_FAILURE);
}
// 配置服務(wù)器地址結(jié)構(gòu)
address.sin_family = AF_INET; // IPv4
address.sin_addr.s_addr = INADDR_ANY; // 監(jiān)聽(tīng)所有接口
address.sin_port = htons(PORT); // 指定端口(網(wǎng)絡(luò)字節(jié)序)
// 將監(jiān)聽(tīng)套接字綁定到指定地址和端口
if (bind(listener_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
perror("bind 失敗");
exit(EXIT_FAILURE);
}
printf("監(jiān)聽(tīng)器已啟動(dòng)在端口 %d \n", PORT);
// 使套接字進(jìn)入監(jiān)聽(tīng)狀態(tài),設(shè)定等待隊(duì)列長(zhǎng)度為 3
if (listen(listener_fd, 3) < 0) {
perror("listen 失敗");
exit(EXIT_FAILURE);
}
// 等待客戶(hù)端連接
socklen_t addrlen = sizeof(address);
puts("等待連接中 ...");
while (1) { // 服務(wù)器主循環(huán)
// 清空讀文件描述符集合
FD_ZERO(&readfds);
// 將監(jiān)聽(tīng)套接字加入集合
FD_SET(listener_fd, &readfds);
max_sd = listener_fd; // 初始化 max_sd
// 將所有活動(dòng)的客戶(hù)端套接字加入集合
for (i = 0; i < max_clients; i++) {
sd = client_sockets[i]; // 獲取客戶(hù)端套接字描述符
// 如果是有效的套接字描述符 (大于 0),則加入讀集合
if (sd > 0)
FD_SET(sd, &readfds);
// 更新 max_sd 以跟蹤最大的文件描述符值
if (sd > max_sd)
max_sd = sd;
}
// 調(diào)用 select() 等待活動(dòng)發(fā)生,timeout 設(shè)置為 NULL 表示無(wú)限期阻塞
activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
// 檢查 select() 是否出錯(cuò) (忽略 EINTR 信號(hào)中斷)
if ((activity < 0) && (errno != EINTR)) {
perror("select 錯(cuò)誤");
// 在實(shí)際應(yīng)用中可能需要更復(fù)雜的錯(cuò)誤處理邏輯
}
// 檢查監(jiān)聽(tīng)套接字是否就緒 (表示有新連接請(qǐng)求)
if (FD_ISSET(listener_fd, &readfds)) {
// 接受新連接
if ((new_socket = accept(listener_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
perror("accept 失敗");
// 嚴(yán)重錯(cuò)誤,通常需要退出或重啟服務(wù)
exit(EXIT_FAILURE);
}
// 獲取并打印新客戶(hù)端的 IP 和端口信息
char client_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
printf("新連接建立,套接字描述符為 %d,IP 為: %s,端口為: %d\n",
new_socket, client_ip, ntohs(address.sin_port));
// 將新套接字添加到客戶(hù)端數(shù)組中
for (i = 0; i < max_clients; i++) {
// 找到數(shù)組中第一個(gè)空位
if (client_sockets[i] == 0) {
client_sockets[i] = new_socket;
printf("已將套接字 %d 添加到列表位置 %d\n", new_socket, i);
break; // 添加成功后退出循環(huán)
}
}
// 如果客戶(hù)端數(shù)組已滿(mǎn)
if (i == max_clients) {
printf("客戶(hù)端連接數(shù)已達(dá)上限,拒絕連接。\n");
close(new_socket); // 關(guān)閉這個(gè)無(wú)法處理的連接
}
}
// 檢查各個(gè)客戶(hù)端套接字是否有活動(dòng) (數(shù)據(jù)到達(dá)或連接關(guān)閉)
for (i = 0; i < max_clients; i++) {
sd = client_sockets[i]; // 當(dāng)前要檢查的客戶(hù)端套接字
// 如果當(dāng)前客戶(hù)端套接字在就緒的讀集合中
if (FD_ISSET(sd, &readfds)) {
// 嘗試讀取數(shù)據(jù)
// read() 返回值: >0 表示讀取的字節(jié)數(shù), 0 表示連接已關(guān)閉 (收到FIN), <0 表示錯(cuò)誤
if ((valread = read(sd, buffer, BUFFER_SIZE)) <= 0) {
// 連接關(guān)閉或發(fā)生錯(cuò)誤
if (valread == 0) {
// 對(duì)端正常關(guān)閉連接 (FIN)
getpeername(sd, (struct sockaddr*)&address, &addrlen); // 獲取對(duì)端地址信息用于打印
char client_ip[INET_ADDRSTRLEN];
inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
printf("主機(jī) %s:%d (套接字 %d) 斷開(kāi)連接\n", client_ip, ntohs(address.sin_port), sd);
} else {
// 讀取錯(cuò)誤
perror("read 錯(cuò)誤");
// 也可以在這里獲取對(duì)端信息打印
}
// 關(guān)閉套接字
close(sd);
// 從客戶(hù)端數(shù)組中移除 (標(biāo)記為 0)
client_sockets[i] = 0;
// 從所有主題訂閱中移除該客戶(hù)端
remove_subscriber(sd);
} else {
// 成功讀取到數(shù)據(jù),處理客戶(hù)端發(fā)送的命令
buffer[valread] = '\0'; // 添加空字符結(jié)尾,確保字符串處理安全
// 簡(jiǎn)單的命令解析 (生產(chǎn)環(huán)境需要更健壯的解析器)
char command[5]; // 存儲(chǔ)命令 (SUB/PUB)
char topic_name[50]; // 存儲(chǔ)主題名
char message[BUFFER_SIZE]; // 存儲(chǔ)消息內(nèi)容 (PUB 命令)
memset(message, 0, sizeof(message)); // 清空消息緩沖區(qū)
// 使用 sscanf 解析輸入,格式為 "CMD TOPIC [MESSAGE]"
// %4s: 讀取最多4個(gè)字符作為命令
// %49s: 讀取最多49個(gè)字符作為主題名 (防止溢出)
// %[^\n]: 讀取從主題名后到行尾的所有字符作為消息 (包括空格)
int parsed_items = sscanf(buffer, "%4s %49s %[^\n]", command, topic_name, message);
printf("收到來(lái)自客戶(hù)端 %d 的數(shù)據(jù): %s", sd, buffer); // buffer 可能自帶換行符
if (parsed_items >= 2 && strcmp(command, "SUB") == 0) {
// 處理 SUB 命令
int topic_idx = find_or_create_topic(topic_name);
if (topic_idx != -1) {
add_subscriber(topic_idx, sd);
// 可選: 向客戶(hù)端發(fā)送訂閱成功確認(rèn)
// send(sd, "SUB OK\n", 7, 0);
} else {
// 可選: 發(fā)送錯(cuò)誤(如主題數(shù)達(dá)到上限)
// send(sd, "ERR max topics\n", 15, 0);
}
} else if (parsed_items >= 3 && strcmp(command, "PUB") == 0) {
// 處理 PUB 命令
int topic_idx = find_or_create_topic(topic_name); // 查找主題,如果不存在,簡(jiǎn)單實(shí)現(xiàn)可以忽略或報(bào)錯(cuò)
if (topic_idx != -1) {
// 確保消息格式包含換行符,便于客戶(hù)端接收處理
// 注意: 這里假設(shè) message 已包含從 sscanf 讀取的內(nèi)容
// 這里對(duì) message 的處理可以省略,因?yàn)?publish_message 內(nèi)部會(huì)確保換行符
publish_message(topic_idx, message, sd);
} else {
printf("未找到用于發(fā)布的主題 '%s'.\n", topic_name);
// 可選: 發(fā)送錯(cuò)誤(主題不存在)
// send(sd, "ERR no such topic\n", 18, 0);
}
} else {
// 無(wú)效命令格式
printf("來(lái)自客戶(hù)端 %d 的無(wú)效命令: %s", sd, buffer);
// 可選: 發(fā)送錯(cuò)誤
// send(sd, "ERR invalid command\n", 20, 0);
}
// 清理緩沖區(qū)以便下次讀取 (可選,因?yàn)?read 會(huì)覆蓋)
memset(buffer, 0, BUFFER_SIZE + 1);
}
}
}
}
// 理論上,服務(wù)器的無(wú)限循環(huán)不會(huì)到達(dá)這里,但在正常關(guān)閉流程中應(yīng)關(guān)閉監(jiān)聽(tīng)套接字
close(listener_fd);
return 0;
}
編譯
gcc pubsub_server_revised.c -o pubsub_server_revised
運(yùn)行
./pubsub_server_revised
(base) ? back-end-notes git:(master) ? nc localhost 8080
^C
(base) ? back-end-notes git:(master) ? nc localhost 8080
SUB TOPIC-1
MSG TOPIC-1 1,242.42,214\n
(base) ? back-end-notes git:(master) ? nc localhost 8080
PUB TOPIC-1 1,242.42,214\n
piperliu@go-x86:~/code/socket$ ./pubsub_server_revised
監(jiān)聽(tīng)器已啟動(dòng)在端口 8080
等待連接中 ...
新連接建立,套接字描述符為 5,IP 為: 127.0.0.1,端口為: 38910
已將套接字 5 添加到列表位置 0
主機(jī) 127.0.0.1:38910 (套接字 5) 斷開(kāi)連接
正在移除客戶(hù)端 5 的所有訂閱。
新連接建立,套接字描述符為 5,IP 為: 127.0.0.1,端口為: 38924
已將套接字 5 添加到列表位置 0
收到來(lái)自客戶(hù)端 5 的數(shù)據(jù): SUB TOPIC-1
客戶(hù)端 5 訂閱了主題 'TOPIC-1'
新連接建立,套接字描述符為 6,IP 為: 127.0.0.1,端口為: 58630
已將套接字 6 添加到列表位置 1
收到來(lái)自客戶(hù)端 6 的數(shù)據(jù): PUB TOPIC-1 1,242.42,214\n
向主題 'TOPIC-1' 發(fā)布消息: 1,242.42,214\n -> 已發(fā)送至客戶(hù)端 5
訂閱與發(fā)布流程的內(nèi)核視角梳理
結(jié)合內(nèi)核操作,我們重新梳理客戶(hù)端訂閱和發(fā)布時(shí)服務(wù)器端的處理流程:
- 服務(wù)器初始化與監(jiān)聽(tīng)
- socket():內(nèi)核創(chuàng)建 socket 結(jié)構(gòu),分配資源,返回文件描述符 listener_fd。
- bind():內(nèi)核校驗(yàn)地址、端口可用性及權(quán)限,將本地地址信息關(guān)聯(lián)到 listener_fd 的 socket 結(jié)構(gòu)。
- listen():內(nèi)核將 listener_fd 對(duì)應(yīng)的 socket 結(jié)構(gòu)狀態(tài)置為 LISTEN,并初始化 SYN 隊(duì)列和 Accept 隊(duì)列。
- 客戶(hù)端連接建立
- 客戶(hù)端發(fā)起 TCP 連接請(qǐng)求(發(fā)送 SYN 包)。
- 服務(wù)器內(nèi)核網(wǎng)絡(luò)協(xié)議棧收到 SYN,創(chuàng)建半連接條目放入 SYN 隊(duì)列,回復(fù) SYN-ACK。
- 客戶(hù)端回復(fù) ACK,完成三次握手。內(nèi)核將對(duì)應(yīng)的連接從 SYN 隊(duì)列移至 Accept 隊(duì)列。
- 服務(wù)器進(jìn)程調(diào)用 select(),select 檢測(cè)到 listener_fd 可讀(因?yàn)?Accept 隊(duì)列非空)。
- FD_ISSET(listener_fd, ...) 為真。
- 服務(wù)器進(jìn)程調(diào)用 accept()。內(nèi)核從 Accept 隊(duì)列取出一個(gè)連接,創(chuàng)建新的 socket 結(jié)構(gòu)和文件描述符 new_socket(狀態(tài)為 ESTABLISHED),返回給服務(wù)器進(jìn)程。
- 客戶(hù)端訂閱 (SUB)
- 客戶(hù)端通過(guò) new_socket 發(fā)送 "SUB ..." 數(shù)據(jù)。
- 數(shù)據(jù)到達(dá)服務(wù)器,內(nèi)核將其放入 new_socket 的接收緩沖區(qū)。
- 服務(wù)器進(jìn)程調(diào)用 select(),select 檢測(cè)到 new_socket 可讀(接收緩沖區(qū)有數(shù)據(jù))。
- FD_ISSET(new_socket, ...) 為真。
- 服務(wù)器進(jìn)程調(diào)用 read(new_socket, ...),內(nèi)核從接收緩沖區(qū)復(fù)制數(shù)據(jù)到用戶(hù)空間 buffer。
- 服務(wù)器進(jìn)程解析 buffer,識(shí)別訂閱請(qǐng)求,更新應(yīng)用層數(shù)據(jù)結(jié)構(gòu)(topics 數(shù)組)。
- 客戶(hù)端發(fā)布 (PUB)
- 客戶(hù)端通過(guò) new_socket 發(fā)送 "PUB ..." 數(shù)據(jù)。
- 數(shù)據(jù)到達(dá)服務(wù)器,內(nèi)核處理同上,select 報(bào)告 new_socket 可讀。
- 服務(wù)器進(jìn)程調(diào)用 read() 獲取數(shù)據(jù)并解析。
- 服務(wù)器進(jìn)程查找主題,遍歷訂閱者列表。
- 對(duì)于每個(gè)訂閱者 sub_socket(且 sub_socket != new_socket):
服務(wù)器進(jìn)程調(diào)用 send(sub_socket, ...)。內(nèi)核將數(shù)據(jù)復(fù)制到 sub_socket 的發(fā)送緩沖區(qū)。
內(nèi)核網(wǎng)絡(luò)協(xié)議棧負(fù)責(zé)將發(fā)送緩沖區(qū)的數(shù)據(jù)打包成 TCP 段并發(fā)送出去。
- 客戶(hù)端斷開(kāi)連接 (有序關(guān)閉)
- 客戶(hù)端調(diào)用 close() 或 shutdown(SHUT_WR),其內(nèi)核發(fā)送 FIN 包。
- 服務(wù)器內(nèi)核收到 FIN,將 new_socket 標(biāo)記為收到 FIN,并向客戶(hù)端回復(fù) ACK。連接進(jìn)入 CLOSE_WAIT 狀態(tài)。
- 服務(wù)器進(jìn)程調(diào)用 select(),select 檢測(cè)到 new_socket 可讀(因?yàn)槭盏?FIN 也是可讀事件)。
- FD_ISSET(new_socket, ...) 為真。
- 服務(wù)器進(jìn)程調(diào)用 read(new_socket, ...),read 返回 0。
- 服務(wù)器進(jìn)程識(shí)別出連接關(guān)閉,調(diào)用 close(new_socket)。內(nèi)核發(fā)送 FIN 包給客戶(hù)端(如果尚未發(fā)送),釋放套接字資源(當(dāng)引用計(jì)數(shù)為0時(shí)),從 master_fds 移除 new_socket,清理應(yīng)用層資源。