自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Sharding-JDBC源碼解析與vivo的定制開(kāi)發(fā)

開(kāi)發(fā) 數(shù)據(jù)庫(kù)
Sharding-JDBC是在JDBC層提供服務(wù)的數(shù)據(jù)庫(kù)中間件,在分庫(kù)分表場(chǎng)景具有廣泛應(yīng)用。本文對(duì)Sharding-JDBC的解析、路由、改寫(xiě)、執(zhí)行、歸并五大核心引擎進(jìn)行了源碼解析,并結(jié)合業(yè)務(wù)實(shí)踐經(jīng)驗(yàn),總結(jié)了使用Sharding-JDBC的一些痛點(diǎn)問(wèn)題并分享了對(duì)應(yīng)的定制開(kāi)發(fā)與改造方案。

本文源碼基于Sharding-JDBC 4.1.1版本。

一、業(yè)務(wù)背景

隨著業(yè)務(wù)并發(fā)請(qǐng)求和數(shù)據(jù)規(guī)模的不斷擴(kuò)大,單節(jié)點(diǎn)庫(kù)表壓力往往會(huì)成為系統(tǒng)的性能瓶頸。公司IT內(nèi)部營(yíng)銷(xiāo)庫(kù)存、交易訂單、財(cái)經(jīng)臺(tái)賬、考勤記錄等多領(lǐng)域的業(yè)務(wù)場(chǎng)景的日增數(shù)據(jù)量巨大,存在著數(shù)據(jù)庫(kù)節(jié)點(diǎn)壓力過(guò)大、連接過(guò)多、查詢速度變慢等情況,根據(jù)數(shù)據(jù)來(lái)源、時(shí)間、工號(hào)等信息來(lái)將沒(méi)有聯(lián)系的數(shù)據(jù)盡量均分到不同的庫(kù)表中,從而在不影響業(yè)務(wù)需求的前提下,減輕數(shù)據(jù)庫(kù)節(jié)點(diǎn)壓力,提升查詢效率和系統(tǒng)穩(wěn)定性。

圖片

二、技術(shù)選型

我們對(duì)比了幾款比較常見(jiàn)的支持分庫(kù)分表和讀寫(xiě)分離的中間件。

圖片

Sharding-JDBC作為輕量化的增強(qiáng)版的JDBC框架,相較其他中間件性能更好,接入難度更低,其數(shù)據(jù)分片、讀寫(xiě)分離功能也覆蓋了我們的業(yè)務(wù)訴求,因此我們?cè)跇I(yè)務(wù)中廣泛使用了Sharding-JDBC。但在使用Sharding-JDBC的過(guò)程中,我們也發(fā)現(xiàn)了諸多問(wèn)題,為了業(yè)務(wù)更便捷的使用Sharding-JDBC,我們對(duì)源碼做了針對(duì)性的定制開(kāi)發(fā)和組件封裝來(lái)滿足業(yè)務(wù)需求。

圖片

三、源碼解析

3.1 引言

Sharding-JDBC作為基于JDBC的數(shù)據(jù)庫(kù)中間件,實(shí)現(xiàn)了JDBC的標(biāo)準(zhǔn)api,Sharding-JDBC與原生JDBC的執(zhí)行對(duì)比流程如下圖所示:

圖片


相關(guān)執(zhí)行流程的代碼樣例如下:

JDBC執(zhí)行樣例

//獲取數(shù)據(jù)庫(kù)連接
try (Connection conn = DriverManager.getConnection("mysqlUrl", "userName", "password")) {
    String sql = "SELECT * FROM  t_user WHERE name = ?";
    //預(yù)編譯SQL
    try (PreparedStatement preparedStatement = conn.prepareStatement(sql)) {
        //參數(shù)設(shè)置與執(zhí)行
        preparedStatement.setString(1, "vivo");
        preparedStatement.execute(sql);
        //獲取結(jié)果集
        try (ResultSet resultSet = preparedStatement.getResultSet()) {
            while (resultSet.next()) {
                //處理結(jié)果
            }
        }
    }
}

Sharding-JDBC 源碼

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement#execute
    public boolean execute() throws SQLException {
        try {
            clearPrevious();
            //解析+路由+重寫(xiě) 內(nèi)部調(diào)用BasePrepareEngine#prepare方法
            prepare();
            initPreparedStatementExecutor();
            //執(zhí)行
            return preparedStatementExecutor.execute();
        } finally {
            clearBatch();
        }
    }
 
org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
    public ExecutionContext prepare(final String sql, final List<Object> parameters) {
        List<Object> clonedParameters = cloneParameters(parameters);
        //解析+路由(executeRoute內(nèi)部先進(jìn)行解析再執(zhí)行路由)
        RouteContext routeContext = executeRoute(sql, clonedParameters);
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        //重寫(xiě)
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
    }
 
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
    public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            List<ResultSet> resultSets = getResultSets();
            //歸并結(jié)果集
            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        }
        return currentResultSet;
    }

從對(duì)比的執(zhí)行流程圖可見(jiàn):

  • 【JDBC】:執(zhí)行的主要流程是通過(guò)Datasource獲取Connection,再注入SQL語(yǔ)句生成PreparedStatement對(duì)象,PreparedStatement設(shè)置占位符參數(shù)執(zhí)行后得到結(jié)果集ResultSet。
  • 【Sharding-JDBC】:主要流程基本一致,但Sharding基于PreparedStatement進(jìn)行了實(shí)現(xiàn)與擴(kuò)展,具體實(shí)現(xiàn)類(lèi)ShardingPreparedStatement中會(huì)抽象出解析、路由、重寫(xiě)、歸并等引擎,從而實(shí)現(xiàn)分庫(kù)分表、讀寫(xiě)分離等能力,每個(gè)引擎的作用說(shuō)明如下表所示:

圖片

//*相關(guān)引擎的源碼解析在下文會(huì)作更深入的闡述。

3.2 解析引擎

3.2.1 引擎解析

解析引擎是Sharding-JDBC進(jìn)行分庫(kù)分表邏輯的基礎(chǔ),其作用是將SQL拆解為不可再分的原子符號(hào)(稱(chēng)為token),再根據(jù)數(shù)據(jù)庫(kù)類(lèi)型將這些token分類(lèi)成關(guān)鍵字、表達(dá)式、操作符、字面量等不同類(lèi)型,進(jìn)而生成抽象語(yǔ)法樹(shù),而語(yǔ)法樹(shù)是后續(xù)進(jìn)行路由、改寫(xiě)操作的前提(這也正是語(yǔ)法樹(shù)的存在使得Sharding-JDBC存在各式各樣的語(yǔ)法限制的原因之一)。

圖片

▲圖片來(lái)源:ShardingSphere 官方文檔

4.x的版本采用ANTLR(ANother Tool for Language Recognition)作為解析引擎,在ShardingSphere-sql-parser-dialect模塊中定義了適用于不同數(shù)據(jù)庫(kù)語(yǔ)法的解析規(guī)則(.g4文件),idea中也可以下載ANTLR v4的插件,輸入SQL查看解析后的語(yǔ)法樹(shù)結(jié)果。

圖片

解析方法的入口在DataNodeRouter的createRouteContext方法中,解析引擎根據(jù)數(shù)據(jù)庫(kù)類(lèi)型和SQL創(chuàng)建SQLParserExecutor執(zhí)行得到解析樹(shù),再通過(guò)ParseTreeVisitor()的visit方法,對(duì)解析樹(shù)進(jìn)行處理得到SQLStatement。ANTLR支持listener和visitor兩種模式的接口,visitor方式可以更靈活的控制解析樹(shù)的遍歷過(guò)程,更適用于SQL解析的場(chǎng)景。

解析引擎核心代碼:

org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext#96
    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        //解析引擎解析SQL
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
        try {
            SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(metaData.getSchema(), sql, parameters, sqlStatement);
            return new RouteContext(sqlStatementContext, parameters, new RouteResult());
            // TODO should pass parameters for master-slave
        } catch (final IndexOutOfBoundsException ex) {
            return new RouteContext(new CommonSQLStatementContext(sqlStatement), parameters, new RouteResult());
        }
    }
 
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0#72
    private SQLStatement parse0(final String sql, final boolean useCache) {
        //緩存
        if (useCache) {
            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {
                return cachedSQLStatement.get();
            }
        }
        //根據(jù)數(shù)據(jù)庫(kù)類(lèi)型和sql生成解析樹(shù)
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
        //ParseTreeVisitor的visit方法對(duì)解析樹(shù)進(jìn)行處理得到SQLStatement
      SQLStatement result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }

SQLStatement實(shí)際上是一個(gè)接口,其實(shí)現(xiàn)對(duì)應(yīng)著不同的SQL類(lèi)型,如SelectStatement 類(lèi)中就包括查詢的字段、表名、where條件、分組、排序、分頁(yè)、lock等變量,可以看到這里并沒(méi)有對(duì)having這種字段做定義,相當(dāng)于Sharding-JDBC無(wú)法識(shí)別到SQL中的having,這使得Sharding-JDBC對(duì)having語(yǔ)法有一定的限制。

SelectStatement

public final class SelectStatement extends DMLStatement {
    // 字段
    private ProjectionsSegment projections;
    // 表
    private final Collection<TableReferenceSegment> tableReferences = new LinkedList<>();
    // where
    private WhereSegment where;
    // groupBy
    private GroupBySegment groupBy;
    // orderBy
    private OrderBySegment orderBy;
    // limit
    private LimitSegment limit;
    // 父statement
    private SelectStatement parentStatement;
    // lock
    private LockSegment lock;
}

SQLStatement還會(huì)被進(jìn)一步轉(zhuǎn)換成SQLStatementContext,如SelectStatement 會(huì)被轉(zhuǎn)換成SelectStatementContext ,其結(jié)構(gòu)與SelectStatement 類(lèi)似不再多說(shuō),值得注意的是雖然這里定義了containsSubquery來(lái)判斷是否包含子查詢,但4.1.1源碼永遠(yuǎn)是返回的false,與having類(lèi)似,這意味著Sharding-JDBC不會(huì)對(duì)子查詢語(yǔ)句做特殊處理。

SelectStatementContext

public final class SelectStatementContext extends CommonSQLStatementContext<SelectStatement> implements TableAvailable, WhereAvailable {
     
    private final TablesContext tablesContext;
     
    private final ProjectionsContext projectionsContext;
     
    private final GroupByContext groupByContext;
     
    private final OrderByContext orderByContext;
     
    private final PaginationContext paginationContext;
     
    private final boolean containsSubquery;
}
 
    private boolean containsSubquery() {
        // FIXME process subquery
//        Collection<SubqueryPredicateSegment> subqueryPredicateSegments = getSqlStatement().findSQLSegments(SubqueryPredicateSegment.class);
//        for (SubqueryPredicateSegment each : subqueryPredicateSegments) {
//            if (!each.getAndPredicates().isEmpty()) {
//                return true;
//            }
//        }
        return false;
    }

3.2.2 引擎總結(jié)

解析引擎是進(jìn)行路由改寫(xiě)的前提基礎(chǔ),其作用就是將SQL按照定義的語(yǔ)法規(guī)則拆分成原子符號(hào)(token),生成語(yǔ)法樹(shù),根據(jù)不同的SQL類(lèi)型生成對(duì)應(yīng)的SQLStatement,SQLStatement由各自的Segment組成,所有的Segment都包含startIndex和endIndex來(lái)定位token在SQL中所屬的位置,但解析語(yǔ)法難以涵蓋所有的SQL場(chǎng)景,使得部分SQL無(wú)法按照預(yù)期的結(jié)果路由執(zhí)行。

3.3 路由引擎

3.3.1 引擎解析

