自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

帶你入門Kafka,你知道的越多不知道的也越多!

開源 Kafka
目前 Kafka 已經(jīng)定位為一個(gè)分布式流式處理平臺(tái),它以高吞吐、可持久化、可水平擴(kuò)展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用。

[[340900]]

本文轉(zhuǎn)載自微信公眾號(hào)「小菜良記」,作者蔡不菜丶。轉(zhuǎn)載本文請(qǐng)聯(lián)系小菜良記公眾號(hào)。  

初始Kafka

1、介紹

Kafka 起初是由 Linkedin 公司采用 Scala 語言開發(fā)的一個(gè)多分區(qū)、多副本且基于 ZooKeeper協(xié)調(diào)的分布式消息系統(tǒng),現(xiàn)己被捐獻(xiàn)給 Apache 基金會(huì) 。目前 Kafka 已經(jīng)定位為一個(gè)分布式流式處理平臺(tái),它以高吞吐、可持久化、可水平擴(kuò)展、支持流數(shù)據(jù)處理等多種特性而被廣泛使用。

2、使用場景

消息系統(tǒng):Kafka 和傳統(tǒng)的消息系統(tǒng)(消息中間件)都具備系統(tǒng)解耦、冗余存儲(chǔ)、流量削峰、緩沖、異步通信、擴(kuò)展性、可恢復(fù)性等功能。與此同時(shí),Kafka還提供了大多數(shù)消息系統(tǒng)難以實(shí)現(xiàn)的消息順序性保障以及回溯消費(fèi)的功能。

存儲(chǔ)系統(tǒng):Kafka把消息持久化到磁盤,相比于其他基于內(nèi)存存儲(chǔ)的系統(tǒng)而言,有效地降低了數(shù)據(jù)丟失的風(fēng)險(xiǎn)。也正是得益于 Kafka 的消息持久化功能和多副本機(jī)制,我們可以把 Kafka 作為長期的數(shù)據(jù)存儲(chǔ)系統(tǒng)來使用,只需要把對(duì)應(yīng)的數(shù)據(jù)保留策略設(shè)置為 “永久” 或啟用主題的日志壓縮功能即可。

流式處理平臺(tái):Kafka 不僅為每個(gè)流行的流式處理框架里提供了可靠的數(shù)據(jù)來源,還提供了一個(gè)完整的流式處理類庫,比如窗口、連接、交換和聚合等各類操作。

3、基本概念

Kafka體系架構(gòu)包括若干 「Producer」,「Broker」,「Consumer」以及一個(gè)ZooKeeper集群。

  • ZooKeeper:是 Kafka 用來負(fù)責(zé)集群元數(shù)據(jù)的管理、控制器的選舉等操作的。
  • Producer:生產(chǎn)者,發(fā)送消息的一方。負(fù)責(zé)創(chuàng)建消息,然后將其投遞到 Kafka 中。
  • Consumer:消費(fèi)者,接收消息的一方。連接到 Kafka 后接收消息,并進(jìn)行相應(yīng)的業(yè)務(wù)邏輯處理。
  • Broker:服務(wù)代理節(jié)點(diǎn)。對(duì)于 Kafka 而言,Broker 可以簡單地看作一個(gè)獨(dú)立的 Kafka 服務(wù)節(jié)點(diǎn)或 Kafka 服務(wù)實(shí)例。大多數(shù)情況下也可以將 Broker 看作一臺(tái) Kafka 服務(wù)器,前提是這臺(tái)服務(wù)器上只部署了一個(gè) Kafka 實(shí)例。一個(gè)或多個(gè)Broker 組成了一個(gè) Kafka 集群。

整體 Kafka 體系大概是由上面幾部分構(gòu)成。除此之外,還有兩個(gè)特別重要的概念:主題(Topic)和分區(qū)(Partition)

  • 主題:Kafka 中的消息以主題為單位進(jìn)行歸類,生產(chǎn)者負(fù)責(zé)將消息發(fā)送到特定的主題(發(fā)送到 Kafka 集群中的每一條消息都要指定一個(gè)主題),而消費(fèi)者負(fù)責(zé)訂閱主題并進(jìn)行消費(fèi)。
  • 分區(qū):主題是一個(gè)邏輯上的概念。還可以細(xì)分為多個(gè)分區(qū),一個(gè)分區(qū)只屬于單個(gè)主題,很多時(shí)候也會(huì)把分區(qū)稱為主題分區(qū)(Topic-Partition)。同一主題下的不同分區(qū)包含的消息是不同的,分區(qū)在存儲(chǔ)層面可以看作一個(gè)可追加的「日志文件」,消息在被追加到分區(qū)日志文件的時(shí)候都會(huì)分配一個(gè)特定的偏移量(offset)。offset 是消息在分區(qū)中的唯一標(biāo)識(shí),Kafka 通過它來保證消息在分區(qū)內(nèi)的順序性,不過offset并不跨越分區(qū),也就是說,Kafka 保證的是分區(qū)有序而不是主題有序。

Kafka 為分區(qū)引入了多副本(Replica) 機(jī)制,通過增加副本數(shù)量可以提升容災(zāi)能力。

