自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

成為 Kafka 高手的秘籍:生產(chǎn)者深度實(shí)踐總結(jié)

新聞 系統(tǒng)運(yùn)維 Kafka
kafka 是一款已經(jīng)發(fā)布了近10年的分布式消息隊(duì)列系統(tǒng),是一款非常成熟的產(chǎn)品,在各大公司或者產(chǎn)品中或多或少都有他的身影,特別是大數(shù)據(jù)流處理,log 流處理之類的場景,kafka 更是充當(dāng)著幾乎必不可少的角色。

 作者簡介

張晉尉,騰訊云消息隊(duì)列專項(xiàng)支持團(tuán)隊(duì)成員,kafka,puslar資深開發(fā)者,kafka sdk貢獻(xiàn)者,在流式數(shù)據(jù)處理,消息隊(duì)列方向有多年實(shí)踐經(jīng)驗(yàn)。

Kafka 簡介

kafka 是一款已經(jīng)發(fā)布了近10年的分布式消息隊(duì)列系統(tǒng),是一款非常成熟的產(chǎn)品,在各大公司或者產(chǎn)品中或多或少都有他的身影,特別是大數(shù)據(jù)流處理,log 流處理之類的場景,kafka 更是充當(dāng)著幾乎必不可少的角色。

這款消息隊(duì)列在官方給出的定義中被稱為“分布式流式處理平臺”,其主要目的是在大數(shù)據(jù)流處理中承擔(dān)著存儲記錄流的一個作用,不過到了現(xiàn)在這個年代,越來越多的業(yè)務(wù)架構(gòu)更傾向于將 kafka 當(dāng)作消息隊(duì)列來使用,用來取代比較厚重且性能有限的 RabbitMQ。

kafka 這樣一個系統(tǒng)為了確保其簡潔性和高性能,其實(shí)將很多邏輯細(xì)節(jié)和配置放到了 client 端,所以我們將從客戶端的視角出發(fā),從使用者的角度通過生產(chǎn)者和消費(fèi)者兩個方面來介紹 kafka 在實(shí)踐生產(chǎn)中遇到的一些問題和相應(yīng)的技術(shù)細(xì)節(jié)。本文是系列文章的第一篇,介紹生產(chǎn)者。

標(biāo)準(zhǔn) producer API 簡介

這里我們先介紹下最經(jīng)常使用的生產(chǎn)者 API,相信看本文各位已經(jīng)是 kafka 使用的熟手了,不過為了后續(xù)介紹可能會使用的一些術(shù)語,我們還是先復(fù)習(xí)下 kafka 基礎(chǔ)概念,這里我們只關(guān)注于生產(chǎn)這部分,忽略其他的無關(guān)細(xì)節(jié)。

首先我們畫出生產(chǎn)者和 kafka 交互的一張圖,這張圖用于描述生產(chǎn)者消息數(shù)據(jù)的流向和 kafka server 為了接受消息需要用到什么組件。

