mirror of
https://github.com/actiontech/dble.git
synced 2026-05-05 05:50:32 -05:00
[inner-2217&inner-2219]feature: add OLAP routing rules
This commit is contained in:
+2
-2
@@ -6,7 +6,7 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.datasource.ShardingNode;
|
||||
import com.actiontech.dble.backend.datasource.BaseNode;
|
||||
import com.actiontech.dble.backend.mysql.LoadDataUtil;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoCommitHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation;
|
||||
@@ -146,7 +146,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
|
||||
connRrns.add(node);
|
||||
// create new connection
|
||||
node.setRunOnSlave(rrs.getRunOnSlave());
|
||||
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(node.getName());
|
||||
BaseNode dn = DbleServer.getInstance().getConfig().getAllNodes().get(node.getName());
|
||||
dn.getConnection(dn.getDatabase(), session.getShardingService().isTxStart(), sessionAutocommit, node, this, node);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.datasource.ShardingNode;
|
||||
import com.actiontech.dble.backend.datasource.BaseNode;
|
||||
import com.actiontech.dble.backend.mysql.LoadDataUtil;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.config.ServerConfig;
|
||||
@@ -106,7 +106,7 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
|
||||
// create new connection
|
||||
node.setRunOnSlave(rrs.getRunOnSlave());
|
||||
ServerConfig conf = DbleServer.getInstance().getConfig();
|
||||
ShardingNode dn = conf.getShardingNodes().get(node.getName());
|
||||
BaseNode dn = conf.getAllNodes().get(node.getName());
|
||||
dn.getConnection(dn.getDatabase(), session.getShardingService().isTxStart(), session.getShardingService().isAutocommit(), node, this, node);
|
||||
} finally {
|
||||
TraceManager.finishSpan(session.getShardingService(), traceObject);
|
||||
|
||||
+1
-1
@@ -181,7 +181,7 @@ public class HandlerBuilder {
|
||||
sql = sql.replace(tableToSimple.getKey(), tableToSimple.getValue());
|
||||
}
|
||||
}
|
||||
return new RouteResultsetNode(routeNode.getName(), ServerParse.SELECT, sql, tableSet);
|
||||
return new RouteResultsetNode(routeNode.getName(), ServerParse.SELECT, sql, tableSet, routeNode.isApNode());
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
+2
-2
@@ -258,8 +258,8 @@ public class PushDownVisitor extends MysqlVisitor {
|
||||
colNameCount = colNameCount.replace(getMadeAggAlias(funName), getMadeAggAlias("COUNT"));
|
||||
String colNameSum = replaceAll(colName, toReplace, "SUM(");
|
||||
colNameSum = colNameSum.replace(getMadeAggAlias(funName), getMadeAggAlias("SUM"));
|
||||
String colNameVar = replaceAll(colName, toReplace, "VARIANCE(");
|
||||
colNameVar = colNameVar.replace(getMadeAggAlias(funName), getMadeAggAlias("VARIANCE"));
|
||||
String colNameVar = replaceAll(colName, toReplace, "VAR_POP(");
|
||||
colNameVar = colNameVar.replace(getMadeAggAlias(funName), getMadeAggAlias("VAR_POP"));
|
||||
sqlBuilder.append(colNameCount).append(",").append(colNameSum).append(",").append(colNameVar).append(",");
|
||||
continue;
|
||||
}
|
||||
|
||||
+2
-2
@@ -6,7 +6,7 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.datasource.ShardingNode;
|
||||
import com.actiontech.dble.backend.datasource.BaseNode;
|
||||
import com.actiontech.dble.backend.mysql.CharsetUtil;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.net.Session;
|
||||
@@ -58,7 +58,7 @@ public class BaseSelectHandler extends BaseDMLHandler {
|
||||
exeConn.getBackendService().setResponseHandler(this);
|
||||
return exeConn;
|
||||
} else {
|
||||
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(rrss.getName());
|
||||
BaseNode dn = DbleServer.getInstance().getConfig().getAllNodes().get(rrss.getName());
|
||||
//autocommit is serverSession.getWriteSource().isAutocommit() && !serverSession.getWriteSource().isTxStart()
|
||||
final BackendConnection newConn = dn.getConnection(dn.getDatabase(), autocommit, rrss);
|
||||
serverSession.bindConnection(rrss, newConn);
|
||||
|
||||
@@ -212,7 +212,7 @@ public final class HandlerTool {
|
||||
// variance: v[0]:count,v[1]:sum,v[2]:variance(locally)
|
||||
String colNameCount = colName.replace(funName + "(", "COUNT(");
|
||||
String colNameSum = colName.replace(funName + "(", "SUM(");
|
||||
String colNameVar = colName.replace(funName + "(", "VARIANCE(");
|
||||
String colNameVar = colName.replace(funName + "(", "VAR_POP(");
|
||||
Item sumFuncCount = new ItemField(null, null, colNameCount);
|
||||
sumFuncCount.setPushDownName(
|
||||
pdName.replace(MysqlVisitor.getMadeAggAlias(funName), MysqlVisitor.getMadeAggAlias("COUNT")));
|
||||
@@ -221,7 +221,7 @@ public final class HandlerTool {
|
||||
pdName.replace(MysqlVisitor.getMadeAggAlias(funName), MysqlVisitor.getMadeAggAlias("SUM")));
|
||||
Item sumFuncVar = new ItemField(null, null, colNameVar);
|
||||
sumFuncVar.setPushDownName(
|
||||
pdName.replace(MysqlVisitor.getMadeAggAlias(funName), MysqlVisitor.getMadeAggAlias("VARIANCE")));
|
||||
pdName.replace(MysqlVisitor.getMadeAggAlias(funName), MysqlVisitor.getMadeAggAlias("VAR_POP")));
|
||||
Item itemCount = createFieldItem(sumFuncCount, fields, startIndex);
|
||||
Item itemSum = createFieldItem(sumFuncSum, fields, startIndex);
|
||||
Item itemVar = createFieldItem(sumFuncVar, fields, startIndex);
|
||||
|
||||
+1
-1
@@ -200,7 +200,7 @@ public class ItemSumVariance extends ItemSumNum {
|
||||
|
||||
@Override
|
||||
public String funcName() {
|
||||
return sample == 1 ? "VAR_SAMP" : "VARIANCE";
|
||||
return sample == 1 ? "VAR_SAMP" : "VAR_POP";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -36,6 +36,7 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
|
||||
private boolean isForUpdate = false;
|
||||
private volatile byte loadDataRrnStatus;
|
||||
private boolean nodeRepeat = false;
|
||||
private boolean isApNode;
|
||||
|
||||
public RouteResultsetNode(String name, int sqlType, String srcStatement) {
|
||||
this.name = name;
|
||||
@@ -64,6 +65,21 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
|
||||
this.tableSet = tableSet;
|
||||
}
|
||||
|
||||
public RouteResultsetNode(String name, int sqlType, String srcStatement, Set<String> tableSet, boolean isApNode) {
|
||||
this.name = name;
|
||||
this.limitStart = 0;
|
||||
this.limitSize = -1;
|
||||
this.sqlType = sqlType;
|
||||
this.statement = srcStatement;
|
||||
this.statementHash = srcStatement.hashCode();
|
||||
this.canRunInReadDB = (sqlType == ServerParse.SELECT || sqlType == ServerParse.SHOW);
|
||||
this.multiplexNum = new AtomicLong(0);
|
||||
this.repeatTableIndex = new AtomicLong(0);
|
||||
loadDataRrnStatus = 0;
|
||||
this.tableSet = tableSet;
|
||||
this.isApNode = isApNode;
|
||||
}
|
||||
|
||||
public byte getLoadDataRrnStatus() {
|
||||
return loadDataRrnStatus;
|
||||
}
|
||||
@@ -178,6 +194,14 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
|
||||
return tableSet;
|
||||
}
|
||||
|
||||
public boolean isApNode() {
|
||||
return isApNode;
|
||||
}
|
||||
|
||||
public void setApNode(boolean apNode) {
|
||||
isApNode = apNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (this == obj) {
|
||||
|
||||
@@ -20,11 +20,14 @@ import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
|
||||
import com.actiontech.dble.route.parser.util.Pair;
|
||||
import com.actiontech.dble.route.util.ConditionUtil;
|
||||
import com.actiontech.dble.route.util.RouterUtil;
|
||||
import com.actiontech.dble.server.parser.ServerParse;
|
||||
import com.actiontech.dble.server.util.SchemaUtil;
|
||||
import com.actiontech.dble.services.mysqlsharding.ShardingService;
|
||||
import com.actiontech.dble.singleton.ProxyMeta;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlOutputVisitor;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.slf4j.Logger;
|
||||
@@ -52,6 +55,9 @@ public class DefaultDruidParser implements DruidParser {
|
||||
public SchemaConfig parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, ServerSchemaStatVisitor schemaStatVisitor, ShardingService service, boolean isExplain) throws SQLException {
|
||||
ctx = new DruidShardingParseInfo();
|
||||
schema = visitorParse(schema, rrs, stmt, schemaStatVisitor, service, isExplain);
|
||||
if (this instanceof DruidSelectParser && rrs.isFinishedRoute() && !rrs.isNeedOptimizer()) {
|
||||
tryRouteToApNode(schema, rrs, stmt, service);
|
||||
}
|
||||
changeSql(schema, rrs, stmt);
|
||||
return schema;
|
||||
}
|
||||
@@ -60,6 +66,26 @@ public class DefaultDruidParser implements DruidParser {
|
||||
return this.parser(schema, rrs, stmt, schemaStatVisitor, service, false);
|
||||
}
|
||||
|
||||
public void tryRouteToApNode(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, ShardingService service) {
|
||||
if (rrs.getSqlType() == ServerParse.SELECT) {
|
||||
//simple query
|
||||
boolean inTransaction = service.isInTransaction();
|
||||
if (!inTransaction) {
|
||||
SQLSelectStatement selectStmt = (SQLSelectStatement) stmt;
|
||||
SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
|
||||
boolean notSupport = RouterUtil.checkSQLNotSupport(sqlSelectQuery);
|
||||
if (notSupport) {
|
||||
return;
|
||||
}
|
||||
boolean isAggregate = schema != null && schema.getDefaultApNode() != null && RouterUtil.checkFunction(sqlSelectQuery);
|
||||
if (isAggregate) {
|
||||
Set<String> tableSet = ctx.getTables().stream().map(tableEntry -> tableEntry.getKey() + "." + tableEntry.getValue()).collect(Collectors.toSet());
|
||||
rrs.setNeedOptimizer(false);
|
||||
RouterUtil.routeToApNode(rrs, schema.getDefaultApNode(), tableSet);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void changeSql(SchemaConfig schema, RouteResultset rrs,
|
||||
|
||||
@@ -22,6 +22,7 @@ import com.actiontech.dble.route.parser.druid.DruidParser;
|
||||
import com.actiontech.dble.route.parser.druid.DruidShardingParseInfo;
|
||||
import com.actiontech.dble.route.parser.druid.RouteCalculateUnit;
|
||||
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
|
||||
import com.actiontech.dble.route.parser.druid.impl.DruidSingleUnitSelectParser;
|
||||
import com.actiontech.dble.route.parser.util.Pair;
|
||||
import com.actiontech.dble.server.parser.ServerParse;
|
||||
import com.actiontech.dble.server.util.SchemaUtil;
|
||||
@@ -34,15 +35,15 @@ import com.actiontech.dble.util.CharsetContext;
|
||||
import com.actiontech.dble.util.HexFormatUtil;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.sql.ast.SQLExpr;
|
||||
import com.alibaba.druid.sql.ast.SQLSetQuantifier;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.expr.SQLHexExpr;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLJoinTableSource;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLTableSource;
|
||||
import com.alibaba.druid.sql.ast.expr.SQLMethodInvokeExpr;
|
||||
import com.alibaba.druid.sql.ast.statement.*;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
|
||||
import com.alibaba.druid.stat.TableStat;
|
||||
import com.alibaba.druid.wall.spi.WallVisitorUtils;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -213,6 +214,21 @@ public final class RouterUtil {
|
||||
}
|
||||
rrs.setNodes(nodes);
|
||||
|
||||
if (rrs.getSqlType() == ServerParse.SELECT) {
|
||||
((DruidSingleUnitSelectParser) druidParser).tryRouteToApNode(schema, rrs, statement, service);
|
||||
}
|
||||
return rrs;
|
||||
}
|
||||
|
||||
public static RouteResultset routeToApNode(RouteResultset rrs, String apNode, Set<String> tableSet) {
|
||||
if (apNode == null) {
|
||||
return rrs;
|
||||
}
|
||||
RouteResultsetNode[] nodes = new RouteResultsetNode[1];
|
||||
nodes[0] = new RouteResultsetNode(apNode, rrs.getSqlType(), rrs.getStatement(), tableSet);
|
||||
nodes[0].setApNode(true);
|
||||
rrs.setNodes(nodes);
|
||||
rrs.setFinishedRoute(true);
|
||||
return rrs;
|
||||
}
|
||||
|
||||
@@ -1296,4 +1312,152 @@ public final class RouterUtil {
|
||||
}
|
||||
return isAllGlobal;
|
||||
}
|
||||
|
||||
/**
|
||||
* clickhouse and mysql syntax incompatibility
|
||||
*
|
||||
* @param selectQuery
|
||||
* @return
|
||||
*/
|
||||
public static boolean checkSQLNotSupport(SQLSelectQuery selectQuery) {
|
||||
if (selectQuery instanceof MySqlSelectQueryBlock) {
|
||||
MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectQuery;
|
||||
//only support distinct
|
||||
if (mysqlSelectQuery.getDistionOption() != 0 && mysqlSelectQuery.getDistionOption() != SQLSetQuantifier.DISTINCT) {
|
||||
return true;
|
||||
}
|
||||
// [HIGH_PRIORITY]
|
||||
if (mysqlSelectQuery.isHignPriority()) {
|
||||
return true;
|
||||
}
|
||||
// [STRAIGHT_JOIN]
|
||||
if (mysqlSelectQuery.isStraightJoin()) {
|
||||
return true;
|
||||
}
|
||||
// [SQL_SMALL_RESULT] [SQL_BIG_RESULT] [SQL_BUFFER_RESULT]
|
||||
if (mysqlSelectQuery.isSmallResult() || mysqlSelectQuery.isBigResult() || mysqlSelectQuery.isBufferResult()) {
|
||||
return true;
|
||||
}
|
||||
// [SQL_NO_CACHE|SQL_CACHE] [SQL_CALC_FOUND_ROWS]
|
||||
if (mysqlSelectQuery.getCache() != null || mysqlSelectQuery.isCalcFoundRows()) {
|
||||
return true;
|
||||
}
|
||||
//[WINDOW window_name AS (window_spec)
|
||||
// [, window_name AS (window_spec)] ...]
|
||||
if (mysqlSelectQuery.getWindows() != null && !mysqlSelectQuery.getWindows().isEmpty()) {
|
||||
return true;
|
||||
}
|
||||
//[FOR {UPDATE | SHARE}
|
||||
// [OF tbl_name [, tbl_name] ...]
|
||||
// [NOWAIT | SKIP LOCKED]
|
||||
// | LOCK IN SHARE MODE]
|
||||
if (mysqlSelectQuery.isForUpdate() || mysqlSelectQuery.isForShare() || mysqlSelectQuery.isNoWait() || mysqlSelectQuery.isSkipLocked() || mysqlSelectQuery.isLockInShareMode()) {
|
||||
return true;
|
||||
}
|
||||
//from
|
||||
SQLTableSource tableSource = mysqlSelectQuery.getFrom();
|
||||
return checkSQLNotSupportOfTableSource(tableSource);
|
||||
} else if (selectQuery instanceof SQLUnionQuery) {
|
||||
SQLUnionQuery query = (SQLUnionQuery) selectQuery;
|
||||
List<SQLSelectQuery> relations = query.getRelations();
|
||||
for (SQLSelectQuery relation : relations) {
|
||||
if (checkSQLNotSupport(relation)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
private static boolean checkSQLNotSupportOfTableSource(SQLTableSource tableSource) {
|
||||
if (tableSource instanceof SQLExprTableSource) {
|
||||
SQLExprTableSource exprTableSource = (SQLExprTableSource) tableSource;
|
||||
if (exprTableSource.getPartitionSize() != 0) {
|
||||
return true;
|
||||
}
|
||||
} else if (tableSource instanceof SQLSubqueryTableSource) {
|
||||
SQLSubqueryTableSource fromSource = (SQLSubqueryTableSource) tableSource;
|
||||
SQLSelectQuery sqlSelectQuery = fromSource.getSelect().getQuery();
|
||||
return checkSQLNotSupport(sqlSelectQuery);
|
||||
} else if (tableSource instanceof SQLJoinTableSource) {
|
||||
SQLJoinTableSource fromSource = (SQLJoinTableSource) tableSource;
|
||||
SQLTableSource left = fromSource.getLeft();
|
||||
if (checkSQLNotSupportOfTableSource(left)) {
|
||||
return true;
|
||||
}
|
||||
SQLTableSource right = fromSource.getRight();
|
||||
return checkSQLNotSupportOfTableSource(right);
|
||||
} else if (tableSource instanceof SQLUnionQueryTableSource) {
|
||||
SQLUnionQueryTableSource fromSource = (SQLUnionQueryTableSource) tableSource;
|
||||
SQLUnionQuery unionQuery = fromSource.getUnion();
|
||||
return checkSQLNotSupport(unionQuery);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* check contains aggregate function
|
||||
*
|
||||
* @param selectQuery
|
||||
* @return
|
||||
*/
|
||||
public static boolean checkFunction(SQLSelectQuery selectQuery) {
|
||||
boolean isAggregate;
|
||||
if (selectQuery instanceof MySqlSelectQueryBlock) {
|
||||
MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock) selectQuery;
|
||||
if (mysqlSelectQuery.getGroupBy() != null) {
|
||||
return true;
|
||||
}
|
||||
//select item
|
||||
List<String> aggregateFunctionList = Lists.newArrayList("AVG", "COUNT", "MAX", "MIN", "SUM", "STDDEV_POP", "STDDEV_SAMP", "VAR_POP", "VAR_SAMP");
|
||||
for (SQLSelectItem sqlSelectItem : mysqlSelectQuery.getSelectList()) {
|
||||
SQLExpr expr = sqlSelectItem.getExpr();
|
||||
if (expr instanceof SQLMethodInvokeExpr) {
|
||||
SQLMethodInvokeExpr aggregateExpr = (SQLMethodInvokeExpr) expr;
|
||||
String methodName = aggregateExpr.getMethodName();
|
||||
isAggregate = aggregateFunctionList.contains(methodName.toLowerCase()) || aggregateFunctionList.contains(methodName.toUpperCase());
|
||||
if (isAggregate) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
//from
|
||||
SQLTableSource tableSource = mysqlSelectQuery.getFrom();
|
||||
return checkFunctionOfTableSource(tableSource);
|
||||
} else if (selectQuery instanceof SQLUnionQuery) {
|
||||
SQLUnionQuery query = (SQLUnionQuery) selectQuery;
|
||||
List<SQLSelectQuery> relations = query.getRelations();
|
||||
for (SQLSelectQuery relation : relations) {
|
||||
if (checkFunction(relation)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean checkFunctionOfTableSource(SQLTableSource tableSource) {
|
||||
if (tableSource == null || tableSource instanceof SQLExprTableSource) {
|
||||
return false;
|
||||
} else if (tableSource instanceof SQLSubqueryTableSource) {
|
||||
SQLSubqueryTableSource fromSource = (SQLSubqueryTableSource) tableSource;
|
||||
SQLSelectQuery sqlSelectQuery = fromSource.getSelect().getQuery();
|
||||
return checkFunction(sqlSelectQuery);
|
||||
} else if (tableSource instanceof SQLJoinTableSource) {
|
||||
SQLJoinTableSource fromSource = (SQLJoinTableSource) tableSource;
|
||||
SQLTableSource left = fromSource.getLeft();
|
||||
if (checkFunctionOfTableSource(left)) {
|
||||
return true;
|
||||
}
|
||||
SQLTableSource right = fromSource.getRight();
|
||||
return checkFunctionOfTableSource(right);
|
||||
} else if (tableSource instanceof SQLUnionQueryTableSource) {
|
||||
SQLUnionQueryTableSource fromSource = (SQLUnionQueryTableSource) tableSource;
|
||||
SQLUnionQuery unionQuery = fromSource.getUnion();
|
||||
return checkFunction(unionQuery);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -394,6 +394,27 @@ public abstract class BackendService extends AbstractService {
|
||||
}
|
||||
}
|
||||
|
||||
protected StringBuilder getSynSqlOfAP(boolean expectAutocommit, VariablesService front) {
|
||||
// schema
|
||||
String schema = connection.getSchema();
|
||||
int schemaSyn = StringUtil.equals(schema, connection.getOldSchema()) || schema == null ? 0 : 1;
|
||||
// autocommit
|
||||
if (schemaSyn == 0 || ignoreSql(front)) {
|
||||
return null;
|
||||
}
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (schemaSyn == 1) {
|
||||
getChangeSchemaCommand(sb, schema);
|
||||
} else {
|
||||
schema = null;
|
||||
}
|
||||
metaDataSynced = false;
|
||||
statusSync = new StatusSync(schema,
|
||||
front.getCharset(), front.getTxIsolation(), expectAutocommit, front.isReadOnly(),
|
||||
schemaSyn, usrVariables, sysVariables, new HashSet<>());
|
||||
return sb;
|
||||
}
|
||||
|
||||
protected StringBuilder getSynSql(boolean expectAutocommit, VariablesService front) {
|
||||
// variables
|
||||
Set<String> toResetSys = new HashSet<>();
|
||||
|
||||
+62
-27
@@ -39,6 +39,7 @@ import com.actiontech.dble.statistic.sql.StatisticListener;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.actiontech.dble.util.TimeUtil;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -135,7 +136,7 @@ public class MySQLResponseService extends BackendService {
|
||||
if (protocolResponseHandler != defaultResponseHandler) {
|
||||
protocolResponseHandler = defaultResponseHandler;
|
||||
}
|
||||
synAndDoExecute(synSQL, sql, service.getCharset());
|
||||
synAndDoExecute(synSQL, sql, service.getCharset(), false);
|
||||
}
|
||||
|
||||
public void execute(RWSplitService service, byte[] originPacket) {
|
||||
@@ -190,7 +191,7 @@ public class MySQLResponseService extends BackendService {
|
||||
}
|
||||
String xaTxId = getConnXID(session.getSessionXaID(), rrn.getMultiplexNum().longValue());
|
||||
StringBuilder synSQL = getSynSql(xaTxId, rrn, isAutoCommit, service);
|
||||
synAndDoExecute(synSQL, rrn.getStatement(), service.getCharset());
|
||||
synAndDoExecute(synSQL, rrn.getStatement(), service.getCharset(), rrn.isApNode());
|
||||
} finally {
|
||||
TraceManager.finishSpan(this, traceObject);
|
||||
}
|
||||
@@ -215,7 +216,7 @@ public class MySQLResponseService extends BackendService {
|
||||
if (protocolResponseHandler != defaultResponseHandler) {
|
||||
protocolResponseHandler = defaultResponseHandler;
|
||||
}
|
||||
synAndDoExecute(synSQL, rrn.getStatement(), charsetName);
|
||||
synAndDoExecute(synSQL, rrn.getStatement(), charsetName, rrn.isApNode());
|
||||
}
|
||||
|
||||
public void executeMultiNode(RouteResultsetNode rrn, ShardingService service, boolean isAutoCommit) {
|
||||
@@ -232,7 +233,7 @@ public class MySQLResponseService extends BackendService {
|
||||
} else if (protocolResponseHandler != defaultResponseHandler) {
|
||||
protocolResponseHandler = defaultResponseHandler;
|
||||
}
|
||||
synAndDoExecuteMultiNode(synSQL, rrn, service.getCharset());
|
||||
synAndDoExecuteMultiNode(synSQL, rrn, service.getCharset(), rrn.isApNode());
|
||||
} catch (Exception e) {
|
||||
LOGGER.info("route error {},{},{}", rrn, this, service);
|
||||
throw e;
|
||||
@@ -241,7 +242,7 @@ public class MySQLResponseService extends BackendService {
|
||||
}
|
||||
}
|
||||
|
||||
private void synAndDoExecuteMultiNode(StringBuilder synSQL, RouteResultsetNode rrn, CharsetNames clientCharset) {
|
||||
private void synAndDoExecuteMultiNode(StringBuilder synSQL, RouteResultsetNode rrn, CharsetNames clientCharset, boolean apNode) {
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("send cmd by WriteToBackendExecutor to conn[" + this + "]");
|
||||
}
|
||||
@@ -256,22 +257,37 @@ public class MySQLResponseService extends BackendService {
|
||||
}
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("con need syn,sync sql is " + synSQL.toString() + ", con:" + this);
|
||||
LOGGER.debug("con need syn,sync sql is " + synSQL + ", con:" + this);
|
||||
}
|
||||
// and our query sql to multi command at last
|
||||
synSQL.append(rrn.getStatement()).append(";");
|
||||
// syn and execute others
|
||||
if (session != null) {
|
||||
session.setBackendRequestTime(this);
|
||||
|
||||
if (apNode) {
|
||||
// clickhouse does not support multi-statements
|
||||
// syn and execute others
|
||||
if (session != null) {
|
||||
session.setBackendRequestTime(this);
|
||||
}
|
||||
// syn sharding
|
||||
List<WriteToBackendTask> taskList = new ArrayList<>(2);
|
||||
taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset));
|
||||
taskList.add(sendQueryCmdTask(rrn.getStatement(), clientCharset));
|
||||
DbleServer.getInstance().getWriteToBackendQueue().add(taskList);
|
||||
// waiting syn result...
|
||||
} else {
|
||||
// and our query sql to multi command at last
|
||||
synSQL.append(rrn.getStatement()).append(";");
|
||||
// syn and execute others
|
||||
if (session != null) {
|
||||
session.setBackendRequestTime(this);
|
||||
}
|
||||
// syn sharding
|
||||
List<WriteToBackendTask> taskList = new ArrayList<>(1);
|
||||
taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset));
|
||||
DbleServer.getInstance().getWriteToBackendQueue().add(taskList);
|
||||
// waiting syn result...
|
||||
}
|
||||
// syn sharding
|
||||
List<WriteToBackendTask> taskList = new ArrayList<>(1);
|
||||
taskList.add(sendQueryCmdTask(synSQL.toString(), clientCharset));
|
||||
DbleServer.getInstance().getWriteToBackendQueue().add(taskList);
|
||||
// waiting syn result...
|
||||
}
|
||||
|
||||
private void synAndDoExecute(StringBuilder synSQL, String sql, CharsetNames clientCharset) {
|
||||
private void synAndDoExecute(StringBuilder synSQL, String sql, CharsetNames clientCharset, boolean apNode) {
|
||||
TraceManager.TraceObject traceObject = TraceManager.serviceTrace(this, "syn&do-execute-sql");
|
||||
if (synSQL != null && traceObject != null) {
|
||||
TraceManager.log(ImmutableMap.of("synSQL", synSQL), traceObject);
|
||||
@@ -287,16 +303,30 @@ public class MySQLResponseService extends BackendService {
|
||||
}
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("con need syn,sync sql is " + synSQL.toString() + ", con:" + this);
|
||||
LOGGER.debug("con need syn,sync sql is " + synSQL + ", con:" + this);
|
||||
}
|
||||
// and our query sql to multi command at last
|
||||
synSQL.append(sql).append(";");
|
||||
// syn and execute others
|
||||
if (session != null) {
|
||||
session.setBackendRequestTime(this);
|
||||
if (apNode) {
|
||||
// clickhouse does not support multi-statements
|
||||
// syn and execute others
|
||||
if (session != null) {
|
||||
session.setBackendRequestTime(this);
|
||||
}
|
||||
List<String> sqlList = Lists.newArrayList(synSQL.toString(), sql);
|
||||
for (String statement : sqlList) {
|
||||
this.sendQueryCmd(statement, clientCharset);
|
||||
}
|
||||
// waiting syn result...
|
||||
} else {
|
||||
// and our query sql to multi command at last
|
||||
synSQL.append(sql).append(";");
|
||||
// syn and execute others
|
||||
if (session != null) {
|
||||
session.setBackendRequestTime(this);
|
||||
}
|
||||
this.sendQueryCmd(synSQL.toString(), clientCharset);
|
||||
// waiting syn result...
|
||||
}
|
||||
this.sendQueryCmd(synSQL.toString(), clientCharset);
|
||||
// waiting syn result...
|
||||
|
||||
} finally {
|
||||
TraceManager.finishSpan(this, traceObject);
|
||||
}
|
||||
@@ -314,8 +344,13 @@ public class MySQLResponseService extends BackendService {
|
||||
}
|
||||
|
||||
private StringBuilder getSynSql(String xaTxID, RouteResultsetNode rrn, boolean expectAutocommit, VariablesService front) {
|
||||
|
||||
StringBuilder sb = getSynSql(expectAutocommit, front);
|
||||
StringBuilder sb;
|
||||
if (rrn != null && rrn.isApNode()) {
|
||||
sb = getSynSqlOfAP(expectAutocommit, front);
|
||||
return sb;
|
||||
} else {
|
||||
sb = getSynSql(expectAutocommit, front);
|
||||
}
|
||||
|
||||
if (!expectAutocommit && xaTxID != null && xaStatus == TxState.TX_INITIALIZE_STATE && !isDDL) {
|
||||
// clientTxIsolation = Isolation.SERIALIZABLE;TODO:NEEDED?
|
||||
|
||||
@@ -6,13 +6,21 @@
|
||||
package com.actiontech.dble.parser.druid;
|
||||
|
||||
import com.actiontech.dble.route.parser.druid.impl.DruidSelectParser;
|
||||
import com.actiontech.dble.route.parser.util.DruidUtil;
|
||||
import com.actiontech.dble.route.util.RouterUtil;
|
||||
import com.alibaba.druid.sql.ast.SQLExpr;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.expr.SQLIdentifierExpr;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -65,5 +73,134 @@ public class DruidSelectParserTest {
|
||||
return method.invoke(druidSelectParser, groupByItems, aliaColumns);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAPNotSupport() throws SQLException {
|
||||
String origSQL1 = "select ALL id from sbtest1";
|
||||
String origSQL2 = "select DISTINCTROW id from sbtest1";
|
||||
String origSQL3 = "select HIGH_PRIORITY id from sbtest1";
|
||||
String origSQL4 = "select STRAIGHT_JOIN id from sbtest1";
|
||||
String origSQL5 = "select SQL_SMALL_RESULT id from sbtest1";
|
||||
String origSQL6 = "select SQL_BIG_RESULT id from sbtest1";
|
||||
String origSQL7 = "select SQL_BUFFER_RESULT id from sbtest1";
|
||||
String origSQL8 = "select SQL_NO_CACHE id from sbtest1";
|
||||
String origSQL9 = "select SQL_CALC_FOUND_ROWS id from sbtest1";
|
||||
String origSQL10 = "select id,RANK() OVER w AS 'rank' from sbtest1 WINDOW w AS (ORDER BY id)";
|
||||
String origSQL11 = "select id from sbtest1 for update";
|
||||
String origSQL12 = "select id from sbtest1 for share";
|
||||
String origSQL13 = "select id from sbtest1 for update NOWAIT";
|
||||
String origSQL14 = "select id from sbtest1 for update SKIP LOCKED";
|
||||
String origSQL15 = "select id from sbtest1 LOCK IN SHARE MODE";
|
||||
ArrayList<String> list = Lists.newArrayList(origSQL1, origSQL2, origSQL3, origSQL4, origSQL5, origSQL6, origSQL7, origSQL8, origSQL9, origSQL10, origSQL11, origSQL12, origSQL13, origSQL14, origSQL15);
|
||||
for (String sql : list) {
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean notSupport = RouterUtil.checkSQLNotSupport(query);
|
||||
Assert.assertTrue(notSupport);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAPNotSupport_join() throws SQLException {
|
||||
String sql = "select id from sbtest1 a union all select ALL id from sbtest1";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean notSupport = RouterUtil.checkSQLNotSupport(query);
|
||||
Assert.assertTrue(notSupport);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testAPNotSupport_subQuery_union() throws SQLException {
|
||||
String sql = "select a.* from (select * from sbtest1 LOCK IN SHARE MODE) a union select * from sbtest1";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean notSupport = RouterUtil.checkSQLNotSupport(query);
|
||||
Assert.assertTrue(notSupport);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP() throws SQLException {
|
||||
String origSQL1 = "select VAR_SAMP(id) from sbtest1";
|
||||
String origSQL2 = "select min(id) from sbtest1";
|
||||
String origSQL3 = "select sum(id) from sbtest1";
|
||||
String origSQL4 = "select max(id) from sbtest1";
|
||||
String origSQL5 = "select count(id) from sbtest1";
|
||||
String origSQL6 = "select avg(id) from sbtest1";
|
||||
String origSQL7 = "select STDDEV_POP(id) from sbtest1";
|
||||
String origSQL8 = "select STDDEV_SAMP(id) from sbtest1";
|
||||
String origSQL9 = "select VAR_POP(id) from sbtest1";
|
||||
String origSQL10 = "select VAR_SAMP(id) from sbtest1";
|
||||
String origSQL11 = "select id from sbtest1 group by id";
|
||||
String origSQL12 = "select id from (select min(id) as id from sbtest1) a";
|
||||
ArrayList<String> list = Lists.newArrayList(origSQL1, origSQL2, origSQL3, origSQL4, origSQL5, origSQL6, origSQL7, origSQL8, origSQL9, origSQL10, origSQL11, origSQL12);
|
||||
for (String sql : list) {
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_subQuery() throws SQLException {
|
||||
String sql = "select b.id from (select a.id from (select min(id) as id from sbtest1) a) b";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_join() throws SQLException {
|
||||
String sql = "select a.id,min(b.id) from table_1 a join sbtest1 b on a.id = b.id";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_union() throws SQLException {
|
||||
String sql = "select a.id from table_1 a union all select b.id from table_1 b union all select min(c.id) from sbtest1 c";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_join_subQuery() throws SQLException {
|
||||
String sql = "select a.id,b.id from table_1 a join (select min(id) as id from sbtest1) b on a.id = b.id";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_subQuery_join() throws SQLException {
|
||||
String sql = "select c.id from (select a.id,min(b.id) from table_1 a join sbtest1 b on a.id = b.id) c";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_subQuery_union() throws SQLException {
|
||||
String sql = "select c.id from (select a.id from table_1 a union all select min(b.id) from sbtest1 b) c";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
MySqlSelectQueryBlock query = (MySqlSelectQueryBlock) ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOLAP_union_subQuery() throws SQLException {
|
||||
String sql = "select a.id from table_1 a union all select b.id from (select VAR_SAMP(id) as id from sbtest1) b";
|
||||
SQLStatement stmt = DruidUtil.parseSQL(sql);
|
||||
SQLSelectQuery query = ((SQLSelectStatement) stmt).getSelect().getQuery();
|
||||
boolean checkFunction = RouterUtil.checkFunction(query);
|
||||
Assert.assertTrue(checkFunction);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user