同一分區(qū)的不同副本中保存的是相同的消息(在同一時(shí)刻,副本之間并非完全一樣),副本之間是“ 一主多從”的關(guān)系,其中 leader 副本負(fù)責(zé)處理讀寫請(qǐng)求 ,follower 副本只負(fù)責(zé)與 leader 副本的消息同步。副本處于不同的 broker 中 ,當(dāng) leader 副本出現(xiàn)故障時(shí),從 follower 副本中重新選舉新的 leader 副本對(duì)外提供服務(wù)。

「Kafka 通過多副本機(jī)制實(shí)現(xiàn)了故障的自動(dòng)轉(zhuǎn)移,當(dāng) Kafka 集群中某個(gè) broker 失效時(shí)仍然能保證服務(wù)可用 ?!?/p>

在我們繼續(xù)了解 Kafka 之前,我們還需要明白幾個(gè)關(guān)鍵詞:

  • AR(Assigned Replicas):分區(qū)中所有副本統(tǒng)稱為 AR
  • ISR(In-Sync Replicas):所有與 leader 副本保持一定程度同步的副本(包括 leader 副本在內(nèi))組成 ISR。ISR 集合是 AR 集合中的一個(gè)子集 。消息會(huì)先發(fā)送到 leader 副本,然后 follower 副本才能從 leader 副本中拉取消息進(jìn)行同步,同步期間內(nèi)follower 副本相對(duì)于 leader 副本而言會(huì)有一定程度的滯后 。
  • OSR(Out-of-Sync Replicas):與 leader 副本同步滯后過多的副本(不包括 leader 副本)組成 OSR

由以上關(guān)系我們可以得出一個(gè)公式:AR=ISR+OSR

  • HW(High Watermark):俗稱高水位,是用來標(biāo)識(shí)一個(gè)特定的消息偏移量(offset),消費(fèi)者只能拉取到這個(gè) offset 之前的消息
  • LEO(LogStartOffset):下一條待寫入消息的 offset

相信很多小伙伴看到這里有點(diǎn)不耐煩了,這 Kafka 怎么這么難,還能不能好好學(xué)習(xí)了

莫急莫急,理論知識(shí)咱們還是要先過一遍,這可不是勸退的開始,這是你成長的開始!下面小菜力求用最簡樸的語句帶你入最深的坑!

Kafka 之 生產(chǎn)大隊(duì)

眾所周知,Kafka 說高尚點(diǎn)是一個(gè)分布式消息隊(duì)列,簡單來說不就是一個(gè)消息隊(duì)列。消息隊(duì)列簡單來說不就是推數(shù)據(jù),拿數(shù)據(jù)的嘛。沒錯(cuò),高端的知識(shí)往往需要簡單的理解。

那么數(shù)據(jù)從哪來,數(shù)據(jù)從生產(chǎn)隊(duì)來!從編程的角度而言,生產(chǎn)大隊(duì)里面有一群生產(chǎn)者(當(dāng)然也可以只有一個(gè)),生產(chǎn)者就是負(fù)責(zé)向 Kafka 發(fā)送消息的應(yīng)用程序。

客戶端開發(fā)

生產(chǎn)過程大致得具備以下幾個(gè)步驟方能生產(chǎn):

  • 配置生產(chǎn)者客戶端參數(shù)以及創(chuàng)建響應(yīng)的生產(chǎn)者實(shí)例
  • 構(gòu)建待發(fā)送的消息
  • 發(fā)送消息
  • 關(guān)閉生產(chǎn)者實(shí)例

「四大步驟一梭子解決生產(chǎn)問題」

上面的代碼中可以看到我們往 Properties 文件中 put 進(jìn)了四個(gè)參數(shù),分別為:

  • bootstrap.servers:改參數(shù)用來指定生產(chǎn)者客戶端連接 Kafka 集群所需的 broker 地址。格式為(host1:port1,host2:port2),可以設(shè)置一個(gè)或多個(gè)地址,中間以逗號(hào)隔開,默認(rèn)值為 “ ”
  • key.serializer 和 value.serializer:分別用來指定 key 和 value 序列化操作的序列化器,這兩個(gè)參數(shù)無默認(rèn)值,需要填寫序列化器的全限定名
  • client.id:用來設(shè)定 KafkaProducer 對(duì)應(yīng)的客戶端 id,默認(rèn)值為 “ ”。如果客戶端不設(shè)置,則KafkaProducer 會(huì)自動(dòng)生成一個(gè)非空字符串,內(nèi)容形式為 “producer-1”,“producer-2”,即字符串 “producer-” 和數(shù)字的拼接

其中ProducerRecord定義如下:

  • topic和partition :分別代表消息要發(fā)往的主題和分區(qū)號(hào);
  • headers:消息的頭部,不需要時(shí)可以不設(shè)置
  • key:用來指定消息的鍵,它不經(jīng)是消息的附加消息,還可以用來計(jì)算分區(qū)號(hào)進(jìn)而可以讓消息發(fā)往特定的分區(qū)。
  • value:消息體,一般不為空,如果為空則表示特定的消息 -- 「墓碑消息」
  • timestamp:消息的時(shí)間戳,它有 CreateTime 和 LogAppendTime 兩種類型,前者表示消息創(chuàng)建的時(shí)間,后者表示消息追加到日志文件的時(shí)間