圖片如上,現(xiàn)在讓我們分別介紹下圖上所繪內(nèi)容以及相應(yīng)的專業(yè)術(shù)語

  • Client 指的是將會寫入消息的多個不同的客戶端,這里的客戶端是一個抽象化的概念,只要和 kafka server建立了連接,將會寫入消息到 kafka 中,無論是否在同一個服務(wù)器或者一個進(jìn)程中,我們都把它稱為一個 client。
  • Broker 指的是加入到了集群里面的服務(wù)器,這是一個物理層面的機(jī)器節(jié)點(diǎn)。一臺機(jī)器上部署了 kafka,并且加入到了 kafka 集群,那么這臺機(jī)器就是 kafka 集群的一個節(jié)點(diǎn)。一般情況下,一臺機(jī)器只會部署一個 kafka 服務(wù)。
  • Topic 是一個抽象的概念,主要的作用是將用戶處理的消息分為不同的類別,同時每個 topic 可以具有 topic緯度的一些配置,比如消息最大大小之類的,topic 下會創(chuàng)建不同數(shù)量的 partition 去實(shí)際的承載消息,這里值得注意的是 kafka 和 RabbitMQ 不一樣的地方,kafka 每創(chuàng)建一個 topic 都會在 broker 上去創(chuàng)建對應(yīng)數(shù)量的 partition,所以 kafka 的 topic 數(shù)量是有限的,而且盡量不能太多。
  • 而 RabbitMQ 的 topic 是一個路由概念,創(chuàng)建非常大數(shù)量的 topic 并不會實(shí)際創(chuàng)建承載的隊(duì)列,而只是在訂閱分發(fā)的時候執(zhí)行不同的路由策略。這是很多從 RabiitMQ 切換到 kafka 的用戶比較常見的問題。
  • Partiotion 是 topic 的分區(qū),用來實(shí)際的去承載消息,每個 partition 之間是沒有關(guān)聯(lián)的,他們各自有各自的順序和消息內(nèi)容,以及記錄的 offset。每個生產(chǎn)者的消息都會寫入到一個 partition 中去,生產(chǎn)者自己會根據(jù)自己的算法去選擇 partition 去寫入。
  • Replicas 是 partition 的副本,是物理和概念兩個層面的最小單位,它會將自己綁定到 broker上,每個partition 至少都需要有一個 replicas,它是消息實(shí)際寫入的地方。當(dāng) partition 有多個 replicas 的時候,控制器會決定哪一個 replicas 會是 leader。消息始終會被寫入到 leader 中,然后 leader 會同步數(shù)據(jù)到其他為 follower 狀態(tài)的 replicas 中,所以如上圖所示,client 的消息在選中寫入到某個 partition 中后,實(shí)際上,client 會去連接 replicas leader 所在的 broker,然后把消息寫進(jìn)去。

現(xiàn)在我們開始講述下關(guān)于生產(chǎn)者 API 的使用和一些在生產(chǎn)的時候需要注意的配置,這里的生產(chǎn)者 API 指的是 kafka 提供的幾乎無狀態(tài)的 API,非常的輕便,同時也可以提供非常不錯的性能。不過如果使用這個 API 來進(jìn)行生產(chǎn),kafka 只保證最少一次和最多一次語義。

  • 最少一次(at-least-once) 也就是說明消息可能重復(fù),如果消息重復(fù),那么消費(fèi)者需要在消費(fèi)的時候進(jìn)行去重。
  • 最多一次(at-most-once) 用的比較少,一般是大量不重要的數(shù)據(jù)處理的時候,容忍丟失數(shù)據(jù)的情況下,可以提供比較優(yōu)秀的性能,生產(chǎn)端生產(chǎn)消息并投遞后就不再關(guān)注是否成功了,也不會進(jìn)行重試。
  • 恰好一次(exactly-once) 這個語義在當(dāng)前這個生產(chǎn)者api中是不提供的,不過由于大量的流式計(jì)算系統(tǒng)都需要保證 exactly once semantics,而且 kafka 也推出了kafka stream 這樣的流式處理框架,所以后續(xù)新版本的 kafka 提供了事物消息來確保恰好一次語義,我們將在本文后續(xù)章節(jié)討論這個問題。

接下來我們通過一個代碼片段的實(shí)例來看下如何使用生產(chǎn)者 api,同時看一下一些重要的配置,首先讓我們創(chuàng)建一個有兩個 partition,每個 partition 都有兩個 replicas 的 topic

  1. bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 2 --partitions 2 --topic test 

