自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

五分鐘了解Flink狀態(tài)管理

開發(fā) 架構(gòu)
本文主要介紹了Flink的狀態(tài)和狀態(tài)管理,以及Demo和相關(guān)代碼,希望對(duì)你有幫助!

什么叫做Flink的有狀態(tài)計(jì)算呢?說白了就是將之前的中間結(jié)果暫時(shí)存儲(chǔ)起來,等待后續(xù)的事件數(shù)據(jù)過來后,可以使用之前的中間結(jié)果繼續(xù)計(jì)算。本文主要介紹Flink狀態(tài)計(jì)算和管理、代碼示例。

1、有狀態(tài)的計(jì)算

什么是Flink的有狀態(tài)的計(jì)算。在流式計(jì)算過程中將算子的中間結(jié)果保存在內(nèi)存或者文件系統(tǒng)中,等下一個(gè)事件進(jìn)入算子后可以從之前的狀態(tài)中獲取中間結(jié)果,以便計(jì)算當(dāng)前的結(jié)果,從而無(wú)需每次都基于全部的原始數(shù)據(jù)來統(tǒng)計(jì)結(jié)果,極大地提升了系統(tǒng)性能。

每一個(gè)具有一定復(fù)雜度的流計(jì)算應(yīng)用都是有狀態(tài)的,任何運(yùn)行基本業(yè)務(wù)邏輯的流處理應(yīng)用都需要在一定時(shí)間內(nèi)存儲(chǔ)所接受的事件或者中間結(jié)果。

2、狀態(tài)管理

Flink如何管理狀態(tài)?主要就是:本地訪問和存儲(chǔ)、容錯(cuò)性(可以自動(dòng)按一定的時(shí)間間隔產(chǎn)生快照,并且在任務(wù)失敗后進(jìn)行恢復(fù))。

狀態(tài)(State)操作是指需要把當(dāng)前數(shù)據(jù)和歷史計(jì)算結(jié)果進(jìn)行累加計(jì)算,即當(dāng)前數(shù)據(jù)的處理需要使用之前的數(shù)據(jù)或中間結(jié)果。

例如,對(duì)數(shù)據(jù)流中的實(shí)時(shí)單詞進(jìn)行計(jì)數(shù),每當(dāng)接收到新的單詞時(shí),需要將當(dāng)前單詞數(shù)量累加到之前的結(jié)果中。這里單詞的數(shù)量就是狀態(tài),對(duì)單詞數(shù)量的更新就是狀態(tài)的更新。如下圖:

狀態(tài)的計(jì)算模型,如下圖:

如下圖,Source、map()、keyBy()/window()/apply()算子的并行度為2,Sink算子的并行度為1。keyBy()/window()/apply()算子是有狀態(tài)的,并且map()與keyBy()/window()/apply()算子之間通過網(wǎng)絡(luò)進(jìn)行數(shù)據(jù)分發(fā)。

Flink應(yīng)用程序的狀態(tài)訪問都在本地進(jìn)行,這樣有助于提高吞吐量和降低延遲。通常情況下,F(xiàn)link應(yīng)用程序都是將狀態(tài)存儲(chǔ)在JVM堆內(nèi)存中,但如果狀態(tài)數(shù)據(jù)太大,也可以選擇將其以結(jié)構(gòu)化數(shù)據(jù)格式存儲(chǔ)在高速磁盤中。

通過狀態(tài)快照,F(xiàn)link能夠提供可容錯(cuò)的、精確一次的計(jì)算語(yǔ)義。Flink應(yīng)用程序在執(zhí)行時(shí)會(huì)獲取并存儲(chǔ)分布式Pipeline(流處理管道)中整體的狀態(tài),它會(huì)將數(shù)據(jù)源中消費(fèi)數(shù)據(jù)的偏移量記錄下來,并將整個(gè)作業(yè)圖中算子獲取到該數(shù)據(jù)(記錄的偏移量對(duì)應(yīng)的數(shù)據(jù))時(shí)的狀態(tài)記錄并存儲(chǔ)下來。