上面的操作就是創(chuàng)建生產(chǎn)者實(shí)例和構(gòu)建消息,發(fā)送消息主要有三種模式:

  • 發(fā)后即忘(fire-and-forget)
  • 同步(sync)
  • 異步(async)

而我們上面使用的發(fā)送方式就是發(fā)后即忘,它只管往 Kafka 中發(fā)送消息而并不關(guān)心消息是否正確到達(dá),大多數(shù)情況下,這種發(fā)送方式是沒有什么問題的,不過在有些時(shí)候(發(fā)生不可重試異常)會(huì)造成消息丟失?!副M管這種發(fā)送方式性能最高,但是可靠性也最差?!?/p>

  1. public Future<RecordMetadata> send(ProducerRecord<K,V> record) {} 

從send方法來看,是返回一個(gè)Future對(duì)象

  1. Future res = producer.send(record); 

這說明 send()方法本身就是異步的,send()方法返回的Future對(duì)象可以使調(diào)用方稍后獲得發(fā)送的結(jié)果。如果我們想實(shí)現(xiàn)同步的效果,可以直接調(diào)用Future的get()方法實(shí)現(xiàn)。

  1. try { 
  2.     producer.send(record).get(); 
  3. } catch (Exception e) { 
  4.     e.printStackTrace(); 

通過get()方法來阻塞等待 Kafka 的響應(yīng),直到消息發(fā)送成功,或者發(fā)生異常

生產(chǎn)也能異步?

在 Kafka 中 send()方法有另外一個(gè)重載方式:

  1. public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback) {} 
  1. producer.send(record, new Callback() { 
  2.     @Override 
  3.     public void onCompletion(RecordMetadata recordMetadata, Exception e) { 
  4.         if (Objects.isNull(e)) { 
  5.             System.out.println("主題:" + recordMetadata.topic()); 
  6.         } else { 
  7.             System.out.println(e.getMessage()); 
  8.         } 
  9.     } 
  10. }); 

使用 Callback 的方式非常簡潔明了,Kafka 有響應(yīng)時(shí)就會(huì)回調(diào),要么發(fā)送成功,要么拋出異常。

onCompletion()方法中兩個(gè)參數(shù)是互斥的,如果發(fā)送成功則RecordMetadata不為空,Exception為空,如果發(fā)送失敗則相反。

生產(chǎn)也有困難?

在 KafkaProducer 中 一般會(huì)發(fā)生兩種類型的異常:

  • 可重試異常

NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、

NotEnoughReplicasException、NotCoordinatorException

  • 不可重試異常

RecordTooLargeException等

對(duì)可重試異常我們可以配置 「retries」參數(shù),如果在規(guī)定的重試次數(shù)內(nèi)自行恢復(fù),就不會(huì)拋出異常,「retries」參數(shù)的默認(rèn)值為 0 ,配置方式如下:

  1. properties.put(ProducerConfig.RETRIES_CONFIG, 10); 

上述例子中含義為,重試次數(shù)為 10 次,如果超過 10 次還沒恢復(fù)則會(huì)拋出異常。

不可重試異常如RecordTooLargeException,暗示了如果發(fā)送消息太大,則不會(huì)進(jìn)行重試,直接拋出異常。

序列化來助力

生產(chǎn)者需要用序列化器(Serializer)把對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡(luò)發(fā)送給 Kafka,對(duì)應(yīng)的消費(fèi)者也需要用反序列化器(Deserializer)把 Kafka 中收到的字節(jié)數(shù)組轉(zhuǎn)換成相應(yīng)的對(duì)象。

在上面代碼使用到的StringSerializer實(shí)現(xiàn)了Serializer接口

其中 configure()方法用來配置當(dāng)前類,serizlize()方法用來執(zhí)行序列化操作

「生產(chǎn)者使用的序列化器和消費(fèi)者使用的反序列化器是需要一一對(duì)應(yīng)的」

當(dāng)然除了可以用 Kafka 提供的序列化器,我們也可以自定義序列化器:

「Student.class」:

  1. @Data 
  2. public class Student { 
  3.  
  4.     private String name
  5.  
  6.     private String remark; 

「MySerializer」:

「使用」:

  1. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MySerializer.class.getName()); 

只需要在 Properties 中 put 進(jìn)我們自己的序列化器即可,沒想到也挺簡單的嘛!

分區(qū)器又是啥?

消息在通過 send() 方法發(fā)送到 broker 的過程中,可能需要經(jīng)過 「攔截器(Interceptor)」,「序列化器(Serializer)」 和「分區(qū)器(Partitioner)」

其中 「攔截器」 不是必需的,「序列化器」 是必須的,經(jīng)過序列化器后就需要確定它發(fā)往的分區(qū),如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要「分區(qū)器」的作用,因?yàn)閜artition代表的就是所要發(fā)往的分區(qū)號(hào)。

  1. package org.apache.kafka.clients.producer; 
  2.  
  3. public interface Partitioner extends Configurable, Closeable { 
  4.     int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); 
  5.     void close(); 

上述是 kafka 中的Partitioner 接口,可以看到里面有個(gè)方法partition()是用來計(jì)算分區(qū)號(hào),返回 int 類型的值,其中六個(gè)參數(shù)分別代表:

  1. topic:主題
  2. key:鍵
  3. keyBytes:序列化后的鍵
  4. value:值
  5. valueBytes:序列化后的值
  6. cluster:集群的元數(shù)據(jù)信息

