自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

一文講清 Kafka 工作流程和存儲(chǔ)機(jī)制

存儲(chǔ) 存儲(chǔ)軟件 Kafka
Kafka 中消息是以 topic 進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向 topic 的。

 一、Kafka 文件存儲(chǔ)機(jī)制

 

topic構(gòu)成

Kafka 中消息是以 topic 進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向 topic 的。

在 Kafka 中,一個(gè) topic 可以分為多個(gè) partition,一個(gè) partition 分為多個(gè) segment,每個(gè) segment 對(duì)應(yīng)兩個(gè)文件:.index 和 .log 文件

 

topic 是邏輯上的概念,而 patition 是物理上的概念,每個(gè) patition 對(duì)應(yīng)一個(gè) log 文件,而 log 文件中存儲(chǔ)的就是 producer 生產(chǎn)的數(shù)據(jù),patition 生產(chǎn)的數(shù)據(jù)會(huì)被不斷的添加到 log 文件的末端,且每條數(shù)據(jù)都有自己的 offset。

消費(fèi)組中的每個(gè)消費(fèi)者,都是實(shí)時(shí)記錄自己消費(fèi)到哪個(gè) offset,以便出錯(cuò)恢復(fù),從上次的位置繼續(xù)消費(fèi)。

消息存儲(chǔ)原理

由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到 log 文件末尾,為防止 log 文件過(guò)大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka 采取了分片和索引機(jī)制,將每個(gè) partition 分為多個(gè) segment。每個(gè) segment 對(duì)應(yīng)兩個(gè)文件——.index文件和 .log文件。這些文件位于一個(gè)文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號(hào)。

如下,我們創(chuàng)建一個(gè)只有一個(gè)分區(qū)一個(gè)副本的 topic

  1. > bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic starfish 

 

然后可以在 kafka-logs 目錄(server.properties 默認(rèn)配置)下看到會(huì)有個(gè)名為 starfish-0 的文件夾。如果,starfish 這個(gè) topic 有三個(gè)分區(qū),則其對(duì)應(yīng)的文件夾為 starfish-0,starfish-1,starfish-2。

 

這些文件的含義如下:

類別 作用
.index 偏移量索引文件,存儲(chǔ)數(shù)據(jù)對(duì)應(yīng)的偏移量
.timestamp 時(shí)間戳索引文件
.log 日志文件,存儲(chǔ)生產(chǎn)者生產(chǎn)的數(shù)據(jù)
.snaphot 快照文件
leader-epoch-checkpoint 保存了每一任leader開(kāi)始寫(xiě)入消息時(shí)的offset,會(huì)定時(shí)更新。follower被選為leader時(shí)會(huì)根據(jù)這個(gè)確定哪些消息可用

index 和 log 文件以當(dāng)前 segment 的第一條消息的 offset 命名。偏移量 offset 是一個(gè) 64 位的長(zhǎng)整形數(shù),固定是20 位數(shù)字,長(zhǎng)度未達(dá)到,用 0 進(jìn)行填補(bǔ),索引文件和日志文件都由此作為文件名命名規(guī)則。所以從上圖可以看出,我們的偏移量是從 0 開(kāi)始的,.index 和 .log 文件名稱都為 00000000000000000000。

接著往 topic 中發(fā)送一些消息,并啟動(dòng)消費(fèi)者消費(fèi)

  1. > bin /kafka-console-producer.sh --bootstrap-server localhost:9092 --topic starfishone 
  2.  
  3. > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic starfish --from-beginningone 

 

查看 .log 文件下是否有數(shù)據(jù) one

 

內(nèi)容存在一些”亂碼“,因?yàn)閿?shù)據(jù)是經(jīng)過(guò)序列化壓縮的。

那么數(shù)據(jù)文件 .log 大小有限制嗎,能保存多久時(shí)間?這些我們都可以通過(guò) Kafka 目錄下 conf/server.properties 配置文件修改:

  1. # log文件存儲(chǔ)時(shí)間,單位為小時(shí),這里設(shè)置為1周 
  2. log.retention.hours=168 
  3.  
  4. # log文件大小的最大值,這里為1g,超過(guò)這個(gè)值,則會(huì)創(chuàng)建新的segment(也就是新的.index和.log文件) 
  5. log.segment.bytes=1073741824 