然后讓我們寫一段 java 生產(chǎn)者片段,代碼非常簡單,只是配置一些生產(chǎn)者 client 的相關(guān)配置,然后調(diào)用 producer的 send 方法將需要發(fā)送的消息提交到 kafka 的 client 庫

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("acks""all"); 
  6.  
  7. props.put("retries",3); 
  8.  
  9. props.put("retry.backoff.ms",2000); 
  10.  
  11. props.put("compression.type","lz4"); 
  12.  
  13. props.put("batch.size"16384); 
  14.  
  15. props.put("linger.ms",200); 
  16.  
  17. props.put("max.request.size",1048576); 
  18.  
  19. props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  20.  
  21. props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer"); 
  22.  
  23. props.put("request.timeout.ms"10000); 
  24.  
  25. props.put("max.block.ms"30000); 
  26.  
  27. Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 1000; i++) { 
  28.  
  29. Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString())); 
  30.  
  31. System.out.println("produce offset:" + future.get().offset()); 
  32.  
  33.  
  34. producer.close(); 

通過以上代碼就可以完成消息的生產(chǎn)了,kafka 給我們提供的這個 API 的功能確實(shí)非常簡單易用,當(dāng)然這里面實(shí)際上包含上比較多的細(xì)節(jié),不過被 client 封裝了進(jìn)去,這里我們繼續(xù)往深處挖掘下,看看隱藏在這段代碼里面的可能存在的坑。

首先我們來簡單分析下在這段代碼里面 client 會做什么,(注:這里我們更傾向于給出一個通用工作流程,所以可能會忽略部分 java客戶端獨(dú)有特性 )。

1. client 通過代碼中給出的 bootstrap.servers 去連接 broker,這里如果第一個broker 連接失敗,那么 client 會從左往右重試去連接,直到全部連接失敗或者某一個地址成功連接。

2. 當(dāng)連接成功后,kafka client 會發(fā)起 ApiVersions request去kafka server 查詢server 端支持各個 api 以及每個 api 最大的支持版本。從而達(dá)到 kafka 一個向下兼容的目的。當(dāng)然由于 ApiVersions 是一個大約在 0.10 版本加入的 api,所以新版client 如果訪問 0.9 版本的 kafka server 會引起ArrayIndexOutOfBoundsException 的報(bào)錯,這個錯誤 kafka 官方在 0.10 的時候修復(fù)了。

3. 接下來 client 會查詢將要發(fā)送消息的 topic 的元數(shù)據(jù)信息,向已經(jīng)連接的 broker 發(fā)送 Metadata request,通過這個 api,kafka client 將會拿到集群 broker 的各種信息,包括 ip 和 port,以及 broker 對應(yīng)的唯一 id,同時 client 也將獲取到 topic的相關(guān)信息,parition 的 id 和 partition 選擇出來的 leader replicas 所在 broker 的id,然后 client 將會建立 leader replicas 所在 broker 的連接,作為實(shí)際發(fā)送消息的數(shù)據(jù)鏈路。

在這里我們有三個細(xì)節(jié)需要注意

  • 第一點(diǎn),如果 kafka 的 server 配置了 auto.create.topics.enable 為 true,那么如果 client 查詢了一個不存在的 topic 元數(shù)據(jù),這個 topic 隨后會被 kafka server 自動創(chuàng)建。
  • 第二點(diǎn),一般來說,kafka client 處理 Metadata 是一個定期刷新的動作,假如 Metadata 每過 30s 刷新一次,那么在這 30s 中,用戶修改了 topic 配置增加了一個 partition,client 是無法感知的,需要等待到client 更新了 Metadata,生產(chǎn)端才會知道這個 topic 多出了一個 partition,才能往新的 partition 寫入數(shù)據(jù)。
  • 那么如果在 Metadata 刷新時,由于 client 生產(chǎn)流量持續(xù)超過 kafka 配額限制,導(dǎo)致 kafka 限流,使得獲取 Metadata 數(shù)據(jù)一直重試和超時,這種極端情況下,client 可能會非常長一段時間無法感知到 partition 的新增。
  • 這種情況在生產(chǎn)實(shí)踐中也是發(fā)生過的,如果大家使用過程中發(fā)現(xiàn)了這種生產(chǎn)端遲遲不寫入任何消息到新建的 partition 的情況,那么多半可以從這個方向入手。
  • 第三點(diǎn),從 kafka 的建立鏈接的邏輯來看,kafka 實(shí)際上是會建立一條更多的鏈接的,同時也會直接鏈接到集群中不同的 broker 上,所以這里如果要申請防火墻策略,那么一定要為每個 broker 都申請好策略,否則可能會出現(xiàn),能夠拿到 Metadata,但無法生產(chǎn)消息的情況。