在partition()方法中定義了主要的分區(qū)分配邏輯,如果 key 不為空時(shí),那么默認(rèn)的分區(qū)器會(huì)對(duì) key 進(jìn)行haxi(采用MurmurHash2算法),最終根據(jù)得到的哈希值來計(jì)算分區(qū)號(hào),擁有相同 key 的消息會(huì)被寫入同一個(gè)分區(qū),如果 key 為空,那么消息將會(huì)以輪詢的方式發(fā)往主題內(nèi)的各個(gè)可用分區(qū)。

「如果 key 不為 null,那么計(jì)算得到的分區(qū)號(hào)會(huì)是所有分區(qū)中的任意一個(gè),如果 key 為 空 ,那么計(jì)算得到的分區(qū)號(hào)僅為可用分區(qū)中的任意一個(gè)」

當(dāng)然,分區(qū)器也是可以自定義的,操作如下:

「MyPartitioner.class」:

「使用」:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

自定義分區(qū)器使用起來也簡單,只需要實(shí)現(xiàn) Partitioner 接口

攔截器來了?

做 web 開發(fā)的同學(xué)相信對(duì)攔截器一點(diǎn)也不陌生,在 Kafka 中也具有攔截器的功能,攔截器又分為「生產(chǎn)者攔截器」和「消費(fèi)者攔截器」

生產(chǎn)者攔截器可以在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個(gè)規(guī)則過濾不符合要求的消息,修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求。

那么有需要就會(huì)有自定義,在自定義攔截器的時(shí)候我們只需要實(shí)現(xiàn)ProducerInterceptor接口即可:

  1. package org.apache.kafka.clients.producer; 
  2.  
  3. public interface ProducerInterceptor <K, V> extends Configurable { 
  4.     ProducerRecord<K,V> onSend(ProducerRecord<K,V> producerRecord); 
  5.  
  6.     void onAcknowledgement(RecordMetadata recordMetadata, Exception e); 
  7.  
  8.     void close(); 

其中onSend()方法可以對(duì)消息進(jìn)行相應(yīng)的定制化操作,onAcknowledgement()方法是在消息發(fā)送失敗或者消息被應(yīng)答(Acknowledgement)之前調(diào)用,優(yōu)先于用戶設(shè)定的 Callback。

自定義攔截器如下:MyProducerInterceptor.class:

在onSend()方法中我們修改了將要發(fā)送的消息,在onAcknowledgement()方法中我們統(tǒng)計(jì)了發(fā)送成功數(shù)和失敗數(shù),接著在close()方法中,我們將成功數(shù)和失敗數(shù)進(jìn)行了輸出

同樣的使用方法:

  1. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName()); 

有一個(gè)攔截器自然就會(huì)形成一個(gè)攔截器鏈,我們可以自定義多個(gè)攔截器,然后在 Properties 文件中聲明:

  1. properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor1.class.getName() + "," + MyProducerInterceptor2.class.getName()); 

「這樣子下一個(gè)攔截器就會(huì)依賴于前一個(gè)攔截器的輸出」

重要參數(shù)

除了上述已經(jīng)出現(xiàn)的參數(shù),還有以下一些重要的參數(shù):

1. ack

這個(gè)參數(shù)用來指定分區(qū)中必須要有多少個(gè)副本收到這條消息,之后生產(chǎn)者才會(huì)認(rèn)為這條

  1. properties.put(ProducerConfig.ACKSCONFIG,”0”); //注意是字符串類型 

消息是寫入成功的。ack 中有三種類型(String)的值

  1. acks = 1:默認(rèn)值為1,生產(chǎn)者發(fā)送消息之后,只要分區(qū)的leader 副本成功寫入消息,那么它就會(huì)收到來自服務(wù)端的成功響應(yīng) 。如果消息寫入 leader 副本并返回成功響應(yīng)給生產(chǎn)者,且在被其他 fo llower 副本拉取之前 leader 副本崩潰,那么此時(shí)消息還是會(huì)丟失。
  2. acks = 0:生產(chǎn)者發(fā)送消 息之后不需要等待任何服務(wù)端的響應(yīng)。如果在消息從發(fā)送到寫入 Kafka 的過程中出現(xiàn)某些異常,導(dǎo)致 Kafka 并沒有收到這條消息,那么生產(chǎn)者也無從得知,消息也就丟失了。在其他配置環(huán)境相同的情況下,acks 設(shè)置為 0 可以達(dá)到最大的吞吐量。
  3. acks = -1或 acks = all:生產(chǎn)者在消 息發(fā)送之后,需要等待 ISR 中的所有副本都成功 寫入消息之后才能夠收到來自服務(wù)端的成功響應(yīng)。在其他配置環(huán)境相同的情況下,acks 設(shè)置為 1或(all)可以達(dá)到最強(qiáng)的可靠性。

設(shè)置:

  1. properties.put(ProducerConfig.ACKSCONFIG,”0”); //注意是字符串類型 

2. max.request.size

用來限制生產(chǎn)者客戶端能發(fā)送的消息的最大值,默認(rèn)值為1048576B ,即 1MB 。

3. retries