比如,當(dāng)生產(chǎn)者生產(chǎn)數(shù)據(jù)量較多,一個(gè) segment 存儲(chǔ)不下觸發(fā)分片時(shí),在日志 topic 目錄下你會(huì)看到類似如下所示的文件:

  1. 00000000000000000000.index 
  2. 00000000000000000000.log 
  3. 00000000000000170410.index 
  4. 00000000000000170410.log 
  5. 00000000000000239430.index 
  6. 00000000000000239430.log 

下圖展示了Kafka查找數(shù)據(jù)的過(guò)程:

 

.index文件 存儲(chǔ)大量的索引信息,.log文件 存儲(chǔ)大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中 message 的物理偏移地址。

比如現(xiàn)在要查找偏移量 offset 為 3 的消息,根據(jù) .index 文件命名我們可以知道,offset 為 3 的索引應(yīng)該從00000000000000000000.index 里查找。根據(jù)上圖所示,其對(duì)應(yīng)的索引地址為 756-911,所以 Kafka 將讀取00000000000000000000.log 756~911區(qū)間的數(shù)據(jù)。

二、Kafka 生產(chǎn)過(guò)程

Kafka 生產(chǎn)者用于生產(chǎn)消息。通過(guò)前面的內(nèi)容我們知道,Kafka 的 topic 可以有多個(gè)分區(qū),那么生產(chǎn)者如何將這些數(shù)據(jù)可靠地發(fā)送到這些分區(qū)?生產(chǎn)者發(fā)送數(shù)據(jù)的不同的分區(qū)的依據(jù)是什么?針對(duì)這兩個(gè)疑問(wèn),這節(jié)簡(jiǎn)單記錄下。

3.2.1 寫(xiě)入流程

producer 寫(xiě)入消息流程如下:

 

  1. producer 先從 zookeeper 的 "/brokers/.../state"節(jié)點(diǎn)找到該 partition 的 leader
  2. producer 將消息發(fā)送給該 leader
  3. leader 將消息寫(xiě)入本地 log
  4. followers 從 leader pull 消息,寫(xiě)入本地 log 后向 leader 發(fā)送 ACK
  5. leader 收到所有 ISR 中的 replication 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset)并向 producer 發(fā)送 ACK

2.1 寫(xiě)入方式

producer 采用推(push) 模式將消息發(fā)布到 broker,每條消息都被追加(append) 到分區(qū)(patition) 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫(xiě)內(nèi)存要高,保障 kafka 吞吐率)。

2.2 分區(qū)(Partition)

消息發(fā)送時(shí)都被發(fā)送到一個(gè) topic,其本質(zhì)就是一個(gè)目錄,而 topic 是由一些 Partition Logs(分區(qū)日志)組成

分區(qū)的原因:

方便在集群中擴(kuò)展,每個(gè) Partition 可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè) topic 又可以有多個(gè) Partition 組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;

可以提高并發(fā),因?yàn)榭梢砸?Partition 為單位讀寫(xiě)了。

分區(qū)的原則:

我們需要將 producer 發(fā)送的數(shù)據(jù)封裝成一個(gè) ProducerRecord 對(duì)象。

  1. public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) 
  2. public ProducerRecord (String topic, Integer partition, Long timestamp, K key, V value) 
  3. public ProducerRecord (String topic, Integer partition, K key, V value, Iterable<Header> headers) 
  4. public ProducerRecord (String topic, Integer partition, K key, V value) 
  5. public ProducerRecord (String topic, K key, V value) 
  6. public ProducerRecord (String topic, V value) 

 

  1. 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
  2. 沒(méi)有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值;
  3. 既沒(méi)有 partition 值又沒(méi)有 key 值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增),將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說(shuō)的 round-robin 算法。

2.3 副本(Replication)

