自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Linux 網(wǎng)絡(luò)編程:從 Socket API 到極簡(jiǎn) Redis 發(fā)布/訂閱 sub/pub 服務(wù)的實(shí)現(xiàn)

系統(tǒng) Linux
網(wǎng)絡(luò)編程的本質(zhì)是實(shí)現(xiàn)進(jìn)程間通信(Inter-Process Communication, IPC),特別是跨越主機(jī)邊界的分布式通信。在 Linux 及其他類(lèi) Unix 操作系統(tǒng)中,Socket(套接字)接口是實(shí)現(xiàn)網(wǎng)絡(luò)通信的標(biāo)準(zhǔn)范式。

引言

本文旨在系統(tǒng)性地闡述 Linux 環(huán)境下的網(wǎng)絡(luò)編程基礎(chǔ),重點(diǎn)關(guān)注 Socket 應(yīng)用程序接口(API)的原理與應(yīng)用。通過(guò)循序漸進(jìn)的方式,結(jié)合具體的 C 語(yǔ)言代碼示例,我們將剖析核心系統(tǒng)調(diào)用的機(jī)制,并最終構(gòu)建一個(gè)基于傳輸控制協(xié)議(TCP)的簡(jiǎn)化版發(fā)布/訂閱(Publish/Subscribe, Pub/Sub)服務(wù)器模型。

在進(jìn)行深入探討之前,本文假設(shè)讀者已具備以下先驗(yàn)知識(shí):

  1. C 語(yǔ)言編程能力 :熟悉 C 語(yǔ)言的核心語(yǔ)法、指針操作及內(nèi)存管理機(jī)制。
  2. Linux 操作系統(tǒng)基礎(chǔ) :了解 Linux 基本命令行操作及 C 程序的編譯流程(例如,使用 GCC 工具鏈)。
  3. 計(jì)算機(jī)網(wǎng)絡(luò)基礎(chǔ) :對(duì) OSI 參考模型或 TCP/IP 協(xié)議棧有概念性認(rèn)識(shí),理解 TCP 與 UDP 的核心差異(面向連接與無(wú)連接、可靠性保證機(jī)制等),并掌握 IP 地址和端口號(hào)在網(wǎng)絡(luò)通信中的作用。

網(wǎng)絡(luò)編程的本質(zhì)是實(shí)現(xiàn)進(jìn)程間通信(Inter-Process Communication, IPC),特別是跨越主機(jī)邊界的分布式通信。在 Linux 及其他類(lèi) Unix 操作系統(tǒng)中,Socket(套接字)接口是實(shí)現(xiàn)網(wǎng)絡(luò)通信的標(biāo)準(zhǔn)范式。它提供了一套抽象的 API,允許應(yīng)用程序?qū)⒕W(wǎng)絡(luò)通信視為一種特殊的文件 I/O 操作,從而簡(jiǎn)化了網(wǎng)絡(luò)數(shù)據(jù)收發(fā)的復(fù)雜性。

本文的實(shí)踐目標(biāo)是構(gòu)建一個(gè)功能類(lèi)似于 Redis 服務(wù)器 PUBLISH 和 SUBSCRIBE 命令的簡(jiǎn)化服務(wù)實(shí)例:

  • 服務(wù)器進(jìn)程監(jiān)聽(tīng)一個(gè)預(yù)定義的 TCP 端口。
  • 允許多個(gè)客戶(hù)端并發(fā)連接至服務(wù)器。
  • 客戶(hù)端通過(guò)發(fā)送特定格式的命令(如 SUB <topic_name>)訂閱感興趣的主題。
  • 客戶(hù)端通過(guò)發(fā)送特定格式的命令(如 PUB <topic_name> <message_data>)向指定主題發(fā)布消息。
  • 服務(wù)器負(fù)責(zé)將發(fā)布的消息轉(zhuǎn)發(fā)給所有訂閱了對(duì)應(yīng)主題的客戶(hù)端(通常不包括發(fā)布者自身)。

接下來(lái),我們將逐步解析相關(guān)的系統(tǒng)調(diào)用和編程技術(shù)。

Linux 上的 Socket API

在 Linux 操作系統(tǒng)的設(shè)計(jì)哲學(xué)中,“一切皆文件”是一個(gè)核心概念。網(wǎng)絡(luò)連接在內(nèi)核層面被抽象為一種特殊的文件類(lèi)型,并通過(guò)文件描述符(File Descriptor)進(jìn)行管理。socket() 系統(tǒng)調(diào)用是創(chuàng)建此類(lèi)“網(wǎng)絡(luò)文件”的入口點(diǎn),用于在網(wǎng)絡(luò)通信的參與方創(chuàng)建一個(gè)通信端點(diǎn)(Endpoint)。

socket() 系統(tǒng)調(diào)用詳解

socket() 的主要功能是在內(nèi)核中創(chuàng)建一個(gè)新的、未連接的套接字,并返回一個(gè)與之關(guān)聯(lián)的文件描述符,供用戶(hù)空間程序使用。其函數(shù)原型定義于 <sys/socket.h> 頭文件中:

#include <sys/types.h>
#include <sys/socket.h>

int socket(int domain, int type, int protocol);

參數(shù)說(shuō)明 :

  • domain (地址族 Address Family - AF): 指定套接字使用的協(xié)議族。常用取值包括:

a.AF_INET: 用于 IPv4 網(wǎng)絡(luò)協(xié)議通信。本文主要采用此地址族。

b.AF_INET6: 用于 IPv6 網(wǎng)絡(luò)協(xié)議通信。

c.AF_UNIX (或 AF_LOCAL): 用于同一主機(jī)內(nèi)部的進(jìn)程間通信,依賴(lài)本地文件系統(tǒng)路徑而非網(wǎng)絡(luò)地址。

  • type (套接字類(lèi)型 Socket Type): 定義套接字的通信語(yǔ)義。關(guān)鍵取值有:
  • SOCK_STREAM: 提供面向連接、可靠的、基于字節(jié)流的傳輸服務(wù)。TCP 協(xié)議即為此類(lèi)型,保證數(shù)據(jù)傳輸?shù)捻樞蛐院涂煽啃裕ㄍㄟ^(guò)序列號(hào)、確認(rèn)、重傳等機(jī)制)。
  • SOCK_DGRAM: 提供無(wú)連接、不可靠的數(shù)據(jù)報(bào)服務(wù)。UDP 協(xié)議屬此類(lèi),數(shù)據(jù)包傳輸可能發(fā)生丟失、重復(fù)或亂序。
  • protocol (協(xié)議 Protocol): 通常設(shè)置為 0,表示由系統(tǒng)根據(jù)指定的 domain 和 type 自動(dòng)選擇默認(rèn)協(xié)議。例如,AF_INET 與 SOCK_STREAM 組合通常默認(rèn)選用 IPPROTO_TCP;AF_INET 與 SOCK_DGRAM 組合則默認(rèn)選用 IPPROTO_UDP。亦可顯式指定協(xié)議常量(如 IPPROTO_TCP)。

內(nèi)核操作與數(shù)據(jù)結(jié)構(gòu) :

當(dāng)應(yīng)用程序調(diào)用 socket() 時(shí),會(huì)觸發(fā)一次系統(tǒng)調(diào)用,進(jìn)入內(nèi)核態(tài)執(zhí)行:

  1. 資源分配 :內(nèi)核網(wǎng)絡(luò)協(xié)議棧為新的套接字分配必要的內(nèi)存資源,創(chuàng)建一個(gè)內(nèi)部的 struct socket 或類(lèi)似的核心數(shù)據(jù)結(jié)構(gòu)。此結(jié)構(gòu)包含了套接字的狀態(tài)信息(如初始狀態(tài)、類(lèi)型、協(xié)議族)、發(fā)送和接收緩沖區(qū)、指向特定協(xié)議(如 TCP、UDP)處理函數(shù)的指針、以及等待隊(duì)列等。
  2. 文件描述符關(guān)聯(lián) :內(nèi)核在當(dāng)前進(jìn)程的文件描述符表中找到一個(gè)未使用的條目,并將該條目指向一個(gè)代表該套接字的內(nèi)核文件對(duì)象(struct file)。這個(gè)文件描述符(一個(gè)非負(fù)整數(shù))是用戶(hù)空間程序操作該套接字的句柄。
  3. 返回 :系統(tǒng)調(diào)用返回新分配的文件描述符給應(yīng)用程序。若創(chuàng)建失敗(如資源不足、權(quán)限問(wèn)題),則返回 -1,并設(shè)置全局變量 errno 以指示具體錯(cuò)誤代碼。