用來配置生產(chǎn)者重試的次數(shù),默認(rèn)值為 0,即在發(fā)生異常的時(shí)候不進(jìn)行任何重試動(dòng)作。

4. retry.backoff.ms

用來設(shè)定兩次重試之間的時(shí)間間隔,避免無效的頻繁重試,默認(rèn)值為 100

5. connections.max.idle.ms

這個(gè)參數(shù)用來指定在多久之后關(guān)閉限制的連接,默認(rèn)值是 540000( ms ),即 9 分鐘。

6.buffer.memory

用來設(shè)置緩存消息的緩沖區(qū)大小

7.batch.size

用來設(shè)定可以復(fù)用內(nèi)存區(qū)域的大小

Kafka 之 消費(fèi)群體

有生產(chǎn)就有消費(fèi),你說是吧!與生產(chǎn)者對(duì)應(yīng)的是消費(fèi)者,應(yīng)用程序可以通過 KafkaConsumer 來訂閱主題,并從訂閱的主題中拉取消息

個(gè)體和群體?

每個(gè)消費(fèi)者都有一個(gè)對(duì)應(yīng)的消費(fèi)組。消費(fèi)者( Consumer )負(fù)責(zé)訂閱 Kafka 中的主題( Topic ),并且從訂閱的主題上拉取消息。當(dāng)消息發(fā)布到主題后,只會(huì)被投遞給訂閱它的每個(gè)消費(fèi)組中的一個(gè)消費(fèi)者 。

當(dāng)消費(fèi)組中只有一個(gè)消費(fèi)者的時(shí)候,是這樣的情況:

當(dāng)消費(fèi)組中有兩個(gè)消費(fèi)者的時(shí)候,是這樣的情況:

從上面的分配情況可以看出,隨著消費(fèi)者的增加,可以讓整體的消費(fèi)能力具有橫向伸縮性。我們可以增加(或減少)消費(fèi)者的個(gè)數(shù)來提高(或降低)整體的消費(fèi)能力。當(dāng)時(shí)在分區(qū)數(shù)固定的情況下,盲目地增加消費(fèi)者并不會(huì)讓消費(fèi)能力一直得到提升,如果消費(fèi)者過多,就會(huì)出現(xiàn)消費(fèi)者個(gè)數(shù)大于分區(qū)個(gè)數(shù)的情況,就會(huì)有消費(fèi)者分配不到任何分區(qū)。

以上分配邏輯都是基于默認(rèn)的分區(qū)分配策略進(jìn)行分析的,可以通過消費(fèi)者客戶端配置partition.assignment.strategy來設(shè)置消費(fèi)者與訂閱主題之間的分區(qū)分配策略。

投遞模式

Kafka 中有兩種消息投遞模式:

點(diǎn)對(duì)點(diǎn)模式(Point-to-Point)

基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列中接收消息

發(fā)布/訂閱模式(Pub/Sub)

基于主題的,主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布者將消息發(fā)布到某個(gè)主題,而消息訂閱者從主題中訂閱消息。主題使得消息的訂閱者和發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布/訂閱模式在消息的一對(duì)多廣播時(shí)采用。

客戶端開發(fā)

消費(fèi)過程大致得具備以下幾個(gè)步驟方能消費(fèi):

  • 配置消費(fèi)者客戶端參數(shù)以及創(chuàng)建相應(yīng)的消費(fèi)者實(shí)例
  • 訂閱主題
  • 拉取消息并消費(fèi)
  • 提交消費(fèi)位移
  • 關(guān)閉消費(fèi)者實(shí)例

可以看出在配置消費(fèi)者參數(shù)的時(shí)候,我們看到了幾個(gè)熟悉的參數(shù):

  • bootstrap.servers:為了防止書寫出錯(cuò),可以用ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG表示,用來指定連接 Kafka 集群所需的 broker 地址清單,可以設(shè)置一個(gè)或多個(gè)地址,中間用逗號(hào)隔開,默認(rèn)值為 " "
  • group.id:為了防止書寫出錯(cuò),可以用ConsumerConfig.GROUP_ID_CONFIG表示,消費(fèi)者所在消費(fèi)組的名稱,默認(rèn)值為 " ",如果設(shè)置為空,則會(huì)拋出異常
  • key.deserializer/value.deserializer:為了防止書寫錯(cuò)誤,可以用ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG和ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG表示,消費(fèi)端所需要執(zhí)行響應(yīng)的反序列化操作,需要和生產(chǎn)端一致

client.id:為了防止書寫錯(cuò)誤,可以用ConsumerConfig.CLIENT_ID_CONFIG表示,用來設(shè)定 KafkaConsumer 對(duì)應(yīng)的客戶端 id,默認(rèn)值為 " "

主題的訂閱

消費(fèi)者消費(fèi)消息,重要的就是訂閱相對(duì)應(yīng)的主題。在上述的例子中我們是通過 consumer.subscribe(Arrays.asList(topic)); 來訂閱主題的,可以看出一個(gè)消費(fèi)者可以訂閱一個(gè)或多個(gè)主題。我們來看下 subscribe() 這個(gè)方法的重載:

  1. public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) { /* compiled code */ } 
  2.  
  3. public void subscribe(Collection<String> topics) { /* compiled code */ } 
  4.  
  5. public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { /* compiled code */ } 
  6.  
  7. public void subscribe(Pattern pattern) { /* compiled code */ } 

