超大規(guī)模系統(tǒng)下,MySQL到Redis的數(shù)據(jù)同步也不難吧?
一、緩存穿透
超大規(guī)模系統(tǒng)的不能承受之痛
如何構(gòu)建Redis集群?由于集群可以水平擴(kuò)容,因此只要集群足夠大,理論上支持海量并發(fā)就不是問題。但是,如果并發(fā)請求數(shù)量的基數(shù)過大,那么即使只有很小比率的請求穿透緩存,直接訪問數(shù)據(jù)庫的請求其絕對數(shù)量也仍然不小。再加上大促期間的流量峰值,還是會存在因?yàn)榫彺娲┩付l(fā)系統(tǒng)雪崩的風(fēng)險(xiǎn)。
那么,這個(gè)問題該如何解決呢?其實(shí)方法并不難想到,不讓請求穿透緩存就行了。如今內(nèi)存存儲的價(jià)格一路走低,只要能買得起足夠多的服務(wù)器,Redis集群的容量就是無限的。 我們可以把全量數(shù)據(jù)都放在Redis集群中,處理讀請求的時(shí)候,只需要讀取Redis,而不用訪問數(shù)據(jù)庫,這樣就完全沒有“緩存穿透”的風(fēng)險(xiǎn)了。 實(shí)際上,很多大型互聯(lián)網(wǎng)公司都在使用這種方法。
不過,在Redis中緩存全量數(shù)據(jù),又會引發(fā)一個(gè)新的問題。那就是,緩存中的數(shù)據(jù)應(yīng)該如何更新呢?因?yàn)槲覀內(nèi)∠司彺娲┩傅臋C(jī)制,在這種情況下,如果能從緩存中直接讀到數(shù)據(jù),則可以直接返回,如果沒能讀到數(shù)據(jù),那就只能返回錯(cuò)誤了! 所以,當(dāng)系統(tǒng)更新數(shù)據(jù)庫的數(shù)據(jù)之后,必須及時(shí)更新緩存。
至此,我們又要面對一個(gè)老問題:如何保證Redis中的數(shù)據(jù)與數(shù)據(jù)庫中的數(shù)據(jù)同步更新?可以用分布式事務(wù)來解決數(shù)據(jù)一致性的問題,但是這些方法都不太適合用來更新緩存。原因是,分布式事務(wù)對數(shù)據(jù)更新服務(wù)有很強(qiáng)的侵入性。這里仍以下單服務(wù)為例來說明,如果為了更新緩存,增加一個(gè)分布式事務(wù),那么無論我們使用哪種分布式事務(wù),下單服務(wù)的性能或多或少都會受到影響。還有一個(gè)問題是,如果Redis本身出現(xiàn)了故障,寫入數(shù)據(jù)失敗,則還會導(dǎo)致下單失敗的問題,相當(dāng)于是降低了下單服務(wù)的性能和可用性,這樣肯定是不行的。
對于像訂單服務(wù)之類的核心業(yè)務(wù),一個(gè)可行的方法是,啟動一個(gè)更新訂單緩存的服務(wù),接收訂單變更的消息隊(duì)列(Message Queue,MQ)中的消息,然后更新Redis中緩存的訂單數(shù)據(jù)。使用訂單變更消息更新緩存的結(jié)構(gòu)如圖1所示。因?yàn)閷τ谶@類核心的業(yè)務(wù)數(shù)據(jù),使用方通常會非常多,服務(wù)本來就需要向外發(fā)送消息,增加一個(gè)消費(fèi)訂閱,基本上不會增加額外的開發(fā)成本,也不需要對訂單服務(wù)本身做出任何更改。
圖1使用訂單變更消息更新緩存
對于上述方法,我們唯一需要擔(dān)心的問題是,如果消息丟失了,應(yīng)該怎么辦?因?yàn)楝F(xiàn)在消息是緩存數(shù)據(jù)的唯一來源,一旦出現(xiàn)消息丟失的問題,緩存里缺失的那條數(shù)據(jù)就會永遠(yuǎn)也無法補(bǔ)上,所以,必須保證整個(gè)消息鏈條的可靠性。不過,好在現(xiàn)在的MQ集群(比如Kafka或RocketMQ),都擁有高可用性和高可靠性的保證機(jī)制,只要能事先正確配置好,就可以滿足數(shù)據(jù)的可靠性要求。
像訂單服務(wù)這樣,由于本來就有現(xiàn)成的數(shù)據(jù)變更消息可以訂閱,因此像這樣更新緩存也是一個(gè)不錯(cuò)的選擇,因?yàn)檫@種方式實(shí)現(xiàn)起來很簡單,對系統(tǒng)的其他模塊也完全沒有侵入。
二、使用Binlog實(shí)時(shí)更新Redis緩存
如果我們要緩存的數(shù)據(jù),原本就沒有一份數(shù)據(jù)更新的消息隊(duì)列可以訂閱,又該怎么辦呢?下面就來介紹很多大型互聯(lián)網(wǎng)企業(yè)所采用的,也是更通用的解決方案。
數(shù)據(jù)更新服務(wù)只負(fù)責(zé)處理業(yè)務(wù)邏輯,更新MySQL,完全不用考慮如何更新緩存。 負(fù)責(zé)更新緩存的服務(wù),把自己偽裝成一個(gè)MySQL的從節(jié)點(diǎn),從MySQL接收并解析Binlog之后,就可以得到實(shí)時(shí)的數(shù)據(jù)變更信息,然后該服務(wù)就會根據(jù)這個(gè)變更信息去更新Redis緩存。訂閱Binlog更新緩存的結(jié)構(gòu)如圖2所示。
圖2訂閱Binlog更新緩存的結(jié)構(gòu)
訂閱Binlog更新緩存的方案,相較于上文中接收消息更新Redis緩存的方案,兩者的實(shí)現(xiàn)思路其實(shí)是一樣的,都是 異步實(shí)時(shí)訂閱數(shù)據(jù)變更信息以更新Redis緩存。 只不過,直接讀取Binlog這種方式,通用性更強(qiáng)。該方式不會要求訂單服務(wù)再發(fā)送訂單消息,訂單更新服務(wù)也不用額外考慮如何解決“消息發(fā)送失敗了該怎么辦?”這種數(shù)據(jù)一致性問題。
除此之外,由于在整個(gè)緩存更新鏈路上,減少了一個(gè)收發(fā)消息隊(duì)列的環(huán)節(jié),從MySQL更新到Redis更新的時(shí)延變得更短,出現(xiàn)故障的可能性也更低,因此很多大型互聯(lián)網(wǎng)企業(yè)更青睞于采用這種方案。
訂閱Binlog更新緩存的方案唯一的缺點(diǎn)是,實(shí)現(xiàn)訂單緩存更新服務(wù)比較復(fù)雜,該方案畢竟不像接收消息那樣,收到的直接就是訂單數(shù)據(jù),解析Binlog還是挺麻煩的。
很多開源的項(xiàng)目都提供了訂閱和解析MySQL Binlog的功能,下面就以比較常用的開源項(xiàng)目Canal為例來演示,如何實(shí)時(shí)接收Binlog更新Redis緩存。
Canal通過模擬MySQL主從復(fù)制的交互協(xié)議,把自己偽裝成一個(gè)MySQL的從節(jié)點(diǎn),向MySQL主節(jié)點(diǎn)發(fā)送dump請求。MySQL收到請求后,就會向Canal開始推送Binlog,Canal解析Binlog字節(jié)流之后,將其轉(zhuǎn)換為便于讀取的結(jié)構(gòu)化數(shù)據(jù),供下游程序訂閱使用。圖3展示了如何使用Canal訂閱Binlog更新Redis中的訂單緩存。
圖3使用Canal訂閱Binlog更新緩存
在這個(gè)示例中,MySQL和Redis都在本地的默認(rèn)端口上運(yùn)行,MySQL的端口為3306,Redis的端口為6379。為了便于大家操作,下面還是以第5章中提到的賬戶余額表account_balance作為演示數(shù)據(jù)。
首先,下載并在本地解壓Canal當(dāng)前最新的1.1.4版本,操作命令如下:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar zvfx canal.deployer-1.1.4.tar.gz
然后,配置MySQL,我們需要在MySQL的配置文件中開啟Binlog,并將Binlog的格式設(shè)置為ROW,配置項(xiàng)如下:
[mysqld]
log-bin=mysql-bin # 開啟Binlog。
binlog-format=ROW # 將Binlog格式設(shè)置為ROW。
server_id=1 # 配置一個(gè)ServerID。
接下來,為Canal新建一個(gè)專門的MySQL用戶并授權(quán),以確保這個(gè)用戶有復(fù)制Binlog的權(quán)限,具體操作的SQL命令如下:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
然后,重啟MySQL,以確保所有的配置都能生效。重啟后再檢查一下當(dāng)前的Binlog文件和位置,SQL命令和輸出結(jié)果具體如下:
mysql> show master status;
+-------------+--------+------------+----------------+-----------------+
| File |Position|Binlog_Do_DB|Binlog_Ignore_DB|Executed_Gtid_Set|
+-------------+--------+------------+----------------+-----------------+
|binlog.000009| 155| | | |
+-------------+--------+------------+----------------+-----------------+
記錄下File和Position兩列的值,然后再來配置Canal。編輯Canal的實(shí)例配置文件canal/conf/example/instance.properties,以便讓Canal連接到我們的MySQL上,具體配置如下:
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=binlog.000009
canal.instance.master.position=155
canal.instance.master.timestamp=
canal.instance.master.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=test
# table regex
canal.instance.filter.regex=.*\\..*
這個(gè)配置文件需要配置MySQL的連接地址、庫名、用戶名和密碼,除此之外,還要配置canal.instance.master.journal.name和canal.instance.master.position這兩個(gè)屬性,取值就是剛剛記錄的File和Position兩列。然后就可以啟動Canal服務(wù)了,命令如下:
canal/bin/startup.sh
啟動之后再查看一下日志文件canal/logs/example/example.log,如果日志中沒有報(bào)錯(cuò)信息,就說明Canal服務(wù)已啟動成功并連接到我們的MySQL上了。
Canal服務(wù)啟動之后,會開啟一個(gè)端口(11111)等待客戶端連接,客戶端連接上Canal服務(wù)之后,就可以從Canal服務(wù)拉?。≒ULL)數(shù)據(jù)了,每拉取一批數(shù)據(jù),正確寫入Redis之后,需要向Canal服務(wù)返回處理成功的響應(yīng)。如果發(fā)生客戶端程序宕機(jī),或者處理失敗等異常情況,Canal服務(wù)沒有收到處理成功的響應(yīng),那么下次客戶端來拉取的就還是同一批數(shù)據(jù),這樣就可以保證讀到的Binlog順序不會亂,并且不會丟失數(shù)據(jù)。
接下來,我們來開發(fā)一個(gè)賬戶余額緩存的更新程序,以下代碼都是用Java語言編寫的:
while (true) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數(shù)量的數(shù)據(jù)。
long batchId = message.getId();
try {
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
processEntries(message.getEntries(), jedis);
}
connector.ack(batchId); // 提交確認(rèn)。
} catch (Throwable t) {
connector.rollback(batchId); // 處理失敗,回滾數(shù)據(jù)。
}
}
這個(gè)程序的邏輯并不復(fù)雜,程序啟動并連接到Canal服務(wù)后,就不停地拉取數(shù)據(jù),如果沒有數(shù)據(jù)就休眠一會兒,如果有數(shù)據(jù)就調(diào)用processEntries方法處理并更新緩存。每批數(shù)據(jù)更新成功之后,都會調(diào)用ack方法向Canal服務(wù)返回成功響應(yīng),如果失敗則拋出異常之后再回滾。下面是processEntries方法的主要代碼:
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) { // 刪除。
jedis.del(row2Key("user_id", rowData.getBeforeColumnsList()));
} else if (eventType == CanalEntry.EventType.INSERT) { // 插入。
jedis.set(row2Key("user_id", rowData.getAfterColumnsList()), row2Value(rowData.getAfterColumnsList()));
} else { // 更新。
jedis.set(row2Key("user_id", rowData.getAfterColumnsList()), row2Value(rowData.getAfterColumnsList()));
}
}
上述代碼會根據(jù)事件類型分別進(jìn)行處理,如果MySQL中的數(shù)據(jù)刪除了,就刪除Redis中對應(yīng)的數(shù)據(jù)。如果是更新和插入操作,就調(diào)用Redis的SET命令來寫入數(shù)據(jù)。
下面就來啟動這個(gè)賬戶緩存更新服務(wù)以進(jìn)行驗(yàn)證。在賬戶余額表中插入一條記錄,SQL命令如下:
mysql> insert into account_balance values (888, 100, NOW(), 999);
然后,我們再來看一下Redis緩存,操作命令和輸出結(jié)果如下:
127.0.0.1:6379> get 888
"{\"log_id\":\"999\",\"balance\":\"100\",\"user_id\":\"888\",\
"timestamp\":\"2020-03-08 16:18:10\"}"
從上述輸出結(jié)果中我們可以看到,數(shù)據(jù)已經(jīng)自動同步到Redis中了。GitHub上可以下載該示例的完整代碼,鏈接地址是:https://github.com/liyue2008/canal-to-redis-example。
三、總結(jié)
在處理超大規(guī)模并發(fā)的場景時(shí),由于并發(fā)請求的數(shù)量非常大,即使只有少量的緩存穿透,也有可能卡死數(shù)據(jù)庫引發(fā)雪崩效應(yīng)。對于這種情況,我們可以通過Redis緩存全量數(shù)據(jù)來徹底避免緩存穿透的問題。對于緩存數(shù)據(jù)更新的方法,我們可以通過訂閱數(shù)據(jù)更新的消息隊(duì)列來異步更新緩存,更通用的方法是,把緩存更新服務(wù)偽裝成一個(gè)MySQL從節(jié)點(diǎn),訂閱MySQL的Binlog,通過Binlog來更新Redis緩存。
需要特別注意的是,無論是通過消息隊(duì)列還是Canal來異步更新緩存,系統(tǒng)對整個(gè)更新服務(wù)的數(shù)據(jù)可靠性和實(shí)時(shí)性要求都比較高,數(shù)據(jù)丟失或者更新慢了,都會造成Redis中的數(shù)據(jù)與MySQL中的數(shù)據(jù)不同步的問題。在把這套方案應(yīng)用到生產(chǎn)環(huán)境之前,我們需要考慮一旦出現(xiàn)不同步的問題,應(yīng)該采取什么樣的降級或補(bǔ)償方案。
作者介紹
李玥, 美團(tuán)基礎(chǔ)技術(shù)部高級技術(shù)專家,極客時(shí)間《后端存儲實(shí)戰(zhàn)課》《消息隊(duì)列高手課》等專欄作者。曾在當(dāng)當(dāng)網(wǎng)、京東零售等公司任職。從事互聯(lián)網(wǎng)電商行業(yè)基礎(chǔ)架構(gòu)領(lǐng)域的架構(gòu)設(shè)計(jì)和研發(fā)工作多年,曾多次參與雙十一和618電商大促。專注于分布式存儲、云原生架構(gòu)下的服務(wù)治理、分布式消息和實(shí)時(shí)計(jì)算等技術(shù)領(lǐng)域,致力于推進(jìn)基礎(chǔ)架構(gòu)技術(shù)的創(chuàng)新與開源。
本文摘編自《電商存儲系統(tǒng)實(shí)戰(zhàn):架構(gòu)設(shè)計(jì)與海量數(shù)據(jù)處理》,經(jīng)出版方授權(quán)發(fā)布。