mirror of
https://github.com/actiontech/dble.git
synced 2026-01-27 07:50:48 -06:00
inner-1374: RoutePenetration support (#2997)
Signed-off-by: dcy <dcy10000@gmail.com>
This commit is contained in:
@@ -125,6 +125,7 @@ public final class DbleServer {
|
||||
AlertUtil.switchAlert(true);
|
||||
}
|
||||
AlertManager.getInstance().startAlert();
|
||||
RoutePenetrationManager.getInstance().init();
|
||||
LOGGER.info("========================================Alert Manager start finish================================");
|
||||
|
||||
// server startup
|
||||
|
||||
@@ -174,6 +174,8 @@ public final class SystemConfig {
|
||||
private boolean useOuterHa = true;
|
||||
private String traceEndPoint = null;
|
||||
private String fakeMySQLVersion = "5.7.21";
|
||||
private int enableRoutePenetration = 0;
|
||||
private String routePenetrationRules = "";
|
||||
|
||||
private int enableStatistic = 0;
|
||||
private int associateTablesByEntryByUserTableSize = 1024;
|
||||
@@ -1370,6 +1372,27 @@ public final class SystemConfig {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public int isEnableRoutePenetration() {
|
||||
return enableRoutePenetration;
|
||||
}
|
||||
|
||||
public void setEnableRoutePenetration(int enableRoutePenetrationTmp) {
|
||||
if (enableRoutePenetrationTmp >= 0 && enableRoutePenetrationTmp <= 1) {
|
||||
this.enableRoutePenetration = enableRoutePenetrationTmp;
|
||||
} else if (this.problemReporter != null) {
|
||||
problemReporter.warn(String.format(WARNING_FORMAT, "enableRoutePenetration", enableRoutePenetrationTmp, this.enableRoutePenetration));
|
||||
}
|
||||
}
|
||||
|
||||
public String getRoutePenetrationRules() {
|
||||
return routePenetrationRules;
|
||||
}
|
||||
|
||||
public void setRoutePenetrationRules(String sqlPenetrationRegexesTmp) {
|
||||
routePenetrationRules = sqlPenetrationRegexesTmp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SystemConfig [" +
|
||||
@@ -1466,6 +1489,8 @@ public final class SystemConfig {
|
||||
", tableByUserByEntryTableSize=" + tableByUserByEntryTableSize +
|
||||
", statisticQueueSize=" + statisticQueueSize +
|
||||
", closeHeartBeatRecord=" + closeHeartBeatRecord +
|
||||
", enableRoutePenetration=" + enableRoutePenetration +
|
||||
", routePenetrationRules='" + routePenetrationRules + '\'' +
|
||||
"]";
|
||||
}
|
||||
|
||||
|
||||
@@ -66,6 +66,8 @@ public final class RouteResultset implements Serializable {
|
||||
|
||||
private Map<String, List<LoadDataRouteResultsetNode>> multiRouteResultSetNodeMap;
|
||||
|
||||
private boolean routePenetration = false;
|
||||
|
||||
private boolean isForUpdate = false;
|
||||
|
||||
private boolean enableLoadDataFlag = false;
|
||||
@@ -105,6 +107,13 @@ public final class RouteResultset implements Serializable {
|
||||
this.runOnSlave = runOnSlave;
|
||||
}
|
||||
|
||||
public boolean isRoutePenetration() {
|
||||
return routePenetration;
|
||||
}
|
||||
|
||||
public void setRoutePenetration(boolean routePenetration) {
|
||||
this.routePenetration = routePenetration;
|
||||
}
|
||||
|
||||
public boolean isLoadData() {
|
||||
return isLoadData;
|
||||
|
||||
@@ -11,6 +11,7 @@ import com.actiontech.dble.route.RouteStrategy;
|
||||
import com.actiontech.dble.route.util.RouterUtil;
|
||||
import com.actiontech.dble.server.parser.ServerParse;
|
||||
import com.actiontech.dble.services.mysqlsharding.ShardingService;
|
||||
import com.actiontech.dble.singleton.RoutePenetrationManager;
|
||||
import com.actiontech.dble.sqlengine.mpp.LoadData;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
|
||||
@@ -55,6 +56,9 @@ public abstract class AbstractRouteStrategy implements RouteStrategy {
|
||||
|
||||
RouteResultset rrs = new RouteResultset(origSQL, sqlType);
|
||||
|
||||
if (RoutePenetrationManager.getInstance().isEnabled() && RoutePenetrationManager.getInstance().match(origSQL)) {
|
||||
rrs.setRoutePenetration(true);
|
||||
}
|
||||
/*
|
||||
* debug mode and load data ,no cache
|
||||
*/
|
||||
|
||||
@@ -22,6 +22,7 @@ import com.actiontech.dble.plan.common.ptr.StringPtr;
|
||||
import com.actiontech.dble.plan.visitor.MySQLItemVisitor;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.route.function.AbstractPartitionAlgorithm;
|
||||
import com.actiontech.dble.route.parser.druid.RouteCalculateUnit;
|
||||
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
|
||||
import com.actiontech.dble.route.parser.util.Pair;
|
||||
@@ -39,12 +40,15 @@ import com.alibaba.druid.sql.ast.statement.*;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlOrderingExpr;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlExprParser;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.*;
|
||||
|
||||
public class DruidSelectParser extends DefaultDruidParser {
|
||||
private static final Logger LOGGER = LogManager.getLogger(DruidSelectParser.class);
|
||||
private static HashSet<String> aggregateSet = new HashSet<>(16, 1);
|
||||
|
||||
static {
|
||||
@@ -171,6 +175,14 @@ public class DruidSelectParser extends DefaultDruidParser {
|
||||
LOGGER.info(msg);
|
||||
throw new SQLNonTransientException(msg);
|
||||
} else if (nodeSet.size() > 1) {
|
||||
if (rrs.isRoutePenetration()) {
|
||||
LOGGER.debug("the query {} match the route penetration regex", rrs.getSrcStatement());
|
||||
rrs = tryDirectRoute(schema, rrs);
|
||||
if (rrs.isFinishedRoute()) {
|
||||
LOGGER.debug("the query {} match the route penetration rule, will direct route", rrs.getSrcStatement());
|
||||
return;
|
||||
}
|
||||
}
|
||||
//if the sql involved node more than 1 ,Aggregate function/Group by/Order by should use complexQuery
|
||||
parseOrderAggGroupMysql(schema, selectStmt, rrs, mysqlSelectQuery, tc);
|
||||
if (rrs.isNeedOptimizer()) {
|
||||
@@ -216,6 +228,97 @@ public class DruidSelectParser extends DefaultDruidParser {
|
||||
return false;
|
||||
}
|
||||
|
||||
private RouteResultset tryDirectRoute(SchemaConfig schemaConfig, RouteResultset rrs) throws SQLException {
|
||||
if (schemaConfig == null) {
|
||||
return rrs;
|
||||
}
|
||||
|
||||
List<Pair<String, String>> tables = ctx.getTables();
|
||||
int index = 0;
|
||||
AbstractPartitionAlgorithm firstRule = null;
|
||||
boolean directRoute = true;
|
||||
Set<String> firstDataNodes = new HashSet<>();
|
||||
Map<String, BaseTableConfig> tableConfigMap = schemaConfig.getTables() == null ? null : schemaConfig.getTables();
|
||||
|
||||
if (tableConfigMap != null) {
|
||||
for (Pair<String, String> table : tables) {
|
||||
String tableName = table.getValue();
|
||||
BaseTableConfig tc = tableConfigMap.get(tableName);
|
||||
if (tc == null) {
|
||||
Map<String, String> tableAliasMap = ctx.getTableAliasMap();
|
||||
if (tableAliasMap != null && tableAliasMap.get(tableName) != null) {
|
||||
tc = tableConfigMap.get(tableAliasMap.get(tableName));
|
||||
}
|
||||
}
|
||||
|
||||
if (index == 0) {
|
||||
if (tc != null) {
|
||||
if (!(tc instanceof ShardingTableConfig)) {
|
||||
continue;
|
||||
}
|
||||
firstRule = ((ShardingTableConfig) tc).getFunction();
|
||||
firstDataNodes.addAll(tc.getShardingNodes());
|
||||
}
|
||||
} else {
|
||||
if (tc != null) {
|
||||
if (!(tc instanceof ShardingTableConfig)) {
|
||||
continue;
|
||||
}
|
||||
AbstractPartitionAlgorithm ruleCfg = ((ShardingTableConfig) tc).getFunction();
|
||||
Set<String> dataNodes = new HashSet<>(tc.getShardingNodes());
|
||||
if (firstRule != null && ((!ruleCfg.equals(firstRule)) || !dataNodes.equals(firstDataNodes))) {
|
||||
directRoute = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
index++;
|
||||
}
|
||||
}
|
||||
|
||||
RouteResultset rrsResult = rrs;
|
||||
if (directRoute) {
|
||||
rrs.setStatement(RouterUtil.removeSchema(rrs.getStatement(), schemaConfig.getName()));
|
||||
rrsResult = tryRoute(schemaConfig, rrs);
|
||||
}
|
||||
return rrsResult;
|
||||
}
|
||||
|
||||
private RouteResultset tryRoute(SchemaConfig schema, RouteResultset rrs) throws SQLException {
|
||||
if ((ctx.getTables() == null || ctx.getTables().size() == 0) && (ctx.getTableAliasMap() == null || ctx.getTableAliasMap().isEmpty())) {
|
||||
rrs = RouterUtil.routeToSingleNode(rrs, schema.getRandomShardingNode());
|
||||
rrs.setSchema(schema.getName());
|
||||
rrs.setFinishedRoute(true);
|
||||
return rrs;
|
||||
}
|
||||
SortedSet<RouteResultsetNode> nodeSet = new TreeSet<>();
|
||||
boolean isAllGlobalTable = RouterUtil.isAllGlobalTable(ctx, schema);
|
||||
for (RouteCalculateUnit unit : ctx.getRouteCalculateUnits()) {
|
||||
RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, ctx, unit, rrs, true, null);
|
||||
if (rrsTmp != null && rrsTmp.getNodes() != null) {
|
||||
nodeSet.addAll(Arrays.asList(rrsTmp.getNodes()));
|
||||
}
|
||||
if (isAllGlobalTable) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (nodeSet.size() > 0) {
|
||||
RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()];
|
||||
int i = 0;
|
||||
for (RouteResultsetNode routeResultsetNode : nodeSet) {
|
||||
nodes[i++] = routeResultsetNode;
|
||||
}
|
||||
|
||||
rrs.setNodes(nodes);
|
||||
rrs.setSchema(schema.getName());
|
||||
rrs.setFinishedRoute(true);
|
||||
|
||||
}
|
||||
return rrs;
|
||||
}
|
||||
|
||||
|
||||
private void tryRouteToOneNodeForComplex(RouteResultset rrs, SQLSelectStatement selectStmt, int tableSize, String clientCharset) throws SQLException {
|
||||
Set<String> schemaList = new HashSet<>();
|
||||
String shardingNode = RouterUtil.tryRouteTablesToOneNodeForComplex(rrs, ctx, schemaList, tableSize, clientCharset);
|
||||
@@ -501,6 +604,14 @@ public class DruidSelectParser extends DefaultDruidParser {
|
||||
rrs.setSqlStatement(selectStmt);
|
||||
return schema;
|
||||
} else {
|
||||
if (rrs.isRoutePenetration()) {
|
||||
LOGGER.debug("the query {} match the route penetration regex", rrs.getSrcStatement());
|
||||
rrs = tryDirectRoute(schema, rrs);
|
||||
if (rrs.isFinishedRoute()) {
|
||||
LOGGER.debug("the query {} match the route penetration rule, will direct route", rrs.getSrcStatement());
|
||||
return schema;
|
||||
}
|
||||
}
|
||||
tryRouteToOneNodeForComplex(rrs, selectStmt, tableSize, service.getCharset().getClient());
|
||||
return schema;
|
||||
}
|
||||
@@ -512,6 +623,7 @@ public class DruidSelectParser extends DefaultDruidParser {
|
||||
@Override
|
||||
public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt)
|
||||
throws SQLException {
|
||||
|
||||
if (rrs.isFinishedExecute() || rrs.isNeedOptimizer()) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -694,7 +694,7 @@ public final class RouterUtil {
|
||||
/**
|
||||
* tryRouteFor multiTables
|
||||
*/
|
||||
private static RouteResultset tryRouteForTables(
|
||||
public static RouteResultset tryRouteForTables(
|
||||
SchemaConfig schema, DruidShardingParseInfo ctx, RouteCalculateUnit routeUnit, RouteResultset rrs,
|
||||
boolean isSelect, String clientCharset) throws SQLException {
|
||||
List<Pair<String, String>> tables = ctx.getTables();
|
||||
@@ -1108,5 +1108,16 @@ public final class RouterUtil {
|
||||
return Boolean.FALSE.equals(o);
|
||||
}
|
||||
|
||||
|
||||
public static boolean isAllGlobalTable(DruidShardingParseInfo ctx, SchemaConfig schema) {
|
||||
boolean isAllGlobal = false;
|
||||
for (Pair<String, String> table : ctx.getTables()) {
|
||||
BaseTableConfig tableConfig = schema.getTables().get(table.getValue());
|
||||
if (tableConfig != null && tableConfig instanceof GlobalTableConfig) {
|
||||
isAllGlobal = true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return isAllGlobal;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -171,7 +171,7 @@ public final class ExplainHandler {
|
||||
return null;
|
||||
} else {
|
||||
StringBuilder s = new StringBuilder();
|
||||
LOGGER.warn(s.append(service).append(stmt).toString() + " error:", e);
|
||||
LOGGER.warn(s.append(service).append(stmt).append(" error:").toString(), e);
|
||||
String msg = e.getMessage();
|
||||
service.writeErrMessage(ErrorCode.ER_PARSE_ERROR, msg == null ? e.getClass().getSimpleName() : msg);
|
||||
return null;
|
||||
|
||||
@@ -0,0 +1,160 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2021 ActionTech.
|
||||
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
package com.actiontech.dble.singleton;
|
||||
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.gson.*;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.lang.reflect.Type;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
|
||||
* @author dcy
|
||||
* Create Date: 2021-09-14
|
||||
*/
|
||||
public final class RoutePenetrationManager {
|
||||
private static final RoutePenetrationManager INSTANCE = new RoutePenetrationManager();
|
||||
private List<PenetrationRule> rules = Lists.newArrayList();
|
||||
private static final Logger LOGGER = LogManager.getLogger(RoutePenetrationManager.class);
|
||||
|
||||
private RoutePenetrationManager() {
|
||||
}
|
||||
|
||||
public static RoutePenetrationManager getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
|
||||
public void init() {
|
||||
final SystemConfig config = SystemConfig.getInstance();
|
||||
try {
|
||||
final JsonBooleanDeserializer deserializer = new JsonBooleanDeserializer();
|
||||
final Gson gson = new GsonBuilder().registerTypeAdapter(Boolean.class, deserializer).registerTypeAdapter(boolean.class, deserializer).create();
|
||||
if (config.isEnableRoutePenetration() == 1) {
|
||||
final String routePenetrationRules = config.getRoutePenetrationRules();
|
||||
if (StringUtils.isBlank(routePenetrationRules)) {
|
||||
throw new IllegalStateException("property routePenetrationRules can't be null");
|
||||
}
|
||||
final PenetrationConfig penetrationConfig = gson.fromJson(routePenetrationRules, PenetrationConfig.class);
|
||||
if (penetrationConfig.getRules() == null) {
|
||||
throw new IllegalStateException("rules can't be null");
|
||||
}
|
||||
rules = Arrays.asList(penetrationConfig.getRules());
|
||||
rules.forEach(PenetrationRule::init);
|
||||
}
|
||||
LOGGER.info("init {} route-penetration rules success", rules.size());
|
||||
LOGGER.debug("route-penetration rules :{}", rules);
|
||||
} catch (Exception e) {
|
||||
final String msg = "can't parse the route-penetration rule, please check the 'routePenetrationRules', detail exception is :" + e;
|
||||
LOGGER.error(msg);
|
||||
throw new IllegalStateException("The system property routePenetrationRules in server.xml is illegal or unset, for more detail, please check dble.log .");
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isEnabled() {
|
||||
return SystemConfig.getInstance().isEnableRoutePenetration() == 1;
|
||||
}
|
||||
|
||||
public boolean match(String sql) {
|
||||
return rules.stream().anyMatch((rule) -> rule.match(sql));
|
||||
}
|
||||
|
||||
private static final class PenetrationConfig {
|
||||
|
||||
private PenetrationRule[] rules = new PenetrationRule[0];
|
||||
|
||||
public PenetrationRule[] getRules() {
|
||||
return rules;
|
||||
}
|
||||
|
||||
public void setRules(PenetrationRule[] rules) {
|
||||
this.rules = rules;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class PenetrationRule {
|
||||
private Pattern pattern;
|
||||
private String regex;
|
||||
private boolean caseSensitive = true;
|
||||
private boolean partMatch = true;
|
||||
|
||||
public String getRegex() {
|
||||
return regex;
|
||||
}
|
||||
|
||||
public void setRegex(String regex) {
|
||||
this.regex = regex;
|
||||
}
|
||||
|
||||
public boolean isCaseSensitive() {
|
||||
return caseSensitive;
|
||||
}
|
||||
|
||||
public void setCaseSensitive(boolean caseSensitive) {
|
||||
this.caseSensitive = caseSensitive;
|
||||
}
|
||||
|
||||
public boolean isPartMatch() {
|
||||
return partMatch;
|
||||
}
|
||||
|
||||
public void setPartMatch(boolean partMatch) {
|
||||
this.partMatch = partMatch;
|
||||
}
|
||||
|
||||
|
||||
public void init() {
|
||||
if (StringUtils.isBlank(regex)) {
|
||||
throw new IllegalStateException("regex can't be null or empty.");
|
||||
}
|
||||
int flag = Pattern.DOTALL;
|
||||
if (!caseSensitive) {
|
||||
flag |= Pattern.CASE_INSENSITIVE;
|
||||
}
|
||||
pattern = Pattern.compile(regex, flag);
|
||||
}
|
||||
|
||||
public boolean match(String sql) {
|
||||
final Matcher matcher = pattern.matcher(sql);
|
||||
return partMatch ? matcher.find() : matcher.matches();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "PenetrationRule{" +
|
||||
", regex='" + regex + '\'' +
|
||||
", caseSensitive=" + caseSensitive +
|
||||
", partMatch=" + partMatch +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
private static class JsonBooleanDeserializer implements JsonDeserializer<Boolean> {
|
||||
@Override
|
||||
public Boolean deserialize(JsonElement json, Type typeOfT, JsonDeserializationContext context) throws JsonParseException {
|
||||
try {
|
||||
String value = json.getAsJsonPrimitive().getAsString();
|
||||
if (value != null) {
|
||||
value = value.toLowerCase();
|
||||
}
|
||||
if ("true".equals(value) || "false".equals(value)) {
|
||||
return Boolean.valueOf(value);
|
||||
} else {
|
||||
throw new JsonParseException("Cannot parse json '" + json.toString() + "' to boolean value");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new JsonParseException("Cannot parse json '" + json.toString() + "' to boolean value", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -133,6 +133,9 @@ public final class SystemParams {
|
||||
readOnlyParams.add(new ParamInfo("heapTableBufferChunkSize", sysConfig.getHeapTableBufferChunkSize() + "B", "Used for temp table persistence of cursor, setting for read-buffer size."));
|
||||
readOnlyParams.add(new ParamInfo("statisticQueueSize", StatisticManager.getInstance().getStatisticQueueSize() + "", "Sets the queue size for statistic, value must not be less than 1 and must be a power of 2,The default value is 4096"));
|
||||
readOnlyParams.add(new ParamInfo("closeHeartBeatRecord", sysConfig.isCloseHeartBeatRecord() + "", "close heartbeat record. if closed, `show @@dbinstance.synstatus`,`show @@dbinstance.syndetail`,`show @@heartbeat.detail` will be empty and `show @@heartbeat`'s EXECUTE_TIME will be '-' .The default value is false"));
|
||||
readOnlyParams.add(new ParamInfo("enableRoutePenetration", sysConfig.isEnableRoutePenetration() + "", "Whether enable route penetration"));
|
||||
readOnlyParams.add(new ParamInfo("routePenetrationRules", sysConfig.getRoutePenetrationRules() + "", "The config of route penetration"));
|
||||
|
||||
}
|
||||
|
||||
public List<ParamInfo> getVolatileParams() {
|
||||
|
||||
Reference in New Issue
Block a user