一文搞懂POSIX多線程:解鎖高性能編程的密碼
在計(jì)算機(jī)編程的廣闊領(lǐng)域中,POSIX 標(biāo)準(zhǔn)就像是一把通用的鑰匙,開(kāi)啟了跨平臺(tái)編程的大門。POSIX,即 Portable Operating System Interface(可移植操作系統(tǒng)接口) ,是 IEEE 為了規(guī)范各種 UNIX 操作系統(tǒng)提供的 API 接口而定義的一系列互相關(guān)聯(lián)標(biāo)準(zhǔn)的總稱。它的出現(xiàn),旨在解決不同 UNIX 系統(tǒng)之間接口不一致的問(wèn)題,讓開(kāi)發(fā)者能夠編寫(xiě)一次代碼,在多個(gè)符合 POSIX 標(biāo)準(zhǔn)的系統(tǒng)上運(yùn)行,實(shí)現(xiàn)源代碼級(jí)別的軟件可移植性。
對(duì)于多線程編程而言,POSIX 標(biāo)準(zhǔn)同樣意義非凡。在多核處理器盛行的今天,多線程編程成為充分利用硬件資源、提高程序性能的關(guān)鍵技術(shù)。POSIX 標(biāo)準(zhǔn)定義了一套清晰、規(guī)范的多線程編程接口,讓開(kāi)發(fā)者可以在不同的操作系統(tǒng)環(huán)境中,以統(tǒng)一的方式創(chuàng)建、管理線程,以及處理線程之間的同步和通信問(wèn)題 。無(wú)論是開(kāi)發(fā)高性能的服務(wù)器程序,還是優(yōu)化計(jì)算密集型的應(yīng)用,POSIX 標(biāo)準(zhǔn)下的多線程編程都能提供強(qiáng)大的支持。
接下來(lái),讓我們深入探索 POSIX 標(biāo)準(zhǔn)下的多線程編程世界,揭開(kāi)線程創(chuàng)建、同步機(jī)制等核心概念的神秘面紗。
一、多線程編程簡(jiǎn)介
1.1線程初印象
線程,作為進(jìn)程內(nèi)的執(zhí)行單元,可以理解為進(jìn)程這個(gè)大舞臺(tái)上的一個(gè)個(gè)小舞者,各自有著獨(dú)立的舞步(執(zhí)行路徑),卻又共享著舞臺(tái)的資源(進(jìn)程資源)。與進(jìn)程相比,線程更加輕量級(jí)。進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位,擁有獨(dú)立的地址空間、內(nèi)存、文件描述符等資源 ,進(jìn)程間的切換開(kāi)銷較大。而線程則是共享所屬進(jìn)程的資源,它們之間的切換開(kāi)銷相對(duì)較小,就像在同一個(gè)舞臺(tái)上不同舞者之間的快速換位,無(wú)需重新搭建整個(gè)舞臺(tái)。
線程的這些特點(diǎn),使得多線程編程在提升程序執(zhí)行效率上有著獨(dú)特的優(yōu)勢(shì)。多個(gè)線程可以并發(fā)執(zhí)行,充分利用多核處理器的并行計(jì)算能力,將復(fù)雜的任務(wù)分解為多個(gè)子任務(wù),每個(gè)子任務(wù)由一個(gè)線程負(fù)責(zé)處理,從而大大提高了程序的整體運(yùn)行速度。例如,在一個(gè)網(wǎng)絡(luò)服務(wù)器程序中,一個(gè)線程可以負(fù)責(zé)監(jiān)聽(tīng)客戶端的連接請(qǐng)求,另一個(gè)線程負(fù)責(zé)處理已經(jīng)建立連接的客戶端的數(shù)據(jù)傳輸,這樣可以同時(shí)處理多個(gè)客戶端的請(qǐng)求,提升服務(wù)器的響應(yīng)性能 。
1.2POSIX 線程庫(kù)
在 POSIX 標(biāo)準(zhǔn)下,進(jìn)行多線程編程離不開(kāi) POSIX 線程庫(kù)(pthread 庫(kù))。它就像是一根神奇的魔法棒,為開(kāi)發(fā)者提供了一系列強(qiáng)大的接口函數(shù),讓我們能夠輕松地操控線程。
其中,pthread_create函數(shù)用于創(chuàng)建一個(gè)新的線程 ,它的原型如下:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);
thread參數(shù)用于返回新創(chuàng)建線程的 ID;attr參數(shù)用于設(shè)置線程的屬性,如果為NULL則使用默認(rèn)屬性;start_routine是一個(gè)函數(shù)指針,指向線程開(kāi)始執(zhí)行時(shí)調(diào)用的函數(shù);arg是傳遞給start_routine函數(shù)的參數(shù)。
而pthread_join函數(shù)則用于等待一個(gè)線程結(jié)束,其原型為:
int pthread_join(pthread_t thread, void **retval);
thread參數(shù)是要等待結(jié)束的線程 ID,retval用于獲取線程結(jié)束時(shí)的返回值。
下面是一個(gè)簡(jiǎn)單的使用pthread_create和pthread_join函數(shù)的代碼示例:
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
// 線程執(zhí)行的函數(shù)
void* thread_function(void* arg) {
printf("線程開(kāi)始執(zhí)行,參數(shù)為: %s\n", (char*)arg);
sleep(2); // 模擬線程執(zhí)行任務(wù)
printf("線程執(zhí)行結(jié)束\n");
return (void*)1; // 返回線程執(zhí)行結(jié)果
}
int main() {
pthread_t thread;
int res;
void* thread_result;
// 創(chuàng)建線程
res = pthread_create(&thread, NULL, thread_function, (void*)"Hello, Thread!");
if (res != 0) {
perror("線程創(chuàng)建失敗");
return 1;
}
printf("等待線程結(jié)束...\n");
// 等待線程結(jié)束,并獲取線程返回值
res = pthread_join(thread, &thread_result);
if (res != 0) {
perror("線程等待失敗");
return 1;
}
printf("線程已結(jié)束,返回值為: %ld\n", (long)thread_result);
return 0;
}
在這個(gè)示例中,我們創(chuàng)建了一個(gè)新線程,線程執(zhí)行thread_function函數(shù),在函數(shù)中打印傳入的參數(shù),然后休眠 2 秒模擬執(zhí)行任務(wù),最后返回一個(gè)值。主線程通過(guò)pthread_join等待子線程結(jié)束,并獲取其返回值。
1.3線程的生命周期
線程如同一個(gè)有生命的個(gè)體,有著自己完整的生命周期,從創(chuàng)建的那一刻開(kāi)始,經(jīng)歷運(yùn)行、阻塞、喚醒等階段,最終走向結(jié)束。
當(dāng)我們調(diào)用pthread_create函數(shù)時(shí),線程就誕生了,此時(shí)它處于就緒狀態(tài),等待著 CPU 的調(diào)度。一旦獲得 CPU 時(shí)間片,線程就進(jìn)入運(yùn)行狀態(tài),開(kāi)始執(zhí)行它的任務(wù),也就是調(diào)用我們指定的函數(shù) 。
在運(yùn)行過(guò)程中,線程可能會(huì)因?yàn)槟承┰蜻M(jìn)入阻塞狀態(tài)。比如,當(dāng)線程調(diào)用sleep函數(shù)時(shí),它會(huì)主動(dòng)放棄 CPU 使用權(quán),進(jìn)入睡眠狀態(tài),直到睡眠時(shí)間結(jié)束才會(huì)重新回到就緒狀態(tài),等待再次被調(diào)度執(zhí)行 。又或者,當(dāng)線程訪問(wèn)共享資源時(shí),如果資源被其他線程占用,它就需要等待,從而進(jìn)入阻塞狀態(tài),直到獲取到資源才會(huì)被喚醒,重新進(jìn)入運(yùn)行狀態(tài)。
當(dāng)線程執(zhí)行完它的任務(wù),也就是指定的函數(shù)返回時(shí),線程就進(jìn)入了結(jié)束狀態(tài)。此時(shí),我們可以通過(guò)pthread_join函數(shù)等待線程結(jié)束,并獲取它的返回值 ,也可以在創(chuàng)建線程時(shí)將其設(shè)置為分離狀態(tài),這樣線程結(jié)束后資源會(huì)自動(dòng)被回收,無(wú)需等待。了解線程的生命周期,有助于我們更好地管理線程,優(yōu)化程序的性能 。
二、Posix網(wǎng)絡(luò)API
2.1客戶端和服務(wù)端代碼示例
(1)服務(wù)端server.cpp
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
int main(int argc,char *argv[])
{
if (argc != 2)
{
printf("Using:./server port\nExample:./server 5005\n\n"); return -1;
}
// 第1步:創(chuàng)建服務(wù)端的socket。
int listenfd;
if ( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1)
{
perror("socket");
return -1;
}
// 第2步:把服務(wù)端用于通信的地址和端口綁定到socket上。
struct sockaddr_in servaddr; // 服務(wù)端地址信息的數(shù)據(jù)結(jié)構(gòu)。
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET; // 協(xié)議族,在socket編程中只能是AF_INET。
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); // 任意ip地址。
//servaddr.sin_addr.s_addr = inet_addr("192.168.190.134"); // 指定ip地址。
servaddr.sin_port = htons(atoi(argv[1])); // 指定通信端口。
if (bind(listenfd,(struct sockaddr *)&servaddr,sizeof(servaddr)) != 0 )
{
perror("bind");
close(listenfd);
return -1;
}
// 第3步:把socket設(shè)置為監(jiān)聽(tīng)模式。
if (listen(listenfd,5) != 0 )
{
perror("listen");
close(listenfd);
return -1;
}
// 第4步:接受客戶端的連接。
int clientfd; // 連上來(lái)的客戶端socket。
int socklen = sizeof(struct sockaddr_in); // struct sockaddr_in的大小
struct sockaddr_in clientaddr; // 客戶端的地址信息。
clientfd = accept(listenfd, (struct sockaddr *)&clientaddr, (socklen_t*)&socklen);
printf("client (%s) connect server success。。。\n", inet_ntoa(clientaddr.sin_addr));
// 第5步:與客戶端通信,接收客戶端發(fā)過(guò)來(lái)的報(bào)文后,將該報(bào)文原封不動(dòng)返回給客戶端。
char buffer[1024];
// memset(buffer, 0, 1024);
while (1)
{
int ret;
memset(buffer, 0, sizeof(buffer));
// 接收客戶端的請(qǐng)求報(bào)文。
if ( (ret = recv(clientfd, buffer, sizeof(buffer), 0)) <= 0)
{
printf("ret = %d , client disconected!!!\n", ret);
break;
}
printf("recv msg: %s\n", buffer);
// 向客戶端發(fā)送響應(yīng)結(jié)果。
if ( (ret = send(clientfd, buffer, strlen(buffer), 0)) <= 0)
{
perror("send");
break;
}
printf("response client: %s success...\n", buffer);
}
// 第6步:關(guān)閉socket,釋放資源。
close(listenfd);
close(clientfd);
return 0;
}
(2)客戶端client.cpp
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include <netdb.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
int main(int argc,char *argv[])
{
if (argc != 3)
{
printf("Using:./client ip port\nExample:./client 127.0.0.1 5005\n\n"); return -1;
}
// 第1步:創(chuàng)建客戶端的socket。
int sockfd;
if ( (sockfd = socket(AF_INET,SOCK_STREAM,0))==-1)
{
perror("socket");
return -1;
}
// 第2步:向服務(wù)器發(fā)起連接請(qǐng)求。
struct hostent* h;
if ( (h = gethostbyname(argv[1])) == 0 ) // 指定服務(wù)端的ip地址。
{ printf("gethostbyname failed.\n"); close(sockfd); return -1; }
struct sockaddr_in servaddr;
memset(&servaddr,0,sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(atoi(argv[2])); // 指定服務(wù)端的通信端口。
memcpy(&servaddr.sin_addr,h->h_addr,h->h_length);
// 向服務(wù)端發(fā)起連接清求。
if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) != 0)
{
perror("connect");
close(sockfd);
return -1;
}
char buffer[1024];
// 第3步:與服務(wù)端通信,發(fā)送一個(gè)報(bào)文后等待回復(fù),然后再發(fā)下一個(gè)報(bào)文。
for (int i = 0; i < 3; i++)
{
int ret;
memset(buffer, 0, sizeof(buffer));
sprintf(buffer, "這是第[%d]條消息!", i+1);
if ( (ret = send(sockfd, buffer, strlen(buffer),0)) <= 0) // 向服務(wù)端發(fā)送請(qǐng)求報(bào)文。
{
perror("send");
break;
}
printf("發(fā)送:%s\n", buffer);
memset(buffer,0,sizeof(buffer));
if ( (ret = recv(sockfd, buffer, sizeof(buffer), 0)) <= 0) // 接收服務(wù)端的回應(yīng)報(bào)文。
{
printf("ret = %d error\n", ret);
break;
}
printf("從服務(wù)端接收:%s\n", buffer);
sleep(1);
}
// 第4步:關(guān)閉socket,釋放資源。
close(sockfd);
}
運(yùn)行結(jié)果:
圖片
著重分析以下幾個(gè)函數(shù)
(1)socket函數(shù)
int socket(int domain, int type, int protocol);
調(diào)用socket()函數(shù)會(huì)創(chuàng)建一個(gè)套接字(socket)對(duì)象。套接字由兩部分組成,文件描述符(fd)和 TCP控制塊(Tcp Control Block,tcb) 。Tcb主要包括關(guān)系信息有網(wǎng)絡(luò)的五元組(remote IP,remote Port, local IP, local Port, protocol),一個(gè)五元組就可以確定一個(gè)具體的網(wǎng)絡(luò)連接。
(2)listen函數(shù)
listen(int listenfd, backlog);
服務(wù)端在調(diào)用listen()后,就開(kāi)始監(jiān)聽(tīng)網(wǎng)絡(luò)上連接請(qǐng)求。第二個(gè)參數(shù) backlog, 在Linux是指全連接隊(duì)列的長(zhǎng)度,即一次最多能保存 backlog 個(gè)連接請(qǐng)求。
圖片
(3)connect 函數(shù)
客戶端調(diào)用connect()函數(shù),向指定服務(wù)端發(fā)起連接請(qǐng)求。
(4)accept 函數(shù)
int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
accept()函數(shù)只做兩件事,將連接請(qǐng)求從全連接隊(duì)列中取出,給該連接分配一個(gè)fd并返回。
(5) 三次握手過(guò)程分析
三次握手與listen/connect/accept三個(gè)函數(shù)有關(guān),這里放到一起進(jìn)行描述。
客戶端調(diào)用 connect 函數(shù),開(kāi)始進(jìn)入三次握手。客戶端發(fā)送syn包,以及帶著隨機(jī)的seq;
服務(wù)端listen函數(shù)監(jiān)聽(tīng)到有客戶端連接,listen函數(shù)會(huì)在內(nèi)核協(xié)議棧為該客戶端創(chuàng)建一個(gè)Tcb控制塊,并將其加入到半連接隊(duì)列。服務(wù)端在收到syn包后,會(huì)給客戶端恢復(fù)ack和syn包;
客戶端收到服務(wù)端的ack和syn后再次恢復(fù)ack,連接建立成功。
服務(wù)端在收到客戶端的ack后,會(huì)將該客戶端對(duì)應(yīng)的Tcb數(shù)據(jù)從半連接隊(duì)列移動(dòng)到全連接隊(duì)列。只要全連接隊(duì)列中有數(shù)據(jù)就會(huì)觸發(fā)accept,返回連接成功的客戶端fd、IP以及端口。此時(shí),Tcb完整的五元組構(gòu)建成功。
(6)send/recv 函數(shù)
至此,客戶端與服務(wù)端已經(jīng)成功建立連接,就可以相互通信了。
send/recv函數(shù)主要負(fù)責(zé)數(shù)據(jù)的收發(fā)。
過(guò)程分析
send函數(shù):負(fù)責(zé)將數(shù)據(jù)從用戶空間拷貝到內(nèi)核(具體是拷貝到該連接對(duì)應(yīng)的Tcb控制塊中的發(fā)送緩沖區(qū))。注意:send函數(shù)返回并不意味著數(shù)據(jù)已成功發(fā)送,因?yàn)閿?shù)據(jù)在到達(dá)內(nèi)核緩沖區(qū)后,內(nèi)核會(huì)根據(jù)自己的策略決定什么時(shí)候?qū)?shù)據(jù)發(fā)出。
recv函數(shù):負(fù)責(zé)將數(shù)據(jù)從內(nèi)核緩沖區(qū)拷貝到用戶空間。同理,數(shù)據(jù)也顯示到達(dá)該連接對(duì)應(yīng)的Tcb控制塊的接受緩沖區(qū)。
(7)close 函數(shù)
在服務(wù)器與客戶端建立連接之后,會(huì)進(jìn)行一些讀寫(xiě)操作,完成讀寫(xiě)操作后我們需要關(guān)閉相應(yīng)的socket,好比操作完打開(kāi)的文件要調(diào)用fclose關(guān)閉打開(kāi)的文件一樣。close過(guò)程涉及到四次揮手的全過(guò)程
四次揮手流程:
- 客戶端調(diào)用close函數(shù),內(nèi)核會(huì)發(fā)送fin包,客戶端進(jìn)入fin_wait1狀態(tài);
- 服務(wù)端收到fin包回復(fù)ack,客戶端進(jìn)入close_wait狀態(tài)。此時(shí),客戶客戶端往服務(wù)端發(fā)送的通道就關(guān)閉了,因?yàn)門cp是全雙工的,服務(wù)端還可以向客戶端發(fā)數(shù)據(jù)。
- 客戶端收到ack,進(jìn)入到fin_wait2狀態(tài);
- 服務(wù)端發(fā)送完數(shù)據(jù),發(fā)送fin包,服務(wù)端進(jìn)入last_ack狀態(tài);
- 客戶端收到fin包后,回復(fù)ack,進(jìn)入到time_wait狀態(tài);
- 服務(wù)端收到ack,雙方連接正常關(guān)閉。
注意:close操作只是讓相應(yīng)socket描述字的引用計(jì)數(shù)-1,只有當(dāng)引用計(jì)數(shù)為0的時(shí)候,才會(huì)觸發(fā)TCP客戶端向服務(wù)器發(fā)送終止連接請(qǐng)求
2.2雙方同時(shí)調(diào)用close
圖片
2.3常見(jiàn)面試問(wèn)題
為什么要三次握手?
答:因?yàn)橐粋€(gè)完整的TCP連接需要雙方都得到確認(rèn),客戶端發(fā)送請(qǐng)求和收到確認(rèn)需要兩次;服務(wù)端發(fā)送請(qǐng)求和收到確認(rèn)需要兩次,當(dāng)中服務(wù)回復(fù)確認(rèn)和發(fā)送請(qǐng)求合并為一次總共需要3次;才能保證雙向通道是通的。
一個(gè)服務(wù)器的端口數(shù)是65535,為何能做到一百萬(wàn)的連接?
答:主要是因?yàn)橐粭l連接是由五元組所組成,所以一個(gè)服務(wù)器的連接數(shù)是五個(gè)成員數(shù)的乘積。
如何應(yīng)對(duì)Dos(Deny of Service,拒絕服務(wù))攻擊?
答:Dos攻擊就是利用三次握手的原理,模擬客戶端只向服務(wù)器發(fā)送syn包,然后耗盡被攻擊對(duì)象的資源。比較多的做法是利用防火墻,做一些過(guò)濾規(guī)則
如何解決Tcp的粘包問(wèn)題?
答:(1) 在包頭上添加一個(gè)數(shù)據(jù)包長(zhǎng)度的字段,用于數(shù)據(jù)的劃分,實(shí)際項(xiàng)目中這個(gè)也用的最多;(2)包尾部加固定分隔符;
Tcp如何保證順序到達(dá)?
答:順序到達(dá)是由于TCP的延遲ACK的機(jī)制來(lái)保證的,TCP接收到數(shù)據(jù)并不是立即回復(fù)而是經(jīng)過(guò)一個(gè)延遲時(shí)間,回復(fù)接收到連續(xù)包的最大序列號(hào)加1。如果丟包之后的包都需要重傳。在弱網(wǎng)情況下這里就會(huì)有實(shí)時(shí)性問(wèn)題和帶寬占用的問(wèn)題;
time_wait 作用?
答:防止最后一個(gè)ACK沒(méi)有順利到達(dá)對(duì)方,超時(shí)重新發(fā)送ack。time_wait時(shí)常一般是120s可以修改。
服務(wù)器掉線重啟出現(xiàn)端口被占用怎么辦?
答:其實(shí)主要是由于還處于time_wait狀態(tài),端口并沒(méi)有真正釋放。這時(shí)候可以設(shè)置SO_REUSEADDR屬性,保證掉線能馬上重連。
三、同步機(jī)制:多線程協(xié)作的 “指揮家”
在多線程編程的舞臺(tái)上,同步機(jī)制就像是一位經(jīng)驗(yàn)豐富的指揮家,協(xié)調(diào)著各個(gè)線程的行動(dòng),確保它們能夠和諧共處,高效地完成任務(wù)。多線程編程中,由于多個(gè)線程共享進(jìn)程資源,資源競(jìng)爭(zhēng)和線程協(xié)作問(wèn)題不可避免,而同步機(jī)制正是解決這些問(wèn)題的關(guān)鍵。接下來(lái),我們將深入探討互斥鎖、信號(hào)量和條件變量這幾種常見(jiàn)的同步機(jī)制 。
3.1資源競(jìng)爭(zhēng):多線程中的 “暗礁”
當(dāng)多個(gè)線程同時(shí)訪問(wèn)和修改共享資源時(shí),資源競(jìng)爭(zhēng)問(wèn)題就如同隱藏在暗處的暗礁,隨時(shí)可能讓程序的運(yùn)行陷入混亂。假設(shè)我們有一個(gè)簡(jiǎn)單的程序,包含兩個(gè)線程,它們都試圖對(duì)一個(gè)全局變量進(jìn)行加 1 操作:
#include <stdio.h>
#include <pthread.h>
// 全局變量
int global_variable = 0;
// 線程執(zhí)行函數(shù)
void* thread_function(void* arg) {
for (int i = 0; i < 1000000; i++) {
global_variable++;
}
return NULL;
}
int main() {
pthread_t thread1, thread2;
// 創(chuàng)建線程
pthread_create(&thread1, NULL, thread_function, NULL);
pthread_create(&thread2, NULL, thread_function, NULL);
// 等待線程結(jié)束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
printf("最終的全局變量值: %d\n", global_variable);
return 0;
}
按照我們的預(yù)期,兩個(gè)線程各對(duì)全局變量加 1000000 次,最終的結(jié)果應(yīng)該是 2000000。然而,實(shí)際運(yùn)行這個(gè)程序,你會(huì)發(fā)現(xiàn)結(jié)果往往小于 2000000。這是因?yàn)樵诙嗑€程環(huán)境下,global_variable++ 這一操作并非原子操作,它實(shí)際上包含了讀取變量值、加 1、寫(xiě)回變量值這三個(gè)步驟 。當(dāng)兩個(gè)線程同時(shí)執(zhí)行這一操作時(shí),可能會(huì)出現(xiàn)一個(gè)線程讀取了變量值,還未完成加 1 和寫(xiě)回操作,另一個(gè)線程也讀取了相同的值,導(dǎo)致最終結(jié)果出現(xiàn)偏差,數(shù)據(jù)不一致 。
3.2互斥鎖:守護(hù)資源的 “衛(wèi)士”
互斥鎖(Mutex)是解決資源競(jìng)爭(zhēng)問(wèn)題的常用工具,它就像一位忠誠(chéng)的衛(wèi)士,守護(hù)著共享資源,確保同一時(shí)間只有一個(gè)線程能夠訪問(wèn)資源?;コ怄i的工作原理基于一個(gè)簡(jiǎn)單的概念:當(dāng)一個(gè)線程獲取到互斥鎖時(shí),其他線程就必須等待,直到該線程釋放互斥鎖。
在 POSIX 線程庫(kù)中,使用互斥鎖非常簡(jiǎn)單。首先,我們需要定義一個(gè)互斥鎖變量:
pthread_mutex_t mutex;
然后,在訪問(wèn)共享資源之前,通過(guò) pthread_mutex_lock 函數(shù)獲取互斥鎖:
pthread_mutex_lock(&mutex);
如果互斥鎖已經(jīng)被其他線程持有,調(diào)用 pthread_mutex_lock 的線程將被阻塞,直到互斥鎖被釋放。當(dāng)訪問(wèn)完共享資源后,使用 pthread_mutex_unlock 函數(shù)釋放互斥鎖:
pthread_mutex_unlock(&mutex);
下面是使用互斥鎖改進(jìn)后的代碼:
#include <stdio.h>
#include <pthread.h>
// 全局變量
int global_variable = 0;
// 互斥鎖
pthread_mutex_t mutex;
// 線程執(zhí)行函數(shù)
void* thread_function(void* arg) {
for (int i = 0; i < 1000000; i++) {
// 獲取互斥鎖
pthread_mutex_lock(&mutex);
global_variable++;
// 釋放互斥鎖
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main() {
pthread_t thread1, thread2;
// 初始化互斥鎖
pthread_mutex_init(&mutex, NULL);
// 創(chuàng)建線程
pthread_create(&thread1, NULL, thread_function, NULL);
pthread_create(&thread2, NULL, thread_function, NULL);
// 等待線程結(jié)束
pthread_join(thread1, NULL);
pthread_join(thread2, NULL);
// 銷毀互斥鎖
pthread_mutex_destroy(&mutex);
printf("最終的全局變量值: %d\n", global_variable);
return 0;
}
通過(guò)這種方式,互斥鎖有效地保護(hù)了共享資源,確保了數(shù)據(jù)的一致性 。
3.3信號(hào)量:資源分配的 “調(diào)度員”
信號(hào)量(Semaphore)是另一種強(qiáng)大的同步工具,它不僅可以用于實(shí)現(xiàn)互斥,還能用于管理資源的分配。信號(hào)量可以看作是一個(gè)計(jì)數(shù)器,它的值表示可用資源的數(shù)量 。當(dāng)一個(gè)線程想要訪問(wèn)資源時(shí),它需要先獲取信號(hào)量,如果信號(hào)量的值大于 0,則表示有可用資源,線程可以獲取信號(hào)量并繼續(xù)執(zhí)行,同時(shí)信號(hào)量的值減 1;如果信號(hào)量的值為 0,則表示沒(méi)有可用資源,線程將被阻塞,直到有其他線程釋放信號(hào)量 。
在 POSIX 標(biāo)準(zhǔn)中,信號(hào)量相關(guān)的函數(shù)主要有 sem_init(初始化信號(hào)量)、sem_wait(等待信號(hào)量)、sem_post(釋放信號(hào)量)和 sem_destroy(銷毀信號(hào)量)。假設(shè)我們有一個(gè)場(chǎng)景,有多個(gè)線程需要訪問(wèn)有限數(shù)量的資源,比如數(shù)據(jù)庫(kù)連接池中的連接。我們可以使用信號(hào)量來(lái)控制對(duì)這些資源的訪問(wèn):
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
// 定義信號(hào)量,假設(shè)有5個(gè)可用資源
sem_t semaphore;
// 線程執(zhí)行函數(shù)
void* thread_function(void* arg) {
// 等待信號(hào)量
sem_wait(&semaphore);
printf("線程獲取到資源,開(kāi)始執(zhí)行任務(wù)...\n");
// 模擬任務(wù)執(zhí)行
sleep(1);
printf("線程任務(wù)執(zhí)行完畢,釋放資源\n");
// 釋放信號(hào)量
sem_post(&semaphore);
return NULL;
}
int main() {
pthread_t threads[10];
// 初始化信號(hào)量,設(shè)置初始值為5
sem_init(&semaphore, 0, 5);
// 創(chuàng)建10個(gè)線程
for (int i = 0; i < 10; i++) {
pthread_create(&threads[i], NULL, thread_function, NULL);
}
// 等待所有線程結(jié)束
for (int i = 0; i < 10; i++) {
pthread_join(threads[i], NULL);
}
// 銷毀信號(hào)量
sem_destroy(&semaphore);
return 0;
}
在這個(gè)例子中,我們初始化信號(hào)量的值為 5,表示有 5 個(gè)可用資源。每個(gè)線程在執(zhí)行任務(wù)前先通過(guò) sem_wait 等待信號(hào)量,獲取到信號(hào)量后才能訪問(wèn)資源,執(zhí)行完任務(wù)后通過(guò) sem_post 釋放信號(hào)量,這樣就保證了同時(shí)最多只有 5 個(gè)線程可以訪問(wèn)資源 。
3.4條件變量:線程間的 “傳聲筒”
條件變量(Condition Variable)用于線程間基于條件的通信,它為線程提供了一種等待特定條件發(fā)生的機(jī)制,就像一個(gè)傳聲筒,讓線程之間能夠相互傳達(dá)信息。條件變量通常與互斥鎖配合使用,以實(shí)現(xiàn)線程之間的同步和協(xié)作。
一個(gè)經(jīng)典的例子是生產(chǎn)者 - 消費(fèi)者模型。在這個(gè)模型中,生產(chǎn)者線程負(fù)責(zé)生成數(shù)據(jù)并將其放入緩沖區(qū),消費(fèi)者線程則從緩沖區(qū)中取出數(shù)據(jù)進(jìn)行處理。當(dāng)緩沖區(qū)為空時(shí),消費(fèi)者線程需要等待,直到生產(chǎn)者線程向緩沖區(qū)中放入數(shù)據(jù);當(dāng)緩沖區(qū)滿時(shí),生產(chǎn)者線程需要等待,直到消費(fèi)者線程從緩沖區(qū)中取出數(shù)據(jù) 。
下面是使用條件變量和互斥鎖實(shí)現(xiàn)生產(chǎn)者 - 消費(fèi)者模型的代碼示例:
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define BUFFER_SIZE 5
int buffer[BUFFER_SIZE];
int in = 0, out = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
// 生產(chǎn)者線程函數(shù)
void* producer(void* arg) {
while (1) {
int item = rand() % 100; // 生成一個(gè)隨機(jī)數(shù)
pthread_mutex_lock(&mutex);
while ((in + 1) % BUFFER_SIZE == out) { // 緩沖區(qū)滿
pthread_cond_wait(?_full, &mutex);
}
buffer[in] = item;
printf("生產(chǎn)者放入數(shù)據(jù): %d\n", item);
in = (in + 1) % BUFFER_SIZE;
pthread_cond_signal(?_empty);
pthread_mutex_unlock(&mutex);
sleep(rand() % 2); // 模擬生產(chǎn)時(shí)間
}
return NULL;
}
// 消費(fèi)者線程函數(shù)
void* consumer(void* arg) {
while (1) {
pthread_mutex_lock(&mutex);
while (in == out) { // 緩沖區(qū)空
pthread_cond_wait(?_empty, &mutex);
}
int item = buffer[out];
printf("消費(fèi)者取出數(shù)據(jù): %d\n", item);
out = (out + 1) % BUFFER_SIZE;
pthread_cond_signal(?_full);
pthread_mutex_unlock(&mutex);
sleep(rand() % 3); // 模擬消費(fèi)時(shí)間
}
return NULL;
}
int main() {
pthread_t producer_thread, consumer_thread;
// 創(chuàng)建生產(chǎn)者和消費(fèi)者線程
pthread_create(&producer_thread, NULL, producer, NULL);
pthread_create(&consumer_thread, NULL, consumer, NULL);
// 等待線程結(jié)束
pthread_join(producer_thread, NULL);
pthread_join(consumer_thread, NULL);
// 銷毀互斥鎖和條件變量
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(?_empty);
pthread_cond_destroy(?_full);
return 0;
}
在這個(gè)代碼中,pthread_cond_wait 函數(shù)會(huì)使線程進(jìn)入等待狀態(tài),并自動(dòng)釋放互斥鎖,當(dāng)條件滿足被喚醒時(shí),會(huì)重新獲取互斥鎖。pthread_cond_signal 函數(shù)則用于喚醒等待在條件變量上的一個(gè)線程。通過(guò)條件變量和互斥鎖的緊密配合,生產(chǎn)者和消費(fèi)者線程能夠有條不紊地工作,實(shí)現(xiàn)高效的數(shù)據(jù)處理 。
四、多線程編程實(shí)戰(zhàn)演練
4.1多線程案例分析
在日常的編程工作中,文件處理是一項(xiàng)常見(jiàn)的任務(wù)。當(dāng)面對(duì)大量文件需要處理時(shí),單線程的處理方式往往效率低下,而多線程編程則能成為提升效率的利器。假設(shè)我們有一個(gè)需求:處理一批日志文件,需要統(tǒng)計(jì)每個(gè)文件中特定關(guān)鍵詞出現(xiàn)的次數(shù),并將結(jié)果匯總。
為了實(shí)現(xiàn)這個(gè)目標(biāo),我們可以設(shè)計(jì)一個(gè)多線程的文件處理方案。首先,將文件列表進(jìn)行分割,把不同的文件分配給不同的線程處理,這就像是將一堆任務(wù)分配給不同的工人,每個(gè)工人專注于自己手頭的任務(wù) 。每個(gè)線程負(fù)責(zé)讀取分配給自己的文件內(nèi)容,逐行掃描,統(tǒng)計(jì)關(guān)鍵詞出現(xiàn)的次數(shù)。
這個(gè)過(guò)程中,線程之間的同步機(jī)制至關(guān)重要。我們可以使用互斥鎖來(lái)保護(hù)共享的統(tǒng)計(jì)結(jié)果變量,確保不同線程在更新統(tǒng)計(jì)結(jié)果時(shí)不會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題 。比如,當(dāng)一個(gè)線程統(tǒng)計(jì)完自己負(fù)責(zé)文件后,需要將統(tǒng)計(jì)結(jié)果累加到全局的統(tǒng)計(jì)變量中,此時(shí)通過(guò)獲取互斥鎖,保證同一時(shí)間只有一個(gè)線程能夠進(jìn)行累加操作,避免了數(shù)據(jù)不一致的情況 。
4.2代碼實(shí)現(xiàn)示例
下面是使用 POSIX 線程庫(kù)實(shí)現(xiàn)多線程文件處理的具體代碼:
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>
#define MAX_FILES 10
#define KEYWORD "error" // 要統(tǒng)計(jì)的關(guān)鍵詞
// 線程參數(shù)結(jié)構(gòu)體
typedef struct {
char *file_name;
} ThreadArgs;
// 全局統(tǒng)計(jì)變量
int global_count = 0;
// 互斥鎖
pthread_mutex_t mutex;
// 線程執(zhí)行函數(shù)
void* count_keyword(void* arg) {
ThreadArgs *args = (ThreadArgs*)arg;
FILE *file = fopen(args->file_name, "r");
if (file == NULL) {
perror("文件打開(kāi)失敗");
pthread_exit(NULL);
}
char line[1024];
int local_count = 0;
while (fgets(line, sizeof(line), file) != NULL) {
if (strstr(line, KEYWORD) != NULL) {
local_count++;
}
}
fclose(file);
// 獲取互斥鎖,更新全局統(tǒng)計(jì)變量
pthread_mutex_lock(&mutex);
global_count += local_count;
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
int main() {
pthread_t threads[MAX_FILES];
ThreadArgs args[MAX_FILES];
char file_names[MAX_FILES][50] = {"file1.log", "file2.log", "file3.log", "file4.log", "file5.log", "file6.log", "file7.log", "file8.log", "file9.log", "file10.log"};
// 初始化互斥鎖
pthread_mutex_init(&mutex, NULL);
// 創(chuàng)建線程并分配文件
for (int i = 0; i < MAX_FILES; i++) {
args[i].file_name = file_names[i];
if (pthread_create(&threads[i], NULL, count_keyword, &args[i]) != 0) {
perror("線程創(chuàng)建失敗");
return 1;
}
}
// 等待所有線程結(jié)束
for (int i = 0; i < MAX_FILES; i++) {
if (pthread_join(threads[i], NULL) != 0) {
perror("線程等待失敗");
return 1;
}
}
// 銷毀互斥鎖
pthread_mutex_destroy(&mutex);
printf("關(guān)鍵詞 '%s' 出現(xiàn)的總次數(shù): %d\n", KEYWORD, global_count);
return 0;
}
在這段代碼中,count_keyword 函數(shù)是線程執(zhí)行的主體,它打開(kāi)分配的文件,逐行讀取并統(tǒng)計(jì)關(guān)鍵詞出現(xiàn)的次數(shù),最后通過(guò)互斥鎖將本地統(tǒng)計(jì)結(jié)果累加到全局變量中 。main 函數(shù)負(fù)責(zé)創(chuàng)建線程,為每個(gè)線程分配文件,并等待所有線程執(zhí)行完畢后輸出最終的統(tǒng)計(jì)結(jié)果 。
4.3多線程調(diào)試與優(yōu)化
在多線程程序的調(diào)試過(guò)程中,我們可能會(huì)遇到各種各樣的問(wèn)題。死鎖是一個(gè)常見(jiàn)的問(wèn)題,比如兩個(gè)線程分別持有不同的鎖,卻又試圖獲取對(duì)方持有的鎖,就會(huì)陷入死鎖狀態(tài),導(dǎo)致程序無(wú)法繼續(xù)執(zhí)行 。為了檢測(cè)死鎖,可以使用工具如Valgrind的Helgrind工具,它能夠幫助我們發(fā)現(xiàn)潛在的死鎖問(wèn)題。一旦發(fā)現(xiàn)死鎖,我們需要仔細(xì)檢查代碼中鎖的獲取和釋放順序,避免嵌套鎖的不合理使用 。
線程異常也是需要關(guān)注的問(wèn)題。當(dāng)線程執(zhí)行過(guò)程中出現(xiàn)未捕獲的異常時(shí),可能會(huì)導(dǎo)致整個(gè)程序崩潰。我們可以在線程函數(shù)中使用try - catch塊(如果是 C++ 代碼)或者進(jìn)行適當(dāng)?shù)腻e(cuò)誤處理,確保線程在遇到異常時(shí)能夠安全地退出,而不影響其他線程的正常運(yùn)行 。
在優(yōu)化方面,合理調(diào)整線程數(shù)量是一個(gè)重要的思路。線程數(shù)量并非越多越好,過(guò)多的線程會(huì)導(dǎo)致上下文切換開(kāi)銷增大,反而降低程序性能 。對(duì)于 CPU 密集型的任務(wù),線程數(shù)量可以設(shè)置為接近 CPU 核心數(shù);對(duì)于 I/O 密集型的任務(wù),由于線程在等待 I/O 操作時(shí)會(huì)阻塞,不會(huì)占用 CPU 資源,因此可以適當(dāng)增加線程數(shù)量 。此外,優(yōu)化同步機(jī)制也能提升性能,比如使用更細(xì)粒度的鎖,減少鎖的競(jìng)爭(zhēng)范圍,或者在合適的場(chǎng)景下使用無(wú)鎖數(shù)據(jù)結(jié)構(gòu),避免鎖帶來(lái)的開(kāi)銷 。通過(guò)不斷地調(diào)試和優(yōu)化,我們能夠讓多線程程序更加穩(wěn)健高效地運(yùn)行 。