自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

數(shù)據(jù)庫中間件 MyCAT源碼分析 —— 跨庫兩表Join

數(shù)據(jù)庫
MyCAT 支持跨庫表 Join,目前版本僅支持跨庫兩表 Join。雖然如此,已經(jīng)能夠滿足我們大部分的業(yè)務(wù)場景。況且,Join 過多的表可能帶來的性能問題也是很麻煩的。

1. 概述

MyCAT 支持跨庫表 Join,目前版本僅支持跨庫兩表 Join。雖然如此,已經(jīng)能夠滿足我們大部分的業(yè)務(wù)場景。況且,Join 過多的表可能帶來的性能問題也是很麻煩的。

本文主要分享:

  1. 整體流程、調(diào)用順序圖
  2. 核心代碼的分析

前置閱讀:《MyCAT 源碼分析 —— 【單庫單表】查詢》。

OK,Let's Go。

2. 主流程

當(dāng)執(zhí)行跨庫兩表 Join SQL 時(shí),經(jīng)歷的大體流程如下:

 

SQL 上,需要添加注解 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ ${SQL} 。RouteService#route(...) 解析注解 mycat:catlet 后,路由給 HintCatletHandler 作進(jìn)一步處理。

HintCatletHandler 獲取注解對應(yīng)的 Catlet 實(shí)現(xiàn)類,io.mycat.catlets.ShareJoin 就是其中一種實(shí)現(xiàn)(目前也只有這一種實(shí)現(xiàn)),提供了跨庫兩表 Join 的功能。從類命名上看,ShareJoin 很大可能性后續(xù)會(huì)提供完整的跨庫多表的 Join 功能。

