自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

生產(chǎn)者的實現(xiàn)邏輯-kafka知識體系(二)

開發(fā) 架構(gòu) Kafka
Kafka是最初由Linkedin公司開發(fā),是一個分布式、支持分區(qū)的(partition)、多副本的(replica),基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng),它的最大的特性就是可以實時的處理大量數(shù)據(jù)以滿足各種需求場景。

[[409180]]

kafka 是單條發(fā)送還是批量發(fā)送消息?

kafka 怎么做到單條發(fā)送?

kafka 發(fā)送消息是順序的嗎?

生產(chǎn)者什么情況下可能會頻繁FullGC?

消息發(fā)送的邏輯

上帝視角來看消息發(fā)送的流程。

生產(chǎn)者的設(shè)計

消費發(fā)送機制:

1)序列化器:序列化消息對象轉(zhuǎn)成字節(jié)數(shù)組,然后通過網(wǎng)絡(luò)傳輸。

2)分區(qū)器:計算消息發(fā)往的具體分區(qū);如果顯示指定了partition,便不會走分區(qū)器。

3)消息緩沖池:客戶端的消息緩沖池,默認(rèn)大小32M,見參數(shù)buffer.memory。

4)批量發(fā)送:緩沖池中消息會按batch分批次發(fā)送,默認(rèn)批次大小16KB,見參數(shù)batch.size。

負(fù)載均衡設(shè)計:

由于消息topic 由多個partition 組成,且partition 會均衡分布到不同broker 上。因此,為了有效利用broker 集群的性能,提高消息的吞吐量,producer 可以通過隨機或者hash 等方式,將消息平均發(fā)送到多個partition 上,以實現(xiàn)負(fù)載均衡。

分區(qū)策略:

  1. 輪詢策略,默認(rèn)策略
  2. 隨機策略,實際表現(xiàn)來看,它要遜于輪詢策略
  3. 按消息鍵保序策略,一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的。

KafkaProducer

源碼

  1. //客戶端ID。在創(chuàng)建 KafkaProducer 時可通過 client.id 定義 clientId,如果未指定,則默認(rèn) producer- seq,seq 在進程內(nèi)遞增,強烈建議客戶端顯示指定 clientId。 
  2.  private final String clientId; 
  3.    //度量的相關(guān)存儲容器,例如消息體大小、發(fā)送耗時等與監(jiān)控相關(guān)的指標(biāo)。 
  4.     final Metrics metrics; 
  5.     //分區(qū)負(fù)載均衡算法,通過參數(shù) partitioner.class 指定。 
  6.     private final Partitioner partitioner; 
  7.     //調(diào)用 send 方法發(fā)送的最大請求大小,包括 key、消息體序列化后的消息總大小不能超過該值。通過參數(shù) max.request.size 來設(shè)置。 
  8.     private final int maxRequestSize; 
  9.     //生產(chǎn)者緩存所占內(nèi)存的總大小,通過參數(shù) buffer.memory 設(shè)置。 
  10.     private final long totalMemorySize; 
  11.     //元數(shù)據(jù)信息,例如 topic 的路由信息,由 KafkaProducer 自動更新。 
  12.     private final Metadata metadata; 
  13.     //消息記錄累積器 
  14.     private final RecordAccumulator accumulator; 
  15.     //用于封裝消息發(fā)送的邏輯,即向 broker 發(fā)送消息的處理邏輯。 
  16.     private final Sender sender; 
  17.     //用于消息發(fā)送的后臺線程,一個獨立的線程,內(nèi)部使用 Sender 來向 broker 發(fā)送消息。 
  18.     private final Thread ioThread; 
  19.     //壓縮類型,默認(rèn)不啟用壓縮,可通過參數(shù) compression.type 配置??蛇x值:none、gzip、snappy、lz4、zstd。 
  20.     private final CompressionType compressionType; 
  21.     //錯誤信息收集器,當(dāng)成一個 metrics,用來做監(jiān)控的。 
  22.     private final Sensor errors; 
  23.     //用于獲取系統(tǒng)時間或線程睡眠等。 
  24.     private final Time time
  25.     //用于對消息的 key 進行序列化。 
  26.     private final ExtendedSerializer<K> keySerializer; 
  27.     //Serializer< V> valueSerializer 
  28.     private final ExtendedSerializer<V> valueSerializer; 
  29.     //生產(chǎn)者的配置信息。 
  30.     private final ProducerConfig producerConfig; 
  31.     //最大阻塞時間,當(dāng)生產(chǎn)者使用的緩存已經(jīng)達到規(guī)定值后,此時消息發(fā)送會阻塞,通過參數(shù) max.block.ms 來設(shè)置最多等待多久。 
  32.     private final long maxBlockTimeMs; 
  33.    //配置控制客戶機等待請求響應(yīng)的最長時間。如果在超時超時之前沒有收到響應(yīng),客戶端將在需要時重新發(fā)送請求,或者在重試耗盡時失敗請求。 
  34.     private final int requestTimeoutMs; 
  35.     //生產(chǎn)者端的攔截器,在消息發(fā)送之前進行一些定制化處理。 
  36.     private final ProducerInterceptors<K, V> interceptors; 
  37.     //維護 api 版本的相關(guān)元信息,該類只能在 kafka 內(nèi)部使用。 
  38.     private final ApiVersions apiVersions; 
  39.     //kafka 消息事務(wù)管理器。 
  40.     private final TransactionManager transactionManager; 
  41.     //kafka 生產(chǎn)者事務(wù)上下文環(huán)境初始結(jié)果。 
  42.     private TransactionalRequestResult initTransactionsResult; 

