數(shù)據(jù)異構(gòu)就該這么玩兒
互聯(lián)網(wǎng)背景下的數(shù)據(jù)同步需求
在當(dāng)今互聯(lián)網(wǎng)行業(yè),尤其是現(xiàn)在分布式、微服務(wù)開發(fā)環(huán)境下,為了提高搜索效率,以及搜索的精準(zhǔn)度,會(huì)大量使用Redis、Memcached等NoSQL數(shù)據(jù)庫,也會(huì)使用大量的Solr、Elasticsearch等全文檢索服務(wù)。那么,這個(gè)時(shí)候,就會(huì)有一個(gè)問題需要我們來思考和解決:那就是數(shù)據(jù)同步的問題!如何將實(shí)時(shí)變化的數(shù)據(jù)庫中的數(shù)據(jù)同步到Redis/Memcached或者Solr/Elasticsearch中呢?
例如,我們?cè)诜植际江h(huán)境下向數(shù)據(jù)庫中不斷的寫入數(shù)據(jù),而我們讀數(shù)據(jù)可能需要從Redis、Memcached或者Elasticsearch、Solr等服務(wù)中讀取。那么,數(shù)據(jù)庫與各個(gè)服務(wù)中數(shù)據(jù)的實(shí)時(shí)同步問題,成為了我們亟待解決的問題。
試想,由于業(yè)務(wù)需要,我們引入了Redis、Memcached或者Elasticsearch、Solr等服務(wù)。使得我們的應(yīng)用程序可能會(huì)從不同的服務(wù)中讀取數(shù)據(jù),如下圖所示。
圖片
本質(zhì)上講,無論我們引入了何種服務(wù)或者中間件,數(shù)據(jù)最終都是從我們的MySQL數(shù)據(jù)庫中讀取出來的。那么,問題來了,如何將MySQL中的數(shù)據(jù)實(shí)時(shí)同步到其他的服務(wù)或者中間件呢?
注意:為了更好的說明問題,后面的內(nèi)容以MySQL數(shù)據(jù)庫中的數(shù)據(jù)同步到Solr索引庫為例進(jìn)行說明。
數(shù)據(jù)同步解決方案
1.在業(yè)務(wù)代碼中同步
在增加、修改、刪除之后,執(zhí)行操作Solr索引庫的邏輯代碼。例如下面的代碼片段。
public ResponseResult updateStatus(Long[] ids, String status){
try{
goodsService.updateStatus(ids, status);
if("status_success".equals(status)){
List<TbItem> itemList = goodsService.getItemList(ids, status);
itemSearchService.importList(itemList);
return new ResponseResult(true, "修改狀態(tài)成功")
}
}catch(Exception e){
return new ResponseResult(false, "修改狀態(tài)失敗");
}
}
優(yōu)點(diǎn):
操作簡便。
缺點(diǎn):
業(yè)務(wù)耦合度高。
執(zhí)行效率變低。
2.定時(shí)任務(wù)同步
在數(shù)據(jù)庫中執(zhí)行完增加、修改、刪除操作后,通過定時(shí)任務(wù)定時(shí)的將數(shù)據(jù)庫的數(shù)據(jù)同步到Solr索引庫中。
定時(shí)任務(wù)技術(shù)有:SpringTask,Quartz。
哈哈,還有我開源的mykit-delay框架,開源地址為:https://github.com/sunshinelyz/mykit-delay。
這里執(zhí)行定時(shí)任務(wù)時(shí),需要注意的一個(gè)技巧是:第一次執(zhí)行定時(shí)任務(wù)時(shí),從MySQL數(shù)據(jù)庫中以時(shí)間字段進(jìn)行倒序排列查詢相應(yīng)的數(shù)據(jù),并記錄當(dāng)前查詢數(shù)據(jù)的時(shí)間字段的最大值,以后每次執(zhí)行定時(shí)任務(wù)查詢數(shù)據(jù)的時(shí)候,只要按時(shí)間字段倒序查詢數(shù)據(jù)表中的時(shí)間字段大于上次記錄的時(shí)間值的數(shù)據(jù),并且記錄本次任務(wù)查詢出的時(shí)間字段的最大值即可,從而不需要再次查詢數(shù)據(jù)表中的所有數(shù)據(jù)。
注意:這里所說的時(shí)間字段指的是標(biāo)識(shí)數(shù)據(jù)更新的時(shí)間字段,也就是說,使用定時(shí)任務(wù)同步數(shù)據(jù)時(shí),為了避免每次執(zhí)行任務(wù)都會(huì)進(jìn)行全表掃描,最好是在數(shù)據(jù)表中增加一個(gè)更新記錄的時(shí)間字段。
優(yōu)點(diǎn):
同步Solr索引庫的操作與業(yè)務(wù)代碼完全解耦。
缺點(diǎn):
數(shù)據(jù)的實(shí)時(shí)性并不高。
3.通過MQ實(shí)現(xiàn)同步
在數(shù)據(jù)庫中執(zhí)行完增加、修改、刪除操作后,向MQ中發(fā)送一條消息,此時(shí),同步程序作為MQ中的消費(fèi)者,從消息隊(duì)列中獲取消息,然后執(zhí)行同步Solr索引庫的邏輯。
我們可以使用下圖來簡單的標(biāo)識(shí)通過MQ實(shí)現(xiàn)數(shù)據(jù)同步的過程。
圖片
我們可以使用如下代碼實(shí)現(xiàn)這個(gè)過程。
public ResponseResult updateStatus(Long[] ids, String status){
try{
goodsService.updateStatus(ids, status);
if("status_success".equals(status)){
List<TbItem> itemList = goodsService.getItemList(ids, status);
final String jsonString = JSON.toJSONString(itemList);
jmsTemplate.send(queueSolr, new MessageCreator(){
@Override
public Message createMessage(Session session) throws JMSException{
return session.createTextMessage(jsonString);
}
});
}
return new ResponseResult(true, "修改狀態(tài)成功");
}catch(Exception e){
return new ResponseResult(false, "修改狀態(tài)失敗");
}
}
優(yōu)點(diǎn):
業(yè)務(wù)代碼解耦,并且能夠做到準(zhǔn)實(shí)時(shí)。
缺點(diǎn):
需要在業(yè)務(wù)代碼中加入發(fā)送消息到MQ的代碼,數(shù)據(jù)調(diào)用接口耦合。
4.通過Canal實(shí)現(xiàn)實(shí)時(shí)同步
Canal是阿里巴巴開源的一款數(shù)據(jù)庫日志增量解析組件,通過Canal來解析數(shù)據(jù)庫的日志信息,來檢測數(shù)據(jù)庫中表結(jié)構(gòu)和數(shù)據(jù)的變化,從而更新Solr索引庫。
使用Canal可以做到業(yè)務(wù)代碼完全解耦,API完全解耦,可以做到準(zhǔn)實(shí)時(shí)。
Canal簡介
阿里巴巴MySQL數(shù)據(jù)庫binlog增量訂閱與消費(fèi)組件,基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱與消費(fèi),目前主要支持了MySQL。
Canal開源地址:https://github.com/alibaba/canal。
Canal工作原理
MySQL主從復(fù)制的實(shí)現(xiàn)
圖片
從上圖可以看出,主從復(fù)制主要分成三步:
- Master節(jié)點(diǎn)將數(shù)據(jù)的改變記錄到二進(jìn)制日志(binary log)中(這些記錄叫做二進(jìn)制日志事件,binary log events,可以通過show binlog events進(jìn)行查看)。
- Slave節(jié)點(diǎn)將Master節(jié)點(diǎn)的二進(jìn)制日志事件(binary log events)拷貝到它的中繼日志(relay log)。
- Slave節(jié)點(diǎn)重做中繼日志中的事件將改變反映到自己本身的數(shù)據(jù)庫中。
Canal內(nèi)部原理
首先,我們來看下Canal的原理圖,如下所示。
圖片
原理大致描述如下:
- Canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL Slave ,向 MySQL Master 發(fā)送dump 協(xié)議
- MySQL Master 收到 dump 請(qǐng)求,開始推送 binary log 給 Slave (即 Canal )
- Canal 解析 binary log 對(duì)象(原始為 byte 流)
Canal內(nèi)部結(jié)構(gòu)
圖片
說明如下:
- Server:代表一個(gè)Canal運(yùn)行實(shí)例,對(duì)應(yīng)一個(gè)JVM進(jìn)程。
- Instance:對(duì)應(yīng)一個(gè)數(shù)據(jù)隊(duì)列(1個(gè)Server對(duì)應(yīng)1個(gè)或者多個(gè)Instance)。
接下來,我們?cè)賮砜聪翴nstance下的子模塊,如下所示。
圖片
- EventParser:數(shù)據(jù)源接入,模擬Slave協(xié)議和Master節(jié)點(diǎn)進(jìn)行交互,協(xié)議解析。
- EventSink:EventParser和EventStore的連接器,對(duì)數(shù)據(jù)進(jìn)行過濾、加工、歸并和分發(fā)等處理。
- EventSore:數(shù)據(jù)存儲(chǔ)。
- MetaManager:增量訂閱和消費(fèi)信息管理。
Canal環(huán)境準(zhǔn)備
設(shè)置MySQL遠(yuǎn)程訪問
grant all privileges on *.* to 'root'@'%' identified by '123456';
flush privileges;
MySQL配置
注意:這里的MySQL是基于5.7版本進(jìn)行說明的。
Canal的原理基于MySQL binlog技術(shù),所以,要想使用Canal就要開啟MySQL的binlog寫入功能,建議配置binlog的模式為row。
可以在MySQL命令行輸入如下命令來查看binlog的模式。
SHOW VARIABLES LIKE 'binlog_format';
執(zhí)行效果如下所示。
圖片
可以看到,在MySQL中默認(rèn)的binlog格式為STATEMENT,這里我們需要將STATEMENT修改為ROW。修改/etc/my.cnf文件。
vim /etc/my.cnf
在[mysqld]下面新增如下三項(xiàng)配置。
log-bin=mysql-bin #開啟MySQL二進(jìn)制日志
binlog_format=ROW #將二進(jìn)制日志的格式設(shè)置為ROW
server_id=1 #server_id需要唯一,不能與Canal的slaveId重復(fù)
修改完my.cnf文件后,需要重啟MySQL服務(wù)。
service mysqld restart
接下來,我們?cè)俅尾榭碽inlog模式。
SHOW VARIABLES LIKE 'binlog_format';
圖片
可以看到,此時(shí),MySQL的binlog模式已經(jīng)被設(shè)置為ROW了。
MySQL創(chuàng)建用戶授權(quán)
Canal的原理是模式自己為MySQL Slave,所以一定要設(shè)置MySQL Slave的相關(guān)權(quán)限。這里,需要?jiǎng)?chuàng)建一個(gè)主從同步的賬戶,并且賦予這個(gè)賬戶相關(guān)的權(quán)限。
CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES;
圖片
img
Canal部署安裝
下載Canal
這里,我們以Canal 1.1.1版本進(jìn)行說明,小伙伴們可以到鏈接 https://github.com/alibaba/canal/releases/tag/canal-1.1.1 下載Canal 1.1.1版本。
圖片
img
上傳解壓
將下載好的Canal安裝包,上傳到服務(wù)器,并執(zhí)行如下命令進(jìn)行解壓
mkdir -p /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz -C /usr/local/canal/
解壓后的目錄如下所示。
圖片
各目錄的說明如下:
- bin:存儲(chǔ)可執(zhí)行腳本。
- conf:存放配置文件。
- lib:存放其他依賴或者第三方庫。
- logs:存放的是日志文件。
修改配置文件
在Canal的conf目錄下有一個(gè)canal.properties文件,這個(gè)文件中配置的是Canal Server相關(guān)的配置,在這個(gè)文件中有如下一行配置。
canal.destinatinotallow=example
這里的example就相當(dāng)于Canal的一個(gè)Instance,可以在這里配置多個(gè)Instance,多個(gè)Instance之間以逗號(hào)分隔即可。同時(shí),這里的example也對(duì)應(yīng)著Canal的conf目錄下的一個(gè)文件夾。也就是說,Canal中的每個(gè)Instance實(shí)例都對(duì)應(yīng)著conf目錄下的一個(gè)子目錄。
接下來,我們需要修改Canal的conf目錄下的example目錄的一個(gè)配置文件instance.properties。
vim instance.properties
修改如下配置項(xiàng)。
#################################################################
## canal slaveId,注意:不要與MySQL的server_id重復(fù)
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的數(shù)據(jù)庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的數(shù)據(jù)庫信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =canaldb
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = canaldb\\..*
#################################################################
選項(xiàng)含義:
- canal.instance.mysql.slaveId : mysql集群配置中的serverId概念,需要保證和當(dāng)前mysql集群中id唯一;
- canal.instance.master.address: mysql主庫鏈接地址;
- canal.instance.dbUsername : mysql數(shù)據(jù)庫帳號(hào);
- canal.instance.dbPassword : mysql數(shù)據(jù)庫密碼;
- canal.instance.defaultDatabaseName : mysql鏈接時(shí)默認(rèn)數(shù)據(jù)庫;
- canal.instance.connectionCharset : mysql 數(shù)據(jù)解析編碼;
- canal.instance.filter.regex : mysql 數(shù)據(jù)解析關(guān)注的表,Perl正則表達(dá)式.
啟動(dòng)Canal
配置完Canal后,就可以啟動(dòng)Canal了。進(jìn)入到Canal的bin目錄下,輸入如下命令啟動(dòng)Canal。
./startup.sh
測試Canal
導(dǎo)入并修改源碼
這里,我們使用Canal的源碼進(jìn)行測試,下載Canal的源碼后,將其導(dǎo)入到IDEA中。
圖片
接下來,我們找到example下的SimpleCanalClientTest類進(jìn)行測試。這個(gè)類的源碼如下所示。
package com.alibaba.otter.canal.example;
import java.net.InetSocketAddress;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
/**
* 單機(jī)模式的測試?yán)? *
* @author jianghang 2013-4-15 下午04:19:20
* @version 1.0.4
*/
public class SimpleCanalClientTest extends AbstractCanalClientTest {
public SimpleCanalClientTest(String destination){
super(destination);
}
public static void main(String args[]) {
// 根據(jù)ip,直接創(chuàng)建鏈接,無HA的功能
String destination = "example";
String ip = AddressUtils.getHostIp();
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(ip, 11111),
destination,
"canal",
"canal");
final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
clientTest.setConnector(connector);
clientTest.start();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try {
logger.info("## stop the canal client");
clientTest.stop();
} catch (Throwable e) {
logger.warn("##something goes wrong when stopping canal:", e);
} finally {
logger.info("## canal client is down.");
}
}
});
}
}
可以看到,這個(gè)類中,使用的destination為example。在這個(gè)類中,我們只需要將IP地址修改為Canal Server的IP即可。
具體為:將如下一行代碼。
String ip = AddressUtils.getHostIp();
修改為:
String ip = "192.168.175.100"
由于我們?cè)谂渲肅anal時(shí),沒有指定用戶名和密碼,所以,我們還需要將如下代碼。
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(ip, 11111),
destination,
"canal",
"canal");
修改為:
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(ip, 11111),
destination,
"",
"");
修改完成后,運(yùn)行main方法啟動(dòng)程序。
測試數(shù)據(jù)變更
接下來,在MySQL中創(chuàng)建一個(gè)canaldb數(shù)據(jù)庫。
create database canaldb;
此時(shí)會(huì)在IDEA的命令行輸出相關(guān)的日志信息。
****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2025-02-02 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2025-02-02 23:25:35)]
* End : [mysql-bin.000007:6356:1540286735000(2025-02-02 23:25:35)]
****************************************************
接下來,我在canaldb數(shù)據(jù)庫中創(chuàng)建數(shù)據(jù)表,并對(duì)數(shù)據(jù)表中的數(shù)據(jù)進(jìn)行增刪改查,程序輸出的日志信息如下所示。
#在mysql進(jìn)行數(shù)據(jù)變更后,這里會(huì)顯示mysql的bin日志。
****************************************************
* Batch Id: [7] ,count : [3] , memsize : [149] , Time : 2025-02-02 23:25:35
* Start : [mysql-bin.000007:6180:1540286735000(2025-02-02 23:25:35)]
* End : [mysql-bin.000007:6356:1540286735000(2025-02-02 23:25:35)]
****************************************************
================> binlog[mysql-bin.000007:6180] , executeTime : 1540286735000(2025-02-02 23:25:35) , gtid : () , delay : 393ms
BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6311] , name[canal,canal_table] , eventType : DELETE , executeTime : 1540286735000(2025-02-02 23:25:35) , gtid : () , delay : 393 ms
id : 8 type=int(10) unsigned
name : 512 type=varchar(255)
----------------
END ----> transaction id: 249
================> binlog[mysql-bin.000007:6356] , executeTime : 1540286735000(2025-02-02 23:25:35) , gtid : () , delay : 394ms
****************************************************
* Batch Id: [8] ,count : [3] , memsize : [149] , Time : 2025-02-02 23:25:35
* Start : [mysql-bin.000007:6387:1540286869000(2025-02-02 23:25:49)]
* End : [mysql-bin.000007:6563:1540286869000(2025-02-02 23:25:49)]
****************************************************
================> binlog[mysql-bin.000007:6387] , executeTime : 1540286869000(2025-02-02 23:25:49) , gtid : () , delay : 976ms
BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6518] , name[canal,canal_table] , eventType : INSERT , executeTime : 1540286869000(2025-02-02 23:25:49) , gtid : () , delay : 976 ms
id : 21 type=int(10) unsigned update=true
name : aaa type=varchar(255) update=true
----------------
END ----> transaction id: 250
================> binlog[mysql-bin.000007:6563] , executeTime : 1540286869000(2025-02-02 23:25:49) , gtid : () , delay : 977ms
****************************************************
* Batch Id: [9] ,count : [3] , memsize : [161] , Time : 2025-02-02 23:26:22
* Start : [mysql-bin.000007:6594:1540286902000(2025-02-02 23:26:22)]
* End : [mysql-bin.000007:6782:1540286902000(2025-02-02 23:26:22)]
****************************************************
================> binlog[mysql-bin.000007:6594] , executeTime : 1540286902000(2025-02-02 23:26:22) , gtid : () , delay : 712ms
BEGIN ----> Thread id: 43
----------------> binlog[mysql-bin.000007:6725] , name[canal,canal_table] , eventType : UPDATE , executeTime : 1540286902000(2025-02-02 23:26:22) , gtid : () , delay : 712 ms
id : 21 type=int(10) unsigned
name : aaac type=varchar(255) update=true
----------------
END ----> transaction id: 252
================> binlog[mysql-bin.000007:6782] , executeTime : 1540286902000(2025-02-02 23:26:22) , gtid : () , delay : 713ms
數(shù)據(jù)同步實(shí)現(xiàn)
需求
將數(shù)據(jù)庫數(shù)據(jù)的變化, 通過canal解析binlog日志, 實(shí)時(shí)更新到solr的索引庫中。
具體實(shí)現(xiàn)
創(chuàng)建工程
創(chuàng)建Maven工程mykit-canal-demo,并在pom.xml文件中添加如下配置。
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.0.24</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.8.9</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>4.10.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.9</version>
<scope>test</scope>
</dependency>
</dependencies>
創(chuàng)建log4j配置文件
在工程的src/main/resources目錄下創(chuàng)建log4j.properties文件,內(nèi)容如下所示。
log4j.rootCategory=debug, CONSOLE
# CONSOLE is set to be a ConsoleAppender using a PatternLayout.
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.Cnotallow=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n
# LOGFILE is set to be a File appender using a PatternLayout.
# log4j.appender.LOGFILE=org.apache.log4j.FileAppender
# log4j.appender.LOGFILE.File=d:\axis.log
# log4j.appender.LOGFILE.Append=true
# log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
# log4j.appender.LOGFILE.layout.Cnotallow=%d{ISO8601} %-6r [%15.15t] %-5p %30.30c %x - %m\n
創(chuàng)建實(shí)體類
在io.mykit.canal.demo.bean包下創(chuàng)建一個(gè)Book實(shí)體類,用于測試Canal的數(shù)據(jù)傳輸,如下所示。
package io.mykit.canal.demo.bean;
import org.apache.solr.client.solrj.beans.Field;
import java.util.Date;
public class Book implements Serializable {
private static final long serialVersionUID = -6350345408771427834L;{
@Field("id")
private Integer id;
@Field("book_name")
private String name;
@Field("book_author")
private String author;
@Field("book_publishtime")
private Date publishtime;
@Field("book_price")
private Double price;
@Field("book_publishgroup")
private String publishgroup;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getAuthor() {
return author;
}
public void setAuthor(String author) {
this.author = author;
}
public Date getPublishtime() {
return publishtime;
}
public void setPublishtime(Date publishtime) {
this.publishtime = publishtime;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
public String getPublishgroup() {
return publishgroup;
}
public void setPublishgroup(String publishgroup) {
this.publishgroup = publishgroup;
}
@Override
public String toString() {
return "Book{" +
"id=" + id +
", name='" + name + '\'' +
", author='" + author + '\'' +
", publishtime=" + publishtime +
", price=" + price +
", publishgroup='" + publishgroup + '\'' +
'}';
}
}
其中,我們?cè)贐ook實(shí)體類中,使用Solr的注解@Field定義了實(shí)體類字段與Solr域之間的關(guān)系。
各種工具類的實(shí)現(xiàn)
接下來,我們就在io.mykit.canal.demo.utils包下創(chuàng)建各種工具類。
- BinlogValue
用于存儲(chǔ)binlog分析的每行每列的value值,代碼如下所示。
package io.mykit.canal.demo.utils;
import java.io.Serializable;
/**
*
* ClassName: BinlogValue <br/>
*
* binlog分析的每行每列的value值;<br>
* 新增數(shù)據(jù):beforeValue 和 value 均為現(xiàn)有值;<br>
* 修改數(shù)據(jù):beforeValue是修改前的值;value為修改后的值;<br>
* 刪除數(shù)據(jù):beforeValue和value均是刪除前的值; 這個(gè)比較特殊主要是為了刪除數(shù)據(jù)時(shí)方便獲取刪除前的值<br>
*/
public class BinlogValue implements Serializable {
private static final long serialVersionUID = -6350345408773943086L;
private String value;
private String beforeValue;
/**
* binlog分析的每行每列的value值;<br>
* 新增數(shù)據(jù):value:為現(xiàn)有值;<br>
* 修改數(shù)據(jù):value為修改后的值;<br>
* 刪除數(shù)據(jù):value是刪除前的值; 這個(gè)比較特殊主要是為了刪除數(shù)據(jù)時(shí)方便獲取刪除前的值<br>
*/
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
/**
* binlog分析的每行每列的beforeValue值;<br>
* 新增數(shù)據(jù):beforeValue為現(xiàn)有值;<br>
* 修改數(shù)據(jù):beforeValue是修改前的值;<br>
* 刪除數(shù)據(jù):beforeValue為刪除前的值; <br>
*/
public String getBeforeValue() {
return beforeValue;
}
public void setBeforeValue(String beforeValue) {
this.beforeValue = beforeValue;
}
}
- CanalDataParser
用于解析數(shù)據(jù),代碼如下所示。
package io.mykit.canal.demo.utils;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.google.protobuf.InvalidProtocolBufferException;
/**
* 解析數(shù)據(jù)
*/
public class CanalDataParser {
protected static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
protected static final String yyyyMMddHHmmss = "yyyyMMddHHmmss";
protected static final String yyyyMMdd = "yyyyMMdd";
protected static final String SEP = SystemUtils.LINE_SEPARATOR;
protected static String context_format = null;
protected static String row_format = null;
protected static String transaction_format = null;
protected static String row_log = null;
private static Logger logger = LoggerFactory.getLogger(CanalDataParser.class);
static {
context_format = SEP + "****************************************************" + SEP;
context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
context_format += "* Start : [{}] " + SEP;
context_format += "* End : [{}] " + SEP;
context_format += "****************************************************" + SEP;
row_format = SEP
+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {} , delay : {}ms"
+ SEP;
transaction_format = SEP + "================> binlog[{}:{}] , executeTime : {} , delay : {}ms" + SEP;
row_log = "schema[{}], table[{}]";
}
public static List<InnerBinlogEntry> convertToInnerBinlogEntry(Message message) {
List<InnerBinlogEntry> innerBinlogEntryList = new ArrayList<InnerBinlogEntry>();
if(message == null) {
logger.info("接收到空的 message; 忽略");
return innerBinlogEntryList;
}
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
logger.info("接收到空的message[size=" + size + "]; 忽略");
return innerBinlogEntryList;
}
printLog(message, batchId, size);
List<Entry> entrys = message.getEntries();
//輸出日志
for (Entry entry : entrys) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = new Date().getTime() - executeTime;
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
TransactionBegin begin = null;
try {
begin = TransactionBegin.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事務(wù)頭信息,執(zhí)行的線程id,事務(wù)耗時(shí)
logger.info("BEGIN ----> Thread id: {}", begin.getThreadId());
logger.info(transaction_format, new Object[] {entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
} else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
TransactionEnd end = null;
try {
end = TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
// 打印事務(wù)提交信息,事務(wù)id
logger.info("END ----> transaction id: {}", end.getTransactionId());
logger.info(transaction_format,
new Object[] {entry.getHeader().getLogfileName(), String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
}
continue;
}
//解析結(jié)果
if (entry.getEntryType() == EntryType.ROWDATA) {
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
logger.info(row_format, new Object[] { entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
entry.getHeader().getTableName(), eventType, String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime) });
//組裝數(shù)據(jù)結(jié)果
if (eventType == EventType.INSERT || eventType == EventType.DELETE || eventType == EventType.UPDATE) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
List<Map<String, BinlogValue>> rows = parseEntry(entry);
InnerBinlogEntry innerBinlogEntry = new InnerBinlogEntry();
innerBinlogEntry.setEntry(entry);
innerBinlogEntry.setEventType(eventType);
innerBinlogEntry.setSchemaName(schemaName);
innerBinlogEntry.setTableName(tableName.toLowerCase());
innerBinlogEntry.setRows(rows);
innerBinlogEntryList.add(innerBinlogEntry);
} else {
logger.info(" 存在 INSERT INSERT UPDATE 操作之外的SQL [" + eventType.toString() + "]");
}
continue;
}
}
return innerBinlogEntryList;
}
private static List<Map<String, BinlogValue>> parseEntry(Entry entry) {
List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
try {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
RowChange rowChage = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChage.getEventType();
// 處理每個(gè)Entry中的每行數(shù)據(jù)
for (RowData rowData : rowChage.getRowDatasList()) {
StringBuilder rowlog = new StringBuilder("rowlog schema[" + schemaName + "], table[" + tableName + "], event[" + eventType.toString() + "]");
Map<String, BinlogValue> row = new HashMap<String, BinlogValue>();
List<Column> beforeColumns = rowData.getBeforeColumnsList();
List<Column> afterColumns = rowData.getAfterColumnsList();
beforeColumns = rowData.getBeforeColumnsList();
if (eventType == EventType.DELETE) {//delete
for(Column column : beforeColumns) {
BinlogValue binlogValue = new BinlogValue();
binlogValue.setValue(column.getValue());
binlogValue.setBeforeValue(column.getValue());
row.put(column.getName(), binlogValue);
}
} else if(eventType == EventType.UPDATE) {//update
for(Column column : beforeColumns) {
BinlogValue binlogValue = new BinlogValue();
binlogValue.setBeforeValue(column.getValue());
row.put(column.getName(), binlogValue);
}
for(Column column : afterColumns) {
BinlogValue binlogValue = row.get(column.getName());
if(binlogValue == null) {
binlogValue = new BinlogValue();
}
binlogValue.setValue(column.getValue());
row.put(column.getName(), binlogValue);
}
} else { // insert
for(Column column : afterColumns) {
BinlogValue binlogValue = new BinlogValue();
binlogValue.setValue(column.getValue());
binlogValue.setBeforeValue(column.getValue());
row.put(column.getName(), binlogValue);
}
}
rows.add(row);
String rowjson = JacksonUtil.obj2str(row);
logger.info("#################################### Data Parse Result ####################################");
logger.info(rowlog + " , " + rowjson);
logger.info("#################################### Data Parse Result ####################################");
logger.info("");
}
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException("parseEntry has an error , data:" + entry.toString(), e);
}
return rows;
}
private static void printLog(Message message, long batchId, int size) {
long memsize = 0;
for (Entry entry : message.getEntries()) {
memsize += entry.getHeader().getEventLength();
}
String startPosition = null;
String endPosition = null;
if (!CollectionUtils.isEmpty(message.getEntries())) {
startPosition = buildPositionForDump(message.getEntries().get(0));
endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
}
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
logger.info(context_format, new Object[] {batchId, size, memsize, format.format(new Date()), startPosition, endPosition });
}
private static String buildPositionForDump(Entry entry) {
long time = entry.getHeader().getExecuteTime();
Date date = new Date(time);
SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
return entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":" + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
}
}
- DateUtils
時(shí)間工具類,代碼如下所示。
package io.mykit.canal.demo.utils;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtils {
private static final String FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss";
private static SimpleDateFormat sdf = new SimpleDateFormat(FORMAT_PATTERN);
public static Date parseDate(String datetime) throws ParseException{
if(datetime != null && !"".equals(datetime)){
return sdf.parse(datetime);
}
return null;
}
public static String formatDate(Date datetime) throws ParseException{
if(datetime != null ){
return sdf.format(datetime);
}
return null;
}
public static Long formatStringDateToLong(String datetime) throws ParseException{
if(datetime != null && !"".equals(datetime)){
Date d = sdf.parse(datetime);
return d.getTime();
}
return null;
}
public static Long formatDateToLong(Date datetime) throws ParseException{
if(datetime != null){
return datetime.getTime();
}
return null;
}
}
- InnerBinlogEntry
Binlog實(shí)體類,代碼如下所示。
package io.mykit.canal.demo.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
public class InnerBinlogEntry {
/**
* canal原生的Entry
*/
private Entry entry;
/**
* 該Entry歸屬于的表名
*/
private String tableName;
/**
* 該Entry歸屬數(shù)據(jù)庫名
*/
private String schemaName;
/**
* 該Entry本次的操作類型,對(duì)應(yīng)canal原生的枚舉;EventType.INSERT; EventType.UPDATE; EventType.DELETE;
*/
private EventType eventType;
private List<Map<String, BinlogValue>> rows = new ArrayList<Map<String, BinlogValue>>();
public Entry getEntry() {
return entry;
}
public void setEntry(Entry entry) {
this.entry = entry;
}
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public EventType getEventType() {
return eventType;
}
public void setEventType(EventType eventType) {
this.eventType = eventType;
}
public String getSchemaName() {
return schemaName;
}
public void setSchemaName(String schemaName) {
this.schemaName = schemaName;
}
public List<Map<String, BinlogValue>> getRows() {
return rows;
}
public void setRows(List<Map<String, BinlogValue>> rows) {
this.rows = rows;
}
}
- JacksonUtil
Json工具類,代碼如下所示。
package io.mykit.canal.demo.utils;
import java.io.IOException;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
public class JacksonUtil {
private static ObjectMapper mapper = new ObjectMapper();
public static String obj2str(Object obj) {
String json = null;
try {
json = mapper.writeValueAsString(obj);
} catch (JsonGenerationException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return json;
}
public static <T> T str2obj(String content, Class<T> valueType) {
try {
return mapper.readValue(content, valueType);
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
}
同步程序的實(shí)現(xiàn)
準(zhǔn)備好實(shí)體類和工具類后,我們就可以編寫同步程序來實(shí)現(xiàn)MySQL數(shù)據(jù)庫中的數(shù)據(jù)實(shí)時(shí)同步到Solr索引庫了,我們?cè)趇o.mykit.canal.demo.main包中常見MykitCanalDemoSync類,代碼如下所示。
package io.mykit.canal.demo.main;
import io.mykit.canal.demo.bean.Book;
import io.mykit.canal.demo.utils.BinlogValue;
import io.mykit.canal.demo.utils.CanalDataParser;
import io.mykit.canal.demo.utils.DateUtils;
import io.mykit.canal.demo.utils.InnerBinlogEntry;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.List;
import java.util.Map;
public class SyncDataBootStart {
private static Logger logger = LoggerFactory.getLogger(SyncDataBootStart.class);
public static void main(String[] args) throws Exception {
String hostname = "192.168.175.100";
Integer port = 11111;
String destination = "example";
//獲取CanalServer 連接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(hostname, port), destination, "", "");
//連接CanalServer
canalConnector.connect();
//訂閱Destination
canalConnector.subscribe();
//輪詢拉取數(shù)據(jù)
Integer batchSize = 5*1024;
while (true){
Message message = canalConnector.getWithoutAck(batchSize);
long messageId = message.getId();
int size = message.getEntries().size();
if(messageId == -1 || size == 0){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else{
//進(jìn)行數(shù)據(jù)同步
//1. 解析Message對(duì)象
List<InnerBinlogEntry> innerBinlogEntries = CanalDataParser.convertToInnerBinlogEntry(message);
//2. 將解析后的數(shù)據(jù)信息 同步到Solr的索引庫中.
syncDataToSolr(innerBinlogEntries);
}
//提交確認(rèn)
canalConnector.ack(messageId);
}
}
private static void syncDataToSolr(List<InnerBinlogEntry> innerBinlogEntries) throws Exception {
//獲取solr的連接
SolrServer solrServer = new HttpSolrServer("http://192.168.175.101:8080/solr");
//遍歷數(shù)據(jù)集合 , 根據(jù)數(shù)據(jù)集合中的數(shù)據(jù)信息, 來決定執(zhí)行增加, 修改 , 刪除操作 .
if(innerBinlogEntries != null){
for (InnerBinlogEntry innerBinlogEntry : innerBinlogEntries) {
CanalEntry.EventType eventType = innerBinlogEntry.getEventType();
//如果是Insert, update , 則需要同步數(shù)據(jù)到 solr 索引庫
if(eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE){
List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
if(rows != null){
for (Map<String, BinlogValue> row : rows) {
BinlogValue id = row.get("id");
BinlogValue name = row.get("name");
BinlogValue author = row.get("author");
BinlogValue publishtime = row.get("publishtime");
BinlogValue price = row.get("price");
BinlogValue publishgroup = row.get("publishgroup");
Book book = new Book();
book.setId(Integer.parseInt(id.getValue()));
book.setName(name.getValue());
book.setAuthor(author.getValue());
book.setPrice(Double.parseDouble(price.getValue()));
book.setPublishgroup(publishgroup.getValue());
book.setPublishtime(DateUtils.parseDate(publishtime.getValue()));
//導(dǎo)入數(shù)據(jù)到solr索引庫
solrServer.addBean(book);
solrServer.commit();
}
}
}else if(eventType == CanalEntry.EventType.DELETE){
//如果是Delete操作, 則需要?jiǎng)h除solr索引庫中的數(shù)據(jù) .
List<Map<String, BinlogValue>> rows = innerBinlogEntry.getRows();
if(rows != null){
for (Map<String, BinlogValue> row : rows) {
BinlogValue id = row.get("id");
//根據(jù)ID刪除solr的索引庫
solrServer.deleteById(id.getValue());
solrServer.commit();
}
}
}
}
}
}
}
接下來,啟動(dòng)SyncDataBootStart類的main方法,監(jiān)聽Canal Server,而Canal Server監(jiān)聽MySQL binlog的日志變化,一旦MySQL的binlog日志發(fā)生變化,則SyncDataBootStart會(huì)立刻收到變更信息,并將變更信息解析成Book對(duì)象實(shí)時(shí)更新到Solr庫中。如果在MySQL數(shù)據(jù)庫中刪除了數(shù)據(jù),則也會(huì)實(shí)時(shí)刪除Solr庫中的數(shù)據(jù)。
部分參考Canal官方文檔:https://github.com/alibaba/canal。