自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

面試官:Kafka中的key有什么用?

開(kāi)發(fā) 架構(gòu)
對(duì)于某些應(yīng)用場(chǎng)景,消費(fèi)者可以根據(jù)消息的鍵來(lái)進(jìn)行過(guò)濾或聚合操作。例如,在實(shí)時(shí)數(shù)據(jù)分析場(chǎng)景中,可能需要對(duì)具有相同鍵的消息進(jìn)行分組處理。

我們?cè)谑褂?Kafka 時(shí),最簡(jiǎn)單、最常用的方式是只設(shè)置 topic(主題)和 value(消息體),如下所示:圖片這樣的話(huà)獲取消息的代碼也很簡(jiǎn)單,如下所示:

@KafkaListener(topics = "mytopic", groupId = "my-group")
public void listen(String data) {
    System.out.println("監(jiān)聽(tīng)到消息:" + data);
}

但是,除了我們可以設(shè)置和傳遞 topic 和 value 之外,我們還可以傳遞 key,如下圖所示:圖片那問(wèn)題來(lái)了,發(fā)送消息時(shí)設(shè)置這個(gè) key 有什么用呢?

key的作用

發(fā)送消息時(shí),設(shè)置 key 的作用如下:圖片

1.決定分區(qū)

當(dāng)生產(chǎn)者發(fā)送消息時(shí),如果指定了 key,Kafka 會(huì)根據(jù) key 的 hash 值來(lái)決定這條消息應(yīng)該發(fā)送到哪個(gè)分區(qū)。

如果沒(méi)有指定 key,Kafka 會(huì)采用輪詢(xún)(早期版本)或隨機(jī)(最新版本)的方式將消息分配到其他分區(qū)中。

分區(qū)的具體實(shí)現(xiàn)源碼在 DefaultPartitioner 中 partition 方法中體現(xiàn),核心源碼如下:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster, int numPartitions) {
    return keyBytes == null ? this.stickyPartitionCache.partition(topic, cluster) : BuiltInPartitioner.partitionForKey(keyBytes, numPartitions);
}

指定 key 之后的分區(qū)實(shí)現(xiàn)代碼如下:

public static int partitionForKey(byte[] serializedKey, int numPartitions) {
    return Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
}

以上源碼的大概含義是:使用 MurmurHash2 算法對(duì)字節(jié)數(shù)組 serializedKey 進(jìn)行哈希運(yùn)算,并將其結(jié)果轉(zhuǎn)換為正數(shù),然后對(duì) numPartitions 取模,以確定鍵在分區(qū)中的位置,返回值表示鍵所在的分區(qū)編號(hào)。

所以,從上述源碼可以看出,發(fā)送消息如果設(shè)置了 key 之后,會(huì)將相同 key 放到同一個(gè)分區(qū)中。

2.保證消息順序

在 Kafka 中,同一個(gè)分區(qū)中的消息是有序的。而相同的 key,根據(jù)上面的分區(qū)算法可知,它們會(huì)存放到同一個(gè)分區(qū),這樣就能保證消息的有序性了。

3.消息過(guò)濾

對(duì)于某些應(yīng)用場(chǎng)景,消費(fèi)者可以根據(jù)消息的鍵來(lái)進(jìn)行過(guò)濾或聚合操作。例如,在實(shí)時(shí)數(shù)據(jù)分析場(chǎng)景中,可能需要對(duì)具有相同鍵的消息進(jìn)行分組處理。

Kafka 設(shè)置了 key 之后,可以通過(guò)以下方式實(shí)現(xiàn)消息過(guò)濾,如下代碼所示:

@KafkaListener(topics = "topicName", groupId = "groupId")
public void listen(String message, ConsumerRecord<?,?> record) {
    Object key = record.key();
    if (key instanceof String && ((String) key).matches("regexPattern")) {
        // 處理滿(mǎn)足正則表達(dá)式條件的消息
    }
}

也就是,我們?cè)诮邮盏较⒅螅ㄟ^(guò)對(duì) key 的正則匹配實(shí)現(xiàn)消息的過(guò)濾和聚合等操作。

責(zé)任編輯:武曉燕 來(lái)源: 磊哥和Java
相關(guān)推薦

2020-04-23 14:09:13

URI挖坑前端

2021-07-06 07:27:45

React元素屬性

2024-03-20 15:12:59

KafkaES中間件

2022-05-05 07:38:32

volatilJava并發(fā)

2025-04-01 00:00:00

項(xiàng)目CRUD單例模式

2023-06-05 07:57:53

Kafka消息事務(wù)消息

2024-02-19 00:00:00

Python?starmap函數(shù)

2021-12-20 10:30:33

forforEach前端

2023-02-17 08:10:24

2021-11-30 07:44:50

FinalFinallyFinalize

2024-04-03 15:33:04

JWTSession傳輸信息

2024-09-19 08:42:43

2021-12-10 12:01:37

finalfinallyfinalize

2024-09-09 08:30:56

代碼

2024-04-15 00:01:00

STWJava垃圾

2023-02-20 08:08:48

限流算法計(jì)數(shù)器算法令牌桶算法

2023-12-13 15:28:32

Python工具數(shù)據(jù)

2024-04-19 08:23:06

2024-02-26 14:07:18

2023-07-11 08:40:02

IO模型后臺(tái)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)