KafkaProducer 具有如下特征:

  1. KafkaProducer 是線程安全的,可以被多個線程交叉使用。
  2. KafkaProducer 內(nèi)部包含一個緩存池,存放待發(fā)送消息,即 ProducerRecord 隊列,與此同時會開啟一個IO線程將 ProducerRecord 對象發(fā)送到 Kafka 集群。
  3. KafkaProducer 的消息發(fā)送 API send 方法是異步,只負(fù)責(zé)將待發(fā)送消息 ProducerRecord 發(fā)送到緩存區(qū)中,立即返回,并返回一個結(jié)果憑證 Future。

acks 參數(shù)的作用

KafkaProducer 提供了一個核心參數(shù) acks 用來定義消息“已提交”的條件(標(biāo)準(zhǔn)),就是 Broker 端向客戶端承偌已提交的條件,可選值如下:

  1. 0:只要調(diào)用 KafkaProducer 的 send 方法返回后即認(rèn)為成功
  2. all 或 -1:表示消息不僅需要 Leader 節(jié)點已存儲該消息,并且要求其副本(準(zhǔn)確的來說是 ISR 中的節(jié)點)全部存儲才認(rèn)為已提交,才向客戶端返回提交成功。這是最嚴(yán)格的持久化保障,當(dāng)然性能也最低。
  3. 1:表示消息只需要寫入 Leader 節(jié)點后就可以向客戶端返回提交成功。

retries 參數(shù)的作用

kafka 在生產(chǎn)端提供的另外一個核心屬性,用來控制消息在發(fā)送失敗后的重試次數(shù),設(shè)置為 0 表示不重試,重試就有可能造成消息在發(fā)送端的重復(fù)。從消息發(fā)送接口來看:

  1. Future<RecordMetadata> send(ProducerRecord<K, V> record);Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback); 

從上面的 API 可以得知,用戶在使用 KafkaProducer 發(fā)送消息時,首先需要將待發(fā)送的消息封裝成 ProducerRecord,返回的是一個 Future 對象,典型的 Future 設(shè)計模式。

Kafka 消息追加流程

KafkaProducer 的 send 方法,并不會直接向 broker 發(fā)送消息,kafka 將消息發(fā)送異步化,即分解成兩個步驟,send 方法的職責(zé)是將消息追加到內(nèi)存中(分區(qū)的緩存隊列中),然后會由專門的 Send 線程異步將緩存中的消息批量發(fā)送到 Kafka Broker 中。

主要的方法在KafkaProducer#doSend

將消息追加到生產(chǎn)者的發(fā)送緩存區(qū),其實現(xiàn)類為:RecordAccumulator。我們先來看一下 Kafka 一條消息寫到內(nèi)存的流程圖:

Sender線程

到此為止,我們看到,當(dāng)我們調(diào)用send 方法的時候,其實只是發(fā)送到了 生產(chǎn)者客戶端的服務(wù)內(nèi)存中。還沒有到Broker。Kafka producer 客戶端后臺會啟動一個線程不停的輪詢消息批次存放的區(qū)域,把消息發(fā)送給Broker。

消息批次的內(nèi)存結(jié)構(gòu)和分配

根據(jù)上面的源碼我們可以了解到,每一個ProducerBatch 是一塊 大小為batch.size 字節(jié)大小的內(nèi)存。而且用到了池化技術(shù)。

