自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

解鎖C++消息隊列:多線程通信的關(guān)鍵紐帶

開發(fā) 前端
在內(nèi)存資源有限的嵌入式系統(tǒng)中,如果消息隊列設(shè)置過大,可能會導(dǎo)致其他關(guān)鍵任務(wù)因為內(nèi)存不足而無法正常運行。為了優(yōu)化性能,可以根據(jù)系統(tǒng)的實際需求和負載情況,動態(tài)調(diào)整隊列大小。

在C++編程的廣袤天地里,當(dāng)涉及多線程或多進程的程序開發(fā)時,不同線程或進程間的有效通信至關(guān)重要。想象一下,一個大型的游戲開發(fā)項目,其中渲染線程需要不斷將最新的圖像數(shù)據(jù)傳遞給顯示線程,以便在屏幕上呈現(xiàn)出精美的畫面;又或者在一個分布式系統(tǒng)中,各個節(jié)點進程需要相互傳遞任務(wù)指令和數(shù)據(jù)結(jié)果 。而 C++ 消息隊列,正是解決這類線程或進程間通信問題的得力工具。它就像是一座橋梁,搭建起不同執(zhí)行單元之間的溝通渠道,讓數(shù)據(jù)和信息能夠順暢地流通,確保整個系統(tǒng)的高效運行。

消息隊列一般簡稱為 MQ (Messges Queue),是指利用高效可靠的消息傳遞機制進行與平臺無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進行分布式系統(tǒng)的集成,是在消息的傳輸過程中保存消息的容器。消息隊列本質(zhì)上是一個隊列,而隊列中存放的是一個個消息。消息隊列是一種在分布式系統(tǒng)中進行異步通信的機制。它允許發(fā)送者將消息放入隊列中,而接收者可以從隊列中獲取并處理這些消息。消息隊列常用于解耦、異步處理和削峰填谷等場景。

一、 消息隊列簡介

消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應(yīng)用耦合、異步消息、流量削峰等問題。實現(xiàn)高性能、高可用、可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。

消息隊列是一種用于進程間通信(也稱為 IPC),或者用于應(yīng)用程序的各個組件之間或應(yīng)用程序之間的技術(shù)。消息隊列提供用于啟用消息傳遞的協(xié)議或接口。

通過 UNIX<sys.msg.h> 和 POSIX mqueue.h 系統(tǒng)庫提供了用于單個計算機內(nèi)的 IPC 的消息隊列,其中這些消息隊列可在同步和異步模式下使用。同步消息要求在發(fā)送消息時阻止發(fā)送或接收進程,直到接收方確認消息為止。異步消息不會阻止發(fā)送或接收進程,并且可以通過緩沖來延遲送達。

針對跨多個計算機的應(yīng)用程序,已開發(fā)了許多專有的開源消息隊列系統(tǒng),如 IBM 的 WebSphere MQ、Java 消息服務(wù) (JMS) 和 RabbitMQ。所有這些系統(tǒng)都提供發(fā)送、管理和接收消息的能力。

與傳統(tǒng)的請求和響應(yīng)消息相比,使用消息隊列可以增加應(yīng)用程序設(shè)計的復(fù)雜性,并增加需要管理的服務(wù)的數(shù)量。但在一定程度上,對于流處理等應(yīng)用,需要分布式消息隊列系統(tǒng)來管理增加的復(fù)雜性和規(guī)模。

目前在生產(chǎn)環(huán)境,使用較多的消息隊列有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。

圖片

1.1消息隊列特征

通過與傳統(tǒng)的 RPC 和請求-響應(yīng)機制相比較,就可以理解和體會到消息隊列及其新穎性了。消息隊列的主要功能如下:

  • 存儲:與依賴于使用套接字的基本 TCP 和 UDP 協(xié)議的傳統(tǒng)請求和響應(yīng)系統(tǒng)不同,消息隊列通常將消息存儲在某種類型的緩沖區(qū)中,直到目標(biāo)進程讀取這些消息或?qū)⑵鋸南㈥犃兄酗@式移除為止。
  • 異步:與請求和響應(yīng)系統(tǒng)不同,消息隊列通過緩沖消息可以在應(yīng)用程序中公開一定程度的異步性,允許源進程發(fā)送消息并在隊列中累積消息,而目標(biāo)進程則可以挑選消息進行處理。這樣,應(yīng)用程序就可以在某些故障情況下運行,例如連接斷斷續(xù)續(xù)或源進程或目標(biāo)進程故障。
  • 路由:消息隊列還可以提供路由功能,其中多個進程可以在同一隊列中讀取或?qū)懭胂?,從而實現(xiàn)廣播或單播通信模式。

消息隊列特點:消息隊列有三個作用,分別是削峰、解耦和異步

  • 流量削峰:主要用于在高并發(fā)情況下,業(yè)務(wù)異步處理,提供高峰期業(yè)務(wù)處理能力,避免系統(tǒng)癱瘓。假設(shè)系統(tǒng)只能處理1000個請求,但這時突然來了3000個請求,如果不加以限制就會造成系統(tǒng)癱瘓。使用消息隊列做緩沖,將多余的請求存放在消息隊列中,等系統(tǒng)根據(jù)自己處理請求的能力去消息隊列去。
  • 應(yīng)用解耦:主要用于當(dāng)一個業(yè)務(wù)需要多個模塊共同實現(xiàn),或者一條消息有多個系統(tǒng)需要對應(yīng)處理時,只需要主業(yè)務(wù)完成以后,發(fā)送一條MQ,其余模塊消費MQ消息,即可實現(xiàn)業(yè)務(wù),降低模塊之間的耦合。假設(shè)某個服務(wù) A 需要調(diào)用服務(wù) B,但是服務(wù) B 突然出現(xiàn)問題,這樣會導(dǎo)致服務(wù) A 也會出現(xiàn)問題。如果使用消息隊列,當(dāng)服務(wù) A 執(zhí)行完成之后,發(fā)送一條消息到隊列中,服務(wù) B 讀取到這條消息,那么它立刻開始進行業(yè)務(wù)的執(zhí)行。
  • 異步通信:主業(yè)務(wù)執(zhí)行結(jié)束后從屬業(yè)務(wù)通過MQ,異步執(zhí)行,減低業(yè)務(wù)的響應(yīng)時間,提高用戶體驗。假設(shè)有一個業(yè)務(wù),要先執(zhí)行服務(wù) A ,然后服務(wù) A 去調(diào)用服務(wù) B ,當(dāng)服務(wù) B 完成之后,服務(wù) A 調(diào)用服務(wù) C,這個業(yè)務(wù)需要一步步走下去。當(dāng)使用了消息隊列之后,服務(wù) A 完成之后,可以同時執(zhí)行服務(wù) B 和 服務(wù) C ,這樣就減低業(yè)務(wù)的響應(yīng)時間,提高用戶體驗。

