cherry pick from inner685/627、issue2225 (#2284)

* cherry pick from inner685/627、issue2225

* adjust code
This commit is contained in:
wenyh
2020-11-16 09:58:33 +08:00
committed by GitHub
parent 2d6bc73f61
commit 25f6c6c0ec
13 changed files with 106 additions and 15 deletions

View File

@@ -213,6 +213,10 @@ public class PhysicalDbGroup {
}
public PhysicalDbInstance select(Boolean master) throws IOException {
return select(master, false);
}
public PhysicalDbInstance select(Boolean master, boolean isSpecialDeal) throws IOException {
if (rwSplitMode == RW_SPLIT_OFF && (master != null && !master)) {
LOGGER.warn("force slave,but the dbGroup[{}] doesn't contains active slave dbInstance", groupName);
throw new IOException("force slave,but the dbGroup[" + groupName + "] doesn't contain active slave dbInstance");
@@ -230,6 +234,12 @@ public class PhysicalDbGroup {
throw new IOException("the dbGroup[" + groupName + "] doesn't contain active dbInstance.");
}
PhysicalDbInstance selectInstance = loadBalancer.select(instances);
if (isSpecialDeal && selectInstance.isSalveOrRead() && selectInstance.isReadOnly()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select write {}, because previously selected {} is readOnly", writeDbInstance, selectInstance);
}
return writeDbInstance;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("select {}", selectInstance);
}

View File

@@ -89,7 +89,7 @@ public class ShardingNode {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-sharding-node");
try {
checkRequest(schema);
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, !isMustWrite && autoCommit));
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, !isMustWrite && autoCommit), rrs.isSpecialDeal());
instance.getConnection(schema, handler, attachment, isMustWrite);
} finally {
TraceManager.finishSpan(traceObject);
@@ -98,7 +98,8 @@ public class ShardingNode {
public BackendConnection getConnection(String schema, boolean autocommit, Object attachment) throws IOException {
checkRequest(schema);
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster((RouteResultsetNode) attachment, autocommit));
RouteResultsetNode rrs = (RouteResultsetNode) attachment;
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, autocommit), rrs.isSpecialDeal());
return instance.getConnection(schema, attachment);
}

View File

@@ -165,7 +165,7 @@ public class MySQLDetector implements SQLQueryResultListener<SQLQueryResult<Map<
AlertUtil.alertResolve(AlarmCode.DB_INSTANCE_LOWER_CASE_ERROR, Alert.AlertLevel.WARN, "mysql", this.heartbeat.getSource().getConfig().getId(), labels,
ToResolveContainer.DB_INSTANCE_LOWER_CASE_ERROR, url);
}
if (!source.isSalveOrRead()) { // writehost checkRecoverFail read only
if (heartbeat.getStatus() == MySQLHeartbeat.INIT_STATUS || !source.isSalveOrRead()) { // writehost checkRecoverFail read only
source.setReadOnly(variables.isReadOnly());
}
}

View File