4.client 開始根據(jù) message 中的 key 來計(jì)算 hash,確定這個 message 會被投遞到哪個 partition 中去,然后 client 投遞消息到本地的一個隊(duì)列中,實(shí)際連接到partition 的投遞者類,將從隊(duì)列中取出消息,然后 client 會做兩個檢查之后調(diào)用Produce request 去投遞消息。

  • 如果消息大于 max.request.size 則直接返回 RecordTooLargeException
  • 如果消息小于 batch.size 則等待后續(xù)消息,直到到消息大小總和大于 batch.size或者超過 linger.ms 規(guī)定的時間
  • client 將會啟動一個異步過程或者同步過程等待 Produce request 的返回,然后將依據(jù)配置的重試策略來執(zhí)行重試或者返回發(fā)送失敗的錯誤到業(yè)務(wù)邏輯中,讓業(yè)務(wù)邏輯進(jìn)行錯誤處理。

5. 在這一步中主要涉及到以下幾個配置

  • acks 為 -1 或者 all,代表所有處于 isr(in-sync) 列表中的 replicas 都寫入消息成功后才會返回成功給客戶端,同時在 topic 級別也提供了一個min.insync.replicas 配置,如果 isr 中的 replicas 少于這個配置的值,那么寫入同樣會失敗。這是 kafka 所能提供的最強(qiáng)約束了。
  • acks 為 0,代表消息只要投遞到 client 的 tcp socket 緩沖區(qū)后就認(rèn)為已經(jīng)發(fā)送出去了,client 不再關(guān)注是否 kafka 集群是否收到或者寫入成功。在這種模式下kafka 只提供 at-most-once 語義,在容忍數(shù)據(jù)丟失的情況下,是性能最好的模式。
  • acks 為 1 代表發(fā)送消息到 replicas leader 寫入成功就返回成功,不關(guān)注其余follower 是否寫入成功,如果投遞消息后,leader 馬上掛掉了,消息是會丟失的。
  • 這個模式在大多數(shù)時候可以確保消息不丟失,是一個性能和安全性權(quán)衡的模式。
  • server 將根據(jù) client 提供的 acks 配置值來確定服務(wù)端的寫入會在什么情況下返回給客戶端
  • client 如果接收到 produce 寫入失敗,那么將會重試 retries 配置的次數(shù),每次重試之間間隔 retry.backoff.ms 所定義的時間。重試次數(shù)耗盡之后才會返回失敗到業(yè)務(wù)邏輯。

以上就是整個 producer api 在使用過程中的一些細(xì)節(jié)了,明白了這些細(xì)節(jié),在生產(chǎn)時遇到kafka的一些奇怪報(bào)錯就會有一些思路去定位和處理。當(dāng)然從代碼上來看,代碼里面還有一些配置在上面的文章中沒有覆蓋到,這里我在一起介紹一下

  • compression.type 用于配置壓縮,kafka 提供不同的壓縮模式,包括 none(不壓縮),gzip,snappy,lz4,以及 zstd(需要2.1.0以上版本的kafka)。
  • 一般來說我們比較推薦 lz4 格式的壓縮,在比較輕的 cpu 負(fù)載下,可以提供不錯的壓縮比,和非常高的吞吐量,整體的性能和性價(jià)比會優(yōu)于其他幾個壓縮方式,所以一般沒有強(qiáng)烈的壓縮比需求的話,使用 lz4 是比較好的選擇。
  • key.serializer 和 value.serializer 是序列化器,這個只是在 java 的客戶端中特有的,用于決定如何把 key 和 value 的值序列化,這里就不細(xì)說了。
  • request.timeout.ms 這個配置定義了網(wǎng)絡(luò)請求超時時間,任何一個 kafka client對于 server 的請求,如果在本參數(shù)規(guī)定時間內(nèi)沒有收到答復(fù),那么就都會取消請求并認(rèn)為請求失敗,并將邏輯轉(zhuǎn)移到失敗處理邏輯,這個約束是比較強(qiáng)的約束。
  • max.block.ms 這個配置項(xiàng)定義了 client 內(nèi)部的一個阻塞時間,比如如果內(nèi)部的異步隊(duì)列滿了,kafka client 調(diào)用 send 會等待這樣一個時間,直到超時返回,這個參數(shù)需要注意的一點(diǎn)是用戶自定義配置的序列化器和分區(qū)器中花費(fèi)的時間不會計(jì)入這個參數(shù)超時中。