1.2消息隊列優(yōu)勢

消息隊列的主要優(yōu)點是它可以在分散式應(yīng)用程序中的各個實體之間提供松散耦合。這允許進行異步非阻止通信,從而提供更高級別的進程故障容錯能力。

圖片

消息隊列提供了一個允許程序相互通信的接口。

程序之間無直接連接:消息隊列允許程序間進行間接通信。發(fā)送方不一定需要知道消息的接收方,反之亦然。這樣,消息隊列系統(tǒng)就可以決定路由消息(和潛在工作)的邏輯。

程序之間的通信可以不受時間影響:消息隊列通常會緩沖程序之間的消息,因此它們不必為了發(fā)送或接收消息而阻止或中斷執(zhí)行。消息隊列可以執(zhí)行其他任務(wù),并在消息到達時或稍后處理回復(fù)。在編寫消息傳遞應(yīng)用程序時,無需知道(或關(guān)心)程序何時發(fā)送消息,或者目標(biāo)何時能夠接收消息。消息不會丟失;隊列管理器會將它保留到目標(biāo)準(zhǔn)備開始處理它為止。消息將一直保留在隊列中,直到程序?qū)⑵湟瞥?。這意味著發(fā)送和接收程序是分離的;發(fā)送方可以繼續(xù)處理,而無需等待接收方確認收到消息。發(fā)送消息時,目標(biāo)應(yīng)用程序甚至不必運行。它可以檢索啟動后的消息。

可以通過小型獨立程序執(zhí)行工作:利用消息隊列可以發(fā)揮使用小型獨立程序的優(yōu)點。你可以將作業(yè)分散到幾個較小的獨立程序上,而不是使用單個大型程序按順序執(zhí)行作業(yè)的各個部分。請求程序會將消息發(fā)送到每個單獨的程序,要求它們執(zhí)行其功能。當(dāng)每個程序完成時,將以一條或多條消息的形式發(fā)送回結(jié)果。

通信可以受事件影響:可以根據(jù)隊列的狀態(tài)控制程序。例如,可以安排程序在消息到達隊列時立即啟動?;蛘?,可以指定只有在隊列上出現(xiàn) 10 條高于某個優(yōu)先級的消息,或者隊列上出現(xiàn) 10 條任意優(yōu)先級的消息時,程序才啟動。

應(yīng)用程序可以為消息分配優(yōu)先級:當(dāng)程序?qū)⑾⒎湃腙犃袝r,它可以為消息分配優(yōu)先級。這將確定在隊列中添加新消息的位置。程序可以按照消息在隊列中的順序或通過獲取特定消息從隊列中獲取消息。(如果程序正在尋找對先前發(fā)送的請求的答復(fù),則可能需要獲取特定消息。)

  • 恢復(fù)支持:許多消息隊列提供暫留存儲和日志記錄,因此可以在故障期間恢復(fù)隊列中的狀態(tài)和消息。
  • 多個隊列:某些系統(tǒng)允許應(yīng)用程序開發(fā)人員定義和配置多個隊列。這樣就可以根據(jù)發(fā)布者或訂閱者關(guān)系將消息路由到必要的實體。例如,Apache Kafka。
  • 易于伸縮:消息隊列可以橫向擴展以應(yīng)對消息負載的增加,而緊密耦合系統(tǒng)則不同,在其中擴展和管理通信流和終結(jié)點更為困難。

(1)生產(chǎn)者-消費者模型

接下來這種數(shù)據(jù)結(jié)構(gòu)的收發(fā)方式,選用 生產(chǎn)者消費者模型:由生產(chǎn)者發(fā)布消息隊列至消息服務(wù)器,再由消費者訂閱消息。

圖片

生產(chǎn)者(Producer)業(yè)務(wù)的發(fā)起方,負責(zé)生產(chǎn)消息發(fā)布給Broker。

消費者(Consumer)業(yè)務(wù)的處理方,負責(zé)從Broker訂閱消息并進行業(yè)務(wù)邏輯處理。

消息服務(wù)器(Broker)MQ的服務(wù)器。包括接收 Producer 發(fā)過來的消息、處理 Consumer 的消費消息請求、消息的持久化存儲、以及服務(wù)端過濾功能等。

注意消息服務(wù)器可以是,分布式,或多節(jié)點的集群,且每個節(jié)點里可能不止一個隊列。

圖片

當(dāng)然除了消息服務(wù)器外,生產(chǎn)者、消費者和消息本身也可以擁有集群的概念,我們可以對這三者進行分組,形成主題的概念,并進一步細化出二級標(biāo)簽,實現(xiàn)特定的集群收發(fā)特定消息的功能。

圖片

主題(topic)一級消息類型,不同生產(chǎn)者向特定的topic發(fā)送消息,再由MQ分發(fā)至特定的訂閱者,實現(xiàn)消息的傳遞,標(biāo)簽(tag)二級消息類型,用來進一步區(qū)分某個Topic下的消息子類。集群(group)一組生產(chǎn)者或消費者,這組生產(chǎn)者或消費者通常生產(chǎn)或消費同一類消息,且消息發(fā)布或訂閱的邏輯一致,到這一步,就構(gòu)造了消息隊列服務(wù)器的雛形。

(2)生產(chǎn)者-消費者模型

回過頭來分析一下剛才構(gòu)建的模型,任何一項技術(shù)都有他的利弊。拋去額外的維護成本不說,這個模型的弊端在于,過渡依賴于消息中間件,一旦中間件宕機了,整個消息體系就瓦解了。

此外,在設(shè)計時需要考慮更多因素。生產(chǎn)過程中難免會出現(xiàn)生產(chǎn)者,消費者或中間件服務(wù)器不可用的情況,隨之帶來的問題就是消息重復(fù)、消息堆積等等,所以實際運用的時候呢,往往會給出一些補償措施。

圖片

弊端:

  1. 消息的收發(fā)依賴于中間件,且中間件的穩(wěn)定運行需要維護成本。
  2. 提高開發(fā)復(fù)雜度。需要考慮消息的處理,包括消息冪等性(重復(fù)消費問題)、消息中間件的持久化和穩(wěn)定性、可靠性等。