路由引擎是Sharding-JDBC的核心步驟,作用是根據(jù)定義的分庫(kù)分表規(guī)則將解析引擎生成的SQL上下文生成對(duì)應(yīng)的路由結(jié)果,RouteResult 包括DataNode和RouteUnit,DataNode是實(shí)際的數(shù)據(jù)源節(jié)點(diǎn),包括數(shù)據(jù)源名稱(chēng)和實(shí)際的物理表名,RouteUnit則記錄了邏輯表/庫(kù)與物理表/庫(kù)的映射關(guān)系,后面的改寫(xiě)引擎也是根據(jù)這個(gè)映射關(guān)系來(lái)決定如何替換SQL中的邏輯表(實(shí)際上RouteResult 就是維護(hù)了一條SQL需要往哪些庫(kù)哪些表執(zhí)行的關(guān)系)。

RouteResult

public final class RouteResult {
     
    private final Collection<Collection<DataNode>> originalDataNodes = new LinkedList<>();
     
    private final Collection<RouteUnit> routeUnits = new LinkedHashSet<>();
}
 
public final class DataNode {
     
    private static final String DELIMITER = ".";
     
    private final String dataSourceName;
     
    private final String tableName;
}
 
public final class RouteUnit {
     
    private final RouteMapper dataSourceMapper;
     
    private final Collection<RouteMapper> tableMappers;
}
 
public final class RouteMapper {
     
    private final String logicName;
     
    private final String actualName;
}

其中,路由有分為分片路由主從路由,兩者可以單獨(dú)使用,也可以組合使用。

分片路由

ShardingRouteDecorator的decorate方法是路由引擎的核心邏輯,經(jīng)過(guò)SQL校驗(yàn)->生成分片條件->合并分片值后得到路由結(jié)果。

分片路由decorate方法

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate#57
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        SQLStatementContext sqlStatementContext = routeContext.getSqlStatementContext();
        List<Object> parameters = routeContext.getParameters();
        //SQL校驗(yàn)  校驗(yàn)INSERT INTO .... ON DUPLICATE KEY UPDATE 和UPDATE語(yǔ)句中是否存在分片鍵
      ShardingStatementValidatorFactory.newInstance(
                sqlStatementContext.getSqlStatement()).ifPresent(validator -> validator.validate(shardingRule, sqlStatementContext.getSqlStatement(), parameters));
        //生成分片條件
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
        //合并分片值
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
        if (sqlStatementContext.getSqlStatement() instanceof DMLStatement && needMergeShardingValues) {
            checkSubqueryShardingValues(sqlStatementContext, shardingRule, shardingConditions);
            mergeShardingConditions(shardingConditions);
        }
        ShardingRouteEngine shardingRouteEngine = ShardingRouteEngineFactory.newInstance(shardingRule, metaData, sqlStatementContext, shardingConditions, properties);
        //得到路由結(jié)果
        RouteResult routeResult = shardingRouteEngine.route(shardingRule);
        if (needMergeShardingValues) {
            Preconditions.checkState(1 == routeResult.getRouteUnits().size(), "Must have one sharding with subquery.");
        }
        return new RouteContext(sqlStatementContext, parameters, routeResult);
    }

ShardingStatementValidator有ShardingInsertStatementValidator和ShardingUpdateStatementValidator兩種實(shí)現(xiàn),INSERT INTO .... ON DUPLICATE KEY UPDATE和UPDATE語(yǔ)法都會(huì)涉及到字段值的更新,Sharding-JDBC是不允許更新分片值的,畢竟修改分片值還需要將數(shù)據(jù)遷移至新分片值對(duì)應(yīng)的庫(kù)表中,才能保證數(shù)據(jù)分片規(guī)則一致。兩者的校驗(yàn)細(xì)節(jié)也有所不同:

  • INSERT INTO .... ON DUPLICATE KEY UPDATE僅僅是對(duì)UPDATE字段的校驗(yàn), ON DUPLICATE KEY UPDATE中包含分片鍵就會(huì)報(bào)錯(cuò);
  • 而UPDATE語(yǔ)句則會(huì)額外校驗(yàn)WHERE條件中分片鍵的原始值和SET的值是否一樣,不一樣則會(huì)拋出異常。

圖片

ShardingCondition中只有一個(gè)變量routeValues,RouteValue是一個(gè)接口,有ListRouteValue和RangeRouteValue兩種實(shí)現(xiàn),前者記錄了分片鍵的in或=條件的分片值,后者則記錄了范圍查詢的分片值,兩者被封裝為ShardingValue對(duì)象后,將會(huì)透?jìng)髦练制惴ㄖ杏?jì)算得到分片結(jié)果集。

ShardingCondition

public final class ShardingConditions {
     
    private final List<ShardingCondition> conditions;
}
 
public class ShardingCondition {
     
    private final List<RouteValue> routeValues = new LinkedList<>();
}
 
 
public final class ListRouteValue<T extends Comparable<?>> implements RouteValue {
     
    private final String columnName;
     
    private final String tableName;
    //in或=條件對(duì)應(yīng)的值
    private final Collection<T> values;
     
    @Override
    public String toString() {
        return tableName + "." + columnName + (1 == values.size() ? " = " + new ArrayList<>(values).get(0) : " in (" + Joiner.on(",").join(values) + ")");
    }
}
 
public final class RangeRouteValue<T extends Comparable<?>> implements RouteValue {
     
    private final String columnName;
     
    private final String tableName;
    //between and 大于小于等范圍值的上下限
    private final Range<T> valueRange;
}

生成分片條件后還會(huì)合并分片條件,但是前文提過(guò)在SelectStatementContext中的containsSubquery永遠(yuǎn)是false,所以這段邏輯永遠(yuǎn)返回false,即不會(huì)合并分片條件。

判斷是否需要合并分片條件

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#isNeedMergeShardingValues#87
private boolean isNeedMergeShardingValues(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
        return sqlStatementContext instanceof SelectStatementContext && ((SelectStatementContext) sqlStatementContext).isContainsSubquery()
                && !shardingRule.getShardingLogicTableNames(sqlStatementContext.getTablesContext().getTableNames()).isEmpty();
    }

然后就是通過(guò)分片路由引擎調(diào)用分片算法計(jì)算路由結(jié)果了,ShardingRouteEngine實(shí)現(xiàn)較多,介紹起來(lái)篇幅較多,這里就不展開(kāi)說(shuō)明了,可以參考官方文檔來(lái)了解路由引擎的選擇規(guī)則

圖片

▲圖片來(lái)源:ShardingSphere 官方文檔

Sharding-JDBC定義了多種分片策略和算法接口,主要的分配策略與算法說(shuō)明如下表所示:


圖片

補(bǔ)充兩個(gè)細(xì)節(jié):

(1)當(dāng)ALLOW_RANGE_QUERY_WITH

_INLINE_SHARDING配置設(shè)置true時(shí),InlineShardingStrategy支持范圍查詢,但是并不是根據(jù)分片值計(jì)算范圍,而是直接全路由至配置的數(shù)據(jù)節(jié)點(diǎn),會(huì)存在性能隱患。

InlineShardingStrategy.doSharding

org.apache.shardingsphere.core.strategy.route.inline.InlineShardingStrategy#doSharding
    public Collection<String> doSharding(final Collection<String> availableTargetNames, final Collection<RouteValue> shardingValues, final ConfigurationProperties properties) {
        RouteValue shardingValue = shardingValues.iterator().next();
        //ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING設(shè)置為true,直接返回availableTargetNames,而不是根據(jù)RangeRouteValue計(jì)算
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_RANGE_QUERY_WITH_INLINE_SHARDING) && shardingValue instanceof RangeRouteValue) {
            return availableTargetNames;
        }
        Preconditions.checkState(shardingValue instanceof ListRouteValue, "Inline strategy cannot support this type sharding:" + shardingValue.toString());
        Collection<String> shardingResult = doSharding((ListRouteValue) shardingValue);
        Collection<String> result = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        for (String each : shardingResult) {
            if (availableTargetNames.contains(each)) {
                result.add(each);
            }
        }
        return result;
    }

(2)4.1.1的官方文檔雖然說(shuō)Hint可以跳過(guò)解析和改寫(xiě),但在我們上面解析引擎的源碼解析中,我們并沒(méi)有看到有對(duì)Hint策略的額外跳過(guò)。事實(shí)上,即使使用了Hint分片SQL也同樣需要解析重寫(xiě),也同樣受Sharding-JDBC的語(yǔ)法限制,這在官方的issue中也曾經(jīng)被提及。

圖片

▲圖片來(lái)源:ShardingSphere 官方文檔

主從路由

主從路由的核心邏輯就是通過(guò)MasterSlaveDataSourceRouter的route方法進(jìn)行判定SQL走主庫(kù)還是從庫(kù)。主從情況下,配置的數(shù)據(jù)源實(shí)際是一組主從,而不是單個(gè)的實(shí)例,所以需要通過(guò)masterSlaveRule獲取到具體的主庫(kù)或者從庫(kù)名字。

主從路由decorate

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate    
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
        //為空證明沒(méi)有經(jīng)過(guò)分片路由
        if (routeContext.getRouteResult().getRouteUnits().isEmpty()) {
            //根據(jù)SQL判斷選擇走主庫(kù)還是從庫(kù)
            String dataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
            RouteResult routeResult = new RouteResult();
           //根據(jù)具體的主庫(kù)/從庫(kù)名創(chuàng)建路由單元
            routeResult.getRouteUnits().add(new RouteUnit(new RouteMapper(dataSourceName, dataSourceName), Collections.emptyList()));
            return new RouteContext(routeContext.getSqlStatementContext(), Collections.emptyList(), routeResult);
        }
        Collection<RouteUnit> toBeRemoved = new LinkedList<>();
        Collection<RouteUnit> toBeAdded = new LinkedList<>();
        //不為空證明已經(jīng)被分片路由處理了
        for (RouteUnit each : routeContext.getRouteResult().getRouteUnits()) {
            if (masterSlaveRule.getName().equalsIgnoreCase(each.getDataSourceMapper().getActualName())) {
                //先標(biāo)記移除 因?yàn)檫@里是一組主從的名字而不是實(shí)際的庫(kù)
                toBeRemoved.add(each);
                //根據(jù)SQL判斷選擇走主庫(kù)還是從庫(kù)
                String actualDataSourceName = new MasterSlaveDataSourceRouter(masterSlaveRule).route(routeContext.getSqlStatementContext().getSqlStatement());
                //根據(jù)具體的主庫(kù)/從庫(kù)名創(chuàng)建路由單元
                toBeAdded.add(new RouteUnit(new RouteMapper(each.getDataSourceMapper().getLogicName(), actualDataSourceName), each.getTableMappers()));
            }
        }
        routeContext.getRouteResult().getRouteUnits().removeAll(toBeRemoved);
        routeContext.getRouteResult().getRouteUnits().addAll(toBeAdded);
        return routeContext;
    }

MasterSlaveDataSourceRouter中isMasterRoute方法會(huì)判斷SQL是否需要走主庫(kù),當(dāng)出現(xiàn)以下情況時(shí)走主庫(kù):

  • select語(yǔ)句包含鎖,如for update語(yǔ)句
  • 不是select語(yǔ)句
  • MasterVisitedManager.isMasterVisited()設(shè)置為true
  • HintManager.isMasterRouteOnly()設(shè)置為true

不走主庫(kù)則通過(guò)負(fù)載算法選擇從庫(kù),Sharding-JDBC提供了輪詢和隨機(jī)兩種算法。

MasterSlaveDataSourceRouter

public final class MasterSlaveDataSourceRouter {
     
    private final MasterSlaveRule masterSlaveRule;
     
    /**
     * Route.
     *
     * @param sqlStatement SQL statement
     * @return data source name
     */
    public String route(final SQLStatement sqlStatement) {
        if (isMasterRoute(sqlStatement)) {
            MasterVisitedManager.setMasterVisited();
            return masterSlaveRule.getMasterDataSourceName();
        }
        return masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames()));
    }
     
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }
     
    private boolean containsLockSegment(final SQLStatement sqlStatement) {
        return sqlStatement instanceof SelectStatement && ((SelectStatement) sqlStatement).getLock().isPresent();
    }
}

