数据库中间件 MyCAT源码分析——【单库单表】插入

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 数据库中间件 MyCAT源码分析——【单库单表】插入

本文主要基于 MyCAT 1.6.5 正式版

  • 1. 概述
  • 2. 接收请求,解析 SQL
  • 3. 获得路由结果
  • 4. 获得 MySQL 连接,执行 SQL
  • 5. 响应执行 SQL 结果
  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

    友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。

    友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。

    1. 概述

    内容形态以 顺序图 + 核心代码 为主。  如果有地方表述不错误或者不清晰,欢迎留言。  对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。  微信号:wangwenbin-server。

    本文讲解 【单库单表】插入 所涉及到的代码。交互如下图:

    整个过程,MyCAT Server 流程如下:

  • 接收 MySQL Client 请求,解析 SQL。
  • 获得路由结果,进行路由。
  • 获得 MySQL 连接,执行 SQL。
  • 响应执行结果,发送结果给 MySQL Client。
  • 我们逐个步骤分析,一起来看看源码。

    2. 接收请求,解析 SQL

    【 1 - 2 】

    接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

    【 3 】

    不同 MySQL 命令,分发到不同的方法执行。核心代码如下:

    
      1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
      2: public class FrontendCommandHandler implements NIOHandler {
      3: 
      4:     @Override
      5:     public void handle(byte[] data) {
      6:     
      7:         // .... 省略部分代码
      8:         switch (data[4]) // 
      9:         {
     10:             case MySQLPacket.COM_INIT_DB:
     11:                 commands.doInitDB();
     12:                 source.initDB(data);
     13:                 break;
     14:             case MySQLPacket.COM_QUERY: // 查询命令
     15:                 // 计数查询命令
     16:                 commands.doQuery();
     17:                 // 执行查询命令
     18:                 source.query(data);
     19:                 break;
     20:             case MySQLPacket.COM_PING:
     21:                 commands.doPing();
     22:                 source.ping();
     23:                 break;
     24:             // .... 省略部分case
     25:         }
     26:     }
     27: 
     28: }
    

    INSERT/ SELECT/ UPDATE/ DELETE 等 SQL 归属于  MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 - 服务器)》。

    【 4 】

    将 二进制数组 解析成 SQL。核心代码如下:

    
      1: // ⬇️⬇️⬇️【FrontendConnection.java】
      2: public void query(byte[] data) {
      3:     // 取得语句
      4:     String sql = null;      
      5:     try {
      6:         MySQLMessage mm = new MySQLMessage(data);
      7:         mm.position(5);
      8:         sql = mm.readString(charset);
      9:     } catch (UnsupportedEncodingException e) {
     10:         writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
     11:         return;
     12:     }       
     13:     // 执行语句
     14:     this.query( sql );
     15: }
    

    【 5 】

    解析 SQL 类型。核心代码如下:

    
      1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
      2: @Override
      3: public void query(String sql) {
      4:     // 解析 SQL 类型
      5:     int rs = ServerParse.parse(sql);
      6:     int sqlType = rs & 0xff;
      7:     
      8:     switch (sqlType) {
      9:     //explain sql
     10:     case ServerParse.EXPLAIN:
     11:         ExplainHandler.handle(sql, c, rs  8);
     12:         break;
     13:     // .... 省略部分case
     14:         break;
     15:     case ServerParse.SELECT:
     16:         SelectHandler.handle(sql, c, rs  8);
     17:         break;
     18:     // .... 省略部分case
     19:     default:
     20:         if(readOnly){
     21:             LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
     22:             c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
     23:             break;
     24:         }
     25:         c.execute(sql, rs & 0xff);
     26:     }
     27: }
     28: 
     29:
     30: // ⬇️⬇️⬇️【ServerParse.java】
     31: public static int parse(String stmt) {
     32:     int length = stmt.length();
     33:     //FIX BUG FOR SQL SUCH AS /XXXX/SQL
     34:     int rt = -1;
     35:     for (int i = 0; i  length; ++i) {
     36:         switch (stmt.charAt(i)) {
     37:         // .... 省略部分case            case 'I':
     38:         case 'i':
     39:             rt = insertCheck(stmt, i);
     40:             if (rt != OTHER) {
     41:                 return rt;
     42:             }
     43:             continue;
     44:             // .... 省略部分case
     45:         case 'S':
     46:         case 's':
     47:             rt = sCheck(stmt, i);
     48:             if (rt != OTHER) {
     49:                 return rt;
     50:             }
     51:             continue;
     52:             // .... 省略部分case
     53:         default:
     54:             continue;
     55:         }
     56:     }
     57:     return OTHER;
     58: }
    

    【 6 】

    执行 SQL,详细解析见下文,核心代码如下:

    
      1: // ⬇️⬇️⬇️【ServerConnection.java】
      2: public class ServerConnection extends FrontendConnection {
      3:     public void execute(String sql, int type) {
      4:         // .... 省略代码
      5:         SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
      6:         if (schema == null) {
      7:             writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
      8:                     "Unknown MyCAT Database '" + db + "'");
      9:             return;
     10:         }
     11: 
     12:         // .... 省略代码
     13: 
     14:         // 路由到后端数据库,执行 SQL
     15:         routeEndExecuteSQL(sql, type, schema);
     16:     }
     17:     
     18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
     19:         // 路由计算
     20:         RouteResultset rrs = null;
     21:         try {
     22:             rrs = MycatServer
     23:                     .getInstance()
     24:                     .getRouterservice()
     25:                     .route(MycatServer.getInstance().getConfig().getSystem(),
     26:                             schema, type, sql, this.charset, this);
     27: 
     28:         } catch (Exception e) {
     29:             StringBuilder s = new StringBuilder();
     30:             LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
     31:             String msg = e.getMessage();
     32:             writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
     33:             return;
     34:         }
     35: 
     36:         // 执行 SQL
     37:         if (rrs != null) {
     38:             // session执行
     39:             session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
     40:         }
     41:         
     42:      }
     43: 
     44: }
    

    3. 获得路由结果

    【 1 - 2 】【 12 】

    获得路由主流程。核心代码如下:

    
      1: // ⬇️⬇️⬇️【RouteService.java】
      2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
      3:         int sqlType, String stmt, String charset, ServerConnection sc)
      4:         throws SQLNonTransientException {
      5:     RouteResultset rrs = null;
      6:     // .... 省略代码
      7:     int hintLength = RouteService.isHintSql(stmt);
      8:     if(hintLength != -1){ // TODO 待读:hint
      9:         // .... 省略代码
     10:         }
     11:     } else {
     12:         stmt = stmt.trim();
     13:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
     14:                 charset, sc, tableId2DataNodeCache);
     15:     }
     16: 
     17:     // .... 省略代码        return rrs;
     18: }
     19: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
     20: @Override
     21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
     22:         String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
     23: 
     24:     // .... 省略代码
     25: 
     26:     // 处理一些路由之前的逻辑;全局序列号,父子表插入
     27:     if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
     28:         return null;
     29:     }
     30: 
     31:     // .... 省略代码
     32: 
     33:     // 检查是否有分片
     34:     if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
     35:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
     36:     } else {
     37:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
     38:         if (returnedSet == null) {
     39:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
     40:         }
     41:     }
     42: 
     43:     return rrs;
     44: }
    

    路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

    【 3 - 6 】

    路由前置处理。当符合如下三种情况下,进行处理:

    { 1 } 使用全局序列号

    
    insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
    

    { 2 } ER 子表插入 
    { 3 } 主键使用自增 ID 插入:

    
    insert into table (name) values ('name')
    ===
    insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
    

    情况 { 1 } { 3 } 情况类似,使用全局序列号。

    核心代码如下:

    
      1: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
      2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)
      3:         throws SQLNonTransientException {
      4:     return  // 处理 id 使用 全局序列号
      5:             RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)
      6:             // 处理 ER 子表
      7:             || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))
      8:             // 处理 id 自增长
      9:             || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));
     10: }
    

    RouterUtil.java 处理 SQL 考虑性能,实现会比较 C-style,代码咱就不贴了,传送门:https://github.com/YunaiV/Mycat-Server/blob/1.6/src/main/java/io/mycat/route/util/RouterUtil.java。 (😈该仓库从官方 Fork,逐步完善中文注释,欢迎 Star)

    【 7 - 11 】

    前置路由处理全局序列号时,添加到全局序列处理器( MyCATSequnceProcessor)。该处理器会异步生成 ID,替换 SQL 内的  NEXT VALUE FOR MYCATSEQ_ 正则。例如:

    
    insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')
    ===
    insert into table (id, name) values (868348974560579584, 'name')
    

    异步处理完后,调用  ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新执行 SQL。

    核心代码如下:

    
      1: // ⬇️⬇️⬇️【RouterUtil.java】
      2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){
      3:     SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);
      4:     MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);
      5: }
      6: // ⬇️⬇️⬇️【MyCATSequnceProcessor.java】
      7: public class MyCATSequnceProcessor {
      8:     private LinkedBlockingQueueSessionSQLPair seqSQLQueue = new LinkedBlockingQueueSessionSQLPair();
      9:     private volatile boolean running=true;
     10:     
     11:     public void addNewSql(SessionSQLPair pair) {
     12:         seqSQLQueue.add(pair);
     13:     }
     14: 
     15:     private void executeSeq(SessionSQLPair pair) {
     16:         try {
     17:             
     18:             // 使用Druid解析器实现sequence处理  @兵临城下
     19:             DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer
     20:                     .getInstance().getConfig().getSystem().getSequnceHandlerType());
     21: 
     22:             // 生成可执行 SQL :目前主要是生成 id
     23:             String charset = pair.session.getSource().getCharset();
     24:             String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);
     25: 
     26:             // 执行 SQL
     27:             pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);
     28:         } catch (Exception e) {
     29:             LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);
     30:             pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);
     31:             return;
     32:         }
     33:     }
     34:     
     35:     class ExecuteThread extends Thread {
     36:         
     37:         public ExecuteThread() {
     38:             setDaemon(true); // 设置为后台线程,防止throw RuntimeExecption进程仍然存在的问题
     39:         }
     40:         
     41:         public void run() {
     42:             while (running) {
     43:                 try {
     44:                     SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);
     45:                     if(pair!=null){
     46:                         executeSeq(pair);
     47:                     }
     48:                 } catch (Exception e) {
     49:                     LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);
     50:                 }
     51:             }
     52:         }
     53:     }
     54: }
    

    ❓此处有个疑问: MyCATSequnceProcessor 是单线程,会不会插入性能有一定的影响?后续咱做下性能测试。

    4. 获得 MySQL 连接,执行 SQL

    【 1 - 8 】

    获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。
  • 【 9 - 13 】

    发送 SQL 到 MySQL Server,执行 SQL。

    5. 响应执行 SQL 结果

    【 1 - 4 】

    处理 MySQL Server 响应数据包。

    【 5 - 8 】

    发送插入成功结果给 MySQL Client。

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 数据库中间件 MyCAT源码分析——【单库单表】插入


     上一篇
    数据库中间件 MyCAT 源码分析 —— 【单库单表】查询 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询
    本文主要基于 MyCAT 1.6.5 正式版 1. 概述 2. 接收请求,解析 SQL 3. 获得路由结果 4. 获得 MySQL 连接,执行 SQL 5. 响应执行 SQL 结果 6. 其他 :更新 / 删除 友
    2021-04-05
    下一篇 
    数据库中间件 MyCAT 源码分析 —— 调试环境搭建 数据库中间件 MyCAT 源码分析 —— 调试环境搭建
    本文主要基于 MyCAT 1.6.5 正式版 1. 依赖工具 2. 源码拉取 3. 数据库配置 4. MyCat 配置 5. MyCAT 启动 6. MyCAT 测试 7. 交流 友情提示:欢迎关注公众号【芋道
    2021-04-05