自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

消息隊列選型看這一篇就夠了

系統(tǒng)
本文是關于消息隊列(MQ)選型和常見問題的精心整理。在這篇文章中,我們將詳細介紹消息隊列的概念、作用以及如何選擇適合自己需求的消息隊列系統(tǒng)。

作者 | emoryliang

消息隊列是重要的分布式系統(tǒng)組件,在高性能、高可用、低耦合等系統(tǒng)架構中扮演著重要作用??捎糜诋惒酵ㄐ拧⑾鞣逄罟?、解耦系統(tǒng)、數(shù)據(jù)緩存等多種業(yè)務場景。本文是關于消息隊列(MQ)選型和常見問題的精心整理。在這篇文章中,我們將詳細介紹消息隊列的概念、作用以及如何選擇適合自己需求的消息隊列系統(tǒng)。

一、概述

消息隊列是分布式系統(tǒng)中重要的中間件,在高性能、高可用、低耦合等系統(tǒng)架構中扮演著重要作用。分布式系統(tǒng)可以借助消息隊列的能力,輕松實現(xiàn)以下功能:

  • 解耦:將一個流程的上下游拆解開,上游專注于生產(chǎn)消息,下游專注于處理消息;
  • 廣播:上游生產(chǎn)的消息可以輕松被多個下游服務處理;
  • 緩沖:應對突發(fā)流量,消息隊列扮演緩沖器的作用,保護下游服務,使其可以根據(jù)自身的實際消費能力處理消息;
  • 異步:上游發(fā)送消息后可以馬上返回,下游可以異步處理消息;
  • 冗余:保留歷史消息,處理失敗或當出現(xiàn)異常時可以進行重試或者回溯,防止丟失;

二、架構簡介

1. Kafka

(1) 系統(tǒng)框架

一個 Kafka 集群由多個 Broker 和一個 ZooKeeper 集群組成,Broker 作為 Kafka 節(jié)點的服務器。同一個消息主題 Topic 可以由多個分區(qū) Partition 組成,分區(qū)物理存儲在 Broker 上。負載均衡考慮,同一個 Topic 的多個分區(qū)存儲在多個不同的 Broker 上,為了提高可靠性,每個分區(qū)在不同的 Broker 會存在副本。

ZookKeeper 是一個分布式開源的應用程序協(xié)調(diào)服務,可以實現(xiàn)統(tǒng)一命名服務、狀態(tài)同步服務、集群管理、分布式應用配置項的管理等工作。Kafka 里的 ZooKeeper 主要有一下幾個作用:

  • Broker 注冊,當有 Broker 故障的時候能及時感知。
  • Topic 注冊,維護 Topic 各分區(qū)的個副本所在的 Broker 節(jié)點,以及對應 leader/follower 的角色。
  • Consumer 注冊,維護消費者組的 offset 以及消費者與分區(qū)的對應關系,實現(xiàn)負載均衡。

(2) 基本術語

  • Producer:消息生產(chǎn)者。一般情況下,一條消息會被發(fā)送到特定的主題上。通常情況下,寫入的消息會通過輪詢將消息寫入各分區(qū)。生產(chǎn)者也可以通過設定消息 key 值將消息寫入指定分區(qū)。寫入分區(qū)的數(shù)據(jù)越均勻 Kafka 的性能才能更好發(fā)揮。
  • Topic:Topic 是個抽象的虛擬概念,一個集群可以有多個 Topic,作為一類消息的標識。一個生產(chǎn)者將消息發(fā)送到 topic,消費者通過訂閱 Topic 獲取分區(qū)消息。
  • Partition:Partition 是個物理概念,一個 Topic 對應一個或多個 Partition。新消息會以追加的方式寫入分區(qū)里,在同一個 Partition 里消息是有序的。Kafka 通過分區(qū),實現(xiàn)消息的冗余和伸縮性,以及支持物理上的并發(fā)讀、寫,大大提高了吞吐量。
  • Replicas:一個 Partition 有多個 Replicas 副本。這些副本保存在 broker,每個 broker 存儲著成百上千個不同主題和分區(qū)的副本,存儲的內(nèi)容分為兩種:master 副本,每個 Partition 都有一個 master 副本,所有內(nèi)容的寫入和消費都會經(jīng)過 master 副本;follower 副本不處理任何客戶端的請求,只同步 master 的內(nèi)容進行復制。如果 master 發(fā)生了異常,很快會有一個 follower 成為新的 master。
  • Consumer:消息讀取者。消費者訂閱主題,并按照一定順序讀取消息。Kafka 保證每個分區(qū)只能被一個消費者使用。
  • Offset:偏移量是一種元數(shù)據(jù),是不斷遞增的整數(shù)。在消息寫入時 Kafka 會把它添加到消息里。在分區(qū)內(nèi)偏移量是唯一的。消費過程中,會將最后讀取的偏移量存儲在 Kafka 中,消費者關閉偏移量不會丟失,重啟會繼續(xù)從上次位置開始消費。
  • Broker:獨立的 Kafka 服務器。一個 Topic 有 N 個 Partition,一個集群有 N 個 Broker,那么每個 Broker 都會存儲一個這個 Topic 的 Partition。如果某 topic 有 N 個 partition,集群有(N+M)個 broker,那么其中有 N 個 broker 存儲該 topic 的一個 partition,剩下的 M 個 broker 不存儲該 topic 的 partition 數(shù)據(jù)。如果某 topic 有 N 個 partition,集群中 broker 數(shù)目少于 N 個,那么一個 broker 存儲該 topic 的一個或多個 partition。在實際生產(chǎn)環(huán)境中,盡量避免這種情況的發(fā)生,這種情況容易導致 Kafka 集群數(shù)據(jù)不均衡。

2. Pulsar

(1) 系統(tǒng)框架

Pulsar 有三個重要的組件,Broker、BookKeeper 和ZooKeeper,Broker 是無狀態(tài)服務,客戶端需要連接到 Broker 上進行消息的傳遞。BookKeeper 與 ZooKeeper 是有狀態(tài)服務。BookKeeper 的節(jié)點叫 Bookie,負責存儲消息和游標,ZooKeeper 存儲 Broker 和 Bookie 的元數(shù)據(jù)。Pulsar 以這種架構,實現(xiàn)存儲和計算分離,Broker 負責計算,Bookie 負責有狀態(tài)存儲。

Pulsar 的多層架構影響了存儲數(shù)據(jù)的方式。Pulsar 將 Topic 分區(qū)劃分為分片(Segment),然后將這些分片存儲在 Apache BookKeeper 的存儲節(jié)點上,以提高性能、可伸縮性和可用性。Pulsar 的分布式日志以分片為中心,借助擴展日志存儲(通過 Apache BookKeeper)實現(xiàn),內(nèi)置分層存儲支持,因此分片可以均勻地分布在存儲節(jié)點上。由于與任一給定 Topic 相關的數(shù)據(jù)都不會與特定存儲節(jié)點進行捆綁,因此很容易替換存儲節(jié)點或縮擴容。另外,集群中最小或最慢的節(jié)點也不會成為存儲或帶寬的短板。

(2) 基本術語

  • Property:代表租戶,每個 property 都可以代表一個團隊、一個功能、一個產(chǎn)品線。一個 property 可包含多個 namesapce,多租戶是一種資源隔離手段,可以提高資源利用率;
  • Namespace:Pulsar 的基本管理單元,在 namaspace 級別可設置權限、消息 TTL、Retention 策略等。一個 namaspace 里的所有 topic 都繼承相同的設置。命名空間分為兩種:本地命名空間,只在集群內(nèi)可見、全局命名空間對多個集群可見集群命名空間;

  • Producer:數(shù)據(jù)生產(chǎn)方,負責創(chuàng)建消息并將消息投遞到 Pulsar 中;
  • Consumer:數(shù)據(jù)消費方,連接到 Pulsar 接收消息并進行相應的處理;
  • Broker:無狀態(tài) Proxy 服務,負責接收消息、傳遞消息、集群負載均衡等操作,它對 client 屏蔽了服務端讀寫流程的復雜性,是保證數(shù)據(jù)一致性與數(shù)據(jù)負載均衡的重要角色。Broker 不會持久化保存元數(shù)據(jù)??梢詳U容但不能縮容;
  • BookKeeper:有狀態(tài),負責持久化存儲消息。當集群擴容時,Pulsar 會在新增 BookKeeper 和 Segment(即 Bookeeper 的 Ledger),不需要像 kafka 一樣在擴容時進行 Rebalance。擴容結果是 Fragments 跨多個 Bookies 以帶狀分布,同一個 Ledger 的 Fragments 分布在多個 Bookie 上,導致讀取和寫入會在多個 Bookies 之間跳躍;
  • ZooKeeper:存儲 Pulsar 、 BookKeeper 的元數(shù)據(jù),集群配置等信息,負責集群間的協(xié)調(diào)、服務發(fā)現(xiàn)等;
  • Topic:用作從 producer 到 consumer 傳輸消息。Pulsar 在 Topic 級別擁有一個 leader Broker,稱之為擁有 Topic 的所有權,針對該 Topic 所有的 R/W 都經(jīng)過該 Broker 完成。Topic 的 Ledger 和 Fragment 之間映射關系等元數(shù)據(jù)存儲在 Zookeeper 中,Pulsar Broker 需要實時跟蹤這些關系進行讀寫流程;
  • Ledger:即 Segment,Pulsar 底層數(shù)據(jù)以 Ledger 的形式存儲在 BookKeeper 上。是 Pulsar 刪除的最小單位;
  • Fragment :每個 Ledger 由若干 Fragment 組成。