返回值 :

  • 成功:返回一個(gè)新的文件描述符(非負(fù)整數(shù))。
  • 失?。悍祷?-1,并設(shè)置 errno。

示例:創(chuàng)建 IPv4 TCP 套接字

#include <stdio.h>      // 標(biāo)準(zhǔn)輸入輸出
#include <stdlib.h>     // 標(biāo)準(zhǔn)庫(kù)函數(shù),如 exit
#include <sys/socket.h> // 套接字核心函數(shù)和數(shù)據(jù)結(jié)構(gòu)
#include <errno.h>      // 錯(cuò)誤碼 errno
#include <unistd.h>     // close 函數(shù)

int main() {
    int sockfd;

    // 創(chuàng)建一個(gè)用于 IPv4 TCP 通信的套接字
    // AF_INET: 指定使用 IPv4 協(xié)議族
    // SOCK_STREAM: 指定使用面向連接的字節(jié)流服務(wù) (TCP)
    // 0: 讓內(nèi)核自動(dòng)選擇合適的協(xié)議 (對(duì)于 AF_INET 和 SOCK_STREAM,通常是 IPPROTO_TCP)
    sockfd = socket(AF_INET, SOCK_STREAM, 0);

    // 檢查 socket() 調(diào)用是否成功
    if (sockfd == -1) {
        perror("socket 創(chuàng)建失敗"); // perror 會(huì)根據(jù)當(dāng)前的 errno 值打印錯(cuò)誤信息
        exit(EXIT_FAILURE);       // 異常退出程序
    }

    printf("套接字創(chuàng)建成功! 文件描述符: %d\n", sockfd);

    // 在實(shí)際應(yīng)用中,套接字使用完畢后應(yīng)顯式關(guān)閉
    // close(sockfd);

    return 0;
}

編譯與執(zhí)行 :

gcc create_socket_example.c -o create_socket_example
./create_socket_example
套接字創(chuàng)建成功! 文件描述符: 4

輸出將提示套接字創(chuàng)建成功,并顯示其文件描述符。

sockaddr_in 結(jié)構(gòu)體與地址表示

僅創(chuàng)建套接字不足以進(jìn)行通信,服務(wù)器端需要將其綁定到具體的本地網(wǎng)絡(luò)地址(IP 地址和端口號(hào))。在 IPv4 環(huán)境下,此地址信息通過(guò) struct sockaddr_in 結(jié)構(gòu)體(定義于 <netinet/in.h>)來(lái)承載:

#include <netinet/in.h>

struct sockaddr_in {
    sa_family_t    sin_family; /* 地址族: 固定為 AF_INET */
    in_port_t      sin_port;   /* 端口號(hào) (網(wǎng)絡(luò)字節(jié)序) */
    struct in_addr sin_addr;   /* IPv4 地址 (網(wǎng)絡(luò)字節(jié)序) */
    // char           sin_zero[8]; /* 填充字節(jié),通常不直接使用,應(yīng)清零 */
};

/* IPv4 地址結(jié)構(gòu) */
struct in_addr {
    uint32_t       s_addr;     /* 32位IPv4地址 (網(wǎng)絡(luò)字節(jié)序) */
};

關(guān)鍵字段解析 :

  • sin_family : 地址族,對(duì)于 IPv4,必須設(shè)置為 AF_INET,與 socket() 調(diào)用中的 domain 參數(shù)保持一致。
  • sin_port : 端口號(hào)。 注意 :該字段必須存儲(chǔ)為 網(wǎng)絡(luò)字節(jié)序 (Network Byte Order,即大端序 Big-Endian)。應(yīng)用程序需使用 htons() (Host to Network Short) 函數(shù)將主機(jī)字節(jié)序(Host Byte Order)的端口號(hào)轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序。例如,htons(8080)。
  • sin_addr.s_addr : 32位 IPv4 地址。 注意 : 同樣必須是 網(wǎng)絡(luò)字節(jié)序 。

服務(wù)器端執(zhí)行 bind() 時(shí),若希望監(jiān)聽(tīng)本機(jī)所有可用的網(wǎng)絡(luò)接口,應(yīng)將此字段設(shè)置為 htonl(INADDR_ANY)。INADDR_ANY 是一個(gè)特殊常量(通常為 0),htonl() (Host to Network Long) 用于將其轉(zhuǎn)換為主機(jī)字節(jié)序到網(wǎng)絡(luò)字節(jié)序。

若需綁定到特定 IP 地址(如服務(wù)器僅監(jiān)聽(tīng)某塊網(wǎng)卡,或客戶(hù)端執(zhí)行 connect() 時(shí)指定目標(biāo)服務(wù)器 IP),可使用 inet_pton() (Presentation to Network) 函數(shù)將點(diǎn)分十進(jìn)制表示的 IP 地址字符串(如 "192.168.1.100")轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序的 32 位整數(shù),并存入 s_addr。

字節(jié)序轉(zhuǎn)換函數(shù) (通常定義于 <arpa/inet.h>):

  • htons(): 主機(jī)字節(jié)序到網(wǎng)絡(luò)字節(jié)序(16位,用于端口號(hào))。
  • htonl(): 主機(jī)字節(jié)序到網(wǎng)絡(luò)字節(jié)序(32位,用于 IPv4 地址)。
  • ntohs(): 網(wǎng)絡(luò)字節(jié)序到主機(jī)字節(jié)序(16位)。
  • ntohl(): 網(wǎng)絡(luò)字節(jié)序到主機(jī)字節(jié)序(32位)。
  • inet_pton(AF_INET, "ip_string", &addr_struct->sin_addr): 將點(diǎn)分十進(jìn)制 IP 字符串轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序二進(jìn)制形式。
  • inet_ntop(AF_INET, &addr_struct->sin_addr, buffer, buffer_size): 將網(wǎng)絡(luò)字節(jié)序二進(jìn)制 IP 地址轉(zhuǎn)換為點(diǎn)分十進(jìn)制字符串。

bind(), listen(), accept() - 服務(wù)器端核心調(diào)用

對(duì)于 TCP 服務(wù)器而言,創(chuàng)建套接字后,必須執(zhí)行一系列步驟來(lái)準(zhǔn)備接收客戶(hù)端連接:

  1. bind() : 將套接字與一個(gè)本地 IP 地址和端口號(hào)關(guān)聯(lián)起來(lái),定義服務(wù)的監(jiān)聽(tīng)地址。
  2. listen() : 將套接字設(shè)置為監(jiān)聽(tīng)模式,使其能夠接受外來(lái)的連接請(qǐng)求,并配置連接請(qǐng)求隊(duì)列。
  3. accept() : 從已完成三次握手的連接隊(duì)列中接受一個(gè)連接,并為此連接創(chuàng)建一個(gè)新的專(zhuān)用套接字。

1. bind() 系統(tǒng)調(diào)用

bind() 用于將 socket() 創(chuàng)建的套接字文件描述符 sockfd 與 my_addr 指定的本地地址(IP 和端口)進(jìn)行綁定。

#include <sys/socket.h>