如果我們?cè)谟嗛喼黝}的過程中出現(xiàn)了以下情況:

  1. consumer.subscribe(Arrays.asList(topic1)); 
  2. consumer.subscribe(Arrays.asList(topic2)); 

那么最終情況只會(huì)訂閱到 topic2,而不是topic1,更不是topic1和topic2的結(jié)合。

subscribe()這個(gè)方法重載后也支持正則表達(dá)式:

  1. consumer.subscribe(Pattern.compile(”topic.*”)); 

這樣配置后,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么這個(gè)消費(fèi)者就可以消費(fèi)到新添加的主題中的消息。

subscribe()這個(gè)方法除了傳入主題和正則作為參數(shù),還有兩個(gè)方法支持了 ConsumerRebalanceListener 參數(shù)的傳入,這個(gè)是用來設(shè)置相應(yīng)的再均衡監(jiān)聽器。

消費(fèi)者除了可以通過subscribe()方法來訂閱主題之外,還可以通過assign()方法來實(shí)現(xiàn)直接訂閱某些主題的特定分區(qū)。

  1. public void assign(Collection<TopicPartition> partitions) 

其中TopicPartition 對(duì)象定義如下:

構(gòu)造函數(shù)中需要傳入「訂閱的主題」和「分區(qū)編號(hào)」,使用如下:

  1. consumer.assign(Arrays.asList(new TopicPartition(”kafka-demo”, 0))) ; 

這樣子我們就可以訂閱 kafka-demo中的 0 號(hào)分區(qū)了。

如果我們事先并不知道主題中有多少個(gè)分區(qū)怎么辦?KafkaConsumer 中的 partitionsFor()方法可以用來查詢指定主題的元數(shù)據(jù)信息,partitionsFor()方法定義如下:

  1. public List <PartitionInfo> partitionsFor(String topic); 

其中 PartitionInfo對(duì)象定義如下:

  1. public class Partitioninfo { 
  2.     private final String topic;             //主題名稱 
  3.     private final int partition;            //分區(qū)編號(hào) 
  4.     private final Node leader;              //分區(qū)的leader副本所在的位置 
  5.     private final Node[] replicas;          //分區(qū)的AR集合 
  6.     private final Node[] inSyncReplicas;    //分區(qū)的ISR集合 
  7.     private final Node[] offlineReplicas;   //分區(qū)的OSR集合 

訂閱不是惡意捆綁的,能訂閱就能夠取消訂閱,我們可以使用 KafkaConsumer 中的 unsubscribe()方法采取消主題的訂閱。這個(gè)方法既可以取消通過subscribe(Collection)方式實(shí)現(xiàn)的訂閱,也可以取消通過subscribe(Pattem)方式實(shí)現(xiàn)的訂閱,還可以取消通過assign(Collection)方式實(shí)現(xiàn)的訂閱 。

  1. consumer.unsubscribe() ; 

如果將 subscribe(Collection)或 assign(Collection) 中 的集合參數(shù)設(shè)置為空集合 ,那么 作用等同于unsubscribe()方法 ,下面示例中 的三行代碼的效果相同:

  1. consumer.unsubscribe(); 
  2. consumer.subscribe(new ArrayList<String>()); 
  3. consumer.assign(new ArrayList<TopicPartition>()); 

消費(fèi)模式

消息的消費(fèi)模式一般有兩種:「推模式」和「拉模式」。而 Kafka 中的消費(fèi)是基于「拉模式」

推模式:服務(wù)端主動(dòng)將消息推送給消費(fèi)者

拉模式:消費(fèi)者主動(dòng)向服務(wù)端發(fā)起拉取請(qǐng)求

Kafka 的消息消費(fèi)是一個(gè)不斷輪詢的過程,消費(fèi)者所要做的就是重復(fù)地調(diào)用poll()方法,如果某些分區(qū)中沒有可供消費(fèi)的消息,那么此分區(qū)對(duì)應(yīng)的消息拉取的結(jié)果就為空;如果訂閱的所有分區(qū)中都沒有可供消費(fèi)的消息,那么 poll()方法返回為空的消息集合。

  1. public ConsumerRecords<K, V> poll(final Duration timeout) 

在poll()方法中可以傳入一個(gè)超時(shí)時(shí)間參數(shù) timeout,用來控制 poll()方法的阻塞時(shí)間,在消費(fèi)者的緩沖區(qū)里沒有可用數(shù)據(jù)時(shí)會(huì)發(fā)生阻塞。

通過poll()方法拉取到的消息是一個(gè)ConsumerRecord對(duì)象,定義如下:

我們?cè)谙M(fèi)消息的時(shí)候可以直接對(duì) ConsumerRecord 中感興趣的字段進(jìn)行具體的業(yè)務(wù)邏輯處理。

消費(fèi)者攔截器

我們上面已經(jīng)講到了生產(chǎn)者攔截器的使用,當(dāng)然,消費(fèi)者也有響應(yīng)的攔截器的概念。消費(fèi)者攔截器主要在消費(fèi)到消息或在提交消費(fèi)位移時(shí)進(jìn)行一些定制化的操作。