3. RocketMQ

(1) 系統(tǒng)框架

RocketMQ 是阿里開源的消息中間件,它是一個開源的分布式消息傳遞和流式數(shù)據(jù)平臺。總共有四大部分:NameServer,Broker,Producer,Consumer。

NameServer 主要用來管理 brokers 以及路由信息。broker 服務器啟動時會注冊到 NameServer 上,并且兩者之間保持心跳監(jiān)測機制,以此來保證 NameServer 知道 broker 的存活狀態(tài)。而且,每一臺 NameServer 都存有全部的 broker 集群信息和生產(chǎn)者/消費者客戶端的請求信息。

Broker 負責管理消息存儲分發(fā),主從數(shù)據(jù)同步,為消息建立索引,提供消息查詢等能力。

(2) 基本術語

  • Topic:一個 Topic 可以有 0 個、1 個、多個生產(chǎn)者向其發(fā)送消息,一個生產(chǎn)者也可以同時向不同的 Topic 發(fā)送消息。一個 Topic 也可以被 0 個、1 個、多個消費者訂閱;
  • Tag:消息二級類型,可以為用戶提供額外的靈活度,一條消息可以沒有 tag;
  • Producer:消息生產(chǎn)者;
  • Broker:存儲消息,以 Topic 為緯度輕量級的隊列;轉(zhuǎn)發(fā)消息,單個 Broker 節(jié)點與所有的 NameServer 節(jié)點保持長連接及心跳,會定時將 Topic 信息注冊到 NameServer;
  • Consumer:消息消費者,負責接收并消費消息;
  • MessageQueue:消息的物理管理單位,一個 Topic 可以有多個 Queue,Queue 的引入實現(xiàn)了水平擴展的能力;
  • NameServer:負責對原數(shù)據(jù)的管理,包括 Topic 和路由信息,每個 NameServer 之間是沒有通信的;
  • Group:一個組可以訂閱多個 Topic,ProducerGroup、ConsumerGroup 分別是一類生產(chǎn)者和一類消費者;
  • Offset:通過 Offset 訪問存儲單元,RocketMQ 中所有消息都是持久化的,且存儲單元定長。Offset 為 Java Long 類型,理論上 100 年內(nèi)不會溢出,所以認為 Message Queue 是無限長的數(shù)據(jù),Offset 是下標;
  • Consumer:支持 PUSH 和 PULL 兩種消費模式,支持集群消費和廣播消費。

4. RabbitMQ

(1) 系統(tǒng)框架

RabbitMQ 基于 AMQP 協(xié)議來實現(xiàn),主要由 Exchange 和 Queue 兩部分組成,然后通過 RoutingKey 關聯(lián)起來,消息投遞到 Exchange 然后通過 Queue 接收。

(2) 基本術語

  • Broker:接收客戶端鏈接實體,實現(xiàn) AMQP 消息隊列和路由功能;
  • Virtual Host:是一個虛擬概念,權限控制的最小單位。一個 Virtual Host 里包含多個 Exchange 和 Queue;
  • Exchange:接收消息生產(chǎn)者的消息并將消息轉(zhuǎn)發(fā)到隊列。發(fā)送消息時根據(jù)不同 ExchangeType 的決定路由規(guī)則,ExchangeType 常用的有:direct、fanout 和 topic 三種;
  • Message Queue:消息隊列,存儲為被消費的消息;
  • Message:由 Header 和 Body 組成,Header 是生產(chǎn)者添加的各種屬性,包含 Message 是否持久化、哪個 MessageQueue 接收、優(yōu)先級。Body 是具體的消息內(nèi)容;
  • Binding:Binding 連接起了 Exchange 和 Message Queue。在服務器運行時,會生成一張路由表,這張路由表上記錄著 MessageQueue 的條件和 BindingKey 值。當 Exchange 收到消息后,會解析消息中的 Header 得到 BindingKey,并根據(jù)路由表和 ExchangeType 將消息發(fā)送到對應的 MessageQueue。最終的匹配模式是由 ExchangeType 決定;
  • Connection:在 Broker 和客戶端之間的 TCP 連接;
  • Channel:信道。Broker 和客戶端只有 tcp 連接是不能發(fā)送消息的,必須創(chuàng)建信道。AMQP 協(xié)議規(guī)定只有通過 Channel 才能執(zhí)行 AMQP 命令。一個 Connection 可以包含多個 Channel。之所以需要建立 Channel,是因為每個 TCP 連接都是很寶貴的。如果每個客戶端、每個線程都需要和 Broker 交互,都需要維護一個 TCP 連接的話是機器耗費資源的,一般建議共享 Connection。RabbitMQ 不建議客戶端線程之前共享 Channel,至少保證同一 Channel 發(fā)小消息是穿行的;
  • Command:AMQP 命令,客戶端通過 Command 來完成和 AMQP 服務器的交互。

5. NSQ

(1) 系統(tǒng)框架

NSQ 主要有 nsqlookup、nsqd 兩部分組成:

  • Nsqlookup 為守護進程,負責管理拓撲信息并提供發(fā)現(xiàn)服務??蛻舳送ㄟ^查詢 nsqlookupd 獲取指定 Topic 所在的 nsqd 節(jié)點。nsqd 往 nsqlookup 上注冊和廣播自身 topic 和 channel 的信息。
  • nsqd 在服務端運行的守護進程,負責接收,排隊,投遞消息給客戶端。

NSQ 由 3 個守護進程組成:

  • nsqd 是接收、隊列和傳送消息到客戶端的守護進程。
  • nsqlookupd 是管理的拓撲信息,并提供了最終一致發(fā)現(xiàn)服務的守護進程??蛻舳送ㄟ^查詢 nsqlookupd 獲取指定 Topic 所在的 nsqd 節(jié)點。nsqd 往 nsqlookup 上注冊和廣播自身 topic 和 channel 的信息。
  • nsqadmin是一個 Web UI 來實時監(jiān)控集群(和執(zhí)行各種管理任務)。

三、選型要點

1. 選型參考

  • 消息順序:發(fā)送到隊列的消息,消費時是否可以保證消費的順序;
  • 伸縮:當消息隊列性能有問題,比如消費太慢,是否可以快速支持擴容;當消費隊列過多,浪費系統(tǒng)資源,是否可以支持縮容。
  • 消息留存:消息消費成功后,是否還會繼續(xù)保留在消息隊列;
  • 容錯性:當一條消息消費失敗后,是否有一些機制,保證這條消息一定能成功,比如異步第三方退款消息,需要保證這條消息消費掉,才能確定給用戶退款成功,所以必須保證這條消息消費成功的準確性;
  • 消息可靠性:是否會存在丟消息的情況,比如有 A/B 兩個消息,最后只有 B 消息能消費,A 消息丟失;
  • 消息時序:主要包括“消息存活時間”和“延遲消息”;
  • 吞吐量:支持的最高并發(fā)數(shù);
  • 消息路由:根據(jù)路由規(guī)則,只訂閱匹配路由規(guī)則的消息,比如有 A/B 兩者規(guī)則的消息,消費者可以只訂閱 A 消息,B 消息不會消費。

2. 消息隊列對比

注:作為 LShift 和 CohesiveFT 于 2007 年成立的合資企業(yè),RabbitMQ 于 2010 年 4 月被 VMware 旗下的 SpringSource 收購。

四、功能剖析

1. 消費推拉模式

客戶端消費者獲取消息的方式,Kafka 和 RocketMQ 是通過長輪詢 Pull 的方式拉取消息,RabbitMQ、Pulsar、NSQ 都是通過 Push 的方式。