int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
  • sockfd: socket() 返回的文件描述符。
  • addr: 指向包含待綁定地址信息的 sockaddr 結(jié)構(gòu)體的指針。對(duì)于 IPv4,實(shí)際傳遞的是已填充好的 struct sockaddr_in 結(jié)構(gòu)體的地址,需強(qiáng)制類(lèi)型轉(zhuǎn)換為 (struct sockaddr *)。
  • addrlen: addr 指向的結(jié)構(gòu)體的大小,對(duì)于 struct sockaddr_in,通常為 sizeof(struct sockaddr_in)。

內(nèi)核操作 :

調(diào)用 bind() 進(jìn)入內(nèi)核態(tài)后:

  1. 地址復(fù)制與校驗(yàn) :內(nèi)核將用戶(hù)空間傳入的 sockaddr 結(jié)構(gòu)體復(fù)制到內(nèi)核內(nèi)存。
  2. 狀態(tài)檢查 :檢查 sockfd 對(duì)應(yīng)的套接字是否有效且未被綁定。
  3. 地址可用性檢查 :檢查指定的 IP 地址和端口號(hào)是否可用。對(duì)于端口號(hào),檢查是否已被其他套接字綁定(除非設(shè)置了 SO_REUSEADDR 等選項(xiàng));對(duì)于 IP 地址,檢查是否是分配給本機(jī)的有效地址(或 INADDR_ANY)。
  4. 權(quán)限檢查 :檢查進(jìn)程是否有權(quán)限綁定到指定端口(通常,綁定到 1024 以下的端口需要超級(jí)用戶(hù)權(quán)限)。
  5. 綁定操作 :如果所有檢查通過(guò),內(nèi)核將該地址信息與內(nèi)部的套接字結(jié)構(gòu)關(guān)聯(lián)起來(lái)。

返回值 :成功返回 0;失敗返回 -1,并設(shè)置 errno。常見(jiàn)錯(cuò)誤包括 EADDRINUSE(地址已在使用)、EACCES(權(quán)限不足)、EINVAL(sockfd 無(wú)效或已綁定)。

示例:綁定套接字到本地 8080 端口

#include <stdio.h>
#include <stdlib.h>
#include <string.h>       // 用于 memset
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>    // 用于 htons, htonl
#include <unistd.h>       // 用于 close

#define PORT 8080

int main() {
    int server_fd;
    struct sockaddr_in address;
    int opt = 1; // 用于 setsockopt

    // 創(chuàng)建套接字
    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket 創(chuàng)建失敗");
        exit(EXIT_FAILURE);
    }

    // 可選: 設(shè)置套接字選項(xiàng),允許地址重用,便于服務(wù)器快速重啟
    // SO_REUSEADDR 允許重用本地地址 (IP+端口),尤其是在 TIME_WAIT 狀態(tài)下的端口
    // SO_REUSEPORT (需要內(nèi)核支持) 允許多個(gè)進(jìn)程綁定到同一 IP 和端口
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 準(zhǔn)備 sockaddr_in 結(jié)構(gòu)體
    memset(&address, 0, sizeof(address)); // 推薦清零結(jié)構(gòu)體
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = htonl(INADDR_ANY); // 監(jiān)聽(tīng)所有網(wǎng)絡(luò)接口
    address.sin_port = htons(PORT);              // 監(jiān)聽(tīng)指定端口 (轉(zhuǎn)換為網(wǎng)絡(luò)字節(jié)序)

    // 將套接字綁定到指定的地址和端口
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    printf("套接字成功綁定到端口 %d\n", PORT);

    // ... 后續(xù)步驟: listen() 和 accept() ...

    close(server_fd); // 完成后關(guān)閉監(jiān)聽(tīng)套接字

    return 0;
}

2. listen() 系統(tǒng)調(diào)用

listen() 用于將一個(gè)已綁定的流式套接字(SOCK_STREAM 或 SOCK_SEQPACKET)轉(zhuǎn)換為被動(dòng)監(jiān)聽(tīng)狀態(tài),使其能夠接受傳入的連接請(qǐng)求。

#include <sys/socket.h>

int listen(int sockfd, int backlog);
  • sockfd: 已成功 bind() 的套接字文件描述符。
  • backlog: 指定內(nèi)核為此監(jiān)聽(tīng)套接字維護(hù)的 已完成連接隊(duì)列 (Completed Connection Queue, 或稱(chēng) Accept Queue)的最大長(zhǎng)度。

內(nèi)核操作與連接隊(duì)列 :

當(dāng)調(diào)用 listen() 時(shí):

  • 狀態(tài)轉(zhuǎn)換 :內(nèi)核將 sockfd 對(duì)應(yīng)的內(nèi)部套接字結(jié)構(gòu)的狀態(tài)從 CLOSED(或 BOUND)修改為 LISTEN。
  • 隊(duì)列初始化 :內(nèi)核為該監(jiān)聽(tīng)套接字關(guān)聯(lián)并初始化兩個(gè)重要的隊(duì)列:

a.未完成連接隊(duì)列 (Incomplete Connection Queue / SYN Queue) :存儲(chǔ)收到的 SYN 包,但尚未完成三次握手的連接請(qǐng)求(處于 SYN_RCVD 狀態(tài))。此隊(duì)列的大小通常由系統(tǒng)參數(shù)(如 net.ipv4.tcp_max_syn_backlog)控制,backlog 參數(shù)對(duì)其影響有限或間接。

b.已完成連接隊(duì)列 (Completed Connection Queue / Accept Queue) :存儲(chǔ)已經(jīng)完成 TCP 三次握手,等待被應(yīng)用程序通過(guò) accept() 提取的連接(這些連接在內(nèi)核中已是 ESTABLISHED 狀態(tài),但從服務(wù)器監(jiān)聽(tīng)角度看是在等待 accept)。backlog 參數(shù)主要限制的是這個(gè)隊(duì)列的大小。當(dāng)此隊(duì)列滿(mǎn)時(shí),內(nèi)核可能會(huì)拒絕新的已完成握手的連接(例如,不響應(yīng) ACK,或發(fā)送 RST)。

返回值 :成功返回 0;失敗返回 -1,并設(shè)置 errno。

示例(續(xù)上) :