生產(chǎn)者定義攔截器的方式是實(shí)現(xiàn) ProducerInterceptor 接口,而消費(fèi)者定義攔截器的方式則是實(shí)現(xiàn)ConsumerInterceptor接口,ConsumerInterceptor定義如下:

  1. package org.apache.kafka.clients.consumer; 
  2.  
  3. public interface ConsumerInterceptor <K, V> extends Configurable, AutoCloseable { 
  4.     ConsumerRecords<K,V> onConsume(ConsumerRecords<K,V> consumerRecords); 
  5.  
  6.     void onCommit(Map<TopicPartition,OffsetAndMetadata> map); 
  7.  
  8.     void close(); 
  • onConsume():KafkaConsumer 會(huì)在 poll()方法返回之前調(diào)用攔截器的 onConsume()方法來對(duì)消息進(jìn)行相應(yīng)的定制化操作,比如修改返回的消息內(nèi)容、按照某種規(guī)則過濾消息(可能會(huì)減少poll()方法返回的消息的個(gè)數(shù))。如果onConsume()方法中拋出異常,那么會(huì)被捕獲并記錄到日志中,但是異常不會(huì)再向上傳遞。
  • onCommit():KafkaConsumer 會(huì)在提交完消費(fèi)位移之后調(diào)用攔截器的 onCommit()方法,可以使用這個(gè)方法來記錄跟蹤所提交的位移信息,比如當(dāng)消費(fèi)者使用commitSync的無參方法時(shí),我們不知道提交的消費(fèi)位移的具體細(xì)節(jié),而使用攔截器的 onCommit()方法卻可以做到這一點(diǎn)。

我們?cè)谧远x攔截器后,也是用過相同的方式使用:

  1. properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG ,MyConsumerInterceptor.class.getName()); 

重要參數(shù)

除了上述已經(jīng)出現(xiàn)的參數(shù),還有以下一些重要的參數(shù):

1. fetch.min.bytes

該參數(shù)用來配置 Consumer 在一次拉取請(qǐng)求(調(diào)用poll()方法)中能從 Kafka 中拉取的最小數(shù)據(jù)量,默認(rèn)值為 1B。如果返回的數(shù)據(jù)量小于這個(gè)參數(shù)所設(shè)置的值,那么它就需要進(jìn)行等待,直到數(shù)據(jù)量滿足這個(gè)參數(shù)的配置大小

2. fetch.max.bytes

該參數(shù)用來配置 Consumer 在一次拉取請(qǐng)求中能從 Kafka 中拉取的最大數(shù)據(jù)量,默認(rèn)為 52428800 B(50M)

3. fetch.max.wait.ms

該參數(shù)用來指定 Kafka 的等待時(shí)間,默認(rèn)值為 500 ms

4. max.partition.fetch.bytes

該參數(shù)從來配置從每個(gè)分區(qū)里返回給 Consumer 的最大數(shù)據(jù)量,默認(rèn)值為 1048576 B(1MB)

5. max.poll.records

該參數(shù)用來配置 Consumer 再一次拉取請(qǐng)求中拉取的最大消息數(shù),默認(rèn)值為 500 條

6. request.timeout.ms

該參數(shù)用來配置 Consumer 等待請(qǐng)求響應(yīng)的最長時(shí)間,默認(rèn)值為 30000 ms

Kafka 之 主題管理

在前面的生產(chǎn)者端和消費(fèi)者端中我們都已經(jīng)見到了「主題」的概念,「主題」是 Kafka 中的核心。

主題作為消息的歸類,可以再細(xì)分為一個(gè)或多個(gè)分區(qū),分區(qū)也可以看作對(duì)消息的二次歸類。分區(qū)的劃分不僅為 Kafka 提供了可伸縮性、水平擴(kuò)展的功能,還通過多副本機(jī)制來為 Kafka 提供數(shù)據(jù)冗余以提高數(shù)據(jù)可靠性。

1. 創(chuàng)建主題

在 broker 端有個(gè)配置參數(shù)為 auto.create.topics.enable (默認(rèn)值為 true),當(dāng)該參數(shù)為 「true」 的時(shí)候,生產(chǎn)者想一個(gè)尚未創(chuàng)建的主題發(fā)送消息時(shí),會(huì)自動(dòng)創(chuàng)建一個(gè)分區(qū)數(shù)為num.partitions(默認(rèn)值為1),副本因子為 default.replication.factor(默認(rèn)值為1)的主題。

「使用腳本的方式創(chuàng)建」:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic kafka-demo --partitions 4 --replication-factor 2 

「使用 TopicCommand 創(chuàng)建主題」:

導(dǎo)出 Maven 依賴:

  1. <dependency> 
  2.     <groupId>org.apache.kafka</groupId> 
  3.     <artifactId>kafka_2.11</artifactId> 
  4.     <version>2.0.0</version> 
  5. </dependency> 
  1. public static void createTopic(String topicName) { 
  2.     String[] options = new String[]{ 
  3.         "--zookeeper""localhost:2181/kafka"
  4.         "--create"
  5.         "--replication-factor""2"
  6.         "--partitions""4"
  7.         "--topic", topicName 
  8.     }; 
  9.     kafka.admin.TopicCommand.main(options); 

上述示例中,創(chuàng)建了一個(gè)分區(qū)數(shù)為 4,副本因子為 2 的主題

2. 查看主題

  • -list:

通過list指令可以查看當(dāng)前所有可用的主題:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka -list 
  • describe

通過describe指令可以查看單個(gè)主題信息,如果不適用 --topic 指定主題,則會(huì)展示出所有主題的詳細(xì)信息。--topic還支持指定多個(gè)主題:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic kafka-demo1,kafka-demo2 

3.修改主題

當(dāng)一個(gè)主題被創(chuàng)建之后,我們可以對(duì)其做一定的修改,比如修改分區(qū)個(gè)數(shù)、修改配置等,借助于alter指令來實(shí)現(xiàn):

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --alter --topic kafka- demo --partitions 3 

修改分區(qū)的時(shí)候我們需要注意的是:

當(dāng)主題 kafka-demo 的分區(qū)數(shù)為 1 時(shí),不管消息的 key 為何值,消息都會(huì)發(fā)往這一個(gè)分區(qū),當(dāng)分區(qū)數(shù)增加到 3 時(shí),就會(huì)根據(jù)消息的 key 來計(jì)算分區(qū)號(hào),原本發(fā)往分區(qū) 0 的消息現(xiàn)在就有可能發(fā)往分區(qū) 1 或分區(qū) 2。因此建議一開始就要設(shè)置好分區(qū)數(shù)量。

目前 Kafka 只支持增加分區(qū)數(shù)而不支持減少分區(qū)數(shù),當(dāng)我們要把主題 kafka-demo 的分區(qū)數(shù)修改為 1 時(shí),就會(huì)報(bào)出 InvalidPartitionException 異常。

4. 刪除主題

如果確定不再使用一個(gè)主題,那么最好的方式就是將其刪除,這樣可以釋放一些資源,比如磁盤、文件句柄等。這個(gè)時(shí)候我們就可以借助 delete 指令來刪除主題:

  1. bin/kafka-topics.sh --zookeeper localhost:2181/kafka --delete --topic kafka-demo 

需要注意的是 我們必須將broker中的delete.topic.enable參數(shù)配置為 true 才能夠刪除主題,這個(gè)參數(shù)的默認(rèn)值就是true,如果配置為 false,那么刪除主題的操作將會(huì)被忽略。

如果要?jiǎng)h除的主題是 Kafka 的內(nèi)部主題,那么刪除時(shí)就會(huì)報(bào)錯(cuò)。例如:__consumer_offsets和__transaction_state

常見參數(shù)

參數(shù)名稱 釋義
alter 用于修改主題,包括分區(qū)數(shù)以及主題的配置
config<鍵值對(duì)> 創(chuàng)建或修改主題,用于設(shè)置主題級(jí)別的參數(shù)
create 創(chuàng)建主題
delete 刪除主題
delete-config<配置名稱> 刪除主題級(jí)別被覆蓋的配置
describe 查看主題的詳細(xì)信息
disable-rack-aware 創(chuàng)建主題是不考慮機(jī)架信息
help 打印幫助信息
if-exists 修改或刪除主題時(shí)使用,只有當(dāng)主題存在時(shí)才會(huì)執(zhí)行操作
if-not-exists 創(chuàng)建主題時(shí)使用,只有主題不存在時(shí)才會(huì)執(zhí)行動(dòng)作
list 列出所有可用的主題
partitions<分區(qū)數(shù)> 創(chuàng)建主題或增加分區(qū)時(shí)指定分區(qū)數(shù)
replica-assignment<分配方案> 手工指定分區(qū)副本分配方案
replication-factor<副本數(shù)> 創(chuàng)建主題時(shí)指定副本因子
topic<主題名稱> 指定主題名稱
topics-with-overrides 使用describe查看主題信息時(shí),只展示包含覆蓋配置的主題
  指定連接的 ZooKeeper 地址信息

上面大致就是 Kafka 的入門內(nèi)容啦,今天的知識(shí)就介紹到這里啦,內(nèi)容雖然不是很深入,但是字?jǐn)?shù)也不少,能完整看完的小伙伴,小菜給你點(diǎn)個(gè)贊哦!

 

責(zé)任編輯:武曉燕 來源: 小菜良記
相關(guān)推薦

2020-06-12 09:20:33

前端Blob字符串

2020-07-28 08:26:34

WebSocket瀏覽器

2011-09-15 17:10:41

2022-10-13 11:48:37

Web共享機(jī)制操作系統(tǒng)

2009-12-10 09:37:43

2021-02-01 23:23:39

FiddlerCharlesWeb

2010-08-23 09:56:09

Java性能監(jiān)控

2020-09-15 08:35:57

TypeScript JavaScript類型

2022-11-04 08:19:18

gRPC框架項(xiàng)目

2021-12-29 11:38:59

JS前端沙箱

2021-12-22 09:08:39

JSON.stringJavaScript字符串

2012-11-23 10:57:44

Shell

2015-06-19 13:54:49

2020-08-11 11:20:49

Linux命令使用技巧

2021-10-17 13:10:56

函數(shù)TypeScript泛型

2012-06-26 15:49:05

2014-03-12 09:23:06

DevOps團(tuán)隊(duì)合作

2017-03-02 14:05:42

AndroidAndroid Stu調(diào)試技巧

2019-11-29 16:49:42

HTML語言開發(fā)

2023-12-21 14:40:09

Python編程語言
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)