pull 類型的消息隊列更適合高吞吐量的場景,允許消費者自己進行流量控制,根據(jù)消費者實際的消費能力去獲取消息。而 push 類型的消息隊列,實時性更好,但需要有一套良好的流控策略(backpressure)當消費者消費能力不足時,減少 push 的消費數(shù)量,避免壓垮消費端。

2. 延遲隊列

消息延遲投遞,當消息產(chǎn)生送達消息隊列時,有些業(yè)務場景并不希望消費者立刻收到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。延遲隊列一般分為兩種,基于消息的延遲和基于隊列的延遲?;谙⒌难舆t指為每條消息設置不同的延遲時間,當隊列有新消息進入的時候根據(jù)延遲時間排序,當然這樣會對性能造成較大影響。另一種基于隊列的延遲指的是設置不同延遲級別的隊列,隊列中每個消息的延遲時間都是相同的,這樣免去了基于延遲時間排序?qū)π阅軒淼膿p耗,通過一定的掃描策略即可投遞超時的消息。

延遲消息的使用場景比如異常檢測重試,訂單超時取消等,例如:

  • 服務請求異常,需要將異常請求放到單獨的隊列,隔 5 分鐘后進行重試;
  • 用戶購買商品,但一直處于未支付狀態(tài),需要定期提醒用戶支付,超時則關閉訂單;
  • 面試或者會議預約,在面試或者會議開始前半小時,發(fā)送通知再次提醒。

Kafka 不支持延遲消息。Pulsar 支持秒級的延遲消息,所有延遲投遞的消息會被 Delayed Message Tracker 記錄對應的 index,consumer 在消費時,會先去 Delayed Message Tracker 檢查,是否有到期需要投遞的消息,如果有到期的消息,則從 Tracker 中拿出對應的 index,找到對應的消息進行消費,如果沒有到期的消息,則直接消費正常的消息。對于長時間的延遲消息,會被存儲在磁盤中,當快到延遲間隔時才被加載到內(nèi)存里。

RocketMQ 開源版本延遲消息臨時存儲在一個內(nèi)部主題中,不支持任意時間精度,支持特定的 level,例如定時 5s,10s,1m 等。

RabbitMQ 需要安裝一個 rabbitmq_delayed_message_exchange 插件。

NSQ 通過內(nèi)存中的優(yōu)先級隊列來保存延遲消息,支持秒級精度,最多 2 個小時延遲。

3. 死信隊列

由于某些原因消息無法被正確的投遞,為了確保消息不會被無故的丟棄,一般將其置于一個特殊角色的隊列,這個隊列一般稱之為死信隊列。與此對應的還有一個“回退隊列”的概念,試想如果消費者在消費時發(fā)生了異常,那么就不會對這一次消費進行確認(Ack), 進而發(fā)生回滾消息的操作之后消息始終會放在隊列的頂部,然后不斷被處理和回滾,導致隊列陷入死循環(huán)。為了解決這個問題,可以為每個隊列設置一個回退隊列,它和死信隊列都是為異常的處理提供的一種機制保障。實際情況下,回退隊列的角色可以由死信隊列和重試隊列來扮演。

  • Kafka 沒有死信隊列,通過 Offset 的方式記錄當前消費的偏移量。
  • Pulsar 有重試機制,當某些消息第一次被消費者消費后,沒有得到正常的回應,則會進入重試 Topic 中,當重試達到一定次數(shù)后,停止重試,投遞到死信 Topic 中。
  • RocketMQ 通過 DLQ 來記錄所有消費失敗的消息。
  • RabbitMQ 是利用類似于延遲隊列的形式實現(xiàn)死信隊列。

NSQ 沒有死信隊列。

4. 優(yōu)先級隊列

有一些業(yè)務場景下,我們需要優(yōu)先處理一些消息,比如銀行里面的金卡客戶、銀卡客戶優(yōu)先級高于普通客戶,他們的業(yè)務需要優(yōu)先處理。如下圖:

優(yōu)先級隊列不同于先進先出隊列,優(yōu)先級高的消息具備優(yōu)先被消費的特權,這樣可以為下游提供不同消息級別的保證。不過這個優(yōu)先級也是需要有一個前提的:如果消費者的消費速度大于生產(chǎn)者的速度,并且消息中間件服務器(一般簡單的稱之為 Broker)中沒有消息堆積,那么對于發(fā)送的消息設置優(yōu)先級也就沒有什么實質(zhì)性的意義了,因為生產(chǎn)者剛發(fā)送完一條消息就被消費者消費了,那么就相當于 Broker 中至多只有一條消息,對于單條消息來說優(yōu)先級是沒有什么意義的。

Kafka、RocketMQ、Pulsar、NSQ 不支持優(yōu)先級隊列,可以通過不同的隊列來實現(xiàn)消息優(yōu)先級。

RabbitMQ 支持優(yōu)先級消息。

5. 消息回溯

一般消息在消費完成之后就被處理了,之后再也不能消費到該條消息。消息回溯正好相反,是指消息在消費完成之后,還能消費到之前被消費掉的消息。對于消息而言,經(jīng)常面臨的問題是“消息丟失”,至于是真正由于消息中間件的缺陷丟失還是由于使用方的誤用而丟失一般很難追查,如果消息中間件本身具備消息回溯功能的話,可以通過回溯消費復現(xiàn)“丟失的”消息進而查出問題的源頭之所在。消息回溯的作用遠不止與此,比如還有索引恢復、本地緩存重建,有些業(yè)務補償方案也可以采用回溯的方式來實現(xiàn)。

  • Kafka 支持消息回溯,可以根據(jù)時間戳或指定 Offset,重置 Consumer 的 Offset 使其可以重復消費。
  • Pulsar 支持按時間對消息進行回溯。
  • RocketMQ 支持按時間回溯,實現(xiàn)的原理跟 Kafka 一致。
  • RabbitMQ 不支持回溯,消息一旦標記確認就會被標記刪除。
  • NSQ 一般消息是不可回溯的,但可以通過 nsq_to_file 工具,將消息寫入到文件,然后從文件里重放消息。

6. 消息持久化

流量削峰是消息中間件的一個非常重要的功能,而這個功能其實得益于其消息堆積能力。從某種意義上來講,如果一個消息中間件不具備消息堆積的能力,那么就不能把它看做是一個合格的消息中間件。消息堆積分內(nèi)存式堆積和磁盤式堆積。一般來說,磁盤的容量會比內(nèi)存的容量要大得多,對于磁盤式的堆積其堆積能力就是整個磁盤的大小。從另外一個角度講,消息堆積也為消息中間件提供了冗余存儲的功能。

Kafka 和 RocketMQ 直接將消息刷入磁盤文件中進行持久化,所有的消息都存儲在磁盤中。只要磁盤容量夠,可以做到無限消息堆積。

RabbitMQ 是典型的內(nèi)存式堆積,但這并非絕對,在某些條件觸發(fā)后會有換頁動作來將內(nèi)存中的消息換頁到磁盤(換頁動作會影響吞吐),或者直接使用惰性隊列來將消息直接持久化至磁盤中。

Pulsar 消息是存儲在 BookKeeper 存儲集群上,也是磁盤文件。

NSQ 通過 nsq_to_file 工具,將消息寫入到文件。

7. 消息確認機制

消息隊列需要管理消費進度,確認消費者是否成功處理消息,使用 push 的方式的消息隊列組件往往是對單條消息進行確認,對于未確認的消息,進行延遲重新投遞或者進入死信隊列。

Kafka通過 Offset 的方式確認消息:

  • 發(fā)送方確認機制 ack=0,不管消息是否成功寫入分區(qū) ack=1,消息成功寫入首領分區(qū)后,返回成功 ack=all,消息成功寫入所有分區(qū)后,返回成功。
  • 接收方確認機制 自動或者手動提交分區(qū)偏移量,早期版本的 kafka 偏移量是提交給 Zookeeper 的,這樣使得 zookeeper 的壓力比較大,更新版本的 kafka 的偏移量是提交給 kafka 服務器的,不再依賴于 zookeeper 群組,集群的性能更加穩(wěn)定。

RocketMQ與 Kafka 類似也會提交 Offset,區(qū)別在于消費者對于消費失敗的消息,可以標記為消息消費失敗,Broker 會重試投遞,如果累計多次消費失敗,會投遞到死信隊列。