// ... bind() 成功后 ...

    // 開(kāi)始監(jiān)聽(tīng)傳入連接
    // backlog 設(shè)置為 10,意味著最多允許 10 個(gè)已完成三次握手的連接在隊(duì)列中等待 accept()
    if (listen(server_fd, 10) < 0) {
        perror("listen 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    printf("服務(wù)器正在端口 %d 上監(jiān)聽(tīng)...\n", PORT);

    // ... 下一步: accept() ...

3. accept() 系統(tǒng)調(diào)用

accept() 從監(jiān)聽(tīng)套接字 sockfd 的已完成連接隊(duì)列中取出一個(gè)連接請(qǐng)求,為該連接創(chuàng)建一個(gè) 新的 、已連接的套接字,并返回這個(gè)新套接字的文件描述符。

#include <sys/socket.h>

int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
  • sockfd: 處于監(jiān)聽(tīng)狀態(tài)的套接字文件描述符。
  • addr: (可選)指向 sockaddr 結(jié)構(gòu)體的指針,用于接收發(fā)起連接的客戶(hù)端的地址信息。若不關(guān)心客戶(hù)端地址,可傳遞 NULL。
  • addrlen: (可選)指向 socklen_t 類(lèi)型變量的指針。這是一個(gè) 值-結(jié)果 參數(shù):

調(diào)用前,*addrlen 必須初始化為 addr 指向的緩沖區(qū)的實(shí)際大小。

調(diào)用成功后,內(nèi)核會(huì)將客戶(hù)端地址結(jié)構(gòu)的實(shí)際大小寫(xiě)入 *addrlen。若傳入 addr 為 NULL,則 addrlen 也應(yīng)為 NULL。

內(nèi)核操作 :

  • 檢查隊(duì)列 :內(nèi)核檢查與 sockfd 關(guān)聯(lián)的已完成連接隊(duì)列。
  • 阻塞/非阻塞行為 :

如果隊(duì)列為空且 sockfd 是阻塞模式(默認(rèn)),accept() 調(diào)用將使進(jìn)程 睡眠 ,直到隊(duì)列中出現(xiàn)新的已完成連接。

如果隊(duì)列為空且 sockfd 是非阻塞模式,accept() 立即返回 -1,并將 errno 設(shè)置為 EAGAIN 或 EWOULDBLOCK。

  1. 提取連接 :如果隊(duì)列非空,內(nèi)核從中取出一個(gè)連接。
  • 創(chuàng)建新套接字 :內(nèi)核為這個(gè)被接受的連接創(chuàng)建一個(gè) 全新的 內(nèi)部套接字結(jié)構(gòu)和對(duì)應(yīng)的文件對(duì)象。這個(gè)新套接字的狀態(tài)被設(shè)為 ESTABLISHED,并關(guān)聯(lián)了客戶(hù)端的地址信息。
  • 返回新描述符 :內(nèi)核在進(jìn)程的文件描述符表中分配一個(gè)新的文件描述符,指向這個(gè)新創(chuàng)建的已連接套接字的文件對(duì)象,并將此描述符返回給應(yīng)用程序。
  • 填充地址信息 :如果 addr 和 addrlen 參數(shù)有效,內(nèi)核將客戶(hù)端的地址信息復(fù)制到 addr 指向的緩沖區(qū),并更新 *addrlen。

關(guān)鍵點(diǎn) :accept() 返回的是一個(gè) 新的文件描述符 ,代表與特定客戶(hù)端的通信通道。后續(xù)與該客戶(hù)端的數(shù)據(jù)收發(fā)(send(), recv() 等)必須使用這個(gè) 新描述符 ,而非原來(lái)的監(jiān)聽(tīng)描述符 sockfd。監(jiān)聽(tīng)描述符 sockfd 保持不變,繼續(xù)用于接受后續(xù)的連接請(qǐng)求。

返回值 :

  • 成功:返回一個(gè)新的、非負(fù)的已連接套接字文件描述符。
  • 失?。悍祷?-1,并設(shè)置 errno。

示例(續(xù)上,接受單個(gè)連接) :

// ... listen() 成功后 ...

    int new_socket;
    struct sockaddr_in client_address;
    socklen_t addrlen = sizeof(client_address); // 注意類(lèi)型是 socklen_t
    char client_ip[INET_ADDRSTRLEN]; // 用于存儲(chǔ)客戶(hù)端 IP 字符串

    printf("等待連接...\n");

    // 接受一個(gè)傳入連接
    // 默認(rèn)情況下,accept() 會(huì)阻塞,直到有客戶(hù)端連接進(jìn)來(lái)
    if ((new_socket = accept(server_fd, (struct sockaddr *)&client_address, &addrlen)) < 0) {
        perror("accept 失敗");
        close(server_fd);
        exit(EXIT_FAILURE);
    }

    // 將客戶(hù)端的 IP 地址從網(wǎng)絡(luò)字節(jié)序轉(zhuǎn)換為點(diǎn)分十進(jìn)制字符串以便打印
    inet_ntop(AF_INET, &client_address.sin_addr, client_ip, INET_ADDRSTRLEN);
    printf("接受來(lái)自 %s:%d 的連接\n", client_ip, ntohs(client_address.sin_port));
    printf("用于通信的新套接字描述符: %d\n", new_socket);

    // 現(xiàn)在可以使用 new_socket 與該客戶(hù)端進(jìn)行數(shù)據(jù)交換
    // 例如: send(new_socket, "歡迎!\n", 7, 0);
    // 例如: recv(new_socket, buffer, 1024, 0);

    // 完成與該客戶(hù)端的通信后,關(guān)閉連接套接字
    close(new_socket);

    // 服務(wù)器最終關(guān)閉時(shí),關(guān)閉監(jiān)聽(tīng)套接字
    close(server_fd);

上述示例僅能處理一次連接。為實(shí)現(xiàn)并發(fā)處理多個(gè)客戶(hù)端,需引入循環(huán)結(jié)構(gòu),并結(jié)合多進(jìn)程/多線(xiàn)程模型或 I/O 多路復(fù)用技術(shù)。對(duì)于需要高效處理大量并發(fā)連接的場(chǎng)景(如我們的 Pub/Sub 服務(wù)器),I/O 多路復(fù)用是更常用的方案。

select() - I/O 多路復(fù)用機(jī)制

服務(wù)器程序通常需要同時(shí)關(guān)注多個(gè)事件源:監(jiān)聽(tīng)套接字上的新連接請(qǐng)求,以及多個(gè)已連接客戶(hù)端套接字上的數(shù)據(jù)到達(dá)。若使用阻塞式的 accept() 和 recv(),程序執(zhí)行流會(huì)在單一調(diào)用點(diǎn)暫停,無(wú)法及時(shí)響應(yīng)其他事件。I/O 多路復(fù)用技術(shù)解決了這個(gè)問(wèn)題,它允許進(jìn)程同時(shí)監(jiān)視多個(gè)文件描述符,并在其中任何一個(gè)或多個(gè)描述符準(zhǔn)備好進(jìn)行 I/O 操作(可讀、可寫(xiě)或異常)時(shí)獲得通知。select() 是 POSIX 標(biāo)準(zhǔn)中定義的一種經(jīng)典 I/O 多路復(fù)用機(jī)制。

#include <sys/select.h>
#include <sys/time.h> // 對(duì)于 struct timeval

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);
  • nfds: 被監(jiān)聽(tīng)的文件描述符的數(shù)量,其值應(yīng)為所有被監(jiān)聽(tīng)文件描述符中的最大值 加 1 。例如,若監(jiān)聽(tīng) fd 3, 5, 8,則 nfds 應(yīng)為 9。
  • readfds: 指向 fd_set 結(jié)構(gòu)體的指針,用于指定需要監(jiān)聽(tīng) 可讀 事件的文件描述符集合。對(duì)于監(jiān)聽(tīng)套接字,可讀意味著有新連接待 accept();對(duì)于已連接套接字,可讀意味著有數(shù)據(jù)到達(dá)、連接已關(guān)閉(收到 FIN)或發(fā)生錯(cuò)誤。
  • writefds: 指向 fd_set 的指針,用于指定需要監(jiān)聽(tīng) 可寫(xiě) 事件的文件描述符集合。通常表示套接字的發(fā)送緩沖區(qū)有可用空間。
  • exceptfds: 指向 fd_set 的指針,用于指定需要監(jiān)聽(tīng) 異常 條件的文件描述符集合(如 TCP 帶外數(shù)據(jù))。
  • timeout: 指向 struct timeval 結(jié)構(gòu)體的指針,用于設(shè)定 select() 的最大等待時(shí)間。

struct timeval { time_t tv_sec; suseconds_t tv_usec; }; (秒和微秒)

若 timeout 為 NULL,select() 將無(wú)限期阻塞,直到至少有一個(gè)描述符就緒。

若 timeout 指向的結(jié)構(gòu)體中 tv_sec 和 tv_usec 均為 0,select() 執(zhí)行非阻塞檢查,立即返回。

若 timeout 指向的結(jié)構(gòu)體包含正值,select() 最多等待指定時(shí)間。超時(shí)前有描述符就緒則返回;超時(shí)則返回 0。

fd_set 相關(guān)宏定義 :

  • FD_ZERO(fd_set *set): 清空(初始化)一個(gè) fd_set 集合。 每次調(diào)用 select() 前,對(duì)工作集合必須執(zhí)行此操作或從主集合復(fù)制 。
  • FD_SET(int fd, fd_set *set): 將文件描述符 fd 添加到集合 set 中。
  • FD_CLR(int fd, fd_set *set): 將文件描述符 fd 從集合 set 中移除。
  • FD_ISSET(int fd, fd_set *set):  在 select() 返回后 ,用于檢查文件描述符 fd 是否仍在就緒集合 set 中。

重要特性 :select() 調(diào)用會(huì) 修改 傳入的 fd_set 集合(readfds, writefds, exceptfds),將其中未就緒的文件描述符移除。因此,應(yīng)用程序通常需要維護(hù)一個(gè) 主集合 (master set)記錄所有需要關(guān)心的文件描述符,在每次循環(huán)調(diào)用 select() 之前,將主集合的內(nèi)容 復(fù)制 到一個(gè) 工作集合 (working set),然后將工作集合傳遞給 select()。

返回值 :

  • 成功:返回三個(gè)集合中總共就緒的文件描述符數(shù)量。
  • 超時(shí):返回 0。
  • 失?。悍祷?-1,并設(shè)置 errno。

使用 select() 的服務(wù)器模式 :

  • 初始化監(jiān)聽(tīng)套接字 (socket, bind, listen)。
  • 初始化主文件描述符集合 master_fds:FD_ZERO(&master_fds),然后 FD_SET(listener_fd, &master_fds)。
  • 維護(hù)當(dāng)前最大文件描述符 max_fd,初始值為 listener_fd。
  • 進(jìn)入主事件循環(huán): a.  創(chuàng)建臨時(shí)工作集合 read_fds,將其初始化為主集合:read_fds = master_fds。 b.  調(diào)用 select(max_fd + 1, &read_fds, NULL, NULL, NULL)(此處示例僅關(guān)心讀事件,阻塞等待)。 c.  檢查 select 返回值。若為 -1,處理錯(cuò)誤(需注意 EINTR 信號(hào)中斷)。 d.  遍歷從 0 到 max_fd 的所有文件描述符 i: i.  使用 FD_ISSET(i, &read_fds) 判斷描述符 i 是否在就緒的讀集合中。 ii. 若 i 是監(jiān)聽(tīng)套接字 (listener_fd) 且 FD_ISSET 為真: * 調(diào)用 accept() 接受新連接,得到 new_fd。 * 將 new_fd 添加到主集合 master_fds:FD_SET(new_fd, &master_fds)。 * 更新 max_fd:if (new_fd > max_fd) max_fd = new_fd;。 iii. 若 i 是已連接客戶(hù)端套接字且 FD_ISSET 為真: * 調(diào)用 recv(i, buffer, ...) 或 read(i, buffer, ...) 嘗試讀取數(shù)據(jù)。 * 處理連接狀態(tài) : * 返回值 > 0 : 成功讀取 valread 字節(jié)數(shù)據(jù)。處理接收到的數(shù)據(jù)(解析命令 SUB/PUB 等)。 * 返回值 0 : 表示對(duì)端執(zhí)行了有序關(guān)閉(發(fā)送了 FIN 包),所有數(shù)據(jù)已接收完畢。這是 TCP 半關(guān)閉 狀態(tài)的體現(xiàn)(對(duì)方關(guān)閉了寫(xiě),本方可以繼續(xù)寫(xiě),但通常也應(yīng)準(zhǔn)備關(guān)閉)。應(yīng)用程序應(yīng): * close(i) 關(guān)閉本端的套接字連接。 * 從 master_fds 中移除 i:FD_CLR(i, &master_fds)。 * 清理與該客戶(hù)端相關(guān)的應(yīng)用層資源(如訂閱信息)。 * 返回值 -1 : 發(fā)生錯(cuò)誤。檢查 errno: * 若 errno 為 EAGAIN 或 EWOULDBLOCK(非阻塞模式下),表示暫時(shí)無(wú)數(shù)據(jù)可讀,不是錯(cuò)誤。 * 若 errno 為 ECONNRESET,表示對(duì)端發(fā)送了 RST 包(連接異常中斷)。 * 其他錯(cuò)誤(如 ETIMEDOUT, ENOTCONN 等)。 * 對(duì)于實(shí)際錯(cuò)誤,同樣需要 close(i),從 master_fds 移除 i,并清理資源。

關(guān)于 shutdown() 和半關(guān)閉

recv/read 返回 0 表明對(duì)端關(guān)閉了其發(fā)送通道。如果本端應(yīng)用還想發(fā)送數(shù)據(jù),理論上可以(只要對(duì)端接收緩沖區(qū)未滿(mǎn)且未完全關(guān)閉連接),但這通常不符合應(yīng)用邏輯。更常見(jiàn)的是,接收到 0 后本端也應(yīng)關(guān)閉連接。

有時(shí)應(yīng)用可能需要主動(dòng)進(jìn)行 半關(guān)閉 ,即關(guān)閉自己的發(fā)送通道,但仍保持接收通道打開(kāi),以接收對(duì)端可能還未發(fā)送完的數(shù)據(jù)。這可以通過(guò) shutdown() 系統(tǒng)調(diào)用實(shí)現(xiàn):

#include <sys/socket.h>

int shutdown(int sockfd, int how);
  • sockfd: 要操作的套接字。
  • how: 指定關(guān)閉方式:

SHUT_RD: 關(guān)閉接收通道(之后不能再?gòu)拇颂捉幼纸邮諗?shù)據(jù))。

