自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink 中的 Savepoint 和 Checkpoint 有什么區(qū)別?

大數(shù)據(jù)
Savepoint 和Checkpoint 都是 Flink 中用于狀態(tài)持久化和恢復(fù)的機(jī) 制,但它們?cè)谀康?、觸發(fā)方式、生命周期管理和使用場(chǎng)景上有著明顯區(qū)別。

Savepoint(保存點(diǎn))和Checkpoint(檢查點(diǎn))都是Flink中用于狀態(tài)持久化和恢復(fù)的機(jī)制,但它們?cè)谀康?、觸發(fā)方式、生命周期管理和使用場(chǎng)景上有著明顯區(qū)別。

一、基本概念

1. Checkpoint(檢查點(diǎn))

檢查點(diǎn)是Flink自動(dòng)觸發(fā)的狀態(tài)快照,用于故障恢復(fù),主要特點(diǎn):

  • 自動(dòng)創(chuàng)建:由Flink定期自動(dòng)觸發(fā)
  • 生命周期:通常在作業(yè)運(yùn)行期間有限存在,舊的檢查點(diǎn)會(huì)被新的覆蓋 
  • 主要目的:實(shí)現(xiàn)容錯(cuò)機(jī)制,在故障發(fā)生時(shí)能夠恢復(fù)到最近的一致狀態(tài)
  • 存儲(chǔ)格式:針對(duì)性能優(yōu)化,可能使用增量存儲(chǔ)機(jī)制

2. Savepoint(保存點(diǎn))

保存點(diǎn)是用戶手動(dòng)觸發(fā)的狀態(tài)快照,用于有計(jì)劃的作業(yè)升級(jí)或維護(hù),主要特點(diǎn):

  • 手動(dòng)創(chuàng)建:由用戶通過命令或API手動(dòng)觸發(fā) 
  • 生命周期:長(zhǎng)期存在,直到用戶明確刪除 
  • 主要目的:應(yīng)用版本升級(jí)、集群遷移、A/B測(cè)試等有計(jì)劃的操作 
  • 存儲(chǔ)格式:更加完整和自包含,確保長(zhǎng)期兼容性

二、詳細(xì)對(duì)比

三、在FlinkSQL 中配置和使用

1. Checkpoint 配置

1.SQL Client 模式
-- 啟用檢查點(diǎn),每10秒觸發(fā)一次
SET 'execution.checkpointing.interval' = '10s';
-- 設(shè)置檢查點(diǎn)模式(EXACTLY_ONCE/AT_LEAST_ONCE)
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
-- 設(shè)置檢查點(diǎn)超時(shí)時(shí)間
SET 'execution.checkpointing.timeout' = '5min';
-- 設(shè)置檢查點(diǎn)最小間隔
SET 'execution.checkpointing.min-pause' = '1s';
-- 設(shè)置同時(shí)進(jìn)行的檢查點(diǎn)數(shù)量上限
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- 設(shè)置檢查點(diǎn)存儲(chǔ)位置
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
-- 設(shè)置保留的檢查點(diǎn)數(shù)量
SET 'state.checkpoints.num-retained' = '5';
-- 作業(yè)取消時(shí)保留檢查點(diǎn)
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
2. Java API模式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 每10秒觸發(fā)一次檢查點(diǎn)
env.enableCheckpointing(10000);
// 獲取檢查點(diǎn)配置
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 設(shè)置模式為EXACTLY_ONCE
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設(shè)置檢查點(diǎn)超時(shí)時(shí)間
checkpointConfig.setCheckpointTimeout(300000);
// 設(shè)置最小間隔時(shí)間
checkpointConfig.setMinPauseBetweenCheckpoints(1000);
// 設(shè)置最大并發(fā)檢查點(diǎn)數(shù)
checkpointConfig.setMaxConcurrentCheckpoints(1);
// 設(shè)置外部化檢查點(diǎn)的清理行為
checkpointConfig.enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 設(shè)置檢查點(diǎn)存儲(chǔ)
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));

2. Savepoint 操作

1. 創(chuàng)建Savepoint


使用Flink CLI:
# 為作業(yè)創(chuàng)建保存點(diǎn)
flink savepoint :jobId [:targetDirectory]
# 例如
flink savepoint 1234567890abcdef hdfs:///flink/savepoints


使用REST API:
# 發(fā)送POST請(qǐng)求到作業(yè)管理器
curl -X POST "http://jobmanager:8081/jobs/:jobId/savepoints" \
  -d '{"target-directory": "hdfs:///flink/savepoints", "cancel-job": false}'


2. 從Savepoint恢復(fù)


