数据库中间件 MyCAT 源码分析 —— 【单库单表】查询

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询

本文主要基于 MyCAT 1.6.5 正式版

  • 1. 概述
  • 2. 接收请求,解析 SQL
  • 3. 获得路由结果
  • 4. 获得 MySQL 连接,执行 SQL
  • 5. 响应执行 SQL 结果
  • 6. 其他 :更新 / 删除
  • 友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】搞基嗨皮。

    友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。

    友情提示:欢迎关注公众号【芋道源码】。😈关注后,拉你进【源码圈】微信群和【芋艿】】搞基嗨皮。

    1. 概述

    内容形态以 顺序图 + 核心代码 为主。  如果有地方表述不错误或者不清晰,欢迎留言。  对于内容形态,非常纠结,如果有建议,特别特别特别欢迎您提出。  微信号:wangwenbin-server。

    本文讲解 【单库单表】查询 所涉及到的代码。

    😂内容和 《MyCAT 源码分析 —— 【单库单表】插入》 超级相似,一方面本身流程基本相同,另外一方面文章结构没拆分好。我们使用 🚀 标记差异的逻辑。

    交互如下图:

    整个过程,MyCAT Server 流程如下:

  • 接收 MySQL Client 请求,解析 SQL。
  • 获得路由结果,进行路由。
  • 获得 MySQL 连接,执行 SQL。
  • 响应执行结果,发送结果给 MySQL Client。
  • 我们逐个步骤分析,一起来看看源码。

    2. 接收请求,解析 SQL

    【1 - 2】

    接收一条 MySQL 命令。在【1】之前,还有请求数据读取、拆成单条 MySQL SQL。

    【3】

    
      1: // ⬇️⬇️⬇️【FrontendCommandHandler.java】
      2: public class FrontendCommandHandler implements NIOHandler {
      3: 
      4:     @Override
      5:     public void handle(byte[] data) {
      6:     
      7:         // .... 省略部分代码
      8:         switch (data[4]) // 
      9:         {
     10:             case MySQLPacket.COM_INIT_DB:
     11:                 commands.doInitDB();
     12:                 source.initDB(data);
     13:                 break;
     14:             case MySQLPacket.COM_QUERY: // 查询命令
     15:                 // 计数查询命令
     16:                 commands.doQuery();
     17:                 // 执行查询命令
     18:                 source.query(data);
     19:                 break;
     20:             case MySQLPacket.COM_PING:
     21:                 commands.doPing();
     22:                 source.ping();
     23:                 break;
     24:             // .... 省略部分case
     25:         }
     26:     }
     27: 
     28: }
    

    INSERT/ SELECT/ UPDATE/ DELETE 等 SQL 归属于  MySQLPacket.COM_QUERY,详细可见:《MySQL协议分析#4.2 客户端命令请求报文(客户端 - 服务器)》。

    【4】

    将 二进制数组 解析成 SQL。核心代码如下:

    
      1: // ⬇️⬇️⬇️【FrontendConnection.java】
      2: public void query(byte[] data) {
      3:     // 取得语句
      4:     String sql = null;      
      5:     try {
      6:         MySQLMessage mm = new MySQLMessage(data);
      7:         mm.position(5);
      8:         sql = mm.readString(charset);
      9:     } catch (UnsupportedEncodingException e) {
     10:         writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");
     11:         return;
     12:     }       
     13:     // 执行语句
     14:     this.query( sql );
     15: }
    

    【5】

    解析 SQL 类型。核心代码如下:

    
      1: // ⬇️⬇️⬇️【ServerQueryHandler.java】
      2: @Override
      3: public void query(String sql) {
      4:     // 解析 SQL 类型
      5:     int rs = ServerParse.parse(sql);
      6:     int sqlType = rs & 0xff;
      7:     
      8:     switch (sqlType) {
      9:     //explain sql
     10:     case ServerParse.EXPLAIN:
     11:         ExplainHandler.handle(sql, c, rs  8);
     12:         break;
     13:     // .... 省略部分case
     14:         break;
     15:     case ServerParse.SELECT:
     16:         SelectHandler.handle(sql, c, rs  8);
     17:         break;
     18:     // .... 省略部分case
     19:     default:
     20:         if(readOnly){
     21:             LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
     22:             c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
     23:             break;
     24:         }
     25:         c.execute(sql, rs & 0xff);
     26:     }
     27: }
     28: 
     29:
     30: // ⬇️⬇️⬇️【ServerParse.java】
     31: public static int parse(String stmt) {
     32:     int length = stmt.length();
     33:     //FIX BUG FOR SQL SUCH AS /XXXX/SQL
     34:     int rt = -1;
     35:     for (int i = 0; i  length; ++i) {
     36:         switch (stmt.charAt(i)) {
     37:         // .... 省略部分case            case 'I':
     38:         case 'i':
     39:             rt = insertCheck(stmt, i);
     40:             if (rt != OTHER) {
     41:                 return rt;
     42:             }
     43:             continue;
     44:             // .... 省略部分case
     45:         case 'S':
     46:         case 's':
     47:             rt = sCheck(stmt, i);
     48:             if (rt != OTHER) {
     49:                 return rt;
     50:             }
     51:             continue;
     52:             // .... 省略部分case
     53:         default:
     54:             continue;
     55:         }
     56:     }
     57:     return OTHER;
     58: }
    

    🚀【6】【7】

    解析 Select SQL 类型,分发到对应的逻辑。核心代码如下:

    
      1: // ⬇️⬇️⬇️【SelectHandler.java】
      2: public static void handle(String stmt, ServerConnection c, int offs) {
      3:     int offset = offs;
      4:     switch (ServerParseSelect.parse(stmt, offs)) { // 解析 Select SQL 类型
      5:     case ServerParseSelect.VERSION_COMMENT: // select @@VERSION_COMMENT;
      6:         SelectVersionComment.response(c);
      7:         break;
      8:     case ServerParseSelect.DATABASE: // select DATABASE();
      9:         SelectDatabase.response(c);
     10:         break;
     11:     case ServerParseSelect.USER: // select CURRENT_USER();
     12:         SelectUser.response(c);
     13:         break;
     14:     case ServerParseSelect.VERSION: // select VERSION();
     15:         SelectVersion.response(c);
     16:         break;
     17:     case ServerParseSelect.SESSION_INCREMENT: // select @@session.auto_increment_increment;
     18:         SessionIncrement.response(c);
     19:         break;
     20:     case ServerParseSelect.SESSION_ISOLATION: // select @@session.tx_isolation;
     21:         SessionIsolation.response(c);
     22:         break;
     23:     case ServerParseSelect.LAST_INSERT_ID: // select LAST_INSERT_ID();
     24:         // ....省略代码
     25:         break;
     26:     case ServerParseSelect.IDENTITY: // select @@identity
     27:         // ....省略代码
     28:         break;
     29:     case ServerParseSelect.SELECT_VAR_ALL: //
     30:         SelectVariables.execute(c,stmt);
     31:             break;
     32:     case ServerParseSelect.SESSION_TX_READ_ONLY: //
     33:         SelectTxReadOnly.response(c);
     34:             break;
     35:     default: // 其他,例如 select * from table
     36:         c.execute(stmt, ServerParse.SELECT);
     37:     }
     38: }
     39: // ⬇️⬇️⬇️【ServerParseSelect.java】
     40: public static int parse(String stmt, int offset) {
     41:     int i = offset;
     42:     for (; i  stmt.length(); ++i) {
     43:         switch (stmt.charAt(i)) {
     44:         case ' ':
     45:             continue;
     46:         case '/':
     47:         case '#':
     48:             i = ParseUtil.comment(stmt, i);
     49:             continue;
     50:         case '@':
     51:             return select2Check(stmt, i);
     52:         case 'D':
     53:         case 'd':
     54:             return databaseCheck(stmt, i);
     55:         case 'L':
     56:         case 'l':
     57:             return lastInsertCheck(stmt, i);
     58:         case 'U':
     59:         case 'u':
     60:             return userCheck(stmt, i);
     61:         case 'C':
     62:         case 'c':
     63:             return currentUserCheck(stmt, i);
     64:         case 'V':
     65:         case 'v':
     66:             return versionCheck(stmt, i);
     67:         default:
     68:             return OTHER;
     69:         }
     70:     }
     71:     return OTHER;
     72: }
    

    【8】

    执行 SQL,详细解析见下文,核心代码如下:

    
      1: // ⬇️⬇️⬇️【ServerConnection.java】
      2: public class ServerConnection extends FrontendConnection {
      3:     public void execute(String sql, int type) {
      4:         // .... 省略代码
      5:         SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
      6:         if (schema == null) {
      7:             writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
      8:                     "Unknown MyCAT Database '" + db + "'");
      9:             return;
     10:         }
     11: 
     12:         // .... 省略代码
     13: 
     14:         // 路由到后端数据库,执行 SQL
     15:         routeEndExecuteSQL(sql, type, schema);
     16:     }
     17:     
     18:     public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
     19:         // 路由计算
     20:         RouteResultset rrs = null;
     21:         try {
     22:             rrs = MycatServer
     23:                     .getInstance()
     24:                     .getRouterservice()
     25:                     .route(MycatServer.getInstance().getConfig().getSystem(),
     26:                             schema, type, sql, this.charset, this);
     27: 
     28:         } catch (Exception e) {
     29:             StringBuilder s = new StringBuilder();
     30:             LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);
     31:             String msg = e.getMessage();
     32:             writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
     33:             return;
     34:         }
     35: 
     36:         // 执行 SQL
     37:         if (rrs != null) {
     38:             // session执行
     39:             session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
     40:         }
     41:         
     42:      }
     43: 
     44: }
    

    3. 获得路由结果

    【 1 - 5 】

    获得路由主流程。核心代码如下:

    
      1: // ⬇️⬇️⬇️【SelectHandler.java】
      2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,
      3:         int sqlType, String stmt, String charset, ServerConnection sc)
      4:         throws SQLNonTransientException {
      5:     RouteResultset rrs = null;
      6: 
      7:     // SELECT 类型的SQL, 检测缓存是否存在
      8:     if (sqlType == ServerParse.SELECT) {
      9:         cacheKey = schema.getName() + stmt;         
     10:         rrs = (RouteResultset) sqlRouteCache.get(cacheKey);
     11:         if (rrs != null) {
     12:             checkMigrateRule(schema.getName(),rrs,sqlType);
     13:             return rrs;
     14:             }
     15:         }
     16:     }
     17: 
     18:     // .... 省略代码
     19:     int hintLength = RouteService.isHintSql(stmt);
     20:     if(hintLength != -1){ // TODO 待读:hint
     21:         // .... 省略代码
     22:         }
     23:     } else {
     24:         stmt = stmt.trim();
     25:         rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,
     26:                 charset, sc, tableId2DataNodeCache);
     27:     }
     28: 
     29:     // 记录查询命令路由结果缓存
     30:     if (rrs != null && sqlType == ServerParse.SELECT && rrs.isCacheAble()) {
     31:         sqlRouteCache.putIfAbsent(cacheKey, rrs);
     32:     }
     33:     // .... 省略代码        return rrs;
     34: }
     35: // ⬇️⬇️⬇️【AbstractRouteStrategy.java】
     36: @Override
     37: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,
     38:         String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {
     39: 
     40:     // .... 省略代码
     41: 
     42:     // 处理一些路由之前的逻辑;全局序列号,父子表插入
     43:     if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {
     44:         return null;
     45:     }
     46: 
     47:     // .... 省略代码
     48: 
     49:     // 检查是否有分片
     50:     if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
     51:         rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
     52:     } else {
     53:         RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);
     54:         if (returnedSet == null) {
     55:             rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);
     56:         }
     57:     }
     58: 
     59:     return rrs;
     60: }
    

    🚀【3】第 7 至 16 行 :当 Select SQL 存在路由结果缓存时,直接返回缓存。
    🚀【6】第 29 至 32 行 :记录 Select SQL 路由结果到缓存。

    路由 详细解析,我们另开文章,避免内容过多,影响大家对【插入】流程和逻辑的理解。

    4. 获得 MySQL 连接,执行 SQL

    【 1 - 8 】

    获得 MySQL 连接。

  • PhysicalDBNode :物理数据库节点。
  • PhysicalDatasource :物理数据库数据源。
  • 【 9 - 13 】

    发送 SQL 到 MySQL Server,执行 SQL。

    🚀 5. 响应执行 SQL 结果

    核心代码如下:

    
      1: // ⬇️⬇️⬇️【MySQLConnectionHandler.java】
      2: @Override
      3: protected void handleData(byte[] data) {
      4:     switch (resultStatus) {
      5:     case RESULT_STATUS_INIT:
      6:         switch (data[4]) {
      7:         case OkPacket.FIELD_COUNT:
      8:             handleOkPacket(data);
      9:             break;
     10:         case ErrorPacket.FIELD_COUNT:
     11:             handleErrorPacket(data);
     12:             break;
     13:         case RequestFilePacket.FIELD_COUNT:
     14:             handleRequestPacket(data);
     15:             break;
     16:         default: // 初始化 header fields
     17:             resultStatus = RESULT_STATUS_HEADER;
     18:             header = data;
     19:             fields = new ArrayListbyte[]((int) ByteUtil.readLength(data,
     20:                     4));
     21:         }
     22:         break;
     23:     case RESULT_STATUS_HEADER:
     24:         switch (data[4]) {
     25:         case ErrorPacket.FIELD_COUNT:
     26:             resultStatus = RESULT_STATUS_INIT;
     27:             handleErrorPacket(data);
     28:             break;
     29:         case EOFPacket.FIELD_COUNT: // 解析 fields 结束
     30:             resultStatus = RESULT_STATUS_FIELD_EOF;
     31:             handleFieldEofPacket(data);
     32:             break;
     33:         default: // 解析 fields
     34:             fields.add(data);
     35:         }
     36:         break;
     37:     case RESULT_STATUS_FIELD_EOF:
     38:         switch (data[4]) {
     39:         case ErrorPacket.FIELD_COUNT:
     40:             resultStatus = RESULT_STATUS_INIT;
     41:             handleErrorPacket(data);
     42:             break;
     43:         case EOFPacket.FIELD_COUNT: // 解析 每行记录 结束
     44:             resultStatus = RESULT_STATUS_INIT;
     45:             handleRowEofPacket(data);
     46:             break;
     47:         default: // 每行记录
     48:             handleRowPacket(data);
     49:         }
     50:         break;
     51:     default:
     52:         throw new RuntimeException("unknown status!");
     53:     }
     54: }
    

    6. 其他 :更新 / 删除

    流程基本和 《MyCAT源码分析:【单库单表】插入》 相同。我们就不另外文章解析。

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 数据库中间件 MyCAT 源码分析 —— 【单库单表】查询


     上一篇
    并发编程系列之自定义线程池 并发编程系列之自定义线程池
    前言 前面我们在讲并发工具类的时候,多次提到线程池,今天我们就来走进线程池的旅地,首先我们先不讲线程池框架Executors,我们今天先来介绍如何自己定义一个线程池,是不是已经迫不及待了,那么就让我们开启今天的旅途吧。 什么是线
    2021-04-05
    下一篇 
    数据库中间件 MyCAT源码分析——【单库单表】插入 数据库中间件 MyCAT源码分析——【单库单表】插入
    本文主要基于 MyCAT 1.6.5 正式版 1. 概述 2. 接收请求,解析 SQL 3. 获得路由结果 4. 获得 MySQL 连接,执行 SQL 5. 响应执行 SQL 结果 友情提示:欢迎关注公众号【芋道源码】
    2021-04-05