是否走主庫(kù)的信息存在MasterVisitedManager中,MasterVisitedManager是通過(guò)ThreadLocal實(shí)現(xiàn)的,但這種實(shí)現(xiàn)會(huì)有一個(gè)問(wèn)題,當(dāng)我們使用事務(wù)先查詢?cè)俑?插入時(shí),第一條查詢SQL并不會(huì)走主庫(kù),而是走從庫(kù),如果業(yè)務(wù)需要事務(wù)的第一條查詢也走主庫(kù),事務(wù)查詢前需要手動(dòng)調(diào)用一次MasterVisitedManager.setMasterVisited()。

MasterVisitedManager

public final class MasterVisitedManager {
     
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> false);
     
    /**
     * Judge master data source visited in current thread.
     *
     * @return master data source visited or not in current thread
     */
    public static boolean isMasterVisited() {
        return MASTER_VISITED.get();
    }
     
    /**
     * Set master data source visited in current thread.
     */
    public static void setMasterVisited() {
        MASTER_VISITED.set(true);
    }
     
    /**
     * Clear master data source visited.
     */
    public static void clear() {
        MASTER_VISITED.remove();
    }
}

3.3.2 引擎總結(jié)

路由引擎的作用是將SQL根據(jù)參數(shù)通過(guò)實(shí)現(xiàn)的策略算法計(jì)算出實(shí)際該在哪些庫(kù)的哪些表執(zhí)行,也就是路由結(jié)果。路由引擎有兩種實(shí)現(xiàn),分別是分片路由和主從路由,兩者都提供了標(biāo)準(zhǔn)化的策略接口來(lái)讓業(yè)務(wù)實(shí)現(xiàn)自己的路由策略,分片路由需要注意自身SQL場(chǎng)景和策略算法相匹配,主從路由中同一線程且同一數(shù)據(jù)庫(kù)連接內(nèi),有寫(xiě)入操作后,之后的讀操作會(huì)從主庫(kù)讀取,寫(xiě)入操作前的讀操作不會(huì)走主庫(kù)。

3.4 改寫(xiě)引擎

3.4.1 引擎解析

經(jīng)過(guò)解析路由后雖然確定了執(zhí)行的實(shí)際庫(kù)表,但SQL中表名依舊是邏輯表,不能執(zhí)行,改寫(xiě)引擎可以將邏輯表替換為物理表。同時(shí),路由至多庫(kù)表的SQL也需要拆分為多條SQL執(zhí)行。

改寫(xiě)的入口仍舊在BasePrepareEngine中,創(chuàng)建重寫(xiě)上下文createSQLRewriteContext,再根據(jù)上下文進(jìn)行改寫(xiě)rewrite,最終返回執(zhí)行單元ExecutionUnit。

改寫(xiě)邏輯入口

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#executeRewrite
    private Collection<ExecutionUnit> executeRewrite(final String sql, final List<Object> parameters, final RouteContext routeContext) {
        //注冊(cè)重寫(xiě)裝飾器
        registerRewriteDecorator();
        //創(chuàng)建 SQLRewriteContext
        SQLRewriteContext sqlRewriteContext = rewriter.createSQLRewriteContext(sql, parameters, routeContext.getSqlStatementContext(), routeContext);
        //重寫(xiě)
        return routeContext.getRouteResult().getRouteUnits().isEmpty() ? rewrite(sqlRewriteContext) : rewrite(routeContext, sqlRewriteContext);
    }

執(zhí)行單元包含了數(shù)據(jù)源名稱(chēng),改寫(xiě)后的SQL,以及對(duì)應(yīng)的參數(shù),SQL一樣的兩個(gè)SQLUnit會(huì)被視為相等。

ExecutionUnit

@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根據(jù)sql判斷是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
 
    private String sql;
 
    private final List<Object> parameters;
 
}

createSQLRewriteContext完成了兩件事,一個(gè)是對(duì)SQL參數(shù)進(jìn)行了重寫(xiě),一個(gè)是生成了SQLToken。

createSQLRewriteContext

org.apache.shardingsphere.underlying.rewrite.SQLRewriteEntry#createSQLRewriteContext
    public SQLRewriteContext createSQLRewriteContext(final String sql, final List<Object> parameters, final SQLStatementContext sqlStatementContext, final RouteContext routeContext) {
        SQLRewriteContext result = new SQLRewriteContext(schemaMetaData, sqlStatementContext, sql, parameters);
        //sql參數(shù)重寫(xiě)
        decorate(decorators, result, routeContext);
        //生成SQLToken
        result.generateSQLTokens();
        return result;
    }
 
org.apache.shardingsphere.sharding.rewrite.context.ShardingSQLRewriteContextDecorator#decorate
    public void decorate(final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLRewriteContext sqlRewriteContext) {
        for (ParameterRewriter each : new ShardingParameterRewriterBuilder(shardingRule, routeContext).getParameterRewriters(sqlRewriteContext.getSchemaMetaData())) {
            if (!sqlRewriteContext.getParameters().isEmpty() && each.isNeedRewrite(sqlRewriteContext.getSqlStatementContext())) {
                //參數(shù)重寫(xiě)
                each.rewrite(sqlRewriteContext.getParameterBuilder(), sqlRewriteContext.getSqlStatementContext(), sqlRewriteContext.getParameters());
            }
        }
        //sqlTokenGenerators
        sqlRewriteContext.addSQLTokenGenerators(new ShardingTokenGenerateBuilder(shardingRule, routeContext).getSQLTokenGenerators());
    }
 
org.apache.shardingsphere.underlying.rewrite.context.SQLRewriteContext#generateSQLTokens
    public void generateSQLTokens() {
        sqlTokens.addAll(sqlTokenGenerators.generateSQLTokens(sqlStatementContext, parameters, schemaMetaData));
    }

ParameterRewriter中與分片相關(guān)的實(shí)現(xiàn)有兩種。

圖片

//*詳細(xì)的例子可以參考官方文檔中分頁(yè)修正和補(bǔ)列部分

SQLToken記錄了SQL中每個(gè)token(解析引擎中提過(guò)的不可再分的原子符號(hào))的起始位置,從而方便改寫(xiě)引擎知道哪些位置需要改寫(xiě)。

SQLToken

@RequiredArgsConstructor
@Getter
public abstract class SQLToken implements Comparable<SQLToken> {
     
    private final int startIndex;
     
    @Override
    public final int compareTo(final SQLToken sqlToken) {
        return startIndex - sqlToken.getStartIndex();
    }
}

創(chuàng)建完SQLRewriteContext后就對(duì)整條SQL進(jìn)行重寫(xiě)和組裝參數(shù),可以看出每個(gè)RouteUnit都會(huì)重寫(xiě)SQL并獲取自己對(duì)應(yīng)的參數(shù)。

SQLRouteRewriteEngine.rewrite

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#rewrite
    public Map<RouteUnit, SQLRewriteResult> rewrite(final SQLRewriteContext sqlRewriteContext, final RouteResult routeResult) {
        Map<RouteUnit, SQLRewriteResult> result = new LinkedHashMap<>(routeResult.getRouteUnits().size(), 1);
        for (RouteUnit each : routeResult.getRouteUnits()) {
            //重寫(xiě)SQL+組裝參數(shù)
            result.put(each, new SQLRewriteResult(new RouteSQLBuilder(sqlRewriteContext, each).toSQL(), getParameters(sqlRewriteContext.getParameterBuilder(), routeResult, each)));
        }
        return result;
    }

toSQL核心就是根據(jù)SQLToken將SQL拆分改寫(xiě)再拼裝,比如:

select * from t_order where created_by = '123' 

就會(huì)被拆分為select * from | t_order | where created_by = '123'三部分進(jìn)行改寫(xiě)拼裝。

toSQL

org.apache.shardingsphere.underlying.rewrite.sql.impl.AbstractSQLBuilder#toSQL
    public final String toSQL() {
        if (context.getSqlTokens().isEmpty()) {
            return context.getSql();
        }
        Collections.sort(context.getSqlTokens());
        StringBuilder result = new StringBuilder();
        //截取第一個(gè)SQLToken之前的內(nèi)容  select * from
        result.append(context.getSql().substring(0, context.getSqlTokens().get(0).getStartIndex()));
        for (SQLToken each : context.getSqlTokens()) {
            //重寫(xiě)拼接每個(gè)SQLToken對(duì)應(yīng)的內(nèi)容  t_order ->t_order_0
            result.append(getSQLTokenText(each));
            //拼接SQLToken中間不變的內(nèi)容 where created_by = '123'
            result.append(getConjunctionText(each));
        }
        return result.toString();
    }

ParameterBuilder有StandardParameterBuilder和GroupedParameterBuilder兩個(gè)實(shí)現(xiàn)。

  • StandardParameterBuilder:適用于非insert語(yǔ)句,getParameters無(wú)需分組處理直接返回即可。
  • GroupedParameterBuilder:適用于insert語(yǔ)句,需要根據(jù)路由情況對(duì)參數(shù)進(jìn)行分組。

原因和樣例可以參考官方文檔批量拆分部分。

getParameters

org.apache.shardingsphere.underlying.rewrite.engine.SQLRouteRewriteEngine#getParameters
    private List<Object> getParameters(final ParameterBuilder parameterBuilder, final RouteResult routeResult, final RouteUnit routeUnit) {
        if (parameterBuilder instanceof StandardParameterBuilder || routeResult.getOriginalDataNodes().isEmpty() || parameterBuilder.getParameters().isEmpty()) {
            //非插入語(yǔ)句直接返回
            return parameterBuilder.getParameters();
        }
        List<Object> result = new LinkedList<>();
        int count = 0;
        for (Collection<DataNode> each : routeResult.getOriginalDataNodes()) {
            if (isInSameDataNode(each, routeUnit)) {
                //插入語(yǔ)句參數(shù)分組構(gòu)造
                result.addAll(((GroupedParameterBuilder) parameterBuilder).getParameters(count));
            }
            count++;
        }
        return result;
    }

3.4.2 引擎總結(jié)

改寫(xiě)引擎的作用是將邏輯SQL轉(zhuǎn)換為實(shí)際可執(zhí)行的SQL,這其中既有邏輯表名的替換,也有多路由的SQL拆分,還有為了后續(xù)歸并操作而進(jìn)行的分頁(yè)、分組、排序等改寫(xiě),select語(yǔ)句不會(huì)對(duì)參數(shù)進(jìn)行重組,而insert語(yǔ)句為了避免插入多余數(shù)據(jù),會(huì)通過(guò)路由單元對(duì)參數(shù)進(jìn)行重組。

3.5 執(zhí)行引擎

3.5.1 引擎解析

改寫(xiě)完成后的SQL就可以執(zhí)行了,執(zhí)行引擎需要平衡好資源和效率,如果為每條真實(shí)SQL都創(chuàng)建一個(gè)數(shù)據(jù)庫(kù)連接顯然會(huì)造成資源的濫用,但如果單線程串行也必然會(huì)影響執(zhí)行效率。

執(zhí)行引擎會(huì)先將執(zhí)行單元中需要執(zhí)行的SQLUnit根據(jù)數(shù)據(jù)源分組,同一個(gè)數(shù)據(jù)源下的SQLUnit會(huì)放入一個(gè)list,然后會(huì)根據(jù)maxConnectionsSizePerQuery對(duì)同一個(gè)數(shù)據(jù)源的SQLUnit繼續(xù)分組,創(chuàng)建連接并綁定SQLUnit。

執(zhí)行組創(chuàng)建

org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSynchronizedExecuteUnitGroups
    private Collection<InputGroup<StatementExecuteUnit>> getSynchronizedExecuteUnitGroups(
            final Collection<ExecutionUnit> executionUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        //根據(jù)數(shù)據(jù)源將SQLUnit分組 key=dataSourceName
        Map<String, List<SQLUnit>> sqlUnitGroups = getSQLUnitGroups(executionUnits);
        Collection<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //創(chuàng)建sql執(zhí)行組
        for (Entry<String, List<SQLUnit>> entry : sqlUnitGroups.entrySet()) {
            result.addAll(getSQLExecuteGroups(entry.getKey(), entry.getValue(), callback));
        }
        return result;
    }
 