PS:這里說一下持久化的問題簡單來說就是將數(shù)據(jù)存入磁盤,而不是存在內(nèi)存中隨服務(wù)器重啟斷開而消失,使數(shù)據(jù)能夠永久保存,重啟后數(shù)據(jù)能夠從磁盤中讀取恢復(fù)。MQ會將你的持久化消息寫入磁盤上的持久化日志文件,等消息被消費之后,RabbitMQ會把這條消息標(biāo)識為等待垃圾回收。缺點:性能低,寫入硬盤要比寫入內(nèi)存性能較低很多,從而降低了服務(wù)器的吞吐量。

(3)重復(fù)消費問題 - 消息冪等性

這里可以講一下其中的一種非常常見的問題及其補償措施,就是重復(fù)消費的問題:

重復(fù)消費:生產(chǎn)者多發(fā)、消費者多次消費等。

生產(chǎn)者多發(fā)、消費者多次消費,都會造成重復(fù)消費的問題。

解決這種問題的常用辦法,就是保證操作的冪等性

冪等操作(Idempotent Operation):執(zhí)行任意多次冪等操作所產(chǎn)生的影響均與一次執(zhí)行的效果相同。

冪等操作有一個特點,甚至還有公式:

f(x)=f(f(x)).

舉個例子,數(shù)據(jù)庫腳本insert前都會先delete,這么一組數(shù)據(jù)庫操作無論執(zhí)行多少次結(jié)果都是一樣的,重復(fù)消費的問題也可以用這個思想去解決。

實現(xiàn):

  • 消息中間件端根據(jù) Message Id 去重。
  • 消費端:數(shù)據(jù)庫:新增/修改。組件如redis進行自身去重。

(4)主流MQ對比

目前主流的MQ有以下幾種:

圖片圖片

在流量和大數(shù)據(jù)的時代,ActiveMQ和RabbitMQ這兩者因為吞吐量以及GitHub的社區(qū)活躍度的原因,在各大互聯(lián)網(wǎng)公司基本上銷聲匿跡了,越來越多的公司開始青睞于后兩者。其中RocketMQ是阿里開源的,這和同樣是阿里開源的rpc框dubbo設(shè)計風(fēng)格比較類似。Kafka則更多應(yīng)用在大數(shù)據(jù)業(yè)務(wù)場景中。

1.3常用的消息隊列

(1)ActiveMQ:是Apache下的一個子項目。

  • 優(yōu)點:單機吞吐量每秒萬級,時效性毫秒級,可用性高,基于主從架構(gòu)實現(xiàn)高可用性,消息可靠性較低的概率丟失數(shù)據(jù)。支持多種語言、支持Spring2.0的特性、支持多種傳送協(xié)議、支持通過JDBC和journal提供高速的消息持久化。
  • 缺點:官方社區(qū)現(xiàn)在的維護越來越少;社區(qū)活躍度不高。

(2)Kafka:是一個分布式消息發(fā)布訂閱系統(tǒng)。為大數(shù)據(jù)而生的消息中間件,大數(shù)據(jù)的殺手锏

  • 優(yōu)點:單機吞吐量每秒百萬級,時效性毫秒級,不會丟失數(shù)據(jù),不會導(dǎo)致不可用
  • 缺點:支持消息順序,但是一臺代理宕機后,就會產(chǎn)生消息亂序;消費失敗不支持重試;社區(qū)更新較慢

(3)RocketMQ:阿里系下開源的一款分布式、隊列模型的消息中間件,3.0版本名稱改為RocketMQ,是阿里參照 kafka 設(shè)計思想使用 java 實現(xiàn)的一套消息隊列。

  • 優(yōu)點:單機吞吐量十萬級,時效性毫秒級,消息可以做到 0 丟失,支持 10 億級別的消息堆積
  • 缺點:支持的客戶端語言不多,目前是 java 及 c++;社區(qū)活躍度一般;

(4)RabbitMQ:是使用Erlang編寫的一個開源的消息隊列

優(yōu)點:單機吞吐量萬級,時效性微秒級,支持多種語言

二、消息隊列工作原理

C++ 消息隊列遵循先進先出(FIFO)的原則 ,就像我們?nèi)粘I钪信抨犢I東西一樣,先到的人先接受服務(wù)。在消息隊列中,先發(fā)送的消息會被先接收和處理。它為進程間或線程間提供了一種可靠的數(shù)據(jù)傳遞方式。在一個多線程的圖形渲染程序中,主線程負責(zé)生成各種圖形繪制指令,然后將這些指令作為消息發(fā)送到消息隊列中,而渲染線程則從消息隊列中依次取出這些指令,按照順序進行圖形渲染操作 ,這樣就能確保圖形的繪制順序與指令生成順序一致,從而保證畫面的準(zhǔn)確性和流暢性。

2.1核心操作流程

①創(chuàng)建隊列:在 C++ 中創(chuàng)建消息隊列,通常會使用操作系統(tǒng)提供的相關(guān)函數(shù),以 Linux 系統(tǒng)為例,使用msgget()函數(shù)來創(chuàng)建一個消息隊列。msgget()函數(shù)需要兩個參數(shù),第一個參數(shù)key是一個鍵值,它可以通過ftok()函數(shù)根據(jù)一個已存在的文件路徑和一個項目標(biāo)識符生成,用于唯一標(biāo)識這個消息隊列,不同的進程只要使用相同的key值,就能訪問同一個消息隊列;

第二個參數(shù)msgflg用于指定創(chuàng)建消息隊列的方式和權(quán)限 ,比如IPC_CREAT表示如果消息隊列不存在就創(chuàng)建它,如果存在則返回其標(biāo)識符,還可以與文件權(quán)限位(如0666表示所有用戶可讀可寫)進行按位或運算來設(shè)置消息隊列的訪問權(quán)限。通過msgget()函數(shù)創(chuàng)建成功后,會返回一個唯一的標(biāo)識符來表示這個消息隊列,后續(xù)對該消息隊列的操作都將使用這個標(biāo)識符。

②發(fā)送消息:發(fā)送進程需要將消息內(nèi)容封裝到一個特定的結(jié)構(gòu)體中,這個結(jié)構(gòu)體至少要包含一個long類型的成員來表示消息類型。以一個簡單的消息結(jié)構(gòu)體為例:

struct MsgStruct {
    long mtype; // 消息類型
    char mtext[1024]; // 消息內(nèi)容
};