冪等生產(chǎn)者(Idempotent Producer)簡介

冪等生產(chǎn)者提供了生產(chǎn)者在單一分區(qū)上的恰好一次語義,但是他不能覆蓋到生產(chǎn)者對于復(fù)數(shù) partition 操作的一致性,這種一致性需要通過后續(xù)的事務(wù)消息來解決。

現(xiàn)在讓我們先看下冪等生產(chǎn)者如何使用,以及一些涉及到的細(xì)節(jié)。

為什么我們需要使用到冪等生產(chǎn)者,其主要的原因是生產(chǎn)者發(fā)送消息到服務(wù)端后,如果遇到了網(wǎng)絡(luò)問題導(dǎo)致連接斷開,生產(chǎn)者是無法感知到消息到底是寫入成功還是失敗,對于 kafka 一般的生產(chǎn)者 api 來說我們會設(shè)置 retries 參數(shù),始終去進(jìn)行重試,這也就是我們所謂至少一次語義,因?yàn)槲覀儫o法感知是否寫入成功,如果寫入成功,但是我們沒有接收到成功的回復(fù),我們進(jìn)行重試動作,就會導(dǎo)致消息的重復(fù)寫入,如果消息消費(fèi)依賴于消息順序,這種重試甚至?xí)?dǎo)致順序的錯亂。

現(xiàn)在通過冪等生產(chǎn)者,kafka 可以在我們進(jìn)行這樣的重試的時候丟棄掉這種重復(fù)寫入的消息。

現(xiàn)在讓我們看看如何使用冪等生產(chǎn)者。(這里讓我們來看下代碼,代碼中讓我們忽略掉一些不重要的配置)。

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("enable.idempotence"true); 
  6.  
  7. Producer<String, String> producer = new KafkaProducer<String, String>(props); for (int i = 0; i < 1000; i++) { 
  8.  
  9. Future<RecordMetadata> future = producer.send(new ProducerRecord<>("test", UUID.randomUUID().toString())); 
  10.  
  11. System.out.println("produce offset:" + future.get().offset()); 
  12.  
  13.  
  14. producer.close(); 

在用戶的使用上,要啟動冪等生產(chǎn)者只需要添加設(shè)置 enable.idempotence 為 true 就好,讓我們繼續(xù)關(guān)注下細(xì)節(jié),看看啟用冪等生產(chǎn)者后 kafka client 會做什么。首先 client會強(qiáng)制設(shè)置一些生產(chǎn)者的配置值。

 

  1. acks 會被強(qiáng)制設(shè)置為all,如果客戶本來使用的是0,1級別的 acks 那么用戶需要考慮下被設(shè)置為 all 的時候?qū)τ谧约簶I(yè)務(wù)性能的影響,如果用戶本來就是設(shè)置為all的情況,那么使用冪等生產(chǎn)者是幾乎不會有額外代價(jià)的。
  2. retries 必須設(shè)置為大于1的數(shù)字,一般 librdkafka 和 java kafka client 會把 retries設(shè)置為一個非常大的數(shù)比如 Integer.MAX_VALUE,基本靠近于無限重試。確保消息一定會成功發(fā)送。
  3. max.inflight.requests.per.connection 必須小于5,其中 java kafka client 如果版本小于1.0.0,會把 max.inflight.requests.per.connection 設(shè)置為 1,確保一條數(shù)據(jù)鏈路上一次只有一個請求,這會導(dǎo)致一定情況下 tps 有所下降。
  4. 發(fā)送的消息格式必須是 v2 格式。不支持低版本的消息格式。