RabbitMQ和 NSQ 類似,消費者確認單條消息,否則會重新放回隊列中等待下次投遞:

  • 發(fā)送方確認機制,消息被投遞到所有匹配的隊列后,返回成功。如果消息和隊列是可持久化的,那么在寫入磁盤后,返回成功。支持批量確認和異步確認。
  • 接收方確認機制,設置 autoAck 為 false,需要顯式確認,設置 autoAck 為 true,自動確認。當 autoAck 為 false 的時候,rabbitmq 隊列會分成兩部分,一部分是等待投遞給 consumer 的消息,一部分是已經(jīng)投遞但是沒收到確認的消息。如果一直沒有收到確認信號,并且 consumer 已經(jīng)斷開連接,rabbitmq 會安排這個消息重新進入隊列,投遞給原來的消費者或者下一個消費者。未確認的消息不會有過期時間,如果一直沒有確認,并且沒有斷開連接,rabbitmq 會一直等待,rabbitmq 允許一條消息處理的時間可以很久很久。

Pulsar使用專門的 Cursor 管理。累積確認和 Kafka 效果一樣;提供單條或選擇性確認。

8. 消息 TTL

消息 TTL 表示一條消息的生存時間,如果消息發(fā)出來后,在 TTL 的時間內(nèi)沒有消費者進行消費,消息隊列會將消息刪除或者放入死信隊列中。

Kafka 根據(jù)設置的保留期來刪除消息。有可能消息沒被消費,過期后被刪除。不支持 TTL。

Pulsar 支持 TTL,如果消息未在配置的 TTL 時間段內(nèi)被任何消費者使用,則消息將自動標記為已確認。消息保留期與消息 TTL 之間的區(qū)別在于:消息保留期作用于標記為已確認并設置為已刪除的消息,而 TTL 作用于未 ack 的消息。上面的圖例中說明了 Pulsar 中的 TTL。例如,如果訂閱 B 沒有活動消費者,則在配置的 TTL 時間段過后,消息 M10 將自動標記為已確認,即使沒有消費者實際讀取該消息。

RocketMQ 提及到消息 TTL 的資料比較少,不過看接口似乎是支持的。

RabbitMQ 有兩種方式,一個是聲明隊列的時候在隊列屬性中設置,整個隊列中的消息都有相同的有效期。還可以發(fā)送消息的時候給消息設置屬性,可以位每條消息都設置不同的 TTL。

NSQ 似乎還沒支持,有一個 Feature Request 的 Issue 處于 Open 狀態(tài)。

9. 多租戶隔離

多租戶是指通過一個軟件實例為多個租戶提供服務的能力。租戶是指對系統(tǒng)有著相同“視圖”的一組用戶。不支持多租戶的系統(tǒng)里邊,往往要為不同用戶或者不同集群創(chuàng)建多個消息隊列實例實現(xiàn)物理隔離,這樣會帶來較高的運維成本。作為一種企業(yè)級的消息系統(tǒng),Pulsar 的多租戶能力按照設計可滿足下列需求:

  • 確保嚴苛的 SLA 可順利滿足。
  • 保證不同租戶之間的隔離。
  • 針對資源利用率強制實施配額。
  • 提供每租戶和系統(tǒng)級的安全性。
  • 確保低成本運維以及盡可能簡單的管理。

Pulsar 通過下列方式滿足了上述需求:

  • 通過為每個租戶進行身份驗證、授權和 ACL(訪問控制列表)獲得所需安全性。
  • 為每個租戶強制實施存儲配額。
  • 以策略的方式定義所有隔離機制,策略可在運行過程中更改,借此降低運維成本并簡化管理工作。

10. 消息順序性

消息順序性是指保證消息有序。消息消費順序跟生產(chǎn)的順序保持一致。

Kafka 保證了分區(qū)內(nèi)的消息有序。

Pulsar 支持兩種消費模式,獨占訂閱的流模式只保證了消息的順序性,共享訂閱隊列模型不保證有序性。

RocketMQ 需要用到鎖來保證一個隊列同時只有一個消費者線程進行消費,保證消息的有序性。

RabbitMQ 順序性的條件比較苛刻,需要單線程發(fā)送、單線程消費,并且不采用延遲隊列、優(yōu)先級隊列等高級功能。

NSQ 是利用了 golang 自身的 case/select 實現(xiàn)的消息分發(fā),本身不提供有序性保障,不能夠把特性消息和消費者對應起來,無法實現(xiàn)消息的有序性。

11. 消息查詢

在實際開發(fā)中,經(jīng)常要查看 MQ 中消息的內(nèi)容,比如通過某個 MessageKey/ID,查詢到 MQ 的具體消息。或者是對消息進行鏈路追蹤,知道消息從哪里來,發(fā)送到哪里去,進而快速對問題進行排查定位。

Kafka 存儲層是以分布式提交日志的形式實現(xiàn),每次寫操作都順序追加到日志的末尾。讀也是順序讀。不支持檢索功能。

Pulsar 可以通過消息 ID,查詢到具體某條消息的消息內(nèi)容、消息參數(shù)和消息軌跡。

RocketMQ 支持按 Message Key、Unique Key、Message Id 對消息進行查詢。

RabbitMQ 使用基于索引的存儲系統(tǒng)。這些將數(shù)據(jù)保存在樹結構中,以提供確認單個消息所需的快速訪問。由于 RabbitMQ 的消息在確認后會被刪除,因此只能查詢未確認的消息。

NSQ 自身不支持消息持久化和消息檢索,不過可以使用 nsq_to_http 等工具將消息寫入可支持索引的存儲里。

12. 消費模式

Kafka 有兩種消費模式,最終都會保證一個分區(qū)只有 1 個消費者在消費:

  • subscribe 方式:當主題分區(qū)數(shù)量變化或者 consumer 數(shù)量變化時,會進行 rebalance;注冊 rebalance 監(jiān)聽器,可以手動管理 offset 不注冊監(jiān)聽器,kafka 自動管理。
  • assign 方式:手動將 consumer 與 partition 進行對應,kafka 不會進行 rebanlance。

Pulsar 有以下四種消費模式,其中獨占模式和災備模式跟 Kafka 類似,為流模型,每個分區(qū)只有 1 個消費者消費,能保證消息有序性。共享模式和 Key 共享模式為隊列模型,多個消費者能提高消費速度,但不能保證有序性。

  • Exclusive 獨占模式(默認模式):一個 Subscription 只能與一個 Consumer 關聯(lián),只有這個 Consumer 可以接收到 Topic 的全部消息,如果該 Consumer 出現(xiàn)故障了就會停止消費。
  • 災備模式(Failover):當存在多個 consumer 時,將會按字典順序排序,第一個 consumer 被初始化為唯一接受消息的消費者。當?shù)谝粋€ consumer 斷開時,所有的消息(未被確認和后續(xù)進入的)將會被分發(fā)給隊列中的下一個 consumer。
  • 共享模式(Shared):消息通過 round robin 輪詢機制(也可以自定義)分發(fā)給不同的消費者,并且每個消息僅會被分發(fā)給一個消費者。當消費者斷開連接,所有被發(fā)送給他,但沒有被確認的消息將被重新安排,分發(fā)給其它存活的消費者。
  • KEY 共享模式(Key_Shared):當存在多個 consumer 時,將根據(jù)消息的 key 進行分發(fā),key 相同的消息只會被分發(fā)到同一個消費者。

RocketMQ 有兩種消費模式,BROADCASTING 廣播模式,CLUSTERING 集群模式。

  • 廣播消費指的是:一條消息被多個 consumer 消費,即使這些 consumer 屬于同一個 ConsumerGroup,消息也會被 ConsumerGroup 中的每個 Consumer 都消費一次,廣播消費中 ConsumerGroup 概念可以認為在消息劃分方面無意義。
  • 集群消費模式:一個 ConsumerGroup 中的 Consumer 實例平均分攤消費消息。例如某個 Topic 有 9 條消息,其中一個 ConsumerGroup 有 3 個實例(可能是 3 個進程,或者 3 臺機器),那么每個實例只消費其中部分,消費完的消息不能被其他實例消費。

RabbitMQ 和 NSQ 的消費比較類似,都是跟 Pulsar 共享模式類似的,隊列的形式,增加一個消費者組里的消費者數(shù)量能提高消費速度。

13. 消息可靠性

消息丟失是使用消息中間件時所不得不面對的一個同點,其背后消息可靠性也是衡量消息中間件好壞的一個關鍵因素。尤其是在金融支付領域,消息可靠性尤為重要。比如當服務出現(xiàn)故障時,一些對于生產(chǎn)者來說已經(jīng)生產(chǎn)成功的消息,是否會在高可用切換時丟失。同步刷盤是增強一個組件可靠性的有效方式,消息中間件也不例外,Kafka 和 RabbitMQ 都可以支持同步刷盤,但絕大多數(shù)情景下,一個組件的可靠性不應該由同步刷盤這種極其損耗性能的操作來保障,而是采用多副本的機制來保證。