假設(shè)我們要發(fā)送一條消息,首先創(chuàng)建一個MsgStruct結(jié)構(gòu)體變量,設(shè)置好mtype和mtext的值,然后使用msgsnd()函數(shù)來發(fā)送消息。msgsnd()函數(shù)有四個參數(shù),第一個參數(shù)是前面創(chuàng)建消息隊列時返回的標(biāo)識符msgid,用于指定要發(fā)送到哪個消息隊列;第二個參數(shù)是指向要發(fā)送的消息結(jié)構(gòu)體的指針&msg;第三個參數(shù)msgsz是要發(fā)送消息的大小(不包括消息類型mtype占用的字節(jié)數(shù) ),例如strlen(msg.mtext);第四個參數(shù)msgflg用于指定發(fā)送消息的方式 ,如果為0,表示當(dāng)消息隊列滿時,msgsnd()函數(shù)將會阻塞,直到消息能寫進消息隊列,如果設(shè)置為IPC_NOWAIT,當(dāng)消息隊列已滿的時候,msgsnd()函數(shù)不等待立即返回。

MsgStruct msg;
msg.mtype = 1;
strcpy(msg.mtext, "Hello, message queue!");
if (msgsnd(msgid, &msg, strlen(msg.mtext), 0) == -1) {
    perror("msgsnd");
    // 處理發(fā)送失敗的情況
}

③接收消息:接收進程使用msgrcv()函數(shù)從消息隊列中接收消息。msgrcv()函數(shù)有五個參數(shù),第一個參數(shù)同樣是消息隊列標(biāo)識符msgid;第二個參數(shù)msgp是指向用于存放接收到消息的結(jié)構(gòu)體指針;第三個參數(shù)msgsz是要接收消息的大?。ú话ㄏ㈩愋驼加玫淖止?jié)數(shù) );第四個參數(shù)msgtyp用于指定接收消息的類型 ,如果msgtyp為0,則接收消息隊列中的第一個消息,如果msgtyp大于0,則接收類型等于msgtyp的第一個消息,如果msgtyp小于0,則接收類型小于等于msgtyp絕對值的第一個消息;第五個參數(shù)msgflg用于指定接收消息的方式 ,若為0,表示阻塞式接收消息,沒有該類型的消息msgrcv()函數(shù)一直阻塞等待,如果設(shè)置為IPC_NOWAIT,如果沒有返回條件的消息調(diào)用立即返回,此時錯誤碼為ENOMSG。

MsgStruct receivedMsg;
if (msgrcv(msgid, &receivedMsg, sizeof(receivedMsg.mtext), 1, 0) == -1) {
    perror("msgrcv");
    // 處理接收失敗的情況
}
printf("Received message: %s\n", receivedMsg.mtext);

④刪除隊列:當(dāng)消息隊列不再使用時,需要將其刪除以釋放系統(tǒng)資源。在 C++ 中,使用msgctl()函數(shù)來刪除消息隊列。msgctl()函數(shù)有三個參數(shù),第一個參數(shù)msqid是要刪除的消息隊列的標(biāo)識符;第二個參數(shù)cmd設(shè)置為IPC_RMID表示執(zhí)行刪除操作;第三個參數(shù)buf在刪除操作時設(shè)置為NULL。

if (msgctl(msgid, IPC_RMID, NULL) == -1) {
    perror("msgctl");
    // 處理刪除失敗的情況
}

2.2不同操作系統(tǒng)實現(xiàn)差異

消息隊列是一種基于內(nèi)核的通信機制,這意味著它依賴于操作系統(tǒng)內(nèi)核來實現(xiàn)其功能。不同的操作系統(tǒng),如 Windows、Linux、macOS 等,在實現(xiàn)消息隊列時可能會有不同的方式和特點。在 Windows 系統(tǒng)中,消息隊列與 Windows 的消息機制緊密相關(guān),它為每個線程維護一個消息隊列,用于處理各種窗口消息和用戶輸入事件等,并且有隊列消息(通過PostMessage和PostThreadMessage發(fā)送 )和非隊列消息(通過SendMessage發(fā)送 )之分。

而 Linux 系統(tǒng)提供了 System V 消息隊列和 POSIX 消息隊列兩種標(biāo)準(zhǔn)實現(xiàn),System V 消息隊列出現(xiàn)較早,在許多現(xiàn)有應(yīng)用中廣泛使用 ,POSIX 消息隊列則更注重可移植性。不同操作系統(tǒng)在消息隊列的創(chuàng)建、操作函數(shù)接口、消息格式、隊列管理等方面都可能存在差異,開發(fā)者在編寫跨平臺程序時,需要充分考慮這些差異,選擇合適的方式來實現(xiàn)消息隊列功能,以確保程序在不同操作系統(tǒng)上都能正確運行。

三、C++ 實現(xiàn)消息隊列的實戰(zhàn)演練

3.1基于標(biāo)準(zhǔn)庫的實現(xiàn)方式

(1)數(shù)據(jù)結(jié)構(gòu)選擇

在 C++ 中,我們選用std::queue作為底層數(shù)據(jù)結(jié)構(gòu)來存儲消息。std::queue是一個容器適配器,它默認基于std::deque實現(xiàn),具有典型的先進先出(FIFO)特性,這與消息隊列的基本特性完美契合 。在一個網(wǎng)絡(luò)通信程序中,接收到的網(wǎng)絡(luò)數(shù)據(jù)包可以作為消息,按照到達的先后順序依次存入std::queue中,這樣后續(xù)處理線程就能按照數(shù)據(jù)包接收的順序進行處理,保證了數(shù)據(jù)處理的順序性和正確性。同時,std::queue提供了簡潔易用的接口,如push用于將元素添加到隊列末尾 ,pop用于移除隊列頭部的元素 ,front用于訪問隊首元素等,方便我們進行消息的插入和提取操作 。

(2)同步機制構(gòu)建

為了保證多線程環(huán)境下對std::queue的安全訪問,我們使用互斥量(std::mutex)和std::unique_lock。互斥量就像是一把鎖,同一時間只有一個線程能夠獲取到這把鎖,從而訪問被保護的資源,避免了多個線程同時對隊列進行操作導(dǎo)致的數(shù)據(jù)競爭問題 。在多線程的游戲開發(fā)場景中,如果多個線程同時嘗試向存儲游戲事件消息的隊列中添加消息,就可能會出現(xiàn)數(shù)據(jù)不一致的情況,而互斥量可以有效避免這種問題。

std::unique_lock則是一個智能鎖,它基于 RAII(Resource Acquisition Is Initialization)機制,在構(gòu)造時自動鎖定互斥量,在析構(gòu)時自動解鎖互斥量 ,這樣即使在操作過程中發(fā)生異常,也能保證互斥量被正確釋放,避免了死鎖的發(fā)生,極大地提高了代碼的安全性和可靠性。

(3)條件變量運用

