mirror of
https://github.com/actiontech/dble.git
synced 2026-01-06 04:40:17 -06:00
fix: support multi-table update split delivery
This commit is contained in:
@@ -12,10 +12,7 @@ import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.InSubQueryHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.SingleRowSubQueryHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.SubQueryHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.*;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.util.CallBackHandler;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
@@ -25,10 +22,7 @@ import com.actiontech.dble.plan.Order;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.item.function.sumfunc.ItemSum;
|
||||
import com.actiontech.dble.plan.common.item.subquery.ItemAllAnySubQuery;
|
||||
import com.actiontech.dble.plan.common.item.subquery.ItemInSubQuery;
|
||||
import com.actiontech.dble.plan.common.item.subquery.ItemSingleRowSubQuery;
|
||||
import com.actiontech.dble.plan.common.item.subquery.ItemSubQuery;
|
||||
import com.actiontech.dble.plan.common.item.subquery.*;
|
||||
import com.actiontech.dble.plan.node.JoinNode;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.actiontech.dble.plan.node.PlanNode.PlanNodeType;
|
||||
@@ -76,6 +70,7 @@ public abstract class BaseHandlerBuilder {
|
||||
|
||||
protected boolean isExplain;
|
||||
private final List<BaseHandlerBuilder> subQueryBuilderList = new CopyOnWriteArrayList<>();
|
||||
protected boolean isFastBack;
|
||||
|
||||
protected BaseHandlerBuilder(NonBlockingSession session, PlanNode node, HandlerBuilder hBuilder, boolean isExplain) {
|
||||
this.session = session;
|
||||
@@ -114,6 +109,9 @@ public abstract class BaseHandlerBuilder {
|
||||
} else {
|
||||
//need to split to simple query
|
||||
preHandlers = buildPre();
|
||||
if (isFastBack) {
|
||||
return;
|
||||
}
|
||||
buildOwn();
|
||||
}
|
||||
if (!node.isSingleRoute()) {
|
||||
@@ -535,6 +533,11 @@ public abstract class BaseHandlerBuilder {
|
||||
final SubQueryHandler tempHandler = new AllAnySubQueryHandler(getSequenceId(), session, (ItemAllAnySubQuery) itemSubQuery, isExplain);
|
||||
DMLResponseHandler endHandler = getSubQueryHandler(itemSubQuery.getPlanNode(), tempHandler);
|
||||
endHandlers.add(endHandler);
|
||||
} else if (itemSubQuery instanceof UpdateItemSubQuery) {
|
||||
SubQueryHandler tempHandler = new UpdateSubQueryHandler(getSequenceId(), session, (UpdateItemSubQuery) itemSubQuery, isExplain);
|
||||
PlanNode queryNode = ((UpdateItemSubQuery) itemSubQuery).getQueryNode();
|
||||
DMLResponseHandler endHandler = getSubQueryHandler(queryNode == null ? itemSubQuery.getPlanNode() : queryNode, tempHandler);
|
||||
endHandlers.add(endHandler);
|
||||
}
|
||||
}
|
||||
return endHandlers;
|
||||
@@ -682,6 +685,9 @@ public abstract class BaseHandlerBuilder {
|
||||
return subQueryBuilderList.stream().anyMatch(BaseHandlerBuilder::isExistView) || node.isExistView();
|
||||
}
|
||||
|
||||
public boolean isFastBack() {
|
||||
return isFastBack;
|
||||
}
|
||||
|
||||
public boolean isContainSubQuery(PlanNode planNode) {
|
||||
return planNode.getSubQueries().size() > 0 || planNode.getChildren().stream().anyMatch(this::isContainSubQuery);
|
||||
|
||||
@@ -6,8 +6,13 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.builder;
|
||||
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.GlobalVisitor;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeEasyMergeHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeMergeHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.SendMakeHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.TempTableHandler;
|
||||
import com.actiontech.dble.net.mysql.OkPacket;
|
||||
import com.actiontech.dble.plan.node.*;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.route.util.RouterUtil;
|
||||
@@ -82,6 +87,11 @@ public class HandlerBuilder {
|
||||
try {
|
||||
final long startTime = System.nanoTime();
|
||||
BaseHandlerBuilder builder = getBuilder(session, node, false);
|
||||
if (builder.isFastBack()) {
|
||||
//fast back
|
||||
session.getShardingService().write(OkPacket.getDefault());
|
||||
return null;
|
||||
}
|
||||
DMLResponseHandler endHandler = builder.getEndHandler();
|
||||
DMLResponseHandler fh = FinalHandlerFactory.createFinalHandler(session);
|
||||
endHandler.setNextHandler(fh);
|
||||
@@ -97,7 +107,7 @@ public class HandlerBuilder {
|
||||
}
|
||||
}
|
||||
}
|
||||
for (BaseSelectHandler baseHandler : mergeHandler.getExeHandlers()) {
|
||||
for (BaseDMLHandler baseHandler : mergeHandler.getExeHandlers()) {
|
||||
baseHandler.getRrss().setRunOnSlave(this.session.getComplexRrs().getRunOnSlave());
|
||||
}
|
||||
}
|
||||
@@ -255,6 +265,8 @@ public class HandlerBuilder {
|
||||
return new NoNameNodeHandlerBuilder(nonBlockingSession, (NoNameNode) planNode, this, isExplain);
|
||||
} else if (i == PlanNode.PlanNodeType.JOIN_INNER) {
|
||||
return new JoinInnerHandlerBuilder(nonBlockingSession, (JoinInnerNode) planNode, this, isExplain);
|
||||
} else if (i == PlanNode.PlanNodeType.MODIFY) {
|
||||
return new ModifyNodeHandlerBuilder(nonBlockingSession, (ModifyNode) planNode, this, isExplain);
|
||||
}
|
||||
throw new RuntimeException("not supported tree node type:" + planNode.type());
|
||||
}
|
||||
|
||||
@@ -0,0 +1,110 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.builder;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.UpdateVisitor;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeUpdateHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.SendMakeHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.foreach.MergeUpdateHandler;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.item.subquery.ItemSubQuery;
|
||||
import com.actiontech.dble.plan.common.item.subquery.UpdateItemSubQuery;
|
||||
import com.actiontech.dble.plan.node.ModifyNode;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
|
||||
import com.actiontech.dble.route.parser.druid.impl.DruidUpdateParser;
|
||||
import com.actiontech.dble.route.util.RouterUtil;
|
||||
import com.actiontech.dble.server.NonBlockingSession;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
|
||||
import com.alibaba.druid.sql.parser.SQLStatementParser;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
class ModifyNodeHandlerBuilder extends BaseHandlerBuilder {
|
||||
private final ModifyNode node;
|
||||
private long preHandlerSize;
|
||||
|
||||
ModifyNodeHandlerBuilder(NonBlockingSession session, ModifyNode node, HandlerBuilder hBuilder, boolean isExplain) {
|
||||
super(session, node, hBuilder, isExplain);
|
||||
this.node = node;
|
||||
this.canPushDown = !node.existUnPushDownGroup();
|
||||
this.needWhereHandler = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean tryBuildWithCurrentNode(List<DMLResponseHandler> subQueryEndHandlers, Set<String> subQueryRouteNodes) throws SQLException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DMLResponseHandler> buildPre() {
|
||||
if (node.getSubQueries().size() == 1) {
|
||||
// no optimizer
|
||||
List<DMLResponseHandler> subQueryEndHandlers;
|
||||
subQueryEndHandlers = getSubQueriesEndHandlers(node.getSubQueries());
|
||||
if (!isExplain) {
|
||||
// execute sub query sync
|
||||
executeSubQueries(subQueryEndHandlers);
|
||||
}
|
||||
}
|
||||
ItemSubQuery itemSubQuery = node.getSubQueries().get(0);
|
||||
if (!(itemSubQuery instanceof UpdateItemSubQuery)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
UpdateItemSubQuery updateItemSubQuery = (UpdateItemSubQuery) itemSubQuery;
|
||||
if (updateItemSubQuery.getValue().isEmpty()) {
|
||||
this.isFastBack = true;
|
||||
return null;
|
||||
}
|
||||
return buildUpdateHandler(updateItemSubQuery);
|
||||
}
|
||||
|
||||
private List<DMLResponseHandler> buildUpdateHandler(UpdateItemSubQuery updateItemSubQuery) {
|
||||
List<DMLResponseHandler> preHandlers = Lists.newArrayList();
|
||||
for (List<Item> valueItemList : updateItemSubQuery.getValue()) {
|
||||
UpdateVisitor updateVisitor = new UpdateVisitor(node, true, valueItemList, updateItemSubQuery.getSelect(), isExplain);
|
||||
RouteResultset rrs = updateVisitor.buildRouteResultset();
|
||||
|
||||
SchemaConfig schemaConfig = DbleServer.getInstance().getConfig().getSchemas().get(this.session.getShardingService().getSchema());
|
||||
DruidUpdateParser updateParser = new DruidUpdateParser();
|
||||
SQLStatementParser parser = new MySqlStatementParser(rrs.getSrcStatement());
|
||||
SQLUpdateStatement updateStatement = (SQLUpdateStatement) parser.parseStatement();
|
||||
try {
|
||||
rrs = RouterUtil.routeFromParser(updateParser, schemaConfig, rrs, updateStatement, new ServerSchemaStatVisitor(schemaConfig.getName()), session.getShardingService(), isExplain);
|
||||
} catch (SQLException e) {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_YES, "", e.getMessage());
|
||||
}
|
||||
|
||||
RouteResultsetNode[] rrssArray = rrs.getNodes();
|
||||
hBuilder.checkRRSs(rrssArray);
|
||||
MultiNodeUpdateHandler mh = new MultiNodeUpdateHandler(getSequenceId(), session, rrssArray, session.getShardingService().isAutocommit() && !session.getShardingService().isTxStart());
|
||||
|
||||
SendMakeHandler sh = new SendMakeHandler(getSequenceId(), session, node.getColumnsSelected(), schemaConfig.getName(), null, node.getAlias());
|
||||
mh.setNextHandler(sh);
|
||||
preHandlers.add(sh);
|
||||
}
|
||||
this.preHandlerSize = preHandlers.size();
|
||||
return preHandlers;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void buildOwn() {
|
||||
MergeUpdateHandler mergeUpdateHandler = new MergeUpdateHandler(getSequenceId(), session, preHandlerSize);
|
||||
addHandler(mergeUpdateHandler);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,280 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor;
|
||||
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.item.ItemNull;
|
||||
import com.actiontech.dble.plan.common.item.ItemString;
|
||||
import com.actiontech.dble.plan.common.item.function.ItemFunc;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.ItemBoolFunc2;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.ItemFuncEqual;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.ItemFuncIn;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.ItemFuncNe;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemCondAnd;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemCondOr;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemFuncNot;
|
||||
import com.actiontech.dble.plan.common.ptr.BoolPtr;
|
||||
import com.actiontech.dble.plan.node.ModifyNode;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.actiontech.dble.plan.node.TableNode;
|
||||
import com.actiontech.dble.plan.util.PlanUtil;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.server.parser.ServerParse;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import static com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.UpdateSubQueryHandler.NEED_REPLACE;
|
||||
|
||||
|
||||
public class UpdateVisitor extends MysqlVisitor {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(UpdateVisitor.class);
|
||||
private final List<Item> valueItemList;
|
||||
private final List<Item> fieldList;
|
||||
protected boolean isExplain;
|
||||
|
||||
public UpdateVisitor(PlanNode update, boolean isTopQuery, List<Item> valueItemList, List<Item> fieldList, boolean isExplain) {
|
||||
super(update, isTopQuery);
|
||||
this.valueItemList = valueItemList;
|
||||
this.fieldList = fieldList;
|
||||
this.isExplain = isExplain;
|
||||
}
|
||||
|
||||
public void visit() {
|
||||
if (!visited) {
|
||||
replaceableSqlBuilder.clear();
|
||||
sqlBuilder = replaceableSqlBuilder.getCurrentElement().getSb();
|
||||
// if visited,push down visitor need just replace the name
|
||||
PlanNode.PlanNodeType i = query.type();
|
||||
if (i == PlanNode.PlanNodeType.MODIFY) {
|
||||
visit((ModifyNode) query);
|
||||
|
||||
} else {
|
||||
throw new RuntimeException("not implement yet!");
|
||||
}
|
||||
visited = true;
|
||||
}
|
||||
}
|
||||
|
||||
protected void visit(ModifyNode update) {
|
||||
sqlBuilder.append("update ");
|
||||
|
||||
//from
|
||||
update.getReferedTableNodes().stream()
|
||||
.forEach(tableNode -> {
|
||||
if (update.getSetItemList().stream()
|
||||
.anyMatch(setItem -> !StringUtil.isEmpty(setItem.arguments().get(0).getTableName()) && (setItem.arguments().get(0).getTableName().equals(tableNode.getAlias()) || setItem.arguments().get(0).getTableName().equals(tableNode.getTableName())))) {
|
||||
buildTableName(tableNode, sqlBuilder);
|
||||
}
|
||||
});
|
||||
|
||||
//set
|
||||
sqlBuilder.append(" set ");
|
||||
String tableName = null;
|
||||
StringBuilder setBuilder = new StringBuilder();
|
||||
for (ItemFuncEqual itemFuncEqual : update.getSetItemList()) {
|
||||
Item setItem = itemFuncEqual.arguments().get(0);
|
||||
tableName = setItem.getTableName();
|
||||
if (StringUtil.isEmpty(tableName)) {
|
||||
setBuilder.append("`" + setItem.getItemName() + "`");
|
||||
} else {
|
||||
setBuilder.append("`" + setItem.getTableName() + "`.`" + setItem.getItemName() + "`");
|
||||
}
|
||||
setBuilder.append(" = ");
|
||||
Item valueItem = itemFuncEqual.arguments().get(1);
|
||||
String sqlStr = buildSQLStr(valueItem, tableName);
|
||||
setBuilder.append(sqlStr);
|
||||
setBuilder.append(",");
|
||||
}
|
||||
setBuilder.deleteCharAt(setBuilder.length() - 1);
|
||||
sqlBuilder.append(setBuilder);
|
||||
|
||||
//where
|
||||
Item whereFilter = update.getWhereFilter();
|
||||
if (whereFilter != null) {
|
||||
Item item = rebuildUpdateItem(whereFilter, tableName);
|
||||
String selName = getUpdateItemName(item);
|
||||
sqlBuilder.append(" where ").append(selName);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildSQLStr(Item valueItem, String tableName) {
|
||||
if (!StringUtil.equalsIgnoreCase(tableName, valueItem.getTableName()) && isExplain) {
|
||||
return new ItemString(NEED_REPLACE, valueItem.getCharsetIndex()).toString();
|
||||
}
|
||||
if (valueItemList.size() == 1) {
|
||||
//autoalias_scalar
|
||||
return valueItemList.get(0).toString();
|
||||
}
|
||||
int index = getItemIndex(valueItem);
|
||||
if (index < 0) {
|
||||
return valueItem.toExpression().toString();
|
||||
} else {
|
||||
return valueItemList.get(index).toString();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String visitPushDownNameSel(Item o) {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
// try to trim sharding from field_item
|
||||
protected String getUpdateItemName(Item item) {
|
||||
if (item instanceof ItemCondOr) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" ( ");
|
||||
for (int index = 0; index < item.getArgCount(); index++) {
|
||||
if (index > 0) {
|
||||
sb.append(" OR ");
|
||||
}
|
||||
sb.append(getUpdateItemName(item.arguments().get(index)));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
} else if (item instanceof ItemCondAnd) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" ( ");
|
||||
for (int index = 0; index < item.getArgCount(); index++) {
|
||||
if (index > 0) {
|
||||
sb.append(" AND ");
|
||||
}
|
||||
sb.append(getUpdateItemName(item.arguments().get(index)));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
} else if (item instanceof ItemFuncNot) {
|
||||
return " ( NOT " + getUpdateItemName(item.arguments().get(0)) + ")";
|
||||
} else if (item instanceof ItemBoolFunc2) {
|
||||
return getBoolFuncItemName(item);
|
||||
} else if (item.type().equals(Item.ItemType.FIELD_ITEM)) {
|
||||
String tableName = "`" + item.getTableName() + "`.`" + item.getItemName() + "`";
|
||||
if (item.getDbName() == null) {
|
||||
return tableName;
|
||||
}
|
||||
if (item.getReferTables().size() == 0) {
|
||||
return tableName;
|
||||
}
|
||||
PlanNode tbNode = item.getReferTables().iterator().next();
|
||||
if (!(tbNode instanceof TableNode)) {
|
||||
return tableName;
|
||||
}
|
||||
if (!((TableNode) tbNode).getTableName().equals(item.getTableName())) {
|
||||
return tableName;
|
||||
}
|
||||
return "`" + item.getDbName() + "`." + tableName;
|
||||
} else if (item instanceof ItemFuncIn) {
|
||||
Item a = item.arguments().get(0);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(getUpdateItemName(a));
|
||||
if (((ItemFuncIn) item).isNegate()) {
|
||||
sb.append(" not ");
|
||||
}
|
||||
sb.append(" in (");
|
||||
for (int index = 1; index < item.arguments().size(); index++) {
|
||||
if (index > 1) {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(getUpdateItemName(item.arguments().get(index)));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
} else {
|
||||
return item.getItemName();
|
||||
}
|
||||
}
|
||||
|
||||
private String getBoolFuncItemName(Item item) {
|
||||
Item a = item.arguments().get(0);
|
||||
Item b = item.arguments().get(1);
|
||||
String left = getUpdateItemName(a), right = getUpdateItemName(b);
|
||||
if (a instanceof ItemNull && !(b instanceof ItemNull)) {
|
||||
left = getUpdateItemName(b);
|
||||
right = getUpdateItemName(a);
|
||||
}
|
||||
String operator = ((ItemBoolFunc2) item).funcName();
|
||||
if (a instanceof ItemNull || b instanceof ItemNull) {
|
||||
if (item instanceof ItemFuncEqual) {
|
||||
operator = SQLBinaryOperator.Is.getName();
|
||||
} else if (item instanceof ItemFuncNe) {
|
||||
operator = SQLBinaryOperator.IsNot.getName();
|
||||
}
|
||||
}
|
||||
return left + " " + operator + " " + right;
|
||||
}
|
||||
|
||||
|
||||
public Item rebuildUpdateItem(Item item, String tableName) {
|
||||
BoolPtr reBuild = new BoolPtr(false);
|
||||
if (PlanUtil.isCmpFunc(item)) {
|
||||
Item res1 = PlanUtil.rebuildBoolSubQuery(item, 0, reBuild, new BoolPtr(false), new BoolPtr(false));
|
||||
if (res1 != null) {
|
||||
return res1;
|
||||
}
|
||||
|
||||
BoolPtr needExecuteNull = new BoolPtr(false);
|
||||
BoolPtr isAll = new BoolPtr(false);
|
||||
Item res2 = PlanUtil.rebuildBoolSubQuery(item, 1, reBuild, needExecuteNull, isAll);
|
||||
if (res2 != null) {
|
||||
return res2;
|
||||
}
|
||||
|
||||
ItemFunc func = (ItemFunc) item;
|
||||
item.setWithSubQuery(false);
|
||||
Item itemTmp = item.cloneStruct();
|
||||
for (int index = 0; index < func.getArgCount(); index++) {
|
||||
Item arg = item.arguments().get(index);
|
||||
if (isExplain && !StringUtil.equalsIgnoreCase(tableName, arg.getTableName())) {
|
||||
itemTmp.arguments().set(index, new ItemString(NEED_REPLACE, itemTmp.getCharsetIndex()));
|
||||
itemTmp.setItemName(null);
|
||||
} else {
|
||||
int fieldIndex = getItemIndex(arg);
|
||||
if (fieldIndex >= 0) {
|
||||
itemTmp.arguments().set(index, valueItemList.get(fieldIndex));
|
||||
itemTmp.setItemName(null);
|
||||
}
|
||||
}
|
||||
}
|
||||
return itemTmp;
|
||||
} else if (item instanceof ItemCondAnd || item instanceof ItemCondOr) {
|
||||
item.setWithSubQuery(false);
|
||||
Item itemTmp = item.cloneStruct();
|
||||
for (int index = 0; index < item.getArgCount(); index++) {
|
||||
Item rebuildItem = rebuildUpdateItem(item.arguments().get(index), tableName);
|
||||
itemTmp.arguments().set(index, rebuildItem);
|
||||
itemTmp.setItemName(null);
|
||||
}
|
||||
return itemTmp;
|
||||
}
|
||||
return item;
|
||||
}
|
||||
|
||||
private int getItemIndex(Item valueItem) {
|
||||
for (int i = 0; i < fieldList.size(); i++) {
|
||||
if (fieldList.get(i).equals(valueItem)) {
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public RouteResultset buildRouteResultset() {
|
||||
this.visit();
|
||||
String sql = sqlBuilder.toString();
|
||||
LOGGER.debug("merge update——update sql:{}", sql);
|
||||
RouteResultset rrs = new RouteResultset(sql, ServerParse.UPDATE);
|
||||
rrs.setStatement(sql);
|
||||
rrs.setComplexSQL(true);
|
||||
return rrs;
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import com.actiontech.dble.net.Session;
|
||||
import com.actiontech.dble.net.connection.BackendConnection;
|
||||
import com.actiontech.dble.net.mysql.FieldPacket;
|
||||
import com.actiontech.dble.net.service.AbstractService;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -35,6 +36,8 @@ public abstract class BaseDMLHandler implements DMLResponseHandler {
|
||||
protected Session session;
|
||||
protected AtomicBoolean terminate = new AtomicBoolean(false);
|
||||
protected List<DMLResponseHandler> merges;
|
||||
protected RouteResultsetNode rrss;
|
||||
|
||||
|
||||
public BaseDMLHandler(long id, Session session) {
|
||||
this.id = id;
|
||||
@@ -42,6 +45,17 @@ public abstract class BaseDMLHandler implements DMLResponseHandler {
|
||||
this.merges = new ArrayList<>();
|
||||
}
|
||||
|
||||
public BaseDMLHandler(long id, Session session, RouteResultsetNode rrss) {
|
||||
this.id = id;
|
||||
this.rrss = rrss;
|
||||
this.session = session;
|
||||
this.merges = new ArrayList<>();
|
||||
}
|
||||
|
||||
public RouteResultsetNode getRrss() {
|
||||
return rrss;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final BaseDMLHandler getNextHandler() {
|
||||
return this.nextHandler;
|
||||
|
||||
@@ -6,12 +6,43 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query;
|
||||
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
|
||||
import com.actiontech.dble.plan.util.ComplexQueryPlanUtil;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface DMLResponseHandler extends ResponseHandler {
|
||||
enum HandlerType {
|
||||
TEMPTABLE, BASESEL, EASY_MERGE, MERGE_AND_ORDER, FAKE_MERGE, JOIN, NOT_IN, WHERE, GROUPBY, HAVING, ORDERBY, LIMIT, UNION, DISTINCT, SENDMAKER, FINAL, SCALAR_SUB_QUERY, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, RENAME_FIELD, MANAGER_SENDMAKER
|
||||
TEMPTABLE, BASESEL, EASY_MERGE, MERGE_AND_ORDER, FAKE_MERGE, JOIN, NOT_IN, WHERE, GROUPBY, HAVING, ORDERBY, LIMIT, UNION, DISTINCT, SENDMAKER, FINAL, SCALAR_SUB_QUERY, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, RENAME_FIELD, MANAGER_SENDMAKER, UPDATE_QUERY, BASE_UPDATE, EASY_MERGE_UPDATE, MERGE_UPDATE
|
||||
}
|
||||
|
||||
enum ExplainType {
|
||||
|
||||
AGGREGATE, DISTINCT, LIMIT, WHERE_FILTER, HAVING_FILTER, SHUFFLE_FIELD, UNION_ALL, ORDER, NOT_IN,
|
||||
INNER_FUNC_ADD, JOIN, DIRECT_GROUP, NEST_LOOP, IN_SUB_QUERY, ALL_ANY_SUB_QUERY, SCALAR_SUB_QUERY,
|
||||
RENAME_DERIVED_SUB_QUERY, WRITE_TO_CLIENT, HINT_NEST_LOOP,
|
||||
TYPE_UPDATE_SUB_QUERY(ComplexQueryPlanUtil.TYPE_UPDATE_SUB_QUERY), MERGE_UPDATE, OTHER;
|
||||
|
||||
private String content;
|
||||
|
||||
ExplainType(String content) {
|
||||
this.content = content;
|
||||
}
|
||||
|
||||
ExplainType() {
|
||||
}
|
||||
|
||||
public String getContent() {
|
||||
if (content == null || content.isEmpty()) {
|
||||
return super.name();
|
||||
} else {
|
||||
return content;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
default ExplainType explainType() {
|
||||
return ExplainType.OTHER;
|
||||
}
|
||||
|
||||
HandlerType type();
|
||||
|
||||
@@ -36,13 +36,12 @@ public class BaseSelectHandler extends BaseDMLHandler {
|
||||
|
||||
private final boolean autocommit;
|
||||
private volatile int fieldCounts = -1;
|
||||
private final RouteResultsetNode rrss;
|
||||
|
||||
private final NonBlockingSession serverSession;
|
||||
|
||||
public BaseSelectHandler(long id, RouteResultsetNode rrss, boolean autocommit, Session session) {
|
||||
super(id, session);
|
||||
super(id, session, rrss);
|
||||
serverSession = (NonBlockingSession) session;
|
||||
this.rrss = rrss;
|
||||
this.autocommit = autocommit;
|
||||
}
|
||||
|
||||
@@ -86,9 +85,6 @@ public class BaseSelectHandler extends BaseDMLHandler {
|
||||
service.executeMultiNode(rrss, serverSession.getShardingService(), autocommit);
|
||||
}
|
||||
|
||||
public RouteResultsetNode getRrss() {
|
||||
return rrss;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void okResponse(byte[] ok, @NotNull AbstractService service) {
|
||||
|
||||
@@ -0,0 +1,173 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.datasource.ShardingNode;
|
||||
import com.actiontech.dble.backend.mysql.CharsetUtil;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.net.Session;
|
||||
import com.actiontech.dble.net.connection.BackendConnection;
|
||||
import com.actiontech.dble.net.mysql.ErrorPacket;
|
||||
import com.actiontech.dble.net.mysql.FieldPacket;
|
||||
import com.actiontech.dble.net.mysql.RowDataPacket;
|
||||
import com.actiontech.dble.net.service.AbstractService;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.server.NonBlockingSession;
|
||||
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
|
||||
import com.actiontech.dble.singleton.TraceManager;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.List;
|
||||
|
||||
public class BaseUpdateHandler extends BaseDMLHandler {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(BaseUpdateHandler.class);
|
||||
|
||||
private final boolean autocommit;
|
||||
private final RouteResultsetNode rrss;
|
||||
private final NonBlockingSession serverSession;
|
||||
|
||||
public BaseUpdateHandler(long id, RouteResultsetNode rrss, boolean autocommit, Session session) {
|
||||
super(id, session, rrss);
|
||||
serverSession = (NonBlockingSession) session;
|
||||
this.rrss = rrss;
|
||||
this.autocommit = autocommit;
|
||||
}
|
||||
|
||||
public BackendConnection initConnection() throws Exception {
|
||||
if (serverSession.closed()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
BackendConnection exeConn = serverSession.getTarget(rrss);
|
||||
if (serverSession.tryExistsCon(exeConn, rrss)) {
|
||||
exeConn.getBackendService().setRowDataFlowing(true);
|
||||
exeConn.getBackendService().setResponseHandler(this);
|
||||
return exeConn;
|
||||
} else {
|
||||
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(rrss.getName());
|
||||
//autocommit is serverSession.getWriteSource().isAutocommit() && !serverSession.getWriteSource().isTxStart()
|
||||
final BackendConnection newConn = dn.getConnection(dn.getDatabase(), autocommit, rrss);
|
||||
serverSession.bindConnection(rrss, newConn);
|
||||
newConn.getBackendService().setResponseHandler(this);
|
||||
newConn.getBackendService().setRowDataFlowing(true);
|
||||
return newConn;
|
||||
}
|
||||
}
|
||||
|
||||
public void execute(MySQLResponseService service) {
|
||||
TraceManager.crossThread(service, "base-sql-execute", serverSession.getShardingService());
|
||||
if (serverSession.closed()) {
|
||||
service.setRowDataFlowing(false);
|
||||
serverSession.clearResources(true);
|
||||
return;
|
||||
}
|
||||
service.setSession(serverSession);
|
||||
if (service.getConnection().isClosed()) {
|
||||
service.setRowDataFlowing(false);
|
||||
serverSession.onQueryError("failed or cancelled by other thread".getBytes());
|
||||
return;
|
||||
}
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug(service.toString() + " send sql:" + rrss.getStatement());
|
||||
}
|
||||
service.executeMultiNode(rrss, serverSession.getShardingService(), autocommit);
|
||||
}
|
||||
|
||||
public RouteResultsetNode getRrss() {
|
||||
return rrss;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void okResponse(byte[] ok, @NotNull AbstractService service) {
|
||||
LOGGER.debug("receive ok packet for sync context, service {}", service);
|
||||
if (terminate.get()) {
|
||||
return;
|
||||
}
|
||||
nextHandler.okResponse(ok, service);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPacketsNull, byte[] eof,
|
||||
boolean isLeft, @NotNull AbstractService service) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, @NotNull AbstractService conn) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rowEofResponse(byte[] data, boolean isLeft, @NotNull AbstractService service) {
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. if some connection's thread status is await. 2. if some connection's
|
||||
* thread status is running.
|
||||
*/
|
||||
@Override
|
||||
public void connectionError(Throwable e, Object attachment) {
|
||||
if (terminate.get())
|
||||
return;
|
||||
String errMsg;
|
||||
if (e instanceof MySQLOutPutException) {
|
||||
errMsg = e.getMessage() == null ? e.toString() : e.getMessage();
|
||||
} else if (e instanceof NullPointerException) {
|
||||
errMsg = e.getMessage() == null ? e.toString() : e.getMessage();
|
||||
} else {
|
||||
RouteResultsetNode node = (RouteResultsetNode) attachment;
|
||||
errMsg = "can't connect to shardingNode[" + node.getName() + "],due to " + e.getMessage();
|
||||
}
|
||||
LOGGER.warn(errMsg, e);
|
||||
serverSession.onQueryError(errMsg.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionClose(@NotNull AbstractService service, String reason) {
|
||||
if (terminate.get())
|
||||
return;
|
||||
LOGGER.warn(service.toString() + "|connectionClose()|" + reason);
|
||||
reason = "Connection {dbInstance[" + service.getConnection().getHost() + ":" + service.getConnection().getPort() + "],Schema[" + ((MySQLResponseService) service).getConnection().getSchema() + "],threadID[" +
|
||||
((BackendConnection) service.getConnection()).getThreadId() + "]} was closed ,reason is [" + reason + "]";
|
||||
serverSession.onQueryError(reason.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void errorResponse(byte[] err, @NotNull AbstractService service) {
|
||||
ErrorPacket errPacket = new ErrorPacket();
|
||||
errPacket.read(err);
|
||||
String errMsg;
|
||||
try {
|
||||
errMsg = new String(errPacket.getMessage(), CharsetUtil.getJavaCharset(service.getCharset().getResults()));
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
errMsg = "UnsupportedEncodingException:" + service.getCharset();
|
||||
}
|
||||
LOGGER.info(service.toString() + errMsg);
|
||||
if (terminate.get())
|
||||
return;
|
||||
serverSession.onQueryError(errMsg.getBytes());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void onTerminate() {
|
||||
if (autocommit && !serverSession.getShardingService().isLockTable()) {
|
||||
this.serverSession.releaseConnection(rrss, false);
|
||||
} else {
|
||||
//the connection should wait until the connection running finish
|
||||
this.serverSession.waitFinishConnection(rrss);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public HandlerType type() {
|
||||
return HandlerType.BASE_UPDATE;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -192,4 +192,9 @@ public class DelayTableHandler extends BaseDMLHandler {
|
||||
public void setTableAlias(String tableAlias) {
|
||||
this.tableAlias = tableAlias;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.HINT_NEST_LOOP;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -116,4 +116,9 @@ public class DistinctHandler extends BaseDMLHandler {
|
||||
this.localResult.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.DISTINCT;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -91,4 +91,9 @@ public class HavingHandler extends BaseDMLHandler {
|
||||
@Override
|
||||
public void onTerminate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.HAVING_FILTER;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,4 +75,9 @@ public class LimitHandler extends BaseDMLHandler {
|
||||
@Override
|
||||
protected void onTerminate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.LIMIT;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.net.connection.BackendConnection;
|
||||
import com.actiontech.dble.net.mysql.FieldPacket;
|
||||
import com.actiontech.dble.net.mysql.RowDataPacket;
|
||||
@@ -30,7 +31,7 @@ public class MultiNodeEasyMergeHandler extends MultiNodeMergeHandler {
|
||||
private Set<String> globalBackNodes;
|
||||
|
||||
public MultiNodeEasyMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session, Set<String> globalBackNodes) {
|
||||
super(id, route, autocommit, session);
|
||||
super(id, route, autocommit, session, true);
|
||||
this.merges.add(this);
|
||||
this.globalBackNodes = globalBackNodes;
|
||||
}
|
||||
@@ -55,15 +56,18 @@ public class MultiNodeEasyMergeHandler extends MultiNodeMergeHandler {
|
||||
}
|
||||
|
||||
private void doExecute() {
|
||||
for (BaseSelectHandler exeHandler : exeHandlers) {
|
||||
session.setHandlerStart(exeHandler); //base start execute
|
||||
try {
|
||||
BackendConnection exeConn = exeHandler.initConnection();
|
||||
exeConn.getBackendService().setComplexQuery(true);
|
||||
exeHandler.execute(exeConn.getBackendService());
|
||||
} catch (Exception e) {
|
||||
exeHandler.connectionError(e, exeHandler.getRrss());
|
||||
return;
|
||||
for (BaseDMLHandler exeHandler : exeHandlers) {
|
||||
if (exeHandler instanceof BaseSelectHandler) {
|
||||
BaseSelectHandler baseSelectHandler = (BaseSelectHandler) exeHandler;
|
||||
session.setHandlerStart(baseSelectHandler); //base start execute
|
||||
try {
|
||||
BackendConnection exeConn = baseSelectHandler.initConnection();
|
||||
exeConn.getBackendService().setComplexQuery(true);
|
||||
baseSelectHandler.execute(exeConn.getBackendService());
|
||||
} catch (Exception e) {
|
||||
baseSelectHandler.connectionError(e, baseSelectHandler.getRrss());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
|
||||
@@ -47,7 +48,7 @@ public class MultiNodeMergeAndOrderHandler extends MultiNodeMergeHandler {
|
||||
|
||||
public MultiNodeMergeAndOrderHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session,
|
||||
List<Order> orderBys, boolean nestLoopDependOn) {
|
||||
super(id, route, autocommit, session);
|
||||
super(id, route, autocommit, session, true);
|
||||
this.orderBys = orderBys;
|
||||
this.queueSize = SystemConfig.getInstance().getMergeQueueSize();
|
||||
this.queues = new ConcurrentHashMap<>();
|
||||
@@ -76,16 +77,19 @@ public class MultiNodeMergeAndOrderHandler extends MultiNodeMergeHandler {
|
||||
}
|
||||
|
||||
private void doExecute() {
|
||||
for (BaseSelectHandler exeHandler : exeHandlers) {
|
||||
session.setHandlerStart(exeHandler); //base start execute
|
||||
try {
|
||||
BackendConnection exeConn = exeHandler.initConnection();
|
||||
exeConn.getBackendService().setComplexQuery(true);
|
||||
queues.put(exeConn.getBackendService(), new LinkedBlockingQueue<>(queueSize));
|
||||
exeHandler.execute(exeConn.getBackendService());
|
||||
} catch (Exception e) {
|
||||
exeHandler.connectionError(e, exeHandler.getRrss());
|
||||
return;
|
||||
for (BaseDMLHandler exeHandler : exeHandlers) {
|
||||
if (exeHandler instanceof BaseSelectHandler) {
|
||||
BaseSelectHandler baseSelectHandler = (BaseSelectHandler) exeHandler;
|
||||
session.setHandlerStart(baseSelectHandler); //base start execute
|
||||
try {
|
||||
BackendConnection exeConn = baseSelectHandler.initConnection();
|
||||
exeConn.getBackendService().setComplexQuery(true);
|
||||
queues.put(exeConn.getBackendService(), new LinkedBlockingQueue<>(queueSize));
|
||||
baseSelectHandler.execute(exeConn.getBackendService());
|
||||
} catch (Exception e) {
|
||||
baseSelectHandler.connectionError(e, baseSelectHandler.getRrss());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
|
||||
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.OwnThreadDMLHandler;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
@@ -26,12 +27,12 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
public abstract class MultiNodeMergeHandler extends OwnThreadDMLHandler {
|
||||
|
||||
protected final ReentrantLock lock;
|
||||
final List<BaseSelectHandler> exeHandlers;
|
||||
final List<BaseDMLHandler> exeHandlers;
|
||||
protected RouteResultsetNode[] route;
|
||||
int reachedConCount = 0;
|
||||
private Set<String> dependencies;
|
||||
|
||||
public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, Session session) {
|
||||
public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, Session session, boolean isSelect) {
|
||||
super(id, session);
|
||||
this.exeHandlers = new ArrayList<>();
|
||||
dependencies = new HashSet<>();
|
||||
@@ -39,8 +40,14 @@ public abstract class MultiNodeMergeHandler extends OwnThreadDMLHandler {
|
||||
if (route.length == 0)
|
||||
throw new MySQLOutPutException(ErrorCode.ER_QUERYHANDLER, "", "can not execute empty rrss!");
|
||||
for (RouteResultsetNode rrss : route) {
|
||||
BaseSelectHandler exeHandler = new BaseSelectHandler(id, rrss, autocommit, session);
|
||||
exeHandler.setNextHandler(this);
|
||||
BaseDMLHandler exeHandler;
|
||||
if (isSelect) {
|
||||
exeHandler = new BaseSelectHandler(id, rrss, autocommit, session);
|
||||
exeHandler.setNextHandler(this);
|
||||
} else {
|
||||
exeHandler = new BaseUpdateHandler(id, rrss, autocommit, session);
|
||||
exeHandler.setNextHandler(this);
|
||||
}
|
||||
this.exeHandlers.add(exeHandler);
|
||||
}
|
||||
this.route = route;
|
||||
@@ -55,13 +62,13 @@ public abstract class MultiNodeMergeHandler extends OwnThreadDMLHandler {
|
||||
|
||||
public abstract void execute() throws Exception;
|
||||
|
||||
public List<BaseSelectHandler> getExeHandlers() {
|
||||
public List<BaseDMLHandler> getExeHandlers() {
|
||||
return exeHandlers;
|
||||
}
|
||||
|
||||
protected void recycleConn() {
|
||||
synchronized (exeHandlers) {
|
||||
for (BaseSelectHandler exeHandler : exeHandlers) {
|
||||
for (BaseDMLHandler exeHandler : exeHandlers) {
|
||||
terminatePreHandler(exeHandler);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,128 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.net.Session;
|
||||
import com.actiontech.dble.net.connection.BackendConnection;
|
||||
import com.actiontech.dble.net.mysql.FieldPacket;
|
||||
import com.actiontech.dble.net.mysql.OkPacket;
|
||||
import com.actiontech.dble.net.mysql.RowDataPacket;
|
||||
import com.actiontech.dble.net.service.AbstractService;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
|
||||
public class MultiNodeUpdateHandler extends MultiNodeMergeHandler {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeUpdateHandler.class);
|
||||
private int rowEndConCount = 0;
|
||||
private long affectedRows;
|
||||
|
||||
public MultiNodeUpdateHandler(long id, Session session, RouteResultsetNode[] route, boolean autocommit) {
|
||||
super(id, route, autocommit, session, false);
|
||||
this.merges.add(this);
|
||||
}
|
||||
|
||||
|
||||
public void execute() {
|
||||
synchronized (exeHandlers) {
|
||||
if (terminate.get())
|
||||
return;
|
||||
|
||||
if (Thread.currentThread().getName().contains("complexQueryExecutor")) {
|
||||
doExecute();
|
||||
} else {
|
||||
DbleServer.getInstance().getComplexQueryExecutor().execute(() -> doExecute());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void doExecute() {
|
||||
for (BaseDMLHandler exeHandler : exeHandlers) {
|
||||
if (exeHandler instanceof BaseUpdateHandler) {
|
||||
BaseUpdateHandler baseUpdateHandler = (BaseUpdateHandler) exeHandler;
|
||||
session.setHandlerStart(baseUpdateHandler); //base start execute
|
||||
try {
|
||||
BackendConnection exeConn = baseUpdateHandler.initConnection();
|
||||
exeConn.getBackendService().setComplexQuery(true);
|
||||
baseUpdateHandler.execute(exeConn.getBackendService());
|
||||
} catch (Exception e) {
|
||||
baseUpdateHandler.connectionError(e, baseUpdateHandler.getRrss());
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void okResponse(byte[] ok, @NotNull AbstractService service) {
|
||||
if (this.terminate.get())
|
||||
return;
|
||||
startEasyMerge();
|
||||
boolean executeResponse = ((MySQLResponseService) service).syncAndExecute();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + service);
|
||||
}
|
||||
|
||||
if (executeResponse) {
|
||||
OkPacket okPacket = new OkPacket();
|
||||
okPacket.read(ok);
|
||||
lock.lock();
|
||||
try {
|
||||
affectedRows += okPacket.getAffectedRows();
|
||||
((MySQLResponseService) service).backendSpecialCleanUp();
|
||||
if (++rowEndConCount != route.length) {
|
||||
return;
|
||||
}
|
||||
okPacket.setAffectedRows(affectedRows);
|
||||
nextHandler.okResponse(okPacket.toBytes(), service);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void ownThreadJob(Object... objects) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void terminateThread() throws Exception {
|
||||
recycleConn();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void recycleResources() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public HandlerType type() {
|
||||
return HandlerType.EASY_MERGE_UPDATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof, boolean isLeft, @Nonnull AbstractService service) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, @Nonnull AbstractService service) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rowEofResponse(byte[] eof, boolean isLeft, @Nonnull AbstractService service) {
|
||||
|
||||
}
|
||||
}
|
||||
@@ -160,4 +160,9 @@ public class OrderByHandler extends OwnThreadDMLHandler {
|
||||
this.localResult.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.ORDER;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -282,4 +282,9 @@ public class OutputHandler extends BaseDMLHandler {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.WRITE_TO_CLIENT;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import java.util.List;
|
||||
public class RenameFieldHandler extends BaseDMLHandler {
|
||||
private String alias;
|
||||
private PlanNode.PlanNodeType childType;
|
||||
|
||||
public RenameFieldHandler(long id, Session session, String alias, PlanNode.PlanNodeType childType) {
|
||||
super(id, session);
|
||||
this.alias = alias;
|
||||
@@ -58,4 +59,9 @@ public class RenameFieldHandler extends BaseDMLHandler {
|
||||
@Override
|
||||
protected void onTerminate() throws Exception {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.RENAME_DERIVED_SUB_QUERY;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -151,4 +151,13 @@ public class SendMakeHandler extends BaseDMLHandler {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.SHUFFLE_FIELD;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void okResponse(byte[] ok, @NotNull AbstractService service) {
|
||||
nextHandler.okResponse(ok, service);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,4 +191,9 @@ public class TempTableHandler extends BaseDMLHandler {
|
||||
return HandlerType.TEMPTABLE;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.NEST_LOOP;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -172,4 +172,9 @@ public class UnionHandler extends BaseDMLHandler {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.UNION_ALL;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -79,5 +79,9 @@ public class WhereHandler extends BaseDMLHandler {
|
||||
public void onTerminate() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.WHERE_FILTER;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl.foreach;
|
||||
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.MultiNodeUpdateHandler;
|
||||
import com.actiontech.dble.net.Session;
|
||||
import com.actiontech.dble.net.mysql.FieldPacket;
|
||||
|
||||
import com.actiontech.dble.net.mysql.OkPacket;
|
||||
import com.actiontech.dble.net.mysql.RowDataPacket;
|
||||
import com.actiontech.dble.net.service.AbstractService;
|
||||
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class MergeUpdateHandler extends BaseDMLHandler {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeUpdateHandler.class);
|
||||
private final long row;
|
||||
private long rowCount = 0;
|
||||
private long affectedRows;
|
||||
protected final ReentrantLock lock;
|
||||
|
||||
public MergeUpdateHandler(long id, Session session, long row) {
|
||||
super(id, session);
|
||||
this.row = row;
|
||||
this.lock = new ReentrantLock();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void onTerminate() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void okResponse(byte[] ok, @NotNull AbstractService service) {
|
||||
if (this.terminate.get())
|
||||
return;
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("received ok response ,from " + service);
|
||||
}
|
||||
|
||||
OkPacket okPacket = new OkPacket();
|
||||
okPacket.read(ok);
|
||||
lock.lock();
|
||||
try {
|
||||
affectedRows += okPacket.getAffectedRows();
|
||||
if (++rowCount != row) {
|
||||
return;
|
||||
}
|
||||
|
||||
okPacket.setAffectedRows(affectedRows);
|
||||
((MySQLResponseService) service).getSession().setRowCount(affectedRows);
|
||||
okPacket.setServerStatus(((MySQLResponseService) service).getSession().getShardingService().isAutocommit() ? 2 : 1);
|
||||
okPacket.setMessage(null);
|
||||
nextHandler.okResponse(okPacket.toBytes(), service);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof, boolean isLeft, @Nonnull AbstractService service) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, @Nonnull AbstractService service) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void rowEofResponse(byte[] eof, boolean isLeft, @Nonnull AbstractService service) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public HandlerType type() {
|
||||
return HandlerType.MERGE_UPDATE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.MERGE_UPDATE;
|
||||
}
|
||||
}
|
||||
@@ -248,4 +248,9 @@ public class AggregateHandler extends BaseDMLHandler {
|
||||
store.close();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.AGGREGATE;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -329,4 +329,9 @@ public class DirectGroupByHandler extends OwnThreadDMLHandler {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.DIRECT_GROUP;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -410,4 +410,9 @@ public class JoinHandler extends OwnThreadDMLHandler {
|
||||
public void setNestLoopDependOn(boolean nestLoopDependOn) {
|
||||
this.nestLoopDependOn = nestLoopDependOn;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.JOIN;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,4 +19,9 @@ public class JoinInnerHandler extends JoinHandler {
|
||||
public JoinInnerHandler(long id, NonBlockingSession session, boolean isLeftJoin, List<Order> leftOrder, List<Order> rightOrder, Item otherJoinOn) {
|
||||
super(id, session, isLeftJoin, leftOrder, rightOrder, otherJoinOn);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.INNER_FUNC_ADD;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -265,4 +265,9 @@ public class NotInHandler extends OwnThreadDMLHandler {
|
||||
local = deque.poll();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.NOT_IN;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -194,4 +194,8 @@ public class AllAnySubQueryHandler extends SubQueryHandler {
|
||||
itemSubQuery.getValue().clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.ALL_ANY_SUB_QUERY;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ public class InSubQueryHandler extends SubQueryHandler {
|
||||
private int rowCount = 0;
|
||||
private Field sourceField;
|
||||
private ItemInSubQuery itemSubQuery;
|
||||
|
||||
public InSubQueryHandler(long id, Session session, ItemInSubQuery itemSubQuery, boolean isExplain) {
|
||||
super(id, session);
|
||||
this.itemSubQuery = itemSubQuery;
|
||||
@@ -121,4 +122,9 @@ public class InSubQueryHandler extends SubQueryHandler {
|
||||
itemSubQuery.getValue().clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.IN_SUB_QUERY;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ public class SingleRowSubQueryHandler extends SubQueryHandler {
|
||||
private int rowCount = 0;
|
||||
private Field sourceField;
|
||||
private ItemSingleRowSubQuery itemSubQuery;
|
||||
|
||||
public SingleRowSubQueryHandler(long id, Session session, ItemSingleRowSubQuery itemSubQuery, boolean isExplain) {
|
||||
super(id, session);
|
||||
this.itemSubQuery = itemSubQuery;
|
||||
@@ -113,4 +114,8 @@ public class SingleRowSubQueryHandler extends SubQueryHandler {
|
||||
itemSubQuery.setValue(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.SCALAR_SUB_QUERY;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,146 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery;
|
||||
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.net.Session;
|
||||
import com.actiontech.dble.net.mysql.FieldPacket;
|
||||
import com.actiontech.dble.net.mysql.RowDataPacket;
|
||||
import com.actiontech.dble.net.service.AbstractService;
|
||||
import com.actiontech.dble.plan.common.field.Field;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.item.ItemNull;
|
||||
import com.actiontech.dble.plan.common.item.ItemString;
|
||||
import com.actiontech.dble.plan.common.item.subquery.UpdateItemSubQuery;
|
||||
import com.actiontech.dble.plan.node.ManagerTableNode;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
public class UpdateSubQueryHandler extends SubQueryHandler {
|
||||
|
||||
public static final String NEED_REPLACE = "{CHILD}";
|
||||
private long maxRowsSize;
|
||||
private long rowCount = 0;
|
||||
private List<Field> sourceFieldList;
|
||||
private UpdateItemSubQuery itemSubQuery;
|
||||
|
||||
public UpdateSubQueryHandler(long id, Session session, UpdateItemSubQuery itemSubQuery, boolean isExplain) {
|
||||
super(id, session);
|
||||
this.itemSubQuery = itemSubQuery;
|
||||
this.maxRowsSize = SystemConfig.getInstance().getQueryForUpdateMaxRowsSize();
|
||||
if (isExplain) {
|
||||
setForExplain();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fieldEofResponse(byte[] headerNull, List<byte[]> fieldsNull, List<FieldPacket> fieldPackets,
|
||||
byte[] eofNull, boolean isLeft, @NotNull AbstractService service) {
|
||||
session.setHandlerStart(this);
|
||||
if (terminate.get()) {
|
||||
return;
|
||||
}
|
||||
lock.lock();
|
||||
try {
|
||||
// create field for first time
|
||||
if (this.fieldPackets.isEmpty()) {
|
||||
this.fieldPackets = fieldPackets;
|
||||
sourceFieldList = HandlerTool.createFields(this.fieldPackets);
|
||||
|
||||
int i = 0;
|
||||
for (Item item : itemSubQuery.getSelect()) {
|
||||
PlanNode planNode = itemSubQuery.getPlanNode();
|
||||
if (!(planNode instanceof ManagerTableNode) || ((ManagerTableNode) planNode).isNeedSendMaker()) {
|
||||
item.setPushDownName(item.getAlias());
|
||||
}
|
||||
item.setTableName(sourceFieldList.get(i).getTable());
|
||||
Item tmpItem = HandlerTool.createItem(item, Collections.singletonList(sourceFieldList.get(i)), 0, isAllPushDown(), type());
|
||||
itemSubQuery.getField().add(tmpItem);
|
||||
i++;
|
||||
}
|
||||
itemSubQuery.setSelect(itemSubQuery.getField());
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, @NotNull AbstractService service) {
|
||||
lock.lock();
|
||||
try {
|
||||
if (terminate.get()) {
|
||||
return true;
|
||||
}
|
||||
if (++rowCount > maxRowsSize) {
|
||||
String errMessage = "update involves too many rows in query,the maximum number of rows allowed is " + maxRowsSize;
|
||||
LOGGER.info(errMessage);
|
||||
genErrorPackage(ErrorCode.ER_UNKNOWN_ERROR, errMessage);
|
||||
service.getConnection().close(errMessage);
|
||||
try {
|
||||
tempDoneCallBack.call();
|
||||
} catch (Exception callback) {
|
||||
LOGGER.info("callback exception!", callback);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
RowDataPacket row = rowPacket;
|
||||
if (row == null) {
|
||||
row = new RowDataPacket(this.fieldPackets.size());
|
||||
row.read(rowNull);
|
||||
}
|
||||
int i = 0;
|
||||
for (byte[] fieldValue : row.getFieldValues()) {
|
||||
sourceFieldList.get(i).setPtr(fieldValue);
|
||||
i++;
|
||||
}
|
||||
List<Item> valueList = Lists.newArrayList();
|
||||
for (Item item : itemSubQuery.getField()) {
|
||||
if (item != null) {
|
||||
Item resultItem = item.getResultItem();
|
||||
valueList.add(resultItem == null ? new ItemNull() : resultItem);
|
||||
}
|
||||
}
|
||||
itemSubQuery.getValue().add(valueList);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HandlerType type() {
|
||||
return HandlerType.UPDATE_QUERY;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setForExplain() {
|
||||
List<Item> valueItemList = Lists.newArrayList();
|
||||
for (Item ignored : itemSubQuery.getSelect()) {
|
||||
valueItemList.add(new ItemString(NEED_REPLACE, itemSubQuery.getCharsetIndex()));
|
||||
}
|
||||
itemSubQuery.getValue().add(valueItemList);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void clearForExplain() {
|
||||
itemSubQuery.getValue().clear();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ExplainType explainType() {
|
||||
return ExplainType.TYPE_UPDATE_SUB_QUERY;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -255,6 +255,9 @@ public final class SystemConfig {
|
||||
//unit: ms
|
||||
private long releaseTimeout = 10 * 60 * 1000L;
|
||||
|
||||
//maximum number of rows in select result set in multi-table update
|
||||
private long queryForUpdateMaxRowsSize = 20000;
|
||||
|
||||
public int getEnableAsyncRelease() {
|
||||
return enableAsyncRelease;
|
||||
}
|
||||
@@ -1840,6 +1843,14 @@ public final class SystemConfig {
|
||||
this.supportSSL = supportSSL;
|
||||
}
|
||||
|
||||
public long getQueryForUpdateMaxRowsSize() {
|
||||
return queryForUpdateMaxRowsSize;
|
||||
}
|
||||
|
||||
public void setQueryForUpdateMaxRowsSize(long queryForUpdateMaxRowsSize) {
|
||||
this.queryForUpdateMaxRowsSize = queryForUpdateMaxRowsSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SystemConfig [" +
|
||||
@@ -1968,6 +1979,7 @@ public final class SystemConfig {
|
||||
", sqlDumpLogSizeBasedRotate='" + sqlDumpLogSizeBasedRotate + '\'' +
|
||||
", sqlDumpLogTimeBasedRotate=" + sqlDumpLogTimeBasedRotate +
|
||||
", sqlDumpLogDeleteFileAge='" + sqlDumpLogDeleteFileAge + '\'' +
|
||||
", queryForUpdateMaxRowsSize=" + queryForUpdateMaxRowsSize +
|
||||
"]";
|
||||
}
|
||||
|
||||
|
||||
@@ -5,7 +5,11 @@
|
||||
|
||||
package com.actiontech.dble.config.model.sharding.table;
|
||||
|
||||
public class ERTable {
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
public class ERTable implements Comparable<ERTable> {
|
||||
private final String table;
|
||||
private final String column;
|
||||
private final String schema;
|
||||
@@ -65,4 +69,11 @@ public class ERTable {
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull ERTable o) {
|
||||
return Comparator.comparing(ERTable::getSchema)
|
||||
.thenComparing(ERTable::getTable)
|
||||
.compare(this, o);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -995,6 +995,10 @@ public abstract class Item {
|
||||
return null;
|
||||
}
|
||||
|
||||
public void setTableName(String tableName) {
|
||||
|
||||
}
|
||||
|
||||
public String getDbName() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@@ -218,7 +218,9 @@ public class ItemField extends ItemIdent {
|
||||
return tableName;
|
||||
}
|
||||
|
||||
|
||||
public void setTableName(String tableName) {
|
||||
this.tableName = tableName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean fixFields() {
|
||||
|
||||
@@ -0,0 +1,94 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
package com.actiontech.dble.plan.common.item.subquery;
|
||||
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.meta.ProxyMetaManager;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.time.MySQLTime;
|
||||
import com.actiontech.dble.plan.optimizer.HintPlanInfo;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.math.BigDecimal;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public abstract class ItemMultiColumnARowSubQuery extends ItemSubQuery {
|
||||
|
||||
protected List<List<Item>> value = new ArrayList<>();
|
||||
protected List<Item> field = new ArrayList<>();
|
||||
protected List<Item> select;
|
||||
|
||||
/**
|
||||
* @param currentDb
|
||||
* @param query
|
||||
*/
|
||||
public ItemMultiColumnARowSubQuery(String currentDb, SQLSelectQuery query, ProxyMetaManager metaManager, Map<String, String> usrVariables, int charsetIndex, @Nullable HintPlanInfo hintPlanInfo) {
|
||||
super(currentDb, query, metaManager, usrVariables, charsetIndex, hintPlanInfo);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void fixLengthAndDec() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public BigDecimal valReal() {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "not support yet!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public BigInteger valInt() {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "not support yet!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String valStr() {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "not support yet!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public BigDecimal valDecimal() {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "not support yet!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getDate(MySQLTime ltime, long fuzzydate) {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "not support yet!");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getTime(MySQLTime ltime) {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "not support yet!");
|
||||
}
|
||||
|
||||
public List<Item> getSelect() {
|
||||
return select;
|
||||
}
|
||||
|
||||
public void setSelect(List<Item> select) {
|
||||
this.select = select;
|
||||
}
|
||||
|
||||
public List<List<Item>> getValue() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public List<Item> getField() {
|
||||
return field;
|
||||
}
|
||||
|
||||
public void setField(List<Item> field) {
|
||||
this.field = field;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -41,7 +41,9 @@ public abstract class ItemSubQuery extends ItemResultField {
|
||||
this.metaManager = metaManager;
|
||||
this.usrVariables = usrVariables;
|
||||
this.hintPlanInfo = hintPlanInfo;
|
||||
init();
|
||||
if (query != null) {
|
||||
init();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.plan.common.item.subquery;
|
||||
|
||||
import com.actiontech.dble.meta.ProxyMetaManager;
|
||||
import com.actiontech.dble.plan.common.context.NameResolutionContext;
|
||||
import com.actiontech.dble.plan.common.context.ReferContext;
|
||||
import com.actiontech.dble.plan.common.field.Field;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.actiontech.dble.plan.optimizer.HintPlanInfo;
|
||||
import com.alibaba.druid.sql.ast.SQLExpr;
|
||||
import com.alibaba.druid.sql.ast.expr.SQLInSubQueryExpr;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelect;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectQuery;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class UpdateItemSubQuery extends ItemMultiColumnARowSubQuery {
|
||||
private final boolean isNeg;
|
||||
protected Item leftOperand;
|
||||
private PlanNode queryNode;
|
||||
|
||||
public UpdateItemSubQuery(String currentDb, SQLSelectQuery query, Item leftOperand, boolean isNeg, ProxyMetaManager metaManager, Map<String, String> usrVariables, int charsetIndex, @Nullable HintPlanInfo hintPlanInfo) {
|
||||
super(currentDb, query, metaManager, usrVariables, charsetIndex, hintPlanInfo);
|
||||
this.leftOperand = leftOperand;
|
||||
this.isNeg = isNeg;
|
||||
this.charsetIndex = charsetIndex;
|
||||
this.select = this.planNode.getColumnsSelected();
|
||||
}
|
||||
|
||||
public Item fixFields(NameResolutionContext context) {
|
||||
super.fixFields(context);
|
||||
leftOperand = leftOperand.fixFields(context);
|
||||
getReferTables().addAll(leftOperand.getReferTables());
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* added to construct all refers in an item
|
||||
*
|
||||
* @param context
|
||||
*/
|
||||
public void fixRefer(ReferContext context) {
|
||||
super.fixRefer(context);
|
||||
leftOperand.fixRefer(context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SubSelectType subType() {
|
||||
return SubSelectType.IN_SUBS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SQLExpr toExpression() {
|
||||
SQLExpr expr = leftOperand.toExpression();
|
||||
SQLSelect sqlSelect = new SQLSelect(query);
|
||||
SQLInSubQueryExpr inSub = new SQLInSubQueryExpr(sqlSelect);
|
||||
inSub.setExpr(expr);
|
||||
inSub.setNot(isNeg);
|
||||
return inSub;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Item cloneStruct(boolean forCalculate, List<Item> calArgs, boolean isPushDown, List<Field> fields) {
|
||||
UpdateItemSubQuery cloneItem = new UpdateItemSubQuery(this.currentDb, this.query, this.leftOperand.cloneStruct(), this.isNeg, this.metaManager, this.usrVariables, this.charsetIndex, this.hintPlanInfo);
|
||||
cloneItem.value = this.value;
|
||||
return cloneItem;
|
||||
}
|
||||
|
||||
public PlanNode getQueryNode() {
|
||||
return queryNode;
|
||||
}
|
||||
|
||||
public void setQueryNode(PlanNode queryNode) {
|
||||
this.queryNode = queryNode;
|
||||
}
|
||||
|
||||
public boolean isNeg() {
|
||||
return isNeg;
|
||||
}
|
||||
}
|
||||
|
||||
159
src/main/java/com/actiontech/dble/plan/node/ModifyNode.java
Normal file
159
src/main/java/com/actiontech/dble/plan/node/ModifyNode.java
Normal file
@@ -0,0 +1,159 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.plan.node;
|
||||
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.plan.NamedField;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.ItemFuncEqual;
|
||||
import com.actiontech.dble.plan.common.item.function.sumfunc.ItemFuncGroupConcat;
|
||||
import com.actiontech.dble.plan.util.ToStringUtil;
|
||||
import com.actiontech.dble.route.parser.druid.RouteTableConfigInfo;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class ModifyNode extends PlanNode {
|
||||
|
||||
/**
|
||||
* filter in where
|
||||
*/
|
||||
List<ItemFuncEqual> setItemList = new ArrayList<>();
|
||||
|
||||
public PlanNodeType type() {
|
||||
return PlanNodeType.MODIFY;
|
||||
}
|
||||
|
||||
public ModifyNode(PlanNode child) {
|
||||
this(child, null);
|
||||
}
|
||||
|
||||
public ModifyNode(PlanNode child, Item filter) {
|
||||
this.whereFilter = filter;
|
||||
for (PlanNode childChild : child.getChildren()) {
|
||||
this.addChild(childChild);
|
||||
}
|
||||
this.keepFieldSchema = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RouteTableConfigInfo findFieldSourceFromIndex(int index) throws Exception {
|
||||
if (columnsSelected.size() > index) {
|
||||
Item sourceColumns = columnsSelected.get(index);
|
||||
for (int i = 0; i < this.getChild().columnsSelected.size(); i++) {
|
||||
Item cSelected = this.getChild().columnsSelected.get(i);
|
||||
if (cSelected.getAlias() != null && cSelected.getAlias().equals(sourceColumns.getItemName())) {
|
||||
return this.getChild().findFieldSourceFromIndex(i);
|
||||
} else if (cSelected.getAlias() == null && cSelected.getItemName().equals(sourceColumns.getItemName())) {
|
||||
return this.getChild().findFieldSourceFromIndex(i);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
public void setChild(PlanNode child) {
|
||||
if (child == null) {
|
||||
return;
|
||||
}
|
||||
children.clear();
|
||||
addChild(child);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPureName() {
|
||||
return this.getChild().getAlias();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPureSchema() {
|
||||
return this.getChild().getPureSchema();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ModifyNode copy() {
|
||||
ModifyNode newTableNode = new ModifyNode(this.getChild().copy());
|
||||
this.copySelfTo(newTableNode);
|
||||
return newTableNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getHeight() {
|
||||
return getChild().getHeight() + 1;
|
||||
}
|
||||
|
||||
public List<ItemFuncEqual> getSetItemList() {
|
||||
return setItemList;
|
||||
}
|
||||
|
||||
public void addSetItem(ItemFuncEqual setItem) {
|
||||
this.setItemList.add(setItem);
|
||||
}
|
||||
|
||||
|
||||
public PlanNode getLeftNode() {
|
||||
if (children.isEmpty())
|
||||
return null;
|
||||
return children.get(0);
|
||||
|
||||
}
|
||||
|
||||
public PlanNode getRightNode() {
|
||||
if (children.size() < 2)
|
||||
return null;
|
||||
return children.get(1);
|
||||
}
|
||||
|
||||
public void setUpFields() {
|
||||
super.setUpFields();
|
||||
setUpSetItem();
|
||||
}
|
||||
|
||||
private void setUpSetItem() {
|
||||
nameContext.setFindInSelect(false);
|
||||
nameContext.setSelectFirst(false);
|
||||
for (ItemFuncEqual setItem : setItemList) {
|
||||
for (Item argument : setItem.arguments()) {
|
||||
setUpItem(argument);
|
||||
if (argument instanceof ItemFuncGroupConcat) {
|
||||
((ItemFuncGroupConcat) argument).fixOrders(nameContext, referContext);
|
||||
}
|
||||
NamedField field = makeOutNamedField(argument);
|
||||
if (outerFields.containsKey(field) && isDuplicateField(this))
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "duplicate field");
|
||||
outerFields.put(field, argument);
|
||||
}
|
||||
setItem.setItemName(null);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString(int level) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
String tabTittle = ToStringUtil.getTab(level);
|
||||
String tabContent = ToStringUtil.getTab(level + 1);
|
||||
|
||||
ToStringUtil.appendln(sb, tabTittle + "Update");
|
||||
ToStringUtil.appendln(sb, tabContent + "set: " + ToStringUtil.itemListString(setItemList));
|
||||
ToStringUtil.appendln(sb, tabContent + "where: " + ToStringUtil.itemString(whereFilter));
|
||||
ToStringUtil.appendln(sb, tabContent + "orderBy: " + ToStringUtil.orderListString(orderBys));
|
||||
if (this.getLimitFrom() >= 0L && this.getLimitTo() > 0L) {
|
||||
ToStringUtil.appendln(sb, tabContent + "limitFrom: " + this.getLimitFrom());
|
||||
ToStringUtil.appendln(sb, tabContent + "limitTo: " + this.getLimitTo());
|
||||
}
|
||||
ToStringUtil.appendln(sb, tabContent + "sql: " + this.getSql());
|
||||
|
||||
ToStringUtil.appendln(sb, tabContent + "left:");
|
||||
sb.append(this.getLeftNode().toString(level + 2));
|
||||
ToStringUtil.appendln(sb, tabContent + "right:");
|
||||
sb.append(this.getRightNode().toString(level + 2));
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
||||
@@ -35,7 +35,7 @@ public abstract class PlanNode {
|
||||
}
|
||||
|
||||
public enum PlanNodeType {
|
||||
NONAME, TABLE, JOIN, MERGE, QUERY, JOIN_INNER, MANAGER_TABLE
|
||||
NONAME, TABLE, JOIN, MERGE, QUERY, JOIN_INNER, MANAGER_TABLE, MODIFY
|
||||
}
|
||||
|
||||
public abstract PlanNodeType type();
|
||||
@@ -119,7 +119,7 @@ public abstract class PlanNode {
|
||||
|
||||
NameResolutionContext nameContext;
|
||||
|
||||
private ReferContext referContext;
|
||||
protected ReferContext referContext;
|
||||
|
||||
protected boolean keepFieldSchema = true;
|
||||
private boolean singleRoute = false;
|
||||
@@ -476,7 +476,7 @@ public abstract class PlanNode {
|
||||
columnsSelected = newSels;
|
||||
}
|
||||
|
||||
private NamedField makeOutNamedField(Item sel) {
|
||||
protected NamedField makeOutNamedField(Item sel) {
|
||||
String tmpSchema = null;
|
||||
if (keepFieldSchema) {
|
||||
tmpSchema = sel.getDbName();
|
||||
|
||||
@@ -9,14 +9,6 @@ import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.*;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.AggregateHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.DirectGroupByHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.JoinHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.JoinInnerHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.join.NotInHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.AllAnySubQueryHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.InSubQueryHandler;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.SingleRowSubQueryHandler;
|
||||
import com.actiontech.dble.plan.node.JoinNode;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.util.CollectionUtil;
|
||||
@@ -24,6 +16,9 @@ import com.actiontech.dble.util.CollectionUtil;
|
||||
import java.util.*;
|
||||
|
||||
public final class ComplexQueryPlanUtil {
|
||||
|
||||
public static final String TYPE_UPDATE_SUB_QUERY = "for CHILD in UPDATE_SUB_QUERY.RESULTS";
|
||||
|
||||
private ComplexQueryPlanUtil() {
|
||||
}
|
||||
|
||||
@@ -83,17 +78,22 @@ public final class ComplexQueryPlanUtil {
|
||||
} else if (!CollectionUtil.isEmpty(dependencies)) {
|
||||
dependencies.removeIf(dependency -> dependency.startsWith(JoinNode.Strategy.HINT_NEST_LOOP.name()));
|
||||
}
|
||||
boolean isSubUpdate = false;
|
||||
if (dependencies != null && dependencies.size() > 0) {
|
||||
isSubUpdate = dependencies.stream()
|
||||
.allMatch(entity -> entity.contains(ComplexQueryPlanUtil.TYPE_UPDATE_SUB_QUERY.toLowerCase())) && mergeHandler instanceof MultiNodeUpdateHandler;
|
||||
}
|
||||
String mergeName = getMergeType(mergeHandler);
|
||||
List<BaseSelectHandler> mergeList = new ArrayList<>();
|
||||
List<BaseDMLHandler> mergeList = new ArrayList<>();
|
||||
mergeList.addAll(mergeHandler.getExeHandlers());
|
||||
String mergeNode = genHandlerName(mergeName, nameMap);
|
||||
ReferenceHandlerInfo refInfo = new ReferenceHandlerInfo(mergeNode, mergeName, mergeHandler);
|
||||
ReferenceHandlerInfo refInfo = new ReferenceHandlerInfo(mergeNode, mergeName, mergeHandler, isSubUpdate);
|
||||
if (mergeHandler instanceof MultiNodeFakeHandler) {
|
||||
refInfo.setBaseSQL(((MultiNodeFakeHandler) mergeHandler).toSQLString());
|
||||
}
|
||||
handlerMap.put(mergeHandler, refInfo);
|
||||
refMap.put(mergeNode, refInfo);
|
||||
for (BaseSelectHandler exeHandler : mergeList) {
|
||||
for (BaseDMLHandler exeHandler : mergeList) {
|
||||
RouteResultsetNode rrss = exeHandler.getRrss();
|
||||
String dateNode = rrss.getName() + "_" + rrss.getMultiplexNum();
|
||||
refInfo.addChild(dateNode);
|
||||
@@ -101,7 +101,7 @@ public final class ComplexQueryPlanUtil {
|
||||
if (dependencies != null && dependencies.size() > 0) {
|
||||
type += "(May No Need)";
|
||||
}
|
||||
ReferenceHandlerInfo baseSQLInfo = new ReferenceHandlerInfo(dateNode, type, rrss.getStatement(), exeHandler);
|
||||
ReferenceHandlerInfo baseSQLInfo = new ReferenceHandlerInfo(dateNode, type, rrss.getStatement(), exeHandler, isSubUpdate);
|
||||
refMap.put(dateNode, baseSQLInfo);
|
||||
if (dependencies != null && dependencies.size() > 0) {
|
||||
baseSQLInfo.addAllStepChildren(dependencies);
|
||||
@@ -155,7 +155,7 @@ public final class ComplexQueryPlanUtil {
|
||||
while (nextHandler != null) {
|
||||
ReferenceHandlerInfo child = handlerMap.get(handler);
|
||||
String childName = child.getName();
|
||||
String handlerType = getTypeName(nextHandler);
|
||||
String handlerType = nextHandler.explainType().getContent();
|
||||
if (!handlerMap.containsKey(nextHandler)) {
|
||||
String handlerName = genHandlerName(handlerType, nameMap);
|
||||
ReferenceHandlerInfo handlerInfo = new ReferenceHandlerInfo(handlerName, handlerType, nextHandler);
|
||||
@@ -178,7 +178,7 @@ public final class ComplexQueryPlanUtil {
|
||||
Set<BaseDMLHandler> tableHandlers = ((SendMakeHandler) handler).getTableHandlers();
|
||||
for (BaseDMLHandler tableHandler : tableHandlers) {
|
||||
if (tableHandler instanceof DelayTableHandler) {
|
||||
StringBuilder sb = new StringBuilder(getTypeName(tableHandler));
|
||||
StringBuilder sb = new StringBuilder(nextHandler.explainType().getContent());
|
||||
sb.append(" - ").append(childName).append("'s RESULTS");
|
||||
MultiNodeMergeHandler dmlResponseHandler = (MultiNodeMergeHandler) ((DelayTableHandler) tableHandler).getCreatedHandler().getMerges().get(0);
|
||||
dmlResponseHandler.getDependencies().add(sb.toString());
|
||||
@@ -198,52 +198,11 @@ public final class ComplexQueryPlanUtil {
|
||||
return "INNER_FUNC_MERGE";
|
||||
} else if (handler instanceof MultiNodeEasyMergeHandler) {
|
||||
return "MERGE";
|
||||
} else if (handler instanceof MultiNodeUpdateHandler) {
|
||||
return "MERGE";
|
||||
} else {
|
||||
return "MERGE_AND_ORDER";
|
||||
}
|
||||
}
|
||||
|
||||
private static String getTypeName(DMLResponseHandler handler) {
|
||||
if (handler instanceof AggregateHandler) {
|
||||
return "AGGREGATE";
|
||||
} else if (handler instanceof DistinctHandler) {
|
||||
return "DISTINCT";
|
||||
} else if (handler instanceof LimitHandler) {
|
||||
return "LIMIT";
|
||||
} else if (handler instanceof WhereHandler) {
|
||||
return "WHERE_FILTER";
|
||||
} else if (handler instanceof HavingHandler) {
|
||||
return "HAVING_FILTER";
|
||||
} else if (handler instanceof SendMakeHandler) {
|
||||
return "SHUFFLE_FIELD";
|
||||
} else if (handler instanceof UnionHandler) {
|
||||
return "UNION_ALL";
|
||||
} else if (handler instanceof OrderByHandler) {
|
||||
return "ORDER";
|
||||
} else if (handler instanceof NotInHandler) {
|
||||
return "NOT_IN";
|
||||
} else if (handler instanceof JoinInnerHandler) {
|
||||
return "INNER_FUNC_ADD";
|
||||
} else if (handler instanceof JoinHandler) {
|
||||
return "JOIN";
|
||||
} else if (handler instanceof DirectGroupByHandler) {
|
||||
return "DIRECT_GROUP";
|
||||
} else if (handler instanceof TempTableHandler) {
|
||||
return "NEST_LOOP";
|
||||
} else if (handler instanceof InSubQueryHandler) {
|
||||
return "IN_SUB_QUERY";
|
||||
} else if (handler instanceof AllAnySubQueryHandler) {
|
||||
return "ALL_ANY_SUB_QUERY";
|
||||
} else if (handler instanceof SingleRowSubQueryHandler) {
|
||||
return "SCALAR_SUB_QUERY";
|
||||
} else if (handler instanceof RenameFieldHandler) {
|
||||
return "RENAME_DERIVED_SUB_QUERY";
|
||||
} else if (handler instanceof OutputHandler) {
|
||||
return "WRITE_TO_CLIENT";
|
||||
} else if (handler instanceof DelayTableHandler) {
|
||||
return "HINT_NEST_LOOP";
|
||||
}
|
||||
return "OTHER";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -14,9 +14,11 @@ import com.actiontech.dble.plan.common.item.*;
|
||||
import com.actiontech.dble.plan.common.item.Item.ItemType;
|
||||
import com.actiontech.dble.plan.common.item.function.ItemFunc;
|
||||
import com.actiontech.dble.plan.common.item.function.ItemFunc.Functype;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.ItemBoolFunc2;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.*;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemCondAnd;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemCondOr;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemFuncNot;
|
||||
import com.actiontech.dble.plan.common.item.function.sumfunc.ItemSum;
|
||||
import com.actiontech.dble.plan.common.item.function.sumfunc.ItemSum.SumFuncType;
|
||||
import com.actiontech.dble.plan.common.item.subquery.ItemAllAnySubQuery;
|
||||
@@ -27,9 +29,9 @@ import com.actiontech.dble.plan.common.ptr.BoolPtr;
|
||||
import com.actiontech.dble.plan.node.*;
|
||||
import com.actiontech.dble.plan.node.PlanNode.PlanNodeType;
|
||||
import com.actiontech.dble.route.parser.util.Pair;
|
||||
|
||||
import com.actiontech.dble.services.mysqlsharding.ShardingService;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
@@ -527,7 +529,7 @@ public final class PlanUtil {
|
||||
}
|
||||
}
|
||||
|
||||
private static Item rebuildBoolSubQuery(Item item, int index, BoolPtr reBuild, BoolPtr needExecuteNull, BoolPtr isAll) {
|
||||
public static Item rebuildBoolSubQuery(Item item, int index, BoolPtr reBuild, BoolPtr needExecuteNull, BoolPtr isAll) {
|
||||
Item arg = item.arguments().get(index);
|
||||
if (arg.type().equals(ItemType.SUBSELECT_ITEM)) {
|
||||
if (arg instanceof ItemScalarSubQuery) {
|
||||
@@ -589,4 +591,82 @@ public final class PlanUtil {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void checkTablesPrivilege(ShardingService service, PlanNode node, SQLUpdateStatement stmt) {
|
||||
for (TableNode tn : node.getReferedTableNodes()) {
|
||||
if (!ShardingPrivileges.checkPrivilege(service.getUserConfig(), tn.getSchema(), tn.getTableName(), ShardingPrivileges.CheckType.UPDATE)) {
|
||||
String msg = "The statement DML privilege check is not passed, sql:" + stmt.toString().replaceAll("[\\t\\n\\r]", " ");
|
||||
throw new MySQLOutPutException(ErrorCode.ER_PARSE_ERROR, "", msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// try to trim sharding from field_item
|
||||
protected String getItemName(Item item) {
|
||||
if (item instanceof ItemCondOr) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" ( ");
|
||||
for (int index = 0; index < item.getArgCount(); index++) {
|
||||
if (index > 0) {
|
||||
sb.append(" OR ");
|
||||
}
|
||||
sb.append(getItemName(item.arguments().get(index)));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
} else if (item instanceof ItemCondAnd) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(" ( ");
|
||||
for (int index = 0; index < item.getArgCount(); index++) {
|
||||
if (index > 0) {
|
||||
sb.append(" AND ");
|
||||
}
|
||||
sb.append(getItemName(item.arguments().get(index)));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
} else if (item instanceof ItemFuncNot) {
|
||||
return " ( NOT " + getItemName(item.arguments().get(0)) + ")";
|
||||
} else if (item instanceof ItemBoolFunc2) {
|
||||
Item a = item.arguments().get(0);
|
||||
Item b = item.arguments().get(1);
|
||||
return getItemName(a) + " " + ((ItemBoolFunc2) item).funcName() + " " + getItemName(b);
|
||||
} else if (item.type().equals(ItemType.FIELD_ITEM)) {
|
||||
String tableName = "`" + item.getTableName() + "`.`" + item.getItemName() + "`";
|
||||
if (item.getDbName() == null) {
|
||||
return tableName;
|
||||
}
|
||||
if (item.getReferTables().size() == 0) {
|
||||
return tableName;
|
||||
}
|
||||
PlanNode tbNode = item.getReferTables().iterator().next();
|
||||
if (!(tbNode instanceof TableNode)) {
|
||||
return tableName;
|
||||
}
|
||||
if (!((TableNode) tbNode).getTableName().equals(item.getTableName())) {
|
||||
return tableName;
|
||||
}
|
||||
return "`" + item.getDbName() + "`." + tableName;
|
||||
} else if (item instanceof ItemFuncIn) {
|
||||
Item a = item.arguments().get(0);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(getItemName(a));
|
||||
if (((ItemFuncIn) item).isNegate()) {
|
||||
sb.append(" not ");
|
||||
}
|
||||
sb.append(" in (");
|
||||
for (int index = 1; index < item.arguments().size(); index++) {
|
||||
if (index > 1) {
|
||||
sb.append(",");
|
||||
}
|
||||
sb.append(getItemName(item.arguments().get(index)));
|
||||
}
|
||||
sb.append(")");
|
||||
return sb.toString();
|
||||
} else {
|
||||
return item.getItemName();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,25 +29,43 @@ public class ReferenceHandlerInfo {
|
||||
private Set<String> children = new LinkedHashSet<>();
|
||||
private Set<String> stepChildren = new LinkedHashSet<>();
|
||||
private final DMLResponseHandler handler;
|
||||
private boolean isIndentation;
|
||||
|
||||
ReferenceHandlerInfo(String name, String type, String baseSQL, DMLResponseHandler handler) {
|
||||
this(name, type, handler);
|
||||
this.baseSQL = baseSQL;
|
||||
}
|
||||
|
||||
public ReferenceHandlerInfo(String name, String type, String baseSQL, DMLResponseHandler handler, boolean isIndentation) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.baseSQL = baseSQL;
|
||||
this.handler = handler;
|
||||
this.isIndentation = isIndentation;
|
||||
}
|
||||
|
||||
ReferenceHandlerInfo(String name, String type, DMLResponseHandler handler) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public ReferenceHandlerInfo(String name, String type, DMLResponseHandler handler, boolean isIndentation) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.handler = handler;
|
||||
this.isIndentation = isIndentation;
|
||||
}
|
||||
|
||||
public String getRefOrSQL() {
|
||||
StringBuilder names = new StringBuilder("");
|
||||
for (String child : stepChildren) {
|
||||
if (names.length() > 0) {
|
||||
names.append("; ");
|
||||
if (!child.contains(ComplexQueryPlanUtil.TYPE_UPDATE_SUB_QUERY.toLowerCase())) {
|
||||
if (names.length() > 0) {
|
||||
names.append("; ");
|
||||
}
|
||||
names.append(child);
|
||||
}
|
||||
names.append(child);
|
||||
}
|
||||
for (String child : children) {
|
||||
if (names.length() > 0) {
|
||||
@@ -73,6 +91,10 @@ public class ReferenceHandlerInfo {
|
||||
return this.stepChildren.size() != 0;
|
||||
}
|
||||
|
||||
public boolean isIndentation() {
|
||||
return isIndentation;
|
||||
}
|
||||
|
||||
void addAllStepChildren(Set<String> dependencies) {
|
||||
this.stepChildren.addAll(dependencies);
|
||||
}
|
||||
|
||||
@@ -161,6 +161,16 @@ public class MySQLItemVisitor extends MySqlASTVisitorAdapter {
|
||||
initName(x);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endVisit(SQLUpdateSetItem x) {
|
||||
Item itemLeft = getItem(x.getColumn());
|
||||
Item itemRight = getItem();
|
||||
item = new ItemFuncEqual(itemLeft, itemRight, this.charsetIndex);
|
||||
item.setWithSubQuery(itemLeft.isWithSubQuery() || itemRight.isWithSubQuery());
|
||||
item.setCorrelatedSubQuery(itemLeft.isCorrelatedSubQuery() || itemRight.isCorrelatedSubQuery());
|
||||
item.setItemName(item.getItemName().replaceAll("\n\\t", " "));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void endVisit(SQLBinaryOpExpr x) {
|
||||
Item itemLeft = getItem(x.getLeft());
|
||||
|
||||
@@ -46,14 +46,14 @@ import java.util.Map;
|
||||
import static com.alibaba.druid.sql.ast.statement.SQLJoinTableSource.JoinType.INNER_JOIN;
|
||||
|
||||
public class MySQLPlanNodeVisitor {
|
||||
private final String currentDb;
|
||||
private final int charsetIndex;
|
||||
private final ProxyMetaManager metaManager;
|
||||
private PlanNode tableNode;
|
||||
private boolean containSchema = false;
|
||||
private boolean isSubQuery = false;
|
||||
private Map<String, String> usrVariables;
|
||||
private HintPlanInfo hintPlanInfo;
|
||||
protected final String currentDb;
|
||||
protected final int charsetIndex;
|
||||
protected final ProxyMetaManager metaManager;
|
||||
protected PlanNode tableNode;
|
||||
protected boolean containSchema = false;
|
||||
protected boolean isSubQuery = false;
|
||||
protected Map<String, String> usrVariables;
|
||||
protected HintPlanInfo hintPlanInfo;
|
||||
|
||||
public MySQLPlanNodeVisitor(String currentDb, int charsetIndex, ProxyMetaManager metaManager, boolean isSubQuery, Map<String, String> usrVariables, @Nullable HintPlanInfo hintPlanInfo) {
|
||||
this.currentDb = currentDb;
|
||||
@@ -261,6 +261,7 @@ public class MySQLPlanNodeVisitor {
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean visit(SQLJoinTableSource joinTables) {
|
||||
SQLTableSource left = joinTables.getLeft();
|
||||
MySQLPlanNodeVisitor mtvLeft = new MySQLPlanNodeVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.isSubQuery, this.usrVariables, this.hintPlanInfo);
|
||||
@@ -373,6 +374,7 @@ public class MySQLPlanNodeVisitor {
|
||||
MySQLPlanNodeVisitor mtv = new MySQLPlanNodeVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.isSubQuery, this.usrVariables, this.hintPlanInfo);
|
||||
mtv.visit(subQueryTables);
|
||||
this.tableNode = new QueryNode(mtv.getTableNode());
|
||||
this.tableNode.setSql(subQueryTables.toString());
|
||||
this.tableNode.setContainsSubQuery(mtv.getTableNode().isContainsSubQuery());
|
||||
this.containSchema = mtv.isContainSchema();
|
||||
}
|
||||
@@ -418,7 +420,7 @@ public class MySQLPlanNodeVisitor {
|
||||
alias = StringUtil.removeBackQuote(alias);
|
||||
}
|
||||
selItem.setAlias(alias);
|
||||
if (isSubQuery && selItem.getAlias() == null) {
|
||||
if (isSubQuery && selItem.getAlias() == null && items.size() == 1) {
|
||||
selItem.setAlias("autoalias_scalar");
|
||||
}
|
||||
selectItems.add(selItem);
|
||||
@@ -426,7 +428,7 @@ public class MySQLPlanNodeVisitor {
|
||||
return selectItems;
|
||||
}
|
||||
|
||||
private void setSubQueryNode(Item selItem) {
|
||||
protected void setSubQueryNode(Item selItem) {
|
||||
if (selItem instanceof ItemScalarSubQuery) {
|
||||
((ItemScalarSubQuery) selItem).setField(true);
|
||||
tableNode.getSubQueries().add((ItemScalarSubQuery) selItem);
|
||||
@@ -453,7 +455,7 @@ public class MySQLPlanNodeVisitor {
|
||||
tableNode.setCorrelatedSubQuery(selItem.isCorrelatedSubQuery());
|
||||
}
|
||||
|
||||
private void handleWhereCondition(SQLExpr whereExpr) {
|
||||
protected void handleWhereCondition(SQLExpr whereExpr) {
|
||||
MySQLItemVisitor mev = new MySQLItemVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.usrVariables, this.hintPlanInfo);
|
||||
whereExpr.accept(mev);
|
||||
if (this.tableNode != null) {
|
||||
@@ -480,7 +482,7 @@ public class MySQLPlanNodeVisitor {
|
||||
}
|
||||
}
|
||||
|
||||
private void handleOrderBy(SQLOrderBy orderBy) {
|
||||
protected void handleOrderBy(SQLOrderBy orderBy) {
|
||||
for (SQLSelectOrderByItem p : orderBy.getItems()) {
|
||||
SQLExpr expr = p.getExpr();
|
||||
MySQLItemVisitor v = new MySQLItemVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.usrVariables, this.hintPlanInfo);
|
||||
@@ -585,7 +587,7 @@ public class MySQLPlanNodeVisitor {
|
||||
tableNode.setLimitTo(to);
|
||||
}
|
||||
|
||||
private void addJoinOnColumns(Item ifilter, JoinNode joinNode) {
|
||||
protected void addJoinOnColumns(Item ifilter, JoinNode joinNode) {
|
||||
if (ifilter instanceof ItemFuncEqual) {
|
||||
ItemFuncEqual filter = (ItemFuncEqual) ifilter;
|
||||
Item column = filter.arguments().get(0);
|
||||
@@ -613,7 +615,7 @@ public class MySQLPlanNodeVisitor {
|
||||
}
|
||||
}
|
||||
|
||||
private List<String> getUsingFields(List<SQLExpr> using) {
|
||||
protected List<String> getUsingFields(List<SQLExpr> using) {
|
||||
List<String> fds = new ArrayList<>(using.size());
|
||||
for (SQLExpr us : using) {
|
||||
fds.add(StringUtil.removeBackQuote(us.toString().toLowerCase()));
|
||||
|
||||
@@ -0,0 +1,292 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.plan.visitor;
|
||||
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.meta.ProxyMetaManager;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.plan.common.item.Item;
|
||||
import com.actiontech.dble.plan.common.item.function.ItemFunc;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.ItemFuncEqual;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemCondAnd;
|
||||
import com.actiontech.dble.plan.common.item.function.operator.logic.ItemCondOr;
|
||||
import com.actiontech.dble.plan.common.item.function.sumfunc.ItemSum;
|
||||
import com.actiontech.dble.plan.common.item.subquery.UpdateItemSubQuery;
|
||||
import com.actiontech.dble.plan.node.ModifyNode;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.actiontech.dble.plan.node.QueryNode;
|
||||
import com.actiontech.dble.plan.node.TableNode;
|
||||
import com.actiontech.dble.plan.optimizer.HintPlanInfo;
|
||||
import com.actiontech.dble.plan.util.PlanUtil;
|
||||
import com.actiontech.dble.singleton.TraceManager;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.sql.ast.SQLExpr;
|
||||
import com.alibaba.druid.sql.ast.SQLLimit;
|
||||
import com.alibaba.druid.sql.ast.SQLOrderBy;
|
||||
import com.alibaba.druid.sql.ast.statement.*;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
|
||||
import com.alibaba.druid.sql.parser.SQLStatementParser;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class UpdatePlanNodeVisitor extends MySQLPlanNodeVisitor {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(UpdatePlanNodeVisitor.class);
|
||||
|
||||
|
||||
public UpdatePlanNodeVisitor(String currentDb, int charsetIndex, ProxyMetaManager metaManager, boolean isSubQuery, Map<String, String> usrVariables, @Nullable HintPlanInfo hintPlanInfo) {
|
||||
super(currentDb, charsetIndex, metaManager, isSubQuery, usrVariables, hintPlanInfo);
|
||||
}
|
||||
|
||||
public boolean visit(SQLUpdateStatement node) {
|
||||
TraceManager.TraceObject traceObject = TraceManager.threadTrace("visit-for-sql-structure");
|
||||
try {
|
||||
if (node instanceof MySqlUpdateStatement) {
|
||||
return visit((MySqlUpdateStatement) node);
|
||||
}
|
||||
} finally {
|
||||
TraceManager.finishSpan(traceObject);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
public boolean visit(MySqlUpdateStatement node) {
|
||||
TraceManager.TraceObject traceObject = TraceManager.threadTrace("visit-for-sql-structure");
|
||||
try {
|
||||
UpdatePlanNodeVisitor mtv = new UpdatePlanNodeVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.isSubQuery, this.usrVariables, this.hintPlanInfo);
|
||||
mtv.visit(node.getTableSource());
|
||||
ModifyNode modifyNode = new ModifyNode(mtv.getTableNode());
|
||||
|
||||
this.tableNode = modifyNode;
|
||||
this.containSchema = mtv.isContainSchema();
|
||||
|
||||
List<SQLUpdateSetItem> items = node.getItems();
|
||||
if (items != null) {
|
||||
handleSetItem(modifyNode, items);
|
||||
}
|
||||
|
||||
SQLExpr whereExpr = node.getWhere();
|
||||
if (whereExpr != null) {
|
||||
handleWhereCondition(whereExpr);
|
||||
}
|
||||
|
||||
//https://dev.mysql.com/doc/refman/8.0/en/update.html
|
||||
SQLOrderBy orderBy = node.getOrderBy();
|
||||
if (orderBy != null) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Incorrect usage of UPDATE and ORDER");
|
||||
}
|
||||
SQLLimit limit = node.getLimit();
|
||||
if (limit != null) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Incorrect usage of UPDATE and LIMIT");
|
||||
}
|
||||
//split query
|
||||
if (modifyNode.getReferedTableNodes().size() != 2) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Update of more than 2 tables is not supported!");
|
||||
}
|
||||
//first assembly select
|
||||
assembleSelect(modifyNode);
|
||||
MySQLItemVisitor.clearCache();
|
||||
return true;
|
||||
} finally {
|
||||
TraceManager.finishSpan(traceObject);
|
||||
}
|
||||
}
|
||||
|
||||
public void visit(SQLTableSource tables) {
|
||||
if (tables instanceof SQLExprTableSource) {
|
||||
SQLExprTableSource table = (SQLExprTableSource) tables;
|
||||
UpdatePlanNodeVisitor mtv = new UpdatePlanNodeVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.isSubQuery, this.usrVariables, this.hintPlanInfo);
|
||||
mtv.visit(table);
|
||||
this.tableNode = mtv.getTableNode();
|
||||
this.containSchema = mtv.isContainSchema();
|
||||
} else if (tables instanceof SQLJoinTableSource) {
|
||||
SQLJoinTableSource joinTables = (SQLJoinTableSource) tables;
|
||||
UpdatePlanNodeVisitor mtv = new UpdatePlanNodeVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.isSubQuery, this.usrVariables, this.hintPlanInfo);
|
||||
mtv.visit(joinTables);
|
||||
this.tableNode = mtv.getTableNode();
|
||||
this.containSchema = mtv.isContainSchema();
|
||||
}
|
||||
if (tables.getAlias() != null) {
|
||||
this.tableNode.setAlias(tables.getAlias());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* assemble select to act as a sub query
|
||||
*
|
||||
* @param modifyNode
|
||||
*/
|
||||
private void assembleSelect(ModifyNode modifyNode) {
|
||||
String querySql;
|
||||
PlanNode queryNode = null;
|
||||
List<PlanNode> children = modifyNode.getChildren();
|
||||
List<QueryNode> queryNodeList = children.stream()
|
||||
.filter(planNode -> planNode instanceof QueryNode)
|
||||
.map(node -> (QueryNode) node)
|
||||
.collect(Collectors.toList());
|
||||
if (!queryNodeList.isEmpty()) {
|
||||
if (queryNodeList.size() > 1) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Number of subqueries greater than 1 is not supported!");
|
||||
}
|
||||
querySql = queryNodeList.get(0).getSql();
|
||||
queryNode = queryNodeList.get(0);
|
||||
} else {
|
||||
querySql = handleQuery(modifyNode);
|
||||
}
|
||||
LOGGER.debug("merge update——query sql:{}", querySql);
|
||||
|
||||
SQLStatementParser parser = new MySqlStatementParser(querySql);
|
||||
SQLSelectStatement select = (SQLSelectStatement) parser.parseStatement();
|
||||
SQLSelectQuery selectQuery = select.getSelect().getQuery();
|
||||
UpdateItemSubQuery item = new UpdateItemSubQuery(currentDb, selectQuery, null, true, metaManager, usrVariables, this.charsetIndex, this.hintPlanInfo);
|
||||
item.setQueryNode(queryNode);
|
||||
modifyNode.getSubQueries().add(item);
|
||||
modifyNode.setWithSubQuery(true);
|
||||
modifyNode.setContainsSubQuery(true);
|
||||
}
|
||||
|
||||
protected String handleQuery(ModifyNode query) {
|
||||
List<Item> setValItemList = Lists.newArrayList();
|
||||
for (ItemFuncEqual itemFuncEqual : query.getSetItemList()) {
|
||||
setValItemList.add(itemFuncEqual.arguments().get(1));
|
||||
}
|
||||
|
||||
//only supports update set single table
|
||||
StringBuilder setBuilder = new StringBuilder();
|
||||
String tableName = null;
|
||||
for (Item item : setValItemList) {
|
||||
if (tableName == null) {
|
||||
tableName = item.getTableName();
|
||||
if (!StringUtil.isEmpty(item.getTableName())) {
|
||||
setBuilder.append("`" + item.getTableName() + "`.`" + item.getItemName() + "`,");
|
||||
}
|
||||
} else if (!tableName.equals(item.getTableName())) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Update set multiple tables is not supported yet!");
|
||||
} else {
|
||||
setBuilder.append("`" + item.getTableName() + "`.`" + item.getItemName() + "`,");
|
||||
}
|
||||
}
|
||||
|
||||
setBuilder.deleteCharAt(setBuilder.length() - 1);
|
||||
StringBuilder sqlBuilder = new StringBuilder();
|
||||
sqlBuilder.append("select ");
|
||||
//set
|
||||
sqlBuilder.append(setBuilder);
|
||||
//where
|
||||
StringBuilder whereBuilder = new StringBuilder();
|
||||
String finalTableName = tableName;
|
||||
|
||||
List<Item> whereItemList = getAllWhereItem(query.getWhereFilter());
|
||||
whereItemList.stream().forEach(whereItem -> {
|
||||
if (!StringUtil.isEmpty(whereItem.getTableName()) && whereItem.getTableName().equals(finalTableName)) {
|
||||
whereBuilder.append(",`" + whereItem.getTableName() + "`.`" + whereItem.getItemName() + "` ");
|
||||
}
|
||||
});
|
||||
sqlBuilder.append(whereBuilder);
|
||||
sqlBuilder.append(" from ");
|
||||
|
||||
List<TableNode> tableNodes = query.getReferedTableNodes();
|
||||
for (TableNode tableNode : tableNodes) {
|
||||
if (!StringUtil.isEmpty(tableName) && tableName.equals(tableNode.getAlias())) {
|
||||
buildTableName(tableNode, sqlBuilder);
|
||||
}
|
||||
}
|
||||
if (!whereBuilder.toString().isEmpty()) {
|
||||
whereBuilder.deleteCharAt(0);
|
||||
sqlBuilder.append(" group by ");
|
||||
sqlBuilder.append(whereBuilder);
|
||||
}
|
||||
return sqlBuilder.toString();
|
||||
}
|
||||
|
||||
private List<Item> getAllWhereItem(Item item) {
|
||||
List<Item> whereItemList = Lists.newArrayList();
|
||||
if (PlanUtil.isCmpFunc(item)) {
|
||||
whereItemList.addAll(item.arguments());
|
||||
return whereItemList;
|
||||
} else if (item instanceof ItemCondAnd || item instanceof ItemCondOr) {
|
||||
for (int index = 0; index < item.getArgCount(); index++) {
|
||||
whereItemList.addAll(getAllWhereItem(item.arguments().get(index)));
|
||||
}
|
||||
}
|
||||
return whereItemList;
|
||||
}
|
||||
|
||||
void buildTableName(TableNode tableNode, StringBuilder sb) {
|
||||
String tableName = "`" + tableNode.getPureName() + "`";
|
||||
sb.append(" ").append(tableName);
|
||||
String alias = tableNode.getAlias();
|
||||
if (alias != null) {
|
||||
sb.append(" `").append(alias).append("`");
|
||||
}
|
||||
}
|
||||
|
||||
private void handleSetItem(ModifyNode modifyNode, List<SQLUpdateSetItem> items) {
|
||||
MySQLItemVisitor mev = new MySQLItemVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.usrVariables, this.hintPlanInfo);
|
||||
for (SQLUpdateSetItem setItem : items) {
|
||||
setItem.accept(mev);
|
||||
ItemFuncEqual itemFuncEqual = (ItemFuncEqual) mev.getItem();
|
||||
//set-value does not support subqueries
|
||||
if (itemFuncEqual.isWithSubQuery()) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Subqueries are not supported!");
|
||||
}
|
||||
//set-value does not support expression
|
||||
boolean hasFunc = itemFuncEqual.arguments().stream().anyMatch(item -> item instanceof ItemFunc);
|
||||
if (hasFunc) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Expression not supported!");
|
||||
}
|
||||
//set-value does not support sum/min/max...
|
||||
boolean hasSum = itemFuncEqual.arguments().stream().anyMatch(item -> item instanceof ItemSum);
|
||||
if (hasSum) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", " Invalid use of group function");
|
||||
}
|
||||
modifyNode.addSetItem(itemFuncEqual);
|
||||
}
|
||||
}
|
||||
|
||||
protected void handleWhereCondition(SQLExpr whereExpr) {
|
||||
MySQLItemVisitor mev = new MySQLItemVisitor(this.currentDb, this.charsetIndex, this.metaManager, this.usrVariables, this.hintPlanInfo);
|
||||
whereExpr.accept(mev);
|
||||
if (this.tableNode != null) {
|
||||
Item whereFilter = mev.getItem();
|
||||
tableNode.query(whereFilter);
|
||||
checkSupport(whereFilter);
|
||||
} else {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_OPTIMIZER, "", "from expression is null,check the sql!");
|
||||
}
|
||||
}
|
||||
|
||||
private void checkSupport(Item item) {
|
||||
//where does not support subqueries
|
||||
if (item.isWithSubQuery()) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Subqueries are not supported!");
|
||||
}
|
||||
if (PlanUtil.isCmpFunc(item)) {
|
||||
//where does not support expression
|
||||
boolean hasFunc = item.arguments().stream().anyMatch(subItem -> subItem instanceof ItemFunc);
|
||||
if (hasFunc) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", "Expression not supported!");
|
||||
}
|
||||
//where does not support sum/min/max...
|
||||
boolean hasSum = item.arguments().stream().anyMatch(subItem -> subItem instanceof ItemSum);
|
||||
if (hasSum) {
|
||||
throw new MySQLOutPutException(ErrorCode.ERR_NOT_SUPPORTED, "", " Invalid use of group function");
|
||||
}
|
||||
} else if (item instanceof ItemCondAnd || item instanceof ItemCondOr) {
|
||||
for (Item argument : item.arguments()) {
|
||||
checkSupport(argument);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
/*
|
||||
* Copyright (C) 2016-2022 ActionTech.
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
package com.actiontech.dble.route.parser.druid;
|
||||
|
||||
import com.actiontech.dble.config.model.sharding.table.ERTable;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
||||
import java.util.Comparator;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ERRelation implements Comparable<ERRelation> {
|
||||
private ERTable left;
|
||||
private ERTable right;
|
||||
|
||||
public ERRelation(ERTable left, ERTable right) {
|
||||
this.left = left;
|
||||
this.right = right;
|
||||
}
|
||||
|
||||
public ERTable getLeft() {
|
||||
return left;
|
||||
}
|
||||
|
||||
public void setLeft(ERTable left) {
|
||||
this.left = left;
|
||||
}
|
||||
|
||||
public ERTable getRight() {
|
||||
return right;
|
||||
}
|
||||
|
||||
public void setRight(ERTable right) {
|
||||
this.right = right;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull ERRelation o) {
|
||||
return Comparator.comparing(ERRelation::getLeft)
|
||||
.thenComparing(ERRelation::getRight)
|
||||
.compare(this, o);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
ERRelation that = (ERRelation) o;
|
||||
return Objects.equals(left, that.left) &&
|
||||
Objects.equals(right, that.right);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(left, right);
|
||||
}
|
||||
}
|
||||
@@ -470,6 +470,10 @@ public class ServerSchemaStatVisitor extends MySqlSchemaStatVisitor {
|
||||
} else {
|
||||
tableSource.getLeft().accept(this);
|
||||
}
|
||||
SQLExpr condition = tableSource.getCondition();
|
||||
if (condition != null) {
|
||||
condition.accept(this);
|
||||
}
|
||||
} else {
|
||||
SQLName identName = x.getTableName();
|
||||
if (identName != null) {
|
||||
|
||||
@@ -76,6 +76,9 @@ public class DruidDeleteParser extends DruidModifyParser {
|
||||
//try to route to single Node for each table
|
||||
routeShardingNodes = checkForSingleNodeTable(rrs, service.getCharset().getClient());
|
||||
}
|
||||
if (routeShardingNodes == null) {
|
||||
throw new SQLNonTransientException(getErrorMsg());
|
||||
}
|
||||
|
||||
if (ctx.getTables().isEmpty()) {
|
||||
RouterUtil.routeToMultiNode(false, rrs, routeShardingNodes, true, tableSet);
|
||||
|
||||
@@ -409,7 +409,7 @@ abstract class DruidModifyParser extends DefaultDruidParser {
|
||||
|
||||
|
||||
if (involvedNodeSet.size() > 1 || currentNode == null) {
|
||||
throw new SQLNonTransientException(getErrorMsg());
|
||||
return null;
|
||||
}
|
||||
|
||||
//check for table remain
|
||||
|
||||
@@ -5,10 +5,12 @@
|
||||
|
||||
package com.actiontech.dble.route.parser.druid.impl;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
import com.actiontech.dble.config.model.sharding.table.*;
|
||||
import com.actiontech.dble.config.privileges.ShardingPrivileges;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.route.parser.druid.ERRelation;
|
||||
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
|
||||
import com.actiontech.dble.route.parser.util.Pair;
|
||||
import com.actiontech.dble.route.util.RouterUtil;
|
||||
@@ -23,14 +25,13 @@ import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.expr.*;
|
||||
import com.alibaba.druid.sql.ast.statement.*;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement;
|
||||
import com.alibaba.druid.stat.TableStat;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* see http://dev.mysql.com/doc/refman/5.7/en/update.html
|
||||
@@ -39,7 +40,7 @@ import java.util.Set;
|
||||
*/
|
||||
public class DruidUpdateParser extends DruidModifyParser {
|
||||
|
||||
private static final String MODIFY_SQL_NOT_SUPPORT_MESSAGE = "This `Complex Update Syntax` is not supported!";
|
||||
public static final String MODIFY_SQL_NOT_SUPPORT_MESSAGE = "This `Complex Update Syntax` is not supported!";
|
||||
|
||||
@Override
|
||||
public SchemaConfig visitorParse(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, ServerSchemaStatVisitor visitor, ShardingService service, boolean isExplain)
|
||||
@@ -73,14 +74,18 @@ public class DruidUpdateParser extends DruidModifyParser {
|
||||
//try to route to single Node for each table
|
||||
routeShardingNodes = checkForSingleNodeTable(rrs, service.getCharset().getClient());
|
||||
}
|
||||
|
||||
if (ctx.getTables().isEmpty()) {
|
||||
RouterUtil.routeToMultiNode(false, rrs, routeShardingNodes, true, tableSet);
|
||||
if (routeShardingNodes != null && !routeShardingNodes.isEmpty()) {
|
||||
if (ctx.getTables().isEmpty()) {
|
||||
RouterUtil.routeToMultiNode(false, rrs, routeShardingNodes, true, tableSet);
|
||||
} else {
|
||||
RouterUtil.routeToMultiNode(false, rrs, routeShardingNodes, true, ctx.getTables());
|
||||
}
|
||||
rrs.setFinishedRoute(true);
|
||||
return schema;
|
||||
} else {
|
||||
RouterUtil.routeToMultiNode(false, rrs, routeShardingNodes, true, ctx.getTables());
|
||||
tryRouteAsMerge(tableSource, visitor, schemaInfos, rrs, tableSet, stmt);
|
||||
return schema;
|
||||
}
|
||||
rrs.setFinishedRoute(true);
|
||||
return schema;
|
||||
} else {
|
||||
SchemaInfo schemaInfo = SchemaUtil.getSchemaInfo(service.getUser(), schemaName, (SQLExprTableSource) tableSource);
|
||||
if (!ShardingPrivileges.checkPrivilege(service.getUserConfig(), schemaInfo.getSchema(), schemaInfo.getTable(), ShardingPrivileges.CheckType.UPDATE)) {
|
||||
@@ -95,7 +100,7 @@ public class DruidUpdateParser extends DruidModifyParser {
|
||||
BaseTableConfig tc = schema.getTables().get(tableName);
|
||||
String noShardingNode = RouterUtil.isNoSharding(schema, tableName);
|
||||
|
||||
if (visitor.getFirstClassSubQueryList().size() > 0) {
|
||||
if (visitor.getSubQueryList().size() > 0) {
|
||||
routeForModifySubQueryList(rrs, tc, visitor, schema, service);
|
||||
return schema;
|
||||
} else if (noShardingNode != null) {
|
||||
@@ -134,6 +139,117 @@ public class DruidUpdateParser extends DruidModifyParser {
|
||||
return schema;
|
||||
}
|
||||
|
||||
private void tryRouteAsMerge(SQLTableSource tableSource, ServerSchemaStatVisitor visitor, List<SchemaInfo> schemaInfos, RouteResultset rrs, Set<String> tableSet, SQLStatement stmt) throws SQLNonTransientException {
|
||||
//UPDATE A,B WHERE A.shardingColumn = B.shardingColumn
|
||||
//A,B not subQuery
|
||||
boolean isContainsSubQuery = containsSubQuery(tableSource);
|
||||
if (isContainsSubQuery) {
|
||||
rrs.setSqlStatement(stmt);
|
||||
rrs.setNeedOptimizer(true);
|
||||
rrs.setFinishedRoute(true);
|
||||
return;
|
||||
}
|
||||
//A,B the association condition has an ER relationship
|
||||
boolean canMerge = canDoAsMerge(visitor, schemaInfos);
|
||||
if (!canMerge) {
|
||||
rrs.setSqlStatement(stmt);
|
||||
rrs.setNeedOptimizer(true);
|
||||
rrs.setFinishedRoute(true);
|
||||
return;
|
||||
}
|
||||
Map<Pair<String, String>, Set<String>> tablesRouteMap = new HashMap<>();
|
||||
for (Pair<String, String> table : ctx.getTables()) {
|
||||
SchemaConfig schemaConfig = DbleServer.getInstance().getConfig().getSchemas().get(table.getKey());
|
||||
String tableName = table.getValue();
|
||||
BaseTableConfig tableConfig = schemaConfig.getTables().get(tableName);
|
||||
if (tableConfig != null && !(tableConfig instanceof GlobalTableConfig) && tablesRouteMap.get(table) == null) {
|
||||
tablesRouteMap.put(table, new HashSet<>());
|
||||
tablesRouteMap.get(table).addAll(tableConfig.getShardingNodes());
|
||||
}
|
||||
}
|
||||
Set<String> retNodesSet = RouterUtil.retainRouteMap(tablesRouteMap);
|
||||
RouterUtil.routeToMultiNode(false, rrs, retNodesSet, false, tableSet);
|
||||
rrs.setFinishedRoute(true);
|
||||
}
|
||||
|
||||
private boolean containsSubQuery(SQLTableSource tableSource) {
|
||||
if (tableSource instanceof SQLJoinTableSource) {
|
||||
SQLJoinTableSource joinTableSource = (SQLJoinTableSource) tableSource;
|
||||
return containsSubQuery(joinTableSource.getLeft()) || containsSubQuery(joinTableSource.getRight());
|
||||
} else if (tableSource instanceof SQLSubqueryTableSource) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private boolean canDoAsMerge(ServerSchemaStatVisitor visitor, List<SchemaInfo> schemaInfos) {
|
||||
List<ERRelation> erRelationList = filterERRelation(visitor.getRelationships(), schemaInfos);
|
||||
/*
|
||||
before sorting:
|
||||
a-b
|
||||
c-d
|
||||
b-c
|
||||
after sorting:
|
||||
a-b
|
||||
b-c
|
||||
c-d
|
||||
*/
|
||||
Collections.sort(erRelationList);
|
||||
Set<ERTable> erTableSet = Sets.newHashSet();
|
||||
for (ERRelation erRelation : erRelationList) {
|
||||
if (erTableSet.isEmpty() || erTableSet.contains(erRelation.getLeft()) || erTableSet.contains(erRelation.getRight())) {
|
||||
erTableSet.add(erRelation.getLeft());
|
||||
erTableSet.add(erRelation.getRight());
|
||||
}
|
||||
}
|
||||
schemaInfos.removeIf(schemaInfo -> erTableSet
|
||||
.stream()
|
||||
.anyMatch(erTable -> StringUtil.equalsIgnoreCase(erTable.getSchema(), schemaInfo.getSchema()) && StringUtil.equalsIgnoreCase(erTable.getTable(), schemaInfo.getTable())));
|
||||
return schemaInfos.isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Filter out table associations with er relationships
|
||||
*
|
||||
* @param relationships
|
||||
* @param schemaInfos
|
||||
* @return result format:
|
||||
* a-b
|
||||
* c-d
|
||||
* b-c
|
||||
*/
|
||||
private List<ERRelation> filterERRelation(Set<TableStat.Relationship> relationships, List<SchemaInfo> schemaInfos) {
|
||||
List<ERRelation> erRelationList = Lists.newArrayList();
|
||||
for (TableStat.Relationship relationship : relationships) {
|
||||
TableStat.Column left = relationship.getLeft();
|
||||
TableStat.Column right = relationship.getRight();
|
||||
String leftTable = left.getTable();
|
||||
String rightTable = right.getTable();
|
||||
String leftSchema = null;
|
||||
String rightSchema = null;
|
||||
for (SchemaInfo schemaInfo : schemaInfos) {
|
||||
if (schemaInfo.getTableAlias().equalsIgnoreCase(leftTable)) {
|
||||
leftSchema = schemaInfo.getSchema();
|
||||
leftTable = schemaInfo.getTable();
|
||||
} else if (schemaInfo.getTableAlias().equalsIgnoreCase(rightTable)) {
|
||||
rightSchema = schemaInfo.getSchema();
|
||||
rightTable = schemaInfo.getTable();
|
||||
}
|
||||
}
|
||||
ERTable leftERTable = new ERTable(leftSchema, leftTable, left.getName());
|
||||
ERTable rightERTable = new ERTable(rightSchema, rightTable, right.getName());
|
||||
Set<ERTable> erList = DbleServer.getInstance().getConfig().getErRelations().get(leftERTable);
|
||||
if (erList != null && !erList.isEmpty()) {
|
||||
boolean contains = erList.contains(rightERTable);
|
||||
if (contains) {
|
||||
int compare = leftERTable.compareTo(rightERTable);
|
||||
erRelationList.add(new ERRelation(compare < 0 ? leftERTable : rightERTable, compare < 0 ? rightERTable : leftERTable));
|
||||
}
|
||||
}
|
||||
}
|
||||
return erRelationList;
|
||||
}
|
||||
|
||||
|
||||
private static boolean columnInExpr(SQLExpr sqlExpr, String colName) throws SQLNonTransientException {
|
||||
String column;
|
||||
|
||||
@@ -8,6 +8,7 @@ package com.actiontech.dble.route.util;
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.datasource.ShardingNode;
|
||||
import com.actiontech.dble.backend.mysql.CharsetUtil;
|
||||
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.subquery.UpdateSubQueryHandler;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
import com.actiontech.dble.config.model.sharding.table.*;
|
||||
@@ -464,7 +465,7 @@ public final class RouterUtil {
|
||||
String value = (String) originValue;
|
||||
//for explain
|
||||
if (NEED_REPLACE.equals(value) || ALL_SUB_QUERY_RESULTS.equals(value) ||
|
||||
MIN_SUB_QUERY_RESULTS.equals(value) || MAX_SUB_QUERY_RESULTS.equals(value)) {
|
||||
MIN_SUB_QUERY_RESULTS.equals(value) || MAX_SUB_QUERY_RESULTS.equals(value) || UpdateSubQueryHandler.NEED_REPLACE.equals(value)) {
|
||||
return routeNodeSet;
|
||||
}
|
||||
if (!ignoreNull || !value.equalsIgnoreCase("null")) {
|
||||
@@ -986,7 +987,7 @@ public final class RouterUtil {
|
||||
|
||||
}
|
||||
|
||||
private static Set<String> retainRouteMap(Map<Pair<String, String>, Set<String>> tablesRouteMap) throws SQLNonTransientException {
|
||||
public static Set<String> retainRouteMap(Map<Pair<String, String>, Set<String>> tablesRouteMap) throws SQLNonTransientException {
|
||||
Set<String> retNodesSet = new HashSet<>();
|
||||
boolean isFirstAdd = true;
|
||||
for (Map.Entry<Pair<String, String>, Set<String>> entry : tablesRouteMap.entrySet()) {
|
||||
|
||||
@@ -33,10 +33,13 @@ import com.actiontech.dble.net.mysql.MySQLPacket;
|
||||
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.actiontech.dble.plan.optimizer.MyOptimizer;
|
||||
import com.actiontech.dble.plan.optimizer.SelectedProcessor;
|
||||
import com.actiontech.dble.plan.util.PlanUtil;
|
||||
import com.actiontech.dble.plan.visitor.MySQLPlanNodeVisitor;
|
||||
import com.actiontech.dble.plan.visitor.UpdatePlanNodeVisitor;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.route.parser.druid.impl.DruidUpdateParser;
|
||||
import com.actiontech.dble.route.parser.util.ParseUtil;
|
||||
import com.actiontech.dble.server.parser.ServerParse;
|
||||
import com.actiontech.dble.server.status.LoadDataBatch;
|
||||
@@ -53,6 +56,8 @@ import com.actiontech.dble.statistic.stat.QueryTimeCost;
|
||||
import com.actiontech.dble.statistic.stat.QueryTimeCostContainer;
|
||||
import com.actiontech.dble.util.exception.NeedDelayedException;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@@ -423,8 +428,13 @@ public class NonBlockingSession extends Session {
|
||||
if (nodes == null || nodes.length == 0 || nodes[0].getName() == null || nodes[0].getName().equals("")) {
|
||||
if (rrs.isNeedOptimizer()) {
|
||||
try {
|
||||
this.complexRrs = rrs;
|
||||
executeMultiSelect(rrs);
|
||||
if (rrs.getSqlStatement() instanceof SQLSelectStatement) {
|
||||
this.complexRrs = rrs;
|
||||
executeMultiSelect(rrs);
|
||||
} else if (rrs.getSqlStatement() instanceof SQLUpdateStatement) {
|
||||
this.complexRrs = rrs;
|
||||
executeMultiUpdate(rrs);
|
||||
}
|
||||
} catch (MySQLOutPutException e) {
|
||||
LOGGER.warn("execute complex sql cause error", e);
|
||||
shardingService.writeErrMessage(e.getSqlState(), e.getMessage(), e.getErrorCode());
|
||||
@@ -617,6 +627,45 @@ public class NonBlockingSession extends Session {
|
||||
}
|
||||
}
|
||||
|
||||
public void executeMultiUpdate(RouteResultset rrs) {
|
||||
TraceManager.TraceObject traceObject = TraceManager.serviceTrace(shardingService, "try-complex-update");
|
||||
try {
|
||||
MySqlUpdateStatement ast = (MySqlUpdateStatement) rrs.getSqlStatement();
|
||||
UpdatePlanNodeVisitor visitor = new UpdatePlanNodeVisitor(shardingService.getSchema(), shardingService.getCharset().getResultsIndex(), ProxyMeta.getInstance().getTmManager(), false, shardingService.getUsrVariables(), rrs.getHintPlanInfo());
|
||||
visitor.visit(ast);
|
||||
PlanNode node = visitor.getTableNode();
|
||||
if (node.isCorrelatedSubQuery()) {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_UNKNOWN_ERROR, "", "Correlated Sub Queries is not supported ");
|
||||
}
|
||||
node.setSql(rrs.getStatement());
|
||||
node.setUpFields();
|
||||
PlanUtil.checkTablesPrivilege(shardingService, node, ast);
|
||||
//sub query
|
||||
node = SelectedProcessor.optimize(node);
|
||||
|
||||
if (PauseShardingNodeManager.getInstance().getIsPausing().get() &&
|
||||
!PauseShardingNodeManager.getInstance().checkTarget(target) &&
|
||||
PauseShardingNodeManager.getInstance().checkReferredTableNodes(node.getReferedTableNodes())) {
|
||||
if (PauseShardingNodeManager.getInstance().waitForResume(rrs, this.shardingService, CONTINUE_TYPE_MULTIPLE)) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
setPreExecuteEnd(TraceResult.SqlTraceType.COMPLEX_MODIFY);
|
||||
if (PlanUtil.containsSubQuery(node)) {
|
||||
setSubQuery();
|
||||
final PlanNode finalNode = node;
|
||||
//sub Query build will be blocked, so use ComplexQueryExecutor
|
||||
DbleServer.getInstance().getComplexQueryExecutor().execute(() -> {
|
||||
executeMultiResultSet(rrs, finalNode);
|
||||
});
|
||||
} else {
|
||||
throw new MySQLOutPutException(ErrorCode.ER_UNKNOWN_ERROR, "", DruidUpdateParser.MODIFY_SQL_NOT_SUPPORT_MESSAGE);
|
||||
}
|
||||
} finally {
|
||||
TraceManager.finishSpan(shardingService, traceObject);
|
||||
}
|
||||
}
|
||||
|
||||
private void addMetaLock(RouteResultset rrs) throws SQLNonTransientException {
|
||||
// filtering: hint ddl、online ddl 、create/drop/alter/replace view、create database、create table、lock table/
|
||||
if (rrs.getSchema() == null || rrs.isOnline() ||
|
||||
|
||||
@@ -23,10 +23,12 @@ import com.actiontech.dble.config.model.sharding.table.ShardingTableConfig;
|
||||
import com.actiontech.dble.net.mysql.*;
|
||||
import com.actiontech.dble.plan.node.PlanNode;
|
||||
import com.actiontech.dble.plan.optimizer.MyOptimizer;
|
||||
import com.actiontech.dble.plan.optimizer.SelectedProcessor;
|
||||
import com.actiontech.dble.plan.util.ComplexQueryPlanUtil;
|
||||
import com.actiontech.dble.plan.util.PlanUtil;
|
||||
import com.actiontech.dble.plan.util.ReferenceHandlerInfo;
|
||||
import com.actiontech.dble.plan.visitor.MySQLPlanNodeVisitor;
|
||||
import com.actiontech.dble.plan.visitor.UpdatePlanNodeVisitor;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.server.parser.ServerParse;
|
||||
@@ -36,8 +38,10 @@ import com.actiontech.dble.services.mysqlsharding.ShardingService;
|
||||
import com.actiontech.dble.singleton.ProxyMeta;
|
||||
import com.actiontech.dble.singleton.RouteService;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLExprTableSource;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLSelectStatement;
|
||||
import com.alibaba.druid.sql.ast.statement.SQLUpdateStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlInsertStatement;
|
||||
import com.alibaba.druid.sql.dialect.mysql.parser.MySqlStatementParser;
|
||||
import com.alibaba.druid.sql.parser.SQLStatementParser;
|
||||
@@ -86,17 +90,40 @@ public final class ExplainHandler {
|
||||
}
|
||||
|
||||
private static BaseHandlerBuilder buildNodes(RouteResultset rrs, ShardingService service) {
|
||||
SQLSelectStatement ast = (SQLSelectStatement) rrs.getSqlStatement();
|
||||
MySQLPlanNodeVisitor visitor = new MySQLPlanNodeVisitor(service.getSchema(), service.getCharset().getResultsIndex(), ProxyMeta.getInstance().getTmManager(), false, service.getUsrVariables(), rrs.getHintPlanInfo());
|
||||
visitor.visit(ast);
|
||||
SQLStatement sqlStatement = rrs.getSqlStatement();
|
||||
if (sqlStatement instanceof SQLSelectStatement) {
|
||||
return buildSelectNodes((SQLSelectStatement) sqlStatement, service, rrs);
|
||||
} else if (sqlStatement instanceof SQLUpdateStatement) {
|
||||
return buildUpdateNodes((SQLUpdateStatement) sqlStatement, service, rrs);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private static BaseHandlerBuilder buildUpdateNodes(SQLUpdateStatement sqlStatement, ShardingService service, RouteResultset rrs) {
|
||||
UpdatePlanNodeVisitor visitor = new UpdatePlanNodeVisitor(service.getSchema(), service.getCharset().getResultsIndex(), ProxyMeta.getInstance().getTmManager(), false, service.getUsrVariables(), rrs.getHintPlanInfo());
|
||||
visitor.visit(sqlStatement);
|
||||
PlanNode node = visitor.getTableNode();
|
||||
node.setSql(rrs.getStatement());
|
||||
node.setUpFields();
|
||||
PlanUtil.checkTablesPrivilege(service, node, ast);
|
||||
PlanUtil.checkTablesPrivilege(service, node, sqlStatement);
|
||||
//sub query
|
||||
node = SelectedProcessor.optimize(node);
|
||||
|
||||
HandlerBuilder builder = new HandlerBuilder(node, service.getSession2());
|
||||
return builder.getBuilder(service.getSession2(), node, true);
|
||||
}
|
||||
|
||||
private static BaseHandlerBuilder buildSelectNodes(SQLSelectStatement sqlStatement, ShardingService service, RouteResultset rrs) {
|
||||
MySQLPlanNodeVisitor visitor = new MySQLPlanNodeVisitor(service.getSchema(), service.getCharset().getResultsIndex(), ProxyMeta.getInstance().getTmManager(), false, service.getUsrVariables(), rrs.getHintPlanInfo());
|
||||
visitor.visit(sqlStatement);
|
||||
PlanNode node = visitor.getTableNode();
|
||||
node.setSql(rrs.getStatement());
|
||||
node.setUpFields();
|
||||
PlanUtil.checkTablesPrivilege(service, node, sqlStatement);
|
||||
node = MyOptimizer.optimize(node, rrs.getHintPlanInfo());
|
||||
|
||||
if (!PlanUtil.containsSubQuery(node) && !visitor.isContainSchema()) {
|
||||
node.setAst(ast);
|
||||
node.setAst(sqlStatement);
|
||||
}
|
||||
HandlerBuilder builder = new HandlerBuilder(node, service.getSession2());
|
||||
return builder.getBuilder(service.getSession2(), node, true);
|
||||
@@ -278,9 +305,9 @@ public final class ExplainHandler {
|
||||
List<ReferenceHandlerInfo> results = ComplexQueryPlanUtil.getComplexQueryResult(builder);
|
||||
for (ReferenceHandlerInfo result : results) {
|
||||
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
|
||||
row.add(StringUtil.encode(result.getName(), service.getCharset().getResults()));
|
||||
row.add(StringUtil.encode(result.getType(), service.getCharset().getResults()));
|
||||
row.add(StringUtil.encode(result.getRefOrSQL(), service.getCharset().getResults()));
|
||||
row.add(StringUtil.encode(getRowStr(result.getName(), result.isIndentation()), service.getCharset().getResults()));
|
||||
row.add(StringUtil.encode(getRowStr(result.getType(), result.isIndentation()), service.getCharset().getResults()));
|
||||
row.add(StringUtil.encode(getRowStr(result.getRefOrSQL(), result.isIndentation()), service.getCharset().getResults()));
|
||||
row.setPacketId(++packetId);
|
||||
buffer = row.write(buffer, service, true);
|
||||
}
|
||||
@@ -291,4 +318,9 @@ public final class ExplainHandler {
|
||||
lastEof.setPacketId(++packetId);
|
||||
lastEof.write(buffer, service);
|
||||
}
|
||||
|
||||
private static String getRowStr(String content, boolean indentation) {
|
||||
String indentationStr = "------ ";
|
||||
return indentation ? indentationStr + content : content;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -27,7 +27,7 @@ public class TraceResult implements Cloneable {
|
||||
|
||||
|
||||
public enum SqlTraceType {
|
||||
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, SIMPLE_QUERY
|
||||
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, SIMPLE_QUERY, COMPLEX_MODIFY
|
||||
}
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(TraceResult.class);
|
||||
|
||||
@@ -233,3 +233,4 @@
|
||||
-DsqlDumpLogTimeBasedRotate=1
|
||||
-DsqlDumpLogDeleteFileAge=90d
|
||||
-DsqlDumpLogCompressFilePath=*/sqldump-*.log.gz
|
||||
-DqueryForUpdateMaxRowsSize=20000
|
||||
|
||||
Reference in New Issue
Block a user