Kafka 可以通過配置 request.required.acks 參數(shù)設置可靠級別,表示一條消息有多少個副本確認接收成功后,才被任務發(fā)送成功。

  • request.required.acks=-1 (全量同步確認,強可靠性保證)
  • request.required.acks=1(leader 確認收到,默認)
  • request.required.acks=0 (不確認,但是吞吐量大)

Pulsar 有跟 Kafka 類似的概念,叫 Ack Quorum Size(Qa),Qa 是每次寫請求發(fā)送完畢后需要回復確認的 Bookie 的個數(shù),其數(shù)值越大則需要確認寫成功的時間越長,其值上限是副本數(shù) Qw。為了一致性,Qa 應該是:(Qw+1)/2 或者更,即為了確保數(shù)據(jù)安全性,Qa 下限是  (Qw+1)/2。

RocketMQ 與 Kafka 類似。

  • RabbitMQ 是主從架構,通過鏡像環(huán)形隊列實現(xiàn)多副本及強一致性語義的。多副本可以保證在 master 節(jié)點宕機異常之后可以提升 slave 作為新的 master 而繼續(xù)提供服務來保障可用性。
  • NSQ 會通過 go-diskqueue 組件將消息落盤到本地文件中,通過 mem-queue-size 參數(shù)控制內(nèi)存中隊列大小,如果 mem-queue-size=0 每條消息都會存儲到磁盤里,不用擔心節(jié)點重啟引起的消息丟失。但由于是存儲在本地磁盤中,如果節(jié)點離線,堆積在節(jié)點磁盤里的消息會丟失。

14. 負載均衡

Kafka:支持負載均衡。一個 broker 通常就是一臺服務器節(jié)點。對于同一個 Topic 的不同分區(qū),Kafka 會盡力將這些分區(qū)分布到不同的 Broker 服務器上,zookeeper 保存了 broker、主題和分區(qū)的元數(shù)據(jù)信息。分區(qū)首領會處理來自客戶端的生產(chǎn)請求,kafka 分區(qū)首領會被分配到不同的 broker 服務器上,讓不同的 broker 服務器共同分擔任務。

每一個 broker 都緩存了元數(shù)據(jù)信息,客戶端可以從任意一個 broker 獲取元數(shù)據(jù)信息并緩存起來,根據(jù)元數(shù)據(jù)信息知道要往哪里發(fā)送請求。

kafka 的消費者組訂閱同一個 topic,會盡可能地使得每一個消費者分配到相同數(shù)量的分區(qū),分攤負載。

當消費者加入或者退出消費者組的時候,還會觸發(fā)再均衡,為每一個消費者重新分配分區(qū),分攤負載。

kafka 的負載均衡大部分是自動完成的,分區(qū)的創(chuàng)建也是 kafka 完成的,隱藏了很多細節(jié),避免了繁瑣的配置和人為疏忽造成的負載問題。

發(fā)送端由 topic 和 key 來決定消息發(fā)往哪個分區(qū),如果 key 為 null,那么會使用輪詢算法將消息均衡地發(fā)送到同一個 topic 的不同分區(qū)中。如果 key 不為 null,那么會根據(jù) key 的 hashcode 取模計算出要發(fā)往的分區(qū)。

rabbitmq:對負載均衡的支持不好。消息被投遞到哪個隊列是由交換器和 key 決定的,交換器、路由鍵、隊列都需要手動創(chuàng)建。

rabbitmq 客戶端發(fā)送消息要和 broker 建立連接,需要事先知道 broker 上有哪些交換器,有哪些隊列。通常要聲明要發(fā)送的目標隊列,如果沒有目標隊列,會在 broker 上創(chuàng)建一個隊列,如果有,就什么都不處理,接著往這個隊列發(fā)送消息。假設大部分繁重任務的隊列都創(chuàng)建在同一個 broker 上,那么這個 broker 的負載就會過大。(可以在上線前預先創(chuàng)建隊列,無需聲明要發(fā)送的隊列,但是發(fā)送時不會嘗試創(chuàng)建隊列,可能出現(xiàn)找不到隊列的問題,rabbitmq 的備份交換器會把找不到隊列的消息保存到一個專門的隊列中,以便以后查詢使用)

使用鏡像隊列機制建立 rabbitmq 集群可以解決這個問題,形成 master-slave 的架構,master 節(jié)點會均勻分布在不同的服務器上,讓每一臺服務器分攤負載。slave 節(jié)點只是負責轉(zhuǎn)發(fā),在 master 失效時會選擇加入時間最長的 slave 成為 master。

當新節(jié)點加入鏡像隊列的時候,隊列中的消息不會同步到新的 slave 中,除非調(diào)用同步命令,但是調(diào)用命令后,隊列會阻塞,不能在生產(chǎn)環(huán)境中調(diào)用同步命令。

當 rabbitmq 隊列擁有多個消費者的時候,隊列收到的消息將以輪詢的分發(fā)方式發(fā)送給消費者。每條消息只會發(fā)送給訂閱列表里的一個消費者,不會重復。

這種方式非常適合擴展,而且是專門為并發(fā)程序設計的。

如果某些消費者的任務比較繁重,那么可以設置 basicQos 限制信道上消費者能保持的最大未確認消息的數(shù)量,在達到上限時,rabbitmq 不再向這個消費者發(fā)送任何消息。

對于 rabbitmq 而言,客戶端與集群建立的 TCP 連接不是與集群中所有的節(jié)點建立連接,而是挑選其中一個節(jié)點建立連接。

但是 rabbitmq 集群可以借助 HAProxy、LVS 技術,或者在客戶端使用算法實現(xiàn)負載均衡,引入負載均衡之后,各個客戶端的連接可以分攤到集群的各個節(jié)點之中。

客戶端均衡算法:

  • 輪詢法。按順序返回下一個服務器的連接地址。
  • 加權輪詢法。給配置高、負載低的機器配置更高的權重,讓其處理更多的請求;而配置低、負載高的機器,給其分配較低的權重,降低其系統(tǒng)負載。
  • 隨機法。隨機選取一個服務器的連接地址。
  • 加權隨機法。按照概率隨機選取連接地址。
  • 源地址哈希法。通過哈希函數(shù)計算得到的一個數(shù)值,用該數(shù)值對服務器列表的大小進行取模運算。
  • 最小連接數(shù)法。動態(tài)選擇當前連接數(shù)最少的一臺服務器的連接地址。

zeromq:去中心化,不支持負載均衡。本身只是一個多線程網(wǎng)絡庫。

rocketmq:支持負載均衡。一個 broker 通常是一個服務器節(jié)點,broker 分為 master 和 slave,master 和 slave 存儲的數(shù)據(jù)一樣,slave 從 master 同步數(shù)據(jù)。

nameserver 與每個集群成員保持心跳,保存著 Topic-Broker 路由信息,同一個 topic 的隊列會分布在不同的服務器上。

發(fā)送消息通過輪詢隊列的方式發(fā)送,每個隊列接收平均的消息量。發(fā)送消息指定 topic、tags、keys,無法指定投遞到哪個隊列(沒有意義,集群消費和廣播消費跟消息存放在哪個隊列沒有關系)。

tags 選填,類似于 Gmail 為每封郵件設置的標簽,方便服務器過濾使用。目前只支 持每個消息設置一個 tag,所以也可以類比為 Notify 的 MessageType 概念。

keys 選填,代表這條消息的業(yè)務關鍵詞,服務器會根據(jù) keys 創(chuàng)建哈希索引,設置后, 可以在 Console 系統(tǒng)根據(jù) Topic、Keys 來查詢消息,由于是哈希索引,請盡可能 保證 key 唯一,例如訂單號,商品 Id 等。

rocketmq 的負載均衡策略規(guī)定:Consumer 數(shù)量應該小于等于 Queue 數(shù)量,如果 Consumer 超過 Queue 數(shù)量,那么多余的 Consumer 將不能消費消息。這一點和 kafka 是一致的,rocketmq 會盡可能地為每一個 Consumer 分配相同數(shù)量的隊列,分攤負載。

activemq:支持負載均衡??梢曰?zookeeper 實現(xiàn)負載均衡。

15. 集群方式

