MySQL億級數(shù)據(jù)平滑遷移實(shí)戰(zhàn)
一、背景
預(yù)約業(yè)務(wù)是 vivo 游戲中心的重要業(yè)務(wù)之一。由于歷史原因,預(yù)約業(yè)務(wù)數(shù)據(jù)表與其他業(yè)務(wù)數(shù)據(jù)表存儲在同一個(gè)數(shù)據(jù)庫中。當(dāng)其他業(yè)務(wù)出現(xiàn)慢 SQL 等異常情況時(shí),可能會直接影響到預(yù)約業(yè)務(wù),從而降低系統(tǒng)整體的可靠性和穩(wěn)定性。為了盡可能提高系統(tǒng)的穩(wěn)定性和數(shù)據(jù)隔離性,我們迫切需要將預(yù)約相關(guān)數(shù)據(jù)表從原來的數(shù)據(jù)庫中遷移出來,單獨(dú)建立一個(gè)預(yù)約業(yè)務(wù)的數(shù)據(jù)庫。
二、方案選型
常見的遷移方案大致可以分為以下幾類:
而預(yù)約業(yè)務(wù)有以下特點(diǎn):
- 讀寫場景多,頻率高,在用戶預(yù)約/取消預(yù)約/福利發(fā)放等場景均涉及到大量的讀寫。
- 不可接受停機(jī),停機(jī)不可避免的會造成經(jīng)濟(jì)損失,在有其他方案的情況下不適合選擇此方案。
- 大部分的場景能接受秒級的數(shù)據(jù)不一致,少部分不能。
結(jié)合這些特點(diǎn),我們再評估下上面的方案:
停機(jī)遷移方案需要停機(jī),不適用于預(yù)約場景。預(yù)約場景存在不活躍的用戶數(shù)據(jù),如果用漸進(jìn)式遷移方案的話很難遷移干凈,可能還需要再寫一個(gè)遷移任務(wù)來輔助完成遷移。而雙寫方案最大的優(yōu)勢在于每一步操作都可向上回滾,能盡可能的保證業(yè)務(wù)不出問題。
因此,最終選擇的是雙寫方案。預(yù)約業(yè)務(wù)涉及到的讀寫場景多,每一個(gè)場景單獨(dú)進(jìn)行改造的成本大,采用 Mybatis 插件來實(shí)現(xiàn)遷移所需的雙寫等功能,可以有效降低改造成本。
三、前期準(zhǔn)備
3.1 全量同步&增量同步&一致性校驗(yàn)
這幾步使用了公司提供的數(shù)據(jù)同步工具。全量同步基于 MySQLDump 實(shí)現(xiàn);增量同步基于 binlog 實(shí)現(xiàn);一致性校驗(yàn)通過在新老庫各選一個(gè)分塊,然后聚合列數(shù)據(jù)計(jì)算并對比其特征值實(shí)現(xiàn)。
3.2 代碼改造
引入了新庫,那自然就需要在項(xiàng)目里新建數(shù)據(jù)源,并創(chuàng)建表對應(yīng)的 Mybatis Mapper 類。這里有一個(gè)小細(xì)節(jié)需要注意,Mybatis 默認(rèn)的 BeanNameGenerator 是
AnnotationBeanNameGenerator,它會使用類名作為 BeanName 注冊到 Spring 的 ioc 容器中,Spring 啟動(dòng)時(shí)如果發(fā)現(xiàn)有了兩個(gè)重名 Bean 就會啟動(dòng)失敗,筆者這里給 Mybatis 設(shè)置了一個(gè)新的 BeanNameGenerator ,使用類的全路徑名作為 BeanName 解決了問題。
public class FullPathBeanNameGenerator implements BeanNameGenerator {
@Override
public String generateBeanName(BeanDefinition definition, BeanDefinitionRegistry registry) {
return definition.getBeanClassName();
}
}
還有一點(diǎn)是主鍵 id,本次預(yù)約遷移需要保證新老庫主鍵 id 一致,預(yù)約業(yè)務(wù)沒做分庫分表,id 都是直接用 MySQL 的自增 id,沒有用 id 生成器之類的中間件。因此插入新表時(shí)只需要使用插入老表后 Mybatis 自動(dòng)設(shè)置好的 id 即可,這次遷移前先檢查了一遍業(yè)務(wù)代碼,確保插入語句都用了 Mybatis 的 useGeneratedKeys 功能來自動(dòng)設(shè)置 id。
3.3 插件實(shí)現(xiàn)
Mybatis 插件可以攔截 SQL 語句執(zhí)行過程中的某一點(diǎn)進(jìn)行干預(yù)和處理,而 Executor 是 Mybatis 中負(fù)責(zé)執(zhí)行 SQL 語句的核心組件。我們可以對 Executor 的 update 和 query 方法進(jìn)行代理以實(shí)現(xiàn)遷移所需的功能。
插件需要為讀寫場景分別實(shí)現(xiàn)以下功能:
考慮到開關(guān)切換部分的代碼邏輯較為簡單,因此在下文中,筆者將不再過多介紹該部分的具體實(shí)現(xiàn),而是著重介紹如何在插件中使用老庫的執(zhí)行語句來訪問新的數(shù)據(jù)庫。此外,代碼里會涉及到 Mybatis 相關(guān)的一些概念,由于網(wǎng)上已經(jīng)有較多詳盡的資料,這里就不再贅述。
遷移插件代理了 Executor 的 query 和 update 方法,首先在插件里獲取到當(dāng)前執(zhí)行的 SQL 語句所在的 Mapper 路徑。
@Intercepts(
{
@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
@Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
}
)
public class AppointMigrateInterceptor implements Interceptor {
@Override
public Object intercept(Invocation invocation) throws Throwable {
Object[] args = invocation.getArgs();
// Mybatis插件代理的Executor的update或者query方法,第一個(gè)參數(shù)就是MappedStatement
MappedStatement ms = (MappedStatement) args[0];
SqlCommandType sqlCommandType = ms.getSqlCommandType();
String id = ms.getId();
// 從MappedStatement id中獲取對應(yīng)的Mapper接口文件全路徑
String sourceMapper = id.substring(0, id.lastIndexOf("."));
// ...
}
// ...
}
得到老庫 Mapper 路徑后,將其轉(zhuǎn)換為新庫 Mapper 路徑,再使用 Class.forName 獲取到新庫 Mapper 類,然后用新庫的 sqlSessionFactory 開啟 sqlSession,再獲取反射調(diào)用所需的方法、對象、參數(shù),在新庫上執(zhí)行語句。
protected Object invoke(Invocation invocation, TableConfiguration tableConfiguration) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
// 獲取 MappedStatement
MappedStatement ms = (MappedStatement) invocation.getArgs()[0];
// 獲取 Mybatis 封裝好的入?yún)?,封裝函數(shù) MapperMethod.convertArgsToSqlCommandParam(Object[] args)
Object parameter = invocation.getArgs()[1];
// 使用 Class.forName 獲取到的新庫 Mapper
Class<?> targetMapperClass = tableConfiguration.getTargetMapperClazz();
// 使用新庫的 sqlSessionFactory 創(chuàng)建 sqlSession
SqlSession sqlSession = sqlSessionFactory.openSession();
Object result = null;
try{
// 使用新庫的 Mapper 路徑獲取對應(yīng)的 MapperProxy 對象
Object mapper = sqlSession.getMapper(targetMapperClass);
// 將 Mybatis 封裝好的參數(shù)轉(zhuǎn)換為原始參數(shù)
Object[] paramValues = getParamValue(parameter);
// 使用 mappedStatement Id 從新庫對應(yīng)的 Mapper 里獲取對應(yīng)的方法
Method method = getMethod(ms.getId(), targetMapperClass, paramValues);
paramValues = fixNullParam(method, paramValues);
// 反射調(diào)用新庫 Mapper 的方法,本質(zhì)上執(zhí)行的是 MapperProxy.invoke
result = method.invoke(mapper, paramValues);
} finally {
sqlSession.close();
}
return result;
}
private Object[] fixNullParam(Method method, Object[] paramValues) {
if (method.getParameterTypes().length > 0 && paramValues.length == 0) {
return new Object[]{null};
}
return paramValues;
}
上述代碼里,getMethod 方法負(fù)責(zé)從新庫 Mapper 類里找到對應(yīng)的方法,以用于后續(xù)的反射調(diào)用。
private Method getMethod(String id, Class mapperClass) throws NoSuchMethodException {
//獲取參數(shù)對應(yīng)的 class
String methodName = id.substring(id.lastIndexOf(".") + 1);
String key = id;
// methodCache 用來緩存 MappedStatement 和對應(yīng)的 Method,避免每次都從 Mapper 里查找
Method method = methodCache.get(key);
if (method == null){
method = findMethodByMethodSignature(mapperClass, methodName);
if (method == null){
throw new NoSuchMethodException("No such method " + methodName + " in class " + mapperClass.getName());
}
methodCache.put(key,method);
}
return method;
}
private Method findMethodByMethodSignature(Class mapperClass,String methodName) throws NoSuchMethodException {
// mybatis 的 Mapper 內(nèi)的方法不支持重載,所以這里只要方法名匹配到了就行,不用進(jìn)行參數(shù)的匹配
Method method = null;
for (Method m : mapperClass.getMethods()) {
if (m.getName().equals(methodName)) {
method = m;
break;
}
}
return method;
}
得到方法后,還需要得到反射調(diào)用所需的參數(shù)。Mybatis 執(zhí)行到 Executor.update/query 方法時(shí),參數(shù)已經(jīng)經(jīng)過 MapperMethod.convertArgsToSqlCommandParam(Object[] args) 方法封裝,不能直接用來執(zhí)行 MapperProxy.invoke ,需要轉(zhuǎn)換后才可用。下圖是MapperMethod.convertArgsToSqlCommandParam(Object[] args) 的封裝過程,而下面的 getParamValue 是這個(gè)函數(shù)的逆過程。
private Object[] getParamValue(Object parameter) {
List<Object> paramValues = new ArrayList<>();
if (parameter instanceof Map) {
Map<String, Object> paramMap = (Map<String, Object>) parameter;
if (paramMap.containsKey("collection")) {
paramValues.add(paramMap.get("collection"));
} else if (paramMap.containsKey("array")) {
paramValues.add(paramMap.get("array"));
} else {
int count = 1;
while (count <= paramMap.size() / 2){
try {
paramValues.add(paramMap.get("param"+(count++)));
}catch (BindingException e){
break;
}
}
}
} else if (parameter != null){
paramValues.add(parameter);
}
return paramValues.toArray();
}
通過上述流程,我們就能使用 Mybatis 插件攔截老庫的執(zhí)行過程,實(shí)現(xiàn)遷移所需的讀寫數(shù)據(jù)源切換/新老庫查詢結(jié)果對比/先寫老庫再異步寫新庫等功能。
四、雙寫流程
4.1 上線雙寫改造后的業(yè)務(wù)代碼,上線時(shí)只讀寫老庫
- 讀開關(guān):只讀老庫
- 寫開關(guān):只寫老庫
- 新老庫查詢結(jié)果對比開關(guān):關(guān)
此時(shí)業(yè)務(wù)仍只讀寫老庫。
4.2 使用公司中間件平臺提供的數(shù)據(jù)工具同步老庫數(shù)據(jù)到新庫
- 讀開關(guān):只讀老庫
- 寫開關(guān):只寫老庫
- 新老庫查詢結(jié)果對比開關(guān):關(guān)
第1步和第2步并沒有嚴(yán)格的順序要求,只要在切換為雙寫前做完第1步和第2步就好。
條件允許的情況下,全量+增量同步時(shí)應(yīng)選擇不對外提供服務(wù)的離線從庫作為數(shù)據(jù)源,避免主從延遲等問題對線上業(yè)務(wù)造成影響。
4.3 停止同步程序,然后開啟雙寫
- 讀開關(guān):只讀老庫(開啟查詢結(jié)果對比開關(guān))
- 寫開關(guān):雙寫
- 新老庫查詢結(jié)果對比開關(guān):開
老庫追上新庫后,對數(shù)據(jù)做一次全量校驗(yàn),避免出現(xiàn)數(shù)據(jù)不一致的情況。此外還需要開啟新老庫查詢結(jié)果對比開關(guān),通過日志監(jiān)控觀察新老庫的查詢結(jié)果是否一致。
停止數(shù)據(jù)同步和切換雙寫之間必然有時(shí)間差,如果先開啟雙寫再停止數(shù)據(jù)同步,則可能出現(xiàn)插入重復(fù)數(shù)據(jù)或數(shù)據(jù)被覆蓋的情況。因此需要對數(shù)據(jù)同步工具和遷移插件進(jìn)行改造,以處理數(shù)據(jù)異常的情況,但是這樣改造需要處理的情況較多,改造成本較高。所以這里選擇先停止同步,再切換到雙寫,中間丟失的數(shù)據(jù)使用對比&補(bǔ)償任務(wù)恢復(fù),由于此時(shí)仍然全量讀老庫,所以對業(yè)務(wù)不會有影響。需要注意的是,雙寫階段的時(shí)間不應(yīng)太長,只要確保新老庫數(shù)據(jù)一致就應(yīng)該前進(jìn)到下一步。
這一步在實(shí)際操作過程中需要注意以下情況:
4.3.1 自增主鍵
預(yù)約業(yè)務(wù)新庫的主鍵 id 需要和老庫保持一致,因此在遷移前檢查了一遍業(yè)務(wù)代碼,確保插入語句都用了 Mybatis 的 useGeneratedKeys 功能來返回 id ,這樣插入新庫時(shí)可以直接用設(shè)置好 id 的對象。但是這里有一個(gè)問題,批量插入時(shí) Mybatis 自動(dòng)設(shè)置的 id 和數(shù)據(jù)庫生成的自增主鍵不一定完全一致,比如批量 insert ignore 和 on duplicate key update 語句。
這個(gè)問題和 useGeneratedKeys 的實(shí)現(xiàn)有關(guān),代碼可參考
com.mysql.jdbc.StatementImpl#getGeneratedKeysInternal(long) 函數(shù),以下是其執(zhí)行邏輯:
- Mybatis 執(zhí)行完插入語句后,MySQL 會返回這次插入影響的數(shù)據(jù)行數(shù),注意,使用 insert ignore 插入時(shí),忽略的那部分?jǐn)?shù)據(jù)不會加到影響的行數(shù)上。
- Mybatis 使用 SELECT LAST_INSERT_ID() 查詢這次插入的最小 id 。
- Mybatis 循環(huán)遍歷插入時(shí)用的對象列表,循環(huán)的最大次數(shù)為第1步里獲取的這次插入影響的行數(shù),使用 n 代表當(dāng)前的循環(huán)次數(shù),列表中的每個(gè)對象的 id 被賦值為 LAST_INSERT_ID() + n*AUTO_INCREMENT 。
舉例來說,假設(shè)老庫的某張表里有數(shù)據(jù) b ,其 id=1,此時(shí)往該表使用 insert ignore 批量插入三條數(shù)據(jù) a,b,c,其在表內(nèi)的 id 為 a:2、b:1、c:3,返回的影響行數(shù)為2,SELECT LAST_INSERT_ID() 返回的是2,因此 Mybatis 往對象里設(shè)置的主鍵分別為 a:2、b:3、c:null,再使用這個(gè)設(shè)置好 id 的對象列表插入新庫時(shí)會導(dǎo)致新老庫 id 不一致。
解決方案:由于直接刪除 ignore 會改變這條 SQL 的語義,無法通過修改語句來解決問題。所以我們只能在遷移插件里跳過這條語句,使其固定寫入老庫。然后在業(yè)務(wù)層單獨(dú)對其進(jìn)行遷移改造,將插入新庫的流程修改為先使用 id 以外的唯一鍵查詢一次老庫的數(shù)據(jù),獲取到 id 以后設(shè)置到對象列表里,再插入新庫。
4.3.2 事務(wù)
預(yù)約業(yè)務(wù)有部分邏輯用到了事務(wù),但這部分邏輯在雙寫期間均可以暫停功能,因此遷移插件沒有實(shí)現(xiàn)事務(wù)的支持。如果需要支持業(yè)務(wù)的話可以不依賴插件,在業(yè)務(wù)層單獨(dú)對那部分代碼進(jìn)行改造。
4.3.3 異步寫入新庫引起的問題
雙寫過程中是異步寫新庫,需要重點(diǎn)關(guān)注是否會有線程安全問題。舉例來說,假設(shè)有個(gè)業(yè)務(wù)需要往表里插入一個(gè)列表,插入完列表后又對列表進(jìn)行了修改,比如執(zhí)行了 List.clear() 函數(shù)或者其中的對象發(fā)生了變更,由于是異步寫新庫,所以實(shí)際的執(zhí)行流程可能如下:
- 老庫 insert(list)
- list.clear()
- 新庫 insert(list)
這會導(dǎo)致新庫執(zhí)行操作時(shí),傳入的對象和老庫執(zhí)行操作時(shí)不一樣,導(dǎo)致新老庫數(shù)據(jù)不一致。建議在遷移前人為的確認(rèn)業(yè)務(wù)邏輯,避免異步寫入導(dǎo)致新老庫數(shù)據(jù)不一致。
4.4 開啟對比和補(bǔ)償程序,補(bǔ)償切換開關(guān)的過程中遺失的數(shù)據(jù)
- 讀開關(guān):只讀老庫(對比開關(guān)開啟)
- 寫開關(guān):雙寫
- 新老庫查詢結(jié)果對比開關(guān):開
- 對比&補(bǔ)償任務(wù):開啟
該對比&補(bǔ)償任務(wù)有一個(gè)缺陷,其不能處理數(shù)據(jù)被刪除的情況,如果老庫里的數(shù)據(jù)被刪除但是新庫的數(shù)據(jù)刪除失敗,那使用更新時(shí)間區(qū)間就無法從老庫查出這條數(shù)據(jù),自然也無法進(jìn)行對比&補(bǔ)償。
雙寫期間,如果出現(xiàn)刪老庫成功但是刪新庫失敗的情況會有日志告警,所以不會有問題。但是停止數(shù)據(jù)同步工具 → 開啟雙寫開關(guān)這一過程中刪除的數(shù)據(jù)無法補(bǔ)償。不過大部分業(yè)務(wù)用的都是邏輯刪除,只有一處用了物理刪除,筆者在這一處添加了日志,如果切換過程中出現(xiàn)刪除數(shù)據(jù)的日志,就需要手動(dòng)進(jìn)行補(bǔ)償操作。實(shí)際操作過程中,開關(guān)的切換的耗時(shí)較短,只花了30秒左右,在這過程中沒有打印刪除數(shù)據(jù)的日志。
4.5 逐步切量請求到新庫上
- 讀開關(guān):部分讀新庫 → 只讀新庫
- 寫開關(guān):雙寫
- 新老庫查詢結(jié)果對比開關(guān):開
- 對比&補(bǔ)償任務(wù):開啟
雙寫時(shí),由于數(shù)據(jù)先寫入老庫再異步寫入新庫,因此新庫的數(shù)據(jù)肯定會滯后于老庫。如果將一部分讀流量切換到新庫上,就可能會在一些對延遲要求較高的業(yè)務(wù)場景中出現(xiàn)問題。對于這種場景,我們不能采用逐步切量的策略,只能同時(shí)切換讀寫開關(guān),將其修改為只寫老庫+只讀新庫。
4.6 停止對比補(bǔ)償程序,關(guān)閉雙寫,讀寫都切換到新庫,開啟反向補(bǔ)償任務(wù)
- 讀開關(guān):只讀新庫
- 寫開關(guān):只寫新庫
- 新老庫查詢結(jié)果對比開關(guān):關(guān)
- 對比&補(bǔ)償任務(wù):開啟反向補(bǔ)償
反向補(bǔ)償是從新庫補(bǔ)償數(shù)據(jù)到老庫,由于該任務(wù)是定時(shí)執(zhí)行,開啟后,新庫和老庫的數(shù)據(jù)會有 1~2 分鐘的延遲,萬一寫新庫的邏輯有問題,可以切回老庫。至于為什么用反向補(bǔ)償任務(wù)而不是使用先寫新庫再異步寫老庫的策略,是因?yàn)殡p寫是用 MyBatis 插件實(shí)現(xiàn)的,插件代理的是 excutor 的 update 和 query 方法,如果異步寫入老庫,有可能會發(fā)生以下情況:
- 假設(shè)有兩個(gè)線程,業(yè)務(wù)線程 A 需要寫入一條數(shù)據(jù),遷移插件攔截后,先同步寫入新庫,寫完新庫后提交任務(wù)給線程 B 中異步寫入老庫,提交完任務(wù)后插件立刻返回。
- 由于插件已返回結(jié)果,executor 上層的 sqlsession 調(diào)用 close() 方法關(guān)閉 executor (見 org.mybatis.spring.SqlSessionTemplate.SqlSessionInterceptor#invoke ),此時(shí)線程 B 可能還沒執(zhí)行完寫老庫的操作。
- 線程 B 執(zhí)行過程中,由于 executor 已經(jīng)關(guān)閉,導(dǎo)致其寫老庫失敗。
因此無法使用 Mybatis 插件來實(shí)現(xiàn)異步寫老庫。
4.7 停止反向補(bǔ)償任務(wù),刪除表遷移相關(guān)代碼
停止反向補(bǔ)償前,需要關(guān)注是否還有業(yè)務(wù)在讀老庫。觀察一段時(shí)間,確認(rèn)老庫沒有補(bǔ)償任務(wù)以外的讀寫流量后,可以關(guān)閉補(bǔ)償任務(wù),清理遷移過程中產(chǎn)生的代碼,清理老庫數(shù)據(jù)。
五、總結(jié)
在進(jìn)行數(shù)據(jù)表遷移的過程中,雖然遇到了一些問題,但是制定的方案中每一步都有回退措施,即使出現(xiàn)問題也不會影響業(yè)務(wù)的正常運(yùn)行。此外,筆者在遷移過程中對各種異常情況進(jìn)行了監(jiān)控,能及時(shí)發(fā)現(xiàn)并解決問題。如果其他業(yè)務(wù)需要進(jìn)行類似的遷移,需要關(guān)注以下幾個(gè)方面:
- 遷移插件實(shí)現(xiàn):在對遷移過程進(jìn)行反思后,筆者人為通過代理或重寫 MapperProxy 的方式來實(shí)現(xiàn)遷移插件可能是更加合理的方案。這種方案有兩個(gè)優(yōu)點(diǎn):一方面,可以避免處理 Mybatis 復(fù)雜的參數(shù)轉(zhuǎn)換流程,從而減少潛在的錯(cuò)誤和異常;另一方面,可以實(shí)現(xiàn)先寫新庫再異步寫老庫的操作。但是這個(gè)方案沒有經(jīng)過實(shí)踐,還不能確定是否有可行性。
- 自增主鍵:需要確定業(yè)務(wù)是否需要保證新老庫的 id 一致。
- 事務(wù):雙寫過程中應(yīng)該結(jié)合業(yè)務(wù)考慮是否需要實(shí)現(xiàn)事務(wù)支持。本次遷移過程中,我們暫停了部分需要事務(wù)支持的業(yè)務(wù)。
- 異步寫入:先寫老庫再異步寫入新庫的方式可能導(dǎo)致新老庫數(shù)據(jù)不一致,遷移插件自身無法解決這個(gè)問題,只能人工提前排查可能存在的隱患。