帶你入門Kafka,你知道的越多不知道的也越多!
本文轉(zhuǎn)載自微信公眾號(hào)「小菜良記」,作者蔡不菜丶。轉(zhuǎn)載本文請(qǐng)聯(lián)系小菜良記公眾號(hào)。
初始Kafka
1、介紹
Kafka 起初是由 Linkedin 公司采用 Scala 語言開發(fā)的一個(gè)多分區(qū)、多副本且基于 ZooKeeper協(xié)調(diào)的分布式消息系統(tǒng),現(xiàn)己被捐獻(xiàn)給 Apache 基金會(huì) 。目前 Kafka 已經(jīng)定位為一個(gè)分布式流式處理平臺(tái),它以高吞吐、可持久化、可水平擴(kuò)展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用。
2、使用場景
消息系統(tǒng):Kafka 和傳統(tǒng)的消息系統(tǒng)(消息中間件)都具備系統(tǒng)解耦、冗余存儲(chǔ)、流量削峰、緩沖、異步通信、擴(kuò)展性、可恢復(fù)性等功能。與此同時(shí),Kafka還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障以及回溯消費(fèi)的功能。
存儲(chǔ)系統(tǒng):Kafka把消息持久化到磁盤,相比于其他基于內(nèi)存存儲(chǔ)的系統(tǒng)而言,有效地降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。也正是得益于 Kafka 的消息持久化功能和多副本機(jī)制,我們可以把 Kafka 作為長期的數(shù)據(jù)存儲(chǔ)系統(tǒng)來使用,只需要把對(duì)應(yīng)的數(shù)據(jù)保留策略設(shè)置為 “永久” 或啟用主題的日志壓縮功能即可。
流式處理平臺(tái):Kafka 不僅為每個(gè)流行的流式處理框架里提供了可靠的數(shù)據(jù)來源,還提供了一個(gè)完整的流式處理類庫,比如窗口、連接、交換和聚合等各類操作。
3、基本概念
Kafka體系架構(gòu)包括若干 「Producer」,「Broker」,「Consumer」以及一個(gè)ZooKeeper集群。
- ZooKeeper:是 Kafka 用來負(fù)責(zé)集群元數(shù)據(jù)的管理、控制器的選舉等操作的。
- Producer:生產(chǎn)者,發(fā)送消息的一方。負(fù)責(zé)創(chuàng)建消息,然后將其投遞到 Kafka 中。
- Consumer:消費(fèi)者,接收消息的一方。連接到 Kafka 后接收消息,并進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理。
- Broker:服務(wù)代理節(jié)點(diǎn)。對(duì)于 Kafka 而言,Broker 可以簡單地看作一個(gè)獨(dú)立的 Kafka 服務(wù)節(jié)點(diǎn)或 Kafka 服務(wù)實(shí)例。大多數(shù)情況下也可以將 Broker 看作一臺(tái) Kafka 服務(wù)器,前提是這臺(tái)服務(wù)器上只部署了一個(gè) Kafka 實(shí)例。一個(gè)或多個(gè)Broker 組成了一個(gè) Kafka 集群。
整體 Kafka 體系大概是由上面幾部分構(gòu)成。除此之外,還有兩個(gè)特別重要的概念:主題(Topic)和分區(qū)(Partition)
- 主題:Kafka 中的消息以主題為單位進(jìn)行歸類,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題(發(fā)送到 Kafka 集群中的每一條消息都要指定一個(gè)主題),而消費(fèi)者負(fù)責(zé)訂閱主題并進(jìn)行消費(fèi)。
- 分區(qū):主題是一個(gè)邏輯上的概念。還可以細(xì)分為多個(gè)分區(qū),一個(gè)分區(qū)只屬于單個(gè)主題,很多時(shí)候也會(huì)把分區(qū)稱為主題分區(qū)(Topic-Partition)。同一主題下的不同分區(qū)包含的消息是不同的,分區(qū)在存儲(chǔ)層面可以看作一個(gè)可追加的「日志文件」,消息在被追加到分區(qū)日志文件的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)。offset 是消息在分區(qū)中的唯一標(biāo)識(shí),Kafka 通過它來保證消息在分區(qū)內(nèi)的順序性,不過offset并不跨越分區(qū),也就是說,Kafka 保證的是分區(qū)有序而不是主題有序。
Kafka 為分區(qū)引入了多副本(Replica) 機(jī)制,通過增加副本數(shù)量可以提升容災(zāi)能力。
同一分區(qū)的不同副本中保存的是相同的消息(在同一時(shí)刻,副本之間并非完全一樣),副本之間是“ 一主多從”的關(guān)系,其中 leader 副本負(fù)責(zé)處理讀寫請(qǐng)求 ,follower 副本只負(fù)責(zé)與 leader 副本的消息同步。副本處于不同的 broker 中 ,當(dāng) leader 副本出現(xiàn)故障時(shí),從 follower 副本中重新選舉新的 leader 副本對(duì)外提供服務(wù)。
「Kafka 通過多副本機(jī)制實(shí)現(xiàn)了故障的自動(dòng)轉(zhuǎn)移,當(dāng) Kafka 集群中某個(gè) broker 失效時(shí)仍然能保證服務(wù)可用 ?!?/p>
在我們繼續(xù)了解 Kafka 之前,我們還需要明白幾個(gè)關(guān)鍵詞:
- AR(Assigned Replicas):分區(qū)中所有副本統(tǒng)稱為 AR
- ISR(In-Sync Replicas):所有與 leader 副本保持一定程度同步的副本(包括 leader 副本在內(nèi))組成 ISR。ISR 集合是 AR 集合中的一個(gè)子集 。消息會(huì)先發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步,同步期間內(nèi)follower 副本相對(duì)于 leader 副本而言會(huì)有一定程度的滯后 。
- OSR(Out-of-Sync Replicas):與 leader 副本同步滯后過多的副本(不包括 leader 副本)組成 OSR
由以上關(guān)系我們可以得出一個(gè)公式:AR=ISR+OSR
- HW(High Watermark):俗稱高水位,是用來標(biāo)識(shí)一個(gè)特定的消息偏移量(offset),消費(fèi)者只能拉取到這個(gè) offset 之前的消息
- LEO(LogStartOffset):下一條待寫入消息的 offset
相信很多小伙伴看到這里有點(diǎn)不耐煩了,這 Kafka 怎么這么難,還能不能好好學(xué)習(xí)了
莫急莫急,理論知識(shí)咱們還是要先過一遍,這可不是勸退的開始,這是你成長的開始!下面小菜力求用最簡樸的語句帶你入最深的坑!
Kafka 之 生產(chǎn)大隊(duì)
眾所周知,Kafka 說高尚點(diǎn)是一個(gè)分布式消息隊(duì)列,簡單來說不就是一個(gè)消息隊(duì)列。消息隊(duì)列簡單來說不就是推數(shù)據(jù),拿數(shù)據(jù)的嘛。沒錯(cuò),高端的知識(shí)往往需要簡單的理解。
那么數(shù)據(jù)從哪來,數(shù)據(jù)從生產(chǎn)隊(duì)來!從編程的角度而言,生產(chǎn)大隊(duì)里面有一群生產(chǎn)者(當(dāng)然也可以只有一個(gè)),生產(chǎn)者就是負(fù)責(zé)向 Kafka 發(fā)送消息的應(yīng)用程序。
客戶端開發(fā)
生產(chǎn)過程大致得具備以下幾個(gè)步驟方能生產(chǎn):
- 配置生產(chǎn)者客戶端參數(shù)以及創(chuàng)建響應(yīng)的生產(chǎn)者實(shí)例
- 構(gòu)建待發(fā)送的消息
- 發(fā)送消息
- 關(guān)閉生產(chǎn)者實(shí)例
「四大步驟一梭子解決生產(chǎn)問題」
上面的代碼中可以看到我們往 Properties 文件中 put 進(jìn)了四個(gè)參數(shù),分別為:
- bootstrap.servers:改參數(shù)用來指定生產(chǎn)者客戶端連接 Kafka 集群所需的 broker 地址。格式為(host1:port1,host2:port2),可以設(shè)置一個(gè)或多個(gè)地址,中間以逗號(hào)隔開,默認(rèn)值為 “ ”
- key.serializer 和 value.serializer:分別用來指定 key 和 value 序列化操作的序列化器,這兩個(gè)參數(shù)無默認(rèn)值,需要填寫序列化器的全限定名
- client.id:用來設(shè)定 KafkaProducer 對(duì)應(yīng)的客戶端 id,默認(rèn)值為 “ ”。如果客戶端不設(shè)置,則KafkaProducer 會(huì)自動(dòng)生成一個(gè)非空字符串,內(nèi)容形式為 “producer-1”,“producer-2”,即字符串 “producer-” 和數(shù)字的拼接
其中ProducerRecord定義如下:
- topic和partition :分別代表消息要發(fā)往的主題和分區(qū)號(hào);
- headers:消息的頭部,不需要時(shí)可以不設(shè)置
- key:用來指定消息的鍵,它不經(jīng)是消息的附加消息,還可以用來計(jì)算分區(qū)號(hào)進(jìn)而可以讓消息發(fā)往特定的分區(qū)。
- value:消息體,一般不為空,如果為空則表示特定的消息 -- 「墓碑消息」
- timestamp:消息的時(shí)間戳,它有 CreateTime 和 LogAppendTime 兩種類型,前者表示消息創(chuàng)建的時(shí)間,后者表示消息追加到日志文件的時(shí)間
上面的操作就是創(chuàng)建生產(chǎn)者實(shí)例和構(gòu)建消息,發(fā)送消息主要有三種模式:
- 發(fā)后即忘(fire-and-forget)
- 同步(sync)
- 異步(async)
而我們上面使用的發(fā)送方式就是發(fā)后即忘,它只管往 Kafka 中發(fā)送消息而并不關(guān)心消息是否正確到達(dá),大多數(shù)情況下,這種發(fā)送方式是沒有什么問題的,不過在有些時(shí)候(發(fā)生不可重試異常)會(huì)造成消息丟失?!副M管這種發(fā)送方式性能最高,但是可靠性也最差?!?/p>
- public Future<RecordMetadata> send(ProducerRecord<K,V> record) {}
從send方法來看,是返回一個(gè)Future對(duì)象
- Future res = producer.send(record);
這說明 send()方法本身就是異步的,send()方法返回的Future對(duì)象可以使調(diào)用方稍后獲得發(fā)送的結(jié)果。如果我們想實(shí)現(xiàn)同步的效果,可以直接調(diào)用Future的get()方法實(shí)現(xiàn)。
- try {
- producer.send(record).get();
- } catch (Exception e) {
- e.printStackTrace();
- }
通過get()方法來阻塞等待 Kafka 的響應(yīng),直到消息發(fā)送成功,或者發(fā)生異常
生產(chǎn)也能異步?
在 Kafka 中 send()方法有另外一個(gè)重載方式:
- public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {}
- producer.send(record, new Callback() {
- @Override
- public void onCompletion(RecordMetadata recordMetadata, Exception e) {
- if (Objects.isNull(e)) {
- System.out.println("主題:" + recordMetadata.topic());
- } else {
- System.out.println(e.getMessage());
- }
- }
- });
使用 Callback 的方式非常簡潔明了,Kafka 有響應(yīng)時(shí)就會(huì)回調(diào),要么發(fā)送成功,要么拋出異常。
onCompletion()方法中兩個(gè)參數(shù)是互斥的,如果發(fā)送成功則RecordMetadata不為空,Exception為空,如果發(fā)送失敗則相反。
生產(chǎn)也有困難?
在 KafkaProducer 中 一般會(huì)發(fā)生兩種類型的異常:
- 可重試異常
NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、
NotEnoughReplicasException、NotCoordinatorException
- 不可重試異常
RecordTooLargeException等
對(duì)可重試異常我們可以配置 「retries」參數(shù),如果在規(guī)定的重試次數(shù)內(nèi)自行恢復(fù),就不會(huì)拋出異常,「retries」參數(shù)的默認(rèn)值為 0 ,配置方式如下:
- properties.put(ProducerConfig.RETRIES_CONFIG, 10);
上述例子中含義為,重試次數(shù)為 10 次,如果超過 10 次還沒恢復(fù)則會(huì)拋出異常。
不可重試異常如RecordTooLargeException,暗示了如果發(fā)送消息太大,則不會(huì)進(jìn)行重試,直接拋出異常。
序列化來助力
生產(chǎn)者需要用序列化器(Serializer)把對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡(luò)發(fā)送給 Kafka,對(duì)應(yīng)的消費(fèi)者也需要用反序列化器(Deserializer)把 Kafka 中收到的字節(jié)數(shù)組轉(zhuǎn)換成相應(yīng)的對(duì)象。
在上面代碼使用到的StringSerializer實(shí)現(xiàn)了Serializer接口
其中 configure()方法用來配置當(dāng)前類,serizlize()方法用來執(zhí)行序列化操作
「生產(chǎn)者使用的序列化器和消費(fèi)者使用的反序列化器是需要一一對(duì)應(yīng)的」
當(dāng)然除了可以用 Kafka 提供的序列化器,我們也可以自定義序列化器:
「Student.class」:
- @Data
- public class Student {
- private String name;
- private String remark;
- }
「MySerializer」:
「使用」:
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class.getName());
只需要在 Properties 中 put 進(jìn)我們自己的序列化器即可,沒想到也挺簡單的嘛!
分區(qū)器又是啥?
消息在通過 send() 方法發(fā)送到 broker 的過程中,可能需要經(jīng)過 「攔截器(Interceptor)」,「序列化器(Serializer)」 和「分區(qū)器(Partitioner)」
其中 「攔截器」 不是必需的,「序列化器」 是必須的,經(jīng)過序列化器后就需要確定它發(fā)往的分區(qū),如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要「分區(qū)器」的作用,因?yàn)閜artition代表的就是所要發(fā)往的分區(qū)號(hào)。
- package org.apache.kafka.clients.producer;
- public interface Partitioner extends Configurable, Closeable {
- int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- void close();
- }
上述是 kafka 中的Partitioner 接口,可以看到里面有個(gè)方法partition()是用來計(jì)算分區(qū)號(hào),返回 int 類型的值,其中六個(gè)參數(shù)分別代表:
- topic:主題
- key:鍵
- keyBytes:序列化后的鍵
- value:值
- valueBytes:序列化后的值
- cluster:集群的元數(shù)據(jù)信息
在partition()方法中定義了主要的分區(qū)分配邏輯,如果 key 不為空時(shí),那么默認(rèn)的分區(qū)器會(huì)對(duì) key 進(jìn)行haxi(采用MurmurHash2算法),最終根據(jù)得到的哈希值來計(jì)算分區(qū)號(hào),擁有相同 key 的消息會(huì)被寫入同一個(gè)分區(qū),如果 key 為空,那么消息將會(huì)以輪詢的方式發(fā)往主題內(nèi)的各個(gè)可用分區(qū)。
「如果 key 不為 null,那么計(jì)算得到的分區(qū)號(hào)會(huì)是所有分區(qū)中的任意一個(gè),如果 key 為 空 ,那么計(jì)算得到的分區(qū)號(hào)僅為可用分區(qū)中的任意一個(gè)」
當(dāng)然,分區(qū)器也是可以自定義的,操作如下:
「MyPartitioner.class」:
「使用」:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());
自定義分區(qū)器使用起來也簡單,只需要實(shí)現(xiàn) Partitioner 接口
攔截器來了?
做 web 開發(fā)的同學(xué)相信對(duì)攔截器一點(diǎn)也不陌生,在 Kafka 中也具有攔截器的功能,攔截器又分為「生產(chǎn)者攔截器」和「消費(fèi)者攔截器」
生產(chǎn)者攔截器可以在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過濾不符合要求的消息,修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求。
那么有需要就會(huì)有自定義,在自定義攔截器的時(shí)候我們只需要實(shí)現(xiàn)ProducerInterceptor接口即可:
- package org.apache.kafka.clients.producer;
- public interface ProducerInterceptor <K, V> extends Configurable {
- ProducerRecord<K,V> onSend(ProducerRecord<K,V> producerRecord);
- void onAcknowledgement(RecordMetadata recordMetadata, Exception e);
- void close();
- }
其中onSend()方法可以對(duì)消息進(jìn)行相應(yīng)的定制化操作,onAcknowledgement()方法是在消息發(fā)送失敗或者消息被應(yīng)答(Acknowledgement)之前調(diào)用,優(yōu)先于用戶設(shè)定的 Callback。
自定義攔截器如下:MyProducerInterceptor.class:
在onSend()方法中我們修改了將要發(fā)送的消息,在onAcknowledgement()方法中我們統(tǒng)計(jì)了發(fā)送成功數(shù)和失敗數(shù),接著在close()方法中,我們將成功數(shù)和失敗數(shù)進(jìn)行了輸出
同樣的使用方法:
- properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
有一個(gè)攔截器自然就會(huì)形成一個(gè)攔截器鏈,我們可以自定義多個(gè)攔截器,然后在 Properties 文件中聲明:
- properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor1.class.getName() + "," + MyProducerInterceptor2.class.getName());
「這樣子下一個(gè)攔截器就會(huì)依賴于前一個(gè)攔截器的輸出」
重要參數(shù)
除了上述已經(jīng)出現(xiàn)的參數(shù),還有以下一些重要的參數(shù):
1. ack
這個(gè)參數(shù)用來指定分區(qū)中必須要有多少個(gè)副本收到這條消息,之后生產(chǎn)者才會(huì)認(rèn)為這條
- properties.put(ProducerConfig.ACKSCONFIG,”0”); //注意是字符串類型
消息是寫入成功的。ack 中有三種類型(String)的值
- acks = 1:默認(rèn)值為1,生產(chǎn)者發(fā)送消息之后,只要分區(qū)的leader 副本成功寫入消息,那么它就會(huì)收到來自服務(wù)端的成功響應(yīng) 。如果消息寫入 leader 副本并返回成功響應(yīng)給生產(chǎn)者,且在被其他 fo llower 副本拉取之前 leader 副本崩潰,那么此時(shí)消息還是會(huì)丟失。
- acks = 0:生產(chǎn)者發(fā)送消 息之后不需要等待任何服務(wù)端的響應(yīng)。如果在消息從發(fā)送到寫入 Kafka 的過程中出現(xiàn)某些異常,導(dǎo)致 Kafka 并沒有收到這條消息,那么生產(chǎn)者也無從得知,消息也就丟失了。在其他配置環(huán)境相同的情況下,acks 設(shè)置為 0 可以達(dá)到最大的吞吐量。
- acks = -1或 acks = all:生產(chǎn)者在消 息發(fā)送之后,需要等待 ISR 中的所有副本都成功 寫入消息之后才能夠收到來自服務(wù)端的成功響應(yīng)。在其他配置環(huán)境相同的情況下,acks 設(shè)置為 1或(all)可以達(dá)到最強(qiáng)的可靠性。
設(shè)置:
- properties.put(ProducerConfig.ACKSCONFIG,”0”); //注意是字符串類型
2. max.request.size
用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認(rèn)值為1048576B ,即 1MB 。
3. retries
用來配置生產(chǎn)者重試的次數(shù),默認(rèn)值為 0,即在發(fā)生異常的時(shí)候不進(jìn)行任何重試動(dòng)作。
4. retry.backoff.ms
用來設(shè)定兩次重試之間的時(shí)間間隔,避免無效的頻繁重試,默認(rèn)值為 100
5. connections.max.idle.ms
這個(gè)參數(shù)用來指定在多久之后關(guān)閉限制的連接,默認(rèn)值是 540000( ms ),即 9 分鐘。
6.buffer.memory
用來設(shè)置緩存消息的緩沖區(qū)大小
7.batch.size
用來設(shè)定可以復(fù)用內(nèi)存區(qū)域的大小
Kafka 之 消費(fèi)群體
有生產(chǎn)就有消費(fèi),你說是吧!與生產(chǎn)者對(duì)應(yīng)的是消費(fèi)者,應(yīng)用程序可以通過 KafkaConsumer 來訂閱主題,并從訂閱的主題中拉取消息
個(gè)體和群體?
每個(gè)消費(fèi)者都有一個(gè)對(duì)應(yīng)的消費(fèi)組。消費(fèi)者( Consumer )負(fù)責(zé)訂閱 Kafka 中的主題( Topic ),并且從訂閱的主題上拉取消息。當(dāng)消息發(fā)布到主題后,只會(huì)被投遞給訂閱它的每個(gè)消費(fèi)組中的一個(gè)消費(fèi)者 。
當(dāng)消費(fèi)組中只有一個(gè)消費(fèi)者的時(shí)候,是這樣的情況:
當(dāng)消費(fèi)組中有兩個(gè)消費(fèi)者的時(shí)候,是這樣的情況:
從上面的分配情況可以看出,隨著消費(fèi)者的增加,可以讓整體的消費(fèi)能力具有橫向伸縮性。我們可以增加(或減少)消費(fèi)者的個(gè)數(shù)來提高(或降低)整體的消費(fèi)能力。當(dāng)時(shí)在分區(qū)數(shù)固定的情況下,盲目地增加消費(fèi)者并不會(huì)讓消費(fèi)能力一直得到提升,如果消費(fèi)者過多,就會(huì)出現(xiàn)消費(fèi)者個(gè)數(shù)大于分區(qū)個(gè)數(shù)的情況,就會(huì)有消費(fèi)者分配不到任何分區(qū)。
以上分配邏輯都是基于默認(rèn)的分區(qū)分配策略進(jìn)行分析的,可以通過消費(fèi)者客戶端配置partition.assignment.strategy來設(shè)置消費(fèi)者與訂閱主題之間的分區(qū)分配策略。
投遞模式
Kafka 中有兩種消息投遞模式:
點(diǎn)對(duì)點(diǎn)模式(Point-to-Point)
基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列中接收消息
發(fā)布/訂閱模式(Pub/Sub)
基于主題的,主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布者將消息發(fā)布到某個(gè)主題,而消息訂閱者從主題中訂閱消息。主題使得消息的訂閱者和發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布/訂閱模式在消息的一對(duì)多廣播時(shí)采用。
客戶端開發(fā)
消費(fèi)過程大致得具備以下幾個(gè)步驟方能消費(fèi):
- 配置消費(fèi)者客戶端參數(shù)以及創(chuàng)建相應(yīng)的消費(fèi)者實(shí)例
- 訂閱主題
- 拉取消息并消費(fèi)
- 提交消費(fèi)位移
- 關(guān)閉消費(fèi)者實(shí)例
可以看出在配置消費(fèi)者參數(shù)的時(shí)候,我們看到了幾個(gè)熟悉的參數(shù):
- bootstrap.servers:為了防止書寫出錯(cuò),可以用ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG表示,用來指定連接 Kafka 集群所需的 broker 地址清單,可以設(shè)置一個(gè)或多個(gè)地址,中間用逗號(hào)隔開,默認(rèn)值為 " "
- group.id:為了防止書寫出錯(cuò),可以用ConsumerConfig.GROUP_ID_CONFIG表示,消費(fèi)者所在消費(fèi)組的名稱,默認(rèn)值為 " ",如果設(shè)置為空,則會(huì)拋出異常
- key.deserializer/value.deserializer:為了防止書寫錯(cuò)誤,可以用ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG和ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG表示,消費(fèi)端所需要執(zhí)行響應(yīng)的反序列化操作,需要和生產(chǎn)端一致
client.id:為了防止書寫錯(cuò)誤,可以用ConsumerConfig.CLIENT_ID_CONFIG表示,用來設(shè)定 KafkaConsumer 對(duì)應(yīng)的客戶端 id,默認(rèn)值為 " "
主題的訂閱
消費(fèi)者消費(fèi)消息,重要的就是訂閱相對(duì)應(yīng)的主題。在上述的例子中我們是通過 consumer.subscribe(Arrays.asList(topic)); 來訂閱主題的,可以看出一個(gè)消費(fèi)者可以訂閱一個(gè)或多個(gè)主題。我們來看下 subscribe() 這個(gè)方法的重載:
- public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { /* compiled code */ }
- public void subscribe(Collection<String> topics) { /* compiled code */ }
- public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { /* compiled code */ }
- public void subscribe(Pattern pattern) { /* compiled code */ }
如果我們?cè)谟嗛喼黝}的過程中出現(xiàn)了以下情況:
- consumer.subscribe(Arrays.asList(topic1));
- consumer.subscribe(Arrays.asList(topic2));
那么最終情況只會(huì)訂閱到 topic2,而不是topic1,更不是topic1和topic2的結(jié)合。
subscribe()這個(gè)方法重載后也支持正則表達(dá)式:
- consumer.subscribe(Pattern.compile(”topic.*”));
這樣配置后,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么這個(gè)消費(fèi)者就可以消費(fèi)到新添加的主題中的消息。
subscribe()這個(gè)方法除了傳入主題和正則作為參數(shù),還有兩個(gè)方法支持了 ConsumerRebalanceListener 參數(shù)的傳入,這個(gè)是用來設(shè)置相應(yīng)的再均衡監(jiān)聽器。
消費(fèi)者除了可以通過subscribe()方法來訂閱主題之外,還可以通過assign()方法來實(shí)現(xiàn)直接訂閱某些主題的特定分區(qū)。
- public void assign(Collection<TopicPartition> partitions)
其中TopicPartition 對(duì)象定義如下:
構(gòu)造函數(shù)中需要傳入「訂閱的主題」和「分區(qū)編號(hào)」,使用如下:
- consumer.assign(Arrays.asList(new TopicPartition(”kafka-demo”, 0))) ;
這樣子我們就可以訂閱 kafka-demo中的 0 號(hào)分區(qū)了。
如果我們事先并不知道主題中有多少個(gè)分區(qū)怎么辦?KafkaConsumer 中的 partitionsFor()方法可以用來查詢指定主題的元數(shù)據(jù)信息,partitionsFor()方法定義如下:
- public List <PartitionInfo> partitionsFor(String topic);
其中 PartitionInfo對(duì)象定義如下:
- public class Partitioninfo {
- private final String topic; //主題名稱
- private final int partition; //分區(qū)編號(hào)
- private final Node leader; //分區(qū)的leader副本所在的位置
- private final Node[] replicas; //分區(qū)的AR集合
- private final Node[] inSyncReplicas; //分區(qū)的ISR集合
- private final Node[] offlineReplicas; //分區(qū)的OSR集合
- }
訂閱不是惡意捆綁的,能訂閱就能夠取消訂閱,我們可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主題的訂閱。這個(gè)方法既可以取消通過subscribe(Collection)方式實(shí)現(xiàn)的訂閱,也可以取消通過subscribe(Pattem)方式實(shí)現(xiàn)的訂閱,還可以取消通過assign(Collection)方式實(shí)現(xiàn)的訂閱 。
- consumer.unsubscribe() ;
如果將 subscribe(Collection)或 assign(Collection) 中 的集合參數(shù)設(shè)置為空集合 ,那么 作用等同于unsubscribe()方法 ,下面示例中 的三行代碼的效果相同:
- consumer.unsubscribe();
- consumer.subscribe(new ArrayList<String>());
- consumer.assign(new ArrayList<TopicPartition>());
消費(fèi)模式
消息的消費(fèi)模式一般有兩種:「推模式」和「拉模式」。而 Kafka 中的消費(fèi)是基于「拉模式」
推模式:服務(wù)端主動(dòng)將消息推送給消費(fèi)者
拉模式:消費(fèi)者主動(dòng)向服務(wù)端發(fā)起拉取請(qǐng)求
Kafka 的消息消費(fèi)是一個(gè)不斷輪詢的過程,消費(fèi)者所要做的就是重復(fù)地調(diào)用poll()方法,如果某些分區(qū)中沒有可供消費(fèi)的消息,那么此分區(qū)對(duì)應(yīng)的消息拉取的結(jié)果就為空;如果訂閱的所有分區(qū)中都沒有可供消費(fèi)的消息,那么 poll()方法返回為空的消息集合。
- public ConsumerRecords<K, V> poll(final Duration timeout)
在poll()方法中可以傳入一個(gè)超時(shí)時(shí)間參數(shù) timeout,用來控制 poll()方法的阻塞時(shí)間,在消費(fèi)者的緩沖區(qū)里沒有可用數(shù)據(jù)時(shí)會(huì)發(fā)生阻塞。
通過poll()方法拉取到的消息是一個(gè)ConsumerRecord對(duì)象,定義如下:
我們?cè)谙M(fèi)消息的時(shí)候可以直接對(duì) ConsumerRecord 中感興趣的字段進(jìn)行具體的業(yè)務(wù)邏輯處理。
消費(fèi)者攔截器
我們上面已經(jīng)講到了生產(chǎn)者攔截器的使用,當(dāng)然,消費(fèi)者也有響應(yīng)的攔截器的概念。消費(fèi)者攔截器主要在消費(fèi)到消息或在提交消費(fèi)位移時(shí)進(jìn)行一些定制化的操作。
生產(chǎn)者定義攔截器的方式是實(shí)現(xiàn) ProducerInterceptor 接口,而消費(fèi)者定義攔截器的方式則是實(shí)現(xiàn)ConsumerInterceptor接口,ConsumerInterceptor定義如下:
- package org.apache.kafka.clients.consumer;
- public interface ConsumerInterceptor <K, V> extends Configurable, AutoCloseable {
- ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> consumerRecords);
- void onCommit(Map<TopicPartition,OffsetAndMetadata> map);
- void close();
- }
- onConsume():KafkaConsumer 會(huì)在 poll()方法返回之前調(diào)用攔截器的 onConsume()方法來對(duì)消息進(jìn)行相應(yīng)的定制化操作,比如修改返回的消息內(nèi)容、按照某種規(guī)則過濾消息(可能會(huì)減少poll()方法返回的消息的個(gè)數(shù))。如果onConsume()方法中拋出異常,那么會(huì)被捕獲并記錄到日志中,但是異常不會(huì)再向上傳遞。
- onCommit():KafkaConsumer 會(huì)在提交完消費(fèi)位移之后調(diào)用攔截器的 onCommit()方法,可以使用這個(gè)方法來記錄跟蹤所提交的位移信息,比如當(dāng)消費(fèi)者使用commitSync的無參方法時(shí),我們不知道提交的消費(fèi)位移的具體細(xì)節(jié),而使用攔截器的 onCommit()方法卻可以做到這一點(diǎn)。
我們?cè)谧远x攔截器后,也是用過相同的方式使用:
- properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG ,MyConsumerInterceptor.class.getName());
重要參數(shù)
除了上述已經(jīng)出現(xiàn)的參數(shù),還有以下一些重要的參數(shù):
1. fetch.min.bytes
該參數(shù)用來配置 Consumer 在一次拉取請(qǐng)求(調(diào)用poll()方法)中能從 Kafka 中拉取的最小數(shù)據(jù)量,默認(rèn)值為 1B。如果返回的數(shù)據(jù)量小于這個(gè)參數(shù)所設(shè)置的值,那么它就需要進(jìn)行等待,直到數(shù)據(jù)量滿足這個(gè)參數(shù)的配置大小
2. fetch.max.bytes
該參數(shù)用來配置 Consumer 在一次拉取請(qǐng)求中能從 Kafka 中拉取的最大數(shù)據(jù)量,默認(rèn)為 52428800 B(50M)
3. fetch.max.wait.ms
該參數(shù)用來指定 Kafka 的等待時(shí)間,默認(rèn)值為 500 ms
4. max.partition.fetch.bytes
該參數(shù)從來配置從每個(gè)分區(qū)里返回給 Consumer 的最大數(shù)據(jù)量,默認(rèn)值為 1048576 B(1MB)
5. max.poll.records
該參數(shù)用來配置 Consumer 再一次拉取請(qǐng)求中拉取的最大消息數(shù),默認(rèn)值為 500 條
6. request.timeout.ms
該參數(shù)用來配置 Consumer 等待請(qǐng)求響應(yīng)的最長時(shí)間,默認(rèn)值為 30000 ms
Kafka 之 主題管理
在前面的生產(chǎn)者端和消費(fèi)者端中我們都已經(jīng)見到了「主題」的概念,「主題」是 Kafka 中的核心。
主題作為消息的歸類,可以再細(xì)分為一個(gè)或多個(gè)分區(qū),分區(qū)也可以看作對(duì)消息的二次歸類。分區(qū)的劃分不僅為 Kafka 提供了可伸縮性、水平擴(kuò)展的功能,還通過多副本機(jī)制來為 Kafka 提供數(shù)據(jù)冗余以提高數(shù)據(jù)可靠性。
1. 創(chuàng)建主題
在 broker 端有個(gè)配置參數(shù)為 auto.create.topics.enable (默認(rèn)值為 true),當(dāng)該參數(shù)為 「true」 的時(shí)候,生產(chǎn)者想一個(gè)尚未創(chuàng)建的主題發(fā)送消息時(shí),會(huì)自動(dòng)創(chuàng)建一個(gè)分區(qū)數(shù)為num.partitions(默認(rèn)值為1),副本因子為 default.replication.factor(默認(rèn)值為1)的主題。
「使用腳本的方式創(chuàng)建」:
- bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic kafka-demo --partitions 4 --replication-factor 2
「使用 TopicCommand 創(chuàng)建主題」:
導(dǎo)出 Maven 依賴:
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_2.11</artifactId>
- <version>2.0.0</version>
- </dependency>
- public static void createTopic(String topicName) {
- String[] options = new String[]{
- "--zookeeper", "localhost:2181/kafka",
- "--create",
- "--replication-factor", "2",
- "--partitions", "4",
- "--topic", topicName
- };
- kafka.admin.TopicCommand.main(options);
- }
上述示例中,創(chuàng)建了一個(gè)分區(qū)數(shù)為 4,副本因子為 2 的主題
2. 查看主題
- -list:
通過list指令可以查看當(dāng)前所有可用的主題:
- bin/kafka-topics.sh --zookeeper localhost:2181/kafka -list
- describe
通過describe指令可以查看單個(gè)主題信息,如果不適用 --topic 指定主題,則會(huì)展示出所有主題的詳細(xì)信息。--topic還支持指定多個(gè)主題:
- bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic kafka-demo1,kafka-demo2
3.修改主題
當(dāng)一個(gè)主題被創(chuàng)建之后,我們可以對(duì)其做一定的修改,比如修改分區(qū)個(gè)數(shù)、修改配置等,借助于alter指令來實(shí)現(xiàn):
- bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic kafka- demo --partitions 3
修改分區(qū)的時(shí)候我們需要注意的是:
當(dāng)主題 kafka-demo 的分區(qū)數(shù)為 1 時(shí),不管消息的 key 為何值,消息都會(huì)發(fā)往這一個(gè)分區(qū),當(dāng)分區(qū)數(shù)增加到 3 時(shí),就會(huì)根據(jù)消息的 key 來計(jì)算分區(qū)號(hào),原本發(fā)往分區(qū) 0 的消息現(xiàn)在就有可能發(fā)往分區(qū) 1 或分區(qū) 2。因此建議一開始就要設(shè)置好分區(qū)數(shù)量。
目前 Kafka 只支持增加分區(qū)數(shù)而不支持減少分區(qū)數(shù),當(dāng)我們要把主題 kafka-demo 的分區(qū)數(shù)修改為 1 時(shí),就會(huì)報(bào)出 InvalidPartitionException 異常。
4. 刪除主題
如果確定不再使用一個(gè)主題,那么最好的方式就是將其刪除,這樣可以釋放一些資源,比如磁盤、文件句柄等。這個(gè)時(shí)候我們就可以借助 delete 指令來刪除主題:
- bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic kafka-demo
需要注意的是 我們必須將broker中的delete.topic.enable參數(shù)配置為 true 才能夠刪除主題,這個(gè)參數(shù)的默認(rèn)值就是true,如果配置為 false,那么刪除主題的操作將會(huì)被忽略。
如果要?jiǎng)h除的主題是 Kafka 的內(nèi)部主題,那么刪除時(shí)就會(huì)報(bào)錯(cuò)。例如:__consumer_offsets和__transaction_state
常見參數(shù)
參數(shù)名稱 | 釋義 |
---|---|
alter | 用于修改主題,包括分區(qū)數(shù)以及主題的配置 |
config<鍵值對(duì)> | 創(chuàng)建或修改主題,用于設(shè)置主題級(jí)別的參數(shù) |
create | 創(chuàng)建主題 |
delete | 刪除主題 |
delete-config<配置名稱> | 刪除主題級(jí)別被覆蓋的配置 |
describe | 查看主題的詳細(xì)信息 |
disable-rack-aware | 創(chuàng)建主題是不考慮機(jī)架信息 |
help | 打印幫助信息 |
if-exists | 修改或刪除主題時(shí)使用,只有當(dāng)主題存在時(shí)才會(huì)執(zhí)行操作 |
if-not-exists | 創(chuàng)建主題時(shí)使用,只有主題不存在時(shí)才會(huì)執(zhí)行動(dòng)作 |
list | 列出所有可用的主題 |
partitions<分區(qū)數(shù)> | 創(chuàng)建主題或增加分區(qū)時(shí)指定分區(qū)數(shù) |
replica-assignment<分配方案> | 手工指定分區(qū)副本分配方案 |
replication-factor<副本數(shù)> | 創(chuàng)建主題時(shí)指定副本因子 |
topic<主題名稱> | 指定主題名稱 |
topics-with-overrides | 使用describe查看主題信息時(shí),只展示包含覆蓋配置的主題 |
指定連接的 ZooKeeper 地址信息 |
上面大致就是 Kafka 的入門內(nèi)容啦,今天的知識(shí)就介紹到這里啦,內(nèi)容雖然不是很深入,但是字?jǐn)?shù)也不少,能完整看完的小伙伴,小菜給你點(diǎn)個(gè)贊哦!