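Sharding-JDBC Source Code Analysis and vivo's Custom Development
The source code in this article is based on Sharding-JDBC 4.1.1.
1. Business Background
As concurrent requests and data volume keep growing, the load on single-node databases and tables often becomes a performance bottleneck. Within the company's IT systems, business domains such as marketing inventory, trade orders, financial ledgers, and attendance records generate a large amount of new data every day, which leads to overloaded database nodes, too many connections, and slower queries. We therefore distribute unrelated data as evenly as possible across different databases and tables, keyed by information such as data source, time, and employee ID. This reduces the load on database nodes and improves query efficiency and system stability without affecting business requirements.
2. Technology Selection
We compared several common middleware products that support sharding (splitting databases and tables) and read/write splitting.
Sharding-JDBC is a lightweight, enhanced JDBC framework. Compared with the other middleware it performs better and is easier to adopt, and its data sharding and read/write splitting features cover our business needs, so we use it widely. While using it we also ran into a number of problems, so we customized the source code and wrapped it in components to make it easier for business teams to use.
3. Source Code Analysis
3.1 Introduction
As a JDBC-based database middleware, Sharding-JDBC implements the standard JDBC API. The figure below compares the execution flow of Sharding-JDBC with that of native JDBC:
Sample code for the related execution flows:
JDBC execution example
// Obtain a database connection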
try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {
String sql = "SELECT * FROM t_user WHERE name = ?";
// Precompile the SQL
try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
// Set parameters and execute
preparedStatement.setString(1, "vivo");
// execute(String) cannot be called on a PreparedStatement, so use the no-argument form
preparedStatement.execute();
// Fetch the result set
try (ResultSet resultSet = preparedStatement.getResultSet()) {
while (resultSet.next()) {
// Process the results
}
}
}
}
Sharding-JDBC source code
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#execute
public boolean execute() throws SQLException {
try {
clearPrevious();
// Parse + route + rewrite; internally calls BasePrepareEngine#prepare
prepare();
initPreparedStatementExecutor();
// Execute
return preparedStatementExecutor.execute();
} finally {
clearBatch();
}
}
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
public ExecutionContext prepare(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
// Parse + route (executeRoute parses first, then routes)
RouteContext routeContext = executeRoute(sql, clonedParameters);
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
// Rewrite
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
List<ResultSet> resultSets = getResultSets();
// Merge the result sets
MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
}
return currentResultSet;
}
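As the comparison of the two execution flows shows:
- [JDBC]: the main flow obtains a Connection from the DataSource, passes in the SQL statement to create a PreparedStatement, sets the placeholder parameters on the PreparedStatement, executes it, and gets back a ResultSet.
- [Sharding-JDBC]: the main flow is basically the same, but Sharding-JDBC implements and extends PreparedStatement. The implementation class ShardingPreparedStatement delegates to parsing, routing, rewriting, and merging engines, which is how sharding and read/write splitting are achieved. The table below describes the role of each engine:
Note: the source code of each engine is analyzed in more depth below.
3.2 Parsing Engine
3.2.1 Engine Analysis
The parsing engine is the foundation of Sharding-JDBC's sharding logic. It breaks the SQL into indivisible atomic symbols (called tokens), classifies the tokens by database dialect into keywords, expressions, operators, literals, and so on, and then builds an abstract syntax tree. The syntax tree is the prerequisite for the subsequent routing and rewriting steps, and the dependence on it is one of the reasons Sharding-JDBC has a variety of SQL syntax restrictions.
▲ Image source: ShardingSphere official documentation
The 4.x versions use ANTLR (ANother Tool for Language Recognition) as the parsing engine. The ShardingSphere-sql-parser-dialect module defines the parsing rules (.g4 files) for the syntax of each supported database. You can also install the ANTLR v4 plugin in IntelliJ IDEA and enter a SQL statement to inspect the resulting syntax tree.
The entry point for parsing is the createRouteContext method of DataNodeRouter. Based on the database type and the SQL, the parsing engine creates a SQLParserExecutor and runs it to obtain the parse tree, then processes the parse tree through the visit method of a ParseTreeVisitor to obtain the SQLStatement. ANTLR supports both listener and visitor interfaces; the visitor approach gives more flexible control over how the parse tree is traversed, which suits SQL parsing better.
Core code of the parsing engine: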
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext#96
private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
// Parse the SQL with the parsing engine
SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
try {
SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);
return new RouteContext(sqlStatementContext, parameters, new RouteResult());
// TODO should pass parameters for master-slave
} catch (final IndexOutOfBoundsException ex) {
return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
}
}
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0#72
private SQLStatement parse0(final String sql, final boolean useCache) {
// Look up the cache first
if (useCache) {
Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
}
// Build the parse tree from the database type and the SQL
ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
// Process the parse tree with ParseTreeVisitor#visit to obtain the SQLStatement
SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
if (useCache) {
cache.put(sql, result);
}
return result;
}
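SQLStatement is actually an interface whose implementations correspond to different SQL types. For example, SelectStatement holds fields for the projections, table references, WHERE condition, GROUP BY, ORDER BY, pagination, and lock. Note that there is no field for HAVING, which means Sharding-JDBC cannot recognize a HAVING clause in the SQL, and this is why its support for HAVING is limited.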
SelectStatement
public final class SelectStatement extends DMLStatement {
// Projections (selected columns)
private ProjectionsSegment projections;
// Tables
private final Collection<TableReferenceSegment> tableReferences = new LinkedList<>();
// where
private WhereSegment where;
// groupBy
private GroupBySegment groupBy;
// orderBy
private OrderBySegment orderBy;
// limit
private LimitSegment limit;
// Parent statement
private SelectStatement parentStatement;
// lock
private LockSegment lock;
}
SQLStatement is further converted into a SQLStatementContext. For example, SelectStatement becomes SelectStatementContext, whose structure is similar and is not repeated here. Note that although a containsSubquery method is defined to determine whether the statement contains a subquery, in the 4.1.1 source it always returns false. As with HAVING, this means Sharding-JDBC does not apply any special handling to subqueries.
SelectStatementContext
public final class SelectStatementContext extends CommonSQLStatementContext<SelectStatement> implements TableAvailable, WhereAvailable {
private final TablesContext tablesContext;
private final ProjectionsContext projectionsContext;
private final GroupByContext groupByContext;
private final OrderByContext orderByContext;
private final PaginationContext paginationContext;
private final boolean containsSubquery;
}
private boolean containsSubquery() {
// FIXME process subquery
// Collection<SubqueryPredicateSegment> subqueryPredicateSegments = getSqlStatement().findSQLSegments(SubqueryPredicateSegment.class);
// for (SubqueryPredicateSegment each : subqueryPredicateSegments) {
// if (!each.getAndPredicates().isEmpty()) {
// return true;
// }
// }
return false;
}
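3.2.2 Engine Summary
The parsing engine is the prerequisite for routing and rewriting. It splits the SQL into atomic symbols (tokens) according to the defined grammar rules, builds a syntax tree, and produces the SQLStatement that matches the SQL type. A SQLStatement is composed of Segments, and every Segment carries a startIndex and an endIndex that locate its token within the SQL text. The grammar cannot cover every SQL scenario, however, so some SQL statements are not routed and executed as expected.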
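To make the idea of tokens with positions concrete, here is a minimal hand-written tokenizer sketch. It is not how ShardingSphere parses SQL (that is done by ANTLR-generated parsers); the class TokenSketch, its tiny keyword list, and the Token type are all hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

// Hypothetical illustration only; ShardingSphere relies on ANTLR grammars instead.
final class TokenSketch {

    enum Kind { KEYWORD, IDENTIFIER, LITERAL, SYMBOL }

    /** A token plus the inclusive character range it occupies in the SQL text. */
    static final class Token {
        final Kind kind;
        final String text;
        final int startIndex;
        final int endIndex;

        Token(Kind kind, String text, int startIndex, int endIndex) {
            this.kind = kind;
            this.text = text;
            this.startIndex = startIndex;
            this.endIndex = endIndex;
        }
    }

    // Deliberately tiny keyword list; a real grammar defines many more.
    private static final Set<String> KEYWORDS =
            new HashSet<>(Arrays.asList("SELECT", "FROM", "WHERE", "AND", "OR"));

    static List<Token> tokenize(String sql) {
        List<Token> tokens = new ArrayList<>();
        int i = 0;
        while (i < sql.length()) {
            char c = sql.charAt(i);
            if (Character.isWhitespace(c)) {
                i++;
                continue;
            }
            int start = i;
            Kind kind;
            if (Character.isLetter(c) || c == '_') {
                // Word: either a keyword or an identifier such as a table or column name
                while (i < sql.length() && (Character.isLetterOrDigit(sql.charAt(i)) || sql.charAt(i) == '_')) {
                    i++;
                }
                String word = sql.substring(start, i).toUpperCase(Locale.ROOT);
                kind = KEYWORDS.contains(word) ? Kind.KEYWORD : Kind.IDENTIFIER;
            } else if (Character.isDigit(c)) {
                while (i < sql.length() && Character.isDigit(sql.charAt(i))) {
                    i++;
                }
                kind = Kind.LITERAL;
            } else if (c == '\'') {
                // String literal: scan to the closing quote (no escape handling in this sketch)
                i++;
                while (i < sql.length() && sql.charAt(i) != '\'') {
                    i++;
                }
                i = Math.min(i + 1, sql.length());
                kind = Kind.LITERAL;
            } else {
                // Any other single character, such as * or = or ?
                i++;
                kind = Kind.SYMBOL;
            }
            tokens.add(new Token(kind, sql.substring(start, i), start, i - 1));
        }
        return tokens;
    }

    public static void main(String[] args) {
        for (Token each : tokenize("SELECT * FROM t_user WHERE name = 'vivo'")) {
            System.out.println(each.kind + " [" + each.startIndex + "," + each.endIndex + "] " + each.text);
        }
    }
}
```

The recorded positions are what later make rewriting possible: a table name can be replaced by its character range without re-parsing the statement.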
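3.3 Routing Engine
3.3.1 Engine Analysis
The routing engine is the core step of Sharding-JDBC. Based on the configured sharding rules, it turns the SQL context produced by the parsing engine into a route result. RouteResult contains DataNodes and RouteUnits. A DataNode is an actual data node, made up of a data source name and a physical table name. A RouteUnit records the mapping between logical and physical tables/databases, and the rewriting engine later uses this mapping to decide how to replace the logical table names in the SQL. In effect, RouteResult records which databases and tables a SQL statement must be executed against.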
RouteResult
public final class RouteResult {
private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();
private final Collection<RouteUnit> routeUnits = new LinkedHashSet<>();
}
public final class DataNode {
private static final String DELIMITER = ".";
private final String dataSourceName;
private final String tableName;
}
public final class RouteUnit {
private final RouteMapper dataSourceMapper;
private final Collection<RouteMapper> tableMappers;
}
public final class RouteMapper {
private final String logicName;
private final String actualName;
}
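Routing is divided into sharding routing and master-slave routing. The two can be used separately or combined.
Sharding routing
The decorate method of ShardingRouteDecorator holds the core logic of the routing engine: it validates the SQL, generates the sharding conditions, merges the sharding values, and then produces the route result.
The decorate method for sharding routing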
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate#57
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
List<Object> parameters = routeContext.getParameters();
// SQL validation: check whether INSERT INTO ... ON DUPLICATE KEY UPDATE and UPDATE statements touch the sharding key
ShardingStatementValidatorFactory.newInstance(
sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
// Generate the sharding conditions
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
// Merge the sharding values
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
mergeShardingConditions(shardingConditions);
}
ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
// Obtain the route result
RouteResult routeResult = shardingRouteEngine.route(shardingRule);
if (needMergeShardingValues) {
Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
}
return new RouteContext(sqlStatementContext, parameters, routeResult);
}
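ShardingStatementValidator has two implementations, ShardingInsertStatementValidator and ShardingUpdateStatementValidator. Both INSERT INTO ... ON DUPLICATE KEY UPDATE and UPDATE can change column values, and Sharding-JDBC does not allow the sharding value to be updated, because changing it would also require migrating the row to the database and table that the new value maps to in order to stay consistent with the sharding rule. The two validators differ in detail:
- For INSERT INTO ... ON DUPLICATE KEY UPDATE, only the updated columns are checked: an error is raised if the ON DUPLICATE KEY UPDATE clause contains the sharding key.
- For UPDATE, the validator additionally compares the sharding key value in the WHERE condition with the value in SET, and throws an exception if they differ.
ShardingCondition has a single field, routeValues. RouteValue is an interface with two implementations, ListRouteValue and RangeRouteValue. The former holds the sharding values from IN or = conditions on the sharding key, and the latter holds the sharding values of a range query. Both are wrapped into ShardingValue objects and passed to the sharding algorithm, which computes the set of target shards.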
ShardingCondition
public final class ShardingConditions {
private final List<ShardingCondition> conditions;
}
public class ShardingCondition {
private final List<RouteValue> routeValues = new LinkedList<>();
}
public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {
private final String columnName;
private final String tableName;
// Values from the IN or = condition
private final Collection<T> values;
@Override
public String toString() {
return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")");
}
}
public final class RangeRouteValue<T extends Comparable<?>> implements RouteValue {
private final String columnName;
private final String tableName;
// Lower and upper bounds of range conditions such as BETWEEN ... AND, greater than, and less than
private final Range<T> valueRange;
}
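After the sharding conditions are generated they may be merged. As noted above, however, containsSubquery in SelectStatementContext is always false, so the check below always returns false and the sharding conditions are never merged.
Checking whether sharding conditions need to be merged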
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#isNeedMergeShardingValues#87
private boolean isNeedMergeShardingValues(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsSubquery()
&& !shardingRule.getShardingLogicTableNames(sqlStatementContext.getTablesContext().getTableNames()).isEmpty();
}
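Next, the sharding route engine invokes the sharding algorithm to compute the route result. ShardingRouteEngine has many implementations and covering them all would take too much space, so they are not expanded on here; refer to the official documentation for the rules that select a route engine.
▲ Image source: ShardingSphere official documentation
Sharding-JDBC defines several sharding strategies and algorithm interfaces. The table below describes the main strategies and algorithms:
Two additional details:
(1) When ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING is set to true, InlineShardingStrategy supports range queries. It does not compute the targets from the range of sharding values, though; it routes to all configured data nodes, which is a potential performance risk.
InlineShardingStrategy.doSharding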
org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy#doSharding
public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
RouteValue shardingValue = shardingValues.iterator().next();
// When ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING is true, return availableTargetNames directly instead of computing targets from the RangeRouteValue
if (properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING) && shardingValue instanceof RangeRouteValue) {
return availableTargetNames;
}
Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support this type sharding:" + shardingValue.toString());
Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue);
Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
for (String each : shardingResult) {
if (availableTargetNames.contains(each)) {
result.add(each);
}
}
return result;
}
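The difference between exact-value routing and range routing under the inline strategy can be sketched in plain Java. This is an assumption-laden illustration, not ShardingSphere code: the class InlineRoutingSketch, the table prefix t_order_, and the modulo rule stand in for a hypothetical inline expression such as t_order_${order_id % n}.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

// Hypothetical illustration of the routing behavior described above.
final class InlineRoutingSketch {

    /** Exact values (= or IN): evaluate the assumed modulo rule once per value. */
    static Collection<String> routeExact(Collection<String> availableTargets, List<Long> shardingValues) {
        Collection<String> result = new TreeSet<>();
        for (long each : shardingValues) {
            String target = "t_order_" + Math.floorMod(each, (long) availableTargets.size());
            // Keep only targets that are actually configured
            if (availableTargets.contains(target)) {
                result.add(target);
            }
        }
        return result;
    }

    /** Range values: no computation is possible, so either fail or fall back to every target. */
    static Collection<String> routeRange(Collection<String> availableTargets, boolean allowRangeQuery) {
        if (!allowRangeQuery) {
            throw new IllegalStateException("Inline strategy cannot support range sharding values");
        }
        return new TreeSet<>(availableTargets);
    }

    public static void main(String[] args) {
        List<String> targets = Arrays.asList("t_order_0", "t_order_1", "t_order_2", "t_order_3");
        System.out.println(routeExact(targets, Arrays.asList(5L, 9L)));
        System.out.println(routeRange(targets, true));
    }
}
```

With four tables, two exact values that share a remainder hit a single table, while any range query fans out to all four, which is where the performance risk comes from.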
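(2) The 4.1.1 official documentation says that Hint can skip parsing and rewriting, but the parsing engine source analyzed above contains no special bypass for the Hint strategy. In fact, SQL that uses Hint sharding still has to be parsed and rewritten and is still subject to Sharding-JDBC's syntax restrictions. This has also been raised in an official issue.
▲ Image source: ShardingSphere official documentation
Master-slave routing
The core logic of master-slave routing is the route method of MasterSlaveDataSourceRouter, which decides whether a SQL statement goes to the master or to a slave. With master-slave configured, a configured data source is actually a master-slave group rather than a single instance, so the name of the concrete master or slave has to be resolved through masterSlaveRule.
The decorate method for master-slave routing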
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
// Empty means the statement has not gone through sharding routing
if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
// Decide from the SQL whether to use the master or a slave
String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
RouteResult routeResult = new RouteResult();
// Create a route unit from the concrete master/slave name
routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));
return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);
}
Collection<RouteUnit> toBeRemoved = new LinkedList<>();
Collection<RouteUnit> toBeAdded = new LinkedList<>();
// Non-empty means sharding routing has already processed the statement
for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
// Mark for removal first, because this is the name of a master-slave group rather than an actual database
toBeRemoved.add(each);
// Decide from the SQL whether to use the master or a slave
String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
// Create a route unit from the concrete master/slave name
toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
}
}
routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);
routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);
return routeContext;
}
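The isMasterRoute method in MasterSlaveDataSourceRouter decides whether the SQL must go to the master. The master is used when any of the following holds:
- the SELECT statement contains a lock, for example SELECT ... FOR UPDATE
- the statement is not a SELECT
- MasterVisitedManager.isMasterVisited() returns true
- HintManager.isMasterRouteOnly() returns true
Otherwise a slave is chosen by the load-balancing algorithm. Sharding-JDBC provides two algorithms: round-robin and random.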
MasterSlaveDataSourceRouter
public final class MasterSlaveDataSourceRouter {
private final MasterSlaveRule masterSlaveRule;
/**
* Route.
*
* @param sqlStatement SQL statement
* @return data source name
*/
public String route(final SQLStatement sqlStatement) {
if (isMasterRoute(sqlStatement)) {
MasterVisitedManager.setMasterVisited();
return masterSlaveRule.getMasterDataSourceName();
}
return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
}
private boolean isMasterRoute(final SQLStatement sqlStatement) {
return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
}
private boolean containsLockSegment(final SQLStatement sqlStatement) {
return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();
}
}
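The two slave selection strategies mentioned above can be sketched as follows. This is a simplified stand-in, not the ShardingSphere implementation: the class SlaveLoadBalanceSketch and its method names are hypothetical, and a single counter is used instead of one counter per master-slave group.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of round-robin and random slave selection.
final class SlaveLoadBalanceSketch {

    private final AtomicInteger counter = new AtomicInteger();

    /** Round-robin: walk through the slaves in order, wrapping around at the end. */
    String pickRoundRobin(List<String> slaveNames) {
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), slaveNames.size());
        return slaveNames.get(index);
    }

    /** Random: pick any slave with equal probability. */
    static String pickRandom(List<String> slaveNames) {
        return slaveNames.get(ThreadLocalRandom.current().nextInt(slaveNames.size()));
    }

    public static void main(String[] args) {
        List<String> slaves = Arrays.asList("slave_0", "slave_1");
        SlaveLoadBalanceSketch balancer = new SlaveLoadBalanceSketch();
        System.out.println(balancer.pickRoundRobin(slaves));
        System.out.println(balancer.pickRoundRobin(slaves));
        System.out.println(balancer.pickRoundRobin(slaves));
        System.out.println(pickRandom(slaves));
    }
}
```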
Whether the master has been visited is stored in MasterVisitedManager, which is implemented with a ThreadLocal. This has a consequence: when a transaction queries first and then updates or inserts, the first query does not go to the master but to a slave. If the business needs the first query in the transaction to hit the master as well, it has to call MasterVisitedManager.setMasterVisited() manually before the query.
MasterVisitedManager
public final class MasterVisitedManager {
private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);
/**
* Judge master data source visited in current thread.
*
* @return master data source visited or not in current thread
*/
public static boolean isMasterVisited() {
return MASTER_VISITED.get();
}
/**
* Set master data source visited in current thread.
*/
public static void setMasterVisited() {
MASTER_VISITED.set(true);
}
/**
* Clear master data source visited.
*/
public static void clear() {
MASTER_VISITED.remove();
}
}
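The effect of the thread-local flag on a read, write, read sequence can be reproduced with a small stand-alone sketch. The class StickyMasterSketch and its methods are hypothetical and only mimic the behavior described above; the strings "master" and "slave" stand in for real data source names.

```java
// Hypothetical illustration of a thread-local "master visited" flag.
final class StickyMasterSketch {

    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);

    /** Routes one statement: writes always go to the master, reads only after the flag is set. */
    static String route(boolean isSelect) {
        if (!isSelect || MASTER_VISITED.get()) {
            MASTER_VISITED.set(true);
            return "master";
        }
        return "slave";
    }

    /** Equivalent in spirit to setting the flag manually before the first query. */
    static void forceMaster() {
        MASTER_VISITED.set(true);
    }

    /** Resets the flag for the current thread. */
    static void clear() {
        MASTER_VISITED.remove();
    }

    public static void main(String[] args) {
        System.out.println(route(true));   // read before any write
        System.out.println(route(false));  // write
        System.out.println(route(true));   // read after the write
        clear();
        forceMaster();
        System.out.println(route(true));   // read after forcing the master
        clear();
    }
}
```

Because the flag lives on the thread, it seems prudent to reset it when a unit of work ends if threads are pooled; otherwise a later, unrelated read on the same thread would keep going to the master.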
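3.3.2 Engine Summary
The routing engine uses the SQL parameters and the implemented strategy algorithms to compute which tables in which databases the SQL should actually run against, that is, the route result. There are two kinds of routing, sharding routing and master-slave routing, and both expose standardized strategy interfaces so that business code can implement its own routing strategy. For sharding routing, make sure the strategy algorithm matches your SQL scenarios. For master-slave routing, within the same thread and the same database connection, reads issued after a write go to the master, while reads issued before the write do not.
3.4 Rewriting Engine
3.4.1 Engine Analysis
After parsing and routing, the actual databases and tables are known, but the table names in the SQL are still logical and cannot be executed. The rewriting engine replaces logical table names with physical ones. SQL that is routed to multiple databases or tables also has to be split into multiple SQL statements.
The rewriting entry point is again in BasePrepareEngine: it creates the rewrite context with createSQLRewriteContext, rewrites based on that context with rewrite, and finally returns the execution units (ExecutionUnit).
Rewriting entry point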
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
// Register the rewrite decorators
registerRewriteDecorator();
// Create the SQLRewriteContext
SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
// Rewrite
return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
}
An execution unit contains the data source name, the rewritten SQL, and the corresponding parameters. Two SQLUnits with the same SQL are treated as equal.
ExecutionUnit
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
private final String dataSourceName;
private final SQLUnit sqlUnit;
}
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
// Equality is determined by sql only
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
private String sql;
private final List<Object> parameters;
}
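What equality on sql alone implies can be shown with a hand-written class that plays the role of the Lombok annotation. SqlUnitSketch is a hypothetical stand-in, written without Lombok so that it compiles on its own; it is not the library class.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for a unit whose equals/hashCode consider only the SQL text.
final class SqlUnitSketch {

    private final String sql;
    private final List<Object> parameters;

    SqlUnitSketch(String sql, List<Object> parameters) {
        this.sql = sql;
        this.parameters = parameters;
    }

    List<Object> getParameters() {
        return parameters;
    }

    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof SqlUnitSketch)) {
            return false;
        }
        // Parameters are intentionally ignored
        return Objects.equals(sql, ((SqlUnitSketch) other).sql);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(sql);
    }

    public static void main(String[] args) {
        SqlUnitSketch first = new SqlUnitSketch("SELECT * FROM t_order_0 WHERE id = ?", Collections.<Object>singletonList(1));
        SqlUnitSketch second = new SqlUnitSketch("SELECT * FROM t_order_0 WHERE id = ?", Collections.<Object>singletonList(2));
        Set<SqlUnitSketch> units = new HashSet<>();
        units.add(first);
        units.add(second);
        // Same SQL text, different parameters: the set keeps a single element
        System.out.println(first.equals(second) + " " + units.size());
    }
}
```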
createSQLRewriteContext does two things: it rewrites the SQL parameters and it generates the SQLTokens.
createSQLRewriteContext
org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
// Rewrite the SQL parameters
decorate(decorators, result, routeContext);
// Generate the SQLTokens
result.generateSQLTokens();
return result;
}
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate
public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
// Rewrite the parameters
each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
}
}
//sqlTokenGenerators
sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
}
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens
public void generateSQLTokens() {
sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
}
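Two ParameterRewriter implementations are related to sharding.
Note: for detailed examples, see the pagination revision and column supplement sections of the official documentation.
A SQLToken records the start position of a token in the SQL (the indivisible atomic symbol mentioned in the parsing engine section), so the rewriting engine knows which positions need to be rewritten.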
SQLToken
@RequiredArgsConstructor
@Getter
public abstract class SQLToken implements Comparable<SQLToken> {
private final int startIndex;
@Override
public final int compareTo(final SQLToken sqlToken) {
return startIndex - sqlToken.getStartIndex();
}
}
Once the SQLRewriteContext is created, the whole SQL is rewritten and the parameters are assembled. As the code shows, each RouteUnit gets its own rewritten SQL and its own parameters.
SQLRouteRewriteEngine.rewrite
org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#rewrite
public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {
Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);
for (RouteUnit each : routeResult.getRouteUnits()) {
// Rewrite the SQL and assemble the parameters
result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));
}
return result;
}
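At its core, toSQL splits the SQL at the SQLTokens, rewrites the pieces, and joins them back together. For example:
select * from t_order where created_by = '123'
is split into three parts, select * from | t_order | where created_by = '123', which are rewritten and reassembled.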
toSQL
org.apache.shardingsphere.underlying.rewrite.sql.impl.AbstractSQLBuilder#toSQL
public final String toSQL() {
if (context.getSqlTokens().isEmpty()) {
return context.getSql();
}
Collections.sort(context.getSqlTokens());
StringBuilder result = new StringBuilder();
// Take the content before the first SQLToken: select * from
result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));
for (SQLToken each : context.getSqlTokens()) {
// Rewrite and append the content of each SQLToken: t_order -> t_order_0
result.append(getSQLTokenText(each));
// Append the unchanged content that follows the SQLToken: where created_by = '123'
result.append(getConjunctionText(each));
}
return result.toString();
}
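The split, rewrite, and rejoin steps of toSQL can be imitated with a self-contained sketch. RewriteSketch and TableToken are hypothetical names, and only table-name tokens are modeled; the real engine works with several token types and derives the positions from the parser.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of position-based SQL rewriting.
final class RewriteSketch {

    /** A logical table name and the inclusive character range it occupies in the SQL. */
    static final class TableToken implements Comparable<TableToken> {
        final int startIndex;
        final int stopIndex;
        final String logicName;

        TableToken(int startIndex, int stopIndex, String logicName) {
            this.startIndex = startIndex;
            this.stopIndex = stopIndex;
            this.logicName = logicName;
        }

        @Override
        public int compareTo(TableToken other) {
            return Integer.compare(startIndex, other.startIndex);
        }
    }

    static String rewrite(String sql, List<TableToken> tokens, Map<String, String> logicToActual) {
        if (tokens.isEmpty()) {
            return sql;
        }
        List<TableToken> sorted = new ArrayList<>(tokens);
        Collections.sort(sorted);
        // Text before the first token is copied unchanged
        StringBuilder result = new StringBuilder(sql.substring(0, sorted.get(0).startIndex));
        for (int i = 0; i < sorted.size(); i++) {
            TableToken each = sorted.get(i);
            // Replace the logical name with the actual name chosen by routing
            result.append(logicToActual.getOrDefault(each.logicName, each.logicName));
            // Copy the unchanged text up to the next token, or to the end of the SQL
            int nextStart = i + 1 < sorted.size() ? sorted.get(i + 1).startIndex : sql.length();
            result.append(sql, each.stopIndex + 1, nextStart);
        }
        return result.toString();
    }

    public static void main(String[] args) {
        String sql = "select * from t_order where created_by = '123'";
        int start = sql.indexOf("t_order");
        TableToken token = new TableToken(start, start + "t_order".length() - 1, "t_order");
        System.out.println(rewrite(sql, Collections.singletonList(token),
                Collections.singletonMap("t_order", "t_order_0")));
    }
}
```

Running the rewrite once per route unit, each time with that unit's logical-to-actual mapping, yields one executable SQL statement per target.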
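ParameterBuilder has two implementations, StandardParameterBuilder and GroupedParameterBuilder.
- StandardParameterBuilder: for non-INSERT statements; getParameters returns the parameters directly without grouping.
- GroupedParameterBuilder: for INSERT statements; the parameters have to be grouped according to the routing.
For the reasons and examples, see the batch split section of the official documentation.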
getParameters
org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#getParameters
private List<Object> getParameters(final ParameterBuilder parameterBuilder, final RouteResult routeResult, final RouteUnit routeUnit) {
if (parameterBuilder instanceof StandardParameterBuilder || routeResult.getOriginalDataNodes().isEmpty() || parameterBuilder.getParameters().isEmpty()) {
// Non-INSERT statements: return the parameters directly
return parameterBuilder.getParameters();
}
List<Object> result = new LinkedList<>();
int count = 0;
for (Collection<DataNode> each : routeResult.getOriginalDataNodes()) {
if (isInSameDataNode(each, routeUnit)) {
// INSERT statements: assemble the parameters group by group
result.addAll(((GroupedParameterBuilder) parameterBuilder).getParameters(count));
}
count++;
}
return result;
}
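Why INSERT parameters need grouping is easier to see with a batch insert. The sketch below is hypothetical: GroupedParameterSketch, the t_order_ prefix, and the modulo rule on the first parameter of each row are assumptions made for illustration, not the logic of GroupedParameterBuilder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: split the parameters of a multi-row INSERT by target table.
final class GroupedParameterSketch {

    /**
     * Each inner list holds the parameters of one VALUES (...) row.
     * The first parameter of a row is assumed to be the numeric sharding value.
     */
    static Map<String, List<Object>> groupByTarget(List<List<Object>> parameterGroups, int tableCount) {
        Map<String, List<Object>> result = new LinkedHashMap<>();
        for (List<Object> each : parameterGroups) {
            long shardingValue = ((Number) each.get(0)).longValue();
            String target = "t_order_" + Math.floorMod(shardingValue, (long) tableCount);
            // Rows routed to the same table share one flattened parameter list
            result.computeIfAbsent(target, key -> new ArrayList<>()).addAll(each);
        }
        return result;
    }

    public static void main(String[] args) {
        // INSERT INTO t_order (order_id, remark) VALUES (?, ?), (?, ?), (?, ?)
        List<List<Object>> rows = Arrays.asList(
                Arrays.<Object>asList(1L, "a"),
                Arrays.<Object>asList(2L, "b"),
                Arrays.<Object>asList(3L, "c"));
        System.out.println(groupByTarget(rows, 2));
    }
}
```

Each target table then receives only the rows that belong to it, so no table is sent rows meant for another shard.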
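3.4.2 Engine Summary
The rewriting engine turns logical SQL into SQL that can actually be executed. This includes replacing logical table names, splitting SQL that is routed to multiple targets, and rewriting pagination, grouping, and sorting for the later merge step. SELECT statements do not have their parameters regrouped, whereas INSERT statements regroup their parameters per route unit to avoid inserting extra rows.
3.5 Execution Engine
3.5.1 Engine Analysis
Once rewritten, the SQL can be executed. The execution engine has to balance resources against efficiency: creating a database connection for every real SQL statement would waste resources, while running everything serially on a single thread would hurt execution efficiency.
The execution engine first groups the SQLUnits of the execution units by data source, putting the SQLUnits of the same data source into one list. It then partitions each data source's SQLUnits further according to maxConnectionsSizePerQuery, creates the connections, and binds the SQLUnits to them.
Creating execution groups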
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups
private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
// Group the SQLUnits by data source; key = dataSourceName
Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);
Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
// Create the SQL execution groups
for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
}
return result;
}
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups
private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
// Maximum number of SQL statements each connection has to execute
int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
// Partition; each partition maps to one database connection
List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
// Choose the connection mode: connection-strictly or memory-strictly
ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
// Create the connections
List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
int count = 0;
for (List<SQLUnit> each : sqlUnitPartitions) {
// Bind each connection to its SQLUnits and create the StatementExecuteUnits
result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
}
return result;
}
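The arithmetic in getSQLExecuteGroups is easier to follow with concrete numbers. The sketch below re-implements only the partition size, the partitioning, and the mode choice in plain Java; ExecuteGroupSketch and its method names are hypothetical, and a simple loop replaces Guava's Lists.partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration of the partitioning and connection-mode arithmetic.
final class ExecuteGroupSketch {

    enum ConnectionMode { MEMORY_STRICTLY, CONNECTION_STRICTLY }

    /** Ceiling of sqlUnitCount / maxConnectionsSizePerQuery, but never less than 1. */
    static int desiredPartitionSize(int sqlUnitCount, int maxConnectionsSizePerQuery) {
        return Math.max((sqlUnitCount + maxConnectionsSizePerQuery - 1) / maxConnectionsSizePerQuery, 1);
    }

    /** Connection-strictly when the units cannot each get a connection of their own. */
    static ConnectionMode chooseMode(int sqlUnitCount, int maxConnectionsSizePerQuery) {
        return maxConnectionsSizePerQuery < sqlUnitCount ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
    }

    /** Splits the list into consecutive chunks of at most the given size. */
    static <T> List<List<T>> partition(List<T> items, int size) {
        List<List<T>> result = new ArrayList<>();
        for (int from = 0; from < items.size(); from += size) {
            result.add(new ArrayList<>(items.subList(from, Math.min(from + size, items.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> sqlUnits = Arrays.asList("sql_0", "sql_1", "sql_2", "sql_3", "sql_4");
        int maxConnectionsSizePerQuery = 2;
        int size = desiredPartitionSize(sqlUnits.size(), maxConnectionsSizePerQuery);
        // Five units over at most two connections: partitions of three and two units
        System.out.println(size + " " + partition(sqlUnits, size) + " " + chooseMode(sqlUnits.size(), maxConnectionsSizePerQuery));
    }
}
```

With five SQLUnits and a limit of two connections, each connection handles up to three statements and the mode is connection-strictly; with a limit of five or more, every unit gets its own connection and the mode is memory-strictly.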
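How the SQLUnits are partitioned has nothing to do with the choice of connection mode. The connection mode depends only on how maxConnectionsSizePerQuery compares with the number of SQLUnits, where maxConnectionsSizePerQuery is the maximum number of connections that one query may use on a single data source.
- When maxConnectionsSizePerQuery < the number of SQLUnits, each SQLUnit cannot have a connection of its own, so the result sets have to be loaded into memory up front (connection-strictly mode, CONNECTION_STRICTLY).
- When maxConnectionsSizePerQuery >= the number of SQLUnits, each SQLUnit can have its own connection, so the result sets can be read by moving the ResultSet cursor forward (memory-strictly mode, MEMORY_STRICTLY).
maxConnectionsSizePerQuery defaults to 1, so a SQL statement that is routed to multiple tables (multiple SQLUnits) uses connection-strictly mode, and one that is routed to a single table uses memory-strictly mode.
To avoid database connection deadlocks, in memory-strictly mode Sharding-JDBC locks the data source object and creates all the connections the SQL statement needs in one go. In connection-strictly mode each connection loads its own results at once, so connections never wait on each other and no deadlock can occur. Memory-strictly mode reads result sets through cursors and needs several connections to query different tables for merging; if the required connections are not all acquired at once, the connections may end up waiting for each other and deadlock. See the execution engine examples in the official documentation.
Creating connections in the different connection modes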
private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
if (1 == connectionSize) {
Connection connection = createConnection(dataSourceName, dataSource);
replayMethodsInvocation(connection);
return Collections.singletonList(connection);
}
if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
return createConnections(dataSourceName, dataSource, connectionSize);
}
// Memory-strictly mode: lock and acquire all connections at once
synchronized (dataSource) {
return createConnections(dataSourceName, dataSource, connectionSize);
}
}
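In addition, the choice between in-memory and streaming result handling only applies when JDBC's executeQuery is called. If a query is run through execute, the streaming approach is always used.
Comparison of query result handling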
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#executeQuery#101
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult
private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
PreparedStatement preparedStatement = (PreparedStatement) statement;
ResultSet resultSet = preparedStatement.executeQuery();
getResultSets().add(resultSet);
// In executeQuery, streaming or in-memory is chosen by connection mode
return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
}
// With execute, getResultSet is called separately and always uses streaming
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet#158
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getQueryResults
private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
List<QueryResult> result = new ArrayList<>(resultSets.size());
for (ResultSet each : resultSets) {
if (null != each) {
result.add(new StreamQueryResult(each));
}
}
return result;
}
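Multiple connections are executed either serially or in parallel: serially inside local transactions and XA transactions, and in parallel otherwise. The detailed execution logic is not expanded on here.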
isHoldTransaction
public boolean isHoldTransaction() {
return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
}
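3.5.2 Engine Summary
The execution engine picks the connection mode by comparing maxConnectionsSizePerQuery with the number of SQLUnits on the same data source: connection-strictly mode when maxConnectionsSizePerQuery < the number of SQLUnits, and memory-strictly mode when maxConnectionsSizePerQuery >= the number of SQLUnits. In memory-strictly mode it locks the data source object so that all connections needed by the SQL statement are acquired at once, which avoids deadlocks. With executeQuery, result sets are handled by streaming or in memory depending on the connection mode; with execute, result sets are always handled by streaming.
3.6 Merging Engine
3.6.1 Engine Analysis
The result sets returned by the queries have to pass through the merging engine to become the final result. The core entry point is the process method of MergeEntry, which first merges for sharding and then applies data masking. With read/write splitting only, it returns a TransparentMergedResult directly. TransparentMergedResult does no extra merging; every method simply delegates to the underlying queryResult.
Merging entry point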
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#mergeQuery#190
org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge#61
org.apache.shardingsphere.underlying.merge.MergeEntry#process
public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
// Sharding merge
Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
//脫敏處理
Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
//只有讀寫(xiě)分離的情況下,orElseGet會(huì)不存在,TransparentMergedResult
return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
}
TransparentMergedResult
@RequiredArgsConstructor
public final class TransparentMergedResult implements MergedResult {
private final QueryResult queryResult;
@Override
public boolean next() throws SQLException {
return queryResult.next();
}
@Override
public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
return queryResult.getValue(columnIndex, type);
}
@Override
public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
return queryResult.getCalendarValue(columnIndex, type, calendar);
}
@Override
public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
return queryResult.getInputStream(columnIndex, type);
}
@Override
public boolean wasNull() throws SQLException {
return queryResult.wasNull();
}
}
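We only look at the sharding-related path. ResultMergerEngine has a single implementation, ShardingResultMergerEngine, so the first merge call above yields a result only when sharding is in play. The ResultMerger implementation is chosen by SQL type; merging query results is both the most common and the most complex case.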
MergeEntry.merge
org.apache.shardingsphere.underlying.merge.MergeEntry#merge
private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
if (entry.getValue() instanceof ResultMergerEngine) {
// Choose the resultMerger type
ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
// Merge
return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
}
}
return Optional.empty();
}
org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine#newInstance
public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
if (sqlStatementContext instanceof SelectStatementContext) {
return new ShardingDQLResultMerger(databaseType);
}
if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
return new ShardingDALResultMerger(shardingRule);
}
return new TransparentResultMerger();
}
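The merge method of ShardingDQLResultMerger selects the appropriate merge strategy (group-by with aggregation, order-by, or plain iteration) from the tokens found while parsing the SQL. The resulting mergedResult then always passes through decorate, which decides whether a pagination merge is also needed. The overall flow can be summarized as follows.
Choosing the merge strategy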
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#merge
public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
if (1 == queryResults.size()) {
return new IteratorStreamMergedResult(queryResults);
}
Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
selectStatementContext.setIndexes(columnLabelIndexMap);
// Group-by with aggregation, order-by, or iteration
MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
// Pagination merge
return decorate(queryResults, selectStatementContext, mergedResult);
}
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#build
private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
if (isNeedProcessGroupBy(selectStatementContext)) {
// Group-by and aggregation merge
return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
}
if (isNeedProcessDistinctRow(selectStatementContext)) {
setGroupByForDistinctRow(selectStatementContext);
// Group-by and aggregation merge (DISTINCT is handled as a group-by)
return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
}
if (isNeedProcessOrderBy(selectStatementContext)) {
// Order-by merge
return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
}
// Iteration merge
return new IteratorStreamMergedResult(queryResults);
}
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#decorate
private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {
PaginationContext paginationContext = selectStatementContext.getPaginationContext();
if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
return mergedResult;
}
String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
// Pagination merge chosen by database type
if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
return new LimitDecoratorMergedResult(mergedResult, paginationContext);
}
if ("Oracle".equals(trunkDatabaseName)) {
return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
}
if ("SQLServer".equals(trunkDatabaseName)) {
return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
}
return mergedResult;
}
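The official documentation covers what each merge strategy does with detailed examples, so they are not repeated here.
3.6.2 Engine summary
The merge engine is the last step when Sharding-JDBC executes SQL. Its job is to combine the result sets from multiple data nodes into one correct result set. There are five kinds of merge for queries: group-by, aggregation, order-by, iteration and pagination. They are not mutually exclusive and are combined with one another.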
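To illustrate how merges combine, here is a minimal sketch of an order-by stream merge followed by a pagination step. It is not the ShardingSphere implementation: SortMergeSketch and its methods are hypothetical names, each shard result is modeled as an already sorted list of integers, and a priority queue of cursors is assumed as the merge technique.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative sketch only; names are hypothetical. */
final class SortMergeSketch {

    /** Cursor over one shard's result, which is assumed to be sorted ascending already. */
    private static final class Cursor implements Comparable<Cursor> {
        private final Iterator<Integer> rows;
        private Integer current;

        Cursor(final List<Integer> sortedRows) {
            rows = sortedRows.iterator();
            advance();
        }

        /** Moves to the next row; returns false when the shard is exhausted. */
        boolean advance() {
            current = rows.hasNext() ? rows.next() : null;
            return null != current;
        }

        @Override
        public int compareTo(final Cursor other) {
            return current.compareTo(other.current);
        }
    }

    /** Order-by merge across shards, then pagination (skip offset rows, keep rowCount rows). */
    static List<Integer> mergeSortedWithLimit(final List<List<Integer>> shardResults, final int offset, final int rowCount) {
        PriorityQueue<Cursor> queue = new PriorityQueue<>();
        for (List<Integer> each : shardResults) {
            Cursor cursor = new Cursor(each);
            if (null != cursor.current) {
                queue.offer(cursor);
            }
        }
        List<Integer> result = new ArrayList<>();
        int skipped = 0;
        while (!queue.isEmpty() && result.size() < rowCount) {
            // The head of the queue always holds the smallest remaining row
            Cursor head = queue.poll();
            if (skipped < offset) {
                skipped++;
            } else {
                result.add(head.current);
            }
            if (head.advance()) {
                queue.offer(head);
            }
        }
        return result;
    }
}
```

Only one row per shard is held at a time, which is what makes the order-by merge a stream merge; the pagination step is layered on top rather than replacing it.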
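4. Custom development
While using Sharding-JDBC we found several things that could be improved. For example, when an existing system grows large enough to need sharding and Sharding-JDBC is introduced, two major problems appear.
The first is migrating the existing data, which we can handle by making the sharding algorithm compatible with it. As mentioned earlier, sharding key values must not be modified, and SQL should carry the sharding key. If the sharding key's values increase monotonically (an id or a timestamp, for example), we can set a threshold and, inside the sharding algorithm's doSharding, compare the sharding value with the threshold to decide whether the row is routed to the old table or to the new tables, which avoids the hassle of migrating data. If tables are split by user id modulo and user id alone cannot tell whether a row is new, consider a complex sharding algorithm: make both the user id and an increasing field such as order id or time sharding keys, use the order id or time to decide whether the data is new, and then take the user id modulo to get the routing result.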
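The threshold idea can be sketched as a plain routing function. This is a minimal sketch under stated assumptions, not our production algorithm: ThresholdRoutingSketch, NEW_DATA_THRESHOLD and NEW_TABLE_COUNT are hypothetical, the old data is assumed to live in the unsuffixed table, and in a real setup the same decision would sit inside the doSharding method of a complex sharding algorithm that receives both sharding values.

```java
/** Illustrative sketch only; names and constants are hypothetical. */
final class ThresholdRoutingSketch {

    /** Order ids below this value were written before sharding was introduced (assumed value). */
    static final long NEW_DATA_THRESHOLD = 1_000_000L;

    /** Number of new physical tables (assumed value). */
    static final int NEW_TABLE_COUNT = 4;

    /**
     * Routes by two sharding keys: the increasing order id decides old vs. new data,
     * the user id modulo picks the new table.
     */
    static String route(final String logicTable, final long userId, final long orderId) {
        if (orderId < NEW_DATA_THRESHOLD) {
            // Existing rows stay where they are, so no data migration is needed
            return logicTable;
        }
        return logicTable + "_" + Math.floorMod(userId, (long) NEW_TABLE_COUNT);
    }

    public static void main(final String[] args) {
        System.out.println(route("t_order", 7L, 999_999L));
        System.out.println(route("t_order", 7L, 1_000_000L));
    }
}
```

The design choice here is that the threshold is fixed at cut-over time: once chosen it must never change, otherwise rows already written to new tables would be looked up in the old one.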
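The second is that Sharding-JDBC's syntax restrictions put existing SQL under heavy pressure to be rewritten, whereas the business really only cares about the tables that need sharding; non-sharded tables should not be changed or affected. In principle, SQL on non-sharded tables needs no parsing, routing, rewriting or merging, so we optimized this logic at the source level: part of the parsing is skipped, and sharding routing, rewriting and merging are skipped entirely. This minimizes Sharding-JDBC's syntax restrictions on non-sharded tables and reduces the rework effort and risk for business systems.
4.1 Skipping Sharding syntax restrictions
Sharding-JDBC's parse, route and rewrite logic all lives in BasePrepareEngine, which finally builds an ExecutionContext and hands it to the execution engine. ExecutionContext contains sqlStatementContext and executionUnits. Non-sharded tables involve no routing or rewriting, so their ExecutionUnit is easy to construct by hand. Looking at where SQLStatementContext is used, we found that it only affects result-set merging, not the actual execution, and non-sharded tables need no result-set merging either. The overall implementation idea is shown in the figure.
ExecutionContext and related objects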
public class ExecutionContext {
private final SQLStatementContext sqlStatementContext;
private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();
}
public final class ExecutionUnit {
private final String dataSourceName;
private final SQLUnit sqlUnit;
}
public final class SQLUnit {
private String sql;
private final List<Object> parameters;
}
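(1) Check whether the SQL contains a sharded table: we split the SQL into a Set of words with a regular expression and then iterate over the BaseRules to see whether any sharded table is present. You may wonder why we parse it ourselves when the parse engine can already extract table names from SQL. The reason is that, during testing, we found that many existing business SQL statements fail in the parse phase itself, so the check has to happen beforehand. This check is admittedly not rigorous: for SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx'; with t_order configured as a sharded table, it produces a false positive. That scenario does not occur in our business, so we have not handled it for now. Because this information is needed in methods of several objects, we store the result in a ThreadLocal, which passes it through conveniently without changing a large number of fields and method parameters.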
RuleContextManager
public final class RuleContextManager {
private static final ThreadLocal<RuleContextManager> SKIP_CONTEXT_HOLDER = ThreadLocal.withInitial(RuleContextManager::new);
/**
* Whether to skip sharding
*/
private boolean skipSharding;
/**
* Whether to route to the master
*/
private boolean masterRoute;
public static boolean isSkipSharding() {
return SKIP_CONTEXT_HOLDER.get().skipSharding;
}
public static void setSkipSharding(boolean skipSharding) {
SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding;
}
public static boolean isMasterRoute() {
return SKIP_CONTEXT_HOLDER.get().masterRoute;
}
public static void setMasterRoute(boolean masterRoute) {
SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute;
}
public static void clear(){
SKIP_CONTEXT_HOLDER.remove();
}
}
Checking whether the SQL contains a sharded table
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext
// Decide whether sharding can be skipped and populate RuleContextManager
private void buildSkipContext(final String sql){
Set<String> sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[\\s]")));
if (CollectionUtils.isNotEmpty(rules)) {
for (BaseRule baseRule : rules) {
// Custom method implemented by ShardingRule: checks whether sqlTokenSet contains a logical table name
if(baseRule.hasContainShardingTable(sqlTokenSet)){
RuleContextManager.setSkipSharding(false);
break;
}else {
RuleContextManager.setSkipSharding(true);
}
}
}
}
org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTable
public Boolean hasContainShardingTable(Set<String> sqlTokenSet) {
// logicTableNameList can be obtained by iterating over the TableRules
for (String logicTable : logicTableNameList) {
if (sqlTokenSet.contains(logicTable)) {
return true;
}
}
return false;
}
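To make the limits of this word-based check concrete, here is a minimal standalone sketch. It does not use any ShardingSphere classes: ShardingTableCheckSketch and containsShardingTable are illustrative names, and splitting on runs of whitespace is an assumption that only approximates buildSkipContext.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/** Illustrative sketch only; names are hypothetical. */
final class ShardingTableCheckSketch {

    /** Returns true when any whitespace-separated word of the SQL equals a logical table name. */
    static boolean containsShardingTable(final String sql, final Set<String> logicTables) {
        Set<String> tokens = new HashSet<>(Arrays.asList(sql.split("\\s+")));
        for (String each : logicTables) {
            if (tokens.contains(each)) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        Set<String> logicTables = Collections.singleton("t_order");
        // Plain non-sharded table: sharding can be skipped
        System.out.println(containsShardingTable("SELECT * FROM t_user WHERE id = 1", logicTables));
        // The table name appears only inside a string literal, yet the check still matches
        System.out.println(containsShardingTable(
                "SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx'", logicTables));
        // The table name is glued to a semicolon, so the word never equals "t_order"
        System.out.println(containsShardingTable("SELECT * FROM t_order;", logicTables));
    }
}
```

The second call shows the false positive described above, which is harmless because the SQL merely goes through the normal sharding path. The third call shows the opposite case, where a sharded table is missed because punctuation sticks to the name; a check like this is only safe if the SQL in the code base is known to keep table names as separate words.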
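(2) Skip parsing and routing: the skipSharding flag in RuleContextManager decides whether Sharding parsing and routing are skipped. To stay compatible with read/write splitting, we still need to know whether the SQL should go to the master or to a slave. The cases that go to the master are described in the forced master routing section below; there are really only two: non-SELECT statements, and SELECT statements that take a lock, such as SELECT...FOR UPDATE. The implementation steps are therefore:
- If skipping Sharding is flagged and the statement is not a SELECT, return a SkipShardingStatement directly. A dedicated SkipShardingStatement is constructed so that the parse engine's cache can still be used, since the cache cannot hold null values.
- A SELECT statement has to be parsed further: once we know whether it carries a lock we return immediately, so that later parsing cannot hit syntax incompatibilities. We also tried reading lockClause via reflection to detect the lock, but that did not work in the end.
- ShardingRouteDecorator checks RuleContextManager.isSkipSharding to decide whether to skip routing.
Skipping parsing and routing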
public class SkipShardingStatement implements SQLStatement{
@Override
public int getParameterCount() {
return 0;
}
}
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
private SQLStatement parse0(final String sql, final boolean useCache) {
if (useCache) {
Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
if (cachedSQLStatement.isPresent()) {
return cachedSQLStatement.get();
}
}
ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
/**
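* When sharding is skipped we still need to know whether to route to the master: a non-SELECT statement is skipped directly,
* a SELECT statement must be parsed further to check whether it carries a lock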
*/
SQLStatement result ;
if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){
RuleContextManager.setMasterRoute(true);
result = new SkipShardingStatement();
}else {
result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
}
if (useCache) {
cache.put(sql, result);
}
return result;
}
org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause
public ASTNode visitSelectClause(final SelectClauseContext ctx) {
SelectStatement result = new SelectStatement();
// When sharding is skipped, only the lock matters: it decides whether to route to the master
if(RuleContextManager.isSkipSharding()){
if (null != ctx.lockClause()) {
result.setLock((LockSegment) visit(ctx.lockClause()));
RuleContextManager.setMasterRoute(true);
}
return result;
}
//... remaining parsing
}
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext
private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
// When sharding is skipped, return directly without any further processing
if (RuleContextManager.isSkipSharding()) {
return new RouteContext(sqlStatement, parameters, new RouteResult());
}
//... parsing
}
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
// Skip sharding routing
if(RuleContextManager.isSkipSharding()){
return routeContext;
}
//... routing
}
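(3) Build the ExecutionUnit by hand: the only thing we need to determine for an ExecutionUnit is the datasourceName. We assume here that SQL which skips Sharding always ends up executing on exactly one database. If only Sharding is skipped, the data source name can be taken straight from the metadata; if read/write splitting is in use, the master/slave routing result is also unique. Once the ExecutionUnit is created it is put into the ExecutionContext and returned, which skips the subsequent rewrite logic.
Building the ExecutionUnit by hand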
public ExecutionContext prepare(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
// Decide whether sharding can be skipped and populate RuleContextManager
buildSkipContext(sql);
RouteContext routeContext = executeRoute(sql, clonedParameters);
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
// SQL that skips sharding must end up routed to exactly one database
if(RuleContextManager.isSkipSharding()){
log.debug("Sharding can be skipped for SQL {}", sql);
if(!Objects.isNull(routeContext.getRouteResult())){
Collection<String> allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();
int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();
/*
* 1. Without read/write splitting, skipping sharding routing leaves routeUnitsSize at 0, so the number of data sources must be 1
* 2. With read/write splitting, the SQL is routed to one specific master or slave, so routeUnitsSize should be 1
*/
// Reject everything else: no route unit while several data sources exist, or more than one route unit
if((routeUnitsSize == 0 && allInstanceDataSourceNames.size() != 1) || routeUnitsSize > 1){
throw new ShardingSphereException("Sharding can be skipped but the routing result is not unique, SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());
}
Collection<String> actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();
// Create the execution unit by hand
String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();
ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));
result.getExecutionUnits().add(executionUnit);
// Mark this result as skipping sharding
result.setSkipShardingScenarioFlag(true);
}
}else {
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
}
if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
(4) Skip merging: skip both the merging of query results and the accumulation of affected row counts. Note that both ShardingPreparedStatement and ShardingStatement need this change.
Skipping the merge
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery
public ResultSet executeQuery() throws SQLException {
ResultSet result;
try {
clearPrevious();
prepare();
initPreparedStatementExecutor();
List<QueryResult> queryResults = preparedStatementExecutor.executeQuery();
List<ResultSet> resultSets = preparedStatementExecutor.getResultSets();
// Custom change: skip merging when no sharded table is involved
if(executionContext.isSkipShardingScenarioFlag()){
return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
}
MergedResult mergedResult = mergeQuery(queryResults);
result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
} finally {
clearBatch();
}
currentResultSet = result;
return result;
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
public ResultSet getResultSet() throws SQLException {
if (null != currentResultSet) {
return currentResultSet;
}
List<ResultSet> resultSets = getResultSets();
// Custom change: skip merging when no sharded table is involved
if(executionContext.isSkipShardingScenarioFlag()){
return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
}
if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
}
return currentResultSet;
}
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate
public boolean isAccumulate() {
// Custom change: skip accumulation when no sharded table is involved
if(executionContext.isSkipShardingScenarioFlag()){
return false;
}
return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
}
(5) Clear RuleContextManager: look at where Sharding-JDBC clears its other ThreadLocals and clear RuleContextManager in the same places.
Clearing the ThreadLocal
org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#close
public final void close() throws SQLException {
closed = true;
MasterVisitedManager.clear();
TransactionTypeHolder.clear();
RuleContextManager.clear();
int connectionSize = cachedConnections.size();
try {
forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());
} finally {
cachedConnections.clear();
rootInvokeHook.finish(connectionSize);
}
}
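Why the clearing step matters can be shown with a small standalone sketch. It is not part of our change: SkipContextSketch and callWithSkip are hypothetical names, and the try/finally wrapper is just one assumed way to guarantee that a flag set for one statement never leaks to the next statement on a pooled thread.

```java
import java.util.function.Supplier;

/** Illustrative sketch only; names are hypothetical. */
final class SkipContextSketch {

    private static final ThreadLocal<Boolean> SKIP_SHARDING = ThreadLocal.withInitial(() -> false);

    static boolean isSkipSharding() {
        return SKIP_SHARDING.get();
    }

    /** Runs the action with the flag set and always removes the flag afterwards. */
    static <T> T callWithSkip(final boolean skipSharding, final Supplier<T> action) {
        SKIP_SHARDING.set(skipSharding);
        try {
            return action.get();
        } finally {
            // Without this, a reused worker thread would still see the previous statement's flag
            SKIP_SHARDING.remove();
        }
    }

    public static void main(final String[] args) {
        System.out.println(callWithSkip(true, SkipContextSketch::isSkipSharding));
        System.out.println(isSkipSharding());
    }
}
```

Inside the wrapped call the flag is visible; once the call returns, the thread is back to the initial value.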
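For example, Sharding-JDBC itself does not support the syntax INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ? and throws a NullPointerException.
After the changes above, we verified that such SQL can bypass the syntax restrictions and execute when it targets non-sharded tables.
With this feature, the business can focus on reworking SQL for sharded tables without worrying that introducing Sharding-JDBC forces every SQL statement to be verified and reworked, which greatly reduces rework cost and risk.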
4.2 Forced master routing
Sharding-JDBC makes read/write splitting easy by configuring master and slave data sources, but read/write splitting brings the pain points of replication lag and unreachable slaves. To address this we implemented a dynamic setting for forced master routing: when replication lag grows too large or a slave becomes unreachable, changing the setting makes SQL go to the master, which switches the routing without downtime.
How the setting takes effect dynamically is explained later; here we only cover forced master routing itself. We reuse the RuleContextManager described above and check in the master/slave routing engine whether forced master routing is enabled.
Changes to MasterSlaveRouteDecorator.decorate
org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
/**
* If forced master routing is configured, mark MasterVisitedManager as visited
* so that the later isMasterRoute check routes to the master
*/
if(properties.<Boolean>getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){
MasterVisitedManager.setMasterVisited();
}
//... routing logic
return routeContext;
}
To stay compatible with the Sharding-skipping feature above, isMasterRoute has to be changed as well: when Sharding routing was skipped, RuleContextManager decides whether to go to the master.
Changes to isMasterRoute
org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute
private boolean isMasterRoute(final SQLStatement sqlStatement) {
if(sqlStatement instanceof SkipShardingStatement){
// The value in MasterVisitedManager takes precedence
return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();
}
return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
}
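Ideally, of course, replication lag and database probes would be monitored, and once a threshold is exceeded or a slave becomes unreachable the setting in the configuration center would be changed automatically, switching to the master without manual action and reducing outage time and operational load.
4.3 Dynamic configuration updates
ConfigurationPropertyKey in Sharding-JDBC offers many configuration properties, but Sharding-JDBC provides no way to change them online. In practice, for a switch such as SQL_SHOW that controls SQL logging, we would much rather change the value online than edit the configuration and restart the service.
Taking SQL logging as the example, BasePrepareEngine holds a ConfigurationProperties object and reads the value of SQL_SHOW by calling getValue.
SQL logging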
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
/**
* Prepare to execute.
*
* @param sql SQL
* @param parameters SQL parameters
* @return execution context
*/
public ExecutionContext prepare(final String sql, final List<Object> parameters) {
List<Object> clonedParameters = cloneParameters(parameters);
RouteContext routeContext = executeRoute(sql, clonedParameters);
ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
// Log the SQL
if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
}
return result;
}
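ConfigurationProperties extends the abstract class TypedProperties, whose getValue method returns the configured value for a key, so we implemented the method that refreshes the cached value directly in TypedProperties.
Refreshing configuration in TypedProperties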
@Slf4j
public abstract class TypedProperties<E extends Enum & TypedPropertyKey> {
private static final String LINE_SEPARATOR = System.getProperty("line.separator");
@Getter
private final Properties props;
private final Map<E, TypedPropertyValue> cache;
// Key enum class, kept so that refreshValue can enumerate the supported keys
private final Class<E> targetKeyClass;
public TypedProperties(final Class<E> keyClass, final Properties props) {
this.props = props;
this.targetKeyClass = keyClass;
cache = preload(keyClass);
}
private Map<E, TypedPropertyValue> preload(final Class<E> keyClass) {
E[] enumConstants = keyClass.getEnumConstants();
Map<E, TypedPropertyValue> result = new HashMap<>(enumConstants.length, 1);
Collection<String> errorMessages = new LinkedList<>();
for (E each : enumConstants) {
TypedPropertyValue value = null;
try {
value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());
} catch (final TypedPropertyValueException ex) {
errorMessages.add(ex.getMessage());
}
result.put(each, value);
}
if (!errorMessages.isEmpty()) {
throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));
}
return result;
}
/**
* Get property value.
*
* @param key property key
* @param <T> class type of return value
* @return property value
*/
@SuppressWarnings("unchecked")
public <T> T getValue(final E key) {
return (T) cache.get(key).getValue();
}
/**
* vivo custom method: refresh a property value.
* @param key property key
* @param value property value
* @return whether the update succeeded
*/
public boolean refreshValue(String key, String value){
// Keys supported by this configuration class
E[] enumConstants = targetKeyClass.getEnumConstants();
for (E each : enumConstants) {
// Find the key that is being refreshed
if(each.getKey().equals(key)){
try {
// A blank value is treated as invalid, so fall back to the default value
if(StringUtils.isBlank(value)){
value = each.getDefaultValue();
}
// Build the new property value
TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);
// Replace the cached value
cache.put(each, typedPropertyValue);
// Replace the raw property as well, since Properties may be read directly through RuntimeContext
props.put(key,value);
return true;
} catch (final TypedPropertyValueException ex) {
log.error("refreshValue error. key={} , value={}", key, value, ex);
}
}
}
return false;
}
}
With the refresh method in place, it still has to be exposed step by step up to a class that can be called from outside, so that the service's configuration listener can invoke it. ConfigurationProperties is passed into the constructor of BasePrepareEngine; tracing the constructors back outward to find where this object comes from, we end up at getRuntimeContext() in AbstractDataSourceAdapter, through which the configuration can be reached. That class is Sharding-JDBC's abstract implementation of the JDBC DataSource interface, so we call the new refreshValue method from there. What remains is to listen for configuration changes and call the method through our own AbstractDataSourceAdapter implementation.
With this feature we can conveniently change switch-like properties online, such as SQL logging and forced master routing, and the change takes effect without restarting the service.
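The refresh path can be pictured with a small standalone sketch. It is a simplification under stated assumptions, not the customized TypedProperties: RefreshablePropertiesSketch, its Key enum and onConfigChanged are hypothetical, all values are booleans, sql.show is used as the key name for SQL_SHOW, and master.route.only is an invented key name for the forced master routing switch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch only; names and the second key string are hypothetical. */
final class RefreshablePropertiesSketch {

    enum Key {
        SQL_SHOW("sql.show", "false"),
        MASTER_ROUTE_ONLY("master.route.only", "false");

        private final String key;
        private final String defaultValue;

        Key(final String key, final String defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    /** Typed cache; a concurrent map is used here because refreshes come from a listener thread. */
    private final Map<Key, Boolean> cache = new ConcurrentHashMap<>();

    RefreshablePropertiesSketch() {
        for (Key each : Key.values()) {
            cache.put(each, Boolean.parseBoolean(each.defaultValue));
        }
    }

    boolean getValue(final Key key) {
        return cache.get(key);
    }

    /** Replaces the cached value of a known key; a blank value falls back to the default. */
    boolean refreshValue(final String key, final String value) {
        for (Key each : Key.values()) {
            if (each.key.equals(key)) {
                String effective = null == value || value.trim().isEmpty() ? each.defaultValue : value;
                cache.put(each, Boolean.parseBoolean(effective));
                return true;
            }
        }
        return false;
    }

    /** Hypothetical callback that a configuration-center listener would invoke with the changed entries. */
    void onConfigChanged(final Map<String, String> changed) {
        changed.forEach(this::refreshValue);
    }
}
```

Readers of the property keep calling getValue as before, so code such as the SQL logging check picks up the new value on the next statement without any restart.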
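4.4 Batch update syntax support
Some business code uses the foreach tag to issue batch updates. Sharding-JDBC cannot route such SQL correctly: only the first group of parameters is routed, and the rest are neither routed nor rewritten, because the parse engine cannot split the statement into its parts.
Batch update example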
<update id="batchUpdate">
<foreach collection="orderList" item="item">
update t_order set
status = 1,
updated_by = #{item.updatedBy}
WHERE created_by = #{item.createdBy};
</foreach>
</update>
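We split the batch update on ; into multiple statements, route each one separately, and finally combine the routing results by hand to generate the execution units.
For the SQL to be rewritten correctly, the statements obtained by splitting the batch update must be exactly identical. That rules out dynamically assembled set clauses; instead, use ifnull, or include unchanged columns in set as well with the same value before and after. The overall idea and implementation are as follows.
prepareBatch implementation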
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch
private ExecutionContext prepareBatch(List<String> splitSqlList, final List<Object> allParameters) {
// Deduplicate the SQL statements
List<String> sqlList = splitSqlList.stream().distinct().collect(Collectors.toList());
if (sqlList.size() > 1) {
throw new ShardingSphereException("Multiple different SQL statements are not supported, please check the SQL: " + sqlList.toString());
}
// Use the first SQL as the reference
String sql = sqlList.get(0);
// All execution units
Collection<ExecutionUnit> globalExecutionUnitList = new ArrayList<>();
// Final execution context, initialized lazily
ExecutionContext executionContextResult = null;
// Number of parameters per statement, derived from the total parameter count and the statement count
int eachSqlParameterCount = allParameters.size() / splitSqlList.size();
// Distribute the parameters evenly across the statements
List<List<Object>> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount);
for (List<Object> eachSqlParameterList : eachSqlParameterListList) {
// Each statement has different parameters and must be routed by them; the SqlStatementContext itself is the same
RouteContext routeContext = executeRoute(sql, eachSqlParameterList);
// The SQL is identical, so the SqlStatementContext is identical too and only needs to be set once
if (executionContextResult == null) {
executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext());
}
globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext));
}
// Sort the execution units by data source name, store them and log them
executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList()));
if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE),
executionContextResult.getSqlStatementContext(), (Collection<ExecutionUnit>) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST));
}
return executionContextResult;
}
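Here we added a separate extendMap to ExecutionContext to hold the ExecutionUnits. The reason is that executionUnits in ExecutionContext is a LinkedHashSet, and the SQLUnit inside an ExecutionUnit is compared by its SQL only. In a batch update the SQL is identical and only the parameters differ, so the units would be deduplicated away. To leave the existing logic untouched, we store them in a separate variable.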
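The effect of SQL-only equality inside a set is easy to reproduce. The sketch below is a simplified stand-in, not the ShardingSphere classes: SqlUnitDedupSketch and Unit are hypothetical names, and Unit copies only the one property that matters here, equality based on the SQL text.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;

/** Illustrative sketch only; names are hypothetical. */
final class SqlUnitDedupSketch {

    /** Simplified stand-in for SQLUnit: equals and hashCode look at the SQL text only. */
    static final class Unit {
        private final String sql;
        private final List<Object> parameters;

        Unit(final String sql, final List<Object> parameters) {
            this.sql = sql;
            this.parameters = parameters;
        }

        List<Object> getParameters() {
            return parameters;
        }

        @Override
        public boolean equals(final Object obj) {
            return obj instanceof Unit && sql.equals(((Unit) obj).sql);
        }

        @Override
        public int hashCode() {
            return Objects.hash(sql);
        }
    }

    /** Two statements of one batch update that land on the same physical table. */
    static List<Unit> batchUnits() {
        String sql = "update t_order_0 set status = 1 where created_by = ?";
        return Arrays.asList(
                new Unit(sql, Collections.<Object>singletonList("alice")),
                new Unit(sql, Collections.<Object>singletonList("bob")));
    }

    /** Number of units that survive when the list is copied into a set. */
    static int sizeAsSet(final List<Unit> units) {
        return new LinkedHashSet<>(units).size();
    }

    public static void main(final String[] args) {
        List<Unit> units = batchUnits();
        System.out.println("as list: " + units.size());
        System.out.println("as set: " + sizeAsSet(units));
    }
}
```

The list keeps both units while the set keeps only the first, so the update for the second parameter group would silently disappear. Holding the units in a list avoids that without touching the equality that the rest of the code relies on.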
Changes to ExecutionContext
@RequiredArgsConstructor
@Getter
public class ExecutionContext {
private final SQLStatementContext sqlStatementContext;
private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();
/**
* Custom extension variables
*/
private final Map<ExtendEnum,Object> extendMap = new HashMap<>();
/**
* Custom extension: whether the sharding logic can be skipped
*/
@Setter
private boolean skipShardingScenarioFlag = false;
}
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
private final String dataSourceName;
private final SQLUnit sqlUnit;
}
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
// Equality is based on the SQL only
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
private String sql;
private final List<Object> parameters;
}
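The execution method also needs a change: when the executor is initialized, check whether our custom EXECUTION_UNIT_LIST exists in extendMap and, if it does, use it to generate the InputGroups. ExecutionUnits for the same data source are placed in the same InputGroup.
Changes to InputGroup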
org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init
public void init(final ExecutionContext executionContext) throws SQLException {
setSqlStatementContext(executionContext.getSqlStatementContext());
// Support batch updates that hit the same table after sharding: if EXECUTION_UNIT_LIST is present, use the non-deduplicated List for the following steps
if (MapUtils.isNotEmpty(executionContext.getExtendMap())){
Collection<ExecutionUnit> executionUnitCollection = (Collection<ExecutionUnit>) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);
if(CollectionUtils.isNotEmpty(executionUnitCollection)){
getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));
}
}else {
getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
}
cacheStatements();
}
After this change, every statement in a batch update is routed and executed correctly.
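The splitting and parameter grouping that feed prepareBatch can be sketched without any ShardingSphere classes. This is a minimal sketch under stated assumptions: BatchSplitSketch and its methods are hypothetical names, the script is split naively on ; (so a semicolon inside a string literal would break it), and every statement is assumed to take the same number of parameters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative sketch only; names are hypothetical. */
final class BatchSplitSketch {

    /** Splits a batch script on ';' and drops empty fragments. */
    static List<String> splitStatements(final String script) {
        return Arrays.stream(script.split(";"))
                .map(String::trim)
                .filter(each -> !each.isEmpty())
                .collect(Collectors.toList());
    }

    /** Assigns each statement its own slice of the flat parameter list. */
    static List<List<Object>> partitionParameters(final List<String> statements, final List<Object> allParameters) {
        if (statements.isEmpty()) {
            throw new IllegalArgumentException("No statement to execute");
        }
        if (statements.stream().distinct().count() > 1) {
            throw new IllegalArgumentException("All statements of a batch update must be identical");
        }
        if (0 != allParameters.size() % statements.size()) {
            throw new IllegalArgumentException("Parameters cannot be divided evenly among the statements");
        }
        int perStatement = allParameters.size() / statements.size();
        List<List<Object>> result = new ArrayList<>(statements.size());
        for (int i = 0; i < statements.size(); i++) {
            result.add(new ArrayList<>(allParameters.subList(i * perStatement, (i + 1) * perStatement)));
        }
        return result;
    }
}
```

Each slice can then be routed on its own with the shared SQL text. Checking the remainder and the empty case up front is a choice made in this sketch so that a malformed batch fails with a clear message instead of a partition error.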
4.5 Deduplicating ShardingCondition
When the where clause contains several or conditions and those or conditions do not include the sharding key, createShardingConditions generates duplicate sharding conditions, so doSharding is called repeatedly.
Take SELECT * FROM t_order WHERE created_by = ? and ( (status = ?) or (status = ?) or (status = ?) ): it has three or conditions and the sharding key is created_by, so three identical shardingConditions are produced and doSharding is called three times. The SQL is still executed only once (as explained in the batch update section, execution units are deduplicated), but to cut the redundant calls we deduplicate here as well.
The deduplication is simple and blunt: we added the @EqualsAndHashCode annotation to ListRouteValue and RangeRouteValue, and then deduplicate once just before createShardingConditions in WhereClauseShardingConditionEngine returns its final result, so that duplicate shardingConditions no longer cause repeated doSharding calls.
Deduplication in createShardingConditions
org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions
private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {
Collection<ShardingCondition> result = new LinkedList<>();
for (AndPredicate each : andPredicates) {
Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);
if (routeValueMap.isEmpty()) {
return Collections.emptyList();
}
result.add(createShardingCondition(routeValueMap));
}
// Deduplicate
Collection<ShardingCondition> distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));
return distinctResult;
}
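Stream.distinct() relies on equals and hashCode, so the deduplication only works when equality is value-based all the way down. The sketch below illustrates that with simplified stand-ins: ConditionDedupSketch, RouteValue and Condition are hypothetical classes, and giving the condition itself value-based equality is an assumption of the sketch, since the text above only mentions the route value classes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

/** Illustrative sketch only; names are hypothetical. */
final class ConditionDedupSketch {

    /** Stand-in for a route value with value-based equality. */
    static final class RouteValue {
        private final String columnName;
        private final Object value;

        RouteValue(final String columnName, final Object value) {
            this.columnName = columnName;
            this.value = value;
        }

        @Override
        public boolean equals(final Object obj) {
            if (!(obj instanceof RouteValue)) {
                return false;
            }
            RouteValue other = (RouteValue) obj;
            return columnName.equals(other.columnName) && Objects.equals(value, other.value);
        }

        @Override
        public int hashCode() {
            return Objects.hash(columnName, value);
        }
    }

    /** Stand-in for a sharding condition; equal when its route values are equal. */
    static final class Condition {
        private final List<RouteValue> routeValues;

        Condition(final List<RouteValue> routeValues) {
            this.routeValues = routeValues;
        }

        @Override
        public boolean equals(final Object obj) {
            return obj instanceof Condition && routeValues.equals(((Condition) obj).routeValues);
        }

        @Override
        public int hashCode() {
            return routeValues.hashCode();
        }
    }

    /** created_by = ? and (status = ? or status = ? or status = ?): every or branch yields the same sharding value. */
    static List<Condition> threeOrBranches(final String createdBy) {
        return Arrays.asList(
                new Condition(Collections.singletonList(new RouteValue("created_by", createdBy))),
                new Condition(Collections.singletonList(new RouteValue("created_by", createdBy))),
                new Condition(Collections.singletonList(new RouteValue("created_by", createdBy))));
    }

    static List<Condition> distinct(final List<Condition> conditions) {
        return conditions.stream().distinct().collect(Collectors.toList());
    }
}
```

If the condition class kept identity-based equality, distinct() would leave all three entries in place even though their route values are equal.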
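4.6 Full-route check
If SQL on a sharded table carries no sharding key (or carries one that is not parsed correctly), it is routed to every shard, which causes performance problems. Such SQL raises no error, so during an actual business migration it is hard for developers and testers to guarantee that every statement has been reworked. We therefore added an extra check at the source level: when a full route occurs, that is, when ShardingConditions is empty, an exception is thrown so that developers and testers can spot full-route SQL quickly.
The implementation is simple: check whether ShardingConditions is empty. The only extra care needed is the Hint strategy, for which ShardingConditions is always empty.
Full-route check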
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
// omitted...
// Obtain the ShardingConditions
ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);
// Check whether full routing is allowed
if (!properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {
// Only check when no Hint algorithm is involved
if(!hintAlgorithm){
/** For a DML statement there are two cases, both following from the internal logic of getShardingConditions
* Non-insert statement: shardingConditions.getConditions() is empty
* Insert statement: shardingConditions.getConditions() is never empty, but the routeValues of each ShardingCondition are empty
*/
if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
if(shardingConditions.getConditions().isEmpty()) {
throw new ShardingSphereException("The SQL contains no sharding key, please check the SQL");
}else {
if (sqlStatementContext instanceof InsertStatementContext) {
List<ShardingCondition> routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());
if(CollectionUtils.isEmpty(routeValuesNotEmpty)){
throw new ShardingSphereException("The SQL contains no sharding key, please check the SQL");
}
}
}
}
}
}
boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
// omitted...
return new RouteContext(sqlStatementContext, parameters, routeResult);
}
private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
// Case a: the global default strategy is a Hint (forced routing) strategy
if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy
|| shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){
return true;
}
for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
Optional<TableRule> tableRule = shardingRule.findTableRule(each);
//場(chǎng)景b 指定表是否使用強(qiáng)制路由策略
if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy
|| shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {
return true;
}
}
return false;
}
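The branching in this check is easier to follow in isolation. The following is a minimal standalone sketch of the same decision, assuming simplified stand-in types (`StatementKind`, `Condition`) that are hypothetical and are not ShardingSphere classes; it only mirrors the cases described in the comments above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Standalone sketch: StatementKind and Condition are hypothetical stand-ins,
// not ShardingSphere classes.
class FullRouteCheckSketch {

    enum StatementKind { SELECT, UPDATE, DELETE, INSERT, NON_DML }

    // Stand-in for ShardingCondition; only its route values matter here.
    static final class Condition {
        final List<String> routeValues;

        Condition(List<String> routeValues) {
            this.routeValues = routeValues;
        }
    }

    // Returns true when the statement should be rejected as a full-route statement.
    static boolean isFullRoute(StatementKind kind, List<Condition> conditions, boolean hintStrategy) {
        // Hint strategies never produce conditions, and non-DML statements are not checked.
        if (hintStrategy || kind == StatementKind.NON_DML) {
            return false;
        }
        // No condition was extracted at all (the non-INSERT case).
        if (conditions.isEmpty()) {
            return true;
        }
        // INSERT case: conditions exist, but none of them carries a route value.
        if (kind == StatementKind.INSERT) {
            return conditions.stream().allMatch(c -> c.routeValues.isEmpty());
        }
        return false;
    }

    public static void main(String[] args) {
        List<Condition> none = Collections.emptyList();
        List<Condition> insertWithoutKey = Arrays.asList(new Condition(Collections.<String>emptyList()));
        List<Condition> insertWithKey = Arrays.asList(new Condition(Arrays.asList("order_id=1")));

        System.out.println(isFullRoute(StatementKind.SELECT, none, false));             // true
        System.out.println(isFullRoute(StatementKind.SELECT, none, true));              // false
        System.out.println(isFullRoute(StatementKind.INSERT, insertWithoutKey, false)); // true
        System.out.println(isFullRoute(StatementKind.INSERT, insertWithKey, false));    // false
    }
}
```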
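This feature could be refined further, for example by validating the number of data sources in the sharding route result so as to prevent cross-database operations. We have not implemented this, so we will not go into it here.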
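For readers who want to try the data-source count validation mentioned here, one possible shape is sketched below. This is not part of our customized version: `RouteTarget` and `requireSingleDataSource` are hypothetical names, and the sketch assumes the route result can be reduced to a collection of (data source, actual table) pairs.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch, not ShardingSphere code: rejects a route result
// that touches more than one data source.
class CrossDatabaseCheckSketch {

    // Stand-in for one routed target: a data source plus an actual table.
    static final class RouteTarget {
        final String dataSourceName;
        final String actualTable;

        RouteTarget(String dataSourceName, String actualTable) {
            this.dataSourceName = dataSourceName;
            this.actualTable = actualTable;
        }
    }

    // Collects the distinct data source names of all routed targets.
    static Set<String> dataSourcesOf(Collection<RouteTarget> targets) {
        Set<String> names = new TreeSet<>();
        for (RouteTarget target : targets) {
            names.add(target.dataSourceName);
        }
        return names;
    }

    // Throws when a single statement would be executed on several data sources.
    static void requireSingleDataSource(Collection<RouteTarget> targets) {
        Set<String> names = dataSourcesOf(targets);
        if (names.size() > 1) {
            throw new IllegalStateException("SQL routes to multiple data sources: " + names);
        }
    }

    public static void main(String[] args) {
        // Two tables in the same data source: accepted.
        requireSingleDataSource(Arrays.asList(
                new RouteTarget("ds0", "t_order0"), new RouteTarget("ds0", "t_order1")));
        // Tables in ds0 and ds1: rejected with IllegalStateException.
        requireSingleDataSource(Arrays.asList(
                new RouteTarget("ds0", "t_order0"), new RouteTarget("ds1", "t_order1")));
    }
}
```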
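4.7 Component encapsulation
The steps for a business system to adopt Sharding-JDBC are always the same: create the data sources and configuration objects in Java, or configure them through Spring Boot. This carries a learning cost and leads to duplicated development. We therefore wrapped our customized Sharding-JDBC in a shared component that simplifies business configuration, reduces duplicated work, and improves development efficiency. Its features are listed below. This part involves no source-code changes; it is simply a shared component layered on top of the customized version.
- Provides default data source and connection pool configuration
- Simplifies sharding configuration: the business configures the logical table name and suffix range, and the component assembles the inline expression and actual-data-nodes
- Encapsulates common sharding algorithms (by time, by business field value, and so on)
- Provides unified configuration listening and dynamic modification (SQL logging, forced master/slave switching, and so on)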
Open-source Sharding-JDBC configuration
# Data source names
spring.shardingsphere.datasource.names=ds0,ds1
# ds0 configuration
spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=
# ds1 configuration
spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=
# Table sharding rules
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order$->{order_id % 2}
spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expression=t_order_item$->{order_id % 2}
# Default database sharding rule
spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expression=ds$->{user_id % 2}
Simplified component configuration
# Data source names
vivo.it.sharding.datasource.names = ds0,ds1
# ds0 configuration
vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds0
vivo.it.sharding.datasource.ds0.username = root
vivo.it.sharding.datasource.ds0.password =
# ds1 configuration
vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds1.username = root
vivo.it.sharding.datasource.ds1.password =
# Table sharding rules
vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id","algorithmExpression":"order_id % 2"}]
# Default database sharding rule
vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id % 2"}
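To illustrate how a simplified table rule could be expanded into the open-source property keys, here is a rough sketch. It is not the component's actual code: the class and method names are hypothetical, the rule fields are passed in as plain strings rather than parsed from JSON, and the data source names are assumed to come from the `names` property.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of expanding a simplified table rule into
// spring.shardingsphere.sharding.tables.* properties.
class ShardingRuleExpansionSketch {

    private static final String PREFIX = "spring.shardingsphere.sharding.tables.";

    // Builds actual-data-nodes by listing every data source explicitly.
    static String actualDataNodes(List<String> dataSources, String logicTable, String tableRange) {
        List<String> nodes = new ArrayList<>();
        for (String dataSource : dataSources) {
            nodes.add(dataSource + "." + logicTable + "$->{" + tableRange + "}");
        }
        return String.join(",", nodes);
    }

    // Prefixes the user-supplied expression with the logical table name.
    static String algorithmExpression(String logicTable, String expression) {
        return logicTable + "$->{" + expression.trim() + "}";
    }

    // Emits three property lines per logical table in the comma-separated list.
    static List<String> expand(List<String> dataSources, String logicTables, String tableRange,
                               String shardingColumn, String expression) {
        List<String> lines = new ArrayList<>();
        for (String raw : logicTables.split(",")) {
            String table = raw.trim();
            String base = PREFIX + table;
            lines.add(base + ".actual-data-nodes=" + actualDataNodes(dataSources, table, tableRange));
            lines.add(base + ".table-strategy.inline.sharding-column=" + shardingColumn.trim());
            lines.add(base + ".table-strategy.inline.algorithm-expression=" + algorithmExpression(table, expression));
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> lines = expand(Arrays.asList("ds0", "ds1"),
                "t_order,t_order_item", "0..1", "order_id", "order_id % 2");
        for (String line : lines) {
            System.out.println(line);
        }
    }
}
```

The sketch lists each data source explicitly (`ds0.t_order$->{0..1},ds1.t_order$->{0..1}`), which should enumerate the same four nodes as the `ds$->{0..1}.t_order$->{0..1}` form in the open-source configuration, without having to guess a common data source prefix.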
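5. Usage recommendations
Drawing on the official documentation and our own business experience, we have compiled some recommendations for using Sharding-JDBC. How to optimize specific SQL patterns (subqueries, pagination, grouping and sorting, and so on) still has to be tested and tuned against the actual business scenario.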
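(1) Mandatory
Recommendation ①: SQL involving sharded tables must carry the sharding key.
Reason: without a sharding key the statement is routed to all nodes, which is a serious performance risk.
Recommendation ②: The sharding values in a single SQL statement must not route to different databases.
Reason: cross-database operations are a serious performance risk, and transactional operations are escalated to distributed transactions, which increases business complexity.
Recommendation ③: Do not apply arithmetic expressions or functions to the sharding key.
Reason: the sharding value cannot be obtained by evaluating the expression or function in advance, which results in full routing.
Note: see the official documentation.
Recommendation ④: Do not use sharded tables in subqueries.
Reason: sharded tables inside a subquery cannot be parsed correctly, which leads to business errors.
Note: although the official documentation states that subqueries have limited support, in practice we found that 4.1.1 does not support them; see official issue 6164 | issue 6228.
Recommendation ⑤: Sharded SQL containing CASE WHEN, HAVING, or UNION (ALL) cannot be routed to multiple data nodes.
Note: see the official documentation.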
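(2) Recommended
① Use distributed IDs to guarantee that the primary keys of sharded tables are globally unique.
Reason: this makes it easier to determine data uniqueness and to migrate or scale out later.
Note: see the article "Practice of vivo's Self-Developed Luban Distributed ID Service".
② For grouping SQL that spans multiple tables, keep the grouping columns consistent with the sorting columns.
Reason: when the grouping and sorting columns differ, results can only be merged in memory, which is a performance risk with large data volumes.
Note: see the official documentation.
③ Use globally increasing distributed IDs to optimize paginated queries.
Reason: Sharding-JDBC's pagination optimization focuses on stream-merging result sets to avoid memory blow-up; it does not solve the inherent performance problem of deep pagination.
Note: see the official documentation.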
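The effect of recommendation ③ can be illustrated with a small in-memory simulation. This is a sketch, not Sharding-JDBC code: each shard is modeled as a sorted list of ids, and it assumes that an offset query spanning several shards has to read the first offset + limit rows from every shard, whereas a query of the form `WHERE id > ? ORDER BY id LIMIT n` only reads n rows per shard. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// In-memory simulation comparing offset pagination with id-based (keyset) pagination.
class PaginationSketch {

    static final class Page {
        final List<Long> ids;
        final int rowsFetchedFromShards;

        Page(List<Long> ids, int rowsFetchedFromShards) {
            this.ids = ids;
            this.rowsFetchedFromShards = rowsFetchedFromShards;
        }
    }

    // Offset pagination: every shard returns its first (offset + limit) rows.
    static Page offsetPage(List<List<Long>> shards, int offset, int limit) {
        List<Long> merged = new ArrayList<>();
        int fetched = 0;
        for (List<Long> shard : shards) {
            List<Long> part = shard.subList(0, Math.min(offset + limit, shard.size()));
            fetched += part.size();
            merged.addAll(part);
        }
        Collections.sort(merged);
        int from = Math.min(offset, merged.size());
        int to = Math.min(offset + limit, merged.size());
        return new Page(new ArrayList<>(merged.subList(from, to)), fetched);
    }

    // Keyset pagination: every shard returns at most `limit` rows with id > lastSeenId.
    static Page keysetPage(List<List<Long>> shards, long lastSeenId, int limit) {
        List<Long> merged = new ArrayList<>();
        int fetched = 0;
        for (List<Long> shard : shards) {
            int taken = 0;
            for (Long id : shard) {
                if (id > lastSeenId && taken < limit) {
                    merged.add(id);
                    taken++;
                }
            }
            fetched += taken;
        }
        Collections.sort(merged);
        return new Page(new ArrayList<>(merged.subList(0, Math.min(limit, merged.size()))), fetched);
    }

    public static void main(String[] args) {
        // Ids 1..20 spread over two shards, each sorted ascending.
        List<List<Long>> shards = Arrays.asList(
                Arrays.asList(2L, 4L, 6L, 8L, 10L, 12L, 14L, 16L, 18L, 20L),
                Arrays.asList(1L, 3L, 5L, 7L, 9L, 11L, 13L, 15L, 17L, 19L));
        Page byOffset = offsetPage(shards, 10, 3);
        Page byKeyset = keysetPage(shards, 10L, 3);
        System.out.println(byOffset.ids + " rows fetched: " + byOffset.rowsFetchedFromShards); // [11, 12, 13] rows fetched: 20
        System.out.println(byKeyset.ids + " rows fetched: " + byKeyset.rowsFetchedFromShards); // [11, 12, 13] rows fetched: 6
    }
}
```

Both approaches return the same page, but the number of rows read from the shards grows with the offset in the first case and stays bounded by the page size in the second.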
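6. Summary
Based on our own understanding, this article has walked through the source entry points and key logic of each engine; readers can use it together with the official documentation to locate and understand the Sharding-JDBC source implementation more easily. The purpose of our customization is to lower the cost of business adoption and to minimize changes to existing business SQL. Some of these ideas differ from the direction of the official community. Skipping syntax parsing is one example: the community aims to support more syntax by improving the parsing engine rather than by skipping the parsing phase (see the official issue). The source analysis and customization cover only the data sharding and read/write splitting features of Sharding-JDBC, and the customized features have been proven in production. Corrections and suggestions for improvement are welcome.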