From 25f6c6c0ecf47b2a554b0f6934cf5e9b62591029 Mon Sep 17 00:00:00 2001 From: wenyh <44251917+wenyh1@users.noreply.github.com> Date: Mon, 16 Nov 2020 09:58:33 +0800 Subject: [PATCH] =?UTF-8?q?cherry=20pick=20from=20inner685/627=E3=80=81iss?= =?UTF-8?q?ue2225=20(#2284)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * cherry pick from inner685/627、issue2225 * adjust code --- .../backend/datasource/PhysicalDbGroup.java | 10 +++++++ .../dble/backend/datasource/ShardingNode.java | 5 ++-- .../dble/backend/heartbeat/MySQLDetector.java | 2 +- .../dble/meta/ProxyMetaManager.java | 3 ++ .../actiontech/dble/route/RouteResultset.java | 10 +++++++ .../dble/route/RouteResultsetNode.java | 9 ++++++ .../parser/druid/impl/DruidSelectParser.java | 8 +++-- .../impl/DruidSingleUnitSelectParser.java | 8 +++-- .../dble/route/util/RouterUtil.java | 30 +++++++++++++++---- .../dble/server/parser/ServerParse.java | 19 ++++++++++++ .../dble/server/util/SchemaUtil.java | 3 ++ .../services/rwsplit/RWSplitQueryHandler.java | 7 ++++- .../dble/services/rwsplit/RWSplitService.java | 7 ++++- 13 files changed, 106 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java index bdf2b8d4a..4e2a404cd 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java @@ -213,6 +213,10 @@ public class PhysicalDbGroup { } public PhysicalDbInstance select(Boolean master) throws IOException { + return select(master, false); + } + + public PhysicalDbInstance select(Boolean master, boolean isSpecialDeal) throws IOException { if (rwSplitMode == RW_SPLIT_OFF && (master != null && !master)) { LOGGER.warn("force slave,but the dbGroup[{}] doesn't contains active slave dbInstance", groupName); throw new IOException("force slave,but the dbGroup[" + groupName + "] doesn't contain active slave dbInstance"); @@ -230,6 +234,12 @@ public class PhysicalDbGroup { throw new IOException("the dbGroup[" + groupName + "] doesn't contain active dbInstance."); } PhysicalDbInstance selectInstance = loadBalancer.select(instances); + if (isSpecialDeal && selectInstance.isSalveOrRead() && selectInstance.isReadOnly()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("select write {}, because previously selected {} is readOnly", writeDbInstance, selectInstance); + } + return writeDbInstance; + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("select {}", selectInstance); } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java b/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java index 756eb4032..ff8e92d97 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java @@ -89,7 +89,7 @@ public class ShardingNode { TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-sharding-node"); try { checkRequest(schema); - PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, !isMustWrite && autoCommit)); + PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, !isMustWrite && autoCommit), rrs.isSpecialDeal()); instance.getConnection(schema, handler, attachment, isMustWrite); } finally { TraceManager.finishSpan(traceObject); @@ -98,7 +98,8 @@ public class ShardingNode { public BackendConnection getConnection(String schema, boolean autocommit, Object attachment) throws IOException { checkRequest(schema); - PhysicalDbInstance instance = dbGroup.select(canRunOnMaster((RouteResultsetNode) attachment, autocommit)); + RouteResultsetNode rrs = (RouteResultsetNode) attachment; + PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, autocommit), rrs.isSpecialDeal()); return instance.getConnection(schema, attachment); } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java index 0ae76aaf6..67d26bd86 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java @@ -165,7 +165,7 @@ public class MySQLDetector implements SQLQueryResultListener shardingNodes) { RouteResultsetNode[] nodes = new RouteResultsetNode[shardingNodes.size()]; int i = 0; - RouteResultsetNode node; for (String shardingNode : shardingNodes) { - node = new RouteResultsetNode(shardingNode, rrs.getSqlType(), rrs.getStatement()); + nodes[i] = new RouteResultsetNode(shardingNode, rrs.getSqlType(), rrs.getStatement()); if (rrs.getCanRunInReadDB() != null) { - node.setCanRunInReadDB(rrs.getCanRunInReadDB()); + nodes[i].setCanRunInReadDB(rrs.getCanRunInReadDB()); } if (rrs.getRunOnSlave() != null) { - nodes[0].setRunOnSlave(rrs.getRunOnSlave()); + nodes[i].setRunOnSlave(rrs.getRunOnSlave()); } - nodes[i++] = node; + if (rrs.isSpecialDeal()) { + nodes[i].setSpecialDeal(true); + } + i++; } rrs.setSqlRouteCacheAble(cache); rrs.setNodes(nodes); @@ -523,6 +528,10 @@ public final class RouterUtil { String schemaName = table.getKey(); String tableName = table.getValue(); SchemaConfig schema = DbleServer.getInstance().getConfig().getSchemas().get(schemaName); + if (schema == null) { + String msg = "Table " + StringUtil.getFullName(schemaName, tableName) + " doesn't exist"; + throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE); + } schemaList.add(schemaName); BaseTableConfig tableConfig = schema.getTables().get(tableName); if (tableConfig == null) { @@ -541,6 +550,15 @@ public final class RouterUtil { if (globalTables.size() == tableSize) { return tryRouteGlobalTablesToOneNode(tmpResultNodes, globalTables); } + + return tryCalculateRouteTablesToOneNodeForComplex(rrs, ctx, tmpResultNodes, globalTables, tablesSet, clientCharset); + } + + private static String tryCalculateRouteTablesToOneNodeForComplex( + RouteResultset rrs, DruidShardingParseInfo ctx, + Set tmpResultNodes, + Set> globalTables, Set> tablesSet, + String clientCharset) throws SQLException { if (tablesSet.size() != 0) { Set resultNodes = new HashSet<>(); for (RouteCalculateUnit routeUnit : ctx.getRouteCalculateUnits()) { diff --git a/src/main/java/com/actiontech/dble/server/parser/ServerParse.java b/src/main/java/com/actiontech/dble/server/parser/ServerParse.java index f50923146..067d70d54 100644 --- a/src/main/java/com/actiontech/dble/server/parser/ServerParse.java +++ b/src/main/java/com/actiontech/dble/server/parser/ServerParse.java @@ -58,8 +58,13 @@ public class ServerParse { public static final int MIGRATE = 203; /* don't set the constant to 255 */ public static final int UNSUPPORT = 254; + public static final int SELECT_FOR_UPDATE = 156; + public static final int LOCK_IN_SHARE_MODE = 157; + private static final Pattern PATTERN = Pattern.compile("(load)+\\s+(data)+\\s+\\w*\\s*(infile)+", Pattern.CASE_INSENSITIVE); private static final Pattern CALL_PATTERN = Pattern.compile("\\w*\\;\\s*\\s*(call)+\\s+\\w*\\s*", Pattern.CASE_INSENSITIVE); + private static final Pattern SELECT_FOR_UPDATE_PATTERN = Pattern.compile(".*(\\s+for\\s+update)\\s*$", Pattern.CASE_INSENSITIVE); + private static final Pattern LOCK_IN_SHARE_MODE_PATTERN = Pattern.compile(".*(\\s+lock\\s+in\\s+share\\s+mode)\\s*$", Pattern.CASE_INSENSITIVE); public static boolean startWithHint(String stmt) { int length = stmt.length(); @@ -1289,4 +1294,18 @@ public class ServerParse { } return OTHER; } + + public static int parseSpecial(int sqlType, String stmt) { + if (ServerParse.SELECT != sqlType) { + return OTHER; + } + if (SELECT_FOR_UPDATE_PATTERN.matcher(stmt).matches()) { + return SELECT_FOR_UPDATE; + } + if (LOCK_IN_SHARE_MODE_PATTERN.matcher(stmt).matches()) { + return LOCK_IN_SHARE_MODE; + } + return OTHER; + } + } diff --git a/src/main/java/com/actiontech/dble/server/util/SchemaUtil.java b/src/main/java/com/actiontech/dble/server/util/SchemaUtil.java index 1f037ba44..658451c34 100644 --- a/src/main/java/com/actiontech/dble/server/util/SchemaUtil.java +++ b/src/main/java/com/actiontech/dble/server/util/SchemaUtil.java @@ -185,6 +185,9 @@ public final class SchemaUtil { throws SQLException { SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(service.getUser(), contextSchema, table); String currentSchema = schemaInfo.schema.toUpperCase(); + if (schemaInfo.dual) { + return true; + } if (SchemaUtil.MYSQL_SYS_SCHEMA.contains(currentSchema)) { schemas.add(currentSchema); return false; diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java index 639329563..ffa5dec69 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitQueryHandler.java @@ -48,7 +48,12 @@ public class RWSplitQueryHandler implements FrontendQueryHandler { session.execute(true, null); break; case RwSplitServerParse.SELECT: - session.execute(null, null); + int rs2 = RwSplitServerParse.parseSpecial(sqlType, sql); + if (rs2 == RwSplitServerParse.SELECT_FOR_UPDATE || rs2 == RwSplitServerParse.LOCK_IN_SHARE_MODE) { + session.execute(true, null); + } else { + session.execute(null, null); + } break; case RwSplitServerParse.SET: SetHandler.handle(sql, session.getService(), rs >>> 8); diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java index 98ed6b773..2bf4d2acc 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java @@ -193,7 +193,12 @@ public class RWSplitService extends BusinessService { int sqlType = rs & 0xff; switch (sqlType) { case ServerParse.SELECT: - session.execute(null, data, null); + int rs2 = ServerParse.parseSpecial(sqlType, sql); + if (rs2 == ServerParse.SELECT_FOR_UPDATE || rs2 == ServerParse.LOCK_IN_SHARE_MODE) { + session.execute(true, data, null); + } else { + session.execute(null, data, null); + } break; default: session.execute(true, data, null);