核心代碼如下:

  1. // HintCatletHandler.java 
  2. public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema
  3.                            int sqlType, String realSQL, String charset, ServerConnection sc, 
  4.                            LayerCachePool cachePool, String hintSQLValue, int hintSqlType, Map hintMap) 
  5.        throws SQLNonTransientException { 
  6.    String cateletClass = hintSQLValue; 
  7.    if (LOGGER.isDebugEnabled()) { 
  8.        LOGGER.debug("load catelet class:" + hintSQLValue + " to run sql " + realSQL); 
  9.    } 
  10.    try { 
  11.        Catlet catlet = (Catlet) MycatServer.getInstance().getCatletClassLoader().getInstanceofClass(cateletClass); 
  12.        catlet.route(sysConfig, schema, sqlType, realSQL, charset, sc, cachePool); 
  13.        catlet.processSQL(realSQL, new EngineCtx(sc.getSession2())); 
  14.    } catch (Exception e) { 
  15.        LOGGER.warn("catlet error " + e); 
  16.        throw new SQLNonTransientException(e); 
  17.    } 
  18.    return null
  19.  

3. ShareJoin

目前支持跨庫兩表 Join。ShareJoin 將 SQL 拆分成左表 SQL 和 右表 SQL,發(fā)送給各數(shù)據(jù)節(jié)點(diǎn)執(zhí)行,匯總數(shù)據(jù)結(jié)果進(jìn)行合后返回。

偽代碼如下:

  1. // SELECT u.id, o.id FROM t_order o  
  2. // INNER JOIN t_user u ON o.uid = u.id 
  3. // 【順序】查詢左表 
  4. String leftSQL = "SELECT o.id, u.id FROM t_order o"
  5. List leftList = dn[0].select(leftSQL) + dn[1].select(leftSQL) + ... + dn[n].select(leftsql); 
  6. // 【并行】查詢右表 
  7. String rightSQL = "SELECT u.id FROM t_user u WHERE u.id IN (${leftList.uid})"
  8. for (dn : dns) { // 此處是并行執(zhí)行,使用回調(diào)邏輯 
  9.     for (rightRecord : dn.select(rightSQL)) { // 查詢右表 
  10.         // 合并結(jié)果 
  11.         for (leftRecord : leftList) { 
  12.             if (leftRecord.uid == rightRecord.id) { 
  13.                 write(leftRecord + leftRecord.uid 拼接結(jié)果); 
  14.             } 
  15.         } 
  16.     } 
  17.  

實(shí)際情況會(huì)更加復(fù)雜,我們接下來一點(diǎn)點(diǎn)往下看。

3.1 JoinParser

JoinParser 負(fù)責(zé)對 SQL 進(jìn)行解析。整體流程如下:

 

舉個(gè)例子,/*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 解析后,TableFilter 結(jié)果如下:

 

  • tName :表名
  • tAlia :表自定義命名
  • where :過濾條件
  • order :排序條件
  • parenTable :左連接的 Join 的表名。t_user表 在 join屬性 的 parenTable 為 "o",即 t_order。
  • joinParentkey :左連接的 Join 字段
  • joinKey :join 字段。t_user表 在 join屬性 為 id。
  • join :子 tableFilter。即,該表連接的右邊的表。
  • parent :和 join屬性 相對。

看到此處,大家可能有疑問,為什么要把 SQL 解析成 TableFilter。JoinParser 根據(jù) TableFilter 生成數(shù)據(jù)節(jié)點(diǎn)執(zhí)行 SQL。代碼如下:

  1. // TableFilter.java 
  2. public String getSQL() { 
  3.    String sql = ""
  4.    // fields 
  5.    for (Entry<String, String> entry : fieldAliasMap.entrySet()) { 
  6.        String key = entry.getKey(); 
  7.        String val = entry.getValue(); 
  8.        if (val == null) { 
  9.            sql = unionsql(sql, getFieldfrom(key), ","); 
  10.        } else { 
  11.            sql = unionsql(sql, getFieldfrom(key) + " as " + val, ","); 
  12.        } 
  13.    } 
  14.    // where 
  15.    if (parent == null) {    // on/where 等于號左邊的表 
  16.        String parentJoinKey = getJoinKey(true); 
  17.        // fix sharejoin bug: 
  18.        // (AbstractConnection.java:458) -close connection,reason:program err:java.lang.IndexOutOfBoundsException: 
  19.        // 原因是左表的select列沒有包含 join 列,在獲取結(jié)果時(shí)報(bào)上面的錯(cuò)誤 
  20.        if (sql != null && parentJoinKey != null && 
  21.                !sql.toUpperCase().contains(parentJoinKey.trim().toUpperCase())) { 
  22.            sql += ", " + parentJoinKey; 
  23.        } 
  24.        sql = "select " + sql + " from " + tName; 
  25.        if (!(where.trim().equals(""))) { 
  26.            sql += " where " + where.trim(); 
  27.        } 
  28.    } else {    // on/where 等于號右邊邊的表 
  29.        if (allField) { 
  30.            sql = "select " + sql + " from " + tName; 
  31.        } else { 
  32.            sql = unionField("select " + joinKey, sql, ","); 
  33.            sql = sql + " from " + tName; 
  34.            //sql="select "+joinKey+","+sql+" from "+tName; 
  35.        } 
  36.        if (!(where.trim().equals(""))) { 
  37.            sql += " where " + where.trim() + " and (" + joinKey + " in %s )"
  38.        } else { 
  39.            sql += " where " + joinKey + " in %s "
  40.        } 
  41.    } 
  42.    // order 
  43.    if (!(order.trim().equals(""))) { 
  44.        sql += " order by " + order.trim(); 
  45.    } 
  46.    // limit 
  47.    if (parent == null) { 
  48.        if ((rowCount > 0) && (offset > 0)) { 
  49.            sql += " limit" + offset + "," + rowCount; 
  50.        } else { 
  51.            if (rowCount > 0) { 
  52.                sql += " limit " + rowCount; 
  53.            } 
  54.        } 
  55.    } 
  56.    return sql; 
  57.  
  • 當(dāng) parent 為空時(shí),即on/where 等于號左邊的表。例如:select id, uid from t_order。
  • 當(dāng) parent 不為空時(shí),即on/where 等于號右邊的表。例如:select id, username from t_user where id in (1, 2, 3)。

3.2 ShareJoin.processSQL(...)

當(dāng) SQL 解析完后,生成左邊的表執(zhí)行的 SQL,發(fā)送給對應(yīng)的數(shù)據(jù)節(jié)點(diǎn)查詢數(shù)據(jù)。大體流程如下:

 

當(dāng) SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 時(shí), sql = getSql() 的返回結(jié)果為 select id, uid from t_order。

生成左邊的表執(zhí)行的 SQL 后,順序順序順序發(fā)送給對應(yīng)的數(shù)據(jù)節(jié)點(diǎn)查詢數(shù)據(jù)。具體順序查詢是怎么實(shí)現(xiàn)的,我們來看下章 BatchSQLJob。

3.3 BatchSQLJob

 

EngineCtx 對 BatchSQLJob 封裝,提供上層兩個(gè)方法:

  • executeNativeSQLSequnceJob :順序(非并發(fā))在每個(gè)數(shù)據(jù)節(jié)點(diǎn)執(zhí)行SQL任務(wù)
  • executeNativeSQLParallJob :并發(fā)在每個(gè)數(shù)據(jù)節(jié)點(diǎn)執(zhí)行SQL任務(wù)

核心代碼如下:

  1. // EngineCtx.java 
  2. public void executeNativeSQLSequnceJob(String[] dataNodes, String sql, 
  3.         SQLJobHandler jobHandler) { 
  4.     for (String dataNode : dataNodes) { 
  5.         SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode, 
  6.                 jobHandler, this); 
  7.         bachJob.addJob(job, false); 
  8.     } 
  9.  
  10. public void executeNativeSQLParallJob(String[] dataNodes, String sql, 
  11.         SQLJobHandler jobHandler) { 
  12.     for (String dataNode : dataNodes) { 
  13.         SQLJob job = new SQLJob(jobId.incrementAndGet(), sql, dataNode, 
  14.                 jobHandler, this); 
  15.         bachJob.addJob(job, true); 
  16.     } 
  17.  

BatchSQLJob 通過執(zhí)行中任務(wù)列表、待執(zhí)行任務(wù)列表來實(shí)現(xiàn)順序/并發(fā)執(zhí)行任務(wù)。核心代碼如下:

  1. // BatchSQLJob.java 
  2. /** 
  3. * 執(zhí)行中任務(wù)列表 
  4. */ 
  5. private ConcurrentHashMap<Integer, SQLJob> runningJobs = new ConcurrentHashMap<Integer, SQLJob>(); 
  6. /** 
  7. * 待執(zhí)行任務(wù)列表 
  8. */ 
  9. private ConcurrentLinkedQueue<SQLJob> waitingJobs = new ConcurrentLinkedQueue<SQLJob>(); 
  10.  
  11. public void addJob(SQLJob newJob, boolean parallExecute) { 
  12.    if (parallExecute) { 
  13.        runJob(newJob); 
  14.    } else { 
  15.        waitingJobs.offer(newJob); 
  16.        if (runningJobs.isEmpty()) { // 若無正在執(zhí)行中的任務(wù),則從等待隊(duì)列里獲取任務(wù)進(jìn)行執(zhí)行。 
  17.            SQLJob job = waitingJobs.poll(); 
  18.            if (job != null) { 
  19.                runJob(job); 
  20.            } 
  21.        } 
  22.    } 
  23.  
  24. public boolean jobFinished(SQLJob sqlJob) { 
  25.     runningJobs.remove(sqlJob.getId()); 
  26.     SQLJob job = waitingJobs.poll(); 
  27.     if (job != null) { 
  28.         runJob(job); 
  29.         return false
  30.     } else { 
  31.         if (noMoreJobInput) { 
  32.             return runningJobs.isEmpty() && waitingJobs.isEmpty(); 
  33.         } else { 
  34.             return false
  35.         } 
  36.     } 
  37.  
  • 順序執(zhí)行時(shí),當(dāng) runningJobs 存在執(zhí)行中的任務(wù)時(shí),#addJob(...) 時(shí),不立即執(zhí)行,添加到 waitingJobs。當(dāng) SQLJob 完成時(shí),順序調(diào)用下一個(gè)任務(wù)。
  • 并發(fā)執(zhí)行時(shí),#addJob(...) 時(shí),立即執(zhí)行。

SQLJob SQL 異步執(zhí)行任務(wù)。其 jobHandler(SQLJobHandler) 屬性,在 SQL 執(zhí)行有返回結(jié)果時(shí),會(huì)進(jìn)行回調(diào),從而實(shí)現(xiàn)異步執(zhí)行。

在 ShareJoin 里,SQLJobHandler 有兩個(gè)實(shí)現(xiàn):ShareDBJoinHandler、ShareRowOutPutDataHandler。前者,左邊的表執(zhí)行的 SQL 回調(diào);后者,右邊的表執(zhí)行的 SQL 回調(diào)。

 

3.4 ShareDBJoinHandler

ShareDBJoinHandler,左邊的表執(zhí)行的 SQL 回調(diào)。流程如下:

 

  • #fieldEofResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 fields,放入內(nèi)存。
  • #rowResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 row,放入內(nèi)存。
  • #rowEofResponse(...) :接收完一個(gè)數(shù)據(jù)節(jié)點(diǎn)返回所有的 row。當(dāng)所有數(shù)據(jù)節(jié)點(diǎn)都完成 SQL 執(zhí)行時(shí),提交右邊的表執(zhí)行的 SQL 任務(wù),并行執(zhí)行,即圖中#createQryJob(...)。

當(dāng) SQL 為 /*!mycat:catlet=io.mycat.catlets.ShareJoin */ SELECT o.id, u.username from t_order o join t_user u on o.uid = u.id; 時(shí), sql = getChildSQL() 的返回結(jié)果為 select id, username from t_user where id in (1, 2, 3)。

核心代碼如下:

  1. // ShareJoin.java 
  2. private void createQryJob(int batchSize) { 
  3.    int count = 0; 
  4.    Map<String, byte[]> batchRows = new ConcurrentHashMap<String, byte[]>(); 
  5.    String theId = null
  6.    StringBuilder sb = new StringBuilder().append('('); 
  7.    String svalue = ""
  8.    for (Map.Entry<String, String> e : ids.entrySet()) { 
  9.        theId = e.getKey(); 
  10.        byte[] rowbyte = rows.remove(theId); 
  11.        if (rowbyte != null) { 
  12.            batchRows.put(theId, rowbyte); 
  13.        } 
  14.        if (!svalue.equals(e.getValue())) { 
  15.            if (joinKeyType == Fields.FIELD_TYPE_VAR_STRING 
  16.                    || joinKeyType == Fields.FIELD_TYPE_STRING) { // joinkey 為varchar 
  17.                sb.append("'").append(e.getValue()).append("'").append(','); // ('digdeep','yuanfang'
  18.            } else { // 默認(rèn)joinkey為int/long 
  19.                sb.append(e.getValue()).append(','); // (1,2,3) 
  20.            } 
  21.        } 
  22.        svalue = e.getValue(); 
  23.        if (count++ > batchSize) { 
  24.            break; 
  25.        } 
  26.    } 
  27.    if (count == 0) { 
  28.        return
  29.    } 
  30.    jointTableIsData = true
  31.    sb.deleteCharAt(sb.length() - 1).append(')'); 
  32.    String sql = String.format(joinParser.getChildSQL(), sb); 
  33.    getRoute(sql); 
  34.    ctx.executeNativeSQLParallJob(getDataNodes(), sql, new ShareRowOutPutDataHandler(this, fields, joinindex, joinParser.getJoinRkey(), batchRows, ctx.getSession())); 
  35.  

3.5 ShareRowOutPutDataHandler

ShareRowOutPutDataHandler,右邊的表執(zhí)行的 SQL 回調(diào)。流程如下:

 

  • #fieldEofResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 fields,返回 header 給 MySQL Client。
  • #rowResponse(...) :接收數(shù)據(jù)節(jié)點(diǎn)返回的 row,匹配左表的記錄,返回合并后返回的 row 給 MySQL Client。
  • #rowEofResponse(...) :當(dāng)所有 row 都返回完后,返回 eof 給 MySQL Client。

核心代碼如下:

  1. // ShareRowOutPutDataHandler.java 
  2. public boolean onRowData(String dataNode, byte[] rowData) { 
  3.    RowDataPacket rowDataPkgold = ResultSetUtil.parseRowData(rowData, bfields); 
  4.    //拷貝一份batchRows 
  5.    Map<String, byte[]> batchRowsCopy = new ConcurrentHashMap<String, byte[]>(); 
  6.    batchRowsCopy.putAll(arows); 
  7.    // 獲取Id字段, 
  8.    String id = ByteUtil.getString(rowDataPkgold.fieldValues.get(joinR)); 
  9.    // 查找ID對應(yīng)的A表的記錄 
  10.    byte[] arow = getRow(batchRowsCopy, id, joinL); 
  11.    while (arow != null) { 
  12.        RowDataPacket rowDataPkg = ResultSetUtil.parseRowData(arow, afields);//ctx.getAllFields()); 
  13.        for (int i = 1; i < rowDataPkgold.fieldCount; i++) { 
  14.            // 設(shè)置b.name 字段 
  15.            byte[] bname = rowDataPkgold.fieldValues.get(i); 
  16.            rowDataPkg.add(bname); 
  17.            rowDataPkg.addFieldCount(1); 
  18.        } 
  19.        // huangyiming add 
  20.        MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler(); 
  21.        if (null == middlerResultHandler) { 
  22.            ctx.writeRow(rowDataPkg); 
  23.        } else { 
  24.            if (middlerResultHandler instanceof MiddlerQueryResultHandler) { 
  25.                byte[] columnData = rowDataPkg.fieldValues.get(0); 
  26.                if (columnData != null && columnData.length > 0) { 
  27.                    String rowValue = new String(columnData); 
  28.                    middlerResultHandler.add(rowValue); 
  29.                } 
  30.                //} 
  31.            } 
  32.  
  33.        } 
  34.        arow = getRow(batchRowsCopy, id, joinL); 
  35.    } 
  36.    return false
  37.  

4. 彩蛋

如下是本文涉及到的核心類,有興趣的同學(xué)可以翻一翻。

 

ShareJoin 另外不支持的功能:

  1. 只支持 inner join,不支持 left join、right join 等等連接。
  2. 不支持 order by。
  3. 不支持 group by 以及 相關(guān)聚合函數(shù)。
  4. 即使 join 左表的字段未聲明為返回 fields 也會(huì)返回。

恩,MyCAT 弱XA 源碼繼續(xù)走起!

責(zé)任編輯:龐桂玉 來源: 芋艿V的博客
相關(guān)推薦

2017-07-26 09:41:28

MyCATSQLMongoDB

2017-07-18 17:35:16

數(shù)據(jù)庫MyCATPreparedSta

2017-12-01 05:40:56

數(shù)據(jù)庫中間件join

2017-12-01 05:04:32

數(shù)據(jù)庫中間件Atlas

2018-02-24 19:37:33

Java8數(shù)據(jù)庫中間件

2017-11-27 05:36:16

數(shù)據(jù)庫中間件TDDL

2017-11-27 05:06:42

數(shù)據(jù)庫中間件cobar

2011-08-10 13:03:58

CJDBC數(shù)據(jù)庫集群

2020-04-10 17:00:33

Mycat分庫分表SpringBoot

2017-05-23 18:55:05

mysql-proxy數(shù)據(jù)庫架構(gòu)

2009-01-20 10:45:55

Oracle數(shù)據(jù)庫中間件

2024-12-06 08:29:29

2017-11-27 06:01:37

數(shù)據(jù)庫中間件中間層

2017-12-11 13:30:49

Go語言數(shù)據(jù)庫中間件

2017-11-03 11:02:08

數(shù)據(jù)庫中間件

2017-11-30 08:56:14

數(shù)據(jù)庫中間件架構(gòu)師

2019-05-13 15:00:14

MySQLMyCat數(shù)據(jù)庫

2018-11-07 15:30:19

數(shù)據(jù)庫NewSQLNoSQL

2021-07-27 05:49:59

MySQL數(shù)據(jù)庫中間件

2020-10-15 08:34:32

數(shù)據(jù)庫中間件漫談
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號