什么是數(shù)據(jù)同步利器DataX,如何使用?
今天給大家分享一個阿里開源的數(shù)據(jù)同步工具DataX,在Github擁有14.8k的star,非常受歡迎,地址:https://github.com/alibaba/DataX
什么是 Datax?
DataX 是阿里云 DataWorks數(shù)據(jù)集成 的開源版本,使用Java 語言編寫,在阿里巴巴集團內(nèi)被廣泛使用的離線數(shù)據(jù)同步工具/平臺。DataX 實現(xiàn)了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各種異構(gòu)數(shù)據(jù)源之間高效的數(shù)據(jù)同步功能。
圖片
應(yīng)用場景有那些?
- 數(shù)據(jù)倉庫同步:DataX 可以幫助將數(shù)據(jù)從一個數(shù)據(jù)倉庫(如關(guān)系型數(shù)據(jù)庫、大數(shù)據(jù)存儲系統(tǒng)等)同步到另一個數(shù)據(jù)倉庫,實現(xiàn)數(shù)據(jù)的遷移、備份或復(fù)制。
- 數(shù)據(jù)庫遷移:當(dāng)我們需要將數(shù)據(jù)從一個數(shù)據(jù)庫平臺遷移到另一個數(shù)據(jù)庫平臺時,DataX 可以幫助完成數(shù)據(jù)的轉(zhuǎn)移和轉(zhuǎn)換工作
- 數(shù)據(jù)集成與同步:DataX 可以用作數(shù)據(jù)集成工具,用于將多個數(shù)據(jù)源的數(shù)據(jù)進行整合和同步。它支持多種數(shù)據(jù)源,包括關(guān)系型數(shù)據(jù)庫、NoSQL 數(shù)據(jù)庫、文件系統(tǒng)等,可以將這些數(shù)據(jù)源的數(shù)據(jù)整合到一個目標數(shù)據(jù)源中。
- 數(shù)據(jù)清洗與轉(zhuǎn)換:DataX 提供了豐富的數(shù)據(jù)轉(zhuǎn)換能力,可以對數(shù)據(jù)進行清洗、過濾、映射、格式轉(zhuǎn)換等操作。這對于數(shù)據(jù)倉庫、數(shù)據(jù)湖和數(shù)據(jù)集市等數(shù)據(jù)存儲和分析平臺非常有用,可以幫助提高數(shù)據(jù)質(zhì)量和一致性。
- 數(shù)據(jù)備份與恢復(fù):DataX 可以用于定期備份和恢復(fù)數(shù)據(jù)。通過配置定時任務(wù),可以將數(shù)據(jù)從源端備份到目標端,并在需要時進行數(shù)據(jù)恢復(fù)。
DataX支持那些數(shù)據(jù)源?
圖片
架構(gòu)設(shè)計
圖片
DataX作為離線數(shù)據(jù)同步框架,采用Framework + plugin架構(gòu)構(gòu)建。將數(shù)據(jù)源讀取和寫入抽象成為Reader/Writer插件,納入到整個同步框架中。
- Reader:Reader為數(shù)據(jù)采集模塊,負責(zé)采集數(shù)據(jù)源的數(shù)據(jù),將數(shù)據(jù)發(fā)送給Framework。
- Writer:Writer為數(shù)據(jù)寫入模塊,負責(zé)不斷向Framework取數(shù)據(jù),并將數(shù)據(jù)寫入到目的端。
- Framework:Framework用于連接reader和writer,作為兩者的數(shù)據(jù)傳輸通道,并處理緩沖,流控,并發(fā),數(shù)據(jù)轉(zhuǎn)換等核心技術(shù)問題。
DataX 開源版本支持單機多線程模式完成同步作業(yè)運行,如下圖
圖片
- DataX完成單個數(shù)據(jù)同步的作業(yè),稱之為Job,DataX接受到一個Job之后,將啟動一個進程來完成整個作業(yè)同步過程。DataX Job模塊是單個作業(yè)的中樞管理節(jié)點,承擔(dān)了數(shù)據(jù)清理、子任務(wù)切分(將單一作業(yè)計算轉(zhuǎn)化為多個子Task)、TaskGroup管理等功能。
- DataXJob啟動后,會根據(jù)不同的源端切分策略,將Job切分成多個小的Task(子任務(wù)),以便于并發(fā)執(zhí)行。Task便是DataX作業(yè)的最小單元,每一個Task都會負責(zé)一部分數(shù)據(jù)的同步工作。
- 切分多個Task之后,DataX Job會調(diào)用Scheduler模塊,根據(jù)配置的并發(fā)數(shù)據(jù)量,將拆分成的Task重新組合,組裝成TaskGroup(任務(wù)組)。每一個TaskGroup負責(zé)以一定的并發(fā)運行完畢分配好的所有Task,默認單個任務(wù)組的并發(fā)數(shù)量為5。
- 每一個Task都由TaskGroup負責(zé)啟動,Task啟動后,會固定啟動Reader—>Channel—>Writer的線程來完成任務(wù)同步工作。
- DataX作業(yè)運行起來之后, Job監(jiān)控并等待多個TaskGroup模塊任務(wù)完成,等待所有TaskGroup任務(wù)完成后Job成功退出。否則,異常退出,進程退出值非0
DataX調(diào)度流程
舉例來說,用戶提交了一個DataX作業(yè),并且配置了20個并發(fā),目的是將一個100張表的mysql數(shù)據(jù)同步到odps里面。DataX的調(diào)度決策是:
- Job根據(jù)分表切分成了100個Task。
- 根據(jù)20個并發(fā),DataX計算需要分配4個TaskGroup。
- 4個TaskGroup平分切分好的100個Task,每一個TaskGroup負責(zé)5個并發(fā)共計運行25個Task。
如何使用 Datax?
點擊datax 下載,下載后解壓至本地某個目錄,如下圖
圖片
用例說明
這里為了方便演示,我們同步MySQL的user_info表至MySQL的ods_test_mysql_user_info_m,同步條件為更新時間字段,如下
在實際工作中你可以選擇不同類型的數(shù)據(jù)源測試
drop table ods_test_mysql_user_info_m
CREATE TABLE `user_info` (
`id` int NOT NULL COMMENT 'ID',
`name` varchar(50) NOT NULL COMMENT '名稱',
`sex` tinyint NOT NULL COMMENT '性別 1男 2女',
`phone` varchar(11) COMMENT '手機',
`address` varchar(1000) COMMENT '地址',
`age` int COMMENT '年齡',
`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '創(chuàng)建時間',
`update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改時間',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用戶信息表';
CREATE TABLE `ods_test_mysql_user_info_m` (
`id` int NOT NULL COMMENT 'ID',
`name` varchar(50) NOT NULL COMMENT '名稱',
`sex` tinyint NOT NULL COMMENT '性別 1男 2女',
`phone` varchar(11) COMMENT '手機',
`address` varchar(1000) COMMENT '地址',
`age` int COMMENT '年齡',
`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '創(chuàng)建時間',
`update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改時間',
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用戶信息數(shù)倉表';
在user_info表中插入數(shù)據(jù)如下
圖片
創(chuàng)建作業(yè)的配置文件(json格式)
在 datax 的 script 目錄,創(chuàng)建ods_test_mysql_user_info_m.json文件,配置如下,mysqlreader表示讀取端,mysqlwriter表示寫入端
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id","name","sex","phone","address","age","create_time","update_time"],
"splitPk": "id",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"],
"table": ["user_info"]
}
],
"password": "root",
"username": "root",
"where": "update_time > '${updateTime}' "
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "replace",
"column": ["id","name","sex","phone","address","age","create_time","update_time"],
"connection": [
{
"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",
"table": ["ods_test_mysql_user_info_m"]
}
],
"username": "root",
"password": "root",
"preSql": [],
"session": [
"set session sql_mode='ANSI'"
]
}
}
}
],
"setting": {
"speed": {
"channel": "5"
}
}
}
}
創(chuàng)建執(zhí)行腳本
為了更貼合實際,寫一個調(diào)度腳本sync.sh支持動態(tài)參數(shù)來執(zhí)行任務(wù)
#!/bin/bash
## 執(zhí)行示例 sh /Users/weizhao.dong/Documents/soft/datax/datax-script/call.sh /Users/weizhao.dong/Documents/soft/datax/datax-script/dwd_g2park_inout_report_s.json 1
jsnotallow=$1
echo '執(zhí)行腳本:'$jsonScript
interval=$2
echo "時間間隔(分鐘):"$interval
now_time=$(date '+%Y-%m-%d %H:%M:%S')
echo "當(dāng)前時間:"$now_time
update_time=$(date -v -${interval}M '+%Y-%m-%d %H:%M:%S')
#linux 更新時間獲取
#update_time=$(date -d "${now_time} $interval minute ago" +"%Y-%m-%d %H:%M:%S")
echo "更新時間:"$update_time
#執(zhí)行
python3 /Users/weizhao.dong/Documents/soft/datax/bin/datax.py $jsonScript -p "-DupdateTime='${update_time}'"
假設(shè)我們要執(zhí)以上ods_test_mysql_user_info_m.json腳本,并且同步十分鐘之前的數(shù)據(jù),如下
./sync.sh ods_test_mysql_user_info_m.json 10
測試
圖片
執(zhí)行./sync.sh ods_test_mysql_user_info_m.json 10進行同步
圖片
圖片
以上結(jié)果可能有些人有疑問,就三條數(shù)據(jù)執(zhí)行時間為 10s,其實這個 10s主要是初始化時間,耗時過長,同步的數(shù)據(jù)量多了優(yōu)勢就體現(xiàn)出來了,以下為實際生產(chǎn)同步數(shù)據(jù)結(jié)果,可以看到同步63102條耗時22s
推薦用法
以上我們只是通過一個簡單的示例來演示了dataX如何使用,如果只是一次性同步,沒問題,但是如果是周期性進行同步,有以下幾種方式推薦
crontab調(diào)度
這種方式是最簡單的,可以使用操作系統(tǒng)中的crontab定時調(diào)度,通過crontab -e編輯corn 任務(wù),添加對應(yīng)腳本即可
海豚調(diào)度器
在種方式在大數(shù)據(jù)領(lǐng)域用的比較多,典型場景就是 mysql 同步到數(shù)倉,海豚調(diào)度器內(nèi)置了 datax 并且提供了圖形化配置界面,配置起來非常方便
圖片
圖片
同時每次執(zhí)行都有記錄,并且都有對應(yīng)的日志
圖片
定時任務(wù)框架(elasticjob/xxl-job)
定時調(diào)度框架都支持調(diào)度 shell 腳本,通過傳入對應(yīng)參數(shù)也可執(zhí)行