同一個(gè) partition 可能會(huì)有多個(gè) replication( 對(duì)應(yīng) server.properties 配置中的 default.replication.factor=N)。沒(méi)有 replication 的情況下,一旦 broker 宕機(jī),其上所有 patition 的數(shù)據(jù)都不可被消費(fèi),同時(shí) producer 也不能再將數(shù)據(jù)存于其上的 patition。引入 replication 之后,同一個(gè) partition 可能會(huì)有多個(gè) replication,而這時(shí)需要在這些 replication 之間選出一 個(gè) leader, producer 和 consumer 只與這個(gè) leader 交互,其它 replication 作為 follower 從 leader 中復(fù)制數(shù)據(jù)。

2.4 數(shù)據(jù)可靠性保證

為保證 producer 發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的 topic,topic 的每個(gè) partition 收到 producer 數(shù)據(jù)后,都需要向 producer 發(fā)送 ack(acknowledgement確認(rèn)收到),如果 producer 收到 ack,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。

a) 副本數(shù)據(jù)同步策略主要有如下兩種

方案 優(yōu)點(diǎn) 缺點(diǎn)
半數(shù)以上完成同步,就發(fā)送ack 延遲低 選舉新的 leader 時(shí),容忍n臺(tái)節(jié)點(diǎn)的故障,需要2n+1個(gè)副本
全部完成同步,才發(fā)送ack 選舉新的 leader 時(shí),容忍n臺(tái)節(jié)點(diǎn)的故障,需要 n+1 個(gè)副本 延遲高

Kafka 選擇了第二種方案,原因如下:

  • 同樣為了容忍 n 臺(tái)節(jié)點(diǎn)的故障,第一種方案需要的副本數(shù)相對(duì)較多,而 Kafka 的每個(gè)分區(qū)都有大量的數(shù)據(jù),第一種方案會(huì)造成大量的數(shù)據(jù)冗余;
  • 雖然第二種方案的網(wǎng)絡(luò)延遲會(huì)比較高,但網(wǎng)絡(luò)延遲對(duì) Kafka 的影響較小。

b) ISR

采用第二種方案之后,設(shè)想一下情景:leader 收到數(shù)據(jù),所有 follower 都開(kāi)始同步數(shù)據(jù),但有一個(gè) follower 掛了,遲遲不能與 leader 保持同步,那 leader 就要一直等下去,直到它完成同步,才能發(fā)送 ack,這個(gè)問(wèn)題怎么解決呢?

leader 維護(hù)了一個(gè)動(dòng)態(tài)的 in-sync replica set(ISR),意為和 leader 保持同步的 follower 集合。當(dāng) ISR 中的follower 完成數(shù)據(jù)的同步之后,leader 就會(huì)給 follower 發(fā)送 ack。如果 follower 長(zhǎng)時(shí)間未向 leader 同步數(shù)據(jù),則該 follower 將會(huì)被踢出 ISR,該時(shí)間閾值由 replica.lag.time.max.ms 參數(shù)設(shè)定。leader 發(fā)生故障之后,就會(huì)從 ISR 中選舉新的 leader。(之前還有另一個(gè)參數(shù),0.9 版本之后 replica.lag.max.messages 參數(shù)被移除了)

c) ack應(yīng)答機(jī)制

對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒(méi)必要等 ISR 中的follower全部接收成功。

所以Kafka為用戶提供了三種可靠性級(jí)別,用戶根據(jù)對(duì)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的acks 參數(shù)配置

0:producer 不等待 broker 的 ack,這一操作提供了一個(gè)最低的延遲,broker 一接收到還沒(méi)有寫(xiě)入磁盤就已經(jīng)返回,當(dāng) broker 故障時(shí)有可能丟失數(shù)據(jù);

1:producer 等待 broker 的 ack,partition 的 leader 落盤成功后返回 ack,如果在 follower 同步成功之前 leader 故障,那么將會(huì)丟失數(shù)據(jù);

-1(all):producer 等待 broker 的 ack,partition 的 leader 和 follower 全部落盤成功后才返回 ack。但是 如果在 follower 同步完成后,broker 發(fā)送 ack 之前,leader 發(fā)生故障,那么就會(huì)造成數(shù)據(jù)重復(fù)。

d) 故障處理

