Kafka不常見但是很高級的功能:Kafka 攔截器
既然是不常見,那就說明在實際場景中并沒有太高的出場率,但它們依然是很高級很實用的。下面就有請今天的主角登場:Kafka 攔截器。
什么是攔截器?
如果你用過 Spring Interceptor 或是 Apache Flume,那么應該不會對攔截器這個概念感到陌生,其基本思想就是允許應用程序在不修改邏輯的情況下,動態(tài)地實現(xiàn)一組可插拔的事件處理邏輯鏈。它能夠在主業(yè)務操作的前后多個時間點上插入對應的“攔截”邏輯。下面這張圖展示了 Spring MVC 攔截器的工作原理:
圖片來源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial
攔截器 1 和攔截器 2 分別在請求發(fā)送之前、發(fā)送之后以及完成之后三個地方插入了對應的處理邏輯。而 Flume 中的攔截器也是同理,它們插入的邏輯可以是修改待發(fā)送的消息,也可以是創(chuàng)建新的消息,甚至是丟棄消息。這些功能都是以配置攔截器類的方式動態(tài)插入到應用程序中的,故可以快速地切換不同的攔截器而不影響主程序邏輯。
Kafka 攔截器借鑒了這樣的設計思路。你可以在消息處理的前后多個時點動態(tài)植入不同的處理邏輯,比如在消息發(fā)送前或者在消息被消費后。
作為一個非常小眾的功能,Kafka 攔截器自 0.10.0.0 版本被引入后并未得到太多的實際應用,我也從未在任何 Kafka 技術(shù)峰會上看到有公司分享其使用攔截器的成功案例。但即便如此,在自己的 Kafka 工具箱中放入這么一個有用的東西依然是值得的。今天我們就讓它來發(fā)揮威力,展示一些非常酷炫的功能。
Kafka 攔截器
Kafka 攔截器分為生產(chǎn)者攔截器和消費者攔截器。生產(chǎn)者攔截器允許你在發(fā)送消息前以及消息提交成功后植入你的攔截器邏輯;而消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。值得一提的是,這兩種攔截器都支持鏈的方式,即你可以將一組攔截器串連成一個大的攔截器,Kafka 會按照添加順序依次執(zhí)行攔截器邏輯。
舉個例子,假設你想在生產(chǎn)消息前執(zhí)行兩個“前置動作”:第一個是為消息增加一個頭信息,封裝發(fā)送該消息的時間,第二個是更新發(fā)送消息數(shù)字段,那么當你將這兩個攔截器串聯(lián)在一起統(tǒng)一指定給 Producer 后,Producer 會按順序執(zhí)行上面的動作,然后再發(fā)送消息。
當前 Kafka 攔截器的設置方法是通過參數(shù)配置完成的。生產(chǎn)者和消費者兩端有一個相同的參數(shù),名字叫 interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現(xiàn)類。拿上面的例子來說,假設第一個攔截器的完整類路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個類是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定攔截器:
現(xiàn)在問題來了,我們應該怎么編寫 AddTimeStampInterceptor 和 UpdateCounterInterceptor 類呢?其實很簡單,這兩個類以及你自己編寫的所有 Producer 端攔截器實現(xiàn)類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor 接口。該接口是 Kafka 提供的,里面有兩個核心的方法。
- onSend:該方法會在消息發(fā)送之前被調(diào)用。如果你想在發(fā)送之前對消息“美美容”,這個方法是你唯一的機會。
- onAcknowledgement:該方法會在消息成功提交或發(fā)送失敗之后被調(diào)用。還記得我在上一期中提到的發(fā)送回調(diào)通知 callback 嗎?onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。值得注意的是,這個方法和 onSend 不是在同一個線程中被調(diào)用的,因此如果你在這兩個方法中調(diào)用了某個共享可變對象,一定要保證線程安全哦。還有一點很重要,這個方法處在 Producer 發(fā)送的主路徑中,所以最好別放一些太重的邏輯進去,否則你會發(fā)現(xiàn)你的 Producer TPS 直線下降。
同理,指定消費者攔截器也是同樣的方法,只是具體的實現(xiàn)類要實現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,這里面也有兩個核心方法。
- onConsume:該方法在消息返回給 Consumer 程序之前調(diào)用。也就是說在開始正式處理消息之前,攔截器會先攔一道,搞一些事情,之后再返回給你。
- onCommit:Consumer 在提交位移之后調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。
一定要注意的是,指定攔截器類時要指定它們的全限定名,即 full qualified name。通俗點說就是要把完整包名也加上,不要只有一個類名在那里,并且還要保證你的 Producer 程序能夠正確加載你的攔截器類。
典型使用場景
Kafka 攔截器都能用在哪些地方呢?其實,跟很多攔截器的用法相同,Kafka 攔截器可以應用于包括客戶端監(jiān)控、端到端系統(tǒng)性能檢測、消息審計等多種功能在內(nèi)的場景。
我以端到端系統(tǒng)性能檢測和消息審計為例來展開介紹下。
今天 Kafka 默認提供的監(jiān)控指標都是針對單個客戶端或 Broker 的,你很難從具體的消息維度去追蹤集群間消息的流轉(zhuǎn)路徑。同時,如何監(jiān)控一條消息從生產(chǎn)到最后消費的端到端延時也是很多 Kafka 用戶迫切需要解決的問題。
從技術(shù)上來說,我們可以在客戶端程序中增加這樣的統(tǒng)計邏輯,但是對于那些將 Kafka 作為企業(yè)級基礎(chǔ)架構(gòu)的公司來說,在應用代碼中編寫統(tǒng)一的監(jiān)控邏輯其實是很難的,畢竟這東西非常靈活,不太可能提前確定好所有的計算邏輯。另外,將監(jiān)控邏輯與主業(yè)務邏輯耦合也是軟件工程中不提倡的做法。
現(xiàn)在,通過實現(xiàn)攔截器的邏輯以及可插拔的機制,我們能夠快速地觀測、驗證以及監(jiān)控集群間的客戶端性能指標,特別是能夠從具體的消息層面上去收集這些數(shù)據(jù)。這就是 Kafka 攔截器的一個非常典型的使用場景。
我們再來看看消息審計(message audit)的場景。設想你的公司把 Kafka 作為一個私有云消息引擎平臺向全公司提供服務,這必然要涉及多租戶以及消息審計的功能。
作為私有云的 PaaS 提供方,你肯定要能夠隨時查看每條消息是哪個業(yè)務方在什么時間發(fā)布的,之后又被哪些業(yè)務方在什么時刻消費。一個可行的做法就是你編寫一個攔截器類,實現(xiàn)相應的消息審計邏輯,然后強行規(guī)定所有接入你的 Kafka 服務的客戶端程序必須設置該攔截器。
案例分享
下面我以一個具體的案例來說明一下攔截器的使用。在這個案例中,我們通過編寫攔截器類來統(tǒng)計消息端到端處理的延時,非常實用,我建議你可以直接移植到你自己的生產(chǎn)環(huán)境中。
我曾經(jīng)給一個公司做 Kafka 培訓,在培訓過程中,那個公司的人提出了一個訴求。他們的場景很簡單,某個業(yè)務只有一個 Producer 和一個 Consumer,他們想知道該業(yè)務消息從被生產(chǎn)出來到最后被消費的平均總時長是多少,但是目前 Kafka 并沒有提供這種端到端的延時統(tǒng)計。
學習了攔截器之后,我們現(xiàn)在知道可以用攔截器來滿足這個需求。既然是要計算總延時,那么一定要有個公共的地方來保存它,并且這個公共的地方還是要讓生產(chǎn)者和消費者程序都能訪問的。在這個例子中,我們假設數(shù)據(jù)被保存在 Redis 中。
Okay,這個需求顯然要實現(xiàn)生產(chǎn)者攔截器,也要實現(xiàn)消費者攔截器。我們先來實現(xiàn)前者:
上面的代碼比較關(guān)鍵的是在發(fā)送消息前更新總的已發(fā)送消息數(shù)。為了節(jié)省時間,我沒有考慮發(fā)送失敗的情況,因為發(fā)送失敗可能導致總發(fā)送數(shù)不準確。不過好在處理思路是相同的,你可以有針對性地調(diào)整下代碼邏輯。
下面是消費者端的攔截器實現(xiàn),代碼如下:
在上面的消費者攔截器中,我們在真正消費一批消息前首先更新了它們的總延時,方法就是用當前的時鐘時間減去封裝在消息中的創(chuàng)建時間,然后累計得到這批消息總的端到端處理延時并更新到 Redis 中。之后的邏輯就很簡單了,我們分別從 Redis 中讀取更新過的總延時和總消息數(shù),兩者相除即得到端到端消息的平均處理延時。
創(chuàng)建好生產(chǎn)者和消費者攔截器后,我們按照上面指定的方法分別將它們配置到各自的 Producer 和 Consumer 程序中,這樣就能計算消息從 Producer 端到 Consumer 端平均的處理延時了。這種端到端的指標監(jiān)控能夠從全局角度俯察和審視業(yè)務運行情況,及時查看業(yè)務是否滿足端到端的 SLA 目標。
小結(jié)
今天我們花了一些時間討論 Kafka 提供的冷門功能:攔截器。如之前所說,攔截器的出場率極低,以至于我從未看到過國內(nèi)大廠實際應用 Kafka 攔截器的報道。但冷門不代表沒用。事實上,我們可以利用攔截器滿足實際的需求,比如端到端系統(tǒng)性能檢測、消息審計等。