當(dāng)發(fā)生故障時(shí),F(xiàn)link作業(yè)會(huì)恢復(fù)上次存儲(chǔ)的狀態(tài),重置數(shù)據(jù)源從狀態(tài)中記錄的上次消費(fèi)的偏移量,開始重新進(jìn)行消費(fèi)處理。而且狀態(tài)快照在執(zhí)行時(shí)會(huì)異步獲取狀態(tài)并存儲(chǔ),并不會(huì)阻塞正在進(jìn)行的數(shù)據(jù)處理邏輯。這個(gè)機(jī)制跟Kafka等消息中間件的消費(fèi)方式很像。

Flink需要知道狀態(tài),以便可以使用Checkpoint和Savepoint來保證容錯(cuò)(下一篇會(huì)繼續(xù)介紹)。狀態(tài)還允許重新調(diào)整Flink應(yīng)用程序,這意味著Flink負(fù)責(zé)在并行實(shí)例之間重新分配狀態(tài)。

3、Keyed State

Keyed State是Flink提供的按照Key進(jìn)行分區(qū)的狀態(tài)機(jī)制。

在通過keyBy()分組的KeyedStream上使用,對(duì)每個(gè)Key的數(shù)據(jù)進(jìn)行狀態(tài)存儲(chǔ)和管理,狀態(tài)是跟每個(gè)Key綁定的,即每個(gè)Key對(duì)應(yīng)一個(gè)狀態(tài)對(duì)象。

Keyed State支持的狀態(tài)數(shù)據(jù)類型如下:ValueState、ListState、ReducingState、AggregatingState<IN, OUT>、MapState<UK, UV>。下文以ValueState為例,介紹如何使用。

4、狀態(tài)管理示例和代碼

我們來模擬這樣一個(gè)場(chǎng)景:如果某個(gè)用戶1分鐘內(nèi)連續(xù)兩次退款,第二次則發(fā)出告警。

模擬訂單對(duì)象:

public class OrderBO {
    /**
     * ID
     */
    private Integer id;
    /***
     * 訂單標(biāo)題
     */
    private String title;
    /**
     * 訂單金額
     */
    private Integer amount;
    /**
     * 訂單狀態(tài):1-已支付,2-已退款
     */
    private Integer state;
    /**
     * 用戶ID
     */
    private String userId;
}

利用狀態(tài)管理,處理告警邏輯:

/**
* 告警處理邏輯
**/
private static class AlarmLogic extends KeyedProcessFunction<String, OrderBO, OrderBO> {
    // 是否已經(jīng)出現(xiàn)退款的標(biāo)記
    private ValueState<Boolean> flagState;
    // 定時(shí)器,時(shí)間到了會(huì)清掉狀態(tài)
    private ValueState<Long> timerState;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);

        ValueStateDescriptor<Long> timerDescriptor = new ValueStateDescriptor<>(
                "timer-state",
                Types.LONG);
        timerState = getRuntimeContext().getState(timerDescriptor);
    }

    @Override
    public void processElement(OrderBO value, KeyedProcessFunction<String, OrderBO, OrderBO>.Context context, Collector<OrderBO> collector) throws Exception {
        Boolean refundFlag = flagState.value();

        // 如果已經(jīng)退款過一次了,如果再出現(xiàn)退款則發(fā)射給下個(gè)算子,然后清理掉定時(shí)器。狀態(tài)2代表退款
        if (refundFlag != null && refundFlag) {
            if (value.getState() == 2) {
                collector.collect(value);
            }
            cleanUp(context);
        } else {
            // 如果第一次出現(xiàn)退款,則寫入狀態(tài),同時(shí)開啟定時(shí)器。狀態(tài)2代表退款
            if (value.getState() == 2) {
                flagState.update(true);
                long timer = context.timerService().currentProcessingTime() + ONE_MINUTE;
                context.timerService().registerProcessingTimeTimer(timer);
                timerState.update(timer);
            }
        }
    }

    /**
     * 定時(shí)器到了之后,清理狀態(tài)值
     */
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, OrderBO, OrderBO>.OnTimerContext ctx, Collector<OrderBO> out) throws Exception {
        timerState.clear();
        flagState.clear();
    }

    /**
     * 手動(dòng)清理狀態(tài)值
     *
     * @param ctx
     * @throws Exception
     */
    private void cleanUp(Context ctx) throws Exception {
        Long timer = timerState.value();
        ctx.timerService().deleteProcessingTimeTimer(timer);

        timerState.clear();
        flagState.clear();
    }
}