條件變量(std::condition_variable)在生產(chǎn)者 - 消費者模型中起著至關(guān)重要的協(xié)調(diào)作用。生產(chǎn)者線程在向隊列中添加消息后,通過調(diào)用條件變量的notify_one或notify_all方法通知消費者線程隊列中有新消息可供消費;消費者線程在嘗試從隊列中獲取消息時,如果發(fā)現(xiàn)隊列為空,就調(diào)用條件變量的wait方法進入等待狀態(tài),同時自動釋放之前獲取的互斥鎖,避免了資源的浪費和死鎖的產(chǎn)生 。當(dāng)生產(chǎn)者線程添加消息并通知后,消費者線程被喚醒,重新獲取互斥鎖,然后從隊列中取出消息進行處理 。

在一個文件處理系統(tǒng)中,生產(chǎn)者線程負責(zé)讀取文件內(nèi)容并將數(shù)據(jù)塊作為消息放入隊列,消費者線程則從隊列中取出數(shù)據(jù)塊進行解析和處理,如果沒有條件變量的協(xié)調(diào),消費者線程可能會不斷地?zé)o效嘗試從空隊列中取數(shù)據(jù),而有了條件變量,消費者線程就能在隊列為空時等待,直到生產(chǎn)者線程添加新數(shù)據(jù)并通知它,從而實現(xiàn)了高效的線程間協(xié)作。

3.2代碼示例解析

消息隊列類定義:下面是一個簡單的MessageQueue類的實現(xiàn):

class MessageQueue {
private:
    std::queue<std::string> queue;  // 存儲消息的隊列
    std::mutex mtx;                 // 保護隊列的互斥量
    std::condition_variable cv;     // 用于線程同步的條件變量
public:
    // 向隊列中添加消息
    void push(const std::string& message) {
        std::lock_guard<std::mutex> lock(mtx); // 利用lock_guard自動管理鎖的生命周期,構(gòu)造時上鎖,析構(gòu)時解鎖
        queue.push(message);                    // 添加消息到隊列
        cv.notify_one();                        // 通知等待的消費者線程
    }
    // 從隊列中獲取消息
    std::string pop() {
        std::unique_lock<std::mutex> lock(mtx); // 使用unique_lock更靈活地管理鎖
        cv.wait(lock, [this] { return!queue.empty(); }); // 等待直到隊列不為空,lambda表達式用于判斷條件
        std::string msg = queue.front();         // 獲取隊列頭部的消息
        queue.pop();                             // 彈出該消息
        return msg;
    }
};

在這個類中,queue成員變量用于存儲消息,mtx用于保護隊列的線程安全訪問,cv用于協(xié)調(diào)生產(chǎn)者和消費者線程。push函數(shù)在向隊列中添加消息后,通過cv.notify_one通知等待的消費者線程;pop函數(shù)在隊列為空時,通過cv.wait等待,直到隊列中有消息可供消費。

生產(chǎn)者與消費者線程函數(shù)

// 生產(chǎn)者線程函數(shù)
void producer(MessageQueue& mq) {
    const std::string messages[] = { "Message 1", "Message 2", "Message 3", "Message 4" };
    for (const auto& msg : messages) {
        std::this_thread::sleep_for(std::chrono::seconds(1));  // 模擬生產(chǎn)延遲
        std::cout << "Produced: " << msg << std::endl;
        mq.push(msg);  // 將消息放入隊列
    }
}
// 消費者線程函數(shù)
void consumer(MessageQueue& mq) {
    while (true) {
        std::string msg = mq.pop();  // 從隊列中獲取消息
        std::cout << "Consumed: " << msg << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(2));  // 模擬消費延遲
    }
}

在producer函數(shù)中,生產(chǎn)者每隔 1 秒生成一個消息,并通過mq.push將消息放入隊列;在consumer函數(shù)中,消費者不斷從隊列中獲取消息并消費,每隔 2 秒處理一個消息,如果隊列為空,會等待生產(chǎn)者添加新消息。

主函數(shù)邏輯

int main() {
    MessageQueue mq;  // 創(chuàng)建消息隊列
    // 啟動生產(chǎn)者和消費者線程
    std::thread producerThread(producer, std::ref(mq));
    std::thread consumerThread(consumer, std::ref(mq));
    // 等待生產(chǎn)者和消費者線程完成
    producerThread.join();
    consumerThread.join();
    return 0;
}

在main函數(shù)中,首先創(chuàng)建了一個MessageQueue對象mq,然后分別啟動生產(chǎn)者線程和消費者線程,通過std::thread的構(gòu)造函數(shù)將producer和consumer函數(shù)與mq對象關(guān)聯(lián)起來。最后,通過調(diào)用join方法等待兩個線程執(zhí)行完畢,確保程序在所有任務(wù)完成后才結(jié)束。

四、消息隊列應(yīng)用場景

4.1 異步處理

場景說明:用戶注冊后,需要發(fā)送注冊郵件和發(fā)送注冊信息,傳統(tǒng)的做法有兩種:串行方式、并行方式

(1)串行方式

將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件,然后發(fā)送注冊短信,而所有任務(wù)執(zhí)行完成后,返回信息給客戶端

圖片圖片

(2)并行方式

將注冊信息寫入數(shù)據(jù)庫成功后,同時進行發(fā)送注冊郵件和發(fā)送注冊短信的操作。而所有任務(wù)執(zhí)行完成后,返回信息給客戶端。同串行方式相比,并行方式可以提高執(zhí)行效率,減少執(zhí)行時間。

圖片圖片

上面的比較可以發(fā)現(xiàn),假設(shè)三個操作均需要50ms的執(zhí)行時間,排除網(wǎng)絡(luò)因素,則最終執(zhí)行完成,串行方式需要150ms,而并行方式需要100ms。

因為cpu在單位時間內(nèi)處理的請求數(shù)量是一致的,假設(shè):CPU每1秒吞吐量是100此,則串行方式1秒內(nèi)可執(zhí)行的請求量為1000/150,不到7次;并行方式1秒內(nèi)可執(zhí)行的請求量為1000/100,為10次。

由上可以看出,傳統(tǒng)串行和并行的方式會受到系統(tǒng)性能的局限,那么如何解決這個問題?我們需要引入消息隊列,將不是必須的業(yè)務(wù)邏輯,異步進行處理,由此改造出來的流程為:

圖片圖片

根據(jù)上述的流程,用戶的響應(yīng)時間基本相當(dāng)于將用戶數(shù)據(jù)寫入數(shù)據(jù)庫的時間,發(fā)送注冊郵件、發(fā)送注冊短信的消息在寫入消息隊列后,即可返回執(zhí)行結(jié)果,寫入消息隊列的時間很快,幾乎可以忽略,也有此可以將系統(tǒng)吞吐量提升至20QPS,比串行方式提升近3倍,比并行方式提升2倍。