org.apache.shardingsphere.sharding.execute.sql.prepare.SQLExecutePrepareTemplate#getSQLExecuteGroups
    private List<InputGroup<StatementExecuteUnit>> getSQLExecuteGroups(final String dataSourceName,
                                                                       final List<SQLUnit> sqlUnits, final SQLExecutePrepareCallback callback) throws SQLException {
        List<InputGroup<StatementExecuteUnit>> result = new LinkedList<>();
        //每個(gè)連接需要執(zhí)行的最大sql數(shù)量
        int desiredPartitionSize = Math.max(0 == sqlUnits.size() % maxConnectionsSizePerQuery ? sqlUnits.size() / maxConnectionsSizePerQuery : sqlUnits.size() / maxConnectionsSizePerQuery + 1, 1);
        //分組,每組對(duì)應(yīng)一條數(shù)據(jù)庫(kù)連接
        List<List<SQLUnit>> sqlUnitPartitions = Lists.partition(sqlUnits, desiredPartitionSize);
        //選擇連接模式 連接限制/內(nèi)存限制
        ConnectionMode connectionMode = maxConnectionsSizePerQuery < sqlUnits.size() ? ConnectionMode.CONNECTION_STRICTLY : ConnectionMode.MEMORY_STRICTLY;
        //創(chuàng)建連接
        List<Connection> connections = callback.getConnections(connectionMode, dataSourceName, sqlUnitPartitions.size());
        int count = 0;
        for (List<SQLUnit> each : sqlUnitPartitions) {
            //綁定連接和SQLUnit 創(chuàng)建StatementExecuteUnit
            result.add(getSQLExecuteGroup(connectionMode, connections.get(count++), dataSourceName, each, callback));
        }
        return result;
    }

SQLUnit分組和連接模式選擇沒(méi)有任何關(guān)系,連接模式的選擇只取決于maxConnectionsSizePerQuery和SQLUnit數(shù)量的大小關(guān)系,maxConnectionsSizePerQuery代表了一個(gè)數(shù)據(jù)源一次查詢?cè)试S的最大連接數(shù)。

  • 當(dāng)maxConnectionsSizePerQuery<sqlunit數(shù)量時(shí),意味著無(wú)法做到每個(gè)sqlunit獨(dú)享一個(gè)連接,需要直接查詢出結(jié)果集至內(nèi)存中;< li="">
  • 當(dāng)maxConnectionsSizePerQuery>=SQLUnit數(shù)量時(shí),意味著可以支持每個(gè)SQLUnit獨(dú)享一個(gè)連接,可以通過(guò)ResultSet游標(biāo)下移的方式查詢結(jié)果集。

不過(guò)maxConnectionsSizePerQuery默認(rèn)值為1,所以當(dāng)一條SQL需要路由至多張表時(shí)(即有多個(gè)SQLUnit)會(huì)采用連接限制,當(dāng)路由至單表時(shí)是內(nèi)存限制模式。

圖片

為了避免產(chǎn)生數(shù)據(jù)庫(kù)連接死鎖問(wèn)題,在內(nèi)存限制模式時(shí),Sharding-JDBC通過(guò)鎖住數(shù)據(jù)源對(duì)象一次性創(chuàng)建出本條SQL需要的所有數(shù)據(jù)庫(kù)連接。連接限制模式下,各連接一次性查出各自的結(jié)果,不會(huì)出現(xiàn)多連接相互等待的情況,因此不會(huì)發(fā)生死鎖,而內(nèi)存限制模式通過(guò)游標(biāo)讀取結(jié)果集,需要多條連接去查詢不同的表做合并,如果不一次性拿到所有需要的連接,則可能存在連接相互等待的情況造成死鎖。可以參照官方文檔中執(zhí)行引擎相關(guān)例子

不同連接模式創(chuàng)建連接

private List<Connection> createConnections(final String dataSourceName, final ConnectionMode connectionMode, final DataSource dataSource, final int connectionSize) throws SQLException {
    if (1 == connectionSize) {
        Connection connection = createConnection(dataSourceName, dataSource);
        replayMethodsInvocation(connection);
        return Collections.singletonList(connection);
    }
    if (ConnectionMode.CONNECTION_STRICTLY == connectionMode) {
        return createConnections(dataSourceName, dataSource, connectionSize);
    }
    //內(nèi)存限制模式加鎖 一次性獲取所有的連接
    synchronized (dataSource) {
        return createConnections(dataSourceName, dataSource, connectionSize);
    }
}

此外,結(jié)果集的內(nèi)存合并和流式合并只在調(diào)用JDBC的executeQuery的情況下生效,如果使用execute方式進(jìn)行查詢,都是統(tǒng)一使用流式方式的查詢。

查詢結(jié)果歸并對(duì)比

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#executeQuery#101 
  org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#getQueryResult
    private QueryResult getQueryResult(final Statement statement, final ConnectionMode connectionMode) throws SQLException {
        PreparedStatement preparedStatement = (PreparedStatement) statement;
        ResultSet resultSet = preparedStatement.executeQuery();
        getResultSets().add(resultSet);
        //executeQuery 中根據(jù)連接模式選擇流式/內(nèi)存
        return ConnectionMode.MEMORY_STRICTLY == connectionMode ? new StreamQueryResult(resultSet) : new MemoryQueryResult(resultSet);
    }
 
//execute 單獨(dú)調(diào)用getResultSet中只會(huì)使用流式合并
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet#158
  org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getQueryResults 
    private List<QueryResult> getQueryResults(final List<ResultSet> resultSets) throws SQLException {
        List<QueryResult> result = new ArrayList<>(resultSets.size());
        for (ResultSet each : resultSets) {
            if (null != each) {
                result.add(new StreamQueryResult(each));
            }
        }
        return result;
    }

多條連接的執(zhí)行方式分為串行和并行,在本地事務(wù)和XA事務(wù)中是串行的方式,其余情況是并行,具體的執(zhí)行邏輯這里就不再展開(kāi)了。

isHoldTransaction

public boolean isHoldTransaction() {
        return (TransactionType.LOCAL == transactionType && !getAutoCommit()) || (TransactionType.XA == transactionType && isInShardingTransaction());
    }

3.5.2 引擎總結(jié)

執(zhí)行引擎通過(guò)maxConnectionsSizePerQuery和同數(shù)據(jù)源的SQLUnit的數(shù)量大小確定連接模式,maxConnectionsSizePerQuery=SQLUnit數(shù)量使用內(nèi)存限制模式,當(dāng)使用內(nèi)存限制模式時(shí)會(huì)通過(guò)對(duì)數(shù)據(jù)源對(duì)象加鎖來(lái)保證一次性獲取本條SQL需要的連接而避免死鎖。在使用executeQuery查詢時(shí),處理結(jié)果集時(shí)會(huì)根據(jù)連接模式選擇流式或者內(nèi)存合并,但使用execute方法查詢,處理結(jié)果集只會(huì)使用流式合并。

3.6 歸并引擎

3.6.1 引擎解析

查詢出的結(jié)果集需要經(jīng)過(guò)歸并引擎歸并后才是最終的結(jié)果,歸并的核心入口在MergeEntry的process方法中,優(yōu)先處理分片場(chǎng)景的合并,再進(jìn)行脫敏,只有讀寫(xiě)分離的情況下則直接返回TransparentMergedResult,TransparentMergedResult實(shí)際上沒(méi)做合并的額外處理,其內(nèi)部實(shí)現(xiàn)都是完全調(diào)用queryResult的實(shí)現(xiàn)。

圖片

歸并邏輯入口

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#mergeQuery#190
 org.apache.shardingsphere.underlying.pluggble.merge.MergeEngine#merge#61
    org.apache.shardingsphere.underlying.merge.MergeEntry#process
    public MergedResult process(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        //分片合并
        Optional<MergedResult> mergedResult = merge(queryResults, sqlStatementContext);
        //脫敏處理
        Optional<MergedResult> result = mergedResult.isPresent() ? Optional.of(decorate(mergedResult.get(), sqlStatementContext)) : decorate(queryResults.get(0), sqlStatementContext);
        //只有讀寫(xiě)分離的情況下,orElseGet會(huì)不存在,TransparentMergedResult
        return result.orElseGet(() -> new TransparentMergedResult(queryResults.get(0)));
    }

TransparentMergedResult

@RequiredArgsConstructor
public final class TransparentMergedResult implements MergedResult {
     
    private final QueryResult queryResult;
     
    @Override
    public boolean next() throws SQLException {
        return queryResult.next();
    }
     
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return queryResult.getValue(columnIndex, type);
    }
     
    @Override
    public Object getCalendarValue(final int columnIndex, final Class<?> type, final Calendar calendar) throws SQLException {
        return queryResult.getCalendarValue(columnIndex, type, calendar);
    }
     
    @Override
    public InputStream getInputStream(final int columnIndex, final String type) throws SQLException {
        return queryResult.getInputStream(columnIndex, type);
    }
     
    @Override
    public boolean wasNull() throws SQLException {
        return queryResult.wasNull();
    }
}

我們只看分片相關(guān)的操作,ResultMergerEngine只有一個(gè)實(shí)現(xiàn)類(lèi)ShardingResultMergerEngine,所以只有存在分片情況的時(shí)候,上文的第一個(gè)merge才會(huì)有結(jié)果。根據(jù)SQL類(lèi)型的不同選擇ResultMerger實(shí)現(xiàn),查詢類(lèi)的合并是最常用也是最復(fù)雜的合并。

MergeEntry.merge

org.apache.shardingsphere.underlying.merge.MergeEntry#merge
    private Optional<MergedResult> merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext) throws SQLException {
        for (Entry<BaseRule, ResultProcessEngine> entry : engines.entrySet()) {
            if (entry.getValue() instanceof ResultMergerEngine) {
                //選擇不同類(lèi)型的 resultMerger
                ResultMerger resultMerger = ((ResultMergerEngine) entry.getValue()).newInstance(databaseType, entry.getKey(), properties, sqlStatementContext);
                //歸并
                return Optional.of(resultMerger.merge(queryResults, sqlStatementContext, schemaMetaData));
            }
        }
        return Optional.empty();
    }
 
org.apache.shardingsphere.sharding.merge.ShardingResultMergerEngine#newInstance
    public ResultMerger newInstance(final DatabaseType databaseType, final ShardingRule shardingRule, final ConfigurationProperties properties, final SQLStatementContext sqlStatementContext) {
        if (sqlStatementContext instanceof SelectStatementContext) {
            return new ShardingDQLResultMerger(databaseType);
        }
        if (sqlStatementContext.getSqlStatement() instanceof DALStatement) {
            return new ShardingDALResultMerger(shardingRule);
        }
        return new TransparentResultMerger();
    }

ShardingDQLResultMerger的merge方法就是根據(jù)SQL解析結(jié)果中包含的token選擇合適的歸并方式(分組聚合、排序、遍歷),歸并后的mergedResult統(tǒng)一經(jīng)過(guò)decorate方法進(jìn)行判斷是否需要分頁(yè)歸并,整體處理流程圖可以概括如下。

歸并方式選擇