SHUT_WR: 關(guān)閉發(fā)送通道(之后不能再?gòu)拇颂捉幼职l(fā)送數(shù)據(jù))。這會(huì)向?qū)Χ税l(fā)送一個(gè) FIN 包。

SHUT_RDWR: 同時(shí)關(guān)閉接收和發(fā)送通道(等同于 close() 的一部分效果,但不釋放文件描述符)。

使用 shutdown(sockfd, SHUT_WR) 后,對(duì)端 recv 會(huì)在接收完所有已在途的數(shù)據(jù)后返回 0。而本端仍可調(diào)用 recv 接收數(shù)據(jù),直到對(duì)端也關(guān)閉其發(fā)送通道(發(fā)送 FIN)。

close() 調(diào)用則會(huì)同時(shí)關(guān)閉讀寫(xiě)兩個(gè)方向(如果引用計(jì)數(shù)為零,還會(huì)釋放文件描述符和相關(guān)內(nèi)核資源)。

在我們的簡(jiǎn)單 Pub/Sub 服務(wù)器中,我們直接使用 close() 處理連接終止,這隱含了雙向關(guān)閉。

極簡(jiǎn) Pub/Sub 服務(wù)器 C 代碼實(shí)現(xiàn)

以下是使用 select() 實(shí)現(xiàn)的極簡(jiǎn) Pub/Sub 服務(wù)器的 C 代碼,注釋已更新為中文。請(qǐng)注意,此實(shí)現(xiàn)為了教學(xué)目的保持簡(jiǎn)潔,省略了許多生產(chǎn)環(huán)境中必要的健壯性設(shè)計(jì)(如完善的錯(cuò)誤處理、動(dòng)態(tài)資源管理、優(yōu)化的 max_fd 更新策略等)。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/select.h>
#include <errno.h>

#define PORT 8080               // 服務(wù)器監(jiān)聽(tīng)端口
#define MAX_CLIENTS 30          // 最大并發(fā)客戶(hù)端數(shù)量
#define MAX_TOPICS 10           // 最大主題數(shù)量
#define MAX_SUBS_PER_TOPIC 10   // 每個(gè)主題最大訂閱者數(shù)量
#define BUFFER_SIZE 1024        // 接收緩沖區(qū)大小

// 主題訂閱數(shù)據(jù)結(jié)構(gòu)
typedef struct {
    char name[50];                       // 主題名稱(chēng)
    int subscribers[MAX_SUBS_PER_TOPIC]; // 訂閱該主題的客戶(hù)端套接字描述符數(shù)組
    int sub_count;                       // 當(dāng)前訂閱者數(shù)量
} Topic;

Topic topics[MAX_TOPICS]; // 全局主題數(shù)組
int topic_count = 0;      // 當(dāng)前主題數(shù)量