通俗易懂案例:

圖片圖片

這里以我以前接觸過的一個智能外呼系統(tǒng)為例:智能外呼:客服中心以電話的方式,主動發(fā)起的對客戶的呼叫問答活動。廣泛應(yīng)用在產(chǎn)品營銷、貸款催繳、投資理財?shù)确矫妗?/span>

說白了就是機器人給您打電話,不斷問問題,然后將您的問題轉(zhuǎn)成文字存儲在數(shù)據(jù)庫里的過程。

圖片圖片

業(yè)務(wù)邏輯:該系統(tǒng)的上游是外呼請求的發(fā)起方,下游是外呼動作的執(zhí)行機構(gòu)。語音識別,自然語言處理的模塊集成在這里面。

由于打電話是個耗時的過程,整個系統(tǒng)異步實現(xiàn),具體來說有2步:

當(dāng)有外呼請求發(fā)起時,中間件解析上游發(fā)來的請求報文,推送至執(zhí)行機構(gòu)并且即時回復(fù)響應(yīng)報文,實現(xiàn)異步第一步。

中間件輪詢下游處理結(jié)果(如性別的聲紋檢驗),封裝結(jié)果返回,異步第二步,實現(xiàn)閉環(huán)。注意打電話是個耗時的操作,在這個過程中,如果用傳統(tǒng)的基于請求/響應(yīng)的同步通訊方式,在上游發(fā)起請求后,監(jiān)聽過程中生產(chǎn)者線程會一直阻塞。如果這條流水線上有其他業(yè)務(wù)處理,會造成時間和資源的浪費。

但如果使用如果使用異步消息處理,立即返回消息發(fā)送成功或失敗的回調(diào)方法,就能實現(xiàn)生產(chǎn)者線程不阻塞,從而達到異步執(zhí)行的效果。

4.2應(yīng)用解耦

場景說明:用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng)。

傳統(tǒng)的做法為:訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口。如下圖所示:

圖片圖片

傳統(tǒng)方式具有如下缺點:

  • 假設(shè)庫存系統(tǒng)訪問失敗,則訂單減少庫存失敗,導(dǎo)致訂單創(chuàng)建失敗
  • 訂單系統(tǒng)同庫存系統(tǒng)過度耦合

如何解決上述的缺點呢?需要引入消息隊列,引入消息隊列后的架構(gòu)如下圖所示:

圖片圖片

  • 訂單系統(tǒng):用戶下單后,訂單系統(tǒng)進行數(shù)據(jù)持久化處理,然后將消息寫入消息隊列,返回訂單創(chuàng)建成功
  • 庫存系統(tǒng):使用拉/推的方式,獲取下單信息,庫存系統(tǒng)根據(jù)訂單信息,進行庫存操作。

假如在下單時庫存系統(tǒng)不能正常使用。也不影響正常下單,因為下單后,訂單系統(tǒng)寫入消息隊列就不再關(guān)心其后續(xù)操作了。由此實現(xiàn)了訂單系統(tǒng)與庫存系統(tǒng)的應(yīng)用解耦。

4.3流量削鋒

流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。

應(yīng)用場景:秒殺活動,一般會因為流量過大,導(dǎo)致流量暴增,應(yīng)用掛掉。為解決這個問題,一般需要在應(yīng)用前端加入消息隊列。可以控制參與活動的人數(shù);

可以緩解短時間內(nèi)高流量對應(yīng)用的巨大壓力;

流量削鋒處理方式系統(tǒng)圖如下:

圖片圖片

  • 服務(wù)器在接收到用戶請求后,首先寫入消息隊列。這時如果消息隊列中消息數(shù)量超過最大數(shù)量,則直接拒絕用戶請求或返回跳轉(zhuǎn)到錯誤頁面;
  • 秒殺業(yè)務(wù)根據(jù)秒殺規(guī)則讀取消息隊列中的請求信息,進行后續(xù)處理。

4.4 日志處理

日志處理是指將消息隊列用在日志處理中,比如Kafka的應(yīng)用,解決大量日志傳輸?shù)膯栴}。架構(gòu)簡化如下:

圖片圖片

  • 日志采集客戶端:負責(zé)日志數(shù)據(jù)采集,定時寫受寫入Kafka隊列;
  • Kafka消息隊列:負責(zé)日志數(shù)據(jù)的接收,存儲和轉(zhuǎn)發(fā);
  • 日志處理應(yīng)用:訂閱并消費kafka隊列中的日志數(shù)據(jù);

圖片

  • Kafka:接收用戶日志的消息隊列。
  • Logstash:做日志解析,統(tǒng)一成JSON輸出給Elasticsearch。
  • Elasticsearch:實時日志分析服務(wù)的核心技術(shù),一個schemaless,實時的數(shù)據(jù)存儲服務(wù),通過index組織數(shù)據(jù),兼具強大的搜索和統(tǒng)計功能。
  • Kibana:基于Elasticsearch的數(shù)據(jù)可視化組件,超強的數(shù)據(jù)可視化能力是眾多公司選擇ELK stack的重要原因。

4.5 消息通訊

消息通訊是指,消息隊列一般都內(nèi)置了高效的通信機制,因此也可以用在純的消息通訊。比如實現(xiàn)點對點消息隊列、聊天室等。

點對點通訊

圖片圖片

在點對點通訊架構(gòu)設(shè)計中,客戶端A和客戶端B共用一個消息隊列,即可實現(xiàn)消息通訊功能。

聊天室通訊

圖片

客戶端A、客戶端B、直至客戶端N訂閱同一消息隊列,進行消息的發(fā)布與接收,即可實現(xiàn)聊天通訊方案架構(gòu)設(shè)計。

五、 消息中間件示例

4.1 電商系統(tǒng)

圖片圖片

消息隊列采用高可用、可持久化的消息中間件。比如Active MQ,Rabbit MQ,Rocket MQ。

  • 應(yīng)用將主干邏輯處理完成后,寫入消息隊列。消息發(fā)送是否成功可以開啟消息的確認模式。(消息隊列返回消息接收成功狀態(tài)后,應(yīng)用再返回,這樣保障消息的完整性)
  • 擴展流程(發(fā)短信、配送處理)訂閱隊列消息。采用推或拉的方式獲取消息并處理。
  • 消息將應(yīng)用解耦的同時,帶來了數(shù)據(jù)一致性問題,可以采用最終一致性方式解決。比如主數(shù)據(jù)寫入數(shù)據(jù)庫,擴展應(yīng)用根據(jù)消息隊列,并結(jié)合數(shù)據(jù)庫方式實現(xiàn)基于消息隊列的后續(xù)處理。