org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#merge
    public MergedResult merge(final List<QueryResult> queryResults, final SQLStatementContext sqlStatementContext, final SchemaMetaData schemaMetaData) throws SQLException {
        if (1 == queryResults.size()) {
            return new IteratorStreamMergedResult(queryResults);
        }
        Map<String, Integer> columnLabelIndexMap = getColumnLabelIndexMap(queryResults.get(0));
        SelectStatementContext selectStatementContext = (SelectStatementContext) sqlStatementContext;
        selectStatementContext.setIndexes(columnLabelIndexMap);
        //分組聚合,排序,遍歷
        MergedResult mergedResult = build(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        //分頁(yè)歸并
        return decorate(queryResults, selectStatementContext, mergedResult);
    }
 
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#build
    private MergedResult build(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext,
                               final Map<String, Integer> columnLabelIndexMap, final SchemaMetaData schemaMetaData) throws SQLException {
        if (isNeedProcessGroupBy(selectStatementContext)) {
            //分組聚合歸并
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessDistinctRow(selectStatementContext)) {
            setGroupByForDistinctRow(selectStatementContext);
            //分組聚合歸并
            return getGroupByMergedResult(queryResults, selectStatementContext, columnLabelIndexMap, schemaMetaData);
        }
        if (isNeedProcessOrderBy(selectStatementContext)) {
            //排序歸并
            return new OrderByStreamMergedResult(queryResults, selectStatementContext, schemaMetaData);
        }
        //遍歷歸并
        return new IteratorStreamMergedResult(queryResults);
    }
 
org.apache.shardingsphere.sharding.merge.dql.ShardingDQLResultMerger#decorate
    private MergedResult decorate(final List<QueryResult> queryResults, final SelectStatementContext selectStatementContext, final MergedResult mergedResult) throws SQLException {
        PaginationContext paginationContext = selectStatementContext.getPaginationContext();
        if (!paginationContext.isHasPagination() || 1 == queryResults.size()) {
            return mergedResult;
        }
        String trunkDatabaseName = DatabaseTypes.getTrunkDatabaseType(databaseType.getName()).getName();
        //根據(jù)數(shù)據(jù)庫(kù)類(lèi)型分頁(yè)歸并
        if ("MySQL".equals(trunkDatabaseName) || "PostgreSQL".equals(trunkDatabaseName)) {
            return new LimitDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("Oracle".equals(trunkDatabaseName)) {
            return new RowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        if ("SQLServer".equals(trunkDatabaseName)) {
            return new TopAndRowNumberDecoratorMergedResult(mergedResult, paginationContext);
        }
        return mergedResult;
    }

每種歸并方式的作用在官方文檔有比較詳細(xì)的案例,這里就不再重復(fù)介紹了。

3.6.2 引擎總結(jié)

歸并引擎是Sharding-JDBC執(zhí)行SQL的最后一步,其作用是將多個(gè)數(shù)節(jié)點(diǎn)的結(jié)果集組合為一個(gè)正確的結(jié)果集返回,查詢類(lèi)的歸并有分組歸并、聚合歸并、排序歸并、遍歷歸并、分頁(yè)歸并五種,這五種歸并方式并不是互斥的,而是相互組合的。

四、定制開(kāi)發(fā)

在使用Sharding-JDBC過(guò)程中,我們發(fā)現(xiàn)了一些問(wèn)題可以改進(jìn),比如存量系統(tǒng)數(shù)據(jù)量到達(dá)一定規(guī)模而需要分庫(kù)分表引入Sharding-JDBC時(shí),就會(huì)存在兩大問(wèn)題。

一個(gè)是存量數(shù)據(jù)的遷移,這個(gè)問(wèn)題我們可以通過(guò)分片算法兼容,前文已經(jīng)提過(guò)分片鍵的值是不允許更改的,而且SQL如果不包含分片鍵,如果這個(gè)分片鍵對(duì)應(yīng)的值是遞增的(如id,時(shí)間等),我們可以設(shè)置一個(gè)閾值,在分片算法的doSharding中判斷分片值與閾值的大小決定將數(shù)據(jù)路由至舊表或新表,避免數(shù)據(jù)遷移的麻煩。如果是根據(jù)用戶id取模分表,而新增的數(shù)據(jù)無(wú)法只通過(guò)用戶id判斷,這時(shí)可以考慮采用復(fù)合分片算法,將用戶id與訂單id或者時(shí)間等遞增的字段同時(shí)設(shè)置為分片鍵,根據(jù)訂單id或時(shí)間判斷是否是新數(shù)據(jù),再根據(jù)用戶id取模得到路由結(jié)果即可。

另一個(gè)是Sharding-JDBC語(yǔ)法限制會(huì)使得存量SQL面對(duì)巨大的改造壓力,而實(shí)際上業(yè)務(wù)更關(guān)心的是需要分片的表,非分片的表不應(yīng)該發(fā)生改動(dòng)和影響。實(shí)際上,非分片表理論上無(wú)需通過(guò)解析、路由、重寫(xiě)、合并,為此我們?cè)谠创a層面對(duì)這段邏輯進(jìn)行了優(yōu)化,支持跳過(guò)部分解析,完全跳過(guò)分片路由、重寫(xiě)和合并,盡可能減少Sharding-JDBC對(duì)非分片表的語(yǔ)法限制,來(lái)減少業(yè)務(wù)系統(tǒng)的改造壓力與風(fēng)險(xiǎn)。

圖片

4.1 跳過(guò)Sharding語(yǔ)法限制

Sharding-JDBC執(zhí)行解析路由重寫(xiě)的邏輯都是在BasePrepareEngine中,最終構(gòu)造ExecutionContext交由執(zhí)行引擎執(zhí)行,ExecutionContext中包含sqlStatementContext和executionUnits,非分片表不涉及路由改寫(xiě),所以其ExecutionUnit我們非常容易手動(dòng)構(gòu)造,而查看SQLStatementContext的使用情況,我們發(fā)現(xiàn)SQLStatementContext只會(huì)影響結(jié)果集的合并而不會(huì)影響實(shí)際的執(zhí)行,而不分片表也無(wú)需進(jìn)行結(jié)果集的合并,整體實(shí)現(xiàn)思路如圖。

圖片

ExecutionContext相關(guān)對(duì)象

public class ExecutionContext {
 
    private final SQLStatementContext sqlStatementContext;
 
    private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();
}
 
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
public final class SQLUnit {
 
    private String sql;
 
    private final List<Object> parameters;
 
}

(1)校驗(yàn)SQL中是否包含分片表:我們是通過(guò)正則將SQL中的各個(gè)單詞分隔成Set,然后再遍歷BaseRule判斷是否存在分片表。大家可能會(huì)奇怪明明解析引擎可以幫我們解析出SQL中的表名,為什么還要自己來(lái)解析。因?yàn)槲覀儨y(cè)試的過(guò)程中發(fā)現(xiàn),存量業(yè)務(wù)上的SQL很多在解析階段就會(huì)報(bào)錯(cuò),只能提前判斷,當(dāng)然這種判斷方式并不嚴(yán)謹(jǐn),比如 SELECT order_id FROM t_order_record WHERE order_id=1 AND remarks=' t_order xxx';,配置的分片表t_order時(shí)就會(huì)存在誤判,但這種場(chǎng)景在我們的業(yè)務(wù)中沒(méi)有,所以暫時(shí)并沒(méi)有處理。由于這個(gè)信息需要在多個(gè)對(duì)象方法中使用,為了避免修改大量的對(duì)象變量和方法入?yún)?,而又能方便的透?jìng)鬟@個(gè)信息,判斷的結(jié)果我們選擇放在ThreadLocal里。

RuleContextManager

public final class RuleContextManager {
 
    private static final ThreadLocal<RuleContextManager> SKIP_CONTEXT_HOLDER = ThreadLocal.withInitial(RuleContextManager::new);
 
    /**
     * 是否跳過(guò)sharding
     */
    private boolean skipSharding;
 
    /**
     * 是否路由至主庫(kù)
     */
    private boolean masterRoute;
 
    public static boolean isSkipSharding() {
        return SKIP_CONTEXT_HOLDER.get().skipSharding;
    }
 
    public static void setSkipSharding(boolean skipSharding) {
        SKIP_CONTEXT_HOLDER.get().skipSharding = skipSharding;
    }
 
    public static boolean isMasterRoute() {
 
        return SKIP_CONTEXT_HOLDER.get().masterRoute;
    }
 
    public static void setMasterRoute(boolean masterRoute) {
        SKIP_CONTEXT_HOLDER.get().masterRoute = masterRoute;
    }
 
    public static void clear(){
        SKIP_CONTEXT_HOLDER.remove();
    }
 
}

判斷SQL是否包含分片表

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#buildSkipContext
// 判斷是否可以跳過(guò)sharding,構(gòu)造RuleContextManager的值
private void buildSkipContext(final String sql){
    Set<String> sqlTokenSet = new HashSet<>(Arrays.asList(sql.split("[\\s]")));
        if (CollectionUtils.isNotEmpty(rules)) {
            for (BaseRule baseRule : rules) {
                //定制方法,ShardingRule實(shí)現(xiàn),判斷sqlTokenSet是否包含邏輯表即可
                if(baseRule.hasContainShardingTable(sqlTokenSet)){
                    RuleContextManager.setSkipSharding(false);
                    break;
                }else {
                    RuleContextManager.setSkipSharding(true);
                }
            }
        }
}
 
org.apache.shardingsphere.core.rule.ShardingRule#hasContainShardingTable
public Boolean hasContainShardingTable(Set<String> sqlTokenSet) {
      //logicTableNameList通過(guò)遍歷TableRule可以得到
       for (String logicTable : logicTableNameList) {
            if (sqlTokenSet.contains(logicTable)) {
                return true;
            }
        }
        return false;
    }

(2)跳過(guò)解析路由:通過(guò)RuleContextManager中的skipSharding判斷是否需要跳過(guò)Sharding解析路由,但為了兼容讀寫(xiě)分離的場(chǎng)景,我們還需要知道這條SQL應(yīng)該走主庫(kù)還是從庫(kù),走主庫(kù)的場(chǎng)景在后面強(qiáng)制路由主庫(kù)部分有說(shuō)明,SQL走主庫(kù)實(shí)際上只有兩種情況,一種是非SELECT語(yǔ)句,另一種就是SELECT語(yǔ)句帶鎖,如SELECT...FOR UPDATE,因此整體實(shí)現(xiàn)的步驟如下:

  • 如果標(biāo)記了跳過(guò)Sharding且不為select語(yǔ)句,直接返回SkipShardingStatement,單獨(dú)構(gòu)造一個(gè)SkipShardingStatement的目的是為了能利用解析引擎中的緩存,緩存中不能放入null值。
  • 如果是select語(yǔ)句需要繼續(xù)解析,判斷是否有鎖后直接返回,避免后續(xù)解析造成語(yǔ)法不兼容,這里也曾嘗試用反射獲取lockClause來(lái)判斷是否包含鎖,但最終沒(méi)有成功。
  • ShardingRouteDecorator根據(jù)

   RuleContextManager.isSkipSharding判斷是否跳過(guò)路由。

跳過(guò)解析路由

public class SkipShardingStatement implements SQLStatement{
    @Override
    public int getParameterCount() {
        return 0;
    }
}
 
org.apache.shardingsphere.sql.parser.SQLParserEngine#parse0
    private SQLStatement parse0(final String sql, final boolean useCache) {
        if (useCache) {
            Optional<SQLStatement> cachedSQLStatement = cache.getSQLStatement(sql);
            if (cachedSQLStatement.isPresent()) {
                return cachedSQLStatement.get();
            }
        }
        ParseTree parseTree = new SQLParserExecutor(databaseTypeName, sql).execute().getRootNode();
        /**
         * 跳過(guò)sharding 需要判斷是否需要路由至主庫(kù) 如果不是select語(yǔ)句直接跳過(guò)
         * 是select語(yǔ)句則需要通過(guò)繼續(xù)解析判斷是否有鎖
         */
        SQLStatement result ;
        if(RuleContextManager.isSkipSharding()&&!VisitorRule.SELECT.equals(VisitorRule.valueOf(parseTree.getClass()))){
            RuleContextManager.setMasterRoute(true);
            result = new SkipShardingStatement();
        }else {
            result = (SQLStatement) ParseTreeVisitorFactory.newInstance(databaseTypeName, VisitorRule.valueOf(parseTree.getClass())).visit(parseTree);
        }
        if (useCache) {
            cache.put(sql, result);
        }
        return result;
    }
 
org.apache.shardingsphere.sql.parser.mysql.visitor.impl.MySQLDMLVisitor#visitSelectClause
    public ASTNode visitSelectClause(final SelectClauseContext ctx) {
        SelectStatement result = new SelectStatement();
        // 跳過(guò)sharding 只需要判斷是否有鎖來(lái)決定是否路由至主庫(kù)即可
        if(RuleContextManager.isSkipSharding()){
            if (null != ctx.lockClause()) {
                result.setLock((LockSegment) visit(ctx.lockClause()));
                RuleContextManager.setMasterRoute(true);
            }
            return result;
        }
        //...后續(xù)解析
    }
 