由于我們并不能保證 Kafka 集群中每時(shí)每刻 follower 的長(zhǎng)度都和 leader 一致(即數(shù)據(jù)同步是有時(shí)延的),那么當(dāng)leader 掛掉選舉某個(gè) follower 為新的 leader 的時(shí)候(原先掛掉的 leader 恢復(fù)了成為了 follower),可能會(huì)出現(xiàn)leader 的數(shù)據(jù)比 follower 還少的情況。為了解決這種數(shù)據(jù)量不一致帶來(lái)的混亂情況,Kafka 提出了以下概念:

 

  • LEO(Log End Offset):指的是每個(gè)副本最后一個(gè)offset;
  • HW(High Wather):指的是消費(fèi)者能見(jiàn)到的最大的 offset,ISR 隊(duì)列中最小的 LEO。

消費(fèi)者和 leader 通信時(shí),只能消費(fèi) HW 之前的數(shù)據(jù),HW 之后的數(shù)據(jù)對(duì)消費(fèi)者不可見(jiàn)。

針對(duì)這個(gè)規(guī)則:

  • 當(dāng)follower發(fā)生故障時(shí):follower 發(fā)生故障后會(huì)被臨時(shí)踢出 ISR,待該 follower 恢復(fù)后,follower 會(huì)讀取本地磁盤記錄的上次的 HW,并將 log 文件高于 HW 的部分截取掉,從 HW 開(kāi)始向 leader 進(jìn)行同步。等該 follower 的 LEO 大于等于該 Partition 的 HW,即 follower 追上 leader 之后,就可以重新加入 ISR 了。
  • 當(dāng)leader發(fā)生故障時(shí):leader 發(fā)生故障之后,會(huì)從 ISR 中選出一個(gè)新的 leader,之后,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的 follower 會(huì)先將各自的 log 文件高于 HW 的部分截掉,然后從新的 leader 同步數(shù)據(jù)。

所以數(shù)據(jù)一致性并不能保證數(shù)據(jù)不丟失或者不重復(fù),這是由 ack 控制的。HW 規(guī)則只能保證副本之間的數(shù)據(jù)一致性!

2.5 Exactly Once語(yǔ)義

將服務(wù)器的 ACK 級(jí)別設(shè)置為 -1,可以保證 Producer 到 Server 之間不會(huì)丟失數(shù)據(jù),即 At Least Once 語(yǔ)義。相對(duì)的,將服務(wù)器 ACK 級(jí)別設(shè)置為 0,可以保證生產(chǎn)者每條消息只會(huì)被發(fā)送一次,即 At Most Once語(yǔ)義。

At Least Once 可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)。相對(duì)的,At Most Once 可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失。但是,對(duì)于一些非常重要的信息,比如說(shuō)交易數(shù)據(jù),下游數(shù)據(jù)消費(fèi)者要求數(shù)據(jù)既不重復(fù)也不丟失,即 Exactly Once 語(yǔ)義。在 0.11 版本以前的 Kafka,對(duì)此是無(wú)能為力的,只能保證數(shù)據(jù)不丟失,再在下游消費(fèi)者對(duì)數(shù)據(jù)做全局去重。對(duì)于多個(gè)下游應(yīng)用的情況,每個(gè)都需要單獨(dú)做全局去重,這就對(duì)性能造成了很大的影響。

0.11 版本的 Kafka,引入了一項(xiàng)重大特性:冪等性。所謂的冪等性就是指 Producer 不論向 Server 發(fā)送多少次重復(fù)數(shù)據(jù)。Server 端都會(huì)只持久化一條,冪等性結(jié)合 At Least Once 語(yǔ)義,就構(gòu)成了 Kafka 的 Exactily Once 語(yǔ)義,即:At Least Once + 冪等性 = Exactly Once

要啟用冪等性,只需要將 Producer 的參數(shù)中 enable.idompotence 設(shè)置為 true 即可。Kafka 的冪等性實(shí)現(xiàn)其實(shí)就是將原來(lái)下游需要做的去重放在了數(shù)據(jù)上游。開(kāi)啟冪等性的 Producer 在初始化的時(shí)候會(huì)被分配一個(gè) PID,發(fā)往同一 Partition 的消息會(huì)附帶 Sequence Number。而 Broker 端會(huì)對(duì)

