数据库中间件 MyCAT源码分析:【单库单表】插入【推荐阅读】
???關(guān)注微信公眾號(hào):【芋艿的后端小屋】有福利:
- 1. 概述
- 2. 接收請(qǐng)求,解析 SQL
- 3. 獲得路由結(jié)果
- 4. 獲得 MySQL 連接,執(zhí)行 SQL
- 5. 響應(yīng)執(zhí)行 SQL 結(jié)果
1. 概述
內(nèi)容形態(tài)以 順序圖 + 核心代碼 為主。
如果有地方表述不錯(cuò)誤或者不清晰,歡迎留言。
對(duì)于內(nèi)容形態(tài),非常糾結(jié),如果有建議,特別特別特別歡迎您提出。
微信號(hào):wangwenbin-server。
本文講解 【單庫(kù)單表】插入 所涉及到的代碼。交互如下圖:
單庫(kù)單表插入簡(jiǎn)圖整個(gè)過(guò)程,MyCAT Server 流程如下:
我們逐個(gè)步驟分析,一起來(lái)看看源碼。
2. 接收請(qǐng)求,解析 SQL
【單庫(kù)單表】插入(01主流程)【 1 - 2 】
接收一條 MySQL 命令。在【1】之前,還有請(qǐng)求數(shù)據(jù)讀取、拆成單條 MySQL SQL。
【 3 】
不同 MySQL 命令,分發(fā)到不同的方法執(zhí)行。核心代碼如下:
1: // ??????【FrontendCommandHandler.java】2: public class FrontendCommandHandler implements NIOHandler {3: 4: 5: public void handle(byte[] data) {6: 7: // .... 省略部分代碼8: switch (data[4]) // 9: {10: case MySQLPacket.COM_INIT_DB:11: commands.doInitDB();12: source.initDB(data);13: break;14: case MySQLPacket.COM_QUERY: // 查詢命令15: // 計(jì)數(shù)查詢命令16: commands.doQuery();17: // 執(zhí)行查詢命令18: source.query(data);19: break;20: case MySQLPacket.COM_PING:21: commands.doPing();22: source.ping();23: break;24: // .... 省略部分case25: }26: }27: 28: }復(fù)制代碼INSERT/SELECT/UPDATE/DELETE 等 SQL 歸屬于 MySQLPacket.COM_QUERY,詳細(xì)可見(jiàn):《MySQL協(xié)議分析#4.2 客戶端命令請(qǐng)求報(bào)文(客戶端 -> 服務(wù)器)》。
##【 4 】
將 二進(jìn)制數(shù)組 解析成 SQL。核心代碼如下:
1: // ??????【FrontendConnection.java】2: public void query(byte[] data) {3: // 取得語(yǔ)句4: String sql = null; 5: try {6: MySQLMessage mm = new MySQLMessage(data);7: mm.position(5);8: sql = mm.readString(charset);9: } catch (UnsupportedEncodingException e) {10: writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + charset + "'");11: return;12: } 13: // 執(zhí)行語(yǔ)句14: this.query( sql );15: }復(fù)制代碼##【 5 】
解析 SQL 類型。核心代碼如下:
1: // ??????【ServerQueryHandler.java】2: 3: public void query(String sql) {4: // 解析 SQL 類型5: int rs = ServerParse.parse(sql);6: int sqlType = rs & 0xff;7: 8: switch (sqlType) {9: //explain sql10: case ServerParse.EXPLAIN:11: ExplainHandler.handle(sql, c, rs >>> 8);12: break;13: // .... 省略部分case14: break;15: case ServerParse.SELECT:16: SelectHandler.handle(sql, c, rs >>> 8);17: break;18: // .... 省略部分case19: default:20: if(readOnly){21: LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());22: c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");23: break;24: }25: c.execute(sql, rs & 0xff);26: }27: }28: 29:30: // ??????【ServerParse.java】31: public static int parse(String stmt) {32: int length = stmt.length();33: //FIX BUG FOR SQL SUCH AS /XXXX/SQL34: int rt = -1;35: for (int i = 0; i < length; ++i) {36: switch (stmt.charAt(i)) {37: // .... 省略部分case case 'I':38: case 'i':39: rt = insertCheck(stmt, i);40: if (rt != OTHER) {41: return rt;42: }43: continue;44: // .... 省略部分case45: case 'S':46: case 's':47: rt = sCheck(stmt, i);48: if (rt != OTHER) {49: return rt;50: }51: continue;52: // .... 省略部分case53: default:54: continue;55: }56: }57: return OTHER;58: }復(fù)制代碼##【 6 】
執(zhí)行 SQL,詳細(xì)解析見(jiàn)下文,核心代碼如下:
1: // ??????【ServerConnection.java】2: public class ServerConnection extends FrontendConnection {3: public void execute(String sql, int type) {4: // .... 省略代碼5: SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);6: if (schema == null) {7: writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,8: "Unknown MyCAT Database '" + db + "'");9: return;10: }11: 12: // .... 省略代碼13: 14: // 路由到后端數(shù)據(jù)庫(kù),執(zhí)行 SQL15: routeEndExecuteSQL(sql, type, schema);16: }17: 18: public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {19: // 路由計(jì)算20: RouteResultset rrs = null;21: try {22: rrs = MycatServer23: .getInstance()24: .getRouterservice()25: .route(MycatServer.getInstance().getConfig().getSystem(),26: schema, type, sql, this.charset, this);27: 28: } catch (Exception e) {29: StringBuilder s = new StringBuilder();30: LOGGER.warn(s.append(this).append(sql).toString() + " err:" + e.toString(),e);31: String msg = e.getMessage();32: writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);33: return;34: }35: 36: // 執(zhí)行 SQL37: if (rrs != null) {38: // session執(zhí)行39: session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);40: }41: 42: }43: 44: }復(fù)制代碼3. 獲得路由結(jié)果
【單庫(kù)單表】插入(02獲取路由)【 1 - 2 】【 12 】
獲得路由主流程。核心代碼如下:
1: // ??????【RouteService.java】2: public RouteResultset route(SystemConfig sysconf, SchemaConfig schema,3: int sqlType, String stmt, String charset, ServerConnection sc)4: throws SQLNonTransientException {5: RouteResultset rrs = null;6: // .... 省略代碼7: int hintLength = RouteService.isHintSql(stmt);8: if(hintLength != -1){ // TODO 待讀:hint9: // .... 省略代碼10: }11: } else {12: stmt = stmt.trim();13: rrs = RouteStrategyFactory.getRouteStrategy().route(sysconf, schema, sqlType, stmt,14: charset, sc, tableId2DataNodeCache);15: }16: 17: // .... 省略代碼 return rrs;18: }19: // ??????【AbstractRouteStrategy.java】20: 21: public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema, int sqlType, String origSQL,22: String charset, ServerConnection sc, LayerCachePool cachePool) throws SQLNonTransientException {23: 24: // .... 省略代碼25: 26: // 處理一些路由之前的邏輯;全局序列號(hào),父子表插入27: if (beforeRouteProcess(schema, sqlType, origSQL, sc) ) {28: return null;29: }30: 31: // .... 省略代碼32: 33: // 檢查是否有分片34: if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {35: rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);36: } else {37: RouteResultset returnedSet = routeSystemInfo(schema, sqlType, stmt, rrs);38: if (returnedSet == null) {39: rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool,sqlType,sc);40: }41: }42: 43: return rrs;44: }復(fù)制代碼路由 詳細(xì)解析,我們另開(kāi)文章,避免內(nèi)容過(guò)多,影響大家對(duì)【插入】流程和邏輯的理解。
【 3 - 6 】
路由前置處理。當(dāng)符合如下三種情況下,進(jìn)行處理:
{ 1 } 使用全局序列號(hào):
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name')復(fù)制代碼{ 2 } ER 子表插入
{ 3 } 主鍵使用自增 ID 插入:
情況 { 1 } { 3 } 情況類似,使用全局序列號(hào)。
核心代碼如下:
1: // ??????【AbstractRouteStrategy.java】2: private boolean beforeRouteProcess(SchemaConfig schema, int sqlType, String origSQL, ServerConnection sc)3: throws SQLNonTransientException {4: return // 處理 id 使用 全局序列號(hào)5: RouterUtil.processWithMycatSeq(schema, sqlType, origSQL, sc)6: // 處理 ER 子表7: || (sqlType == ServerParse.INSERT && RouterUtil.processERChildTable(schema, origSQL, sc))8: // 處理 id 自增長(zhǎng)9: || (sqlType == ServerParse.INSERT && RouterUtil.processInsert(schema, sqlType, origSQL, sc));10: }復(fù)制代碼RouterUtil.java 處理 SQL 考慮性能,實(shí)現(xiàn)會(huì)比較 C-style,代碼咱就不貼了,傳送門(mén):github.com/YunaiV/Myca… (?該倉(cāng)庫(kù)從官方 Fork,逐步完善中文注釋,歡迎 Star)
【 7 - 11 】
當(dāng)前置路由處理全局序列號(hào)時(shí),添加到全局序列處理器(MyCATSequnceProcessor)。該處理器會(huì)異步生成 ID,替換 SQL 內(nèi)的 NEXT VALUE FOR MYCATSEQ_ 正則。例如:
insert into table (id, name) values (NEXT VALUE FOR MYCATSEQ_ID, 'name') ===> insert into table (id, name) values (868348974560579584, 'name')復(fù)制代碼異步處理完后,調(diào)用 ServerConnection#routeEndExecuteSQL(sql, type, schema) 方法重新執(zhí)行 SQL。
核心代碼如下:
1: // ??????【RouterUtil.java】2: public static void processSQL(ServerConnection sc,SchemaConfig schema,String sql,int sqlType){3: SessionSQLPair sessionSQLPair = new SessionSQLPair(sc.getSession2(), schema, sql, sqlType);4: MycatServer.getInstance().getSequnceProcessor().addNewSql(sessionSQLPair);5: }6: // ??????【MyCATSequnceProcessor.java】7: public class MyCATSequnceProcessor {8: private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();9: private volatile boolean running=true;10: 11: public void addNewSql(SessionSQLPair pair) {12: seqSQLQueue.add(pair);13: }14: 15: private void executeSeq(SessionSQLPair pair) {16: try {17: 18: // 使用Druid解析器實(shí)現(xiàn)sequence處理 @兵臨城下19: DruidSequenceHandler sequenceHandler = new DruidSequenceHandler(MycatServer20: .getInstance().getConfig().getSystem().getSequnceHandlerType());21: 22: // 生成可執(zhí)行 SQL :目前主要是生成 id23: String charset = pair.session.getSource().getCharset();24: String executeSql = sequenceHandler.getExecuteSql(pair.sql,charset == null ? "utf-8":charset);25: 26: // 執(zhí)行 SQL27: pair.session.getSource().routeEndExecuteSQL(executeSql, pair.type,pair.schema);28: } catch (Exception e) {29: LOGGER.error("MyCATSequenceProcessor.executeSeq(SesionSQLPair)",e);30: pair.session.getSource().writeErrMessage(ErrorCode.ER_YES,"mycat sequnce err." + e);31: return;32: }33: }34: 35: class ExecuteThread extends Thread {36: 37: public ExecuteThread() {38: setDaemon(true); // 設(shè)置為后臺(tái)線程,防止throw RuntimeExecption進(jìn)程仍然存在的問(wèn)題39: }40: 41: public void run() {42: while (running) {43: try {44: SessionSQLPair pair=seqSQLQueue.poll(100,TimeUnit.MILLISECONDS);45: if(pair!=null){46: executeSeq(pair);47: }48: } catch (Exception e) {49: LOGGER.warn("MyCATSequenceProcessor$ExecutorThread",e);50: }51: }52: }53: }54: }復(fù)制代碼?此處有個(gè)疑問(wèn):MyCATSequnceProcessor 是單線程,會(huì)不會(huì)插入性能有一定的影響?后續(xù)咱做下性能測(cè)試。
4. 獲得 MySQL 連接,執(zhí)行 SQL
【單庫(kù)單表】插入(03執(zhí)行 SQL)【 1 - 8 】
獲得 MySQL 連接。
- PhysicalDBNode :物理數(shù)據(jù)庫(kù)節(jié)點(diǎn)。
- PhysicalDatasource :物理數(shù)據(jù)庫(kù)數(shù)據(jù)源。
【 9 - 13 】
發(fā)送 SQL 到 MySQL Server,執(zhí)行 SQL。
5. 響應(yīng)執(zhí)行 SQL 結(jié)果
【單庫(kù)單表】插入(04執(zhí)行響應(yīng))【 1 - 4 】
處理 MySQL Server 響應(yīng)數(shù)據(jù)包。
【 5 - 8 】
發(fā)送插入成功結(jié)果給 MySQL Client。
總結(jié)
以上是生活随笔為你收集整理的数据库中间件 MyCAT源码分析:【单库单表】插入【推荐阅读】的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 以高效节能为使命 绿色数据中心势在必行
- 下一篇: 又一轮电邮中间人攻击来袭 企业如何自保?