org.apache.shardingsphere.underlying.route.DataNodeRouter#createRouteContext
    private RouteContext createRouteContext(final String sql, final List<Object> parameters, final boolean useCache) {
        SQLStatement sqlStatement = parserEngine.parse(sql, useCache);
        //如果需要跳過(guò)sharding 不進(jìn)行后續(xù)的解析直接返回
        if (RuleContextManager.isSkipSharding()) {
            return new RouteContext(sqlStatement, parameters, new RouteResult());
        }
        //...解析
    }
 
org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        // 跳過(guò)sharding路由
        if(RuleContextManager.isSkipSharding()){
            return routeContext;
        }
        //...路由
    }

(3)手動(dòng)構(gòu)造ExecutionUnit:ExecutionUnit中我們需要確定的內(nèi)容就是datasourceName,這里我們認(rèn)為跳過(guò)Sharding的SQL最終執(zhí)行的庫(kù)一定只有一個(gè)。如果只是跳過(guò)Sharding的情況,直接從元數(shù)據(jù)中獲取數(shù)據(jù)源名稱(chēng)即可,如果存在讀寫(xiě)分離的情況,主從路由的結(jié)果也一定是唯一的。創(chuàng)建完ExecutionUnit直接放入ExecutionContext返回即可,從而跳過(guò)后續(xù)的改寫(xiě)邏輯。

手動(dòng)構(gòu)造ExecutionUnit

public ExecutionContext prepare(final String sql, final List<Object> parameters) {
    List<Object> clonedParameters = cloneParameters(parameters);
    // 判斷是否可以跳過(guò)sharding,構(gòu)造RuleContextManager的值
    buildSkipContext(sql);  
    RouteContext routeContext = executeRoute(sql, clonedParameters);
    ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
    // 跳過(guò)sharding的sql最后的路由結(jié)果一定只有一個(gè)庫(kù)
    if(RuleContextManager.isSkipSharding()){
        log.debug("可以跳過(guò)sharding的場(chǎng)景 {}", sql);
        if(!Objects.isNull(routeContext.getRouteResult())){
            Collection<String> allInstanceDataSourceNames = this.metaData.getDataSources().getAllInstanceDataSourceNames();
            int routeUnitsSize = routeContext.getRouteResult().getRouteUnits().size();
            /*
             * 1. 沒(méi)有讀寫(xiě)分離的情況下  跳過(guò)sharding路由會(huì)導(dǎo)致routeUnitsSize為0 此時(shí)需要判斷數(shù)據(jù)源數(shù)量是否為1
             * 2. 讀寫(xiě)分離情況下 只會(huì)路由至具體的主庫(kù)或從庫(kù) routeUnitsSize數(shù)量應(yīng)該為1
             */
            if(!(routeUnitsSize == 0 && allInstanceDataSourceNames.size()==1)|| routeUnitsSize>1){
                throw new ShardingSphereException("可以跳過(guò)sharding,但是路由結(jié)果不唯一,SQL= %s ,routeUnits= %s ",sql, routeContext.getRouteResult().getRouteUnits());
            }
            Collection<String> actualDataSourceNames = routeContext.getRouteResult().getActualDataSourceNames();
            // 手動(dòng)創(chuàng)建執(zhí)行單元
            String datasourceName = CollectionUtils.isEmpty(actualDataSourceNames)? allInstanceDataSourceNames.iterator().next():actualDataSourceNames.iterator().next();
            ExecutionUnit executionUnit = new ExecutionUnit(datasourceName, new SQLUnit(sql, clonedParameters));
            result.getExecutionUnits().add(executionUnit);
            //標(biāo)記該結(jié)果需要跳過(guò)
            result.setSkipShardingScenarioFlag(true);
        }
    }else {
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
    }
    if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
        SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
    }
    return result;
}

(4)跳過(guò)合并:跳過(guò)查詢結(jié)果的合并和影響行數(shù)計(jì)算的合并,注意ShardingPreparedStatement和ShardingStatement都需要跳過(guò)

跳過(guò)合并

org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#executeQuery
    public ResultSet executeQuery() throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            prepare();
            initPreparedStatementExecutor();
            List<QueryResult> queryResults = preparedStatementExecutor.executeQuery();
            List<ResultSet> resultSets = preparedStatementExecutor.getResultSets();
        // 定制開(kāi)發(fā),不分片跳過(guò)合并
            if(executionContext.isSkipShardingScenarioFlag()){
                return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
            }
            MergedResult mergedResult = mergeQuery(queryResults);
            result = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        } finally {
            clearBatch();
        }
        currentResultSet = result;
        return result;
    }
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#getResultSet
    public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        List<ResultSet> resultSets = getResultSets();
        // 定制開(kāi)發(fā),不分片跳過(guò)合并
        if(executionContext.isSkipShardingScenarioFlag()){
            return CollectionUtils.isNotEmpty(resultSets) ? resultSets.get(0) : null;
        }
 
        if (executionContext.getSqlStatementContext() instanceof SelectStatementContext || executionContext.getSqlStatementContext().getSqlStatement() instanceof DALStatement) {
            MergedResult mergedResult = mergeQuery(getQueryResults(resultSets));
            currentResultSet = new ShardingResultSet(resultSets, mergedResult, this, executionContext);
        }
        return currentResultSet;
    }
org.apache.shardingsphere.shardingjdbc.jdbc.core.statement.ShardingPreparedStatement#isAccumulate
    public boolean isAccumulate() {
        //定制開(kāi)發(fā),不分片跳過(guò)計(jì)算
        if(executionContext.isSkipShardingScenarioFlag()){
            return false;
        }
        return !connection.getRuntimeContext().getRule().isAllBroadcastTables(executionContext.getSqlStatementContext().getTablesContext().getTableNames());
    }

(5)清空RuleContextManager:查看一下Sharding-JDBC其他ThreadLocal的清空位置,對(duì)應(yīng)的清空RuleContextManager就好。

清空ThreadLocal

org.apache.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter#close
public final void close() throws SQLException {
        closed = true;
        MasterVisitedManager.clear();
        TransactionTypeHolder.clear();
        RuleContextManager.clear();
        int connectionSize = cachedConnections.size();
        try {
            forceExecuteTemplateForClose.execute(cachedConnections.entries(), cachedConnections -> cachedConnections.getValue().close());
        } finally {
            cachedConnections.clear();
            rootInvokeHook.finish(connectionSize);
        }
    }

舉個(gè)例子,比如Sharding-JDBC本身是不支持INSERT INTO tbl_name (col1, col2, …) SELECT col1, col2, … FROM tbl_name WHERE col3 = ?    這種語(yǔ)法的,會(huì)報(bào)空指針異常。

圖片

經(jīng)過(guò)我們上述改造驗(yàn)證后,非分片表是可以跳過(guò)語(yǔ)法限制執(zhí)行如下的SQL的。

圖片

通過(guò)該功能的實(shí)現(xiàn),業(yè)務(wù)可以更關(guān)注與分片表的SQL改造,而無(wú)需擔(dān)心引入Sharding-JDBC造成所有SQL的驗(yàn)證改造,大幅減少改造成本和風(fēng)險(xiǎn)。

4.2 強(qiáng)制路由主庫(kù)

Sharding-JDBC可以通過(guò)配置主從庫(kù)數(shù)據(jù)源方便的實(shí)現(xiàn)讀寫(xiě)分離的功能,但使用讀寫(xiě)分離就必須面對(duì)主從延遲和從庫(kù)失聯(lián)的痛點(diǎn),針對(duì)這一問(wèn)題,我們實(shí)現(xiàn)了強(qiáng)制路由主庫(kù)的動(dòng)態(tài)配置,當(dāng)主從延遲過(guò)大或從庫(kù)失聯(lián)時(shí),通過(guò)修改配置來(lái)實(shí)現(xiàn)SQL語(yǔ)句強(qiáng)制走主庫(kù)的不停機(jī)路由切換。

后面會(huì)說(shuō)明了配置的動(dòng)態(tài)生效的實(shí)現(xiàn)方式,這里只說(shuō)明強(qiáng)制路由主庫(kù)的實(shí)現(xiàn),我們直接使用前文的RuleContextManager即可,在主從路由引擎里判斷下是否開(kāi)啟了強(qiáng)制主庫(kù)路由。

MasterSlaveRouteDecorator.decorate改造

org.apache.shardingsphere.masterslave.route.engine.MasterSlaveRouteDecorator#decorate
    public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final MasterSlaveRule masterSlaveRule, final ConfigurationProperties properties) {
        /**
         * 如果配置了強(qiáng)制主庫(kù) MasterVisitedManager設(shè)置為true
         * 后續(xù)isMasterRoute中會(huì)保證路由至主庫(kù)
         */
        if(properties.<Boolean>getValue(ConfigurationPropertyKey.MASTER_ROUTE_ONLY)){
            MasterVisitedManager.setMasterVisited();
        }
        //...路由邏輯
        return routeContext;
    }

為了兼容之前跳過(guò)Sharding的功能,我們需要同步修改下isMasterRoute方法,如果是跳過(guò)了Sharding路由需要通過(guò)RuleContextManager來(lái)判斷是否走主庫(kù)。

isMasterRoute改造

org.apache.shardingsphere.masterslave.route.engine.impl.MasterSlaveDataSourceRouter#isMasterRoute
    private boolean isMasterRoute(final SQLStatement sqlStatement) {
        if(sqlStatement instanceof SkipShardingStatement){
            // 優(yōu)先以MasterVisitedManager中的值為準(zhǔn)
            return MasterVisitedManager.isMasterVisited()|| RuleContextManager.isMasterRoute();
        }
        return containsLockSegment(sqlStatement) || !(sqlStatement instanceof SelectStatement) || MasterVisitedManager.isMasterVisited() || HintManager.isMasterRouteOnly();
    }

當(dāng)然,更理想的狀況是通過(guò)監(jiān)控主從同步延遲和數(shù)據(jù)庫(kù)撥測(cè),當(dāng)超過(guò)閾值時(shí)或從庫(kù)失聯(lián)時(shí)直接自動(dòng)修改配置中心的庫(kù),實(shí)現(xiàn)自動(dòng)切換主庫(kù),減少業(yè)務(wù)故障時(shí)間和運(yùn)維壓力。

4.3 配置動(dòng)態(tài)生效

Sharding-JDBC中的ConfigurationPropertyKey中提供了許多配置屬性,而Sharding-JDBCB并沒(méi)有為這些配置提供在線修改的方法,而在實(shí)際的應(yīng)用場(chǎng)景中,像SQL_SHOW這樣控制SQL打印的開(kāi)關(guān)配置,我們更希望能夠在線修改配置值來(lái)控制SQL日志的打印,而不是修改完配置再重啟服務(wù)。

以SQL打印為例,BasePrepareEngine中存在ConfigurationProperties對(duì)象,通過(guò)調(diào)用getValue方法來(lái)獲取SQL_SHOW的值。

SQL 打印

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepare
    /**
     * Prepare to execute.
     *
     * @param sql SQL
     * @param parameters SQL parameters
     * @return execution context
     */
    public ExecutionContext prepare(final String sql, final List<Object> parameters) {
        List<Object> clonedParameters = cloneParameters(parameters);
        RouteContext routeContext = executeRoute(sql, clonedParameters);
        ExecutionContext result = new ExecutionContext(routeContext.getSqlStatementContext());
        result.getExecutionUnits().addAll(executeRewrite(sql, clonedParameters, routeContext));
        //sql打印
        if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
            SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE), result.getSqlStatementContext(), result.getExecutionUnits());
        }
        return result;
    }

ConfigurationProperties繼承了抽象類(lèi)TypedProperties,其getValue方法就是根據(jù)key獲取對(duì)應(yīng)的配置值,因此我們直接在TypedProperties中實(shí)現(xiàn)刷新緩存中的配置值的方法。

TypedProperties刷新配置

public abstract class TypedProperties<E extends Enum & TypedPropertyKey> {
     
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
     
    @Getter
    private final Properties props;
     
    private final Map<E, TypedPropertyValue> cache;
     
    public TypedProperties(final Class<E> keyClass, final Properties props) {
        this.props = props;
        cache = preload(keyClass);
    }
     