但是 PID 重啟就會(huì)變化,同時(shí)不同的 Partition 也具有不同主鍵,所以冪等性無(wú)法保證跨分區(qū)會(huì)話的 Exactly Once。

三、Broker 保存消息

3.1 存儲(chǔ)方式

物理上把 topic 分成一個(gè)或多個(gè) patition(對(duì)應(yīng) server.properties 中的 num.partitions=3 配置),每個(gè) patition 物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 patition 的所有消息和索引文件)。

3.2 存儲(chǔ)策略

無(wú)論消息是否被消費(fèi), kafka 都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

基于時(shí)間:log.retention.hours=168

基于大小:log.retention.bytes=1073741824 需要注意的是,因?yàn)?Kafka 讀取特定消息的時(shí)間復(fù)雜度為 O(1),即與文件大小無(wú)關(guān), 所以這里刪除過(guò)期文件與提高 Kafka 性能無(wú)關(guān)。

四、Kafka 消費(fèi)過(guò)程

Kafka 消費(fèi)者采用 pull 拉模式從 broker 中消費(fèi)數(shù)據(jù)。與之相對(duì)的 push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 broker 決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來(lái)不及處理消息。而 pull 模式則可以根據(jù) consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。

pull 模式不足之處是,如果 kafka 沒(méi)有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直返回空數(shù)據(jù)。為了避免這種情況,我們?cè)谖覀兊睦?qǐng)求中有參數(shù),允許消費(fèi)者請(qǐng)求在等待數(shù)據(jù)到達(dá)的“長(zhǎng)輪詢”中進(jìn)行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大小,或者傳入等待超時(shí)時(shí)間)。

4.1 消費(fèi)者組

 

消費(fèi)者是以 consumer group 消費(fèi)者組的方式工作,由一個(gè)或者多個(gè)消費(fèi)者組成一個(gè)組, 共同消費(fèi)一個(gè) topic。每個(gè)分區(qū)在同一時(shí)間只能由 group 中的一個(gè)消費(fèi)者讀取,但是多個(gè) group 可以同時(shí)消費(fèi)這個(gè) partition。在圖中,有一個(gè)由三個(gè)消費(fèi)者組成的 group,有一個(gè)消費(fèi)者讀取主題中的兩個(gè)分區(qū),另外兩個(gè)分別讀取一個(gè)分區(qū)。某個(gè)消費(fèi)者讀取某個(gè)分區(qū),也可以叫做某個(gè)消費(fèi)者是某個(gè)分區(qū)的擁有者。

在這種情況下,消費(fèi)者可以通過(guò)水平擴(kuò)展的方式同時(shí)讀取大量的消息。另外,如果一個(gè)消費(fèi)者失敗了,那么其他的 group 成員會(huì)自動(dòng)負(fù)載均衡讀取之前失敗的消費(fèi)者讀取的分區(qū)。

消費(fèi)者組最為重要的一個(gè)功能是實(shí)現(xiàn)廣播與單播的功能。一個(gè)消費(fèi)者組可以確保其所訂閱的 Topic 的每個(gè)分區(qū)只能被從屬于該消費(fèi)者組中的唯一一個(gè)消費(fèi)者所消費(fèi);如果不同的消費(fèi)者組訂閱了同一個(gè) Topic,那么這些消費(fèi)者組之間是彼此獨(dú)立的,不會(huì)受到相互的干擾。

如果我們希望一條消息可以被多個(gè)消費(fèi)者所消費(fèi),那么可以將這些消費(fèi)者放到不同的消費(fèi)者組中,這實(shí)際上就是廣播的效果;如果希望一條消息只能被一個(gè)消費(fèi)者所消費(fèi),那么可以將這些消費(fèi)者放到同一個(gè)消費(fèi)者組中,這實(shí)際上就是單播的效果。

4.2 分區(qū)分配策略

