如何快速同步第三方平臺(tái)數(shù)據(jù)?
大家好,我是蘇三,又跟大家見面了。
前言
最近知識(shí)星球中有位小伙伴問了我一個(gè)問題:如何快速同步第三方平臺(tái)數(shù)據(jù)?
他們有個(gè)業(yè)務(wù)需求是:需要同步全國34個(gè)省市,多個(gè)系統(tǒng)的8種業(yè)務(wù)數(shù)據(jù),到他們公司的系統(tǒng)當(dāng)中。
他們需求同步全量的數(shù)據(jù)和增量的數(shù)據(jù)。
全量的數(shù)據(jù)主要是針對(duì)多個(gè)系統(tǒng)的歷史數(shù)據(jù),大概有幾千萬數(shù)據(jù),只需要初始化一次即可。
而增量的數(shù)據(jù),是系統(tǒng)后續(xù)變更的數(shù)據(jù)。
這個(gè)需求其實(shí)不簡單,至少有以下難點(diǎn):
- 不能直接訪問第三方數(shù)據(jù)庫。
- 不能將歷史數(shù)據(jù)導(dǎo)出到excel中,有泄露數(shù)據(jù)的風(fēng)險(xiǎn)。
- 如何快速同步歷史數(shù)據(jù)?
- 增量數(shù)據(jù)如何處理?
- 接口需要做限流嗎?
- 增量數(shù)據(jù)如何校驗(yàn)數(shù)據(jù)的一致性?
帶著這些問題,開始今天的文章之旅。
1. 如何快速同步歷史數(shù)據(jù)?
想要快速同步歷史數(shù)據(jù),第一個(gè)想到的可能是直接同步數(shù)據(jù)庫中的數(shù)據(jù)。
但多個(gè)第三方系統(tǒng)為了數(shù)據(jù)安全考慮,不可能直接把他們的數(shù)據(jù)庫訪問地址和相關(guān)賬號(hào)密碼告訴你。
即使他們告訴你了,但有很多個(gè)系統(tǒng),你一個(gè)個(gè)去連數(shù)據(jù)庫查數(shù)據(jù),也非常麻煩。
有些小伙伴可能會(huì)說:這好辦,讓第三方系統(tǒng)把他們的歷史數(shù)據(jù)導(dǎo)出到excel中,我們寫個(gè)程序解析去這些excel,就能將數(shù)據(jù)快速導(dǎo)入到我們的數(shù)據(jù)庫中。
這是個(gè)好辦法,但忽略了一點(diǎn):這些數(shù)據(jù)是敏感數(shù)據(jù),不能對(duì)外暴露。
因此導(dǎo)出excel的方案行不通。
那么,該如何快速同步歷史數(shù)據(jù)呢?
答:使用SFTP。
不知道你有沒有跟銀行對(duì)接過,SFTP在銀行業(yè)務(wù)中經(jīng)常會(huì)用到。
那么,如何用SFTP同步數(shù)據(jù)呢?
2. 如何使用SFTP?
說起SFTP,就不得不說一說FTP。
我們都知道,F(xiàn)TP是用來傳送文件的協(xié)議。使用FTP實(shí)現(xiàn)遠(yuǎn)程文件傳輸?shù)耐瑫r(shí),還可以保證數(shù)據(jù)傳輸?shù)目煽啃院透咝浴?/p>
而SFTP是一種可以安全傳輸文件的協(xié)議,它是一種基于SSH(Secure Shell)的文件傳輸協(xié)議,它允許用戶將文件以加密的形式傳輸?shù)竭h(yuǎn)程服務(wù)器上,以保護(hù)文件的安全性。
FTP和SFTP有哪些區(qū)別呢?
- 鏈接方式不同:FTP使用TCP的21號(hào)端口建立連接。而SFTP是在客戶端和服務(wù)器之間通過 SSH 協(xié)議 (即TCP22號(hào)端口) 建立的安全連接來傳輸文件。
- 安全性不同:SFTP使用加密傳輸認(rèn)證信息和傳輸?shù)臄?shù)據(jù),相對(duì)于FTP更安全一些。
- 傳輸效率不同:SFTP傳輸文件時(shí)使用了加密解密技術(shù),因此傳輸效率比普通的FTP要低一些。
- 使用協(xié)議不同:FTP使用了TCP/IP協(xié)議,而SFTP使用了SSH協(xié)議。
- 安全通道:SFTP協(xié)議提供了一個(gè)安全通道,用于在網(wǎng)絡(luò)上的主機(jī)之間傳輸文件。而FTP協(xié)議沒有安全通道。
因此可見,我們使用SFTP來傳輸文件還是比較安全的。
那么,如何使用SFTP來實(shí)現(xiàn)同步歷史數(shù)據(jù)的需求呢?
答:這就需要我們做好SFTP的賬號(hào)、目錄和文件格式的規(guī)劃了。
2.1 賬號(hào)權(quán)限控制
首先需要運(yùn)維同學(xué)搭建一個(gè)SFTP服務(wù)器,提供一個(gè)可以對(duì)外訪問的域名和端口號(hào)。
然后需要在根目錄下,創(chuàng)建一個(gè)存放文件的目錄,比如:/data。
然后給每個(gè)省市的第三方系統(tǒng)都創(chuàng)建一個(gè)子目錄,比如:/data/sichuan、/data/shenzhen、/data/beijing等。
接下來,我們需要給每個(gè)子目錄創(chuàng)建一個(gè)賬號(hào),以及分配權(quán)限。
比如有個(gè)賬號(hào)是:sichuan,密碼是:sisuan123。這個(gè)賬號(hào)只擁有/data/sichuan目錄讀數(shù)據(jù)和寫數(shù)據(jù)的權(quán)限。
另外一個(gè)賬號(hào)是:shenzhen,密碼是:shenzhen123。這個(gè)賬號(hào)只擁有/data/目錄讀數(shù)據(jù)和寫數(shù)據(jù)的權(quán)限。
以此類推。
當(dāng)然大家如果不放心,可以用在線工具,將密碼設(shè)置成一個(gè)8位的隨機(jī)字符串,包含字母、數(shù)字和特殊字符,這樣的密碼安全性相對(duì)來說要高一些。
這樣相關(guān)的第三方系統(tǒng)都有往SFTP自己目錄下讀和寫數(shù)據(jù)的權(quán)限。
在這里溫馨提醒一下:上面這些賬號(hào)讀數(shù)據(jù)的權(quán)限,主要是為了后面他們好排查問題用的,不是必須分配的,我們需要根據(jù)實(shí)際情況而定。
此外,還需要給我們自己分配一個(gè)賬號(hào),開通對(duì)/data整個(gè)目錄的只讀權(quán)限。
2.2 統(tǒng)一數(shù)據(jù)格式
接下來,最關(guān)鍵的一步是要制定一個(gè)統(tǒng)一的文件格式和數(shù)據(jù)格式。
文件名稱為:sichuan_20230724.txt。
也就是用 省市拼音_日期.txt 的格式。
這樣大家就能非常清楚的看出,是哪個(gè)省市,哪個(gè)日期產(chǎn)生的數(shù)據(jù)。
然后我們需要規(guī)定txt文件的格式。
比如:id占20個(gè)字符,name占30個(gè)字符,金額占10個(gè)字符等等。
如果有些列的數(shù)據(jù)不滿對(duì)應(yīng)的字符長度,前面可以補(bǔ)0。
這樣我們的程序,只需要在解析txt文件時(shí),先讀取一行數(shù)據(jù),是一個(gè)比較長的字符串,然后按照固定的長度,去解析字符串中每一列的數(shù)據(jù)即可。
2.3 使用job同步數(shù)據(jù)
假如第三方系統(tǒng)都按照我們要求,已將歷史數(shù)據(jù)寫入到指定目錄下的指定文件中。
這時(shí)我們需要提供一個(gè)job,去讀取/data目錄下,所有子目錄的txt文件,一個(gè)個(gè)解析里面包含的歷史數(shù)據(jù),然后將這些數(shù)據(jù),做一些業(yè)務(wù)邏輯處理,然后寫入我們的數(shù)據(jù)庫當(dāng)中。
如圖所示:
圖片
當(dāng)然如果想快一點(diǎn)處理完,我們可以在job中使用多線程解析和讀取不同的txt文件,然后寫數(shù)據(jù)。
3. 增量數(shù)據(jù)如何處理?
對(duì)于歷史數(shù)據(jù),我們通過上面的方案,可以快速的同步數(shù)據(jù)。
但對(duì)于增量的數(shù)據(jù)如何處理呢?
增量的數(shù)據(jù),對(duì)實(shí)時(shí)性要求比較高。
我們沒辦法跟之前一下,走SFTP同步文件,然后使用job定時(shí)解析文件的方案。
為了滿足數(shù)據(jù)實(shí)時(shí)性的需求,我們不得不走接口實(shí)時(shí)數(shù)據(jù)同步的方案。
那么,是第三方系統(tǒng)提供接口,還是我們這邊提供接口呢?
很顯然,如果讓第三方提供接口,第三方有那么多系統(tǒng),我們需要對(duì)接很多很多接口,非常麻煩。
因此,這個(gè)接口必須由我們這邊提供。
我們這邊提供一個(gè)統(tǒng)一的數(shù)據(jù)上報(bào)接口,支持傳入批量的數(shù)據(jù)。
為了防止第三方系統(tǒng),一次性傳入過多的參數(shù),導(dǎo)致該接口超時(shí),我們需要對(duì)單次上傳的數(shù)據(jù)條數(shù)做限制,例如:一次請(qǐng)求,最大允許上傳500條數(shù)據(jù)。
其實(shí),光限制請(qǐng)求參數(shù)還不夠。
我們的這個(gè)數(shù)據(jù)上報(bào)接口,可能會(huì)被多個(gè)系統(tǒng)調(diào)用,并發(fā)量可能也不小。
為了防止在高并發(fā)下,請(qǐng)求量突增把我們的接口搞掛了,我們需要對(duì)接口限流。
我們可以使用redis記錄第三方系統(tǒng)請(qǐng)求的url和請(qǐng)求賬號(hào),然后在程序中查詢r(jià)edis中的次數(shù),是否超過限額。允許每一個(gè)第三方系統(tǒng),在1秒之內(nèi)調(diào)用10次。第三方系統(tǒng)總的請(qǐng)求次數(shù),1秒不超過500次。
如果超過了限額,則數(shù)據(jù)上報(bào)接口提示:請(qǐng)求太頻繁,請(qǐng)稍后再試。
圖片
為了增加數(shù)據(jù)上報(bào)接口的性能,在接收到數(shù)據(jù)之后,不直接寫庫。
我們可以將接口中接收到的數(shù)據(jù)作為mq消息,發(fā)送到mq服務(wù)器。
然后有專門的mq消費(fèi)者,實(shí)時(shí)監(jiān)聽mq服務(wù)器的消息,異步讀取消息寫入數(shù)據(jù)庫。
該方案比較適合,寫庫操作,包含了一些復(fù)雜的業(yè)務(wù)邏輯。
如果消費(fèi)速度有點(diǎn)慢,我們可以及時(shí)調(diào)整mq消費(fèi)者,使用多線程處理,或者增加mq中隊(duì)列的數(shù)量,增加mq消費(fèi)者來增加消息的處理速度。
圖片
如果mq消費(fèi)者在處理mq消息的過程中,由于網(wǎng)絡(luò)問題,寫庫失敗了,可以增加自動(dòng)重試機(jī)制。
圖片
一旦mq消費(fèi)者在mq消費(fèi)過程中出現(xiàn)失敗的情況,則自動(dòng)重試3次,如果還是失敗,則將消息寫入死信隊(duì)列,目前RocketMQ自帶了失敗重試功能。
然后有個(gè)job監(jiān)控死信隊(duì)列,如果一旦發(fā)現(xiàn)異常數(shù)據(jù),則發(fā)報(bào)警郵件給相關(guān)開發(fā),后面人工處理。
4. 如何校驗(yàn)數(shù)據(jù)一致性?
通過上面的方案,我們把歷史數(shù)據(jù)和增量的數(shù)據(jù)都已經(jīng)處理了。
但還有一個(gè)問題:如何校驗(yàn)數(shù)據(jù)一致性。
對(duì)于歷史數(shù)據(jù),其實(shí)我們好處理,第三方系統(tǒng)已經(jīng)生成好txt文件上傳到SFTP上了,我們可以直接對(duì)比那些文件即可。
但對(duì)于增量的數(shù)據(jù),是第三方系統(tǒng)調(diào)用我們的數(shù)據(jù)上報(bào)接口,去上報(bào)的數(shù)據(jù),這部分?jǐn)?shù)據(jù)如何校驗(yàn)數(shù)據(jù)一致性呢?
答:我們可以要求第三方系統(tǒng),在某日凌晨,生成一份昨日的增量數(shù)據(jù)到txt文件,然后上傳到SFTP上。
我們有個(gè)job,在每天的凌晨1點(diǎn)會(huì)讀取第三方系統(tǒng)生成昨日增量數(shù)據(jù),跟我們數(shù)據(jù)庫中昨日的增量數(shù)據(jù)做對(duì)比,校驗(yàn)數(shù)據(jù)的差異性。
如果第三方后面產(chǎn)生的增量數(shù)據(jù),只有新增,沒有刪除和修改,使用上面的方案是沒有問題的。
但如果增量的數(shù)據(jù),包含了刪除和修改的數(shù)據(jù),可能會(huì)有問題。
因?yàn)槲覀冏霰容^的數(shù)據(jù)源是昨日的增量數(shù)據(jù),而我們的job在比較數(shù)據(jù)的過程中,萬一第三方系統(tǒng)上報(bào)了我們正在對(duì)比的數(shù)據(jù),更新成了一個(gè)新值,跟昨日的值不一樣,這樣對(duì)比數(shù)據(jù)就會(huì)產(chǎn)生差異。
那么,該如何解決這個(gè)問題呢?
答:我們可以只校驗(yàn)昨日的數(shù)據(jù)(就修改時(shí)間是昨天),今日產(chǎn)生的增量數(shù)據(jù),會(huì)在明日凌晨1點(diǎn)的job中會(huì)去校驗(yàn)的。
在比較時(shí),遍歷昨日增量txt文件中的每行數(shù)據(jù),跟數(shù)據(jù)庫中的數(shù)據(jù)做對(duì)比,如果id相同,但是修改時(shí)間是今天,則忽略這條數(shù)據(jù)。
如果id相同,修改時(shí)間是昨天,則判斷數(shù)據(jù)是否一致,如果不一致,則用txt文件中的數(shù)據(jù)修復(fù)我們數(shù)據(jù)庫中的異常數(shù)據(jù)。
如果txt文件中的id,在我們數(shù)據(jù)庫中不存在,則新增一條數(shù)據(jù)。
圖片
這兩種情況產(chǎn)生的數(shù)據(jù)變動(dòng),修改時(shí)間要設(shè)置成昨天,不然明日的job又會(huì)再重新處理一次這條數(shù)據(jù)。