Kafka:天然的‘Leader-Slave’無狀態(tài)集群,每臺服務器既是 Master 也是 Slave。分區(qū)首領均勻地分布在不同的 kafka 服務器上,分區(qū)副本也均勻地分布在不同的 kafka 服務器上,所以每一臺 kafka 服務器既含有分區(qū)首領,同時又含有分區(qū)副本,每一臺 kafka 服務器是某一臺 kafka 服務器的 Slave,同時也是某一臺 kafka 服務器的 leader。

kafka 的集群依賴于 zookeeper,zookeeper 支持熱擴展,所有的 broker、消費者、分區(qū)都可以動態(tài)加入移除,而無需關閉服務,與不依靠 zookeeper 集群的 mq 相比,這是最大的優(yōu)勢。

rabbitmq:支持簡單集群,'復制'模式,對高級集群模式支持不好。

rabbitmq 的每一個節(jié)點,不管是單一節(jié)點系統(tǒng)或者是集群中的一部分,要么是內(nèi)存節(jié)點,要么是磁盤節(jié)點,集群中至少要有一個是磁盤節(jié)點。

在 rabbitmq 集群中創(chuàng)建隊列,集群只會在單個節(jié)點創(chuàng)建隊列進程和完整的隊列信息(元數(shù)據(jù)、狀態(tài)、內(nèi)容),而不是在所有節(jié)點上創(chuàng)建。

引入鏡像隊列,可以避免單點故障,確保服務的可用性,但是需要人為地為某些重要的隊列配置鏡像。

  • zeromq:去中心化,不支持集群。
  • rocketmq:常用 多對'Master-Slave' 模式,開源版本需手動切換 Slave 變成 Master

Name Server 是一個幾乎無狀態(tài)節(jié)點,可集群部署,節(jié)點之間無任何信息同步。

Broker 部署相對復雜,Broker 分為 Master 與 Slave,一個 Master 可以對應多個 Slave,但是一個 Slave 只能對應一個 Master,Master 與 Slave 的對應關系通過指定相同的 BrokerName,不同的 BrokerId 來定義,BrokerId 為 0 表示 Master,非 0 表示 Slave。Master 也可以部署多個。每個 Broker 與 Name Server 集群中的所有節(jié)點建立長連接,定時注冊 Topic 信息到所有 Name Server。

Producer 與 Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務的 Master 建立長連接,且定時向 Master 發(fā)送心跳。Producer 完全無狀態(tài),可集群部署。

Consumer 與 Name Server 集群中的其中一個節(jié)點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務的 Master、Slave 建立長連接,且定時向 Master、Slave 發(fā)送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規(guī)則由 Broker 配置決定。

客戶端先找到 NameServer, 然后通過 NameServer 再找到 Broker。

一個 topic 有多個隊列,這些隊列會均勻地分布在不同的 broker 服務器上。rocketmq 隊列的概念和 kafka 的分區(qū)概念是基本一致的,kafka 同一個 topic 的分區(qū)盡可能地分布在不同的 broker 上,分區(qū)副本也會分布在不同的 broker 上。

rocketmq 集群的 slave 會從 master 拉取數(shù)據(jù)備份,master 分布在不同的 broker 上。

activemq:支持簡單集群模式,比如'主-備',對高級集群模式支持不好。

五、性能

Kafka 的公司 Confluent 在 2020 年 8 月發(fā)了一篇 Benchmarking Apache Kafka, Apache Pulsar, and RabbitMQ: Which is the Fastest?文章,并且提出了一個開源的 MQ Benchmark 框架 THE OPENMESSAGING BENCHMARK FRAMEWORK,在這個文檔里,對比了 Kafka、Pulsar、RabbitMQ 的吞吐量、端到端延遲等性能數(shù)據(jù)。最后得出結論 Kafka 相對來說性能最好。

但接下來 StreamNative 在 2020 年 12 月指出了 Confluence 的基準測試的一些問題,并對 Pulsar 進行了參數(shù)調(diào)優(yōu)之后重新執(zhí)行了一遍結果,測試報告展示 Pulsar 能達到跟 Kafka 同樣的吞吐量,在某些場景下,Pulsar 的延遲顯著低于 Kafka。

而且在性能測試上,有很多客戶端、服務端參數(shù)設置、機器性能配置等影響,比如消息可靠性級別,壓縮算法等,很難做到“完全”控制變量公平的測試。而且 OpenMessaging Benchmark 的開源 Github 的 Readme 上也提到了。

不過有幾個關注點:

  • RabbitMQ 的延遲是微秒級的,其他組件的延遲都是毫秒級,RabbitMQ 應該是 MQ 組件里相對來說較低的。
  • Kafka 單實例在主題/分區(qū)數(shù)比較多的情況下,性能會明顯降低。
  • kafka 是一個分區(qū)一個文件,當 topic 過多,分區(qū)的總量也會增加,kafka 中存在過多的文件,當對消息刷盤時,就會出現(xiàn)文件競爭磁盤,出現(xiàn)性能的下降。
  • 還有 Kafka 每個消費者加入或退出都會進行重平衡,當分區(qū)數(shù)比較多時重平衡可能耗時較久,在重平衡的階段消費者是不能消費消息的。
  • 而 Pulsar 由于存儲與計算分離的架構,使得它可以支持百萬級別的 Topic 數(shù)量。

Pulsar 和 Kafka 都被廣泛用于各個企業(yè),也各有優(yōu)勢,都能通過數(shù)量基本相同的硬件處理大流量。部分用戶誤以為 Pulsar 使用了很多組件,因此需要很多服務器來實現(xiàn)與 Kafka 相匹敵的性能。這種想法適用于一些特定硬件配置,但在多數(shù)資源配置相同的情況中,Pulsar 的優(yōu)勢更加明顯,可以用相同的資源實現(xiàn)更好的性能。舉例來說,Splunk 最近分享了他們選擇 Pulsar 放棄 Kafka 的原因,其中提到“由于分層架構,Pulsar 幫助他們將成本降低了 30%-50%,延遲降低了 80%-98%,運營成本降低了 33%-50%”。Splunk 團隊發(fā)現(xiàn) Pulsar 可以更好地利用磁盤 IO,降低 CPU 利用率,同時更好地控制內(nèi)存。

在分布式系統(tǒng)里,單機性能指標雖然也很重要,分布式系統(tǒng)整體的性能以及靈活擴縮容、高可用容災等能力也會是評估的一個重要參考。MQ 中間件具體的性能指標,也需要我們自己根據(jù)實際的情況,根據(jù)實際購買的集群配置和客戶端參數(shù),進行壓測調(diào)優(yōu)來評估。

六、運維

在使用過程中難免會出現(xiàn)各種異常情況,比如宕機、網(wǎng)絡抖動、擴容等。消息隊列具備異地容災,高可用架構等能力,能避免一些計算節(jié)點、網(wǎng)絡等基礎設施不可用導致的故障。

1. 高可用

Kafka 通過分區(qū)多副本的方式解決高可用問題。

Pulsar 的計算集群 Broker 是無狀態(tài)的,可以靈活擴縮容,存儲節(jié)點 Bookie 上通過消息分區(qū)分片副本的方式,每個分片都有一個或多個副本,保證在某一個 Bookie 掛掉后,有其他分片可以提供服務。

RocketMQ 和 RabbitMQ 都是主從架構,當 master 掛掉后,由原來的從節(jié)點繼續(xù)提供服務。備機提供消費服務,保證消息不丟,但不提供寫服務。

NSQ 是類似分布式架構,不過由于消息存儲是在節(jié)點本地磁盤上,如果一個節(jié)點離線,堆積在節(jié)點磁盤上的消息會丟失。

2. 跨地域容災

Pulsar 原生支持跨地域容災功能,在這個圖中,每當 P1、P2 和 P3 的生產(chǎn)者分別向 Cluster-A、Cluster-B 和 Cluster-C 中的 T1 topic 發(fā)送消息時,這些消息很快在不同的集群中復制。一旦消息完成復制,消費者 C1 和 C2 會從各自的集群消費到這個消息。

在這個跨地域容災的設計支撐下,其一,我們可以比較容易的將服務分散到多個機房;其二,可以應對機房級別的故障,即在一個機房不可用的情況下,服務可以轉(zhuǎn)接到其它的機房來繼續(xù)對外提供服務。

一句話概括,Pulsar 的跨地域復制,其實就是在一個本地集群中創(chuàng)建一個 Producer,把異地的集群作為這個 Producer 的發(fā)送地址,將本地集群的消息發(fā)送過去,并且在本地維護一個 Cusor 來保證消息可靠性和冪等性。

3. 集群擴容

