關(guān)于Kafka消費(fèi)者的這些參數(shù),你應(yīng)該要知道?
本文將對(duì)Kafka Consumer做一個(gè)簡(jiǎn)單的介紹,是深入研究Kafka Conumer的一扇窗。主要從如下三個(gè)方面展開:
- 核心參數(shù)
- 核心組件
- 核心API
1、Kafka Consumer核心參數(shù)覽
個(gè)人覺得,要想深入了解Kafka Consumer的核心工作機(jī)制可以從它的核心參數(shù)切入,為后續(xù)深入了解它的隊(duì)列負(fù)載機(jī)制、消息拉取模型、消費(fèi)模型、位點(diǎn)提交等機(jī)制打下基礎(chǔ)。
kafka Consumer的核心屬性定義在ConsumerConfig中。
1.1 基礎(chǔ)功能參數(shù)
- group.id
消費(fèi)組名稱。
- client.id
客戶端標(biāo)識(shí)id,默認(rèn)為consumer-序號(hào),在實(shí)踐中建議包含客戶端IP,在一個(gè)消費(fèi)組中不能重復(fù)。
- bootstrap.servers
broker服務(wù)端地址列表。
- client.dns.lookup
客戶端尋找bootstrap地址的方式,支持如下兩種方式:
- resolve_canonical_bootstrap_servers_only
這種方式,會(huì)依據(jù)bootstrap.servers提供的主機(jī)名(hostname),根據(jù)主機(jī)上的名稱服務(wù)返回其IP地址的數(shù)組(InetAddress.getAllByName),然后依次獲取inetAddress.getCanonicalHostName(),再建立tcp連接。
一個(gè)主機(jī)可配置多個(gè)網(wǎng)卡,如果啟用該功能,應(yīng)該可以有效利用多網(wǎng)卡的優(yōu)勢(shì),降低Broker的網(wǎng)絡(luò)端負(fù)載壓力。
- use_all_dns_ips
這種方式會(huì)直接使用bootstrap.servers中提供的hostname、port創(chuàng)建tcp連接,默認(rèn)選項(xiàng)。
- enable.auto.commit
是否開啟自動(dòng)位點(diǎn)提交,默認(rèn)為true。
- auto.commit.interval.ms
如果開啟自動(dòng)位點(diǎn)提交,位點(diǎn)的提交頻率,默認(rèn)為5s。
- partition.assignment.strategy
消費(fèi)端隊(duì)列負(fù)載算法,默認(rèn)為按區(qū)間平均分配(RangeAssignor),可選值:輪詢(RoundRobinAssignor)
- auto.offset.reset
重置位點(diǎn)策略,但kafka提交位點(diǎn)時(shí),對(duì)應(yīng)的消息已被刪除時(shí)采取的恢復(fù)策略,默認(rèn)為latest,可選:earliest、none(會(huì)拋出異常)。
- key.deserializer
使用的key序列化類
- value.deserializer
消息體序列化類
- interceptor.classes
消費(fèi)端攔截器,可以有多個(gè)。
- check.crcs
在消費(fèi)端時(shí)是否需要校驗(yàn)CRC,默認(rèn)為true。
1.2 網(wǎng)絡(luò)相關(guān)參數(shù)
- send.buffer.bytes
網(wǎng)絡(luò)通道(TCP)的發(fā)送緩存區(qū)大小,默認(rèn)為128K。
- receive.buffer.bytes
網(wǎng)絡(luò)通道(TCP)的接收緩存區(qū)大小,默認(rèn)為32K。
- reconnect.backoff.ms
重新建立鏈接的等待時(shí)長(zhǎng),默認(rèn)為50ms,屬于底層網(wǎng)絡(luò)參數(shù),基本無需關(guān)注。
- reconnect.backoff.max.ms
重新建立鏈接的最大等待時(shí)長(zhǎng),默認(rèn)為1s,連續(xù)兩次對(duì)同一個(gè)連接建立重連,等待時(shí)間會(huì)在reconnect.backoff.ms的初始值上成指數(shù)級(jí)遞增,但超過max后,將不再指數(shù)級(jí)遞增。
- retry.backoff.ms
重試間隔時(shí)間,默認(rèn)為100ms。
- connections.max.idle.ms
連接的最大空閑時(shí)間,默認(rèn)為9s。
- request.timeout.ms
請(qǐng)求的超時(shí)時(shí)間,與Broker端的網(wǎng)絡(luò)通訊的請(qǐng)求超時(shí)時(shí)間。
1.3 核心工作參數(shù)
- max.poll.records
每一次poll方法調(diào)用拉取的最大消息條數(shù),默認(rèn)為500。
- max.poll.interval.ms
兩次poll方法調(diào)用的最大間隔時(shí)間,單位毫秒,默認(rèn)為5分鐘。如果消費(fèi)端在該間隔內(nèi)沒有發(fā)起poll操作,該消費(fèi)者將被剔除,觸發(fā)重平衡,將該消費(fèi)者分配的隊(duì)列分配給其他消費(fèi)者。
- session.timeout.ms
消費(fèi)者與broker的心跳超時(shí)時(shí)間,默認(rèn)10s,broker在指定時(shí)間內(nèi)沒有收到心跳請(qǐng)求,broker端將會(huì)將該消費(fèi)者移出,并觸發(fā)重平衡。
- heartbeat.interval.ms
心跳間隔時(shí)間,消費(fèi)者會(huì)以該頻率向broker發(fā)送心跳,默認(rèn)為3s,主要是確保session不會(huì)失效。
- fetch.min.bytes
一次拉取消息最小返回的字節(jié)數(shù)量,默認(rèn)為1字節(jié)。
- fetch.max.bytes
一次拉取消息最大返回的字節(jié)數(shù)量,默認(rèn)為1M,如果一個(gè)分區(qū)的第一批消息大小大于該值也會(huì)返回。
- max.partition.fetch.bytes
一次拉取每一個(gè)分區(qū)最大拉取字節(jié)數(shù),默認(rèn)為1M。
- fetch.max.wait.ms
fetch等待拉取數(shù)據(jù)符合fetch.min.bytes的最大等待時(shí)間。
- metadata.max.age.ms
元數(shù)據(jù)在客戶端的過期時(shí)間,過期后客戶端會(huì)向broker重新拉取最新的元數(shù)據(jù),默認(rèn)為5分鐘。
- internal.leave.group.on.close
消費(fèi)者關(guān)閉后是否立即離開訂閱組,默認(rèn)為true,即當(dāng)客戶端斷開后立即觸發(fā)重平衡。如果設(shè)置為false,則不會(huì)立即觸發(fā)重平衡,而是要等session過期后才會(huì)觸發(fā)。
2、KafkaConsumer核心組件與API
通過KafkaConsumer核心參數(shù),我們基本可以窺探Kafka中的核心要點(diǎn),接下來再介紹一下KafkaConsumer的核心組件,為后續(xù)深入研究Kafka消費(fèi)者消費(fèi)模型打下基礎(chǔ)。
2.1 核心組件
KafkaConsumer由如下幾個(gè)核心組件構(gòu)成:
- ConsumerNetworkClient
消費(fèi)端網(wǎng)絡(luò)客戶端,服務(wù)底層網(wǎng)絡(luò)通訊,負(fù)責(zé)客戶端與服務(wù)端的RPC通信。
- ConsumerCoordinator
消費(fèi)端協(xié)調(diào)器,在Kafka的設(shè)計(jì)中,每一個(gè)消費(fèi)組在集群中會(huì)選舉一個(gè)broker節(jié)點(diǎn)成為該消費(fèi)組的協(xié)調(diào)器,負(fù)責(zé)消費(fèi)組狀態(tài)的狀態(tài)管理,尤其是消費(fèi)組重平衡(消費(fèi)者的加入與退出),該類就是消費(fèi)者與broker協(xié)調(diào)器進(jìn)行交互。
- Fetcher
消息拉取。
溫馨提示:本文不打算對(duì)每一個(gè)組件進(jìn)行詳細(xì)解讀,這里建議大家按照本文第一部分關(guān)于各個(gè)參數(shù)的含義,然后對(duì)照這些參數(shù)最終是傳resume遞給哪些組件,進(jìn)行一個(gè)關(guān)聯(lián)思考。
2.2 核心API概述
最后我們?cè)賮砜匆幌孪M(fèi)者的核心API。
- Set< TopicPartition> assignment()
獲取該消費(fèi)者的隊(duì)列分配列表。
- Set< String> subscription()
獲取該消費(fèi)者的訂閱信息。
- void subscribe(Collection< String> topics)
訂閱主題。
- void subscribe(Collection< String> topics, ConsumerRebalanceListener callback)
訂閱主題,并指定隊(duì)列重平衡的監(jiān)聽器。
- void assign(Collection< TopicPartition> partitions)
取代 subscription,手動(dòng)指定消費(fèi)哪些隊(duì)列。
- void unsubscribe()
取消訂閱關(guān)系。
- ConsumerRecords
poll(Duration timeout)
拉取消息,是 KafkaConsumer 的核心方法,將在下文詳細(xì)介紹。
- void commitSync()
同步提交消費(fèi)進(jìn)度,為本批次的消費(fèi)提交,將在后續(xù)文章中詳細(xì)介紹。
- void commitSync(Duration timeout)
同步提交消費(fèi)進(jìn)度,可設(shè)置超時(shí)時(shí)間。
- void commitSync(Map
offsets)
顯示同步提交消費(fèi)進(jìn)度, offsets 指明需要提交消費(fèi)進(jìn)度的信息。
- void commitSync(final Map
offsets, final Duration timeout)
顯示同步提交消費(fèi)進(jìn)度,帶超時(shí)間。
- void seek(TopicPartition partition, long offset)
重置 consumer#poll 方法下一次拉消息的偏移量。
- void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata)
seek 方法重載方法。
- void seekToBeginning(Collection< TopicPartition> partitions)
將 poll 方法下一次的拉取偏移量設(shè)置為隊(duì)列的初始偏移量。
- void seekToEnd(Collection< TopicPartition> partitions)
將 poll 方法下一次的拉取偏移量設(shè)置為隊(duì)列的最大偏移量。
- long position(TopicPartition partition)
獲取將被拉取的偏移量。
- long position(TopicPartition partition, final Duration timeout)
同上。
- OffsetAndMetadata committed(TopicPartition partition)
獲取指定分區(qū)已提交的偏移量。
- OffsetAndMetadata committed(TopicPartition partition, final Duration timeout)
同上。
- Map metrics()
統(tǒng)計(jì)指標(biāo)。
- List< PartitionInfo> partitionsFor(String topic)
獲取主題的路由信息。
- List< PartitionInfo> partitionsFor(String topic, Duration timeout)
同上。
- Map listTopics()
獲取所有 topic 的路由信息。
- Map listTopics(Duration timeout)
同上。
- Set< TopicPartition> paused()
獲取已掛起的分區(qū)信息。
- void pause(Collection< TopicPartition> partitions)
掛起分區(qū),下一次 poll 方法將不會(huì)返回這些分區(qū)的消息。
- void resume(Collection< TopicPartition> partitions)
恢復(fù)掛起的分區(qū)。
- Map
offsetsForTimes(MaptimestampsToSearch)
根據(jù)時(shí)間戳查找最近的一條消息的偏移量。
- Map
offsetsForTimes(MaptimestampsToSearch, Duration timeout)
同上。
- Map
beginningOffsets(Collection< TopicPartition> partitions)
查詢指定分區(qū)當(dāng)前最小的偏移量。
- Map
beginningOffsets(Collection< TopicPartition> partitions, Duration timeout)
同上。
- Map
endOffsets(Collection< TopicPartition> partitions)
查詢指定分區(qū)當(dāng)前最大的偏移量。
- Map
endOffsets(Collection< TopicPartition> partitions, Duration timeout)
同上。
- void close()
關(guān)閉消費(fèi)者。
- void close(Duration timeout)
關(guān)閉消費(fèi)者。
- void wakeup()
喚醒消費(fèi)者。
Kafka提供的消費(fèi)者并不像RocketMQ提供了Push模式自動(dòng)拉取消息,需要應(yīng)用程序自動(dòng)組織這些API進(jìn)行消息拉取。
值得注意的kafka消費(fèi)者也支持位點(diǎn)自動(dòng)提交機(jī)制,kafka的消費(fèi)者(KafkaConsumer)對(duì)象是線程不安全的。
基于KafkaConsumer的pause(暫停某些分區(qū)的消費(fèi))與resume(恢復(fù)某些分區(qū)的消費(fèi)),可以輕松實(shí)現(xiàn)消費(fèi)端限流機(jī)制。
本文主要是對(duì)消費(fèi)者有一個(gè)大概的了解,后續(xù)文章將持續(xù)逐一解開消費(fèi)者的核心運(yùn)作機(jī)制,請(qǐng)持續(xù)關(guān)注。
本文轉(zhuǎn)載自微信公眾號(hào)「中間件興趣圈」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系中間件興趣圈公眾號(hào)。