自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Kafka不常見但是很高級的功能:Kafka 攔截器

開發(fā) 前端
今天我們花了一些時間討論 Kafka 提供的冷門功能:攔截器。如之前所說,攔截器的出場率極低,以至于我從未看到過國內(nèi)大廠實際應用 Kafka 攔截器的報道。但冷門不代表沒用。事實上,我們可以利用攔截器滿足實際的需求,比如端到端系統(tǒng)性能檢測、消息審計等。?

既然是不常見,那就說明在實際場景中并沒有太高的出場率,但它們依然是很高級很實用的。下面就有請今天的主角登場:Kafka 攔截器。

什么是攔截器?

如果你用過 Spring Interceptor 或是 Apache Flume,那么應該不會對攔截器這個概念感到陌生,其基本思想就是允許應用程序在不修改邏輯的情況下,動態(tài)地實現(xiàn)一組可插拔的事件處理邏輯鏈。它能夠在主業(yè)務操作的前后多個時間點上插入對應的“攔截”邏輯。下面這張圖展示了 Spring MVC 攔截器的工作原理:

圖片來源:https://o7planning.org/en/11229/spring-mvc-interceptors-tutorial

攔截器 1 和攔截器 2 分別在請求發(fā)送之前、發(fā)送之后以及完成之后三個地方插入了對應的處理邏輯。而 Flume 中的攔截器也是同理,它們插入的邏輯可以是修改待發(fā)送的消息,也可以是創(chuàng)建新的消息,甚至是丟棄消息。這些功能都是以配置攔截器類的方式動態(tài)插入到應用程序中的,故可以快速地切換不同的攔截器而不影響主程序邏輯。

Kafka 攔截器借鑒了這樣的設計思路。你可以在消息處理的前后多個時點動態(tài)植入不同的處理邏輯,比如在消息發(fā)送前或者在消息被消費后。

作為一個非常小眾的功能,Kafka 攔截器自 0.10.0.0 版本被引入后并未得到太多的實際應用,我也從未在任何 Kafka 技術(shù)峰會上看到有公司分享其使用攔截器的成功案例。但即便如此,在自己的 Kafka 工具箱中放入這么一個有用的東西依然是值得的。今天我們就讓它來發(fā)揮威力,展示一些非常酷炫的功能。

Kafka 攔截器

Kafka 攔截器分為生產(chǎn)者攔截器和消費者攔截器。生產(chǎn)者攔截器允許你在發(fā)送消息前以及消息提交成功后植入你的攔截器邏輯;而消費者攔截器支持在消費消息前以及提交位移后編寫特定邏輯。值得一提的是,這兩種攔截器都支持鏈的方式,即你可以將一組攔截器串連成一個大的攔截器,Kafka 會按照添加順序依次執(zhí)行攔截器邏輯。

舉個例子,假設你想在生產(chǎn)消息前執(zhí)行兩個“前置動作”:第一個是為消息增加一個頭信息,封裝發(fā)送該消息的時間,第二個是更新發(fā)送消息數(shù)字段,那么當你將這兩個攔截器串聯(lián)在一起統(tǒng)一指定給 Producer 后,Producer 會按順序執(zhí)行上面的動作,然后再發(fā)送消息。

當前 Kafka 攔截器的設置方法是通過參數(shù)配置完成的。生產(chǎn)者和消費者兩端有一個相同的參數(shù),名字叫 interceptor.classes,它指定的是一組類的列表,每個類就是特定邏輯的攔截器實現(xiàn)類。拿上面的例子來說,假設第一個攔截器的完整類路徑是com.yourcompany.kafkaproject.interceptors.AddTimeStampInterceptor,第二個類是 com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor,那么你需要按照以下方法在 Producer 端指定攔截器:

Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器 1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
……