// 輔助函數(shù):查找或創(chuàng)建主題
int find_or_create_topic(const char* topic_name) {
    // 查找現(xiàn)有主題
    for (int i = 0; i < topic_count; ++i) {
        if (strcmp(topics[i].name, topic_name) == 0) {
            return i; // 找到,返回索引
        }
    }
    // 如果未找到且還有空間,則創(chuàng)建新主題
    if (topic_count < MAX_TOPICS) {
        strncpy(topics[topic_count].name, topic_name, sizeof(topics[topic_count].name) - 1);
        topics[topic_count].name[sizeof(topics[topic_count].name) - 1] = '\0'; // 確保空字符結(jié)尾
        topics[topic_count].sub_count = 0; // 初始化訂閱者數(shù)量為0
        return topic_count++; // 返回新主題的索引,并增加主題計(jì)數(shù)
    }
    return -1; // 主題數(shù)組已滿(mǎn),無(wú)法創(chuàng)建
}

// 輔助函數(shù):將客戶(hù)端添加到主題的訂閱列表
void add_subscriber(int topic_index, int client_socket) {
    if (topic_index < 0 || topic_index >= topic_count) return; // 無(wú)效的主題索引
    Topic* topic = &topics[topic_index];
    if (topic->sub_count < MAX_SUBS_PER_TOPIC) {
        // 檢查是否已訂閱,避免重復(fù)添加
        for(int i = 0; i < topic->sub_count; ++i) {
            if (topic->subscribers[i] == client_socket) return; // 已存在,直接返回
        }
        // 添加新的訂閱者
        topic->subscribers[topic->sub_count++] = client_socket;
        printf("客戶(hù)端 %d 訂閱了主題 '%s'\n", client_socket, topic->name);
    } else {
        // 主題訂閱已滿(mǎn)
        printf("主題 '%s' 訂閱已滿(mǎn), 無(wú)法添加客戶(hù)端 %d\n", topic->name, client_socket);
        // 可選: 向客戶(hù)端發(fā)送錯(cuò)誤消息
        // send(client_socket, "ERR topic full\n", 15, 0);
    }
}

// 輔助函數(shù):從所有主題中移除指定客戶(hù)端的訂閱
void remove_subscriber(int client_socket) {
    printf("正在移除客戶(hù)端 %d 的所有訂閱。\n", client_socket);
    for (int i = 0; i < topic_count; ++i) { // 遍歷所有主題
        int found_idx = -1;
        // 在當(dāng)前主題的訂閱者列表中查找該客戶(hù)端
        for (int j = 0; j < topics[i].sub_count; ++j) {
            if (topics[i].subscribers[j] == client_socket) {
                found_idx = j;
                break;
            }
        }
        // 如果找到該客戶(hù)端
        if (found_idx != -1) {
            // 將后續(xù)訂閱者向前移動(dòng)一位,覆蓋掉要移除的客戶(hù)端
            for (int k = found_idx; k < topics[i].sub_count - 1; ++k) {
                topics[i].subscribers[k] = topics[i].subscribers[k + 1];
            }
            topics[i].sub_count--; // 減少訂閱者計(jì)數(shù)
            printf("已將客戶(hù)端 %d 從主題 '%s' 中移除\n", client_socket, topics[i].name);
        }
    }
}

// 輔助函數(shù):向指定主題發(fā)布消息
void publish_message(int topic_index, const char* message, int publisher_socket) {
    if (topic_index < 0 || topic_index >= topic_count) return; // 無(wú)效的主題索引
    Topic* topic = &topics[topic_index];
    char full_message[BUFFER_SIZE + 100]; // 預(yù)留足夠空間構(gòu)造 "MSG <topic> <data>\n" 格式的消息

    // 構(gòu)造完整的消息格式
    snprintf(full_message, sizeof(full_message), "MSG %s %s", topic->name, message);
     // 確保消息以換行符結(jié)束 (如果原始消息沒(méi)有的話(huà))
    if (full_message[strlen(full_message)-1] != '\n') {
         strncat(full_message, "\n", sizeof(full_message) - strlen(full_message) - 1);
    }


    printf("向主題 '%s' 發(fā)布消息: %s", topic->name, message); // 假設(shè)原始 message 可能已包含換行符
    // 遍歷該主題的所有訂閱者
    for (int i = 0; i < topic->sub_count; ++i) {
        int subscriber_socket = topic->subscribers[i];
        // 不將消息發(fā)回給發(fā)布者自己
        if (subscriber_socket != publisher_socket) {
            // 發(fā)送消息給訂閱者
            if (send(subscriber_socket, full_message, strlen(full_message), 0) < 0) {
                // 發(fā)送失敗,可能需要處理連接斷開(kāi)等問(wèn)題
                perror("向訂閱者發(fā)送消息失敗");
                // 注意: 健壯的實(shí)現(xiàn)應(yīng)在此處檢測(cè)到錯(cuò)誤后,可能也需要關(guān)閉并清理該訂閱者的連接
            } else {
                 printf("  -> 已發(fā)送至客戶(hù)端 %d\n", subscriber_socket);
            }
        }
    }
}