完成生產(chǎn)者配置之后,client 開始執(zhí)行生產(chǎn)消息的發(fā)送,這里我們省略在上文提到過的生產(chǎn) api 的邏輯,只關(guān)注于啟用冪等后多出來的邏輯和步驟

  1. 生產(chǎn)者 client 向 broker 發(fā)起 InitProducerId request 請求一個 PID,后續(xù)發(fā)送的消息,都會帶上這一個 PID 用于標(biāo)明生產(chǎn)者的身份。
  2. 每個消息會帶上一個單調(diào)遞增的 Sequence ID。kafka server會記錄下同一個PID最后一次提交消息的 Sequence ID,如果當(dāng)前發(fā)送的消息 Sequence ID 小于等于最后一次提交的 ID,那么 server 會認(rèn)為當(dāng)前消息已經(jīng)過期了,并拒絕接受消息。client 收到這樣的拒絕請求后就可以感知到之前的消息一定是投遞成功了,并停止重試發(fā)送,丟棄掉消息。

通過以上的這些步驟,kafka 確保了每個消費(fèi)者對于單 partition 操作的一個冪等性,這是一個非常實(shí)用的功能,特別是在使用消費(fèi)者 api 的時候本來就已經(jīng)設(shè)置了acks 為 all 的業(yè)務(wù),啟用生產(chǎn)者冪等幾乎沒有額外消耗,這也是一個 kafka 推出了比較久的功能了(從 kafka 0.11 開始支持),但是目前看起使用本功能的用戶還是比較少。

事務(wù)消息(Transactional Messaging)簡介

事務(wù)消息是目前 kafka 為了確保恰好一次語義所提供的最強(qiáng)約束,他確保了一個生產(chǎn)者如果生產(chǎn)多個相互關(guān)聯(lián)的消息到不同的 partition 上時要么最后同時成功,要么同時失敗。同時啟用事物消息的前提必須啟用冪等生產(chǎn)者,所以單 partition 上的恰好一次語義就由冪等的特性來保證。

