Kafka Connect如何實(shí)現(xiàn)同步RDS binlog數(shù)據(jù)?
1. 背景
在我們的業(yè)務(wù)開發(fā)中,往往會(huì)碰到下面這個(gè)場景:
- 業(yè)務(wù)更新數(shù)據(jù)寫到數(shù)據(jù)庫中
- 業(yè)務(wù)更新數(shù)據(jù)需要實(shí)時(shí)傳遞給下游依賴處理
所以傳統(tǒng)的處理架構(gòu)可能會(huì)這樣:

但這個(gè)架構(gòu)也存在著不少弊端:我們需要在項(xiàng)目中維護(hù)很多發(fā)送消息的代碼。新增或者更新消息都會(huì)帶來不少維護(hù)成本。所以,更好的處理方式應(yīng)該是直接將數(shù)據(jù)庫的數(shù)據(jù)接入到流式系統(tǒng)中,如下圖:

本文將演示如何在E-MapReduce上實(shí)現(xiàn)將RDS binlog實(shí)時(shí)同步到Kafka集群中。
2. 環(huán)境準(zhǔn)備
實(shí)驗(yàn)中使用VPC網(wǎng)絡(luò)環(huán)境,以下實(shí)例創(chuàng)建時(shí)默認(rèn)都是在VPC環(huán)境下。
2.1 準(zhǔn)備一個(gè)測試RDS數(shù)據(jù)庫
創(chuàng)建一個(gè)RDS實(shí)例,版本選擇5.7。這里不贅述如何創(chuàng)建RDS,詳細(xì)流程請(qǐng)參考RDS文檔。創(chuàng)建完如圖:

2.2 準(zhǔn)備一個(gè)Kafka集群
創(chuàng)建一個(gè)E-MapReduce Kafka集群,版本選擇EMR-3.11.0。需要注意,這里必須選擇EMR-3.11.0以上版本,否則不會(huì)默認(rèn)安裝啟動(dòng)Kafka Connect服務(wù)。詳細(xì)創(chuàng)建流程請(qǐng)參考E-MapReduce文檔。創(chuàng)建完如圖:

注意:RDS實(shí)例和E-MapReduce Kafka集群***在同一個(gè)VPC中,否則需要打通兩個(gè)VPC之間的網(wǎng)絡(luò)。
3. Kafka Connect
3.1 Connector
Kafka Connect是一個(gè)用于Kafka和其他數(shù)據(jù)系統(tǒng)之間進(jìn)行數(shù)據(jù)傳輸?shù)墓ぞ?,它可以?shí)現(xiàn)基于Kafka的數(shù)據(jù)管道,打通上下游數(shù)據(jù)源。我們需要做的就是在Kafka Connect服務(wù)上運(yùn)行一個(gè)Connector,這個(gè)Connector是具體實(shí)現(xiàn)如何從/向數(shù)據(jù)源中讀/寫數(shù)據(jù)。Confluent提供了很多Connector實(shí)現(xiàn),你可以在這里下載。不過今天我們使用Debezium提供的一個(gè)MySQL Connector插件,下載地址。
下載這個(gè)插件,并將解壓出來的jar包全部拷貝到kafka lib目錄下。注意:需要將這些jar包拷貝到Kafka集群所有機(jī)器上。
在Kafka集群的服務(wù)列表中重啟Kafka Connect組件。

3.2 啟動(dòng)Connector
在創(chuàng)建connector前,我們需要做一番配置,這里羅列一些Debezium MySQL Connector的主要配置項(xiàng):

登錄到Kafka集群,配置并創(chuàng)建一個(gè)connector,命令如下:

這時(shí),我們可以看到一個(gè)創(chuàng)建好的connector,如圖:

3.3 注意事項(xiàng)
server_id是多少?:你可以在RDS執(zhí)行"SELECT @@server_id;"查到。
創(chuàng)建connector時(shí)可能會(huì)出現(xiàn)連接失敗,請(qǐng)確保RDS的白名單已經(jīng)授權(quán)了Kafka集群機(jī)器訪問。
4 測試
4.1 創(chuàng)建一張表

一會(huì)之后,Kafka集群中會(huì)自動(dòng)創(chuàng)建一個(gè)對(duì)應(yīng)的topic

插入幾條數(shù)據(jù)

查看binlog數(shù)據(jù)
查看fulfillment.mugen.students這個(gè)topic,是否有剛剛新插入的數(shù)據(jù)

結(jié)果如圖所示:

5. 資料
- confluent官方文檔 https://docs.confluent.io
- debezium官網(wǎng) http://debezium.io/
- kafka官方文檔 http://kafka.apache.org/documentation.html