模式生成數(shù)據(jù)和主流程代碼:

public static void main(String[] args) throws Exception {
    // 1、執(zhí)行環(huán)境創(chuàng)建
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // 2、讀取Socket數(shù)據(jù)端口。實(shí)際根據(jù)具體業(yè)務(wù)對(duì)接數(shù)據(jù)來源
    DataStreamSource<String> orderStream = environment.socketTextStream("localhost", 9527);
    // 3、數(shù)據(jù)讀取個(gè)切割方式
    SingleOutputStreamOperator<OrderBO> resultDataStream = orderStream
            .flatMap(new CleanDataAnd2Order()) // 清洗和處理數(shù)據(jù)
            .keyBy(x -> x.getUserId()) // 分區(qū)
            .process(new AlarmLogic()); // 處理告警邏輯

    // 4、打印分析結(jié)果
    resultDataStream.print("告警===>");
    // 5、環(huán)境啟動(dòng)
    environment.execute("OrderAlarmApp");
}

模擬數(shù)據(jù):

模擬場(chǎng)景:某個(gè)用戶1分鐘內(nèi)連續(xù)兩次退款,第二次發(fā)出告警。
示例數(shù)據(jù):
1,aaa,100,1,user1
2,bbb,200,1,user2
3,ccc,300,2,user1
4,ddd,400,2,user1

5,ddd,400,2,user1
6,bbb,200,2,user2
7,bbb,400,2,user2

完整代碼地址:https://github.com/yclxiao/flink-blog/blob/7eb84d18aa71d8f2023d6158796de34d331b9b3f/src/main/java/top/mangod/flinkblog/demo005/OrderAlarmApp.java#L43

責(zé)任編輯:趙寧寧 來源: 不焦躁的程序員
相關(guān)推薦

2009-11-05 14:53:54

Visual Stud

2021-10-19 07:27:08

HTTP代理網(wǎng)絡(luò)

2022-12-16 09:55:50

網(wǎng)絡(luò)架構(gòu)OSI

2024-06-25 12:25:12

LangChain路由鏈

2024-05-13 09:28:43

Flink SQL大數(shù)據(jù)

2009-10-26 15:45:43

VB.NET類構(gòu)造

2009-11-06 10:25:34

WCF元數(shù)據(jù)交換

2024-09-23 17:05:44

2020-02-19 19:26:27

K8S開源平臺(tái)容器技術(shù)

2020-05-12 09:10:24

瀏覽器服務(wù)器網(wǎng)絡(luò)

2023-07-26 07:11:50

LVM底層抽象

2024-08-13 11:13:18

2009-11-02 18:07:58

Oracle數(shù)據(jù)庫(kù)

2020-03-06 10:45:48

機(jī)器學(xué)習(xí)人工智能神經(jīng)網(wǎng)絡(luò)

2021-09-18 11:36:38

混沌工程云原生故障

2024-04-28 12:55:46

redis頻道機(jī)制

2023-12-12 08:00:50

節(jié)點(diǎn)哈希算法

2009-11-16 10:53:30

Oracle Hint

2024-12-11 07:00:00

面向?qū)ο?/a>代碼

2025-03-13 06:22:59

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)