當消息量突然上漲,消息隊列集群到達瓶頸的時候,需要對集群進行擴容,擴容一般分為水平擴容和垂直擴容兩種方式,水平擴容指的是往往集群中增加節(jié)點,垂直擴容指的是把集群中部分節(jié)點的配置調(diào)高,增加處理能力。

Kafka 集群由于主題分區(qū)是物理存儲在 Broker 節(jié)點上的,新加入的集群的節(jié)點并沒有存儲分區(qū)分片,也就無法提供馬上提供服務,因此需要把一些 Topic 的分區(qū)分配到新加入的節(jié)點里,這里會涉及到一個分區(qū)數(shù)據(jù)均衡的過程,將某些分區(qū)的數(shù)據(jù)復制到新節(jié)點上。這個過程跟分區(qū)當前堆積的數(shù)據(jù)量、Broker 性能有關,有可能會出現(xiàn)由于源 Broker 負載過高,堆積數(shù)據(jù)過大,導致數(shù)據(jù)均衡的時間變長。

Pulsar 的無限分布式日志以分片為中心,借助擴展日志存儲(通過 Apache BookKeeper)實現(xiàn),內(nèi)置分層存儲支持,因此分片可以均勻地分布在存儲節(jié)點上。由于與任一給定 topic 相關的數(shù)據(jù)都不會與特定存儲節(jié)點進行捆綁,因此很容易替換存儲節(jié)點或縮擴容。另外,集群中最小或最慢的節(jié)點也不會成為存儲或帶寬的短板。

RocketMQ 新節(jié)點直接加入到集群中,在新的 broker 創(chuàng)建新 topic 并且分配隊列,或者在已有 topic 基礎上分配隊列。與 Kafka 的區(qū)別是,Kafka 的分區(qū)是在不同的物理機器上,而 Rocketmq 是邏輯分區(qū),用的隊列形式,因此不存在出現(xiàn)數(shù)據(jù)不均衡的情況。

RabbitMQ 和 NSQ 類似,由于不涉及過多的消息持久化,直接往集群中增加節(jié)點。

4. 使用成本

Kafka/Pulsar/RocketMQ/RabbitMQ 在騰訊云上都上線了標準產(chǎn)品,可以直接購買創(chuàng)建實例(產(chǎn)品選型),能大大降低部署運維成本。而 NSQ 目前暫時還沒有上線,需要自行部署。

七、常見問題 & 使用場景

1. Kafka

日志收集:大量的日志消息先寫入 kafka,數(shù)據(jù)服務通過消費 kafka 消息將數(shù)據(jù)落地;

2. RocketMQ

為金融互聯(lián)網(wǎng)領域而生,對于可靠性要求很高的場景。

3. 普通消息

消息隊列最基礎的功能就是生產(chǎn)者發(fā)送消息、Broker 保存消息,消費者來消費消息,以此實現(xiàn)系統(tǒng)解耦、削峰填谷的作用。

普通消息是消息隊列必備的消息類型,也是系統(tǒng)使用場景最多的一種消息。

4. 順序消息

順序消息是指生產(chǎn)者發(fā)送消息的順序和消費者消費消息的順序是一致的。比如在一個電商場景,同一個用戶提交訂單、訂單支付、訂單出庫,這三個消息消費者需要按照順序來進行消費。如下圖:

順序消息的實現(xiàn)并不容易,原因如下:

  • 生產(chǎn)者集群中,有多個生產(chǎn)者發(fā)送消息,網(wǎng)絡延遲不一樣,很難保證發(fā)送到 Broker 的消息落盤順序是一致的;
  • 如果 Broker 有多個分區(qū)或隊列,生產(chǎn)者發(fā)送的消息會進入多個分區(qū),也無法保證順序消費;
  • 如果有多個消費者來異步消費同一個分區(qū),很難保證消費順序跟生產(chǎn)者發(fā)送順序一致。

要保證消息有序,需要滿足兩個條件:

  • 同一個生產(chǎn)者必須同步發(fā)送消息到同一個分區(qū);
  • 一個分區(qū)只能給同一個消費者消費。

如下圖:

上面第二個條件是比較容易實現(xiàn)的,一個分區(qū)綁定一個消費者就可以,主要是第一個條件。

在主流消息隊列的實現(xiàn)中,Kafka 和 Pulsar 的實現(xiàn)方式類似,生產(chǎn)者給消息賦值一個 key,對 key 做 Hash 運算來指定消息發(fā)送到哪一個分區(qū)。比如上面電商的例子,對同一個用戶的一筆訂單,提交訂單、訂單支付、訂單出庫這三個消息賦值同一個 key,就可以把這三條消息發(fā)送到同一個分區(qū)。

對于 RocketMQ,生產(chǎn)者在發(fā)送消息的時候,可以通過 MessageQueueSelector 指定把消息投遞到那個 MessageQueue,如下圖:

5. 延時消息

或者也叫定時消息,是指消息發(fā)送后不會立即被消費,而是指定一個時間,到時間后再消費。經(jīng)典的場景比如電商購物時,30 分鐘未支付訂單,讓訂單自動失效。

(1) RocketMQ 實現(xiàn)

RocketMQ 定義了 18 個延時級別,每個延時級別對應一個延時時間。下面如果延遲級別是 3,則消息會延遲 10s 才會拉取。

RocketMQ 的延時消息如下圖:

生產(chǎn)者把消費發(fā)送到 Broker 后,Broker 首先把消息保存到 SCHEDULE_TOPIC_XXXX 這個 Topic,然后調(diào)度任務會判斷是否到期,如果到期,會把消息從 SCHEDULE_TOPIC_XXXX 取出投遞到原始的 queue,這樣消費者就可以消費到了。

RocketMQ 的延時消息只支持最大兩個小時的延時,不過 RocketMQ5.0 基于時間輪算法實現(xiàn)了定時消息,解決了這個問題。

(2) Pulsar 實現(xiàn)

Pulsar 的實現(xiàn)如下圖:

Pulsar 的延時消息首先會寫入一個 Delayed Message Tracker 的數(shù)據(jù)結構中,Delayed Message Tracker 根據(jù)延時時間構建 delayed index 優(yōu)先級隊列。消費者拉取消息時,首先去 Delayed Message Tracker 檢查是否有到期的消息。如果有則直接拉取進行消費。

(3) RabbitMQ 實現(xiàn)

RabbitMQ 的實現(xiàn)方式有兩種,一種是投遞到普通隊列都不消費,等消息過期后被投遞到死信隊列,消費者消費死信隊列。如下圖:

第二種方式是生產(chǎn)者發(fā)送消息時,先發(fā)送到本地 Mnesia 數(shù)據(jù)庫,消息到期后定時器再將消息投遞到 broker。

(4) Kafka 實現(xiàn)

Kafka 本身并沒有延時隊列,不過可以通過生產(chǎn)者攔截器來實現(xiàn)消息延時發(fā)送,也可以定義延時 Topic,利用類似 RocketMQ 的方案來實現(xiàn)延時消息。

6. 事務消息

事務消息是指生產(chǎn)消息和消費消息滿足事務的特性。

RabbitMQ 和 Kafka 的事務消息都是只支持生產(chǎn)消息的事務特性,即一批消息要不全部發(fā)送成功,要不全部發(fā)送失敗。

RabbitMQ 通過 Channel 來開啟事務消息,代碼如下:

ConnectionFactory factory=new ConnectionFactory();
connection=factory.newConnection();
Channel channel=connection.createChannel();
//開啟事務
channel.txSelect();
channel.basicPublish("directTransactionExchange","transactionRoutingKey",null,message.getBytes("utf-8"));
//提交事務 或者 channel.txRollback()回滾事務
channel.txCommit();

Kafka 可以給多個生產(chǎn)者設置同一個事務 ID ,從而把多個 Topic 、多個 Partition 放在一個事務中,實現(xiàn)原子性寫入。

Pulsar 的事務消息對于事務語義的定義是:允許事件流應用將消費、處理、生產(chǎn)消息整個過程定義為一個原子操作。可見,Pulsar 的事務消息可以覆蓋消息流整個過程。

RocketMQ 的事務消息是通過 half 消息來實現(xiàn)的。以電商購物場景來看,賬戶服務扣減賬戶金額后,發(fā)送消息給 Broker,庫存服務來消費這條消息進行扣減庫存。如下圖:

可見,RocketMQ 只能保證生產(chǎn)者發(fā)送消息和本地事務的原子性,并不能保證消費消息的原子性。

7. 軌跡消息

軌跡消息主要用于跟蹤消息的生命周期,當消息丟失時可以很方便地找出原因。

