[inner-1335&1788&1792] optimizer group by

This commit is contained in:
wd2365151147
2022-06-20 17:22:31 +08:00
parent ada1c2a8e3
commit 13c0074466
5 changed files with 192 additions and 38 deletions

View File

@@ -5,7 +5,12 @@
package com.actiontech.dble.backend.mysql;
import javax.annotation.Nullable;
public final class VersionUtil {
private static final String VERSION_8 = "8.";
private static final String VERSION_5 = "5.";
private VersionUtil() {
}
@@ -23,4 +28,20 @@ public final class VersionUtil {
return TX_ISOLATION;
}
}
@Nullable
public static Integer getMajorVersionWithoutDefaultValue(String version) {
if (version.startsWith(VERSION_8)) {
return 8;
} else if (version.startsWith(VERSION_5) || version.contains("MariaDB")) {
return 5;
} else {
return null;
}
}
public static boolean isMysql8(String version) {
final Integer versionNumber = VersionUtil.getMajorVersionWithoutDefaultValue(version);
return versionNumber == 8;
}
}

View File

@@ -6,7 +6,9 @@ package com.actiontech.dble.backend.mysql.nio.handler;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.SendMakeHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation;
import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool;
@@ -22,6 +24,7 @@ import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,32 +42,44 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
private final int queueSize;
private Map<MySQLResponseService, BlockingQueue<HeapItem>> queues;
private RowDataComparator rowComparator;
private OutputHandler outputHandler;
private BaseDMLHandler nextHandler;
private volatile boolean noNeedRows = false;
public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
super(rrs, session, false);
this.queueSize = SystemConfig.getInstance().getMergeQueueSize();
this.queues = new ConcurrentHashMap<>();
outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
if (CollectionUtil.isEmpty(rrs.getSelectCols())) {
nextHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
} else {
nextHandler = new SendMakeHandler(BaseHandlerBuilder.getSequenceId(), session, rrs.getSelectCols(), rrs.getSchema(), rrs.getTable(), rrs.getTableAlias());
nextHandler.setNextHandler(new OutputHandler(BaseHandlerBuilder.getSequenceId(), session));
}
}
void nextHandlerCleanBuffer() {
if (nextHandler instanceof OutputHandler) {
((OutputHandler) nextHandler).cleanBuffer();
} else if (nextHandler instanceof SendMakeHandler) {
((SendMakeHandler) nextHandler).cleanBuffer();
}
}
@Override
public void connectionClose(AbstractService service, String reason) {
outputHandler.cleanBuffer();
nextHandlerCleanBuffer();
super.connectionClose(service, reason);
}
@Override
public void connectionError(Throwable e, Object attachment) {
outputHandler.cleanBuffer();
nextHandlerCleanBuffer();
super.connectionError(e, attachment);
}
@Override
public void errorResponse(byte[] data, AbstractService service) {
outputHandler.cleanBuffer();
nextHandlerCleanBuffer();
super.errorResponse(data, service);
}
@@ -83,7 +98,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
}
@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPacketsNull, byte[] eof,
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof,
boolean isLeft, AbstractService service) {
queues.put((MySQLResponseService) service, new LinkedBlockingQueue<>(queueSize));
lock.lock();
@@ -166,7 +181,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
orderBys.add(new Order(itemField));
}
rowComparator = new RowDataComparator(HandlerTool.createFields(fieldPackets), orderBys);
outputHandler.fieldEofResponse(null, null, fieldPackets, null, false, service);
nextHandler.fieldEofResponse(null, null, fieldPackets, null, false, service);
}
private void startOwnThread() {
@@ -223,7 +238,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
continue;
}
}
outputHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
}
}
Iterator<Map.Entry<MySQLResponseService, BlockingQueue<HeapItem>>> iterator = this.queues.entrySet().iterator();
@@ -234,7 +249,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
iterator.remove();
}
doSqlStat();
outputHandler.rowEofResponse(null, false, null);
nextHandler.rowEofResponse(null, false, null);
} catch (Exception e) {
String msg = "Merge thread error, " + e.getLocalizedMessage();
LOGGER.info(msg, e);

View File

@@ -125,5 +125,9 @@ public class SendMakeHandler extends BaseDMLHandler {
public void onTerminate() {
}
public void cleanBuffer() {
if (nextHandler instanceof OutputHandler) {
((OutputHandler) nextHandler).cleanBuffer();
}
}
}

View File

@@ -6,6 +6,7 @@
package com.actiontech.dble.route;
import com.actiontech.dble.cluster.values.DDLInfo;
import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.util.FormatUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
import org.slf4j.Logger;
@@ -61,7 +62,11 @@ public final class RouteResultset implements Serializable {
// if force master,set canRunInReadDB=false
// if force slave set runOnSlave,default null means not effect
private Boolean runOnSlave = null;
private String[] groupByCols;
private transient List<Item> selectCols;
private boolean groupByColsHasShardingCols;
private boolean routePenetration = false;
@@ -83,6 +88,22 @@ public final class RouteResultset implements Serializable {
this.groupByCols = groupByCols;
}
public List<Item> getSelectCols() {
return selectCols;
}
public void setSelectCols(List<Item> selectCols) {
this.selectCols = selectCols;
}
public boolean isGroupByColsHasShardingCols() {
return groupByColsHasShardingCols;
}
public void setGroupByColsHasShardingCols(boolean groupByColsHasShardingCols) {
this.groupByColsHasShardingCols = groupByColsHasShardingCols;
}
public boolean isNeedOptimizer() {
return needOptimizer;
}

View File

@@ -7,7 +7,9 @@ package com.actiontech.dble.route.parser.druid.impl;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.VersionUtil;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
import com.actiontech.dble.config.model.sharding.table.GlobalTableConfig;
@@ -17,6 +19,7 @@ import com.actiontech.dble.config.privileges.ShardingPrivileges.CheckType;
import com.actiontech.dble.meta.ColumnMeta;
import com.actiontech.dble.meta.TableMeta;
import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.plan.common.item.ItemField;
import com.actiontech.dble.plan.common.item.function.ItemCreate;
import com.actiontech.dble.plan.common.ptr.StringPtr;
import com.actiontech.dble.plan.visitor.MySQLItemVisitor;
@@ -33,6 +36,7 @@ import com.actiontech.dble.server.util.SchemaUtil.SchemaInfo;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.ProxyMeta;
import com.actiontech.dble.sqlengine.mpp.ColumnRoute;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.*;
import com.alibaba.druid.sql.ast.expr.*;
@@ -184,7 +188,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
}
//if the sql involved node more than 1 ,Aggregate function/Group by/Order by should use complexQuery
parseOrderAggGroupMysql(schema, selectStmt, rrs, mysqlSelectQuery, tc);
parseOrderAggGroupMysql(service, schema, selectStmt, rrs, mysqlSelectQuery, tc);
if (rrs.isNeedOptimizer()) {
rrs.setNodes(null);
return;
@@ -336,7 +340,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
private void parseOrderAggGroupMysql(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
private void parseOrderAggGroupMysql(ShardingService service, SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
MySqlSelectQueryBlock mysqlSelectQuery, BaseTableConfig tc) throws SQLException {
//simple merge of ORDER BY has bugs,so optimizer here
if (mysqlSelectQuery.getOrderBy() != null) {
@@ -345,10 +349,10 @@ public class DruidSelectParser extends DefaultDruidParser {
rrs.setNeedOptimizer(true);
return;
}
parseAggGroupCommon(schema, stmt, rrs, mysqlSelectQuery, tc);
parseAggGroupCommon(service, schema, stmt, rrs, mysqlSelectQuery, tc);
}
private void parseAggExprCommon(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, Map<String, String> aliaColumns, BaseTableConfig tc, boolean isDistinct) throws SQLException {
private void parseAggExprCommon(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, List<Pair<String, String>> selectColumns, Map<String, String> aliaColumns, BaseTableConfig tc, boolean isDistinct) throws SQLException {
List<SQLSelectItem> selectList = mysqlSelectQuery.getSelectList();
boolean hasPartitionColumn = false;
for (SQLSelectItem selectItem : selectList) {
@@ -373,7 +377,7 @@ public class DruidSelectParser extends DefaultDruidParser {
rrs.setNeedOptimizer(true);
return;
} else {
addToAliaColumn(aliaColumns, selectItem);
addToAliaColumn(selectColumns, aliaColumns, selectItem);
}
} else if (itemExpr instanceof SQLAllColumnExpr) {
TableMeta tbMeta = ProxyMeta.getInstance().getTmManager().getSyncTableMeta(schema.getName(), tc.getName());
@@ -384,23 +388,26 @@ public class DruidSelectParser extends DefaultDruidParser {
}
for (ColumnMeta column : tbMeta.getColumns()) {
aliaColumns.put(column.getName(), column.getName());
Pair<String, String> selectCol = new Pair<>(column.getName(), column.getName());
selectColumns.add(selectCol);
}
} else {
if (isDistinct && !isNeedOptimizer(itemExpr)) {
if (itemExpr instanceof SQLIdentifierExpr) {
SQLIdentifierExpr item = (SQLIdentifierExpr) itemExpr;
if (hasShardingColumn(tc, item.getSimpleName())) hasPartitionColumn = true;
addToAliaColumn(aliaColumns, selectItem);
addToAliaColumn(selectColumns, aliaColumns, selectItem);
} else if (itemExpr instanceof SQLPropertyExpr) {
SQLPropertyExpr item = (SQLPropertyExpr) itemExpr;
if (hasShardingColumn(tc, item.getSimpleName())) hasPartitionColumn = true;
addToAliaColumn(aliaColumns, selectItem);
addToAliaColumn(selectColumns, aliaColumns, selectItem);
}
} else if (isSumFuncOrSubQuery(schema.getName(), itemExpr)) {
rrs.setNeedOptimizer(true);
return;
} else {
addToAliaColumn(aliaColumns, selectItem);
addToAliaColumn(selectColumns, aliaColumns, selectItem);
}
}
}
@@ -408,48 +415,91 @@ public class DruidSelectParser extends DefaultDruidParser {
rrs.setNeedOptimizer(true);
return;
}
parseGroupCommon(rrs, mysqlSelectQuery, tc);
parseGroupCommon(rrs, mysqlSelectQuery, aliaColumns, tc);
}
private void parseGroupCommon(RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, BaseTableConfig tc) {
private void parseGroupCommon(RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, Map<String, String> aliaColumns, BaseTableConfig tc) {
if (mysqlSelectQuery.getGroupBy() != null) {
SQLSelectGroupByClause groupBy = mysqlSelectQuery.getGroupBy();
boolean hasPartitionColumn = false;
SQLExpr partitionColumn = null;
for (SQLExpr groupByItem : groupBy.getItems()) {
if (isNeedOptimizer(groupByItem)) {
rrs.setNeedOptimizer(true);
return;
} else if (groupByItem instanceof SQLIdentifierExpr) {
SQLIdentifierExpr item = (SQLIdentifierExpr) groupByItem;
hasPartitionColumn = hasShardingColumn(tc, item.getSimpleName());
if (hasShardingColumnWithAlia(tc, StringUtil.removeBackQuote(item.getSimpleName()), aliaColumns)) {
partitionColumn = item;
break;
}
} else if (groupByItem instanceof SQLPropertyExpr) {
SQLPropertyExpr item = (SQLPropertyExpr) groupByItem;
hasPartitionColumn = hasShardingColumn(tc, item.getSimpleName());
if (hasShardingColumnWithAlia(tc, StringUtil.removeBackQuote(item.getSimpleName()), aliaColumns)) {
partitionColumn = item;
break;
}
}
}
if (groupBy.getItems().size() > 0 && !hasPartitionColumn) {
if (groupBy.getItems().size() > 0 && partitionColumn == null) {
rrs.setNeedOptimizer(true);
return;
}
if (groupBy.getItems().size() == 0 && groupBy.getHaving() != null) {
// only having filter need optimizer
rrs.setNeedOptimizer(true);
return;
}
rrs.setGroupByColsHasShardingCols(partitionColumn != null);
}
}
private Set<SQLSelectItem> groupColumnPushSelectList(List<SQLExpr> groupByItemList, List<Pair<String, String>> selectColumns) {
Set<SQLSelectItem> pushItem = new HashSet<>();
for (SQLExpr groupByItem : groupByItemList) {
String groupColumnName = null;
if (groupByItem instanceof SQLIdentifierExpr) {
groupColumnName = ((SQLIdentifierExpr) groupByItem).getSimpleName();
} else if (groupByItem instanceof SQLPropertyExpr) {
groupColumnName = ((SQLPropertyExpr) groupByItem).getSimpleName();
}
if (!StringUtil.isEmpty(groupColumnName)) {
if (!hasColumnOrAlia(StringUtil.removeBackQuote(groupColumnName), selectColumns)) {
pushItem.add(new SQLSelectItem(groupByItem));
}
}
}
return pushItem;
}
private boolean hasColumnOrAlia(String columnName, List<Pair<String, String>> selectColumns) {
return selectColumns.stream().anyMatch(s -> s.getKey().equalsIgnoreCase(columnName) || s.getValue().equalsIgnoreCase(columnName));
}
private boolean hasShardingColumn(BaseTableConfig tc, String columnName) {
return tc instanceof ShardingTableConfig && columnName.equalsIgnoreCase(((ShardingTableConfig) tc).getShardingColumn());
}
private boolean hasShardingColumnWithAlia(BaseTableConfig tc, String columnName, Map<String, String> aliaColumns) {
String shardingColumn = ((ShardingTableConfig) tc).getShardingColumn();
boolean isShardingColumn = tc instanceof ShardingTableConfig && columnName.equalsIgnoreCase(shardingColumn);
if (!isShardingColumn) {
Optional<Map.Entry<String, String>> alias = aliaColumns.entrySet().stream().filter(c -> c.getKey().toUpperCase().equals(shardingColumn)).findFirst();
if (alias.isPresent()) {
isShardingColumn = tc instanceof ShardingTableConfig && alias.get().getValue().equalsIgnoreCase(columnName);
}
}
return isShardingColumn;
}
private boolean isSumFuncOrSubQuery(String schema, SQLExpr itemExpr) {
MySQLItemVisitor ev = new MySQLItemVisitor(schema, CharsetUtil.getCharsetDefaultIndex("utf8mb4"), ProxyMeta.getInstance().getTmManager(), null);
itemExpr.accept(ev);
Item selItem = ev.getItem();
return contaisSumFuncOrSubquery(selItem);
return containSumFuncOrSubQuery(selItem);
}
private boolean contaisSumFuncOrSubquery(Item selItem) {
private boolean containSumFuncOrSubQuery(Item selItem) {
if (selItem.isWithSumFunc()) {
return true;
}
@@ -458,7 +508,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
if (selItem.getArgCount() > 0) {
for (Item child : selItem.arguments()) {
if (contaisSumFuncOrSubquery(child)) {
if (containSumFuncOrSubQuery(child)) {
return true;
}
}
@@ -473,13 +523,16 @@ public class DruidSelectParser extends DefaultDruidParser {
return !(expr instanceof SQLPropertyExpr) && !(expr instanceof SQLIdentifierExpr);
}
private void addToAliaColumn(Map<String, String> aliaColumns, SQLSelectItem item) {
private void addToAliaColumn(List<Pair<String, String>> selectColumns, Map<String, String> aliaColumns, SQLSelectItem item) {
String alia = item.getAlias();
String field = getFieldName(item);
if (alia == null) {
alia = field;
}
aliaColumns.put(field, alia);
Pair<String, String> selectCol = new Pair<String, String>(alia, field);
selectColumns.add(selectCol);
}
private String getFieldName(SQLSelectItem item) {
@@ -491,11 +544,12 @@ public class DruidSelectParser extends DefaultDruidParser {
}
}
private void parseAggGroupCommon(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
private void parseAggGroupCommon(ShardingService service, SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
MySqlSelectQueryBlock mysqlSelectQuery, BaseTableConfig tc) throws SQLException {
Map<String, String> aliaColumns = new HashMap<>();
List<Pair<String, String>> selectColumns = new LinkedList<>();
boolean isDistinct = (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCT) || (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCTROW);
parseAggExprCommon(schema, rrs, mysqlSelectQuery, aliaColumns, tc, isDistinct);
parseAggExprCommon(schema, rrs, mysqlSelectQuery, selectColumns, aliaColumns, tc, isDistinct);
if (rrs.isNeedOptimizer()) {
tryAddLimit(tc, mysqlSelectQuery);
rrs.setSqlStatement(stmt);
@@ -512,18 +566,57 @@ public class DruidSelectParser extends DefaultDruidParser {
mysqlSelectQuery.setGroupBy(groupBy);
}
// setGroupByCols
if (mysqlSelectQuery.getGroupBy() != null) {
List<SQLExpr> groupByItems = mysqlSelectQuery.getGroupBy().getItems();
String[] groupByCols = buildGroupByCols(groupByItems, aliaColumns);
rrs.setGroupByCols(groupByCols);
}
boolean isGroupByColPushSelectList = tryGroupColumnPushSelectList(aliaColumns, selectColumns,
mysqlSelectQuery, rrs, service.getCharset().getResultsIndex());
if (isDistinct) {
rrs.changeNodeSqlAfterAddLimit(statementToString(stmt), 0, -1);
} else if (isGroupByColPushSelectList) {
rrs.changeNodeSqlAfterAddLimit(statementToString(stmt), rrs.getLimitStart(), rrs.getLimitSize());
}
}
/**
* when fakeMysqlVersion is 8.0, in 'group by' no longer has the semantics of 'order by'
*/
private boolean tryGroupColumnPushSelectList(Map<String, String> aliaColumns, List<Pair<String, String>> selectColumns,
MySqlSelectQueryBlock mysqlSelectQuery, RouteResultset rrs, int charsetIndex) {
boolean isGroupByColPushSelectList = false;
if (!VersionUtil.isMysql8(SystemConfig.getInstance().getFakeMySQLVersion()) &&
rrs.isGroupByColsHasShardingCols()) { // && mysqlSelectQuery.getGroupBy() != null
Set<SQLSelectItem> pushSelectList = groupColumnPushSelectList(mysqlSelectQuery.getGroupBy().getItems(), selectColumns);
if (!CollectionUtil.isEmpty(pushSelectList)) {
for (SQLSelectItem e : pushSelectList) {
mysqlSelectQuery.getSelectList().add(e);
}
isGroupByColPushSelectList = true;
}
// setGroupByCols
List<SQLExpr> groupByItems = mysqlSelectQuery.getGroupBy().getItems();
String[] groupByCols = buildGroupByCols(groupByItems, aliaColumns);
rrs.setGroupByCols(groupByCols);
if (isGroupByColPushSelectList) {
rrs.setSelectCols(
handleSelectItems(selectColumns, rrs, charsetIndex));
}
}
return isGroupByColPushSelectList;
}
private LinkedList<Item> handleSelectItems(List<Pair<String, String>> selectList, RouteResultset rrs, int charsetIndex) {
LinkedList<Item> selectItems = new LinkedList<>();
for (Pair<String, String> sel : selectList) {
ItemField selItem = new ItemField(rrs.getSchema(), rrs.getTable(), StringUtil.removeBackQuote(sel.getValue()), charsetIndex);
selItem.setAlias(StringUtil.removeBackQuote(sel.getKey()));
selItem.setCharsetIndex(charsetIndex);
selectItems.add(selItem);
}
return selectItems;
}
private String getAliaColumn(Map<String, String> aliaColumns, String column) {
String alia = aliaColumns.get(column);
if (alia == null) {
@@ -582,7 +675,7 @@ public class DruidSelectParser extends DefaultDruidParser {
// get column from table.column
column = column.substring(dotIndex + 1);
}
groupByCols[i] = getAliaColumn(aliaColumns, column); // column;
groupByCols[i] = getAliaColumn(aliaColumns, StringUtil.removeBackQuote(column)); // column;
}
return groupByCols;
}