使用Flink CLI:
# 從保存點(diǎn)恢復(fù)作業(yè)
flink run -s :savepointPath [:runArgs]
# 例如
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd jarfile.jar


使用SQL Client:
-- 設(shè)置從保存點(diǎn)恢復(fù)
SET 'execution.savepoint.path' = 'hdfs:///flink/savepoints/savepoint-1234567-aabbccdd';
-- 執(zhí)行SQL任務(wù)
INSERT INTO target_table SELECT * FROM source_table;


使用Java API:


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設(shè)置從保存點(diǎn)恢復(fù)
env.setRestartStrategy(RestartStrategies.noRestart());
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
// 指定保存點(diǎn)路徑
String savepointPath = "hdfs:///flink/savepoints/savepoint-1234567-aabbccdd";
Configuration configuration = new Configuration();
configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
env.configure(configuration);
// 構(gòu)建作業(yè)并執(zhí)行
// ...
env.execute("Job restored from savepoint");

四、常見應(yīng)用場(chǎng)景

1. Checkpoint 應(yīng)用場(chǎng)景 

  • 故障自動(dòng)恢復(fù): – 節(jié)點(diǎn)崩潰時(shí),作業(yè)自動(dòng)從最近的檢查點(diǎn)恢復(fù) – 網(wǎng)絡(luò)分區(qū)時(shí),保持?jǐn)?shù)據(jù)一致性 
  • 保證數(shù)據(jù)處理的一致性: – 實(shí)現(xiàn)exactly-once 或at-least-once 語義 – 確保在失敗時(shí)不丟失狀態(tài) 
  • 處理反壓和資源限制: – 當(dāng)系統(tǒng)出現(xiàn)反壓時(shí)可以從檢查點(diǎn)恢復(fù) – 資源緊張時(shí)重新平衡負(fù)載

2. Savepoint 應(yīng)用場(chǎng)景 

(1) 應(yīng)用升級(jí)和版本遷移:

① 在升級(jí)前備份當(dāng)前狀態(tài) -- (通過Flink CLI創(chuàng)建保存點(diǎn))

flink savepoint 1234567890abcdef hdfs:///flink/savepoints

② 停止當(dāng)前作業(yè) 

flink cancel 1234567890abcdef

③ 部署新版本代碼 

④ 從保存點(diǎn)恢復(fù)到新版本 

flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd new-version.jar

(2) 集群遷移或擴(kuò)展: 

 ① 在原集群創(chuàng)建保存點(diǎn) 

flink savepoint 1234567890abcdef hdfs:///flink/savepoints

② 在新集群使用保存點(diǎn)恢復(fù)作業(yè) 

flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd jarfile.jar

(3) A/B 測(cè)試和算法切換: 

# 創(chuàng)建保存點(diǎn) 
flink savepoint 1234567890abcdef hdfs:///flink/savepoints
# 使用保存點(diǎn)啟動(dòng)算法A的實(shí)現(xiàn)
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd algorithm-a.jar 
# 使用同一保存點(diǎn)啟動(dòng)算法B的實(shí)現(xiàn)進(jìn)行比較 
flink run -s hdfs:///flink/savepoints/savepoint-1234567-aabbccdd algorithm-b.jar

(4) 生產(chǎn)環(huán)境回滾:

# 當(dāng)新版本出現(xiàn)問題時(shí),使用之前的保存點(diǎn)回滾 
flink run -s hdfs:///flink/savepoints/savepoint-previous-version old-version.jar
責(zé)任編輯:趙寧寧 來源: 大數(shù)據(jù)技能圈
相關(guān)推薦

2022-08-31 08:33:54

Bash操作系統(tǒng)Linux

2020-08-02 23:20:36

JavaScriptmap()forEach()

2022-09-02 09:02:44

TypeInterface

2021-03-27 10:56:17

promisethenfinally

2023-11-01 08:08:47

PythonIS運(yùn)算符

2022-12-14 17:26:43

2020-03-09 20:56:19

LoRaLoRaWAN無線技術(shù)

2022-06-06 14:53:02

LoRaLoRaWAN

2022-09-07 18:32:57

并發(fā)編程線程

2020-11-09 14:07:53

PyQtQt編程

2022-09-08 18:38:26

LinuxWindowsmacOS

2023-10-27 08:23:10

CookieWeb存儲(chǔ)

2023-11-14 14:13:52

SQLNoSQLCAP

2021-12-17 14:40:02

while(1)for(;;)語言

2022-02-27 15:33:22

安全CASBSASE

2021-05-16 14:26:08

RPAIPACIO

2022-08-02 08:23:37

SessionCookies

2024-03-05 18:59:59

前端開發(fā)localhost

2024-09-09 13:10:14

2024-05-27 00:40:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)