一個(gè) consumer group 中有多個(gè) consumer,一個(gè) topic 有多個(gè) partition,所以必然會(huì)涉及到 partition 的分配問(wèn)題,即確定哪個(gè) partition 由哪個(gè) consumer 來(lái)消費(fèi)。

Kafka 有兩種分配策略,一是 RoundRobin,一是 Range。

RoundRobin

RoundRobin 即輪詢的意思,比如現(xiàn)在有一個(gè)三個(gè)消費(fèi)者 ConsumerA、ConsumerB 和 ConsumerC 組成的消費(fèi)者組,同時(shí)消費(fèi) TopicA 主題消息,TopicA 分為 7 個(gè)分區(qū),如果采用 RoundRobin 分配策略,過(guò)程如下所示:

圖片:mrbird.cc

 

這種輪詢的方式應(yīng)該很好理解。但如果消費(fèi)者組消費(fèi)多個(gè)主題的多個(gè)分區(qū),會(huì)發(fā)生什么情況呢?比如現(xiàn)在有一個(gè)兩個(gè)消費(fèi)者 ConsumerA 和 ConsumerB 組成的消費(fèi)者組,同時(shí)消費(fèi) TopicA 和 TopicB 主題消息,如果采用RoundRobin 分配策略,過(guò)程如下所示:

 

注:TAP0 表示 TopicA Partition0 分區(qū)數(shù)據(jù),以此類推。

這種情況下,采用 RoundRobin 算法分配,多個(gè)主題會(huì)被當(dāng)做一個(gè)整體來(lái)看,這個(gè)整體包含了各自的 Partition,比如在 Kafka-clients 依賴中,與之對(duì)應(yīng)的對(duì)象為 TopicPartition。接著將這些 TopicPartition 根據(jù)其哈希值進(jìn)行排序,排序后采用輪詢的方式分配給消費(fèi)者。

但這會(huì)帶來(lái)一個(gè)問(wèn)題:假如上圖中的消費(fèi)者組中,ConsumerA 只訂閱了 TopicA 主題,ConsumerB 只訂閱了TopicB 主題,采用 RoundRobin 輪詢算法后,可能會(huì)出現(xiàn) ConsumerA 消費(fèi)了 TopicB 主題分區(qū)里的消息,ConsumerB 消費(fèi)了 TopicA 主題分區(qū)里的消息。

綜上所述,RoundRobin 算法只適用于消費(fèi)者組中消費(fèi)者訂閱的主題相同的情況。同時(shí)會(huì)發(fā)現(xiàn),采用 RoundRobin 算法,消費(fèi)者組里的消費(fèi)者之間消費(fèi)的消息個(gè)數(shù)最多相差 1 個(gè)。

Range

Kafka 默認(rèn)采用 Range 分配策略,Range 顧名思義就是按范圍劃分的意思。

比如現(xiàn)在有一個(gè)三個(gè)消費(fèi)者 ConsumerA、ConsumerB 和 ConsumerC 組成的消費(fèi)者組,同時(shí)消費(fèi) TopicA 主題消息,TopicA分為7個(gè)分區(qū),如果采用 Range 分配策略,過(guò)程如下所示:

 

假如現(xiàn)在有一個(gè)兩個(gè)消費(fèi)者 ConsumerA 和 ConsumerB 組成的消費(fèi)者組,同時(shí)消費(fèi) TopicA 和 TopicB 主題消息,如果采用 Range 分配策略,過(guò)程如下所示:

 

Range 算法并不會(huì)把多個(gè)主題分區(qū)當(dāng)成一個(gè)整體。

從上面的例子我們可以總結(jié)出Range算法的一個(gè)弊端:那就是同一個(gè)消費(fèi)者組內(nèi)的消費(fèi)者消費(fèi)的消息數(shù)量相差可能較大。

4.3 offset 的維護(hù)

由于 consumer 在消費(fèi)過(guò)程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi),所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。

Kafka 0.9 版本之前,consumer 默認(rèn)將 offset 保存在 Zookeeper 中,從 0.9 版本開(kāi)始,consumer 默認(rèn)將 offset保存在 Kafka 一個(gè)內(nèi)置的 topic 中,該 topic 為 _consumer_offsets。

  1. > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic starfish --from-beginning 
  2. one 

