五分鐘了解Flink狀態(tài)管理
什么叫做Flink的有狀態(tài)計(jì)算呢?說白了就是將之前的中間結(jié)果暫時(shí)存儲(chǔ)起來,等待后續(xù)的事件數(shù)據(jù)過來后,可以使用之前的中間結(jié)果繼續(xù)計(jì)算。本文主要介紹Flink狀態(tài)計(jì)算和管理、代碼示例。
1、有狀態(tài)的計(jì)算
什么是Flink的有狀態(tài)的計(jì)算。在流式計(jì)算過程中將算子的中間結(jié)果保存在內(nèi)存或者文件系統(tǒng)中,等下一個(gè)事件進(jìn)入算子后可以從之前的狀態(tài)中獲取中間結(jié)果,以便計(jì)算當(dāng)前的結(jié)果,從而無(wú)需每次都基于全部的原始數(shù)據(jù)來統(tǒng)計(jì)結(jié)果,極大地提升了系統(tǒng)性能。
每一個(gè)具有一定復(fù)雜度的流計(jì)算應(yīng)用都是有狀態(tài)的,任何運(yùn)行基本業(yè)務(wù)邏輯的流處理應(yīng)用都需要在一定時(shí)間內(nèi)存儲(chǔ)所接受的事件或者中間結(jié)果。
2、狀態(tài)管理
Flink如何管理狀態(tài)?主要就是:本地訪問和存儲(chǔ)、容錯(cuò)性(可以自動(dòng)按一定的時(shí)間間隔產(chǎn)生快照,并且在任務(wù)失敗后進(jìn)行恢復(fù))。
狀態(tài)(State)操作是指需要把當(dāng)前數(shù)據(jù)和歷史計(jì)算結(jié)果進(jìn)行累加計(jì)算,即當(dāng)前數(shù)據(jù)的處理需要使用之前的數(shù)據(jù)或中間結(jié)果。
例如,對(duì)數(shù)據(jù)流中的實(shí)時(shí)單詞進(jìn)行計(jì)數(shù),每當(dāng)接收到新的單詞時(shí),需要將當(dāng)前單詞數(shù)量累加到之前的結(jié)果中。這里單詞的數(shù)量就是狀態(tài),對(duì)單詞數(shù)量的更新就是狀態(tài)的更新。如下圖:
狀態(tài)的計(jì)算模型,如下圖:
如下圖,Source、map()、keyBy()/window()/apply()算子的并行度為2,Sink算子的并行度為1。keyBy()/window()/apply()算子是有狀態(tài)的,并且map()與keyBy()/window()/apply()算子之間通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)分發(fā)。
Flink應(yīng)用程序的狀態(tài)訪問都在本地進(jìn)行,這樣有助于提高吞吐量和降低延遲。通常情況下,F(xiàn)link應(yīng)用程序都是將狀態(tài)存儲(chǔ)在JVM堆內(nèi)存中,但如果狀態(tài)數(shù)據(jù)太大,也可以選擇將其以結(jié)構(gòu)化數(shù)據(jù)格式存儲(chǔ)在高速磁盤中。
通過狀態(tài)快照,F(xiàn)link能夠提供可容錯(cuò)的、精確一次的計(jì)算語(yǔ)義。Flink應(yīng)用程序在執(zhí)行時(shí)會(huì)獲取并存儲(chǔ)分布式Pipeline(流處理管道)中整體的狀態(tài),它會(huì)將數(shù)據(jù)源中消費(fèi)數(shù)據(jù)的偏移量記錄下來,并將整個(gè)作業(yè)圖中算子獲取到該數(shù)據(jù)(記錄的偏移量對(duì)應(yīng)的數(shù)據(jù))時(shí)的狀態(tài)記錄并存儲(chǔ)下來。
當(dāng)發(fā)生故障時(shí),F(xiàn)link作業(yè)會(huì)恢復(fù)上次存儲(chǔ)的狀態(tài),重置數(shù)據(jù)源從狀態(tài)中記錄的上次消費(fèi)的偏移量,開始重新進(jìn)行消費(fèi)處理。而且狀態(tài)快照在執(zhí)行時(shí)會(huì)異步獲取狀態(tài)并存儲(chǔ),并不會(huì)阻塞正在進(jìn)行的數(shù)據(jù)處理邏輯。這個(gè)機(jī)制跟Kafka等消息中間件的消費(fèi)方式很像。
Flink需要知道狀態(tài),以便可以使用Checkpoint和Savepoint來保證容錯(cuò)(下一篇會(huì)繼續(xù)介紹)。狀態(tài)還允許重新調(diào)整Flink應(yīng)用程序,這意味著Flink負(fù)責(zé)在并行實(shí)例之間重新分配狀態(tài)。
3、Keyed State
Keyed State是Flink提供的按照Key進(jìn)行分區(qū)的狀態(tài)機(jī)制。
在通過keyBy()分組的KeyedStream上使用,對(duì)每個(gè)Key的數(shù)據(jù)進(jìn)行狀態(tài)存儲(chǔ)和管理,狀態(tài)是跟每個(gè)Key綁定的,即每個(gè)Key對(duì)應(yīng)一個(gè)狀態(tài)對(duì)象。
Keyed State支持的狀態(tài)數(shù)據(jù)類型如下:ValueState、ListState、ReducingState、AggregatingState<IN, OUT>、MapState<UK, UV>。下文以ValueState為例,介紹如何使用。
4、狀態(tài)管理示例和代碼
我們來模擬這樣一個(gè)場(chǎng)景:如果某個(gè)用戶1分鐘內(nèi)連續(xù)兩次退款,第二次則發(fā)出告警。
模擬訂單對(duì)象:
public class OrderBO {
/**
* ID
*/
private Integer id;
/***
* 訂單標(biāo)題
*/
private String title;
/**
* 訂單金額
*/
private Integer amount;
/**
* 訂單狀態(tài):1-已支付,2-已退款
*/
private Integer state;
/**
* 用戶ID
*/
private String userId;
}
利用狀態(tài)管理,處理告警邏輯:
/**
* 告警處理邏輯
**/
private static class AlarmLogic extends KeyedProcessFunction<String, OrderBO, OrderBO> {
// 是否已經(jīng)出現(xiàn)退款的標(biāo)記
private ValueState<Boolean> flagState;
// 定時(shí)器,時(shí)間到了會(huì)清掉狀態(tài)
private ValueState<Long> timerState;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void open(Configuration parameters) throws Exception {
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
"timer-state",
Types.LONG);
timerState = getRuntimeContext().getState(timerDescriptor);
}
@Override
public void processElement(OrderBO value, KeyedProcessFunction<String, OrderBO, OrderBO>.Context context, Collector<OrderBO> collector) throws Exception {
Boolean refundFlag = flagState.value();
// 如果已經(jīng)退款過一次了,如果再出現(xiàn)退款則發(fā)射給下個(gè)算子,然后清理掉定時(shí)器。狀態(tài)2代表退款
if (refundFlag != null && refundFlag) {
if (value.getState() == 2) {
collector.collect(value);
}
cleanUp(context);
} else {
// 如果第一次出現(xiàn)退款,則寫入狀態(tài),同時(shí)開啟定時(shí)器。狀態(tài)2代表退款
if (value.getState() == 2) {
flagState.update(true);
long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
context.timerService().registerProcessingTimeTimer(timer);
timerState.update(timer);
}
}
}
/**
* 定時(shí)器到了之后,清理狀態(tài)值
*/
@Override
public void onTimer(long timestamp, KeyedProcessFunction<String, OrderBO, OrderBO>.OnTimerContext ctx, Collector<OrderBO> out) throws Exception {
timerState.clear();
flagState.clear();
}
/**
* 手動(dòng)清理狀態(tài)值
*
* @param ctx
* @throws Exception
*/
private void cleanUp(Context ctx) throws Exception {
Long timer = timerState.value();
ctx.timerService().deleteProcessingTimeTimer(timer);
timerState.clear();
flagState.clear();
}
}
模式生成數(shù)據(jù)和主流程代碼:
public static void main(String[] args) throws Exception {
// 1、執(zhí)行環(huán)境創(chuàng)建
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// 2、讀取Socket數(shù)據(jù)端口。實(shí)際根據(jù)具體業(yè)務(wù)對(duì)接數(shù)據(jù)來源
DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);
// 3、數(shù)據(jù)讀取個(gè)切割方式
SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream
.flatMap(new CleanDataAnd2Order()) // 清洗和處理數(shù)據(jù)
.keyBy(x -> x.getUserId()) // 分區(qū)
.process(new AlarmLogic()); // 處理告警邏輯
// 4、打印分析結(jié)果
resultDataStream.print("告警===>");
// 5、環(huán)境啟動(dòng)
environment.execute("OrderAlarmApp");
}
模擬數(shù)據(jù):
模擬場(chǎng)景:某個(gè)用戶1分鐘內(nèi)連續(xù)兩次退款,第二次發(fā)出告警。
示例數(shù)據(jù):
1,aaa,100,1,user1
2,bbb,200,1,user2
3,ccc,300,2,user1
4,ddd,400,2,user1
5,ddd,400,2,user1
6,bbb,200,2,user2
7,bbb,400,2,user2
完整代碼地址:https://github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/java/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43