軌跡消息也跟普通消息一樣,也需要存儲和查詢,也會占用消息隊列的資源,所以選擇軌跡消息要考慮下面幾點:

  • 消息生命周期的關鍵節(jié)點一定要記錄;
  • 不能影響正常消息的發(fā)送和消費性能;
  • 不能影響 Broker 的消息存儲性能;
  • 要考慮消息查詢維度和性能。

RabbitMQ Broker 實現(xiàn)了軌跡消息的功能,打開 Trace 開關,就可以把軌跡消息發(fā)送到 amq.rabbitmq.trace 這個 exchange,但是要考慮軌跡消息會不會給 Broker 造成 壓力進而導致消息積壓。RabbitMQ 的生產(chǎn)者和消費者都沒有實現(xiàn)軌跡消息,需要開發(fā)者自己來實現(xiàn)。

RocketMQ 生產(chǎn)者、Broker 和消費者都實現(xiàn)了軌跡消息,不過默認是關閉的,需要手工開啟。

使用軌跡消息,需要考慮記錄哪些節(jié)點、存儲介質(zhì)、性能、查詢方式等問題。

8. Kafka 是否會消息丟失?

(1)只對“已提交”的消息做有限度的持久化保證

  • 已提交的消息:消息寫入日志文件
  • 有限度的持久化保證:N 個 broker 至少一個存活

(2)生產(chǎn)者丟失數(shù)據(jù)

  • producer.send(msg) 異步發(fā)送消息,不保證數(shù)據(jù)到達 Kafka
  • producer.send(msg, callback) 判斷回調(diào)

(3) 消費者程序丟失數(shù)據(jù)

  • 應該「先消費消息,后更新位移的順序」
  • 新問題:消息的重復處理
  • 多線程異步處理消息,Consumer 不要開啟自動提交位移,應用程序手動提交位移

9. Kafka 如何持久化?

(1)消息日志(Log)保存數(shù)據(jù),磁盤追加寫(Append-only)

  • 避免緩慢的隨機 I/O 操作
  • 高吞吐

(2)定期刪除消息(日志段)

10. Kafka 文件存儲機制

(1)每個 partition 相當于一個巨型文件 → 多個大小相等 segment 數(shù)據(jù)文件中

(2)每個 partition 只需要順序讀寫就行了,segment 文件生命周期由配置決定

(3)segment file 組成:

  • index file:索引文件
  • data file:數(shù)據(jù)文件

(4)segment file 文件命名規(guī)則:

  • 全局第一個 segment 是 0
  • 后序每個加上全局 partition 的最大 offset

一對 segment file

message 物理結構

11. Kafka 分區(qū)

為什么分區(qū)?

  • Kafka 的消息組織方式:主題-分區(qū)-消息
  • 一條消息,僅存在某一個分區(qū)中
  • 提高伸縮性,不同分區(qū)可以放到不同機器,讀寫操作也是以分區(qū)粒度

分區(qū)策略?

  • 輪詢
  • 隨機
  • 按 key 保序,單分區(qū)有序

12. MQ 消息堆積問題處理

消息堆積可能的原因: 隊列中消息不能被及時的消費,導致大量堆積在隊列里面 rocketMq Kafka RabbitMq 都會有這樣的問題 產(chǎn)生消息堆積的可以從 mq 的生產(chǎn)消費模型去考慮,從生產(chǎn)者到消息中間件、再到消費者,都會發(fā)生堆積。

  • 消費者:消費者處理速度過慢,或者消費者故障、延遲,無法即使的處理消息,導致消息堆積 
  • 生產(chǎn)者:生產(chǎn)者產(chǎn)生速度過快,消費者無法即使處理 
  • MQ 消息隊列:Mq 服務器的性能不足,比如它所在的機器,cpu、內(nèi)存、磁盤等超載,無法即使的處理消息,導致消息堆積 
  • 其他:其他方面也會有這樣的問題, 比如網(wǎng)絡故障,連接問題,消息在傳遞過程中過慢,從而導致消息堆積 業(yè)務方面,消息消費失敗重試,不斷的重試,沒有設置重試次數(shù),導致消息堆積。

處理消息堆積問題: 

(1)消費者:

  • 增加消費者的數(shù)量,提高消費的處理速度;(注意這個不通用,只適合 RabbitMq) 需要注意不能一味的水平擴展消費者 因為其他關鍵鏈路性能是否抗的住大量的水平擴展,比如 mysq、redis,詳細見下方 rabbitmq 消息堆積解決方案
  • 或者提高消費者的處理能力,比如通過并發(fā)處理、異步處理提高消費者吞吐量。這個則要注意通過線程池、隊列,把 mq 拉到程序的隊列中,要承擔對應的宕機導致消息丟失風險。

(2)MQ 消息隊列: 增加 MQ 的服務器資源,cpu、內(nèi)存、磁盤,提高 mq 處理能力 也可以通過分區(qū)隊列將消息分散到多個隊列中,提高整體的處理能力。(這個則是 Kafka、Rocket 采用的)

控制隊列容量,避免堆積過多,設置持久化策略。rabbitMQ 的懶加載隊列,兼顧了持久化和堆積上限

(3)監(jiān)控告警(重要) 設置監(jiān)控系統(tǒng),比如普羅米修斯,監(jiān)控消息數(shù)量,消費者處理速度,隊列狀態(tài)等等,在堆積發(fā)生前,即使的告警,及時采取措施。

But 上面的策略是通用的一些解決方案,不同的 MQ,生產(chǎn)消費模型是不一樣的,導致需要針對不同 mq 的消息堆積解決方案不一樣。

RabbitMq、Kafka、RocketMq 發(fā)生消息堆積,分別該如何去解決?

這里先點一下,增加消費者數(shù)量,并不是通用的,只適合 RabbitMq。

總結

Kafka 與 Pulsar 都是騰訊云主打的消息隊列中間件,都具有高性能,高可靠,支持多種場景。Kafka 推出的時間較早,各種場景比如日志、大數(shù)據(jù)處理等都有較成熟的解決方案。而 Pulsar 作為一個新秀,支持的功能比 CKafka 更豐富,而且跨地域容災,多租戶等功能,解決了很多 Kafka 設計缺陷和運維成本問題,整體穩(wěn)定性更強。很多國內(nèi)外大公司也有很多 Pulsar 的實踐案例。因此,一些傳統(tǒng)的日志、大數(shù)據(jù)處理等場景,對高吞吐量有要求的,對消息可靠性的要求沒那么高的,可以選用 Kafka,有很多優(yōu)秀的文檔說明怎么參數(shù)調(diào)優(yōu)提高性能。而一些對消息可靠性、容災要求更好,或者有高分區(qū)、延遲隊列等需求的場景,可以選用 Pulsar。

我們后臺的技術棧是基于 Golang 的,在上文的對比中,還挑了一個基于 Golang 開發(fā)的消息隊列 NSQ,如果有一些定制化需求或者需要二次開發(fā)的,可以選用 NSQ。也可以通過閱讀 NSQ 的源碼,學習一些優(yōu)秀高性能消息隊列中間件的實現(xiàn)方式,比如里邊 diskqueue 組件,一個基于磁盤的消息隊列,在某些場景下可能也可以進行二次利用。

責任編輯:趙寧寧 來源: 騰訊技術工程
相關推薦

2021-04-08 07:37:39

隊列數(shù)據(jù)結構算法

2023-02-10 09:04:27

2020-02-18 16:20:03

Redis ANSI C語言日志型

2022-06-20 09:01:23

Git插件項目

2022-08-01 11:33:09

用戶分析標簽策略

2023-09-11 08:13:03

分布式跟蹤工具

2019-05-14 09:31:16

架構整潔軟件編程范式

2018-05-22 08:24:50

PythonPyMongoMongoDB

2023-10-17 08:15:28

API前后端分離

2020-07-03 08:21:57

Java集合框架

2017-03-11 22:19:09

深度學習

2022-04-07 10:39:21

反射Java安全

2023-11-18 09:30:42

模型AI

2022-07-06 12:07:06

Python函數(shù)式編程

2019-04-01 10:43:59

Linux問題故障

2022-05-19 08:28:19

索引數(shù)據(jù)庫

2020-10-21 14:12:02

Single Sign

2020-10-18 07:32:06

SD-WAN網(wǎng)絡傳統(tǒng)廣域網(wǎng)

2023-11-06 07:21:13

內(nèi)存結構Jvm

2019-04-10 10:43:15

Redis內(nèi)存淘汰策略
點贊
收藏

51CTO技術棧公眾號