現(xiàn)在問題來了,我們應該怎么編寫 AddTimeStampInterceptor 和 UpdateCounterInterceptor 類呢?其實很簡單,這兩個類以及你自己編寫的所有 Producer 端攔截器實現(xiàn)類都要繼承org.apache.kafka.clients.producer.ProducerInterceptor 接口。該接口是 Kafka 提供的,里面有兩個核心的方法。

  1. onSend:該方法會在消息發(fā)送之前被調(diào)用。如果你想在發(fā)送之前對消息“美美容”,這個方法是你唯一的機會。
  2. onAcknowledgement:該方法會在消息成功提交或發(fā)送失敗之后被調(diào)用。還記得我在上一期中提到的發(fā)送回調(diào)通知 callback 嗎?onAcknowledgement 的調(diào)用要早于 callback 的調(diào)用。值得注意的是,這個方法和 onSend 不是在同一個線程中被調(diào)用的,因此如果你在這兩個方法中調(diào)用了某個共享可變對象,一定要保證線程安全哦。還有一點很重要,這個方法處在 Producer 發(fā)送的主路徑中,所以最好別放一些太重的邏輯進去,否則你會發(fā)現(xiàn)你的 Producer TPS 直線下降。

同理,指定消費者攔截器也是同樣的方法,只是具體的實現(xiàn)類要實現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor 接口,這里面也有兩個核心方法。

  1. onConsume:該方法在消息返回給 Consumer 程序之前調(diào)用。也就是說在開始正式處理消息之前,攔截器會先攔一道,搞一些事情,之后再返回給你。
  2. onCommit:Consumer 在提交位移之后調(diào)用該方法。通常你可以在該方法中做一些記賬類的動作,比如打日志等。

一定要注意的是,指定攔截器類時要指定它們的全限定名,即 full qualified name。通俗點說就是要把完整包名也加上,不要只有一個類名在那里,并且還要保證你的 Producer 程序能夠正確加載你的攔截器類。

典型使用場景

Kafka 攔截器都能用在哪些地方呢?其實,跟很多攔截器的用法相同,Kafka 攔截器可以應用于包括客戶端監(jiān)控、端到端系統(tǒng)性能檢測、消息審計等多種功能在內(nèi)的場景。

我以端到端系統(tǒng)性能檢測和消息審計為例來展開介紹下。

今天 Kafka 默認提供的監(jiān)控指標都是針對單個客戶端或 Broker 的,你很難從具體的消息維度去追蹤集群間消息的流轉(zhuǎn)路徑。同時,如何監(jiān)控一條消息從生產(chǎn)到最后消費的端到端延時也是很多 Kafka 用戶迫切需要解決的問題。

從技術(shù)上來說,我們可以在客戶端程序中增加這樣的統(tǒng)計邏輯,但是對于那些將 Kafka 作為企業(yè)級基礎(chǔ)架構(gòu)的公司來說,在應用代碼中編寫統(tǒng)一的監(jiān)控邏輯其實是很難的,畢竟這東西非常靈活,不太可能提前確定好所有的計算邏輯。另外,將監(jiān)控邏輯與主業(yè)務邏輯耦合也是軟件工程中不提倡的做法。

現(xiàn)在,通過實現(xiàn)攔截器的邏輯以及可插拔的機制,我們能夠快速地觀測、驗證以及監(jiān)控集群間的客戶端性能指標,特別是能夠從具體的消息層面上去收集這些數(shù)據(jù)。這就是 Kafka 攔截器的一個非常典型的使用場景。

我們再來看看消息審計(message audit)的場景。設想你的公司把 Kafka 作為一個私有云消息引擎平臺向全公司提供服務,這必然要涉及多租戶以及消息審計的功能。

作為私有云的 PaaS 提供方,你肯定要能夠隨時查看每條消息是哪個業(yè)務方在什么時間發(fā)布的,之后又被哪些業(yè)務方在什么時刻消費。一個可行的做法就是你編寫一個攔截器類,實現(xiàn)相應的消息審計邏輯,然后強行規(guī)定所有接入你的 Kafka 服務的客戶端程序必須設置該攔截器。

案例分享

下面我以一個具體的案例來說明一下攔截器的使用。在這個案例中,我們通過編寫攔截器類來統(tǒng)計消息端到端處理的延時,非常實用,我建議你可以直接移植到你自己的生產(chǎn)環(huán)境中。

我曾經(jīng)給一個公司做 Kafka 培訓,在培訓過程中,那個公司的人提出了一個訴求。他們的場景很簡單,某個業(yè)務只有一個 Producer 和一個 Consumer,他們想知道該業(yè)務消息從被生產(chǎn)出來到最后被消費的平均總時長是多少,但是目前 Kafka 并沒有提供這種端到端的延時統(tǒng)計。