@@ -275,6 +275,9 @@ public class ProxyMetaManager {
try {
while (true) {
int oldVersion = version.get();
if (catalogs.get(schema) == null) {
return null;
}
if (metaCount.get() == 0) {
PlanNode viewNode = catalogs.get(schema).getView(vName);
if (version.get() == oldVersion) {

View File

@@ -63,6 +63,16 @@ public final class RouteResultset implements Serializable {
private Boolean runOnSlave = null;
private String[] groupByCols;
private boolean isSpecialDeal = false;
public boolean isSpecialDeal() {
return isSpecialDeal;
}
public void setSpecialDeal(boolean specialDeal) {
isSpecialDeal = specialDeal;
}
public String[] getGroupByCols() {
return groupByCols;
}

View File

@@ -28,6 +28,7 @@ public final class RouteResultsetNode implements Serializable, Comparable<RouteR
private Boolean runOnSlave = null;
private AtomicLong multiplexNum;
private boolean isSpecialDeal = false;
public RouteResultsetNode(String name, int sqlType, String srcStatement) {
this.name = name;
@@ -40,6 +41,14 @@ public final class RouteResultsetNode implements Serializable, Comparable<RouteR
this.multiplexNum = new AtomicLong(0);
}
public boolean isSpecialDeal() {
return isSpecialDeal;
}
public void setSpecialDeal(boolean specialDeal) {
isSpecialDeal = specialDeal;
}
public Boolean getRunOnSlave() {
return runOnSlave;
}

View File

@@ -133,8 +133,12 @@ public class DruidSelectParser extends DefaultDruidParser {
SchemaConfig schema = schemaInfo.getSchemaConfig();
String noShardingNode = RouterUtil.isNoSharding(schema, schemaInfo.getTable());
if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode()) && !service.isAutocommit()) {
rrs.setCanRunInReadDB(false);
if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode())) {
if (!service.isAutocommit()) {
rrs.setCanRunInReadDB(false);
} else {
rrs.setSpecialDeal(true);
}
}
if (noShardingNode != null) {
//route to singleNode

View File

@@ -63,8 +63,12 @@ public class DruidSingleUnitSelectParser extends DefaultDruidParser {
this.getCtx().clearRouteCalculateUnit();
}
// change canRunInReadDB
if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode()) && !service.isAutocommit()) {
rrs.setCanRunInReadDB(false);
if ((mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isLockInShareMode())) {
if (!service.isAutocommit()) {
rrs.setCanRunInReadDB(false);
} else {
rrs.setSpecialDeal(true);
}
}
} else if (sqlSelectQuery instanceof SQLUnionQuery) {
StringPtr noShardingNode = new StringPtr(null);

View File

@@ -30,6 +30,7 @@ import com.actiontech.dble.singleton.ProxyMeta;
import com.actiontech.dble.sqlengine.mpp.ColumnRoute;
import com.actiontech.dble.sqlengine.mpp.RangeValue;
import com.actiontech.dble.util.HexFormatUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLHexExpr;
@@ -266,7 +267,9 @@ public final class RouterUtil {
if (rrs.getRunOnSlave() != null) {
nodes[0].setRunOnSlave(rrs.getRunOnSlave());
}
if (rrs.isSpecialDeal()) {
nodes[0].setSpecialDeal(true);
}
return rrs;
}
@@ -312,16 +315,18 @@ public final class RouterUtil {
private static RouteResultset routeToMultiNode(boolean cache, RouteResultset rrs, Collection<String> shardingNodes) {
RouteResultsetNode[] nodes = new RouteResultsetNode[shardingNodes.size()];
int i = 0;
RouteResultsetNode node;
for (String shardingNode : shardingNodes) {
node = new RouteResultsetNode(shardingNode, rrs.getSqlType(), rrs.getStatement());
nodes[i] = new RouteResultsetNode(shardingNode, rrs.getSqlType(), rrs.getStatement());
if (rrs.getCanRunInReadDB() != null) {
node.setCanRunInReadDB(rrs.getCanRunInReadDB());
nodes[i].setCanRunInReadDB(rrs.getCanRunInReadDB());
}
if (rrs.getRunOnSlave() != null) {
nodes[0].setRunOnSlave(rrs.getRunOnSlave());
nodes[i].setRunOnSlave(rrs.getRunOnSlave());
}
nodes[i++] = node;
if (rrs.isSpecialDeal()) {
nodes[i].setSpecialDeal(true);
}
i++;
}
rrs.setSqlRouteCacheAble(cache);
rrs.setNodes(nodes);
@@ -523,6 +528,10 @@ public final class RouterUtil {
String schemaName = table.getKey();
String tableName = table.getValue();
SchemaConfig schema = DbleServer.getInstance().getConfig().getSchemas().get(schemaName);
if (schema == null) {
String msg = "Table " + StringUtil.getFullName(schemaName, tableName) + " doesn't exist";
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
schemaList.add(schemaName);
BaseTableConfig tableConfig = schema.getTables().get(tableName);
if (tableConfig == null) {
@@ -541,6 +550,15 @@ public final class RouterUtil {
if (globalTables.size() == tableSize) {
return tryRouteGlobalTablesToOneNode(tmpResultNodes, globalTables);
}
return tryCalculateRouteTablesToOneNodeForComplex(rrs, ctx, tmpResultNodes, globalTables, tablesSet, clientCharset);
}
private static String tryCalculateRouteTablesToOneNodeForComplex(
RouteResultset rrs, DruidShardingParseInfo ctx,
Set<String> tmpResultNodes,
Set<Pair<String, BaseTableConfig>> globalTables, Set<Pair<String, String>> tablesSet,
String clientCharset) throws SQLException {
if (tablesSet.size() != 0) {
Set<String> resultNodes = new HashSet<>();
for (RouteCalculateUnit routeUnit : ctx.getRouteCalculateUnits()) {

View File

@@ -58,8 +58,13 @@ public class ServerParse {
public static final int MIGRATE = 203;
/* don't set the constant to 255 */
public static final int UNSUPPORT = 254;
public static final int SELECT_FOR_UPDATE = 156;
public static final int LOCK_IN_SHARE_MODE = 157;
private static final Pattern PATTERN = Pattern.compile("(load)+\\s+(data)+\\s+\\w*\\s*(infile)+", Pattern.CASE_INSENSITIVE);
private static final Pattern CALL_PATTERN = Pattern.compile("\\w*\\;\\s*\\s*(call)+\\s+\\w*\\s*", Pattern.CASE_INSENSITIVE);
private static final Pattern SELECT_FOR_UPDATE_PATTERN = Pattern.compile(".*(\\s+for\\s+update)\\s*$", Pattern.CASE_INSENSITIVE);
private static final Pattern LOCK_IN_SHARE_MODE_PATTERN = Pattern.compile(".*(\\s+lock\\s+in\\s+share\\s+mode)\\s*$", Pattern.CASE_INSENSITIVE);
public static boolean startWithHint(String stmt) {
int length = stmt.length();
@@ -1289,4 +1294,18 @@ public class ServerParse {
}
return OTHER;
}
public static int parseSpecial(int sqlType, String stmt) {
if (ServerParse.SELECT != sqlType) {
return OTHER;
}
if (SELECT_FOR_UPDATE_PATTERN.matcher(stmt).matches()) {
return SELECT_FOR_UPDATE;
}
if (LOCK_IN_SHARE_MODE_PATTERN.matcher(stmt).matches()) {
return LOCK_IN_SHARE_MODE;
}
return OTHER;
}
}

View File

@@ -185,6 +185,9 @@ public final class SchemaUtil {
throws SQLException {
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(service.getUser(), contextSchema, table);
String currentSchema = schemaInfo.schema.toUpperCase();
if (schemaInfo.dual) {
return true;
}
if (SchemaUtil.MYSQL_SYS_SCHEMA.contains(currentSchema)) {
schemas.add(currentSchema);
return false;

View File

@@ -48,7 +48,12 @@ public class RWSplitQueryHandler implements FrontendQueryHandler {
session.execute(true, null);
break;
case RwSplitServerParse.SELECT:
session.execute(null, null);
int rs2 = RwSplitServerParse.parseSpecial(sqlType, sql);
if (rs2 == RwSplitServerParse.SELECT_FOR_UPDATE || rs2 == RwSplitServerParse.LOCK_IN_SHARE_MODE) {
session.execute(true, null);
} else {
session.execute(null, null);
}
break;
case RwSplitServerParse.SET:
SetHandler.handle(sql, session.getService(), rs >>> 8);

View File

@@ -193,7 +193,12 @@ public class RWSplitService extends BusinessService {
int sqlType = rs & 0xff;
switch (sqlType) {
case ServerParse.SELECT:
session.execute(null, data, null);
int rs2 = ServerParse.parseSpecial(sqlType, sql);
if (rs2 == ServerParse.SELECT_FOR_UPDATE || rs2 == ServerParse.LOCK_IN_SHARE_MODE) {
session.execute(true, data, null);
} else {
session.execute(null, data, null);
}
break;
default:
session.execute(true, data, null);