不過一般很少有業(yè)務(wù)會直接使用 kafka 的事物消息,會涉及使用事物消息的業(yè)務(wù)其實(shí)基本上都是通過 kafka stream 進(jìn)行流處理,而 kafka stream 依賴于事務(wù)消息并且對于業(yè)務(wù)隱藏掉了事務(wù)細(xì)節(jié),所以這里我們來看看如何直接使用事務(wù)消息并繼續(xù)嘗試分析下client 在這期間做了什么,先讓我們放出一份代碼片段。

  1. Properties props = new Properties(); 
  2.  
  3. props.put("bootstrap.servers""localhost:9092"); 
  4.  
  5. props.put("enable.idempotence""true"); 
  6.  
  7. props.put("transactional.id""testtrans-1"); 
  8.  
  9. KafkaProducer<String, String> producer = new KafkaProducer(props); 
  10.  
  11. producer.initTransactions(); try
  12.  
  13. producer.beginTransaction(); 
  14.  
  15. producer.send(record0); 
  16.  
  17. producer.send(record1); 
  18.  
  19. producer.sendOffsetsToTxn(…); 
  20.  
  21. producer.commitTransaction(); 
  22.  
  23. catch( ProducerFencedException e) { 
  24.  
  25. producer.close(); 
  26.  
  27. catch( KafkaException e ) { 
  28.  
  29. producer.abortTransaction(); 
  30.  

首先代碼里面執(zhí)行 initTransactions 作為第一步,在這個邏輯中 client 將會請求 InitProducerId 并傳遞事務(wù) id,用來建立一個事務(wù) id 和 PID 一對一的關(guān)系。如果有多個生產(chǎn)者加入到同一個事務(wù) id 中,前面加入的生產(chǎn)者都會被后面加入的替代。前面生產(chǎn)者的請求都會被拒絕。

值得注意的是如果 client 短線重新連接,它會在請求 InitProducerId 的時候提交之前使用的 PID 以及 epoch,如果成功隨后 server 會返回 epoch+1,同時會拒絕所有 epoch小于當(dāng)前 epoch 的生產(chǎn)者消息,這是為了解決分布式系統(tǒng)中所謂的僵死問題。

然后接下來的代碼調(diào)用就和很多事務(wù)代碼一樣了,啟動一個事務(wù),寫入所有需要寫入的信息,最后再 commit,如果失敗則回滾,如果成功就會一起提交所有寫入,然后做接下來的業(yè)務(wù)邏輯。一般大部分事務(wù)的實(shí)現(xiàn)都是一個狀態(tài)機(jī),這里我們就放上一張圖不繼續(xù)分析下去了。

在看完了事務(wù)代碼后,我們似乎沒有提到 sendOffsetsToTxn 這個函數(shù),這個函數(shù)實(shí)際上是用于當(dāng)前事務(wù)消息是一個從一個 topic 消費(fèi),然后寫入到事務(wù)消息的時候使用的,消費(fèi)的 offset 可以通過這個函數(shù)提交到協(xié)調(diào)者,后續(xù)在事物提交的時候再一并提交消費(fèi)者消費(fèi)掉的 offset。防止事務(wù)失敗的時候用戶還需要手動管理消費(fèi)者 offset。是一個非常有用的幫助函數(shù)。

總結(jié)

到此為止,我們從客戶端視角出發(fā)簡單的去分析了 kafka 生產(chǎn)者的一些用法和相應(yīng)需要注意的坑,由于作者的本篇文章是從工作中遇到的一些問題出發(fā)的,所以相應(yīng)的如果某些地方用得多有人咨詢的多,那么可能會寫的稍微詳細(xì)一些,有的地方咨詢的人少,遇到的問題也相應(yīng)比較少,可能就會簡略一些。

希望本文能夠讓各位讀者有所收獲,能夠?qū)?kafka 生產(chǎn)者這部分有更好的了解。感謝各位的閱讀,讓我們下一篇文章 kafka 消費(fèi)者再見。

 

 

責(zé)任編輯:張燕妮 來源: 高效運(yùn)維
相關(guān)推薦

2021-09-09 06:55:43

kafka冪等生產(chǎn)者

2021-07-05 06:26:08

生產(chǎn)者kafka架構(gòu)

2022-05-10 10:06:03

Kafka

2020-04-17 14:49:34

Kafka分區(qū)數(shù)據(jù)

2015-08-26 09:39:30

java消費(fèi)者

2021-12-28 12:01:59

Kafka 消費(fèi)者機(jī)制

2022-05-23 08:20:29

Kafka生產(chǎn)者元數(shù)據(jù)管理

2013-07-29 18:09:45

3D打印Autodesk

2009-08-13 13:14:31

C#生產(chǎn)者和消費(fèi)者

2021-12-22 11:00:05

模型Golang語言

2024-03-14 11:58:43

2012-02-14 12:31:27

Java

2021-08-31 10:26:24

存儲

2017-05-16 12:30:21

Python多線程生產(chǎn)者消費(fèi)者模式

2024-10-11 09:27:52

2025-04-29 01:10:00

Kafka高并發(fā)系統(tǒng)

2016-10-21 16:30:18

Linux操作系統(tǒng)

2016-10-21 20:27:03

Linux

2015-10-08 10:04:39

Python高手

2021-04-20 08:32:51

消息MQ隊(duì)列
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號