    private Map<E, TypedPropertyValue> preload(final Class<E> keyClass) {
        E[] enumConstants = keyClass.getEnumConstants();
        Map<E, TypedPropertyValue> result = new HashMap<>(enumConstants.length, 1);
        Collection<String> errorMessages = new LinkedList<>();
        for (E each : enumConstants) {
            TypedPropertyValue value = null;
            try {
                value = new TypedPropertyValue(each, props.getOrDefault(each.getKey(), each.getDefaultValue()).toString());
            } catch (final TypedPropertyValueException ex) {
                errorMessages.add(ex.getMessage());
            }
            result.put(each, value);
        }
        if (!errorMessages.isEmpty()) {
            throw new ShardingSphereConfigurationException(Joiner.on(LINE_SEPARATOR).join(errorMessages));
        }
        return result;
    }
     
    /**
     * Get property value.
     *
     * @param key property key
     * @param <T> class type of return value
     * @return property value
     */
    @SuppressWarnings("unchecked")
    public <T> T getValue(final E key) {
        return (T) cache.get(key).getValue();
    }
 
    /**
     * vivo定制改造方法 refresh property value.
     * @param key property key
     * @param value property value
     * @return 更新配置是否成功
     */
    public boolean refreshValue(String key, String value){
        //獲取配置類(lèi)支持的配置項(xiàng)
        E[] enumConstants = targetKeyClass.getEnumConstants();
        for (E each : enumConstants) {
            //遍歷新的值
            if(each.getKey().equals(key)){
                try {
                    //空白value認(rèn)為無(wú)效,取默認(rèn)值
                    if(!StringUtils.isBlank(value)){
                        value = each.getDefaultValue();
                    }
                    //構(gòu)造新屬性
                    TypedPropertyValue typedPropertyValue = new TypedPropertyValue(each, value);
                    //替換緩存
                    cache.put(each, typedPropertyValue);
                    //原始屬性也替換下,有可能會(huì)通過(guò)RuntimeContext直接獲取Properties
                    props.put(key,value);
                    return true;
                } catch (final TypedPropertyValueException ex) {
                    log.error("refreshValue error. key={} , value={}", key, value, ex);
                }
            }
        }
        return false;
    }
}

實(shí)現(xiàn)了刷新方法后,我們還需要將該方法一步步暴露至一個(gè)外部可以調(diào)用的類(lèi)中,以便在服務(wù)監(jiān)聽(tīng)配置的方法中,能夠調(diào)用這個(gè)刷新方法。ConfigurationProperties直接在asePrepareEngine的構(gòu)造函數(shù)中傳入,我們通過(guò)構(gòu)造函數(shù)逐步反推最外層的這一對(duì)象調(diào)用來(lái)源,最終可以定位到在AbstractDataSourceAdapter中的getRuntimeContext()方法中可以獲取到這個(gè)配置,而這個(gè)就是Sharding-JDBC實(shí)現(xiàn)的JDBC中Datasource接口的抽象類(lèi),我們直接在這個(gè)類(lèi)中調(diào)用剛剛實(shí)現(xiàn)的refreshValue方法,剩下的就是監(jiān)聽(tīng)配置,通過(guò)自己實(shí)現(xiàn)的AbstractDataSourceAdapter來(lái)調(diào)用這個(gè)方法就好了。

圖片

通過(guò)這一功能,我們可以方便的控制一些開(kāi)關(guān)屬性的在線修改,如SQL打印、強(qiáng)制路由主庫(kù)等,業(yè)務(wù)無(wú)需重啟服務(wù)即可做到配置的動(dòng)態(tài)生效。

4.4 批量update語(yǔ)法支持

業(yè)務(wù)中存在使用foreach標(biāo)簽來(lái)批量update的語(yǔ)句,這種SQL在Sharding-JDBC中無(wú)法被正確路由,只會(huì)路由第一組參數(shù),后面的無(wú)法被路由改寫(xiě),原因是解析引擎無(wú)法將語(yǔ)句拆分解析。

批量update樣例

<update id="batchUpdate">
        <foreach collectinotallow="orderList" item="item">
               update t_order set
               status = 1,
               updated_by = #{item.updatedBy}
               WHERE created_by = #{item.createdBy};
        </foreach>
    </update>

圖片

圖片

我們通過(guò)將批量update按照;拆分為多個(gè)語(yǔ)句,然后分別路由,最后手動(dòng)匯總路有結(jié)果生成執(zhí)行單元。

為了能正確重寫(xiě)SQL,批量update拆分后的語(yǔ)句需要完全一樣,這樣就不能使用動(dòng)態(tài)拼接set條件,而是使用ifnull語(yǔ)法或者字段值不發(fā)生變化時(shí)也將原來(lái)的值放入set中,只不過(guò)set前后的值保持一致,整體思路與實(shí)現(xiàn)如下。

圖片

prepareBatch實(shí)現(xiàn)

org.apache.shardingsphere.underlying.pluggble.prepare.BasePrepareEngine#prepareBatch
   private ExecutionContext prepareBatch(List<String> splitSqlList, final List<Object> allParameters) {
       //SQL去重
       List<String> sqlList = splitSqlList.stream().distinct().collect(Collectors.toList());
       if (sqlList.size() > 1) {
           throw new ShardingSphereException("不支持多條SQL,請(qǐng)檢查SQL," + sqlList.toString());
       }
       //以第一條SQL為標(biāo)準(zhǔn)
       String sql = sqlList.get(0);
       //所有的執(zhí)行單元
       Collection<ExecutionUnit> globalExecutionUnitList = new ArrayList<>();
       //初始化最后的執(zhí)行結(jié)果
       ExecutionContext executionContextResult = null;
       //根據(jù)所有參數(shù)數(shù)量和SQL語(yǔ)句數(shù)量 計(jì)算每組參數(shù)的數(shù)量
       int eachSqlParameterCount = allParameters.size() / splitSqlList.size();
       //平均分配每條SQL的參數(shù)
       List<List<Object>> eachSqlParameterListList = Lists.partition(allParameters, eachSqlParameterCount);
       for (List<Object> eachSqlParameterList : eachSqlParameterListList) {
           //每條SQL參數(shù)不同 需要根據(jù)參數(shù)路由不同的結(jié)果  實(shí)際的SqlStatementContext 是一致的
           RouteContext routeContext = executeRoute(sql, eachSqlParameterList);
           //由于SQL一樣  實(shí)際的SqlStatementContext 是一致的 只需初始化一次
           if (executionContextResult == null) {
               executionContextResult = new ExecutionContext(routeContext.getSqlStatementContext());
           }
           globalExecutionUnitList.addAll(executeRewrite(sql, eachSqlParameterList, routeContext));
       }
       //排序打印日志
       executionContextResult.getExtendMap().put(EXECUTION_UNIT_LIST, globalExecutionUnitList.stream().sorted(Comparator.comparing(ExecutionUnit::getDataSourceName)).collect(Collectors.toList()));
       if (properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SHOW)) {
           SQLLogger.logSQL(sql, properties.<Boolean>getValue(ConfigurationPropertyKey.SQL_SIMPLE),
                   executionContextResult.getSqlStatementContext(), (Collection<ExecutionUnit>) executionContextResult.getExtendMap().get(EXECUTION_UNIT_LIST));
       }
       return executionContextResult;
   }

這里我們?cè)贓xecutionContext單獨(dú)構(gòu)造了一個(gè)了ExtendMap來(lái)存放ExecutionUnit,原因是ExecutionContext中的executionUnits是HashSet,而判斷ExecutionUnit中的SqlUnit只會(huì)根據(jù)SQL去重,批量update的SQL是一致的,但parameters不同,為了不影響原有的邏輯,單獨(dú)使用了另外的變量來(lái)存放。

ExecutionContext改造

@RequiredArgsConstructor
@Getter
public class ExecutionContext {
 
    private final SQLStatementContext sqlStatementContext;
 
    private final Collection<ExecutionUnit> executionUnits = new LinkedHashSet<>();
 
    /**
     * 自定義擴(kuò)展變量
     */
    private final Map<ExtendEnum,Object> extendMap = new HashMap<>();
 
    /**
     * 定制擴(kuò)展,是否可以跳過(guò)分片邏輯
     */
    @Setter
    private boolean skipShardingScenarioFlag = false;
}
 
@RequiredArgsConstructor
@Getter
@EqualsAndHashCode
@ToString
public final class ExecutionUnit {
     
    private final String dataSourceName;
     
    private final SQLUnit sqlUnit;
}
 
@AllArgsConstructor
@RequiredArgsConstructor
@Getter
@Setter
//根據(jù)SQL判斷是否相等
@EqualsAndHashCode(of = { "sql" })
@ToString
public final class SQLUnit {
 
    private String sql;
 
    private final List<Object> parameters;
 
}

我們還需要改造下執(zhí)行方法,在初始化執(zhí)行器的時(shí)候,判斷下ExtendMap中存在我們自定義的EXECUTION_UNIT_LIST是否存在,存在則使用生成InputGroup,同一個(gè)數(shù)據(jù)源下的ExecutionUnit會(huì)被放入同一個(gè)InputGroup中。

InputGroup改造

org.apache.shardingsphere.shardingjdbc.executor.PreparedStatementExecutor#init
    public void init(final ExecutionContext executionContext) throws SQLException {
        setSqlStatementContext(executionContext.getSqlStatementContext());
        //兼容批量update 分庫(kù)分表后同一張表的情況 判斷是否存在EXECUTION_UNIT_LIST 存在則使用未去重的List進(jìn)行后續(xù)的操作
        if (MapUtils.isNotEmpty(executionContext.getExtendMap())){
            Collection<ExecutionUnit> executionUnitCollection = (Collection<ExecutionUnit>) executionContext.getExtendMap().get(EXECUTION_UNIT_LIST);
            if(CollectionUtils.isNotEmpty(executionUnitCollection)){
                getInputGroups().addAll(obtainExecuteGroups(executionUnitCollection));
            }
        }else {
            getInputGroups().addAll(obtainExecuteGroups(executionContext.getExecutionUnits()));
        }
        cacheStatements();
    }

改造完成后,批量update中的每條SQL都可以被正確路由執(zhí)行。

圖片

4.5 ShardingCondition去重

當(dāng)where語(yǔ)句包括多個(gè)or條件時(shí),而or條件不包含分片鍵時(shí),會(huì)造成createShardingConditions方法生成重復(fù)的分片條件,導(dǎo)致重復(fù)調(diào)用doSharding方法。

SELECT * FROM t_order  WHERE created_by = ? and (   (status = ?) or  (status = ?) or  (status = ?) )這種SQL,存在三個(gè)or條件,分片鍵是created_by ,實(shí)際產(chǎn)生的shardingCondition會(huì)是三個(gè)一樣的值,并會(huì)調(diào)用三次doSharding的方法。雖然實(shí)際執(zhí)行還是只有一次(批量update那里說(shuō)明過(guò)執(zhí)行單元會(huì)去重),但為了減少方法的重復(fù)調(diào)用,我們還是對(duì)這里做了一次去重。

圖片

圖片

去重的方法也比較簡(jiǎn)單粗暴,我們對(duì)ListRouteValue和RangeRouteValue添加了@EqualsAndHashCode注解,然后在WhereClauseShardingConditionEngine的createShardingConditions方法返回最終結(jié)果前加一次去重,從而避免生成重復(fù)的shardingCondition造成doSharding方法的重復(fù)調(diào)用。

createShardingConditions去重

org.apache.shardingsphere.sharding.route.engine.condition.engine.WhereClauseShardingConditionEngine#createShardingConditions
    private Collection<ShardingCondition> createShardingConditions(final SQLStatementContext sqlStatementContext, final Collection<AndPredicate> andPredicates, final List<Object> parameters) {
        Collection<ShardingCondition> result = new LinkedList<>();
        for (AndPredicate each : andPredicates) {
            Map<Column, Collection<RouteValue>> routeValueMap = createRouteValueMap(sqlStatementContext, each, parameters);
            if (routeValueMap.isEmpty()) {
                return Collections.emptyList();
            }
            result.add(createShardingCondition(routeValueMap));
        }
        //去重
        Collection<ShardingCondition> distinctResult = result.stream().distinct().collect(Collectors.toCollection(LinkedList::new));
        return distinctResult;
    }