int main() {
    int listener_fd;                   // 監(jiān)聽(tīng)套接字描述符
    int new_socket;                    // 新接受的客戶(hù)端連接套接字描述符
    int client_sockets[MAX_CLIENTS];   // 存儲(chǔ)客戶(hù)端套接字描述符的數(shù)組
    int max_clients = MAX_CLIENTS;     // 最大客戶(hù)端數(shù) (冗余變量,可直接用宏)
    int activity;                      // select() 的返回值
    int i, valread, sd;                // 循環(huán)變量, read() 返回值, 當(dāng)前處理的套接字描述符
    int max_sd;                        // select() 需要的最大文件描述符值 + 1
    struct sockaddr_in address;        // 服務(wù)器地址結(jié)構(gòu)
    char buffer[BUFFER_SIZE + 1];      // 數(shù)據(jù)接收緩沖區(qū) (+1 用于空字符結(jié)尾)

    // 文件描述符集合
    fd_set readfds;

    // 初始化客戶(hù)端套接字?jǐn)?shù)組,全部置 0 (0 不是有效的文件描述符)
    for (i = 0; i < max_clients; i++) {
        client_sockets[i] = 0;
    }

    // 創(chuàng)建主監(jiān)聽(tīng)套接字
    if ((listener_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
        perror("socket 創(chuàng)建失敗");
        exit(EXIT_FAILURE);
    }

    // 設(shè)置監(jiān)聽(tīng)套接字選項(xiàng),允許地址重用
    int opt = 1;
    if (setsockopt(listener_fd, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0) {
        perror("setsockopt 失敗");
        exit(EXIT_FAILURE);
    }

    // 配置服務(wù)器地址結(jié)構(gòu)
    address.sin_family = AF_INET;         // IPv4
    address.sin_addr.s_addr = INADDR_ANY; // 監(jiān)聽(tīng)所有接口
    address.sin_port = htons(PORT);       // 指定端口(網(wǎng)絡(luò)字節(jié)序)

    // 將監(jiān)聽(tīng)套接字綁定到指定地址和端口
    if (bind(listener_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind 失敗");
        exit(EXIT_FAILURE);
    }
    printf("監(jiān)聽(tīng)器已啟動(dòng)在端口 %d \n", PORT);

    // 使套接字進(jìn)入監(jiān)聽(tīng)狀態(tài),設(shè)定等待隊(duì)列長(zhǎng)度為 3
    if (listen(listener_fd, 3) < 0) {
        perror("listen 失敗");
        exit(EXIT_FAILURE);
    }

    // 等待客戶(hù)端連接
    socklen_t addrlen = sizeof(address);
    puts("等待連接中 ...");

    while (1) { // 服務(wù)器主循環(huán)
        // 清空讀文件描述符集合
        FD_ZERO(&readfds);

        // 將監(jiān)聽(tīng)套接字加入集合
        FD_SET(listener_fd, &readfds);
        max_sd = listener_fd; // 初始化 max_sd

        // 將所有活動(dòng)的客戶(hù)端套接字加入集合
        for (i = 0; i < max_clients; i++) {
            sd = client_sockets[i]; // 獲取客戶(hù)端套接字描述符

            // 如果是有效的套接字描述符 (大于 0),則加入讀集合
            if (sd > 0)
                FD_SET(sd, &readfds);

            // 更新 max_sd 以跟蹤最大的文件描述符值
            if (sd > max_sd)
                max_sd = sd;
        }

        // 調(diào)用 select() 等待活動(dòng)發(fā)生,timeout 設(shè)置為 NULL 表示無(wú)限期阻塞
        activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);

        // 檢查 select() 是否出錯(cuò) (忽略 EINTR 信號(hào)中斷)
        if ((activity < 0) && (errno != EINTR)) {
            perror("select 錯(cuò)誤");
            // 在實(shí)際應(yīng)用中可能需要更復(fù)雜的錯(cuò)誤處理邏輯
        }

        // 檢查監(jiān)聽(tīng)套接字是否就緒 (表示有新連接請(qǐng)求)
        if (FD_ISSET(listener_fd, &readfds)) {
            // 接受新連接
            if ((new_socket = accept(listener_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
                perror("accept 失敗");
                // 嚴(yán)重錯(cuò)誤,通常需要退出或重啟服務(wù)
                exit(EXIT_FAILURE);
            }

            // 獲取并打印新客戶(hù)端的 IP 和端口信息
             char client_ip[INET_ADDRSTRLEN];
             inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
             printf("新連接建立,套接字描述符為 %d,IP 為: %s,端口為: %d\n",
                   new_socket, client_ip, ntohs(address.sin_port));

            // 將新套接字添加到客戶(hù)端數(shù)組中
            for (i = 0; i < max_clients; i++) {
                // 找到數(shù)組中第一個(gè)空位
                if (client_sockets[i] == 0) {
                    client_sockets[i] = new_socket;
                    printf("已將套接字 %d 添加到列表位置 %d\n", new_socket, i);
                    break; // 添加成功后退出循環(huán)
                }
            }
            // 如果客戶(hù)端數(shù)組已滿(mǎn)
             if (i == max_clients) {
                printf("客戶(hù)端連接數(shù)已達(dá)上限,拒絕連接。\n");
                close(new_socket); // 關(guān)閉這個(gè)無(wú)法處理的連接
            }
        }

        // 檢查各個(gè)客戶(hù)端套接字是否有活動(dòng) (數(shù)據(jù)到達(dá)或連接關(guān)閉)
        for (i = 0; i < max_clients; i++) {
            sd = client_sockets[i]; // 當(dāng)前要檢查的客戶(hù)端套接字

            // 如果當(dāng)前客戶(hù)端套接字在就緒的讀集合中
            if (FD_ISSET(sd, &readfds)) {
                // 嘗試讀取數(shù)據(jù)
                // read() 返回值: >0 表示讀取的字節(jié)數(shù), 0 表示連接已關(guān)閉 (收到FIN), <0 表示錯(cuò)誤
                if ((valread = read(sd, buffer, BUFFER_SIZE)) <= 0) {
                    // 連接關(guān)閉或發(fā)生錯(cuò)誤
                    if (valread == 0) {
                        // 對(duì)端正常關(guān)閉連接 (FIN)
                        getpeername(sd, (struct sockaddr*)&address, &addrlen); // 獲取對(duì)端地址信息用于打印
                        char client_ip[INET_ADDRSTRLEN];
                        inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
                        printf("主機(jī) %s:%d (套接字 %d) 斷開(kāi)連接\n", client_ip, ntohs(address.sin_port), sd);
                    } else {
                        // 讀取錯(cuò)誤
                        perror("read 錯(cuò)誤");
                        // 也可以在這里獲取對(duì)端信息打印
                    }

                    // 關(guān)閉套接字
                    close(sd);
                    // 從客戶(hù)端數(shù)組中移除 (標(biāo)記為 0)
                    client_sockets[i] = 0;
                    // 從所有主題訂閱中移除該客戶(hù)端
                    remove_subscriber(sd);

                } else {
                    // 成功讀取到數(shù)據(jù),處理客戶(hù)端發(fā)送的命令
                    buffer[valread] = '\0'; // 添加空字符結(jié)尾,確保字符串處理安全

                    // 簡(jiǎn)單的命令解析 (生產(chǎn)環(huán)境需要更健壯的解析器)
                    char command[5];       // 存儲(chǔ)命令 (SUB/PUB)
                    char topic_name[50];   // 存儲(chǔ)主題名
                    char message[BUFFER_SIZE]; // 存儲(chǔ)消息內(nèi)容 (PUB 命令)
                    memset(message, 0, sizeof(message)); // 清空消息緩沖區(qū)

                    // 使用 sscanf 解析輸入,格式為 "CMD TOPIC [MESSAGE]"
                    // %4s: 讀取最多4個(gè)字符作為命令
                    // %49s: 讀取最多49個(gè)字符作為主題名 (防止溢出)
                    // %[^\n]: 讀取從主題名后到行尾的所有字符作為消息 (包括空格)
                    int parsed_items = sscanf(buffer, "%4s %49s %[^\n]", command, topic_name, message);

                    printf("收到來(lái)自客戶(hù)端 %d 的數(shù)據(jù): %s", sd, buffer); // buffer 可能自帶換行符

                    if (parsed_items >= 2 && strcmp(command, "SUB") == 0) {
                        // 處理 SUB 命令
                        int topic_idx = find_or_create_topic(topic_name);
                        if (topic_idx != -1) {
                            add_subscriber(topic_idx, sd);
                            // 可選: 向客戶(hù)端發(fā)送訂閱成功確認(rèn)
                            // send(sd, "SUB OK\n", 7, 0);
                        } else {
                            // 可選: 發(fā)送錯(cuò)誤(如主題數(shù)達(dá)到上限)
                            // send(sd, "ERR max topics\n", 15, 0);
                        }
                    } else if (parsed_items >= 3 && strcmp(command, "PUB") == 0) {
                         // 處理 PUB 命令
                         int topic_idx = find_or_create_topic(topic_name); // 查找主題,如果不存在,簡(jiǎn)單實(shí)現(xiàn)可以忽略或報(bào)錯(cuò)
                          if (topic_idx != -1) {
                             // 確保消息格式包含換行符,便于客戶(hù)端接收處理
                             // 注意: 這里假設(shè) message 已包含從 sscanf 讀取的內(nèi)容
                             // 這里對(duì) message 的處理可以省略,因?yàn)?publish_message 內(nèi)部會(huì)確保換行符
                             publish_message(topic_idx, message, sd);
                         } else {
                              printf("未找到用于發(fā)布的主題 '%s'.\n", topic_name);
                             // 可選: 發(fā)送錯(cuò)誤(主題不存在)
                             // send(sd, "ERR no such topic\n", 18, 0);
                         }
                    } else {
                        // 無(wú)效命令格式
                        printf("來(lái)自客戶(hù)端 %d 的無(wú)效命令: %s", sd, buffer);
                        // 可選: 發(fā)送錯(cuò)誤
                        // send(sd, "ERR invalid command\n", 20, 0);
                    }
                    // 清理緩沖區(qū)以便下次讀取 (可選,因?yàn)?read 會(huì)覆蓋)
                    memset(buffer, 0, BUFFER_SIZE + 1);
                }
            }
        }
    }

    // 理論上,服務(wù)器的無(wú)限循環(huán)不會(huì)到達(dá)這里,但在正常關(guān)閉流程中應(yīng)關(guān)閉監(jiān)聽(tīng)套接字
    close(listener_fd);

    return 0;
}

編譯

gcc pubsub_server_revised.c -o pubsub_server_revised

運(yùn)行

./pubsub_server_revised
(base) ?  back-end-notes git:(master) ? nc localhost 8080
^C
(base) ?  back-end-notes git:(master) ? nc localhost 8080
SUB TOPIC-1
MSG TOPIC-1 1,242.42,214\n
(base) ?  back-end-notes git:(master) ? nc localhost 8080
PUB TOPIC-1 1,242.42,214\n
piperliu@go-x86:~/code/socket$ ./pubsub_server_revised
監(jiān)聽(tīng)器已啟動(dòng)在端口 8080 
等待連接中 ...
新連接建立,套接字描述符為 5,IP 為: 127.0.0.1,端口為: 38910
已將套接字 5 添加到列表位置 0
主機(jī) 127.0.0.1:38910 (套接字 5) 斷開(kāi)連接
正在移除客戶(hù)端 5 的所有訂閱。
新連接建立,套接字描述符為 5,IP 為: 127.0.0.1,端口為: 38924
已將套接字 5 添加到列表位置 0
收到來(lái)自客戶(hù)端 5 的數(shù)據(jù): SUB TOPIC-1
客戶(hù)端 5 訂閱了主題 'TOPIC-1'
新連接建立,套接字描述符為 6,IP 為: 127.0.0.1,端口為: 58630
已將套接字 6 添加到列表位置 1
收到來(lái)自客戶(hù)端 6 的數(shù)據(jù): PUB TOPIC-1 1,242.42,214\n
向主題 'TOPIC-1' 發(fā)布消息: 1,242.42,214\n  -> 已發(fā)送至客戶(hù)端 5

訂閱與發(fā)布流程的內(nèi)核視角梳理

結(jié)合內(nèi)核操作,我們重新梳理客戶(hù)端訂閱和發(fā)布時(shí)服務(wù)器端的處理流程:

  1. 服務(wù)器初始化與監(jiān)聽(tīng)
  • socket():內(nèi)核創(chuàng)建 socket 結(jié)構(gòu),分配資源,返回文件描述符 listener_fd。
  • bind():內(nèi)核校驗(yàn)地址、端口可用性及權(quán)限,將本地地址信息關(guān)聯(lián)到 listener_fd 的 socket 結(jié)構(gòu)。
  • listen():內(nèi)核將 listener_fd 對(duì)應(yīng)的 socket 結(jié)構(gòu)狀態(tài)置為 LISTEN,并初始化 SYN 隊(duì)列和 Accept 隊(duì)列。
  1. 客戶(hù)端連接建立
  • 客戶(hù)端發(fā)起 TCP 連接請(qǐng)求(發(fā)送 SYN 包)。
  • 服務(wù)器內(nèi)核網(wǎng)絡(luò)協(xié)議棧收到 SYN,創(chuàng)建半連接條目放入 SYN 隊(duì)列,回復(fù) SYN-ACK。
  • 客戶(hù)端回復(fù) ACK,完成三次握手。內(nèi)核將對(duì)應(yīng)的連接從 SYN 隊(duì)列移至 Accept 隊(duì)列。
  • 服務(wù)器進(jìn)程調(diào)用 select(),select 檢測(cè)到 listener_fd 可讀(因?yàn)?Accept 隊(duì)列非空)。
  • FD_ISSET(listener_fd, ...) 為真。
  • 服務(wù)器進(jìn)程調(diào)用 accept()。內(nèi)核從 Accept 隊(duì)列取出一個(gè)連接,創(chuàng)建新的 socket 結(jié)構(gòu)和文件描述符 new_socket(狀態(tài)為 ESTABLISHED),返回給服務(wù)器進(jìn)程。
  1. 客戶(hù)端訂閱 (SUB)
  • 客戶(hù)端通過(guò) new_socket 發(fā)送 "SUB ..." 數(shù)據(jù)。
  • 數(shù)據(jù)到達(dá)服務(wù)器,內(nèi)核將其放入 new_socket 的接收緩沖區(qū)。
  • 服務(wù)器進(jìn)程調(diào)用 select(),select 檢測(cè)到 new_socket 可讀(接收緩沖區(qū)有數(shù)據(jù))。
  • FD_ISSET(new_socket, ...) 為真。
  • 服務(wù)器進(jìn)程調(diào)用 read(new_socket, ...),內(nèi)核從接收緩沖區(qū)復(fù)制數(shù)據(jù)到用戶(hù)空間 buffer。
  • 服務(wù)器進(jìn)程解析 buffer,識(shí)別訂閱請(qǐng)求,更新應(yīng)用層數(shù)據(jù)結(jié)構(gòu)(topics 數(shù)組)。
  1. 客戶(hù)端發(fā)布 (PUB)
  • 客戶(hù)端通過(guò) new_socket 發(fā)送 "PUB ..." 數(shù)據(jù)。
  • 數(shù)據(jù)到達(dá)服務(wù)器,內(nèi)核處理同上,select 報(bào)告 new_socket 可讀。
  • 服務(wù)器進(jìn)程調(diào)用 read() 獲取數(shù)據(jù)并解析。
  • 服務(wù)器進(jìn)程查找主題,遍歷訂閱者列表。
  • 對(duì)于每個(gè)訂閱者 sub_socket(且 sub_socket != new_socket):

服務(wù)器進(jìn)程調(diào)用 send(sub_socket, ...)。內(nèi)核將數(shù)據(jù)復(fù)制到 sub_socket 的發(fā)送緩沖區(qū)。

內(nèi)核網(wǎng)絡(luò)協(xié)議棧負(fù)責(zé)將發(fā)送緩沖區(qū)的數(shù)據(jù)打包成 TCP 段并發(fā)送出去。

  1. 客戶(hù)端斷開(kāi)連接 (有序關(guān)閉)
  • 客戶(hù)端調(diào)用 close() 或 shutdown(SHUT_WR),其內(nèi)核發(fā)送 FIN 包。
  • 服務(wù)器內(nèi)核收到 FIN,將 new_socket 標(biāo)記為收到 FIN,并向客戶(hù)端回復(fù) ACK。連接進(jìn)入 CLOSE_WAIT 狀態(tài)。
  • 服務(wù)器進(jìn)程調(diào)用 select(),select 檢測(cè)到 new_socket 可讀(因?yàn)槭盏?FIN 也是可讀事件)。
  • FD_ISSET(new_socket, ...) 為真。
  • 服務(wù)器進(jìn)程調(diào)用 read(new_socket, ...),read 返回 0。
  • 服務(wù)器進(jìn)程識(shí)別出連接關(guān)閉,調(diào)用 close(new_socket)。內(nèi)核發(fā)送 FIN 包給客戶(hù)端(如果尚未發(fā)送),釋放套接字資源(當(dāng)引用計(jì)數(shù)為0時(shí)),從 master_fds 移除 new_socket,清理應(yīng)用層資源。
責(zé)任編輯:武曉燕 來(lái)源: Piper蛋窩
相關(guān)推薦

2022-08-15 09:02:22

Redis模式訂閱消息

2010-03-03 16:19:29

Python Sock

2014-05-04 13:47:39

銳捷網(wǎng)絡(luò)極簡(jiǎn)網(wǎng)絡(luò)

2025-02-25 09:29:34

2021-01-12 08:43:29

Redis ListStreams

2019-02-17 10:05:24

TCPSocket網(wǎng)絡(luò)編程

2019-07-16 09:20:11

Redis數(shù)據(jù)庫(kù)NoSQL

2023-05-31 15:47:52

銳捷

2024-01-10 08:16:08

Redis集成JMS

2022-09-02 17:12:16

BlackboxLinux

2013-03-27 13:26:04

Android開(kāi)發(fā)Socket

2009-09-07 14:29:47

C# Socket編程C# Socket

2024-07-02 11:42:53

SpringRedis自定義

2017-09-07 08:33:07

華為CloudFabric云數(shù)據(jù)

2022-05-26 18:08:32

數(shù)據(jù)中心

2015-05-05 14:36:05

高校網(wǎng)絡(luò)銳捷

2018-09-10 18:50:05

云管理網(wǎng)絡(luò)華為云管理網(wǎng)絡(luò)

2020-09-15 10:25:13

Redis命令Java

2014-03-25 09:50:00

解釋器編程語(yǔ)言

2013-10-14 10:41:41

分配器buddy syste
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)