學習了攔截器之后,我們現(xiàn)在知道可以用攔截器來滿足這個需求。既然是要計算總延時,那么一定要有個公共的地方來保存它,并且這個公共的地方還是要讓生產(chǎn)者和消費者程序都能訪問的。在這個例子中,我們假設數(shù)據(jù)被保存在 Redis 中。

Okay,這個需求顯然要實現(xiàn)生產(chǎn)者攔截器,也要實現(xiàn)消費者攔截器。我們先來實現(xiàn)前者:

public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {


private Jedis jedis; // 省略 Jedis 初始化


@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
jedis.incr("totalSentMessage");
return record;
}


@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}


@Override
public void close() {
}


@Override
public void configure(Map<java.lang.String, ?> configs) {
}

上面的代碼比較關(guān)鍵的是在發(fā)送消息前更新總的已發(fā)送消息數(shù)。為了節(jié)省時間,我沒有考慮發(fā)送失敗的情況,因為發(fā)送失敗可能導致總發(fā)送數(shù)不準確。不過好在處理思路是相同的,你可以有針對性地調(diào)整下代碼邏輯。

下面是消費者端的攔截器實現(xiàn),代碼如下:

public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {


private Jedis jedis; // 省略 Jedis 初始化


@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long lantency = 0L;
for (ConsumerRecord<String, String> record : records) {
lantency += (System.currentTimeMillis() - record.timestamp());
}
jedis.incrBy("totalLatency", lantency);
long totalLatency = Long.parseLong(jedis.get("totalLatency"));
long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
return records;
}


@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}


@Override
public void close() {
}


@Override
public void configure(Map<String, ?> configs) {

在上面的消費者攔截器中,我們在真正消費一批消息前首先更新了它們的總延時,方法就是用當前的時鐘時間減去封裝在消息中的創(chuàng)建時間,然后累計得到這批消息總的端到端處理延時并更新到 Redis 中。之后的邏輯就很簡單了,我們分別從 Redis 中讀取更新過的總延時和總消息數(shù),兩者相除即得到端到端消息的平均處理延時。

創(chuàng)建好生產(chǎn)者和消費者攔截器后,我們按照上面指定的方法分別將它們配置到各自的 Producer 和 Consumer 程序中,這樣就能計算消息從 Producer 端到 Consumer 端平均的處理延時了。這種端到端的指標監(jiān)控能夠從全局角度俯察和審視業(yè)務運行情況,及時查看業(yè)務是否滿足端到端的 SLA 目標。

小結(jié)

今天我們花了一些時間討論 Kafka 提供的冷門功能:攔截器。如之前所說,攔截器的出場率極低,以至于我從未看到過國內(nèi)大廠實際應用 Kafka 攔截器的報道。但冷門不代表沒用。事實上,我們可以利用攔截器滿足實際的需求,比如端到端系統(tǒng)性能檢測、消息審計等。

責任編輯:武曉燕 來源: 今日頭條
相關(guān)推薦

2021-07-06 11:25:20

Chrome前端代碼

2016-12-14 20:53:04

Linuxgcc命令行

2016-12-14 19:19:19

Linuxgcc命令行

2021-10-21 22:03:00

PythonNumpy函數(shù)

2009-06-24 16:00:00

2025-02-28 08:14:53

2009-09-27 17:37:32

Hibernate攔截

2020-03-25 17:55:30

SpringBoot攔截器Java

2023-09-05 08:58:07

2011-05-16 10:14:11

Hibernate

2011-11-21 14:21:26

SpringMVCJava框架

2009-07-08 17:02:11

JDK實現(xiàn)調(diào)用攔截器

2021-08-17 10:34:19

Python數(shù)據(jù)科學機器學習

2009-06-25 15:54:42

Struts2教程攔截器

2022-02-19 21:22:23

Kafka事務API的

2021-11-03 17:04:11

攔截器操作Servlet

2009-06-25 15:59:21

Struts2教程攔截器

2012-02-03 13:27:16

2018-10-15 17:24:31

Kafka分布式數(shù)據(jù)

2009-02-04 14:19:38

點贊
收藏

51CTO技術(shù)棧公眾號