數(shù)據(jù)庫(kù)中間件 MyCAT源碼分析 —— PreparedStatement 重新入門
1. 概述
相信很多同學(xué)在學(xué)習(xí) JDBC 時(shí),都碰到 PreparedStatement 和 Statement。究竟該使用哪個(gè)呢?最終很可能是懵里懵懂的看了各種總結(jié),使用 PreparedStatement。那么本文,通過(guò) MyCAT 對(duì) PreparedStatement 的實(shí)現(xiàn)對(duì)大家能夠重新理解下。
本文主要分成兩部分:
- JDBC Client 如何實(shí)現(xiàn) PreparedStatement。
- MyCAT Server 如何處理 PreparedStatement。
😈 Let's Go。
2. JDBC Client 實(shí)現(xiàn)
首先,我們來(lái)看一段大家最喜歡復(fù)制粘貼之一的代碼,JDBC PreparedStatement 查詢 MySQL 數(shù)據(jù)庫(kù):
- public class PreparedStatementDemo {
- public static void main(String[] args) throws ClassNotFoundException, SQLException {
- // 1. 獲得數(shù)據(jù)庫(kù)連接
- Class.forName("com.mysql.jdbc.Driver");
- Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest?useServerPrepStmts=true", "root", "123456");
- // PreparedStatement
- PreparedStatement ps = conn.prepareStatement("SELECT id, username, password FROM t_user WHERE id = ?");
- ps.setLong(1, Math.abs(new Random().nextLong()));
- // execute
- ps.executeQuery();
- }
- }
獲取 MySQL 連接時(shí),useServerPrepStmts=true 是非常非常非常重要的參數(shù)。如果不配置,PreparedStatement 實(shí)際是個(gè)假的 PreparedStatement(新版本默認(rèn)為 FALSE,據(jù)說(shuō)部分老版本默認(rèn)為 TRUE),未開啟服務(wù)端級(jí)別的 SQL 預(yù)編譯。
WHY ?來(lái)看下 JDBC 里面是怎么實(shí)現(xiàn)的。
- // com.mysql.jdbc.ConnectionImpl.java
- public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
- synchronized (getConnectionMutex()) {
- checkClosed();
- PreparedStatement pStmt = null;
- boolean canServerPrepare = true;
- String nativeSql = getProcessEscapeCodesForPrepStmts() ? nativeSQL(sql) : sql;
- if (this.useServerPreparedStmts && getEmulateUnsupportedPstmts()) {
- canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);
- }
- if (this.useServerPreparedStmts && canServerPrepare) {
- if (this.getCachePreparedStatements()) { // 從緩存中獲取 pStmt
- synchronized (this.serverSideStatementCache) {
- pStmt = (com.mysql.jdbc.ServerPreparedStatement) this.serverSideStatementCache
- .remove(makePreparedStatementCacheKey(this.database, sql));
- if (pStmt != null) {
- ((com.mysql.jdbc.ServerPreparedStatement) pStmt).setClosed(false);
- pStmt.clearParameters(); // 清理上次留下的參數(shù)
- }
- if (pStmt == null) {
- // .... 省略代碼 :向 Server 提交 SQL 預(yù)編譯。
- }
- }
- } else {
- try {
- // 向 Server 提交 SQL 預(yù)編譯。
- pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
- pStmt.setResultSetType(resultSetType);
- pStmt.setResultSetConcurrency(resultSetConcurrency);
- } catch (SQLException sqlEx) {
- // Punt, if necessary
- if (getEmulateUnsupportedPstmts()) {
- pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
- } else {
- throw sqlEx;
- }
- }
- }
- } else {
- pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
- }
- return pStmt;
- }
- }
- 【前者】當(dāng) Client 開啟 useServerPreparedStmts 并且 Server 支持 ServerPrepare,Client 會(huì)向 Server 提交 SQL 預(yù)編譯請(qǐng)求。
- if (this.useServerPreparedStmts && canServerPrepare) {
- pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
- }
- 【后者】當(dāng) Client 未開啟 useServerPreparedStmts 或者 Server 不支持 ServerPrepare,Client 創(chuàng)建 PreparedStatement,不會(huì)向 Server 提交 SQL 預(yù)編譯請(qǐng)求。
- pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
即使這樣,究竟為什么性能會(huì)更好呢?
- 【前者】返回的 PreparedStatement 對(duì)象類是 JDBC42ServerPreparedStatement.java,后續(xù)每次執(zhí)行 SQL 只需將對(duì)應(yīng)占位符?對(duì)應(yīng)的值提交給 Server即可,減少網(wǎng)絡(luò)傳輸和 SQL 解析開銷。
- 【后者】返回的 PreparedStatement 對(duì)象類是 JDBC42PreparedStatement.java,后續(xù)每次執(zhí)行 SQL 需要將完整的 SQL 提交給 Server,增加了網(wǎng)絡(luò)傳輸和 SQL 解析開銷。
🌚:【前者】性能一定比【后者】好嗎?相信你已經(jīng)有了正確的答案。
3. MyCAT Server 實(shí)現(xiàn)
3.1 創(chuàng)建 PreparedStatement
該操作對(duì)應(yīng) Client conn.prepareStatement(....)。
MyCAT 接收到請(qǐng)求后,創(chuàng)建 PreparedStatement,并返回 statementId 等信息。Client 發(fā)起 SQL 執(zhí)行時(shí),需要將 statementId 帶給 MyCAT。核心代碼如下:
- // ServerPrepareHandler.java
- @Override
- public void prepare(String sql) {
- LOGGER.debug("use server prepare, sql: " + sql);
- PreparedStatement pstmt = pstmtForSql.get(sql);
- if (pstmt == null) { // 緩存中獲取
- // 解析獲取字段個(gè)數(shù)和參數(shù)個(gè)數(shù)
- int columnCount = getColumnCount(sql);
- int paramCount = getParamCount(sql);
- pstmt = new PreparedStatement(++pstmtId, sql, columnCount, paramCount);
- pstmtForSql.put(pstmt.getStatement(), pstmt);
- pstmtForId.put(pstmt.getId(), pstmt);
- }
- PreparedStmtResponse.response(pstmt, source);
- }
- // PreparedStmtResponse.java
- public static void response(PreparedStatement pstmt, FrontendConnection c) {
- byte packetId = 0;
- // write preparedOk packet
- PreparedOkPacket preparedOk = new PreparedOkPacket();
- preparedOk.packetId = ++packetId;
- preparedOk.statementId = pstmt.getId();
- preparedOk.columnsNumber = pstmt.getColumnsNumber();
- preparedOk.parametersNumber = pstmt.getParametersNumber();
- ByteBuffer buffer = preparedOk.write(c.allocate(), c,true);
- // write parameter field packet
- int parametersNumber = preparedOk.parametersNumber;
- if (parametersNumber > 0) {
- for (int i = 0; i < parametersNumber; i++) {
- FieldPacket field = new FieldPacket();
- field.packetId = ++packetId;
- buffer = field.write(buffer, c,true);
- }
- EOFPacket eof = new EOFPacket();
- eof.packetId = ++packetId;
- buffer = eof.write(buffer, c,true);
- }
- // write column field packet
- int columnsNumber = preparedOk.columnsNumber;
- if (columnsNumber > 0) {
- for (int i = 0; i < columnsNumber; i++) {
- FieldPacket field = new FieldPacket();
- field.packetId = ++packetId;
- buffer = field.write(buffer, c,true);
- }
- EOFPacket eof = new EOFPacket();
- eof.packetId = ++packetId;
- buffer = eof.write(buffer, c,true);
- }
- // send buffer
- c.write(buffer);
- }
每個(gè)連接之間,PreparedStatement 不共享,即不同連接,即使 SQL相同,對(duì)應(yīng)的 PreparedStatement 不同。
3.2 執(zhí)行 SQL
該操作對(duì)應(yīng) Client conn.execute(....)。
MyCAT 接收到請(qǐng)求后,將 PreparedStatement 使用請(qǐng)求的參數(shù)格式化成可執(zhí)行的 SQL 進(jìn)行執(zhí)行。偽代碼如下:
- String sql = pstmt.sql.format(request.params);
- execute(sql);
核心代碼如下:
- // ServerPrepareHandler.java
- @Override
- public void execute(byte[] data) {
- long pstmtId = ByteUtil.readUB4(data, 5);
- PreparedStatement pstmt = null;
- if ((pstmt = pstmtForId.get(pstmtId)) == null) {
- source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pstmtId when executing.");
- } else {
- // 參數(shù)讀取
- ExecutePacket packet = new ExecutePacket(pstmt);
- try {
- packet.read(data, source.getCharset());
- } catch (UnsupportedEncodingException e) {
- source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());
- return;
- }
- BindValue[] bindValues = packet.values;
- // 還原sql中的動(dòng)態(tài)參數(shù)為實(shí)際參數(shù)值
- String sql = prepareStmtBindValue(pstmt, bindValues);
- // 執(zhí)行sql
- source.getSession2().setPrepared(true);
- source.query(sql);
- }
- }
- private String prepareStmtBindValue(PreparedStatement pstmt, BindValue[] bindValues) {
- String sql = pstmt.getStatement();
- int[] paramTypes = pstmt.getParametersType();
- StringBuilder sb = new StringBuilder();
- int idx = 0;
- for (int i = 0, len = sql.length(); i < len; i++) {
- char c = sql.charAt(i);
- if (c != '?') {
- sb.append(c);
- continue;
- }
- // 處理占位符?
- int paramType = paramTypes[idx];
- BindValue bindValue = bindValues[idx];
- idx++;
- // 處理字段為空的情況
- if (bindValue.isNull) {
- sb.append("NULL");
- continue;
- }
- // 非空情況, 根據(jù)字段類型獲取值
- switch (paramType & 0xff) {
- case Fields.FIELD_TYPE_TINY:
- sb.append(String.valueOf(bindValue.byteBinding));
- break;
- case Fields.FIELD_TYPE_SHORT:
- sb.append(String.valueOf(bindValue.shortBinding));
- break;
- case Fields.FIELD_TYPE_LONG:
- sb.append(String.valueOf(bindValue.intBinding));
- break;
- // .... 省略非核心代碼
- }
- }
- return sb.toString();
- }
4. 彩蛋
💯 看到此處是不是真愛?!反正我信了。
給老鐵們額外加個(gè)🍗。
細(xì)心的同學(xué)們可能已經(jīng)注意到 JDBC Client 是支持緩存 PreparedStatement,無(wú)需每次都讓 Server 進(jìn)行創(chuàng)建。
當(dāng)配置 MySQL 數(shù)據(jù)連接 cachePrepStmts=true 時(shí)開啟 Client 級(jí)別的緩存。But,此處的緩存又和一般的緩存不一樣,是使用 remove 的方式獲得的,并且創(chuàng)建好 PreparedStatement 時(shí)也不添加到緩存。那什么時(shí)候添加緩存呢?在 pstmt.close() 時(shí),并且pstmt 是通過(guò)緩存獲取時(shí),添加到緩存。核心代碼如下:
- // ServerPreparedStatement.java
- public void close() throws SQLException {
- MySQLConnection locallyScopedConn = this.connection;
- if (locallyScopedConn == null) {
- return; // already closed
- }
- synchronized (locallyScopedConn.getConnectionMutex()) {
- if (this.isCached && isPoolable() && !this.isClosed) {
- clearParameters();
- this.isClosed = true;
- this.connection.recachePreparedStatement(this);
- return;
- }
- realClose(true, true);
- }
- }
- // ConnectionImpl.java
- public void recachePreparedStatement(ServerPreparedStatement pstmt) throws SQLException {
- synchronized (getConnectionMutex()) {
- if (getCachePreparedStatements() && pstmt.isPoolable()) {
- synchronized (this.serverSideStatementCache) {
- this.serverSideStatementCache.put(makePreparedStatementCacheKey(pstmt.currentCatalog, pstmt.originalSql), pstmt);
- }
- }
- }
- }
為什么要這么實(shí)現(xiàn)?PreparedStatement 是有狀態(tài)的變量,我們會(huì)去 setXXX(pos, value),一旦多線程共享,會(huì)導(dǎo)致錯(cuò)亂。