Database Middleware MyCAT Source Code Analysis: PreparedStatement Revisited

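This article is included in the GitHub repository https://github.com/OUYANGSIHAI/JavaInterview, a summary of Java interview material from top-tier companies that took me six months to compile and that helped me land offers from major companies. Stars are welcome.

Source link: blog.ouyangsihai.cn >> Database Middleware MyCAT Source Code Analysis: PreparedStatement Revisited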

  1. Overview
  2. JDBC Client implementation
  3. MyCAT Server implementation
  • 3.1 Creating a PreparedStatement
  • 3.2 Executing SQL

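1. Overview

Many of us ran into both PreparedStatement and Statement while learning JDBC. Which one should you use? Most likely you skimmed a few summaries, half understood them, and ended up using PreparedStatement. This article walks through how MyCAT implements PreparedStatement so that you can understand it afresh.

The article has two main parts:

  1. How the JDBC Client implements PreparedStatement
  2. How the MyCAT Server handles PreparedStatement

😈 Let's go.

2. JDBC Client implementation

First, let's look at a piece of code that is among everyone's favorites to copy and paste: a JDBC PreparedStatement querying a MySQL database.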

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Random;

public class PreparedStatementDemo {

    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        // 1. Obtain a database connection
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection("jdbc:mysql://127.0.0.1:8066/dbtest?useServerPrepStmts=true", "root", "123456");

        // 2. Create the PreparedStatement and bind the value for the ? placeholder
        PreparedStatement ps = conn.prepareStatement("SELECT id, username, password FROM t_user WHERE id = ?");
        ps.setLong(1, Math.abs(new Random().nextLong()));

        // 3. Execute
        ps.executeQuery();
    }
}

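When obtaining the MySQL connection, useServerPrepStmts=true is a very, very, very important parameter. If it is not set, the PreparedStatement is in effect a "fake" PreparedStatement that is emulated on the client: server-side SQL precompilation is not enabled. (Newer driver versions default to FALSE; reportedly some older versions defaulted to TRUE.)

Why? Let's see how the JDBC driver implements it.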

// com.mysql.jdbc.ConnectionImpl.java
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
    synchronized (getConnectionMutex()) {
        checkClosed();

        PreparedStatement pStmt = null;
        boolean canServerPrepare = true;
        String nativeSql = getProcessEscapeCodesForPrepStmts() ? nativeSQL(sql) : sql;

        if (this.useServerPreparedStmts && getEmulateUnsupportedPstmts()) {
            canServerPrepare = canHandleAsServerPreparedStatement(nativeSql);
        }

        if (this.useServerPreparedStmts && canServerPrepare) {
            if (this.getCachePreparedStatements()) { // try to take pStmt from the cache
                synchronized (this.serverSideStatementCache) {
                    pStmt = (com.mysql.jdbc.ServerPreparedStatement) this.serverSideStatementCache
                            .remove(makePreparedStatementCacheKey(this.database, sql));
                    if (pStmt != null) {
                        ((com.mysql.jdbc.ServerPreparedStatement) pStmt).setClosed(false);
                        pStmt.clearParameters(); // clear the parameters left over from the previous use
                    }
                    if (pStmt == null) {
                        // .... code omitted: submit the SQL to the Server for precompilation.
                    }
                }
            } else {
                try {
                    // Submit the SQL to the Server for precompilation.
                    pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
                    pStmt.setResultSetType(resultSetType);
                    pStmt.setResultSetConcurrency(resultSetConcurrency);
                } catch (SQLException sqlEx) {
                    // Punt, if necessary
                    if (getEmulateUnsupportedPstmts()) {
                        pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
                    } else {
                        throw sqlEx;
                    }
                }
            }
        } else {
            pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);
        }
        return pStmt;
    }
}

  • [Former] When the Client enables useServerPreparedStmts and the Server supports ServerPrepare, the Client submits a SQL precompilation request to the Server:

    if (this.useServerPreparedStmts && canServerPrepare) {
        pStmt = ServerPreparedStatement.getInstance(getMultiHostSafeProxy(), nativeSql, this.database, resultSetType, resultSetConcurrency);
    }

  • [Latter] When the Client does not enable useServerPreparedStmts, or the Server does not support ServerPrepare, the Client creates the PreparedStatement without submitting a SQL precompilation request to the Server:

    pStmt = (PreparedStatement) clientPrepareStatement(nativeSql, resultSetType, resultSetConcurrency, false);

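That said, why exactly would performance be better?

  • [Former] The returned PreparedStatement object is of class JDBC42ServerPreparedStatement.java. Each subsequent execution only has to send the values for the ? placeholders to the Server, which reduces network transfer and SQL parsing overhead.
  • [Latter] The returned PreparedStatement object is of class JDBC42PreparedStatement.java. Each subsequent execution has to send the complete SQL to the Server, which adds network transfer and SQL parsing overhead.

🌚: Is the [former] always faster than the [latter]? I trust you already have the right answer.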
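To make the difference between the two bullets concrete, here is a rough sketch that compares how many bytes each approach would send per execution. It is deliberately simplified and is not the real MySQL wire format: the class PayloadSizeSketch and both of its methods are hypothetical, and the server-side payload is assumed to be just a 4-byte statement id plus one 8-byte long value.

```java
import java.nio.charset.StandardCharsets;

/** Toy comparison of per-execution payload sizes; NOT the real MySQL wire format. */
class PayloadSizeSketch {

    /** Client-side emulation: the full SQL text, with the value inlined, is sent every time. */
    static int clientSidePayload(String sqlTemplate, long id) {
        String fullSql = sqlTemplate.replace("?", Long.toString(id));
        return fullSql.getBytes(StandardCharsets.UTF_8).length;
    }

    /** Server-side prepare: assumed to send only a 4-byte statement id and an 8-byte value. */
    static int serverSidePayload() {
        return Integer.BYTES + Long.BYTES;
    }

    public static void main(String[] args) {
        String sql = "SELECT id, username, password FROM t_user WHERE id = ?";
        System.out.println("client-side emulation: " + clientSidePayload(sql, 42L) + " bytes");
        System.out.println("server-side prepare:   " + serverSidePayload() + " bytes");
    }
}
```

Keep in mind that the server-side path pays one extra round trip up front to prepare the statement, so for SQL that runs only once the [former] is not automatically the faster choice.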

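3. MyCAT Server implementation

3.1 Creating a PreparedStatement

This operation corresponds to the Client call conn.prepareStatement(....)

After MyCAT receives the request, it creates a PreparedStatement and returns the statementId and related information. When the Client later executes SQL, it has to send that statementId back to MyCAT. The core code is as follows: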

// ServerPrepareHandler.java
@Override
public void prepare(String sql) {
    LOGGER.debug("use server prepare, sql: " + sql);

    PreparedStatement pstmt = pstmtForSql.get(sql); // look up the cache first
    if (pstmt == null) {
        // Parse the SQL to get the number of columns and the number of parameters
        int columnCount = getColumnCount(sql);
        int paramCount = getParamCount(sql);
        pstmt = new PreparedStatement(++pstmtId, sql, columnCount, paramCount);
        pstmtForSql.put(pstmt.getStatement(), pstmt);
        pstmtForId.put(pstmt.getId(), pstmt);
    }
    PreparedStmtResponse.response(pstmt, source);
}

// PreparedStmtResponse.java
public static void response(PreparedStatement pstmt, FrontendConnection c) {
    byte packetId = 0;

    // write preparedOk packet
    PreparedOkPacket preparedOk = new PreparedOkPacket();
    preparedOk.packetId = ++packetId;
    preparedOk.statementId = pstmt.getId();
    preparedOk.columnsNumber = pstmt.getColumnsNumber();
    preparedOk.parametersNumber = pstmt.getParametersNumber();
    ByteBuffer buffer = preparedOk.write(c.allocate(), c, true);

    // write parameter field packet
    int parametersNumber = preparedOk.parametersNumber;
    if (parametersNumber > 0) {
        for (int i = 0; i < parametersNumber; i++) {
            FieldPacket field = new FieldPacket();
            field.packetId = ++packetId;
            buffer = field.write(buffer, c, true);
        }
        EOFPacket eof = new EOFPacket();
        eof.packetId = ++packetId;
        buffer = eof.write(buffer, c, true);
    }

    // write column field packet
    int columnsNumber = preparedOk.columnsNumber;
    if (columnsNumber > 0) {
        for (int i = 0; i < columnsNumber; i++) {
            FieldPacket field = new FieldPacket();
            field.packetId = ++packetId;
            buffer = field.write(buffer, c, true);
        }
        EOFPacket eof = new EOFPacket();
        eof.packetId = ++packetId;
        buffer = eof.write(buffer, c, true);
    }

    // send buffer
    c.write(buffer);
}
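The order in which response(...) writes its packets can be sketched as below. This is only an illustration of the sequence visible in the code above: PrepareResponseSketch and the string labels are made up for the example, and no real packet bytes are produced.

```java
import java.util.ArrayList;
import java.util.List;

/** Lists the packets of a prepare response in order; labels only, no real bytes. */
class PrepareResponseSketch {

    static List<String> packetSequence(int parametersNumber, int columnsNumber) {
        List<String> packets = new ArrayList<>();
        int packetId = 0;

        // The OK packet always comes first and carries the statement id and both counts.
        packets.add((++packetId) + ":PreparedOk");

        // One field packet per ? placeholder, closed by an EOF packet.
        if (parametersNumber > 0) {
            for (int i = 0; i < parametersNumber; i++) {
                packets.add((++packetId) + ":ParameterField");
            }
            packets.add((++packetId) + ":EOF");
        }

        // One field packet per result column, closed by another EOF packet.
        if (columnsNumber > 0) {
            for (int i = 0; i < columnsNumber; i++) {
                packets.add((++packetId) + ":ColumnField");
            }
            packets.add((++packetId) + ":EOF");
        }
        return packets;
    }

    public static void main(String[] args) {
        // One ? placeholder and three selected columns, as in the demo query.
        packetSequence(1, 3).forEach(System.out::println);
    }
}
```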

PreparedStatements are not shared between connections: even when the SQL is identical, different connections get different PreparedStatement objects.
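A minimal sketch of that per-connection bookkeeping, assuming each connection owns its own registry object. ConnectionStatementRegistry and its methods are hypothetical names; the two maps loosely mirror pstmtForSql and pstmtForId from the handler shown earlier.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical per-connection registry of prepared statements. */
class ConnectionStatementRegistry {

    private final Map<String, Long> idBySql = new HashMap<>();
    private final Map<Long, String> sqlById = new HashMap<>();
    private long lastId = 0;

    /** Returns the id for this SQL, creating a new entry only on first sight. */
    long prepare(String sql) {
        Long existing = idBySql.get(sql);
        if (existing != null) {
            return existing;
        }
        long id = ++lastId;
        idBySql.put(sql, id);
        sqlById.put(id, sql);
        return id;
    }

    /** Resolves a statement id sent by the client; null if it is unknown here. */
    String lookup(long id) {
        return sqlById.get(id);
    }

    public static void main(String[] args) {
        ConnectionStatementRegistry connA = new ConnectionStatementRegistry();
        ConnectionStatementRegistry connB = new ConnectionStatementRegistry();
        String sql = "SELECT id FROM t_user WHERE id = ?";

        System.out.println(connA.prepare(sql) == connA.prepare(sql)); // same connection reuses the entry
        System.out.println(connB.lookup(connA.prepare(sql)));         // connB never prepared anything
    }
}
```

Because the ids are only meaningful inside one registry, a statementId obtained on one connection cannot be executed on another.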

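3.2 Executing SQL

This operation corresponds to the Client executing the statement, for example ps.executeQuery() in the demo above.

After MyCAT receives the request, it uses the request's parameters to format the PreparedStatement into executable SQL and then executes it. In pseudocode: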

String sql = pstmt.sql.format(request.params);
execute(sql);

The core code is as follows:

// ServerPrepareHandler.java
@Override
public void execute(byte[] data) {
    long pstmtId = ByteUtil.readUB4(data, 5);
    PreparedStatement pstmt = null;
    if ((pstmt = pstmtForId.get(pstmtId)) == null) {
        source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, "Unknown pstmtId when executing.");
    } else {
        // Read the parameters
        ExecutePacket packet = new ExecutePacket(pstmt);
        try {
            packet.read(data, source.getCharset());
        } catch (UnsupportedEncodingException e) {
            source.writeErrMessage(ErrorCode.ER_ERROR_WHEN_EXECUTING_COMMAND, e.getMessage());
            return;
        }
        BindValue[] bindValues = packet.values;
        // Replace the dynamic parameters in the SQL with the actual values
        String sql = prepareStmtBindValue(pstmt, bindValues);
        // Execute the SQL
        source.getSession2().setPrepared(true);
        source.query(sql);
    }
}

private String prepareStmtBindValue(PreparedStatement pstmt, BindValue[] bindValues) {
    String sql = pstmt.getStatement();
    int[] paramTypes = pstmt.getParametersType();
    StringBuilder sb = new StringBuilder();
    int idx = 0;
    for (int i = 0, len = sql.length(); i < len; i++) {
        char c = sql.charAt(i);
        if (c != '?') {
            sb.append(c);
            continue;
        }
        // Handle the ? placeholder
        int paramType = paramTypes[idx];
        BindValue bindValue = bindValues[idx];
        idx++;
        // Handle a null value
        if (bindValue.isNull) {
            sb.append("NULL");
            continue;
        }
        // Non-null value: read it according to the field type
        switch (paramType & 0xff) {
        case Fields.FIELD_TYPE_TINY:
            sb.append(String.valueOf(bindValue.byteBinding));
            break;
        case Fields.FIELD_TYPE_SHORT:
            sb.append(String.valueOf(bindValue.shortBinding));
            break;
        case Fields.FIELD_TYPE_LONG:
            sb.append(String.valueOf(bindValue.intBinding));
            break;
        // .... non-core code omitted
        }
    }
    return sb.toString();
}
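The substitution done by prepareStmtBindValue can be tried out in isolation with the sketch below. It is a simplified stand-in, not MyCAT's code: BindValueSketch is a hypothetical class, values are plain Java objects instead of BindValue, and the string quoting (doubling single quotes) is an assumption that is not safe enough for production use.

```java
/** Simplified stand-in for placeholder substitution; not MyCAT's implementation. */
class BindValueSketch {

    /** Replaces each ? in the template with the matching value, in order. */
    static String bind(String sql, Object[] values) {
        StringBuilder sb = new StringBuilder();
        int idx = 0;
        for (int i = 0, len = sql.length(); i < len; i++) {
            char c = sql.charAt(i);
            if (c != '?') {
                sb.append(c);
                continue;
            }
            Object value = values[idx++];
            if (value == null) {
                sb.append("NULL");
            } else if (value instanceof Number) {
                sb.append(value);
            } else {
                // Assumption: quote as a string literal and double any single quotes.
                sb.append('\'').append(value.toString().replace("'", "''")).append('\'');
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String template = "SELECT id, username, password FROM t_user WHERE id = ? AND username = ?";
        System.out.println(bind(template, new Object[] {7L, "yunai"}));
    }
}
```

Like the loop above, the sketch treats every ? as a placeholder, including one that happens to sit inside a string literal in the SQL template.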

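4. Bonus

💯 Made it this far? That must be true love! I believe it, anyway.
Here is an extra 🍗 for you all.

Attentive readers may have noticed that the JDBC Client supports caching PreparedStatements, so the Server does not have to create one every time.

Setting cachePrepStmts=true on the MySQL connection enables this Client-level cache. But this cache differs from an ordinary one: a statement is obtained by removing it from the cache, and a newly created PreparedStatement is not added to the cache at creation time. So when is it added? When pstmt.close() is called, and only if the pstmt was obtained through the cache path. The core code is as follows: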

// ServerPreparedStatement.java
public void close() throws SQLException {
    MySQLConnection locallyScopedConn = this.connection;
    if (locallyScopedConn == null) {
        return; // already closed
    }
    synchronized (locallyScopedConn.getConnectionMutex()) {
        if (this.isCached && isPoolable() && !this.isClosed) {
            clearParameters();
            this.isClosed = true;
            this.connection.recachePreparedStatement(this);
            return;
        }
        realClose(true, true);
    }
}

// ConnectionImpl.java
public void recachePreparedStatement(ServerPreparedStatement pstmt) throws SQLException {
    synchronized (getConnectionMutex()) {
        if (getCachePreparedStatements() && pstmt.isPoolable()) {
            synchronized (this.serverSideStatementCache) {
                this.serverSideStatementCache.put(makePreparedStatementCacheKey(pstmt.currentCatalog, pstmt.originalSql), pstmt);
            }
        }
    }
}

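Why implement it this way? A PreparedStatement is stateful: we call setXXX(pos, value) on it, so if several threads shared one instance their parameters would get mixed up. Removing the statement from the cache while it is in use ensures that only one caller holds it at a time.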
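The check-out style of caching described here can be sketched in a few lines. This is an illustration of the pattern only, assuming a simple map keyed by SQL text; CheckoutCacheSketch, CachedStatement, acquire and release are hypothetical names, not Connector/J APIs.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrates "remove on acquire, put back on close"; not the driver's code. */
class CheckoutCacheSketch {

    /** A stateful statement: parameters are set on it before each execution. */
    static class CachedStatement {
        final String sql;
        final Map<Integer, Object> params = new HashMap<>();

        CachedStatement(String sql) {
            this.sql = sql;
        }
    }

    private final Map<String, CachedStatement> cache = new HashMap<>();

    /** Takes the statement OUT of the cache, so no second caller can get the same instance. */
    synchronized CachedStatement acquire(String sql) {
        CachedStatement stmt = cache.remove(sql);
        return stmt != null ? stmt : new CachedStatement(sql);
    }

    /** Called on close: clear the leftover state and make the statement available again. */
    synchronized void release(CachedStatement stmt) {
        stmt.params.clear();
        cache.put(stmt.sql, stmt);
    }

    public static void main(String[] args) {
        CheckoutCacheSketch cache = new CheckoutCacheSketch();
        String sql = "SELECT id FROM t_user WHERE id = ?";

        CachedStatement first = cache.acquire(sql);
        CachedStatement second = cache.acquire(sql); // first is still in use, so this is a new object
        System.out.println(first != second);

        first.params.put(1, 42L);
        cache.release(first);
        System.out.println(cache.acquire(sql) == first); // reused only after it was released
    }
}
```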

Original article: http://www.yunai.me/MyCAT/what-is-PreparedStatement/
Annotated MyCat-Server code: https://github.com/YunaiV/Mycat-Server
😈 This series is updated every 1-2 weeks. You are welcome to subscribe to, follow, and bookmark the official account.
QQ: 7685413

Further reading: the article "JDBC PreparedStatement".

     
