From 3c97f2f1752ff828322355c99e05bb4e79baffed Mon Sep 17 00:00:00 2001 From: "tiger.yan" Date: Wed, 6 Mar 2019 14:48:49 +0800 Subject: [PATCH] Issue druid1.1.14 (#1037) * update druid to 1.1.14 #788 * lock multi-table --- pom.xml | 2 +- .../mysql/nio/handler/LockTablesHandler.java | 5 + .../handler/query/impl/BaseSelectHandler.java | 2 +- .../meta/table/DbleCreateTableParser.java | 113 ++++++++++++------ .../druid/impl/DruidLockTableParser.java | 104 ++++++++-------- .../dble/server/ServerConnection.java | 30 +++-- 6 files changed, 159 insertions(+), 97 deletions(-) diff --git a/pom.xml b/pom.xml index 14be48aa4..956a82090 100644 --- a/pom.xml +++ b/pom.xml @@ -96,7 +96,7 @@ com.alibaba druid - 1.1.10 + 1.1.14 mysql diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/LockTablesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/LockTablesHandler.java index f397c3ada..c6da392d7 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/LockTablesHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/LockTablesHandler.java @@ -16,6 +16,7 @@ import com.actiontech.dble.net.mysql.RowDataPacket; import com.actiontech.dble.route.RouteResultset; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.server.NonBlockingSession; +import com.actiontech.dble.server.parser.ServerParse; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -95,6 +96,10 @@ public class LockTablesHandler extends MultiNodeHandler { return; } boolean isEndPack = decrementCountBy(1); + final RouteResultsetNode node = (RouteResultsetNode) conn.getAttachment(); + if (node.getSqlType() == ServerParse.UNLOCK) { + session.releaseConnection(conn); + } if (isEndPack) { if (this.isFail() || session.closed()) { tryErrorFinished(true); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java index f36f3bbc7..5af21e0d8 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java @@ -180,7 +180,7 @@ public class BaseSelectHandler extends BaseDMLHandler { @Override protected void onTerminate() { - if (autocommit) { + if (autocommit && !session.getSource().isLocked()) { this.session.releaseConnection(rrss, LOGGER.isDebugEnabled(), false); } else { //the connection should wait until the connection running finish diff --git a/src/main/java/com/actiontech/dble/meta/table/DbleCreateTableParser.java b/src/main/java/com/actiontech/dble/meta/table/DbleCreateTableParser.java index 294d560f5..f09fa94a8 100644 --- a/src/main/java/com/actiontech/dble/meta/table/DbleCreateTableParser.java +++ b/src/main/java/com/actiontech/dble/meta/table/DbleCreateTableParser.java @@ -78,7 +78,7 @@ public class DbleCreateTableParser extends MySqlCreateTableParser { stmt.setSelect(query); } else { for (; ; ) { - if (lexer.identifierEquals(FnvHash.Constants.FULLTEXT)) { + if (lexer.token() == Token.FULLTEXT) { Lexer.SavePoint mark = lexer.mark(); lexer.nextToken(); if (lexer.token() == Token.KEY) { @@ -91,8 +91,51 @@ public class DbleCreateTableParser extends MySqlCreateTableParser { lexer.nextToken(); continue; } + } else if (lexer.token() == Token.INDEX) { + lexer.nextToken(); + MySqlTableIndex idx = new MySqlTableIndex(); + idx.setIndexType("FULLTEXT"); + idx.setName(this.exprParser.name()); + + accept(Token.LPAREN); + for (; ; ) { + idx.addColumn(this.exprParser.parseSelectOrderByItem()); + if (!(lexer.token() == (Token.COMMA))) { + break; + } else { + lexer.nextToken(); + } + } + stmt.getTableElementList().add(idx); + accept(Token.RPAREN); + if (lexer.token() == Token.RPAREN) { + break; + } else if (lexer.token() == Token.COMMA) { + lexer.nextToken(); + continue; + } } else { - lexer.reset(mark); + MySqlTableIndex idx = new MySqlTableIndex(); + idx.setIndexType("FULLTEXT"); + idx.setName(this.exprParser.name()); + + accept(Token.LPAREN); + for (; ; ) { + idx.addColumn(this.exprParser.parseSelectOrderByItem()); + if (!(lexer.token() == (Token.COMMA))) { + break; + } else { + lexer.nextToken(); + } + } + stmt.getTableElementList().add(idx); + accept(Token.RPAREN); + if (lexer.token() == Token.RPAREN) { + break; + } else if (lexer.token() == Token.COMMA) { + lexer.nextToken(); + continue; + } } } else if (lexer.identifierEquals(FnvHash.Constants.SPATIAL)) { Lexer.SavePoint mark = lexer.mark(); @@ -549,39 +592,6 @@ public class DbleCreateTableParser extends MySqlCreateTableParser { return stmt; } - private boolean parseTableOptionCharsetOrCollate(MySqlCreateTableStatement stmt) { - if (lexer.identifierEquals("CHARACTER")) { - lexer.nextToken(); - accept(Token.SET); - if (lexer.token() == Token.EQ) { - lexer.nextToken(); - } - stmt.getTableOptions().put("CHARACTER SET", this.exprParser.expr()); - return true; - } - - if (lexer.identifierEquals("CHARSET")) { - lexer.nextToken(); - if (lexer.token() == Token.EQ) { - lexer.nextToken(); - } - stmt.getTableOptions().put("CHARSET", this.exprParser.expr()); - return true; - } - - if (lexer.identifierEquals("COLLATE")) { - lexer.nextToken(); - if (lexer.token() == Token.EQ) { - lexer.nextToken(); - } - stmt.getTableOptions().put("COLLATE", this.exprParser.expr()); - return true; - } - - return false; - } - - private SQLPartitionBy parsePartitionBy() { lexer.nextToken(); accept(Token.BY); @@ -702,4 +712,37 @@ public class DbleCreateTableParser extends MySqlCreateTableParser { } return partitionClause; } + + private boolean parseTableOptionCharsetOrCollate(MySqlCreateTableStatement stmt) { + if (lexer.identifierEquals("CHARACTER")) { + lexer.nextToken(); + accept(Token.SET); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + stmt.getTableOptions().put("CHARACTER SET", this.exprParser.expr()); + return true; + } + + if (lexer.identifierEquals("CHARSET")) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + stmt.getTableOptions().put("CHARSET", this.exprParser.expr()); + return true; + } + + if (lexer.identifierEquals("COLLATE")) { + lexer.nextToken(); + if (lexer.token() == Token.EQ) { + lexer.nextToken(); + } + stmt.getTableOptions().put("COLLATE", this.exprParser.expr()); + return true; + } + + return false; + } + } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidLockTableParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidLockTableParser.java index e987cf822..f69bedeaf 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidLockTableParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidLockTableParser.java @@ -14,14 +14,13 @@ import com.actiontech.dble.route.util.RouterUtil; import com.actiontech.dble.server.ServerConnection; import com.actiontech.dble.server.parser.ServerParse; import com.actiontech.dble.server.util.SchemaUtil; -import com.actiontech.dble.util.SplitUtil; +import com.actiontech.dble.util.StringUtil; import com.alibaba.druid.sql.ast.SQLStatement; import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlLockTableStatement; -import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlLockTableStatement.LockType; import java.sql.SQLException; import java.sql.SQLNonTransientException; -import java.util.List; +import java.util.*; /** * lock tables [table] [write|read] @@ -32,61 +31,60 @@ public class DruidLockTableParser extends DefaultDruidParser { @Override public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, ServerSchemaStatVisitor visitor, ServerConnection sc) throws SQLException { - // for lock tables table1 write, table2 - // DruidParser can only parser table1, - // use "," to judge - String sql = rrs.getStatement(); - sql = sql.replaceAll("\n", " ").replaceAll("\t", " "); - String[] stmts = SplitUtil.split(sql, ',', true); - // contains "," - if (stmts.length > 1) { - String tmpStmt = null; - String[] tmpWords = null; - for (int i = 1; i < stmts.length; i++) { - tmpStmt = stmts[i]; - tmpWords = SplitUtil.split(tmpStmt, ' ', true); - if (tmpWords.length == 2 && - ("READ".equalsIgnoreCase(tmpWords[1]) || "WRITE".equalsIgnoreCase(tmpWords[1]))) { - // unsupport lock multi-table - continue; - } else { - // unsupport lock multi-table - throw new SQLNonTransientException( - "You have an error in your SQL syntax, don't support lock multi tables!"); - } - } - LOGGER.info("can't lock multi-table"); - throw new SQLNonTransientException("can't lock multi-table"); - } MySqlLockTableStatement lockTableStat = (MySqlLockTableStatement) stmt; - String schemaName = schema == null ? null : schema.getName(); - SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, lockTableStat.getTableSource()); - schema = schemaInfo.getSchemaConfig(); - String table = schemaInfo.getTable(); - String noShardingNode = RouterUtil.isNoShardingDDL(schema, table); - if (noShardingNode != null) { - RouterUtil.routeToSingleNode(rrs, noShardingNode); - rrs.setFinishedRoute(true); - return schema; + Map> dataNodeToLocks = new HashMap<>(); + for (MySqlLockTableStatement.Item item : lockTableStat.getItems()) { + String schemaName = schema == null ? null : schema.getName(); + SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, item.getTableSource()); + SchemaConfig schemaConfig = schemaInfo.getSchemaConfig(); + String table = schemaInfo.getTable(); + String noShardingNode = RouterUtil.isNoShardingDDL(schemaConfig, table); + if (noShardingNode != null) { + StringBuilder sbItem = new StringBuilder(table); + if (item.getTableSource().getAlias() != null) { + sbItem.append(" as "); + sbItem.append(item.getTableSource().getAlias()); + } + sbItem.append(" "); + sbItem.append(item.getLockType()); + List locks = dataNodeToLocks.computeIfAbsent(noShardingNode, k -> new ArrayList<>()); + locks.add(sbItem.toString()); + continue; + } + TableConfig tableConfig = schemaConfig.getTables().get(table); + if (tableConfig == null) { + String msg = "can't find table define of " + table + " in schema:" + schemaConfig.getName(); + LOGGER.info(msg); + throw new SQLNonTransientException(msg); + } + List dataNodes = tableConfig.getDataNodes(); + for (String dataNode : dataNodes) { + StringBuilder sbItem = new StringBuilder(table); + if (item.getTableSource().getAlias() != null) { + sbItem.append(" as "); + sbItem.append(item.getTableSource().getAlias()); + } + sbItem.append(" "); + sbItem.append(item.getLockType()); + List locks = dataNodeToLocks.computeIfAbsent(dataNode, k -> new ArrayList<>()); + locks.add(sbItem.toString()); + } } - TableConfig tableConfig = schema.getTables().get(table); - if (tableConfig == null) { - String msg = "can't find table define of " + table + " in schema:" + schema.getName(); - LOGGER.info(msg); - throw new SQLNonTransientException(msg); + Set lockedNodes = new HashSet<>(); + if (sc.isLocked()) { + lockedNodes.addAll(sc.getSession2().getTargetMap().keySet()); } - LockType lockType = lockTableStat.getLockType(); - if (LockType.WRITE != lockType && LockType.READ != lockType) { - String msg = "lock type must be write or read"; - LOGGER.info(msg); - throw new SQLNonTransientException(msg); + List nodes = new ArrayList<>(); + for (Map.Entry> entry : dataNodeToLocks.entrySet()) { + RouteResultsetNode node = new RouteResultsetNode(entry.getKey(), ServerParse.LOCK, " LOCK TABLES " + StringUtil.join(entry.getValue(), ",")); + nodes.add(node); + lockedNodes.remove(node); } - List dataNodes = tableConfig.getDataNodes(); - RouteResultsetNode[] nodes = new RouteResultsetNode[dataNodes.size()]; - for (int i = 0; i < dataNodes.size(); i++) { - nodes[i] = new RouteResultsetNode(dataNodes.get(i), ServerParse.LOCK, statementToString(stmt)); + for (RouteResultsetNode toUnlockedNode : lockedNodes) { + RouteResultsetNode node = new RouteResultsetNode(toUnlockedNode.getName(), ServerParse.UNLOCK, " UNLOCK TABLES "); + nodes.add(node); } - rrs.setNodes(nodes); + rrs.setNodes(nodes.toArray(new RouteResultsetNode[nodes.size()])); rrs.setFinishedRoute(true); return schema; } diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java index e5c128f1d..d0b0304f0 100644 --- a/src/main/java/com/actiontech/dble/server/ServerConnection.java +++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java @@ -423,22 +423,38 @@ public class ServerConnection extends FrontendConnection { void lockTable(String sql) { // lock table is disable in transaction - if (!autocommit) { - writeErrMessage(ErrorCode.ER_YES, "can't lock table in transaction!"); + if (!autocommit || isTxStart()) { + writeErrMessage(ErrorCode.ER_YES, "can't lock tables in transaction in dble!"); return; } - // if lock table has been executed and unlock has not been executed, can't execute lock table again - if (isLocked) { - writeErrMessage(ErrorCode.ER_YES, "can't lock multi-table"); - return; + + + String db = this.schema; + SchemaConfig schema = null; + if (this.schema != null) { + schema = DbleServer.getInstance().getConfig().getSchemas().get(this.schema); + if (schema == null) { + writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "Unknown Database '" + db + "'"); + return; + } + } + RouteResultset rrs; + try { + rrs = DbleServer.getInstance().getRouterService().route(schema, ServerParse.LOCK, sql, this); + } catch (Exception e) { + executeException(e, sql); + return ; } - RouteResultset rrs = routeSQL(sql, ServerParse.LOCK); if (rrs != null) { session.lockTable(rrs); } } void unLockTable(String sql) { + if (!autocommit || isTxStart()) { + writeErrMessage(ErrorCode.ER_YES, "can't unlock tables in transaction in dble!"); + return; + } sql = sql.replaceAll("\n", " ").replaceAll("\t", " "); String[] words = SplitUtil.split(sql, ' ', true); if (words.length == 2 && ("table".equalsIgnoreCase(words[1]) || "tables".equalsIgnoreCase(words[1]))) {