緩沖池的內(nèi)存持有類是 BufferPool,我們先來看下 BufferPool 都有哪些成員:

  1. public class BufferPool { 
  2.   // 總的內(nèi)存大小 
  3.   private final long totalMemory; 
  4.   // 每個內(nèi)存塊大小,即 batch.size 
  5.   private final int poolableSize; 
  6.   // 申請、歸還內(nèi)存的方法的同步鎖 
  7.   private final ReentrantLock lock; 
  8.   // 空閑的內(nèi)存塊 
  9.   private final Deque<ByteBuffer> free
  10.   // 需要等待空閑內(nèi)存塊的事件 
  11.   private final Deque<Condition> waiters; 
  12.   /** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */ 
  13.   // 緩沖池還未分配的空閑內(nèi)存,新申請的內(nèi)存塊就是從這里獲取內(nèi)存值 
  14.   private long nonPooledAvailableMemory; 
  15.   // ... 

從 BufferPool 的成員可看出,緩沖池實際上由一個個 ByteBuffer 組成的,BufferPool 持有這些內(nèi)存塊,并保存在成員 free 中,free 的總大小由 totalMemory 作限制,而 nonPooledAvailableMemory 則表示還剩下緩沖池還剩下多少內(nèi)存還未被分配。

當(dāng) Batch 的消息發(fā)送完畢后,就會將它持有的內(nèi)存塊歸還到 free 中,以便后面的 Batch 申請內(nèi)存塊時不再創(chuàng)建新的 ByteBuffer,從 free 中取就可以了,從而避免了內(nèi)存塊被 JVM 回收的問題。

創(chuàng)建內(nèi)存塊的流程如下:

歸還內(nèi)存塊的邏輯流程

如果歸還的內(nèi)存塊大小等于 batchSize,則將其清空后添加到緩沖池的 free 中,即將其歸還給緩沖池,避免了 JVM GC 回收該內(nèi)存塊。如果不等于就直接將內(nèi)存大小累加到未分配并且空閑的內(nèi)存大小值中即可,內(nèi)存就無需歸還了,等待 JVM GC 回收掉,最后喚醒正在等待空閑內(nèi)存的線程。

Java生產(chǎn)者是如何管理TCP連接的

為何采用 TCP?

Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他協(xié)議。無論是生產(chǎn)者、消費者,還是 Broker 之間的通信都是如此。

從社區(qū)的角度來看,在開發(fā)客戶端時,人們能夠利用 TCP 本身提供的一些高級功能,比如多路復(fù)用請求以及同時輪詢多個連接的能力。

TCP 的多路復(fù)用請求會在一條物理連接上創(chuàng)建若干個虛擬連接,每個虛擬連接負(fù)責(zé)流轉(zhuǎn)各自對應(yīng)的數(shù)據(jù)流。其實嚴(yán)格來說,TCP 并不能多路復(fù)用,它只是提供可靠的消息交付語義保證,比如自動重傳丟失的報文。

而且目前已知的 HTTP 庫在很多編程語言中都略顯簡陋。

何時創(chuàng)建 TCP 連接?

TCP 連接是在創(chuàng)建 KafkaProducer 實例時建立的 ,在創(chuàng)建 KafkaProducer 實例時,生產(chǎn)者應(yīng)用會在后臺創(chuàng)建并啟動一個名為 Sender 的線程,該 Sender 線程開始運行時首先會創(chuàng)建與 Broker 的連接。

  1. Properties properties = new Properties(); 
  2. properties.put("bootstrap.servers""localhost:9092"); 
  3. properties.put("key.serializer", StringSerializer.class.getName()); 
  4. properties.put("value.serializer", StringSerializer.class.getName()); 
  5. // try-with-resources 
  6. // 創(chuàng)建KafkaProducer實例時,會在后臺創(chuàng)建并啟動Sender線程,Sender線程開始運行時首先會創(chuàng)建與Broker的TCP連接 
  7. try (Producer<String, String> producer = new KafkaProducer<>(properties)) { 
  8.     ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, KEY, VALUE); 
  9.     Callback callback = (metadata, exception) -> { 
  10.     }; 
  11.     producer.send(record, callback); 
  1. bootstrap.servers是Producer的核心參數(shù)之一,指定了Producer啟動時要連接的Broker地址
  2. 如果bootstrap.servers指定了1000個Broker,那么Producer啟動時會首先創(chuàng)建與這1000個Broker的TCP連接
  3. 因此不建議把集群中所有的Broker信息都配置到bootstrap.servers中,通常配置3~4臺足夠
  4. Producer一旦連接到集群中的任意一臺Broker,就能拿到整個集群的Broker信息(metadata request)

TCP 連接還可能在兩個地方被創(chuàng)建:一個是在更新元數(shù)據(jù)后,另一個是在消息發(fā)送時。

  • 當(dāng)Producer更新了集群的元數(shù)據(jù)后,如果發(fā)現(xiàn)與某些Broker當(dāng)前沒有連接,那么Producer會創(chuàng)建一個TCP連接

【場景1】

當(dāng)Producer嘗試向不存在的主題發(fā)送消息時,Broker會告訴Producer這個主題不存在,此時Producer會發(fā)送metadata request到Kafka集群,去嘗試獲取最新的元數(shù)據(jù)信息,與集群中所有的Broker建立TCP連接。

【場景2】

Producer通過metadata.max.age.ms參數(shù)定期地去更新元數(shù)據(jù)信息,默認(rèn)值300000,即5分鐘。

  • 當(dāng)Producer要發(fā)送消息時,Producer發(fā)現(xiàn)與目標(biāo)Broker(依賴負(fù)載均衡算法)還沒有連接,也會創(chuàng)建一個TCP連接。

何時關(guān)閉 TCP 連接?

Producer端關(guān)閉TCP連接有兩種方式:用戶主動關(guān)閉、Kafka自動關(guān)閉。

【用戶主動關(guān)閉】

廣義的主動關(guān)閉,包括用戶調(diào)用kill -9來殺掉Producer,最推薦的方式:producer.close()

【Kafka自動關(guān)閉】

Producer端參數(shù)connections.max.idle.ms,默認(rèn)值540000,即9分鐘

如果9分鐘內(nèi)沒有任何請求經(jīng)過某個TCP連接,Kafka會主動把TCP連接關(guān)閉

connections.max.idle.ms=-1會禁用這種機制,TCP連接將成為永久長連接

Kafka創(chuàng)建的Socket連接都開啟了keepalive。

【注意】

關(guān)閉TCP連接的發(fā)起方是Kafka客戶端,屬于被動關(guān)閉的場景

被動關(guān)閉的后果就是會產(chǎn)生大量的CLOSE_WAIT連接

Producer端或Client端沒有機會顯式地觀測到此TCP連接已被中斷

總結(jié)

現(xiàn)在我們可以回答開頭的3個問題了。

1、kafka 是單條發(fā)送還是批量發(fā)送消息?

正常情況下都是批量發(fā)送的。封裝成一個ProducerBatch 發(fā)送。

2.kafka 怎么做到單條發(fā)送?

只能設(shè)置單生產(chǎn)者單線程同步調(diào)用send 方法。

3.kafka 發(fā)送消息是順序的嗎?

不是的,如果需求順序必須設(shè)置key,并且是生產(chǎn)者是單線程的。

4.生產(chǎn)者什么情況下可能會頻繁FullGC?

如果你的消息大小比 batchSize 還要大,則不會從 free 中循環(huán)獲取已分配好的內(nèi)存塊,而是重新創(chuàng)建一個新的 ByteBuffer,并且該 ByteBuffer 不會被歸還到緩沖池中(JVM GC 回收),如果此時 nonPooledAvailableMemory 比消息體還要小,還會將 free 中空閑的內(nèi)存塊銷毀(JVM GC 回收),以便緩沖池中有足夠的內(nèi)存空間提供給用戶申請,這些動作都會導(dǎo)致頻繁 GC 的問題出現(xiàn)。

因此,需要根據(jù)業(yè)務(wù)消息的大小,適當(dāng)調(diào)整 batch.size 的大小,避免頻繁 GC。

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2021-07-08 05:52:34

Kafka架構(gòu)主從架構(gòu)

2021-07-07 07:06:31

Brokerkafka架構(gòu)

2021-07-08 07:16:24

RocketMQ數(shù)據(jù)結(jié)構(gòu)Message

2021-07-14 17:18:14

RocketMQ消息分布式

2021-07-13 11:52:47

順序消息RocketMQkafka

2021-09-09 06:55:43

kafka冪等生產(chǎn)者

2020-08-04 10:45:05

運維架構(gòu)技術(shù)

2015-07-28 17:52:36

IOS知識體系

2022-05-10 10:06:03

Kafka

2021-07-12 10:25:03

RocketMQ數(shù)據(jù)結(jié)構(gòu)kafka

2021-07-02 06:27:00

Kafka架構(gòu)主從架構(gòu)

2021-12-22 11:00:05

模型Golang語言

2012-03-08 11:13:23

企業(yè)架構(gòu)

2017-06-22 13:07:21

2017-02-27 16:42:23

Spark識體系

2017-04-03 15:35:13

知識體系架構(gòu)

2020-04-17 14:49:34

Kafka分區(qū)數(shù)據(jù)

2015-08-26 09:39:30

java消費者

2021-12-28 12:01:59

Kafka 消費者機制

2022-05-23 08:20:29

Kafka生產(chǎn)者元數(shù)據(jù)管理
點贊
收藏

51CTO技術(shù)棧公眾號