消費(fèi) topic 后,查看 kafka-logs 目錄,會(huì)發(fā)現(xiàn)多出 50 個(gè)分區(qū)。

默認(rèn)情況下__consumer_offsets 有 50 個(gè)分區(qū),如果你的系統(tǒng)中 consumer group 也很多的話,那么這個(gè)命令的輸出結(jié)果會(huì)很多

 

五、Kafka事務(wù)

Kafka 從 0.11 版本開(kāi)始引入了事務(wù)支持。事務(wù)可以保證 Kafka 在 Exactly Once 語(yǔ)義的基礎(chǔ)上,生產(chǎn)和消費(fèi)可以跨分區(qū)和會(huì)話,要么全部成功,要么全部失敗。

5.1 Producer事務(wù)

為了了實(shí)現(xiàn)跨分區(qū)跨會(huì)話的事務(wù),需要引入一個(gè)全局唯一的 TransactionID,并將 Producer 獲得的 PID 和Transaction ID 綁定。這樣當(dāng) Producer 重啟后就可以通過(guò)正在進(jìn)行的 TransactionID 獲得原來(lái)的 PID。

為了管理 Transaction,Kafka 引入了一個(gè)新的組件 Transaction Coordinator。Producer 就是通過(guò)和 Transaction Coordinator 交互獲得 Transaction ID 對(duì)應(yīng)的任務(wù)狀態(tài)。Transaction Coordinator 還負(fù)責(zé)將事務(wù)所有寫(xiě)入 Kafka 的一個(gè)內(nèi)部 Topic,這樣即使整個(gè)服務(wù)重啟,由于事務(wù)狀態(tài)得到保存,進(jìn)行中的事務(wù)狀態(tài)可以得到恢復(fù),從而繼續(xù)進(jìn)行。

5.2 Consumer事務(wù)

對(duì) Consumer 而言,事務(wù)的保證就會(huì)相對(duì)較弱,尤其是無(wú)法保證 Commit 的消息被準(zhǔn)確消費(fèi)。這是由于Consumer 可以通過(guò) offset 訪問(wèn)任意信息,而且不同的 SegmentFile 生命周期不同,同一事務(wù)的消息可能會(huì)出現(xiàn)重啟后被刪除的情況。

參考:

尚硅谷Kafka教學(xué)

部分圖片來(lái)源:mrbird.cc

https://gitbook.cn/books/5ae1e77197c22f130e67ec4e/index.html

 

責(zé)任編輯:武曉燕 來(lái)源: JavaKeeper
相關(guān)推薦

2020-10-26 09:18:50

RedisCluste

2021-10-19 10:10:51

MySQL事務(wù)隔離級(jí)別數(shù)據(jù)庫(kù)

2021-10-29 11:30:31

補(bǔ)碼二進(jìn)制反碼

2024-01-12 07:14:52

AI應(yīng)用架構(gòu)

2021-05-29 10:11:00

Kafa數(shù)據(jù)業(yè)務(wù)

2021-01-15 08:52:09

GitHub GitHubActio博文發(fā)布

2023-06-05 08:14:17

RabbitMQ兔子MQ開(kāi)源

2020-06-03 08:19:00

Kubernetes

2014-02-20 09:50:15

云存儲(chǔ)云服務(wù)工作流程

2021-11-18 15:08:19

MySQLSQL索引

2018-05-21 07:08:18

行為驅(qū)動(dòng)開(kāi)發(fā)BDD編碼

2024-12-04 13:02:34

數(shù)據(jù)庫(kù)分庫(kù)分表

2021-10-25 15:25:38

MySQL索引數(shù)據(jù)庫(kù)

2019-10-12 08:59:36

軟件DevOps技術(shù)

2009-06-05 10:26:05

struts工作流程

2010-09-27 10:19:09

DHCP工作流程

2022-11-02 15:11:44

LightHouseChrome插件

2023-07-07 08:00:00

KafkaSpringBoo

2011-03-31 10:54:01

Cacti工作流程

2021-05-11 10:40:29

JUCAQSJava
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)