數(shù)倉(cāng) | 幾種常見(jiàn)的數(shù)據(jù)同步方式
本文轉(zhuǎn)載自微信公眾號(hào)「大數(shù)據(jù)技術(shù)與數(shù)倉(cāng) 」,作者西貝。轉(zhuǎn)載本文請(qǐng)聯(lián)系大數(shù)據(jù)技術(shù)與數(shù)倉(cāng)公眾號(hào)。
寫(xiě)在前面
數(shù)據(jù)倉(cāng)庫(kù)的特性之一是集成,即首先把未經(jīng)過(guò)加工處理的、不同來(lái)源的、不同形式的數(shù)據(jù)同步到ODS層,一般情況下,這些ODS層數(shù)據(jù)包括日志數(shù)據(jù)和業(yè)務(wù)DB數(shù)據(jù)。對(duì)于業(yè)務(wù)DB數(shù)據(jù)而言(比如存儲(chǔ)在MySQL中),將數(shù)據(jù)采集并導(dǎo)入到數(shù)倉(cāng)中(通常是Hive或者M(jìn)axCompute)是非常重要的一個(gè)環(huán)節(jié)。
那么,該如何將業(yè)務(wù)DB數(shù)據(jù)高效準(zhǔn)確地同步到數(shù)倉(cāng)中呢?一般企業(yè)會(huì)使用兩種方案:直連同步與實(shí)時(shí)增量同步(數(shù)據(jù)庫(kù)日志解析)。其中直連同步的基本思路是直連數(shù)據(jù)庫(kù)進(jìn)行SELECT,然后將查詢的數(shù)據(jù)存儲(chǔ)到本地文件作為中間存儲(chǔ),最后把文件Load到數(shù)倉(cāng)中。這種方式非常的簡(jiǎn)單方便,但是隨著業(yè)務(wù)的發(fā)展,會(huì)遇到一些瓶頸,具體見(jiàn)下文分析。
為了解決這些問(wèn)題,一般會(huì)使用實(shí)時(shí)增量的方式進(jìn)行數(shù)據(jù)同步,其基本原理是CDC (Change Data Capture) + Merge,即實(shí)時(shí)Binlog采集 + 離線處理Binlog還原業(yè)務(wù)數(shù)據(jù)這樣一套解決方案。
本文主要包括以下內(nèi)容,希望對(duì)你有所幫助
- 常見(jiàn)數(shù)據(jù)同步方式
- 流式數(shù)據(jù)集成
數(shù)據(jù)同步的方式
直連同步
直連同步是指通過(guò)定義好的規(guī)范接口API和基于動(dòng)態(tài)鏈接庫(kù)的方式直接連接業(yè)務(wù)庫(kù),比如ODBC/JDBC等規(guī)定了統(tǒng)一的標(biāo)準(zhǔn)接口,不同的數(shù)據(jù)庫(kù)基于這套標(biāo)準(zhǔn)提供規(guī)范的驅(qū)動(dòng),從而支持完全相同的函數(shù)調(diào)用和SQL實(shí)現(xiàn)。比如經(jīng)常使用的Sqoop就是采取這種方式進(jìn)行批量數(shù)據(jù)同步的。
直連同步的方式配置十分簡(jiǎn)單,很容易上手操作,比較適合操作型業(yè)務(wù)系統(tǒng)的數(shù)據(jù)同步,但是會(huì)存在以下問(wèn)題:
- 數(shù)據(jù)同步時(shí)間:隨著業(yè)務(wù)規(guī)模的增長(zhǎng),數(shù)據(jù)同步花費(fèi)的時(shí)間會(huì)越來(lái)越長(zhǎng),無(wú)法滿足下游數(shù)倉(cāng)生產(chǎn)的時(shí)間要求。
- 性能瓶頸:直連數(shù)據(jù)庫(kù)查詢數(shù)據(jù),對(duì)數(shù)據(jù)庫(kù)影響非常大,容易造成慢查詢,如果業(yè)務(wù)庫(kù)沒(méi)有采取主備策略,則會(huì)影響業(yè)務(wù)線上的正常服務(wù),如果采取了主備策略,雖然可以避免對(duì)業(yè)務(wù)系統(tǒng)的性能影響,但當(dāng)數(shù)據(jù)量較大時(shí),性能依然會(huì)很差。
日志解析
所謂日志解析,即解析數(shù)據(jù)庫(kù)的變更日志,比如MySQL的Binlog日志,Oracle的歸檔日志文件。通過(guò)讀取這些日志信息,收集變化的數(shù)據(jù)并將其解析到目標(biāo)存儲(chǔ)中即可完成數(shù)據(jù)的實(shí)時(shí)同步。這種讀操作是在操作系統(tǒng)層面完成的,不需要通過(guò)數(shù)據(jù)庫(kù),因此不會(huì)給源數(shù)據(jù)庫(kù)帶來(lái)性能上的瓶頸。
數(shù)據(jù)庫(kù)日志解析的同步方式可以實(shí)現(xiàn)實(shí)時(shí)與準(zhǔn)實(shí)時(shí)的同步,延遲可以控制在毫秒級(jí)別的,其最大的優(yōu)勢(shì)就是性能好、效率高,不會(huì)對(duì)源數(shù)據(jù)庫(kù)造成影響,目前,從業(yè)務(wù)系統(tǒng)到數(shù)據(jù)倉(cāng)庫(kù)中的實(shí)時(shí)增量同步,廣泛采取這種方式。當(dāng)然,這種方式也會(huì)存在一些問(wèn)題,比如批量補(bǔ)數(shù)時(shí)造成大量數(shù)據(jù)更新,日志解析會(huì)處理較慢,造成數(shù)據(jù)延遲。除此之外,這種方式比較復(fù)雜,投入也較大,因?yàn)樾枰粋€(gè)實(shí)時(shí)的抽取系統(tǒng)去抽取并解析日志,下文會(huì)對(duì)此進(jìn)行詳細(xì)解釋。
如上圖所示架構(gòu),在直連同步基礎(chǔ)之上增加了流式同步的鏈路,經(jīng)過(guò)流式計(jì)算引擎把相應(yīng)的 Binlog 采集到 Kafka,同時(shí)會(huì)經(jīng)過(guò)一個(gè) Kafka 2Hive 的程序把它導(dǎo)入到原始數(shù)據(jù),再經(jīng)過(guò)一層 Merge,產(chǎn)出下游需要的 ODS 數(shù)據(jù)。
上述的數(shù)據(jù)集成方式優(yōu)勢(shì)是非常明顯的,把數(shù)據(jù)傳輸?shù)臅r(shí)間放到了 T+0 這一天去做,在第二天的時(shí)候只需要去做一次 merge 就可以了。非常節(jié)省時(shí)間和計(jì)算資源。
流式數(shù)據(jù)集成實(shí)現(xiàn)
實(shí)現(xiàn)思路
首先,采用Flink負(fù)責(zé)把Kafka上的Binlog數(shù)據(jù)拉取到HDFS上,生成增量表。
然后,對(duì)每張ODS表,首先需要一次性制作快照(Snapshot),把MySQL里的存量數(shù)據(jù)讀取到Hive上,這一過(guò)程底層采用直連MySQL去Select數(shù)據(jù)的方式,可以使用Sqoop進(jìn)行一次性全量導(dǎo)入,生成一張全量表。
最后,對(duì)每張ODS表,每天基于存量數(shù)據(jù)和當(dāng)天增量產(chǎn)生的Binlog做Merge,從而還原出業(yè)務(wù)數(shù)據(jù)。
Binlog是流式產(chǎn)生的,通過(guò)對(duì)Binlog的實(shí)時(shí)采集,把部分?jǐn)?shù)據(jù)處理需求由每天一次的批處理分?jǐn)偟綄?shí)時(shí)流上。無(wú)論從性能上還是對(duì)MySQL的訪問(wèn)壓力上,都會(huì)有明顯地改善。Binlog本身記錄了數(shù)據(jù)變更的類型(Insert/Update/Delete),通過(guò)一些語(yǔ)義方面的處理,完全能夠做到精準(zhǔn)的數(shù)據(jù)還原。
關(guān)于Binlog解析部分,可以使用canal工具,采集到Kafka之后,可以使用Flink解析kafka數(shù)據(jù)并寫(xiě)入到HDFS上,解析kafka的數(shù)據(jù)可以使用Flink的DataStreamAPI,也可以使用FlinkSQL的canal-json數(shù)據(jù)源格式進(jìn)行解析,使用FlinkSQL相對(duì)來(lái)說(shuō)是比較簡(jiǎn)單的。下面是canal-json格式的kafka數(shù)據(jù)源。
- CREATE TABLE region (
- id BIGINT,
- region_name STRING
- ) WITH (
- 'connector' = 'kafka',
- 'topic' = 'mydw.base_region',
- 'properties.bootstrap.servers' = 'kms-3:9092',
- 'properties.group.id' = 'testGroup',
- 'format' = 'canal-json' ,
- 'scan.startup.mode' = 'earliest-offset'
- );
數(shù)據(jù)解析完成之后,下面的就是合并還原完整數(shù)據(jù)的過(guò)程,關(guān)于合并還原數(shù)據(jù),一種比較常見(jiàn)的方式就是全外連接(FULL OUTER JOIN)。具體如下:
生成增量表與全量表的Merge任務(wù),當(dāng)天的增量數(shù)據(jù)與昨天的全量數(shù)據(jù)進(jìn)行全外連接,該Merge任務(wù)的基本邏輯是:
- INSERT OVERWRITE TABLE user_order PARTITION(ds='20211012')
- SELECT CASE WHEN n.id IS NULL THEN o.id
- ELSE n.id
- END
- ,CASE WHEN n.id IS NULL THEN o.create_time
- ELSE n.create_time
- END
- ,CASE WHEN n.id IS NULL THEN o.modified_time
- ELSE n.modified_time
- END
- ,CASE WHEN n.id IS NULL THEN o.user_id
- ELSE n.user_id
- END
- ,CASE WHEN n.id IS NULL THEN o.sku_code
- ELSE n.sku_code
- END
- ,CASE WHEN n.id IS NULL THEN o.pay_fee
- ELSE n.pay_fee
- END
- FROM (
- SELECT *
- FROM user_order_delta
- WHERE ds = '20211012'
- AND id IS NOT NULL
- AND user_id IS NOT NULL
- ) n
- FULL OUTER JOIN (-- 全外連接進(jìn)行數(shù)據(jù)merge
- SELECT *
- FROM user_order
- WHERE ds = '20211011'
- AND id IS NOT NULL
- AND user_id IS NOT NULL
- ) o
- ON o.id = n.id
- AND o.user_id = n.user_id
- ;
經(jīng)過(guò)上述步驟,即可將數(shù)據(jù)還原完整。
總結(jié)
本文首先介紹了數(shù)據(jù)倉(cāng)庫(kù)構(gòu)建ODS層常見(jiàn)的數(shù)據(jù)同步方式,并對(duì)每種方式進(jìn)行了解釋,給出了相對(duì)應(yīng)的示意圖。接著給出了CDC+Merge的數(shù)據(jù)同步方案。值得注意的是,F(xiàn)link1.11引入了CDC的connector,比如MySQL CDC和Postgres CDC,同時(shí)對(duì)Kafka的Connector支持canal-json和debezium-json以及changelog-json的format,通過(guò)這種方式可以很方便地捕獲變化的數(shù)據(jù),大大簡(jiǎn)化了數(shù)據(jù)處理的流程和數(shù)據(jù)同步的復(fù)雜度。