4.2 日志收集系統(tǒng)

圖片圖片

分為Zookeeper注冊中心,日志收集客戶端,Kafka集群和Storm集群(OtherApp)四部分組成。

  • Zookeeper注冊中心,提出負載均衡和地址查找服務(wù);
  • 日志收集客戶端,用于采集應(yīng)用系統(tǒng)的日志,并將數(shù)據(jù)推送到kafka隊列;
  • Kafka集群:接收,路由,存儲,轉(zhuǎn)發(fā)等消息處理;
  • Storm集群:與OtherApp處于同一級別,采用拉的方式消費隊列中的數(shù)據(jù);

六、JMS消息服務(wù)

講消息隊列就不得不提JMS 。JMS(Java Message Service,Java消息服務(wù))API是一個消息服務(wù)的標(biāo)準(zhǔn)/規(guī)范,允許應(yīng)用程序組件基于JavaEE平臺創(chuàng)建、發(fā)送、接收和讀取消息。它使分布式通信耦合度更低,消息服務(wù)更加可靠以及異步性。

在EJB架構(gòu)中,有消息bean可以無縫的與JM消息服務(wù)集成。在J2EE架構(gòu)模式中,有消息服務(wù)者模式,用于實現(xiàn)消息與應(yīng)用直接的解耦。

6.1 消息模型

在JMS標(biāo)準(zhǔn)中,有兩種消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

(1)P2P模式

圖片圖片

P2P模式包含三個角色:消息隊列(Queue),發(fā)送者(Sender),接收者(Receiver)。每個消息都被發(fā)送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。

P2P的特點

每個消息只有一個消費者(Consumer)(即一旦被消費,消息就不再在消息隊列中),發(fā)送者和接收者之間在時間上沒有依賴性,也就是說當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發(fā)送到隊列

接收者在成功接收消息之后需向隊列應(yīng)答成功,如果希望發(fā)送的每個消息都會被成功處理的話,那么需要P2P模式。

(2)Pub/Sub模式

圖片圖片

包含三個角色:主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber) 。多個發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個訂閱者。

Pub/Sub的特點

  • 每個消息可以有多個消費者
  • 發(fā)布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息。
  • 為了消費消息,訂閱者必須保持運行的狀態(tài)。

為了緩和這樣嚴(yán)格的時間相關(guān)性,JMS允許訂閱者創(chuàng)建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發(fā)布者的消息。如果希望發(fā)送的消息可以不被做任何處理、或者只被一個消息者處理、或者可以被多個消費者處理的話,那么可以采用Pub/Sub模型。

6.2消息消費

在JMS中,消息的產(chǎn)生和消費都是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。

  • 同步:訂閱者或接收者通過receive方法來接收消息,receive方法在接收到消息之前(或超時之前)將一直阻塞;
  • 異步:訂閱者或接收者可以注冊為一個消息監(jiān)聽器。當(dāng)消息到達之后,系統(tǒng)自動調(diào)用監(jiān)聽器的onMessage方法。

JNDI:Java命名和目錄接口,是一種標(biāo)準(zhǔn)的Java命名系統(tǒng)接口??梢栽诰W(wǎng)絡(luò)上查找和訪問服務(wù)。通過指定一個資源名稱,該名稱對應(yīng)于數(shù)據(jù)庫或命名服務(wù)中的一個記錄,同時返回資源連接建立所必須的信息。JNDI在JMS中起到查找和訪問發(fā)送目標(biāo)或消息來源的作用。

6.3JMS編程模型

  • ConnectionFactory:創(chuàng)建Connection對象的工廠,針對兩種不同的JMS消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種??梢酝ㄟ^JNDI來查找ConnectionFactory對象。
  • Destination:Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費者的消息來源。對于消息生產(chǎn)者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。
  • Connection:表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對TCP/IP Socket的包裝)。Connection可以產(chǎn)生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnectionTopicConnection。
  • Session:Session是操作消息的接口??梢酝ㄟ^session創(chuàng)建生產(chǎn)者、消費者、消息等。Session提供了事務(wù)的功能。當(dāng)需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務(wù)中。同樣,也分QueueSession和TopicSession。

七、案例實戰(zhàn)

7.1案例分析

假設(shè)我們要實現(xiàn)一個簡單的任務(wù)調(diào)度系統(tǒng),有多個消費者從消息隊列中獲取任務(wù)并進行處理。

首先,你需要安裝 RabbitMQ C++ 客戶端庫??梢允褂靡韵旅钸M行安裝(假設(shè)你已經(jīng)在 Linux 系統(tǒng)上安裝了 RabbitMQ):

sudo apt-get install librabbitmq-dev

然后,你可以使用以下代碼來發(fā)送和接收消息:

#include <iostream>
#include <string>
#include <SimpleAmqpClient/SimpleAmqpClient.h>

int main() {
    // 連接到 RabbitMQ 服務(wù)器
    AmqpClient::Channel::ptr_t channel = AmqpClient::Channel::Create("localhost");

    // 聲明一個隊列用于消息的發(fā)送和接收
    std::string queueName = "test_queue";
    channel->DeclareQueue(queueName, false, true, false, false);

    // 發(fā)送消息
    std::string message = "Hello, RabbitMQ!";
    channel->BasicPublish("", queueName, AmqpClient::BasicMessage::Create(message));

    std::cout << "Sent message: " << message << std::endl;

    // 接收消息
    bool noAck = true;
    AmqpClient::Envelope::ptr_t envelope;
    
    while (true) {
        bool received = channel->BasicConsumeMessage(queueName, envelope, noAck);
        
        if (received) {
            std::string receivedMessage = envelope->Message()->Body();
            
            std::cout << "Received message: " << receivedMessage << std::endl;
            
            break;
        }
        
        // 休眠一段時間再嘗試接收消息
        usleep(1000);
   }

   return 0;
}

在這個示例中,我們使用了 SimpleAmqpClient 庫來與 RabbitMQ 服務(wù)器進行交互。首先,我們創(chuàng)建一個通道并聲明一個隊列用于消息的發(fā)送和接收。然后,我們使用 BasicPublish 方法發(fā)送消息到隊列中。最后,我們使用 BasicConsumeMessage 方法循環(huán)嘗試接收消息,直到成功接收到一條消息為止。

請注意,在編譯時需要鏈接 SimpleAmqpClient 庫。你可以使用以下命令編譯代碼:

g++ -std=c++11 -I/usr/include/SimpleAmqpClient -o main main.cpp -lamqpcpp

