揭露數(shù)據(jù)不一致的利器 —— 實時核對系統(tǒng)
隨著企業(yè)業(yè)務發(fā)展,以及微服務化大趨勢下單體服務的拆分,服務間的通信交互越來越多。與單體服務不同,微服務間的數(shù)據(jù)往往需要通過額外的手段來保障一致性,例如事務消息、異步任務補償?shù)取3藦臋C制上最大程度保障以外,如何觀測并及時發(fā)現(xiàn)數(shù)據(jù)不一致也非常重要。
本文介紹 Shopee Financial Products 團隊設計和開發(fā)的 實時核對系統(tǒng)(Real-time Checking System) ,它接入簡單,只需根據(jù)核對需求配置對應的核對規(guī)則,實現(xiàn)了規(guī)則熱加載,并能在不侵入業(yè)務的前提下對系統(tǒng)數(shù)據(jù)進行實時監(jiān)測對比,及時發(fā)現(xiàn)數(shù)據(jù)的不一致。系統(tǒng)落地至今,已在 Shopee 多個產(chǎn)品線推廣使用,幫助不同團隊快速發(fā)現(xiàn)線上數(shù)據(jù)不一致問題,為數(shù)據(jù)保駕護航。
1. 背景
1.1 系統(tǒng)數(shù)據(jù)的不一致性
在日常的開發(fā)迭代中我們能發(fā)現(xiàn),系統(tǒng)的數(shù)據(jù)有時并不按照我們設想的那樣進行變更。常見的場景如:用戶進行了還款(Repay),系統(tǒng) A 收到了還款請求后調用系統(tǒng) B,將已凍結的賬戶進行解凍,但因為某些原因(如系統(tǒng)故障、網(wǎng)絡分區(qū)等),解凍的請求沒有抵達 B,或者解凍成功的響應沒有返回給 A,此時會出現(xiàn)已經(jīng)確定收款但未解凍,或未確認收款卻已解凍的情況,從而引起用戶投訴或資金損失。
Fig1. Data Inconsistency
造成這類問題的原因通常有:代碼邏輯 Bug、并發(fā)場景處理不當、基礎組件(網(wǎng)絡、數(shù)據(jù)庫、中間件)故障、跨系統(tǒng)間缺乏原生的一致性保障等等。隨著業(yè)務擴展,企業(yè)內的應用越來越多,且有許多 單體應用 (Monolithic Application)向 微服務 (Microservices)拆分轉型,分布式的場景下丟失了數(shù)據(jù)庫事務的支持,需要解決數(shù)據(jù)一致性的問題。
保障數(shù)據(jù)一致的方案有很多種,在單體服務且缺少不同組件間(例如跨 Database、不同存儲中間件)事務支持的場景下,可以使用本地事務表 + 補償任務的組合,將主表數(shù)據(jù)與檢查任務通過事務寫入,再通過異步任務不斷檢查目標數(shù)據(jù)是否一致并進行補償,可實現(xiàn)最終一致性;在跨服務場景下,Saga 模式通過可靠消息及服務提供回滾事務的能力,來實現(xiàn)分布式事務。
但是,對于重要的業(yè)務,不管使用何種一致性方案, 提供額外的檢查、核對、兜底手段都是必要的 ,由此衍生出了很多的業(yè)務核對、對賬的需求。服務間通過特定手段保障數(shù)據(jù)一致性,并設計無侵入的旁路系統(tǒng)進行數(shù)據(jù)核對和校驗,是微服務架構下的典型搭配。
Fig2. Data Consistency Insurance
1.2 離線核對的缺陷
常見的離線數(shù)據(jù)核對可以通過定時任務, 按照一定的篩選條件,從不同數(shù)據(jù)源中獲取特定數(shù)據(jù),再進行比較 。這種方案的偽代碼如:
func Check() {
// 獲取上游 update_time 落在 [a, b) 的數(shù)據(jù)行
upstreamRows := QueryUpstreamDB(a, b)
for uniqueKey, sourceData := range upstreamRows {
// 為每個上游數(shù)據(jù)查找對應的下游數(shù)據(jù)
targetData := QueryDownstreamDB(uniqueKey)
// 對比上下游數(shù)據(jù)
Compare(sourceData, targetData)
}
}
時效性低是這類查表方案的通病。核對操作通常放在異步任務中定時執(zhí)行,執(zhí)行時間和離數(shù)據(jù)變更時間有一定延遲,且定時任務的查詢條件也會對核對目標造成影響。當出現(xiàn)異常數(shù)據(jù)時,不能及時發(fā)現(xiàn)問題,只能等待下次定時任務執(zhí)行后才能發(fā)現(xiàn)。
引入了 額外的掃表開銷 同樣是個不容忽視的問題。在數(shù)據(jù)量較大,尤其是存在大量 ??INSERT?
? 操作的場景下,想要核對就需要 ??SELECT?
? 出上下游的目標數(shù)據(jù)。為了在不影響正常業(yè)務的情況下及時處理完核對任務,開發(fā)者可通過將查詢轉移到從庫,甚至引入核對任務獨占的從庫,但此類查表核對方案在資源使用和實現(xiàn)復雜度方面都不夠理想。
同時,由于查表得到的結果只是當前的數(shù)據(jù)版本,在兩次檢查之間,數(shù)據(jù)可能發(fā)生了多次變更, 定時任務無法感知和觀測到每個狀態(tài)變更 ,在數(shù)據(jù)被頻繁 ??UPDATE?
? 的場景下也存在一定的核對和檢測難度。
因此,要實現(xiàn)更好的數(shù)據(jù)核對,我們需要考慮以下幾點目標:
- 實現(xiàn)秒級核對。
- 盡量減少數(shù)據(jù)庫查詢。
- 核對數(shù)據(jù)變更,而非核對數(shù)據(jù)快照。
- 簡單靈活的接入方式。
2. 實時數(shù)據(jù)核對
為了更好地發(fā)現(xiàn)數(shù)據(jù)不一致的情況,Shopee Financial Products 團隊在 2021 年中設計并實現(xiàn)了 Real-time Checking System (實時核對系統(tǒng),RCS)。RCS 具有以下核心優(yōu)勢:
- 秒級數(shù)據(jù)核對。
- 對業(yè)務邏輯無侵入。
- 可配置化接入。
從上線至今,RCS 幫助團隊及時檢測到了多次數(shù)據(jù)問題,可以將原因歸納為以下幾個方面:
- 代碼邏輯 Bug:包括冪等處理、并發(fā)問題、業(yè)務邏輯錯誤等。
- 系統(tǒng)運行環(huán)境:DB 異常、網(wǎng)絡抖動、MQ 異常等。
Fig3. Types of spotted bugs
本節(jié)主要介紹 RCS 的實現(xiàn),包括系統(tǒng)架構和核對流程、核對性能優(yōu)化、消息通知機制等。
2.1 系統(tǒng)架構與核對流程
在系統(tǒng)設計上,我們將 RCS 分為了三層:
- 變更數(shù)據(jù)獲?。―ata Fetching Layer)
- 數(shù)據(jù)核對(Data Checking Layer)
- 核對結果處理(Result Handling Layer)
Fig4. System Layers
2.1.1 變更數(shù)據(jù)獲取
實時核對,顧名思義需要著重關注“實時”和“核對”兩個要點。Data Fetching Layer 負責達成實時的目標,通過對不同 CDC(Change Data Capture,變更數(shù)據(jù)抓?。┓桨傅恼{研,我們使用了 Log-Based 的方案來提供時效性保障。
擴展閱讀
CDC 模式用于感知數(shù)據(jù)變更,主要可以分為以下 4 類:
- Timestamps,基于 update_time 或類似字段進行查詢來獲取變更數(shù)據(jù)。
- Table Differencing,獲取完整數(shù)據(jù)快照進行比對。
- Triggers,為 DDL、DML 設置 Trigger,將變更內容用額外的操作記錄至數(shù)據(jù)庫。
- Log-Based,典型例子為利用 MySQL binlog 和 MongoDB oplog。
其中,Timestamps 方案和 Table Differencing 均由定時任務驅動,時效性較弱。Timestamps 方案無法感知被刪除的數(shù)據(jù),使用時需要由軟刪除代替;Table Differencing 方案彌補了這個缺點,但是多次獲取完整數(shù)據(jù)會讓整套方案顯得非常笨重。
Triggers 方案和 Log-Based 方案獲取到的均為數(shù)據(jù)變更而非數(shù)據(jù)快照,但 Triggers 感知后以特定的語句將其記錄下來,本質上是一次寫操作,仍給數(shù)據(jù)庫帶來了額外的負擔。
當 MySQL 產(chǎn)生數(shù)據(jù)變更時,高可用的 binlog 同步組件會獲取到對應 binlog,并將其投遞至 Kafka 中,以此獲取變更數(shù)據(jù)的數(shù)據(jù)值用于核對。
Fig5. Data Fetching Layer
在實際使用中,需要核對的數(shù)據(jù)可能并非都存在于 MySQL 中,例如我們也需要核對 MySQL 與 MongoDB 的數(shù)據(jù)、MySQL 與 Redis 的數(shù)據(jù)。為此,業(yè)務系統(tǒng)也可以通過自行投遞特定格式的 Kafka 消息來接入,從而保證接入的靈活性。
2.1.2 數(shù)據(jù)核對
Data Checking Layer 負責處理接收到的數(shù)據(jù)流,包括獲取特定的核對規(guī)則,接收到數(shù)據(jù)時進行暫存或比對。RCS 對 binlog 數(shù)據(jù)進行抽象,提煉了一套通用的可配置化的核對規(guī)則。用戶只需要填寫對應的規(guī)則,即可實現(xiàn)自助接入。規(guī)則定義示例如下:
Fig6. Config Example
不難想象,不同系統(tǒng)間數(shù)據(jù)的變更是有先后的,且變更的消息被 RCS 接收到也會有先后順序。因此,先抵達的數(shù)據(jù)需要被存儲下來作為后續(xù)比對的目標,后抵達的數(shù)據(jù)則按照規(guī)則與已有數(shù)據(jù)進行比對。
Fig7. Check Flow
為了便于描述,這里先定義幾個名稱:
- 數(shù)據(jù)上游:先到達 RCS 的數(shù)據(jù)為上游。
- 數(shù)據(jù)下游:后到達 RCS 的數(shù)據(jù)為下游。
- 核對項:某個數(shù)據(jù)核對需求,包括上游數(shù)據(jù)和下游數(shù)據(jù)。例如:System A 與 System B 核對用戶資金狀態(tài)的需求。
Fig8. Kafka Data Check Flow
以下面這一次核對為例,它需要判斷數(shù)據(jù)是否在 10 秒達成一致,整體的核對流程可以簡要描述為:
比對數(shù)據(jù)到達,進行核對,并刪除 Redis key;
比對數(shù)據(jù)未到達,判斷延遲隊列中的數(shù)據(jù)。
- (圖 8)核對項的上游數(shù)據(jù)到達,暫存 Redis 和延遲隊列。
- (圖 8)RCS 等待核對項的下游數(shù)據(jù):
- (圖 9)延遲隊列到達時間后,再次查詢在 Redis 中是否有對應數(shù)據(jù):
- 存在,則超過核對時間閾值,發(fā)送異常告警,刪除 Redis key;
- 不存在,則已核對。
Fig9. DelayQueue Check Flow
2.1.3 消息通知機制
RCS 的目標是及時發(fā)現(xiàn)數(shù)據(jù)不一致的問題,因此,在 Result Handling Layer 中接入了 Shopee 企業(yè) IM(SeaTalk)的機器人進行告警。未來告警接口也會進行開放,便于擴展和讓其它消息應用進行接入。
我們設計了四種消息通知機制:
- Mismatch Notice
- Aggregated Notice
- Recovery Notice
- Statistical Notice
Mismatch Notice 應對一般場景下的核對失敗,及時通知到對應的業(yè)務負責人,便于快速定位問題原因并修復數(shù)據(jù)。但當大量數(shù)據(jù)出現(xiàn)不一致時,Aggregated Notice 會取而代之,將告警進行聚合發(fā)送,避免影響到值班人員的正常閱讀。
RCS 也會將核對失敗的數(shù)據(jù)持久化,因而具備恢復感知的能力。當異常數(shù)據(jù)恢復時,Recovery Notice 會發(fā)送消息告知使用者何種不一致已經(jīng)恢復,間隔了多少時間。
最后,Statistical Notice 會向使用者報告常規(guī)的統(tǒng)計數(shù)據(jù),包括 DB 主從延遲、當日核對成功率等。
2.2 核對功能演進
系統(tǒng)上線至今,接入或自行部署使用 RCS 的團隊越來越多,對應的業(yè)務場景也各不相同,早期的核對規(guī)則難以滿足不同團隊的核對需求。在 2021 年末,Shopee Financial Products 研發(fā)團隊又對 Data Checking Layer 進行了一系列的擴展,目的是減少維護成本,以較為通用的方式支持不同團隊的使用。
2.2.1 等值 / 映射核對
在最早上線的版本中,RCS 系統(tǒng)包含了等值和狀態(tài)映射核對的功能,是針對組內實際面臨的場景設計的,滿足日常的使用需求。
核對系統(tǒng)主要處理的是上下游系統(tǒng)之間金額數(shù)值、狀態(tài)的變化,通常我們能獲取到的 binlog 核心字段示例和核對邏輯如下:
Fig10. Equivalence Check
假設先接收到 System A 的 binlog 消息,暫存 Redis,規(guī)定時間內也接收到了 System B 的 binlog 消息:
??loan_amount?
? 為 200,需要找到一條對應的 System A 的 binlog,且 ??order_amount?
? 需與之匹配;
??loan_status?
? 為 4,需要找到一條對應的 System A 的 binlog,且 ??order_status?
? 需為 2。
- 根據(jù) System B 這條 binlog 的特征,發(fā)現(xiàn)配置有兩條核對規(guī)則:
對于不同系統(tǒng)間產(chǎn)生的單條記錄變更的核對,等值和映射檢查能覆蓋到大部分場景。但是因為這兩種核對的邏輯都是固定下來的,所以業(yè)務方如果有不同的核對需要,則需要新的代碼邏輯實現(xiàn)。為此,研發(fā)團隊考慮 將核對邏輯交給使用方來描述 ,因而催生出了表達式核對的功能。
2.2.2 表達式核對
如果我們考慮以下的 binlog 示例,不同系統(tǒng)間的數(shù)據(jù)模型設計并不一致,字段非一一對應。
Fig11. Expression Check
System A 記錄了 訂單的金額為 100 ,而 System B 記錄了訂單的 已支付金額為 30,借貸金額為 70 ,需要核對的是 System A ??order_amount?
? 是否等于 System B ??paid_amount + loan_amount?
? ,原有的設計無法支持。
為此,我們引入了表達式求值的方案,當 binlog 抵達時, 使用方通過一個返回值為布爾類型的表達式來描述自己的核對邏輯 ,如:
??a.order_amount == b.loan_amount?
?
??a.order_status == 2 && b.loan_status == 4?
?
- 判斷 2.2.2 中求和場景: ?
?a.order_amount == b.paid_amount + b.loan_amount?
? - 兼容判斷 2.2.1 中場景:
在表達式核對方案下,兩個系統(tǒng)間的幾乎所有的單條數(shù)據(jù)核對場景都能進行覆蓋,且這種方案的好處在于研發(fā)團隊不用再費心思提供新的計算、映射、與或非邏輯實現(xiàn)的支持,大大減少了維護成本。
2.2.3 動態(tài)配置數(shù)據(jù)核對
在電商和金融的場景中,存在一些動態(tài)數(shù)據(jù),例如費率、活動優(yōu)惠折扣等,會隨著業(yè)務和運營計劃發(fā)生實時變化。這類數(shù)據(jù)通常存儲在配置表中,因此通過簡單的表達式無法進行定義,而不同業(yè)務系統(tǒng)中的配置表結構設計也不一樣,很難在核對系統(tǒng)代碼中進行聲明。
為了滿足這種場景,RCS 引入了對業(yè)務系統(tǒng) SQL 查詢的支持,當獲取到新的 binlog 時,檢查這條 binlog 滿足的核對規(guī)則,使用方在核對規(guī)則中會配置需要執(zhí)行的 SQL 語句,以及分庫分表規(guī)則,由核對系統(tǒng)執(zhí)行并得到比對的內容,再進行表達式核對:
- binlog 中獲取到當前訂單的費率 ?
?order_rate?
? 為 0.5。 - 根據(jù)配置信息執(zhí)行 ?
?SELECT?
? 語句查詢實時的費率 ??rate?
? 。 - 執(zhí)行表達式核對 ?
?a.order_rate == rate?
? 。
除此之外,RCS 也能支持 JSON 串核對,譬如 System A 需要核對 ??order_rate?
? ,但是存儲 ??order_rate?
? 信息是一個 JSON 串, ??rate_info = {"decimal_base":"10000", "order_rate":"0.5"}?
? 。可以在 RCS 的核對規(guī)則中,自定義 JSON 解析表達式,提取真實需要核對的字段。
3. 性能表現(xiàn)
RCS 系統(tǒng)的性能主要取決于 Data Fetching Layer 和 Data Checking Layer。
Data Fetching Layer 的性能代表實時獲取變更數(shù)據(jù)的能力,受 binlog 解析(CPU 密集型任務)及 Kafka 的消息持久化(I/O 密集型任務)影響。 業(yè)務團隊可根據(jù)需要選擇對應的硬件搭建 CDC 模塊 ,以我們使用場景為例,每秒可投遞的消息數(shù)量超過 20K 。
Data Checking Layer 則負責進行數(shù)據(jù)核對,為了測試 RCS 的性能極限,Data Fetching 采用 Kafka 發(fā)送源數(shù)據(jù),核對系統(tǒng)采用單機部署。測試結果表明, RCS 每秒可完成核對 10K+ 次 ,詳細數(shù)據(jù)如下:
Component | Machine |
Kafka | 3 * 48 Core 128 GB |
Redis | 3 * 48 Core 128 GB |
Real-time Checking System | 1 * 48 Core 128 GB |
Number of check entry | TPS | CPU Cost |
1 entry | 14.3K | 454% |
2 entries | 12.0K | 687% |
3 entries | 10.4K | 913% |
從壓測結果分析,RCS 的性能瓶頸主要取決于 Redis 集群的性能,單次核對耗時約為 0.5ms。當然,RCS 支持集群部署,做為 Kafka 的消費者,可以利用 Kafka consumer group 的 Rebanancing 機制,從而實現(xiàn)動態(tài)擴/縮容的機制。
4. 總結
Shopee Financial Products 團隊在 2021 年落地的 RCS 目前在多個產(chǎn)品線推廣和使用,主要解決傳統(tǒng) T+1 式離線數(shù)據(jù)核對延遲高、業(yè)務耦合緊密,且隨新業(yè)務上線還帶來額外的開發(fā)負擔的問題。
RCS 通過靈活的核對規(guī)則配置化、表達式場景覆蓋以及 Log-Based 的 CDC 方案,提供近實時的數(shù)據(jù)核對解決方案,最大程度地降低數(shù)據(jù)不一致導致的資金、信息安全等風險。我們也歡迎不同的用戶和團隊接入或部署使用,在后續(xù)的更新迭代中,RCS 會進一步提升核對的性能,以支撐業(yè)務量增長帶來的核對需求。
本文作者
Yizhong、Songtao,后端研發(fā)工程師。來自 Shopee Financial Products 團隊。
Jiekun,后端研發(fā)工程師,熱衷于分布式系統(tǒng) & Kubernetes。來自 Shopee Off-Platform Ads 團隊。