數(shù)據(jù)庫中間件 MyCAT源碼分析 —— 跨庫兩表Join
1. 概述
MyCAT 支持跨庫表 Join,目前版本僅支持跨庫兩表 Join。雖然如此,已經(jīng)能夠滿足我們大部分的業(yè)務(wù)場景。況且,Join 過多的表可能帶來的性能問題也是很麻煩的。
本文主要分享:
- 整體流程、調(diào)用順序圖
- 核心代碼的分析
前置閱讀:《MyCAT 源碼分析 —— 【單庫單表】查詢》。
OK,Let's Go。
2. 主流程
當(dāng)執(zhí)行跨庫兩表 Join SQL 時(shí),經(jīng)歷的大體流程如下:
SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL} 。RouteService#route(...) 解析注解 mycat:catlet 后,路由給 HintCatletHandler 作進(jìn)一步處理。
HintCatletHandler 獲取注解對應(yīng)的 Catlet 實(shí)現(xiàn)類,io.mycat.catlets.ShareJoin 就是其中一種實(shí)現(xiàn)(目前也只有這一種實(shí)現(xiàn)),提供了跨庫兩表 Join 的功能。從類命名上看,ShareJoin 很大可能性后續(xù)會(huì)提供完整的跨庫多表的 Join 功能。
核心代碼如下:
- // HintCatletHandler.java
- public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,
- int sqlType, String realSQL, String charset, ServerConnection sc,
- LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap)
- throws SQLNonTransientException {
- String cateletClass = hintSQLValue;
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL);
- }
- try {
- Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass);
- catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool);
- catlet.processSQL(realSQL, new EngineCtx(sc.getSession2()));
- } catch (Exception e) {
- LOGGER.warn("catlet error " + e);
- throw new SQLNonTransientException(e);
- }
- return null;
- }
3. ShareJoin
目前支持跨庫兩表 Join。ShareJoin 將 SQL 拆分成左表 SQL 和 右表 SQL,發(fā)送給各數(shù)據(jù)節(jié)點(diǎn)執(zhí)行,匯總數(shù)據(jù)結(jié)果進(jìn)行合后返回。
偽代碼如下:
- // SELECT u.id, o.id FROM t_order o
- // INNER JOIN t_user u ON o.uid = u.id
- // 【順序】查詢左表
- String leftSQL = "SELECT o.id, u.id FROM t_order o";
- List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql);
- // 【并行】查詢右表
- String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})";
- for (dn : dns) { // 此處是并行執(zhí)行,使用回調(diào)邏輯
- for (rightRecord : dn.select(rightSQL)) { // 查詢右表
- // 合并結(jié)果
- for (leftRecord : leftList) {
- if (leftRecord.uid == rightRecord.id) {
- write(leftRecord + leftRecord.uid 拼接結(jié)果);
- }
- }
- }
- }
實(shí)際情況會(huì)更加復(fù)雜,我們接下來一點(diǎn)點(diǎn)往下看。
3.1 JoinParser
JoinParser 負(fù)責(zé)對 SQL 進(jìn)行解析。整體流程如下:
舉個(gè)例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 結(jié)果如下:
- tName :表名
- tAlia :表自定義命名
- where :過濾條件
- order :排序條件
- parenTable :左連接的 Join 的表名。t_user表 在 join屬性 的 parenTable 為 "o",即 t_order。
- joinParentkey :左連接的 Join 字段
- joinKey :join 字段。t_user表 在 join屬性 為 id。
- join :子 tableFilter。即,該表連接的右邊的表。
- parent :和 join屬性 相對。
看到此處,大家可能有疑問,為什么要把 SQL 解析成 TableFilter。JoinParser 根據(jù) TableFilter 生成數(shù)據(jù)節(jié)點(diǎn)執(zhí)行 SQL。代碼如下:
- // TableFilter.java
- public String getSQL() {
- String sql = "";
- // fields
- for (Entry<String, String> entry : fieldAliasMap.entrySet()) {
- String key = entry.getKey();
- String val = entry.getValue();
- if (val == null) {
- sql = unionsql(sql, getFieldfrom(key), ",");
- } else {
- sql = unionsql(sql, getFieldfrom(key) + " as " + val, ",");
- }
- }
- // where
- if (parent == null) { // on/where 等于號左邊的表
- String parentJoinKey = getJoinKey(true);
- // fix sharejoin bug:
- // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException:
- // 原因是左表的select列沒有包含 join 列,在獲取結(jié)果時(shí)報(bào)上面的錯(cuò)誤
- if (sql != null && parentJoinKey != null &&
- !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) {
- sql += ", " + parentJoinKey;
- }
- sql = "select " + sql + " from " + tName;
- if (!(where.trim().equals(""))) {
- sql += " where " + where.trim();
- }
- } else { // on/where 等于號右邊邊的表
- if (allField) {
- sql = "select " + sql + " from " + tName;
- } else {
- sql = unionField("select " + joinKey, sql, ",");
- sql = sql + " from " + tName;
- //sql="select "+joinKey+","+sql+" from "+tName;
- }
- if (!(where.trim().equals(""))) {
- sql += " where " + where.trim() + " and (" + joinKey + " in %s )";
- } else {
- sql += " where " + joinKey + " in %s ";
- }
- }
- // order
- if (!(order.trim().equals(""))) {
- sql += " order by " + order.trim();
- }
- // limit
- if (parent == null) {
- if ((rowCount > 0) && (offset > 0)) {
- sql += " limit" + offset + "," + rowCount;
- } else {
- if (rowCount > 0) {
- sql += " limit " + rowCount;
- }
- }
- }
- return sql;
- }
- 當(dāng) parent 為空時(shí),即on/where 等于號左邊的表。例如:select id, uid from t_order。
- 當(dāng) parent 不為空時(shí),即on/where 等于號右邊的表。例如:select id, username from t_user where id in (1, 2, 3)。
3.2 ShareJoin.processSQL(...)
當(dāng) SQL 解析完后,生成左邊的表執(zhí)行的 SQL,發(fā)送給對應(yīng)的數(shù)據(jù)節(jié)點(diǎn)查詢數(shù)據(jù)。大體流程如下:
當(dāng) SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 時(shí), sql = getSql() 的返回結(jié)果為 select id, uid from t_order。
生成左邊的表執(zhí)行的 SQL 后,順序順序順序發(fā)送給對應(yīng)的數(shù)據(jù)節(jié)點(diǎn)查詢數(shù)據(jù)。具體順序查詢是怎么實(shí)現(xiàn)的,我們來看下章 BatchSQLJob。
3.3 BatchSQLJob
EngineCtx 對 BatchSQLJob 封裝,提供上層兩個(gè)方法:
- executeNativeSQLSequnceJob :順序(非并發(fā))在每個(gè)數(shù)據(jù)節(jié)點(diǎn)執(zhí)行SQL任務(wù)
- executeNativeSQLParallJob :并發(fā)在每個(gè)數(shù)據(jù)節(jié)點(diǎn)執(zhí)行SQL任務(wù)
核心代碼如下:
- // EngineCtx.java
- public void executeNativeSQLSequnceJob(String[] dataNodes, String sql,
- SQLJobHandler jobHandler) {
- for (String dataNode : dataNodes) {
- SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
- jobHandler, this);
- bachJob.addJob(job, false);
- }
- }
- public void executeNativeSQLParallJob(String[] dataNodes, String sql,
- SQLJobHandler jobHandler) {
- for (String dataNode : dataNodes) {
- SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode,
- jobHandler, this);
- bachJob.addJob(job, true);
- }
- }
BatchSQLJob 通過執(zhí)行中任務(wù)列表、待執(zhí)行任務(wù)列表來實(shí)現(xiàn)順序/并發(fā)執(zhí)行任務(wù)。核心代碼如下:
- // BatchSQLJob.java
- /**
- * 執(zhí)行中任務(wù)列表
- */
- private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>();
- /**
- * 待執(zhí)行任務(wù)列表
- */
- private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>();
- public void addJob(SQLJob newJob, boolean parallExecute) {
- if (parallExecute) {
- runJob(newJob);
- } else {
- waitingJobs.offer(newJob);
- if (runningJobs.isEmpty()) { // 若無正在執(zhí)行中的任務(wù),則從等待隊(duì)列里獲取任務(wù)進(jìn)行執(zhí)行。
- SQLJob job = waitingJobs.poll();
- if (job != null) {
- runJob(job);
- }
- }
- }
- }
- public boolean jobFinished(SQLJob sqlJob) {
- runningJobs.remove(sqlJob.getId());
- SQLJob job = waitingJobs.poll();
- if (job != null) {
- runJob(job);
- return false;
- } else {
- if (noMoreJobInput) {
- return runningJobs.isEmpty() && waitingJobs.isEmpty();
- } else {
- return false;
- }
- }
- }
- 順序執(zhí)行時(shí),當(dāng) runningJobs 存在執(zhí)行中的任務(wù)時(shí),#addJob(...) 時(shí),不立即執(zhí)行,添加到 waitingJobs。當(dāng) SQLJob 完成時(shí),順序調(diào)用下一個(gè)任務(wù)。
- 并發(fā)執(zhí)行時(shí),#addJob(...) 時(shí),立即執(zhí)行。
SQLJob SQL 異步執(zhí)行任務(wù)。其 jobHandler(SQLJobHandler) 屬性,在 SQL 執(zhí)行有返回結(jié)果時(shí),會(huì)進(jìn)行回調(diào),從而實(shí)現(xiàn)異步執(zhí)行。
在 ShareJoin 里,SQLJobHandler 有兩個(gè)實(shí)現(xiàn):ShareDBJoinHandler、ShareRowOutPutDataHandler。前者,左邊的表執(zhí)行的 SQL 回調(diào);后者,右邊的表執(zhí)行的 SQL 回調(diào)。
3.4 ShareDBJoinHandler
ShareDBJoinHandler,左邊的表執(zhí)行的 SQL 回調(diào)。流程如下:
- #fieldEofResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 fields,放入內(nèi)存。
- #rowResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 row,放入內(nèi)存。
- #rowEofResponse(...) :接收完一個(gè)數(shù)據(jù)節(jié)點(diǎn)返回所有的 row。當(dāng)所有數(shù)據(jù)節(jié)點(diǎn)都完成 SQL 執(zhí)行時(shí),提交右邊的表執(zhí)行的 SQL 任務(wù),并行執(zhí)行,即圖中#createQryJob(...)。
當(dāng) SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 時(shí), sql = getChildSQL() 的返回結(jié)果為 select id, username from t_user where id in (1, 2, 3)。
核心代碼如下:
- // ShareJoin.java
- private void createQryJob(int batchSize) {
- int count = 0;
- Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>();
- String theId = null;
- StringBuilder sb = new StringBuilder().append('(');
- String svalue = "";
- for (Map.Entry<String, String> e : ids.entrySet()) {
- theId = e.getKey();
- byte[] rowbyte = rows.remove(theId);
- if (rowbyte != null) {
- batchRows.put(theId, rowbyte);
- }
- if (!svalue.equals(e.getValue())) {
- if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING
- || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 為varchar
- sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang')
- } else { // 默認(rèn)joinkey為int/long
- sb.append(e.getValue()).append(','); // (1,2,3)
- }
- }
- svalue = e.getValue();
- if (count++ > batchSize) {
- break;
- }
- }
- if (count == 0) {
- return;
- }
- jointTableIsData = true;
- sb.deleteCharAt(sb.length() - 1).append(')');
- String sql = String.format(joinParser.getChildSQL(), sb);
- getRoute(sql);
- ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession()));
- }
3.5 ShareRowOutPutDataHandler
ShareRowOutPutDataHandler,右邊的表執(zhí)行的 SQL 回調(diào)。流程如下:
- #fieldEofResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 fields,返回 header 給 MySQL Client。
- #rowResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 row,匹配左表的記錄,返回合并后返回的 row 給 MySQL Client。
- #rowEofResponse(...) :當(dāng)所有 row 都返回完后,返回 eof 給 MySQL Client。
核心代碼如下:
- // ShareRowOutPutDataHandler.java
- public boolean onRowData(String dataNode, byte[] rowData) {
- RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields);
- //拷貝一份batchRows
- Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>();
- batchRowsCopy.putAll(arows);
- // 獲取Id字段,
- String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR));
- // 查找ID對應(yīng)的A表的記錄
- byte[] arow = getRow(batchRowsCopy, id, joinL);
- while (arow != null) {
- RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields());
- for (int i = 1; i < rowDataPkgold.fieldCount; i++) {
- // 設(shè)置b.name 字段
- byte[] bname = rowDataPkgold.fieldValues.get(i);
- rowDataPkg.add(bname);
- rowDataPkg.addFieldCount(1);
- }
- // huangyiming add
- MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();
- if (null == middlerResultHandler) {
- ctx.writeRow(rowDataPkg);
- } else {
- if (middlerResultHandler instanceof MiddlerQueryResultHandler) {
- byte[] columnData = rowDataPkg.fieldValues.get(0);
- if (columnData != null && columnData.length > 0) {
- String rowValue = new String(columnData);
- middlerResultHandler.add(rowValue);
- }
- //}
- }
- }
- arow = getRow(batchRowsCopy, id, joinL);
- }
- return false;
- }
4. 彩蛋
如下是本文涉及到的核心類,有興趣的同學(xué)可以翻一翻。
ShareJoin 另外不支持的功能:
- 只支持 inner join,不支持 left join、right join 等等連接。
- 不支持 order by。
- 不支持 group by 以及 相關(guān)聚合函數(shù)。
- 即使 join 左表的字段未聲明為返回 fields 也會(huì)返回。
恩,MyCAT 弱XA 源碼繼續(xù)走起!