得物自建DTS平臺(tái)的技術(shù)演進(jìn)
前言
DTS是數(shù)據(jù)傳輸平臺(tái)(Data Transfer Platform的縮寫(xiě))
隨著得物App的用戶流量增長(zhǎng),業(yè)務(wù)選擇的數(shù)據(jù)庫(kù)越來(lái)越多樣化,異構(gòu)數(shù)據(jù)源之間的數(shù)據(jù)同步需求也逐漸增多。為了控制成本并更好地支持業(yè)務(wù)發(fā)展,我們決定自建DTS平臺(tái)。本文主要從技術(shù)選型、能力支持與演化的角度出發(fā),分享了在DTS平臺(tái)升級(jí)過(guò)程中獲得的經(jīng)驗(yàn),并提供一些參考。
1技術(shù)選型
DTS的主要目標(biāo)是支持不同類型的數(shù)據(jù)源之間的數(shù)據(jù)交互,包括關(guān)系型數(shù)據(jù)庫(kù)(RDBMS)、NoSQL數(shù)據(jù)庫(kù)、OLAP等,同時(shí)整合了數(shù)據(jù)庫(kù)配置管理、數(shù)據(jù)訂閱、數(shù)據(jù)同步、數(shù)據(jù)遷移、DRC雙活數(shù)據(jù)同步支持、數(shù)據(jù)巡檢、監(jiān)控報(bào)警、統(tǒng)一權(quán)限等多個(gè)模塊,以構(gòu)建安全、可擴(kuò)展、高可用的數(shù)據(jù)架構(gòu)平臺(tái)。
1.1 能力對(duì)比
圖片
1.2 DTS 1.0 - 以 canal/otter/datax 作為執(zhí)行引擎
圖片
1.3 為什么要切換到Flink?
為了支持多種讀端數(shù)據(jù)源和寫(xiě)端數(shù)據(jù)源,需要一個(gè)統(tǒng)一數(shù)據(jù)處理框架,以減少重復(fù)組件和提高開(kāi)發(fā)效率。同時(shí)數(shù)據(jù)源類型和組件的維護(hù)難度與復(fù)雜度呈線性增長(zhǎng),現(xiàn)有的組件需要統(tǒng)一維護(hù)到一個(gè)項(xiàng)目中。
Canal和Otter等組件的社區(qū)活躍度低,很長(zhǎng)時(shí)間沒(méi)有得到維護(hù)更新。因此,需要選擇一個(gè)新的、活躍的框架。此外,現(xiàn)有組件也無(wú)法有效支持全量+增量一體化的操作。
因此,使用一個(gè)統(tǒng)一的數(shù)據(jù)處理框架,能夠同時(shí)支持多種讀端數(shù)據(jù)源和寫(xiě)端數(shù)據(jù)源,以及全量+增量一體化的功能,是必要的。這樣能夠降低組件的維護(hù)難度和復(fù)雜度,提高開(kāi)發(fā)效率。
通過(guò)DTS 2.0,我們希望將canal/otter/datax演化為一個(gè)任務(wù)執(zhí)行框架+管理平臺(tái),能夠?yàn)楹罄m(xù)大量數(shù)據(jù)源迭代提速。
1.4 DTS 2.0 以Flink作為執(zhí)行引擎
現(xiàn)有的開(kāi)發(fā)流程:
- 統(tǒng)一的任務(wù)執(zhí)行框架,集成flink并引入connectors根據(jù)配置組裝出具體的DTS任務(wù)
- 維護(hù)并研發(fā)新的 connector
當(dāng)我們需要支持新的數(shù)據(jù)源, 首先將數(shù)據(jù)源相關(guān)插件維護(hù)在connector中,接著在執(zhí)行框架中引入需要的組件,其中存在大量的可復(fù)用的功能,這樣就做到了connector及功能組件復(fù)用的效果。
2DTS 現(xiàn)有能力
圖片
3我們做了什么?
3.1 DTS Connectors框架 - 數(shù)據(jù)源支持提速
在Flink CDC基礎(chǔ)上實(shí)現(xiàn)的全量/增量任務(wù)同步框架,基本的架構(gòu)如下
圖片
其中Connector中分別實(shí)現(xiàn)了Flink提供的SourceFunction和SinkFunction函數(shù),分別負(fù)責(zé)從讀端讀取數(shù)據(jù),往寫(xiě)端寫(xiě)入數(shù)據(jù),因此一個(gè)Connector可同時(shí)存在于上游或者下游。
任務(wù)的啟動(dòng)流程:
- 指定任務(wù)Json配置, 根據(jù)類型加載SourceFunction和SinkFunction構(gòu)建通用能力函數(shù)并啟動(dòng)
a. 任務(wù)的Main函數(shù)如下所示, 根據(jù)如下的Json文件加載到對(duì)應(yīng)的Connector中的SourceFactory或者SinkFactory來(lái)構(gòu)造對(duì)應(yīng)的DataStream。
DataStream是Flink中提供的數(shù)據(jù)流操作類
public class Main {
public static void main(String[] args) throws Exception {
// 解析參數(shù)
ParameterTool parameterTool = ParameterTool.fromArgs(args);
String[] parsedArgs = parseArgs(parameterTool);
Options options = new OptionParser(parsedArgs).getOptions();
options.setJobName(options.getJobName());
// 執(zhí)行任務(wù)
StreamExecutionEnvironment environment =
EnvFactory.createStreamExecutionEnvironment(options);
exeJob(environment, options);
}
任務(wù)Json配置:
{
"job":{
"content":{
"reader":{
"name":"binlogreader",
"parameter":{
"accessKey":"",
"binlogOssApiUrl":"",
"delayBetweenRestartAttempts":2000,
"fetchSize":1,
"instanceId":"",
"rdsPlatform":"",
"restartAttempts":5,
"secretKey":"",
"serverTimezone":"",
"splitSize":1024,
"startupMode":"LATEST_OFFSET"
}
},
"writer":{
"name":"jdbcwriter",
"parameter":{
"batchSize":10000,
"concurrentWrite":true,
],
"dryRun":false,
"dumpCommitData":false,
"errorRecord":0,
"flushIntervalMills":30000,
"poolSize":10,
"retries":3,
"smallBatchSize":200
}
}
},
}
}
b. 我們提供了兩個(gè)抽象工廠類,SourceFactory, SinkFactory, 其中的createSource, createSink便是子工廠需要實(shí)現(xiàn)的方法,不同的數(shù)據(jù)源實(shí)現(xiàn)不同。
public abstract class SourceFactory<T> {
public abstract DataStream<T> createSource();
}
public abstract class SinkFactory<T> {
public abstract void createSink(DataStream<T> rowData) throws Exception;
}
c. 接下來(lái),我們只需要實(shí)現(xiàn)對(duì)應(yīng)的子工廠方法就可以了
public class BinlogSourceFactory extends AbstractJdbcSourceFactory {
@Override
public DataStream<TableRowData> createSource() {
List<String> tables = this.binlogSourceConf.getConnection().getTable();
Set<String> databaseList = new HashSet<>(2);
// 使用對(duì)應(yīng)的Connector構(gòu)建DataStream
}
}
d. 通用能力函數(shù):RateLimitFunction, BinlogPositionFunction 其中分別實(shí)現(xiàn)了對(duì)應(yīng)的任務(wù)能力,例如限流,任務(wù)位點(diǎn)保存等。
public class RateLimiterMapFunction<T> extends RichMapFunction<T, T> {
private transient FlinkConnectorRateLimiter rateLimiter;
@Override
public T map(T value) throws Exception {
if (rateLimiterEnabled) {
rateLimiter.acquire(1);
}
return value;
}
當(dāng)任務(wù)所需的函數(shù)都創(chuàng)建完成后,任務(wù)就真正開(kāi)始運(yùn)行了。
收益:
3.2 RDS日志獲取
DTS通過(guò)提供增量和全量同步能力為業(yè)務(wù)提供數(shù)據(jù)同步功能,但在增量訂閱/同步任務(wù)執(zhí)行過(guò)程中,可能會(huì)遇到一些異常情況。其中,以下三種情況需要特別處理:
- Binlog可用性
云廠商的數(shù)據(jù)庫(kù)實(shí)例本地binlog有效期8小時(shí),過(guò)期部分進(jìn)行OSS備份。MySQL業(yè)務(wù)高峰期或者DDL變產(chǎn)生大量的binlog, DTS任務(wù)嘗試獲取過(guò)期數(shù)據(jù)失敗,任務(wù)因此中斷。因此,DTS支持了本地binlog+OSS備份binlog的獲取及切換,保障日志可用性。
- 數(shù)據(jù)庫(kù)實(shí)例主從切換
RDS經(jīng)常會(huì)發(fā)生主備節(jié)點(diǎn)切換,在切換的過(guò)程中要保證數(shù)據(jù)不丟。由于切換前后兩個(gè)數(shù)據(jù)庫(kù)實(shí)例 Binlog 文件一般都是不一致的,此時(shí)任務(wù)位點(diǎn)記錄方式是 BinlogPosition 模式,則在切換之后任務(wù)需要自動(dòng)進(jìn)行 Binlog 對(duì)齊操作,進(jìn)而保證數(shù)據(jù)的完整性。將新數(shù)據(jù)實(shí)例上的位點(diǎn)查詢時(shí)間戳提前1-2分鐘即可。
- 讀實(shí)例訂閱支持
DTS任務(wù)binlog dump連接數(shù)過(guò)多造成主庫(kù)壓力及影響DDL變更,因此需要支持讀庫(kù)訂閱。云廠商的讀庫(kù)不提供備份,在讀庫(kù)日志過(guò)期時(shí)需要切換到主庫(kù)進(jìn)行讀取。
3.3 全量增量一體化功能
圖片
全量增量一體化是指先同步存量數(shù)據(jù),待存量結(jié)束之后再開(kāi)始同步增量數(shù)據(jù)。其中也加入了增量階段的OSS備份日志獲取。但存量階段依然存在一些問(wèn)題,需要進(jìn)一步改造優(yōu)化。
全量模式下新增表先進(jìn)行存量數(shù)據(jù)同步再進(jìn)行增量數(shù)據(jù)同步,該任務(wù)中已存在的表會(huì)因此導(dǎo)致數(shù)據(jù)延遲。待新增表數(shù)據(jù)同步完成,任務(wù)延遲則會(huì)恢復(fù)正常。 |
3.4 數(shù)據(jù)源接入- starrocks, postgres等
支持從mysql同步到starrocks和postgres, 在任務(wù)執(zhí)行框架的基礎(chǔ)上,只需要開(kāi)發(fā)starrocks-connector, postgres connector支持對(duì)應(yīng)的數(shù)據(jù)源即可。其中的其他能力,像多表同步、分庫(kù)分表等場(chǎng)景都可以達(dá)到復(fù)用的效果。
3.5 JBDC寫(xiě)入改造
腳本擴(kuò)展和動(dòng)態(tài)表名路由:
圖片
數(shù)據(jù)合并和多線程寫(xiě)入:
圖片
3.6 監(jiān)控告警
DTS任務(wù)需要采集flink任務(wù)指標(biāo),主要包括任務(wù)延遲、各個(gè)算子階段的寫(xiě)入速率,算子被壓及使用率等。其中 任務(wù)延遲需要接入告警服務(wù),于是我們選擇了引入redis來(lái)緩存任務(wù)的延遲時(shí)間,再上報(bào)到告警服務(wù)來(lái)完成飛書(shū)的消息和電話告警。
4最佳實(shí)踐
4.1 0000-00-00 00:00:00時(shí)間戳的問(wèn)題
MySQL的時(shí)間戳允許為0000-00-00 00:00:00, 在Flink任務(wù)中通常會(huì)被轉(zhuǎn)換為null, 導(dǎo)致寫(xiě)入下游數(shù)據(jù)源失敗, 因此需要做特殊標(biāo)記對(duì)于不同的數(shù)據(jù)源做不同的轉(zhuǎn)化保證寫(xiě)入的正切行。
4.2 Flink CDC任務(wù)serverId唯一性
Flink CDC source 會(huì)偽裝成 MySQL slave節(jié)點(diǎn),為了保證數(shù)據(jù)的準(zhǔn)確性,每個(gè)slave必須擁有唯一的serverId來(lái)標(biāo)記該slave的唯一性。因此在flink cdc的任務(wù)中我們?yōu)槊恳粋€(gè)任務(wù)分配了一個(gè)唯一的serverId區(qū)間(范圍區(qū)間是為了支持多并行度)。
4.3 Flink任務(wù)數(shù)據(jù)序列化瓶頸
在flink任務(wù)中使用DataStreamAPI并使用比較復(fù)雜的數(shù)據(jù)結(jié)構(gòu)進(jìn)行傳輸時(shí),算子之間的序列化成本較高,兩個(gè)方向,一是建立更為高效的數(shù)據(jù)結(jié)構(gòu)進(jìn)行傳輸,二是開(kāi)啟flink對(duì)象復(fù)用,并盡可能減少不同并行度之間的數(shù)據(jù)傳輸。
5未來(lái)演進(jìn)
DTS作為一個(gè)數(shù)據(jù)同步平臺(tái)主要功能是盡可能提供高效的數(shù)據(jù)源同步功能,助力于多變的業(yè)務(wù)場(chǎng)景。
5.1 基于Flink SQL的ETL任務(wù)管理
流式數(shù)據(jù)處理除了現(xiàn)有的DataStream API還存在SQL的形式,SQL作為一種通用的語(yǔ)言,對(duì)于數(shù)據(jù)相關(guān)的業(yè)務(wù)同學(xué)極大的降低了學(xué)習(xí)成本。而通過(guò)Flink SQL可以做到的ETL流式數(shù)據(jù)加工也能解決一些復(fù)雜業(yè)務(wù)場(chǎng)景的處理邏輯,將業(yè)務(wù)邏輯轉(zhuǎn)化為DAG的流式處理圖,通過(guò)拖拽的方式也能方便使用,F(xiàn)LINK SQL的演進(jìn)方向能夠和現(xiàn)有的Flink DataStream API互補(bǔ)。
應(yīng)用場(chǎng)景:ETL強(qiáng)大的流式數(shù)據(jù)轉(zhuǎn)換處理能力大幅提升數(shù)據(jù)集成效率,也能建實(shí)時(shí)報(bào)表體系,提高分析效率,同時(shí)也可以應(yīng)用于一些實(shí)時(shí)大屏的場(chǎng)景。
5.2 統(tǒng)一技術(shù)棧
將現(xiàn)有的DTS能力都遷移到Flink平臺(tái)上,保持統(tǒng)一的技術(shù)棧,能夠極大的降低維護(hù)成本。現(xiàn)有遺留的雙向同步、數(shù)據(jù)比對(duì)等能力需要做進(jìn)一步的改造和遷移,符合整體技術(shù)收斂的趨勢(shì)。
6總結(jié)
本文主要分享了以下幾個(gè)方面:Flink相比現(xiàn)有的技術(shù)棧帶來(lái)的收益,切換到Flink以后的迭代方向及架構(gòu)功能上的變更、帶來(lái)新的問(wèn)題如何解決,以及未來(lái)的一些迭代方向,希望能讓大家有所收獲。