Issue druid1.1.14 (#1037)

* update druid to 1.1.14 #788

* lock multi-table
This commit is contained in:
tiger.yan
2019-03-06 14:48:49 +08:00
committed by GitHub
parent d3272d1556
commit 3c97f2f175
6 changed files with 159 additions and 97 deletions

View File

@@ -96,7 +96,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
<version>1.1.14</version>
</dependency>
<dependency>
<groupId>mysql</groupId>

View File

@@ -16,6 +16,7 @@ import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.parser.ServerParse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,6 +96,10 @@ public class LockTablesHandler extends MultiNodeHandler {
return;
}
boolean isEndPack = decrementCountBy(1);
final RouteResultsetNode node = (RouteResultsetNode) conn.getAttachment();
if (node.getSqlType() == ServerParse.UNLOCK) {
session.releaseConnection(conn);
}
if (isEndPack) {
if (this.isFail() || session.closed()) {
tryErrorFinished(true);

View File

@@ -180,7 +180,7 @@ public class BaseSelectHandler extends BaseDMLHandler {
@Override
protected void onTerminate() {
if (autocommit) {
if (autocommit && !session.getSource().isLocked()) {
this.session.releaseConnection(rrss, LOGGER.isDebugEnabled(), false);
} else {
//the connection should wait until the connection running finish

View File

@@ -78,7 +78,7 @@ public class DbleCreateTableParser extends MySqlCreateTableParser {
stmt.setSelect(query);
} else {
for (; ; ) {
if (lexer.identifierEquals(FnvHash.Constants.FULLTEXT)) {
if (lexer.token() == Token.FULLTEXT) {
Lexer.SavePoint mark = lexer.mark();
lexer.nextToken();
if (lexer.token() == Token.KEY) {
@@ -91,8 +91,51 @@ public class DbleCreateTableParser extends MySqlCreateTableParser {
lexer.nextToken();
continue;
}
} else if (lexer.token() == Token.INDEX) {
lexer.nextToken();
MySqlTableIndex idx = new MySqlTableIndex();
idx.setIndexType("FULLTEXT");
idx.setName(this.exprParser.name());
accept(Token.LPAREN);
for (; ; ) {
idx.addColumn(this.exprParser.parseSelectOrderByItem());
if (!(lexer.token() == (Token.COMMA))) {
break;
} else {
lexer.nextToken();
}
}
stmt.getTableElementList().add(idx);
accept(Token.RPAREN);
if (lexer.token() == Token.RPAREN) {
break;
} else if (lexer.token() == Token.COMMA) {
lexer.nextToken();
continue;
}
} else {
lexer.reset(mark);
MySqlTableIndex idx = new MySqlTableIndex();
idx.setIndexType("FULLTEXT");
idx.setName(this.exprParser.name());
accept(Token.LPAREN);
for (; ; ) {
idx.addColumn(this.exprParser.parseSelectOrderByItem());
if (!(lexer.token() == (Token.COMMA))) {
break;
} else {
lexer.nextToken();
}
}
stmt.getTableElementList().add(idx);
accept(Token.RPAREN);
if (lexer.token() == Token.RPAREN) {
break;
} else if (lexer.token() == Token.COMMA) {
lexer.nextToken();
continue;
}
}
} else if (lexer.identifierEquals(FnvHash.Constants.SPATIAL)) {
Lexer.SavePoint mark = lexer.mark();
@@ -549,39 +592,6 @@ public class DbleCreateTableParser extends MySqlCreateTableParser {
return stmt;
}
private boolean parseTableOptionCharsetOrCollate(MySqlCreateTableStatement stmt) {
if (lexer.identifierEquals("CHARACTER")) {
lexer.nextToken();
accept(Token.SET);
if (lexer.token() == Token.EQ) {
lexer.nextToken();
}
stmt.getTableOptions().put("CHARACTER SET", this.exprParser.expr());
return true;
}
if (lexer.identifierEquals("CHARSET")) {
lexer.nextToken();
if (lexer.token() == Token.EQ) {
lexer.nextToken();
}
stmt.getTableOptions().put("CHARSET", this.exprParser.expr());
return true;
}
if (lexer.identifierEquals("COLLATE")) {
lexer.nextToken();
if (lexer.token() == Token.EQ) {
lexer.nextToken();
}
stmt.getTableOptions().put("COLLATE", this.exprParser.expr());
return true;
}
return false;
}
private SQLPartitionBy parsePartitionBy() {
lexer.nextToken();
accept(Token.BY);
@@ -702,4 +712,37 @@ public class DbleCreateTableParser extends MySqlCreateTableParser {
}
return partitionClause;
}
private boolean parseTableOptionCharsetOrCollate(MySqlCreateTableStatement stmt) {
if (lexer.identifierEquals("CHARACTER")) {
lexer.nextToken();
accept(Token.SET);
if (lexer.token() == Token.EQ) {
lexer.nextToken();
}
stmt.getTableOptions().put("CHARACTER SET", this.exprParser.expr());
return true;
}
if (lexer.identifierEquals("CHARSET")) {
lexer.nextToken();
if (lexer.token() == Token.EQ) {
lexer.nextToken();
}
stmt.getTableOptions().put("CHARSET", this.exprParser.expr());
return true;
}
if (lexer.identifierEquals("COLLATE")) {
lexer.nextToken();
if (lexer.token() == Token.EQ) {
lexer.nextToken();
}
stmt.getTableOptions().put("COLLATE", this.exprParser.expr());
return true;
}
return false;
}
}

View File

@@ -14,14 +14,13 @@ import com.actiontech.dble.route.util.RouterUtil;
import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.server.util.SchemaUtil;
import com.actiontech.dble.util.SplitUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlLockTableStatement;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlLockTableStatement.LockType;
import java.sql.SQLException;
import java.sql.SQLNonTransientException;
import java.util.List;
import java.util.*;
/**
* lock tables [table] [write|read]
@@ -32,61 +31,60 @@ public class DruidLockTableParser extends DefaultDruidParser {
@Override
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, ServerSchemaStatVisitor visitor, ServerConnection sc)
throws SQLException {
// for lock tables table1 write, table2
// DruidParser can only parser table1,
// use "," to judge
String sql = rrs.getStatement();
sql = sql.replaceAll("\n", " ").replaceAll("\t", " ");
String[] stmts = SplitUtil.split(sql, ',', true);
// contains ","
if (stmts.length > 1) {
String tmpStmt = null;
String[] tmpWords = null;
for (int i = 1; i < stmts.length; i++) {
tmpStmt = stmts[i];
tmpWords = SplitUtil.split(tmpStmt, ' ', true);
if (tmpWords.length == 2 &&
("READ".equalsIgnoreCase(tmpWords[1]) || "WRITE".equalsIgnoreCase(tmpWords[1]))) {
// unsupport lock multi-table
continue;
} else {
// unsupport lock multi-table
throw new SQLNonTransientException(
"You have an error in your SQL syntax, don't support lock multi tables!");
}
}
LOGGER.info("can't lock multi-table");
throw new SQLNonTransientException("can't lock multi-table");
}
MySqlLockTableStatement lockTableStat = (MySqlLockTableStatement) stmt;
String schemaName = schema == null ? null : schema.getName();
SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, lockTableStat.getTableSource());
schema = schemaInfo.getSchemaConfig();
String table = schemaInfo.getTable();
String noShardingNode = RouterUtil.isNoShardingDDL(schema, table);
if (noShardingNode != null) {
RouterUtil.routeToSingleNode(rrs, noShardingNode);
rrs.setFinishedRoute(true);
return schema;
Map<String, List<String>> dataNodeToLocks = new HashMap<>();
for (MySqlLockTableStatement.Item item : lockTableStat.getItems()) {
String schemaName = schema == null ? null : schema.getName();
SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(sc.getUser(), schemaName, item.getTableSource());
SchemaConfig schemaConfig = schemaInfo.getSchemaConfig();
String table = schemaInfo.getTable();
String noShardingNode = RouterUtil.isNoShardingDDL(schemaConfig, table);
if (noShardingNode != null) {
StringBuilder sbItem = new StringBuilder(table);
if (item.getTableSource().getAlias() != null) {
sbItem.append(" as ");
sbItem.append(item.getTableSource().getAlias());
}
sbItem.append(" ");
sbItem.append(item.getLockType());
List<String> locks = dataNodeToLocks.computeIfAbsent(noShardingNode, k -> new ArrayList<>());
locks.add(sbItem.toString());
continue;
}
TableConfig tableConfig = schemaConfig.getTables().get(table);
if (tableConfig == null) {
String msg = "can't find table define of " + table + " in schema:" + schemaConfig.getName();
LOGGER.info(msg);
throw new SQLNonTransientException(msg);
}
List<String> dataNodes = tableConfig.getDataNodes();
for (String dataNode : dataNodes) {
StringBuilder sbItem = new StringBuilder(table);
if (item.getTableSource().getAlias() != null) {
sbItem.append(" as ");
sbItem.append(item.getTableSource().getAlias());
}
sbItem.append(" ");
sbItem.append(item.getLockType());
List<String> locks = dataNodeToLocks.computeIfAbsent(dataNode, k -> new ArrayList<>());
locks.add(sbItem.toString());
}
}
TableConfig tableConfig = schema.getTables().get(table);
if (tableConfig == null) {
String msg = "can't find table define of " + table + " in schema:" + schema.getName();
LOGGER.info(msg);
throw new SQLNonTransientException(msg);
Set<RouteResultsetNode> lockedNodes = new HashSet<>();
if (sc.isLocked()) {
lockedNodes.addAll(sc.getSession2().getTargetMap().keySet());
}
LockType lockType = lockTableStat.getLockType();
if (LockType.WRITE != lockType && LockType.READ != lockType) {
String msg = "lock type must be write or read";
LOGGER.info(msg);
throw new SQLNonTransientException(msg);
List<RouteResultsetNode> nodes = new ArrayList<>();
for (Map.Entry<String, List<String>> entry : dataNodeToLocks.entrySet()) {
RouteResultsetNode node = new RouteResultsetNode(entry.getKey(), ServerParse.LOCK, " LOCK TABLES " + StringUtil.join(entry.getValue(), ","));
nodes.add(node);
lockedNodes.remove(node);
}
List<String> dataNodes = tableConfig.getDataNodes();
RouteResultsetNode[] nodes = new RouteResultsetNode[dataNodes.size()];
for (int i = 0; i < dataNodes.size(); i++) {
nodes[i] = new RouteResultsetNode(dataNodes.get(i), ServerParse.LOCK, statementToString(stmt));
for (RouteResultsetNode toUnlockedNode : lockedNodes) {
RouteResultsetNode node = new RouteResultsetNode(toUnlockedNode.getName(), ServerParse.UNLOCK, " UNLOCK TABLES ");
nodes.add(node);
}
rrs.setNodes(nodes);
rrs.setNodes(nodes.toArray(new RouteResultsetNode[nodes.size()]));
rrs.setFinishedRoute(true);
return schema;
}

View File

@@ -423,22 +423,38 @@ public class ServerConnection extends FrontendConnection {
void lockTable(String sql) {
// lock table is disable in transaction
if (!autocommit) {
writeErrMessage(ErrorCode.ER_YES, "can't lock table in transaction!");
if (!autocommit || isTxStart()) {
writeErrMessage(ErrorCode.ER_YES, "can't lock tables in transaction in dble!");
return;
}
// if lock table has been executed and unlock has not been executed, can't execute lock table again
if (isLocked) {
writeErrMessage(ErrorCode.ER_YES, "can't lock multi-table");
return;
String db = this.schema;
SchemaConfig schema = null;
if (this.schema != null) {
schema = DbleServer.getInstance().getConfig().getSchemas().get(this.schema);
if (schema == null) {
writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "Unknown Database '" + db + "'");
return;
}
}
RouteResultset rrs;
try {
rrs = DbleServer.getInstance().getRouterService().route(schema, ServerParse.LOCK, sql, this);
} catch (Exception e) {
executeException(e, sql);
return ;
}
RouteResultset rrs = routeSQL(sql, ServerParse.LOCK);
if (rrs != null) {
session.lockTable(rrs);
}
}
void unLockTable(String sql) {
if (!autocommit || isTxStart()) {
writeErrMessage(ErrorCode.ER_YES, "can't unlock tables in transaction in dble!");
return;
}
sql = sql.replaceAll("\n", " ").replaceAll("\t", " ");
String[] words = SplitUtil.split(sql, ' ', true);
if (words.length == 2 && ("table".equalsIgnoreCase(words[1]) || "tables".equalsIgnoreCase(words[1]))) {