這只是一個簡單的示例,用于演示如何在C++中使用RabbitMQ發(fā)送和接收消息。你可以根據(jù)自己的需求進行擴展和優(yōu)化。

7.2使用 C++ 消息隊列的注意要點

(1)消息格式與類型一致性

在使用 C++ 消息隊列進行通信時,確保發(fā)送和接收進程使用相同的消息格式和類型至關(guān)重要。消息格式就像是一種約定俗成的 “語言規(guī)則”,只有發(fā)送方和接收方都遵循相同的規(guī)則,才能實現(xiàn)準(zhǔn)確無誤的信息傳遞。在一個金融交易系統(tǒng)中,訂單消息可能包含訂單編號、交易金額、交易時間、股票代碼等信息,這些信息必須按照固定的順序和數(shù)據(jù)類型進行封裝和解析。如果發(fā)送方將交易金額定義為float類型,而接收方卻按照int類型去解析,就會導(dǎo)致數(shù)據(jù)錯誤,可能引發(fā)嚴(yán)重的交易問題,如金額計算錯誤、訂單匹配失敗等。

同樣,消息類型也需要嚴(yán)格一致。消息類型通常用于標(biāo)識消息的用途或類別,不同類型的消息在系統(tǒng)中會有不同的處理方式。在一個游戲開發(fā)項目中,可能存在 “玩家移動”“怪物生成”“道具拾取” 等不同類型的消息,如果發(fā)送方將 “玩家移動” 消息的類型標(biāo)識錯誤,接收方可能會將其當(dāng)作其他類型的消息進行處理,從而導(dǎo)致游戲邏輯混亂,玩家體驗受到嚴(yán)重影響。

(2)同步與并發(fā)訪問處理

當(dāng)多個線程或進程同時訪問消息隊列時,同步和并發(fā)訪問處理不當(dāng)很容易引發(fā)競爭條件和死鎖等問題。競爭條件是指多個線程或進程在訪問共享資源(如消息隊列)時,由于執(zhí)行順序的不確定性,導(dǎo)致最終結(jié)果出現(xiàn)錯誤。死鎖則是指兩個或多個線程或進程相互等待對方釋放資源,從而陷入無限期的阻塞狀態(tài)。為了避免這些問題,我們可以采用多種方法。

使用互斥鎖(如std::mutex)是最常見的方式之一,它能夠保證在同一時刻只有一個線程或進程可以訪問消息隊列,從而避免競爭條件。在一個多線程的日志記錄系統(tǒng)中,多個線程可能同時產(chǎn)生日志消息并嘗試將其發(fā)送到消息隊列中,如果沒有互斥鎖的保護,就可能出現(xiàn)消息順序混亂、數(shù)據(jù)丟失等問題。條件變量(如std::condition_variable)也起著重要作用,它可以協(xié)調(diào)線程之間的同步。

在生產(chǎn)者 - 消費者模型中,生產(chǎn)者線程在向消息隊列中添加消息后,可以通過條件變量通知消費者線程有新消息到來;消費者線程在隊列為空時,可以通過條件變量等待,避免無效的輪詢,提高系統(tǒng)效率。同時,合理的加鎖和解鎖策略也至關(guān)重要,要確保在適當(dāng)?shù)臅r機獲取和釋放鎖,避免死鎖的發(fā)生。在使用多個鎖的情況下,按照固定的順序獲取鎖可以有效防止死鎖。如果線程 A 需要獲取鎖 1 和鎖 2,線程 B 需要獲取鎖 2 和鎖 1,若不按照固定順序獲取鎖,就可能出現(xiàn)線程 A 獲取了鎖 1,線程 B 獲取了鎖 2,然后雙方都等待對方釋放自己需要的鎖,從而導(dǎo)致死鎖的情況 。

(3)隊列大小與性能優(yōu)化

消息隊列的大小設(shè)置對系統(tǒng)性能有著顯著影響。如果隊列設(shè)置過小,可能導(dǎo)致消息無法及時存儲,造成消息丟失。在一個高并發(fā)的電商訂單處理系統(tǒng)中,促銷活動期間短時間內(nèi)可能會產(chǎn)生大量的訂單消息,如果消息隊列過小,部分訂單消息可能因為隊列已滿而無法入隊,從而導(dǎo)致訂單丟失,給商家和用戶帶來損失。另一方面,隊列過大也會占用過多的系統(tǒng)內(nèi)存資源,影響系統(tǒng)的整體性能。

在內(nèi)存資源有限的嵌入式系統(tǒng)中,如果消息隊列設(shè)置過大,可能會導(dǎo)致其他關(guān)鍵任務(wù)因為內(nèi)存不足而無法正常運行。為了優(yōu)化性能,可以根據(jù)系統(tǒng)的實際需求和負載情況,動態(tài)調(diào)整隊列大小。在系統(tǒng)負載較低時,可以適當(dāng)縮小隊列大小,釋放內(nèi)存資源;當(dāng)系統(tǒng)負載升高時,及時擴大隊列大小,以滿足消息存儲的需求。采用消息壓縮技術(shù)可以減少消息在隊列中占用的空間,提高隊列的存儲效率。對于一些包含大量文本信息的日志消息,可以在發(fā)送前進行壓縮,接收后再進行解壓縮,這樣既能減少隊列空間的占用,又能降低網(wǎng)絡(luò)傳輸?shù)拈_銷,提升系統(tǒng)的整體性能。

責(zé)任編輯:武曉燕 來源: 深度Linux
相關(guān)推薦

2024-02-02 18:29:54

C++線程編程

2010-01-21 11:23:49

Linux多線程同步消息隊列

2012-05-18 10:36:20

CC++編程

2024-06-24 08:10:00

C++互斥鎖

2010-01-18 14:09:58

C++多線程

2010-02-04 10:19:39

C++多線程

2010-02-05 15:30:54

C++多線程測試

2021-02-25 15:58:46

C++線程編程開發(fā)技術(shù)

2021-03-05 07:38:52

C++線程編程開發(fā)技術(shù)

2023-12-14 15:05:08

volatile代碼C++

2021-03-26 05:54:00

C#數(shù)據(jù)方法

2021-06-10 00:13:43

C#隊列數(shù)據(jù)

2024-02-21 20:46:48

C++編程volatile

2024-06-24 12:57:09

多線程C++編程語言

2024-01-29 16:55:38

C++引用開發(fā)

2024-11-05 16:29:57

2011-06-14 15:25:28

C++多線程

2024-04-03 08:25:11

DictionaryC#字典類型

2010-01-26 14:35:11

C++關(guān)鍵字

2017-06-19 13:36:12

Linux進程消息隊列
點贊
收藏

51CTO技術(shù)棧公眾號