4.6  全路由校驗(yàn)

分片表的SQL中如果沒(méi)有攜帶分片鍵(或者帶上了分片鍵結(jié)果沒(méi)有被正確解析)將會(huì)導(dǎo)致全路由,產(chǎn)生性能問(wèn)題,而這種SQL并不會(huì)報(bào)錯(cuò),這就導(dǎo)致在實(shí)際的業(yè)務(wù)改造中,開(kāi)發(fā)和測(cè)試很難保證百分百改造徹底。為此,我們?cè)谠创a層面對(duì)這種情況做了額外的校驗(yàn),當(dāng)產(chǎn)生全路由,也就是ShardingConditions為空時(shí),主動(dòng)拋出異常,從而方便開(kāi)發(fā)和測(cè)試能夠快速發(fā)現(xiàn)全路由SQL。

實(shí)現(xiàn)方式也比較簡(jiǎn)單,校驗(yàn)下ShardingConditions是否為空即可,只不過(guò)需要額外兼容下Hint策略ShardingConditions始終為空的特殊情況。

全路由校驗(yàn)

org.apache.shardingsphere.sharding.route.engine.ShardingRouteDecorator#decorate
public RouteContext decorate(final RouteContext routeContext, final ShardingSphereMetaData metaData, final ShardingRule shardingRule, final ConfigurationProperties properties) {
        //省略...
        //獲取 ShardingConditions
        ShardingConditions shardingConditions = getShardingConditions(parameters, sqlStatementContext, metaData.getSchema(), shardingRule);
        boolean hintAlgorithm = isHintAlgorithm(sqlStatementContext, shardingRule);
        //判斷是否允許全路由
        if (!properties.<Boolean>getValue(ConfigurationPropertyKey.ALLOW_EMPTY_SHARDING_CONDITIONS)) {
            //如果不是Hint算法
            if(!isHintAlgorithm(sqlStatementContext, shardingRule)){
                /** 如果是DML語(yǔ)句  則可能有兩種情況 這兩種情況是根據(jù)getShardingConditions方法的內(nèi)部邏輯而來(lái)的
                 *  一種是非插入語(yǔ)句  shardingConditions.getConditions()為空即可
                 *  一種是插入語(yǔ)句 插入語(yǔ)句shardingConditions.getConditions()不會(huì)為空  但是ShardingCondition的routeValues是空的
                 */
                if (sqlStatementContext.getSqlStatement() instanceof DMLStatement) {
                    if(shardingConditions.getConditions().isEmpty()) {
                        throw new ShardingSphereException("SQL不包含分庫(kù)分表鍵,請(qǐng)檢查SQL");
                    }else {
                        if (sqlStatementContext instanceof InsertStatementContext) {
                            List<ShardingCondition> routeValuesNotEmpty = shardingConditions.getConditions().stream().filter(r -> CollectionUtils.isNotEmpty(r.getRouteValues())).collect(Collectors.toList());
                            if(CollectionUtils.isEmpty(routeValuesNotEmpty)){
                                throw new ShardingSphereException("SQL不包含分庫(kù)分表鍵,請(qǐng)檢查SQL");
                            }
                        }
                    }
                }
            }
        }
        boolean needMergeShardingValues = isNeedMergeShardingValues(sqlStatementContext, shardingRule);
        //省略...
        return new RouteContext(sqlStatementContext, parameters, routeResult);
    }
 
private boolean isHintAlgorithm(final SQLStatementContext sqlStatementContext, final ShardingRule shardingRule) {
        // 場(chǎng)景a 全局默認(rèn)策略是否使用強(qiáng)制路由策略
        if(shardingRule.getDefaultDatabaseShardingStrategy() instanceof HintShardingStrategy
                || shardingRule.getDefaultTableShardingStrategy() instanceof HintShardingStrategy){
            return true;
        }
        for (String each : sqlStatementContext.getTablesContext().getTableNames()) {
            Optional<TableRule> tableRule = shardingRule.findTableRule(each);
            //場(chǎng)景b 指定表是否使用強(qiáng)制路由策略
            if (tableRule.isPresent() && (shardingRule.getDatabaseShardingStrategy(tableRule.get()) instanceof HintShardingStrategy
                    || shardingRule.getTableShardingStrategy(tableRule.get()) instanceof HintShardingStrategy)) {
                return true;
            }
        }
        return false;
    }

當(dāng)然這塊功能也可以在完善些,比如對(duì)分片路由結(jié)果中的數(shù)據(jù)源數(shù)量進(jìn)行校驗(yàn),從而避免跨庫(kù)操作,我們這邊沒(méi)有實(shí)現(xiàn)也就不再贅述了。

4.7 組件封裝

業(yè)務(wù)接入Sharding-JDBC的步驟是一樣的,都需要通過(guò)Java創(chuàng)建數(shù)據(jù)源和配置對(duì)象或者使用SpringBoot進(jìn)行配置,存在一定的熟悉成本和重復(fù)開(kāi)發(fā)的問(wèn)題,為此我們也對(duì)定制開(kāi)發(fā)版本的Sharding-JDBC封裝了一個(gè)公共組件,從而簡(jiǎn)化業(yè)務(wù)配置,減少重復(fù)開(kāi)發(fā),提升業(yè)務(wù)的開(kāi)發(fā)效率,具體功能可見(jiàn)下。這塊沒(méi)有涉及源碼的改造,只是在定制版本上包裝的一個(gè)公共組件。

  • 提供了默認(rèn)的數(shù)據(jù)源與連接池配置
  • 簡(jiǎn)化分庫(kù)分表配置,業(yè)務(wù)配置邏輯表名和后綴,組件拼裝行表達(dá)式和actual-data-nodes
  • 封裝常用的分片算法(時(shí)間、業(yè)務(wù)字段值等),
  • 統(tǒng)一的配置監(jiān)聽(tīng)與動(dòng)態(tài)修改(SQL打印、強(qiáng)制主從切換等)

開(kāi)源Sharding-JDBC配置

//數(shù)據(jù)源名稱(chēng)
spring.shardingsphere.datasource.names=ds0,ds1
//ds0配置
spring.shardingsphere.datasource.ds0.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.url=jdbc:mysql://localhost:3306/ds0
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=
//ds1配置
spring.shardingsphere.datasource.ds1.type=org.apache.commons.dbcp.BasicDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://localhost:3306/ds1
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=
//分表規(guī)則
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..1}.t_order$->{0..1}
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expressinotallow=t_order$->{order_id % 2}
spring.shardingsphere.sharding.tables.t_order_item.actual-data-nodes=ds$->{0..1}.t_order_item$->{0..1}
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order_item.table-strategy.inline.algorithm-expressinotallow=t_order_item$->{order_id % 2}
//默認(rèn)分庫(kù)規(guī)則
spring.shardingsphere.sharding.default-database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.default-database-strategy.inline.algorithm-expressinotallow=ds$->{user_id % 2}

組件簡(jiǎn)化配置

//數(shù)據(jù)源名稱(chēng)
vivo.it.sharding.datasource.names = ds0,ds1
//ds0配置
vivo.it.sharding.datasource.ds0.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds0.username = root
vivo.it.sharding.datasource.ds0.password =
//ds1配置
vivo.it.sharding.datasource.ds1.url = jdbc:mysql://localhost:3306/ds1
vivo.it.sharding.datasource.ds1.username = root
vivo.it.sharding.datasource.ds1.password =
//分表規(guī)則
vivo.it.sharding.table.rule.config = [{"logicTable":"t_order,t_order_item","tableRange":"0..1","shardingColumn":"order_id ","algorithmExpression":"order_id %2"}]
//默認(rèn)分庫(kù)規(guī)則
vivo.it.sharding.default.db.rule.config = {"shardingColumn":"user_id","algorithmExpression":"user_id %2"}

五、使用建議

結(jié)合官方文檔和業(yè)務(wù)實(shí)踐經(jīng)驗(yàn),我們也梳理了部分使用Sharding-JDBC的建議供大家參考,實(shí)際具體如何優(yōu)化SQL寫(xiě)法(比如子查詢、分頁(yè)、分組排序等)還需要結(jié)合業(yè)務(wù)的實(shí)際場(chǎng)景來(lái)進(jìn)行測(cè)試和調(diào)優(yōu)。

(1)強(qiáng)制等級(jí)

建議①:涉及分片表的SQL必須攜帶分片鍵

原因:無(wú)分片鍵會(huì)導(dǎo)致全路由,存在嚴(yán)重的性能隱患

建議②:禁止一條SQL中的分片值路由至不同的庫(kù)

原因:跨庫(kù)操作存在嚴(yán)重的性能隱患,事務(wù)操作會(huì)升級(jí)為分布式事務(wù),增加業(yè)務(wù)復(fù)雜度

建議③:禁止對(duì)分片鍵使用運(yùn)算表達(dá)式或函數(shù)操作

原因:無(wú)法提前計(jì)算表達(dá)式和函數(shù)獲取分片值,導(dǎo)致全路由

說(shuō)明:詳見(jiàn)官方文檔

圖片

建議④:禁止在子查詢中使用分片表

原因:無(wú)法正常解析子查詢中的分片表,導(dǎo)致業(yè)務(wù)錯(cuò)誤

說(shuō)明:雖然官方文檔中說(shuō)有限支持子查詢 ,但在實(shí)際的使用中發(fā)現(xiàn)4.1.1并不支持子查詢,可見(jiàn)官方issue6164 | issue 6228。

圖片

建議⑤:包含CASE WHEN、HAVING、UNION (ALL)語(yǔ)法的分片SQL,不支持路由至多數(shù)據(jù)節(jié)點(diǎn)

說(shuō)明:詳見(jiàn)官方文檔

(2)建議等級(jí)

① 建議使用分布式id來(lái)保證分片表主鍵的全局唯一性

原因:方便判斷數(shù)據(jù)的唯一性和后續(xù)的遷移擴(kuò)容

說(shuō)明:詳見(jiàn)文章《vivo 自研魯班分布式 ID 服務(wù)實(shí)踐》

② 建議跨多表的分組SQL的分組字段與排序字段保證一致

原因:分組和排序字段不一致只能通過(guò)內(nèi)存合并,大數(shù)據(jù)量時(shí)存在性能隱患

說(shuō)明:詳見(jiàn)官方文檔

③ 建議通過(guò)全局遞增的分布式id來(lái)優(yōu)化分頁(yè)查詢

原因:Sharding-JDBC的分頁(yè)優(yōu)化側(cè)重于結(jié)果集的流式合并來(lái)避免內(nèi)存爆漲,但深度分頁(yè)自身的性能問(wèn)題并不能解決

說(shuō)明:詳見(jiàn)官方文檔

六、總結(jié)

本文結(jié)合個(gè)人理解梳理了各個(gè)引擎的源碼入口和關(guān)鍵邏輯,讀者可以結(jié)合本文和官方文檔更好的定位理解Sharding-JDBC的源碼實(shí)現(xiàn)。定制開(kāi)發(fā)的目的是為了降低業(yè)務(wù)接入成本,盡可能減少業(yè)務(wù)存量SQL的改造,部分改造思想其實(shí)與官方社區(qū)也存在差異,比如跳過(guò)語(yǔ)法解析,官方社區(qū)致力于通過(guò)優(yōu)化解析引擎來(lái)適配各種語(yǔ)法,而不是跳過(guò)解析階段,可參考官方issue。源碼分析和定制改造只涉及了Sharding-JDBC的數(shù)據(jù)分片和讀寫(xiě)分離功能,定制開(kāi)發(fā)的功能也在生產(chǎn)環(huán)境經(jīng)過(guò)了考驗(yàn),如有不足和優(yōu)化建議,也歡迎大家批評(píng)指正。

責(zé)任編輯:龐桂玉 來(lái)源: vivo互聯(lián)網(wǎng)技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)