我們一起深入理解Flink State
寫在前面
State是指流計(jì)算過(guò)程中計(jì)算節(jié)點(diǎn)的中間計(jì)算結(jié)果或元數(shù)據(jù)屬性,比如 在aggregation過(guò)程中要在state中記錄中間聚合結(jié)果,比如 Apache Kafka 作為數(shù)據(jù)源時(shí)候,我們也要記錄已經(jīng)讀取記錄的offset,這些State數(shù)據(jù)在計(jì)算過(guò)程中會(huì)進(jìn)行持久化(插入或更新)。本文將詳細(xì)介紹一下Flink State,通過(guò)本文,你可以了解到:
- State分類
- 什么是狀態(tài)后端(state backend)
- State對(duì)擴(kuò)縮容的處理
感謝關(guān)注,希望本文對(duì)你有所幫助。
State分類
Flink 中的狀態(tài)分為兩種主要類型:Keyed State 和 Operator State。
Keyed State
- 概念:Keyed State 是和鍵(key)相關(guān)聯(lián)的狀態(tài)。在 Flink 的 Keyed Streams 上進(jìn)行有狀態(tài)操作時(shí)(例如在使用 keyBy 方法后),每個(gè) key 都會(huì)有自己的狀態(tài)實(shí)例,這個(gè)狀態(tài)是獨(dú)立的,即每個(gè) key 的狀態(tài)對(duì)于其他 keys 不可見(jiàn)。
- 用法:Keyed State 常用于需要按 key 進(jìn)行分區(qū)處理的情況,如聚合計(jì)算(sum、min、max)、窗口操作和其他需要按 key 維護(hù)和更新?tīng)顟B(tài)的計(jì)算。在 SQL 語(yǔ)句中,Keyed State 對(duì)應(yīng)的就是通過(guò) GroupBy 或 PartitionBy 所定義的字段分組。
- 數(shù)據(jù)結(jié)構(gòu):Keyed State 底層通常是基于哈希表的實(shí)現(xiàn),確保每個(gè) key 都能快速地找到對(duì)應(yīng)的狀態(tài)。這種狀態(tài)通常存儲(chǔ)在 Keyed State 后端中,可以是內(nèi)存中,也可以是 RocksDB 這種本地存儲(chǔ)。
Operator State
- 概念:Operator State 與特定的操作符實(shí)例(Task)相關(guān)聯(lián),而不是和特定的 key 關(guān)聯(lián)。每個(gè)操作符實(shí)例維護(hù)自己的狀態(tài),所有的 Operator State 實(shí)例對(duì)于同一操作符是可見(jiàn)的。
- 用法:Operator State 通常用于記錄源(Source)和接收器(Sink)的相關(guān)狀態(tài),或者用于需要操作符級(jí)別聚合的場(chǎng)合。例如,一個(gè) Source Connector 可能會(huì)使用 Operator State 來(lái)記錄已經(jīng)讀取的數(shù)據(jù)源的 offset。
- 實(shí)現(xiàn):Flink 提供了幾種不同的 Operator State 類型,包括列表狀態(tài)(ListState)、聯(lián)合列表狀態(tài)(UnionListState)、廣播狀態(tài)(BroadcastState)等。這些狀態(tài)通常存儲(chǔ)在 Operator State 后端中,可以是內(nèi)存中,也可以是持久化存儲(chǔ)。
值得注意的是:
在 Flink 的 Table API 或 SQL API 中,對(duì)于內(nèi)部的 GroupBy/PartitionBy 操作,F(xiàn)link 會(huì)自動(dòng)管理 Keyed State。而對(duì)于 Source Connector 記錄 offset 這樣的操作,通常是在底層的 DataStream API 中實(shí)現(xiàn)的,可能直接使用 Operator State 來(lái)管理。例如,F(xiàn)link Kafka Consumer 會(huì)使用 Operator State 來(lái)存儲(chǔ) Kafka 主題的分區(qū) offset,以便在發(fā)生故障時(shí)能夠從上次成功的檢查點(diǎn)恢復(fù)。
什么是狀態(tài)后端(state backend)
State的具體存儲(chǔ)、訪問(wèn)和維護(hù)是由**狀態(tài)后端(state backend)**決定的。狀態(tài)后端主要負(fù)責(zé)兩件事情:
- 本地狀態(tài)管理
- 將狀態(tài)以checkpoint的形式寫入遠(yuǎn)程存儲(chǔ)
Flink提供了三種狀態(tài)后端:
MemoryStateBackend(內(nèi)存狀態(tài)后端)
- 存儲(chǔ):狀態(tài)存儲(chǔ)在 TaskManager 的 JVM 堆內(nèi)存上。生成checkpoint時(shí),*MemoryStateBackend會(huì)將狀態(tài)發(fā)送至JobManager并保存到它的堆內(nèi)存中。
- 使用場(chǎng)景:適用于小規(guī)模狀態(tài)或本地測(cè)試,因?yàn)樗鼘⑺袪顟B(tài)作為序列化數(shù)據(jù)保存在 JVM 堆上。如果 TaskManager 發(fā)生故障,狀態(tài)會(huì)丟失。
- 性能:由于狀態(tài)是直接存儲(chǔ)在內(nèi)存中的,所以訪問(wèn)速度很快。
- 限制:狀態(tài)大小受限于 TaskManager 可用內(nèi)存。大規(guī)模狀態(tài)可能導(dǎo)致內(nèi)存溢出錯(cuò)誤。
FsStateBackend(文件系統(tǒng)狀態(tài)后端)
- 存儲(chǔ):狀態(tài)存儲(chǔ)在 TaskManager 的 JVM 堆內(nèi)存中(作為緩存),但在檢查點(diǎn)(checkpoint)時(shí),會(huì)持久化到配置的文件系統(tǒng)(如 HDFS)中。
- 使用場(chǎng)景:適用于需要持久化狀態(tài)以避免數(shù)據(jù)丟失的場(chǎng)景。在發(fā)生故障時(shí),F(xiàn)link 作業(yè)可以從文件系統(tǒng)中的檢查點(diǎn)恢復(fù)狀態(tài)。
- 性能:由于狀態(tài)在內(nèi)存中進(jìn)行操作,并在檢查點(diǎn)時(shí)異步寫入文件系統(tǒng),因此可以提供較快的狀態(tài)訪問(wèn)速度,但可能受文件系統(tǒng)性能的限制。
- 限制:內(nèi)存中的狀態(tài)大小仍然受限于 TaskManager 可用內(nèi)存,但由于檢查點(diǎn)數(shù)據(jù)被寫入到更穩(wěn)定的文件系統(tǒng),因此可以支持更大的狀態(tài)。
RocksDBStateBackend(RocksDB 狀態(tài)后端)
RocksDB是一個(gè)嵌入式鍵值存儲(chǔ)(key-value store),它可以將數(shù)據(jù)保存到本地磁盤上,為了從RocksDB中讀寫數(shù)據(jù),系統(tǒng)需要對(duì)數(shù)據(jù)進(jìn)行序列化和反序列化。
- 存儲(chǔ):狀態(tài)存儲(chǔ)在本地磁盤上的 RocksDB 數(shù)據(jù)庫(kù)中,檢查點(diǎn)數(shù)據(jù)會(huì)持久化到配置的文件系統(tǒng)中。
- 使用場(chǎng)景:適用于大規(guī)模狀態(tài)管理的場(chǎng)景。由于 RocksDB 是一個(gè)優(yōu)化的鍵值存儲(chǔ),因此可以有效地管理大量狀態(tài)數(shù)據(jù)。
- 性能:狀態(tài)訪問(wèn)速度可能比內(nèi)存狀態(tài)后端慢(磁盤讀寫以及序列化和反序列化對(duì)象的開(kāi)銷),但 RocksDB 提供了針對(duì)大量狀態(tài)數(shù)據(jù)的優(yōu)化。
- 限制:對(duì)本地磁盤空間有需求,但由于狀態(tài)是在本地磁盤上操作,因此可以支持非常大的狀態(tài)。
在選擇狀態(tài)后端時(shí),需要考慮應(yīng)用的狀態(tài)大小、恢復(fù)速度、持久性和部署環(huán)境。對(duì)于生產(chǎn)環(huán)境,通常推薦使用 RocksDBStateBackend,因?yàn)樗軌蛱峁┝己玫臄U(kuò)展性和容錯(cuò)性。
State對(duì)擴(kuò)縮容的處理
Operator State 的擴(kuò)容處理
在 Apache Flink 中,對(duì)于有狀態(tài)的流處理作業(yè),當(dāng)作業(yè)進(jìn)行擴(kuò)容(scaling out)或縮容(scaling in)時(shí),即增加或減少并行子任務(wù)的數(shù)量時(shí),F(xiàn)link 需要重新分配 OperatorState。這個(gè)過(guò)程稱為狀態(tài)重分配(state redistribution)。
對(duì)于 Operator State 的擴(kuò)容處理,F(xiàn)link 提供了不同的重分配模式來(lái)處理狀態(tài):
ListState
對(duì)于 ListState 類型的 Operator State,如果流任務(wù)的并行度從 N 增加到 M,F(xiàn)link 會(huì)將每個(gè)并行實(shí)例的狀態(tài)分成 M 份,然后將這些分片分配給新的并行實(shí)例。如果并行度減少,則相反,狀態(tài)將會(huì)聚合起來(lái)。
圖片
擴(kuò)容時(shí):
- 假設(shè)原來(lái)有 2 個(gè)并行實(shí)例,每個(gè)實(shí)例有自己的 ListState。
- 擴(kuò)容到 3 個(gè)并行實(shí)例。
- Flink 會(huì)將每個(gè)原來(lái)的 ListState 平均分成 3 份。
- 新的 3 個(gè)并行實(shí)例每個(gè)都會(huì)接收一份來(lái)自每個(gè)原始 ListState 的數(shù)據(jù)。
縮容時(shí):
- 假設(shè)原來(lái)有 3個(gè)并行實(shí)例。
- 縮容到 1 個(gè)并行實(shí)例。
- 現(xiàn)有的狀態(tài)將會(huì)被聚合,確保新的 1 個(gè)實(shí)例完整地包含原始狀態(tài)的全部數(shù)據(jù)。
BroadcastState
BroadcastState 的數(shù)據(jù)在擴(kuò)容或縮容時(shí)會(huì)被復(fù)制到所有的并行實(shí)例中。由于 BroadcastState 是以廣播的方式存儲(chǔ)數(shù)據(jù),所有并行實(shí)例的狀態(tài)都是相同的。
圖片
UnionListState
對(duì)于 UnionListState 類型的 Operator State,在擴(kuò)容或縮容時(shí),狀態(tài)的每個(gè)元素將保持不變,原始狀態(tài)的所有元素將被統(tǒng)一地分發(fā)到新的并行實(shí)例中。這意味著每個(gè)元素僅分配給一個(gè)并行實(shí)例,但所有并行實(shí)例的狀態(tài)的并集會(huì)包括所有原始狀態(tài)的元素。隨后由任務(wù)自己決定哪些條目該保留,哪些該丟棄。
圖片
思考:Source的擴(kuò)容(并發(fā)數(shù))是否可以超過(guò)Source物理存儲(chǔ)的partition數(shù)量呢?
在使用像 Apache Kafka 這樣的消息隊(duì)列作為數(shù)據(jù)源(Source)時(shí),消息隊(duì)列中的數(shù)據(jù)被劃分為多個(gè)分區(qū)(partitions)。這種設(shè)計(jì)主要是為了支持?jǐn)?shù)據(jù)的并行處理以及提高吞吐量。在使用 Flink 或類似的流處理框架時(shí),一個(gè)常見(jiàn)的做法是將每個(gè)分區(qū)分配給一個(gè)并行的 Source 實(shí)例(也稱為 Source Task 或 Source Operator)進(jìn)行處理。
如果嘗試將 Source 的并行度(并發(fā)數(shù))設(shè)置得比物理存儲(chǔ)(比如 Kafka 主題)的分區(qū)數(shù)量還要高,那么將會(huì)有一些并行實(shí)例分配不到任何分區(qū),因?yàn)榉謪^(qū)的數(shù)量是固定的,且每個(gè)分區(qū)只能被一個(gè)并行實(shí)例消費(fèi)(至少在 Flink 的默認(rèn)設(shè)置下是這樣)。這會(huì)導(dǎo)致資源浪費(fèi),因?yàn)槌龇謪^(qū)數(shù)量的那部分并行實(shí)例不會(huì)做任何實(shí)際的數(shù)據(jù)處理工作,但仍然占用系統(tǒng)資源。
因此,在設(shè)置 Source 的并行度時(shí),通常的最佳實(shí)踐是:
- 確保 Source 的并行度不超過(guò)其對(duì)應(yīng)物理存儲(chǔ)(如 Kafka 主題)的分區(qū)數(shù)量。
如果需要增加并行度以提高處理能力,相應(yīng)地也需要增加物理存儲(chǔ)的分區(qū)數(shù)量。對(duì)于 Kafka 來(lái)說(shuō),可以通過(guò)修改主題的分區(qū)配置來(lái)實(shí)現(xiàn)。
對(duì)于 Apache Flink,如果使用的是 Flink Kafka Connector,并且嘗試將并行度設(shè)置得比 Kafka 主題的分區(qū)數(shù)量還要高,F(xiàn)link 會(huì)在作業(yè)啟動(dòng)時(shí)進(jìn)行檢查。如果發(fā)現(xiàn)這種配置不匹配的情況,F(xiàn)link 會(huì)拋出異常并終止作業(yè)啟動(dòng),以避免資源浪費(fèi)和潛在的配置錯(cuò)誤。這種設(shè)計(jì)選擇確保了資源的有效利用和處理能力的合理分配,同時(shí)也避免了由于配置錯(cuò)誤而導(dǎo)致的潛在問(wèn)題。
KeyedState對(duì)擴(kuò)容的處理
- 什么是Key-Groups
KeyedState的算子在擴(kuò)容時(shí)會(huì)根據(jù)新的任務(wù)數(shù)量對(duì)key進(jìn)行重分區(qū),為了降低狀態(tài)在不同任務(wù)之間遷移的成本,F(xiàn)link不會(huì)單獨(dú)對(duì)key進(jìn)行在分配,而是會(huì)把所有的鍵值分別存到不同的key-group中,每個(gè)key-group都包含了部分鍵值對(duì)。一個(gè)key-group是State分配的原子單位。
- 什么決定Key-Groups的個(gè)數(shù)
key-group的數(shù)量在job啟動(dòng)前必須是確定的且運(yùn)行中不能改變。由于key-group是state分配的原子單位,而每個(gè)operator并行實(shí)例至少包含一個(gè)key-group,因此operator的最大并行度不能超過(guò)設(shè)定的key-group的個(gè)數(shù),那么在Flink的內(nèi)部實(shí)現(xiàn)上key-group的數(shù)量就是最大并行度的值。
- 如何決定key屬于哪個(gè)Key-Group
為了決定一個(gè)key屬于哪個(gè)Key-Group,通常會(huì)采用一種叫做一致性哈希(Consistent Hashing)的算法。一致性哈希算法的基本思想是將所有的Key和所有的Key-Group都映射到同一個(gè)哈希環(huán)上。對(duì)每個(gè)Key進(jìn)行哈希運(yùn)算得到一個(gè)哈希值,然后在哈希環(huán)上找到一個(gè)順時(shí)針?lè)较蜃罱腒ey-Group,這個(gè)Key就屬于這個(gè)Key-Group。即:Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism取余操作的來(lái)分配的。
如下圖當(dāng)parallelism=2,maxParallelism=10的情況下流上key與key-group的對(duì)應(yīng)關(guān)系如下圖所示:
圖片
如上圖key(a)的hashCode是97,與最大并發(fā)10取余后是7,被分配到了KG-7中,流上每個(gè)event都會(huì)分配到KG-0至KG-9其中一個(gè)Key-Group中。
上面的Stateful Operation節(jié)點(diǎn)的最大并行度maxParallelism的值是10,也就是我們一共有10個(gè)Key-Group,當(dāng)我們并發(fā)是2的時(shí)候和并發(fā)是3的時(shí)候分配的情況如下圖:
圖片
先計(jì)算每個(gè)Operator實(shí)例至少分配的Key-Group個(gè)數(shù),將不能整除的部分N個(gè),平均分給前N個(gè)實(shí)例。最終每個(gè)Operator實(shí)例管理的Key-Groups會(huì)在GroupRange中表示,本質(zhì)是一個(gè)區(qū)間值。比如上圖是2->3擴(kuò)容,那每個(gè)task的key-group的數(shù)量是:10/3≈3,也即是每個(gè)task先分3個(gè)key-group,然后把剩余的1個(gè)key-group分配給第一task。
值得注意的是:
Key-Group機(jī)制的特點(diǎn)就是每個(gè)具體的key(event)不關(guān)心落到具體的哪個(gè)task來(lái)處理,只關(guān)心會(huì)落到哪個(gè)Key-Group中:
- 首先 一個(gè)job運(yùn)行之后,如果要復(fù)用state,不允許在修改maxParallelism。
- key 值的hash code決定落到哪個(gè)KG中,key本身不關(guān)系被哪個(gè)task處理,也就是說(shuō)相同的KG在擴(kuò)容前后可能被不同的task處理。
總結(jié)
State是Flink流計(jì)算的關(guān)鍵部分。Flink 中的狀態(tài)分為兩種主要類型:Keyed State 和 Operator State。Flink提供了三種狀態(tài)后端:MemoryStateBackend、FsStateBackend、RocksDBStateBackend。對(duì)于Keyed State 和 Operator State應(yīng)對(duì)擴(kuò)縮容時(shí)有不同的分配方式。