[inner-1335&1788&1792] optimizer group by (cherry pick) (#3353)

This commit is contained in:
wenyh
2022-08-15 09:48:08 +08:00
committed by GitHub
parent cc5131e823
commit e1fa2f411a
5 changed files with 205 additions and 43 deletions
@@ -5,7 +5,14 @@
package com.actiontech.dble.backend.mysql;
import com.actiontech.dble.DbleServer;
import javax.annotation.Nullable;
public final class VersionUtil {
private static final String VERSION_8 = "8.";
private static final String VERSION_5 = "5.";
private VersionUtil() {
}
@@ -23,4 +30,29 @@ public final class VersionUtil {
return TX_ISOLATION;
}
}
@Nullable
public static Integer getMajorVersionWithoutDefaultValue(String version) {
if (version.startsWith(VERSION_8)) {
return 8;
} else if (version.startsWith(VERSION_5) || version.contains("MariaDB")) {
return 5;
} else {
return null;
}
}
public static boolean isMysql8(String version) {
final Integer versionNumber = VersionUtil.getMajorVersionWithoutDefaultValue(version);
return versionNumber == 8;
}
public static boolean isMysql8() {
String version = DbleServer.getInstance().getConfig().getSystem().getFakeMySQLVersion();
if (version == null) {
version = VERSION_5;
}
final Integer versionNumber = VersionUtil.getMajorVersionWithoutDefaultValue(version);
return versionNumber == 8;
}
}
@@ -8,7 +8,9 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.SendMakeHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation;
import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool;
@@ -20,6 +22,7 @@ import com.actiontech.dble.plan.Order;
import com.actiontech.dble.plan.common.item.ItemField;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,31 +40,44 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
private final int queueSize;
private Map<BackendConnection, BlockingQueue<HeapItem>> queues;
private RowDataComparator rowComparator;
private OutputHandler outputHandler;
private BaseDMLHandler nextHandler;
private volatile boolean noNeedRows = false;
public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
super(rrs, session, false);
this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize();
this.queues = new ConcurrentHashMap<>();
outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
if (CollectionUtil.isEmpty(rrs.getSelectCols())) {
nextHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session);
} else {
nextHandler = new SendMakeHandler(BaseHandlerBuilder.getSequenceId(), session, rrs.getSelectCols(), rrs.getSchema(), rrs.getTable(), rrs.getTableAlias());
nextHandler.setNextHandler(new OutputHandler(BaseHandlerBuilder.getSequenceId(), session));
}
}
void nextHandlerCleanBuffer() {
if (nextHandler instanceof OutputHandler) {
((OutputHandler) nextHandler).cleanBuffer();
} else if (nextHandler instanceof SendMakeHandler) {
((SendMakeHandler) nextHandler).cleanBuffer();
}
}
@Override
public void connectionClose(BackendConnection conn, String reason) {
outputHandler.cleanBuffer();
nextHandlerCleanBuffer();
super.connectionClose(conn, reason);
}
@Override
public void connectionError(Throwable e, BackendConnection conn) {
outputHandler.cleanBuffer();
nextHandlerCleanBuffer();
super.connectionError(e, conn);
}
@Override
public void errorResponse(byte[] data, BackendConnection conn) {
outputHandler.cleanBuffer();
nextHandlerCleanBuffer();
super.errorResponse(data, conn);
}
@@ -163,7 +179,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
orderBys.add(new Order(itemField));
}
rowComparator = new RowDataComparator(HandlerTool.createFields(fieldPackets), orderBys);
outputHandler.fieldEofResponse(null, null, fieldPackets, null, false, conn);
nextHandler.fieldEofResponse(null, null, fieldPackets, null, false, conn);
}
private void startOwnThread() {
@@ -220,7 +236,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
continue;
}
}
outputHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
}
}
Iterator<Map.Entry<BackendConnection, BlockingQueue<HeapItem>>> iterator = this.queues.entrySet().iterator();
@@ -231,7 +247,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
iterator.remove();
}
doSqlStat();
outputHandler.rowEofResponse(null, false, null);
nextHandler.rowEofResponse(null, false, null);
} catch (Exception e) {
String msg = "Merge thread error, " + e.getLocalizedMessage();
LOGGER.info(msg, e);
@@ -124,5 +124,9 @@ public class SendMakeHandler extends BaseDMLHandler {
public void onTerminate() {
}
public void cleanBuffer() {
if (nextHandler instanceof OutputHandler) {
((OutputHandler) nextHandler).cleanBuffer();
}
}
}
@@ -6,6 +6,7 @@
package com.actiontech.dble.route;
import com.actiontech.dble.config.loader.zkprocess.zookeeper.process.DDLInfo;
import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.util.FormatUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
@@ -60,7 +61,11 @@ public final class RouteResultset implements Serializable {
// if force master,set canRunInReadDB=false
// if force slave set runOnSlave,default null means not effect
private Boolean runOnSlave = null;
private String[] groupByCols;
private transient List<Item> selectCols;
private boolean groupByColsHasShardingCols;
private boolean routePenetration = false;
@@ -72,6 +77,22 @@ public final class RouteResultset implements Serializable {
this.groupByCols = groupByCols;
}
public List<Item> getSelectCols() {
return selectCols;
}
public void setSelectCols(List<Item> selectCols) {
this.selectCols = selectCols;
}
public boolean isGroupByColsHasShardingCols() {
return groupByColsHasShardingCols;
}
public void setGroupByColsHasShardingCols(boolean groupByColsHasShardingCols) {
this.groupByColsHasShardingCols = groupByColsHasShardingCols;
}
public boolean isNeedOptimizer() {
return needOptimizer;
}
@@ -7,6 +7,7 @@ package com.actiontech.dble.route.parser.druid.impl;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.VersionUtil;
import com.actiontech.dble.cache.LayerCachePool;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.ServerPrivileges;
@@ -16,6 +17,7 @@ import com.actiontech.dble.config.model.TableConfig;
import com.actiontech.dble.config.model.rule.RuleConfig;
import com.actiontech.dble.meta.protocol.StructureMeta;
import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.plan.common.item.ItemField;
import com.actiontech.dble.plan.common.item.function.ItemCreate;
import com.actiontech.dble.plan.common.ptr.StringPtr;
import com.actiontech.dble.plan.visitor.MySQLItemVisitor;
@@ -23,6 +25,7 @@ import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.route.parser.druid.RouteCalculateUnit;
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.route.util.RouterUtil;
import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.server.handler.MysqlSystemSchemaHandler;
@@ -31,6 +34,7 @@ import com.actiontech.dble.server.util.SchemaUtil.SchemaInfo;
import com.actiontech.dble.singleton.CacheService;
import com.actiontech.dble.singleton.ProxyMeta;
import com.actiontech.dble.sqlengine.mpp.ColumnRoutePair;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.sql.ast.*;
import com.alibaba.druid.sql.ast.expr.*;
@@ -176,7 +180,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
}
//if the sql involved node more than 1 ,Aggregate function/Group by/Order by should use complexQuery
parseOrderAggGroupMysql(schema, selectStmt, rrs, mysqlSelectQuery, tc);
parseOrderAggGroupMysql(sc, schema, selectStmt, rrs, mysqlSelectQuery, tc);
if (rrs.isNeedOptimizer()) {
rrs.setNodes(null);
return;
@@ -334,7 +338,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
private void parseOrderAggGroupMysql(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
private void parseOrderAggGroupMysql(ServerConnection sc, SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
MySqlSelectQueryBlock mysqlSelectQuery, TableConfig tc) throws SQLException {
//simple merge of ORDER BY has bugs,so optimizer here
if (mysqlSelectQuery.getOrderBy() != null) {
@@ -343,10 +347,10 @@ public class DruidSelectParser extends DefaultDruidParser {
rrs.setNeedOptimizer(true);
return;
}
parseAggGroupCommon(schema, stmt, rrs, mysqlSelectQuery, tc);
parseAggGroupCommon(sc, schema, stmt, rrs, mysqlSelectQuery, tc);
}
private void parseAggExprCommon(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, Map<String, String> aliaColumns, TableConfig tc, boolean isDistinct) throws SQLException {
private void parseAggExprCommon(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, List<Pair<String, String>> selectColumns, Map<String, String> aliaColumns, TableConfig tc, boolean isDistinct) throws SQLException {
List<SQLSelectItem> selectList = mysqlSelectQuery.getSelectList();
boolean hasPartitionColumn = false;
for (SQLSelectItem selectItem : selectList) {
@@ -371,7 +375,7 @@ public class DruidSelectParser extends DefaultDruidParser {
rrs.setNeedOptimizer(true);
return;
} else {
addToAliaColumn(aliaColumns, selectItem);
addToAliaColumn(selectColumns, aliaColumns, selectItem);
}
} else if (itemExpr instanceof SQLAllColumnExpr) {
StructureMeta.TableMeta tbMeta = ProxyMeta.getInstance().getTmManager().getSyncTableMeta(schema.getName(), tc.getName());
@@ -382,27 +386,26 @@ public class DruidSelectParser extends DefaultDruidParser {
}
for (StructureMeta.ColumnMeta column : tbMeta.getColumnsList()) {
aliaColumns.put(column.getName(), column.getName());
Pair<String, String> selectCol = new Pair<>(column.getName(), column.getName());
selectColumns.add(selectCol);
}
} else {
if (isDistinct && !isNeedOptimizer(itemExpr)) {
if (itemExpr instanceof SQLIdentifierExpr) {
SQLIdentifierExpr item = (SQLIdentifierExpr) itemExpr;
if (item.getSimpleName().equalsIgnoreCase(tc.getPartitionColumn())) {
hasPartitionColumn = true;
}
addToAliaColumn(aliaColumns, selectItem);
if (hasShardingColumn(tc, item.getSimpleName())) hasPartitionColumn = true;
addToAliaColumn(selectColumns, aliaColumns, selectItem);
} else if (itemExpr instanceof SQLPropertyExpr) {
SQLPropertyExpr item = (SQLPropertyExpr) itemExpr;
if (item.getSimpleName().equalsIgnoreCase(tc.getPartitionColumn())) {
hasPartitionColumn = true;
}
addToAliaColumn(aliaColumns, selectItem);
if (hasShardingColumn(tc, item.getSimpleName())) hasPartitionColumn = true;
addToAliaColumn(selectColumns, aliaColumns, selectItem);
}
} else if (isSumFuncOrSubQuery(schema.getName(), itemExpr)) {
rrs.setNeedOptimizer(true);
return;
} else {
addToAliaColumn(aliaColumns, selectItem);
addToAliaColumn(selectColumns, aliaColumns, selectItem);
}
}
}
@@ -410,48 +413,91 @@ public class DruidSelectParser extends DefaultDruidParser {
rrs.setNeedOptimizer(true);
return;
}
parseGroupCommon(rrs, mysqlSelectQuery, tc);
parseGroupCommon(rrs, mysqlSelectQuery, aliaColumns, tc);
}
private void parseGroupCommon(RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, TableConfig tc) {
private void parseGroupCommon(RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, Map<String, String> aliaColumns, TableConfig tc) {
if (mysqlSelectQuery.getGroupBy() != null) {
SQLSelectGroupByClause groupBy = mysqlSelectQuery.getGroupBy();
boolean hasPartitionColumn = false;
SQLExpr partitionColumn = null;
for (SQLExpr groupByItem : groupBy.getItems()) {
if (isNeedOptimizer(groupByItem)) {
rrs.setNeedOptimizer(true);
return;
} else if (groupByItem instanceof SQLIdentifierExpr) {
SQLIdentifierExpr item = (SQLIdentifierExpr) groupByItem;
if (item.getSimpleName().equalsIgnoreCase(tc.getPartitionColumn())) {
hasPartitionColumn = true;
if (hasShardingColumnWithAlia(tc, StringUtil.removeBackQuote(item.getSimpleName()), aliaColumns)) {
partitionColumn = item;
break;
}
} else if (groupByItem instanceof SQLPropertyExpr) {
SQLPropertyExpr item = (SQLPropertyExpr) groupByItem;
if (item.getSimpleName().equalsIgnoreCase(tc.getPartitionColumn())) {
hasPartitionColumn = true;
if (hasShardingColumnWithAlia(tc, StringUtil.removeBackQuote(item.getSimpleName()), aliaColumns)) {
partitionColumn = item;
break;
}
}
}
if (groupBy.getItems().size() > 0 && !hasPartitionColumn) {
if (groupBy.getItems().size() > 0 && partitionColumn == null) {
rrs.setNeedOptimizer(true);
return;
}
if (groupBy.getItems().size() == 0 && groupBy.getHaving() != null) {
// only having filter need optimizer
rrs.setNeedOptimizer(true);
return;
}
rrs.setGroupByColsHasShardingCols(partitionColumn != null);
}
}
private Set<SQLSelectItem> groupColumnPushSelectList(List<SQLExpr> groupByItemList, List<Pair<String, String>> selectColumns) {
Set<SQLSelectItem> pushItem = new HashSet<>();
for (SQLExpr groupByItem : groupByItemList) {
String groupColumnName = null;
if (groupByItem instanceof SQLIdentifierExpr) {
groupColumnName = ((SQLIdentifierExpr) groupByItem).getSimpleName();
} else if (groupByItem instanceof SQLPropertyExpr) {
groupColumnName = ((SQLPropertyExpr) groupByItem).getSimpleName();
}
if (!StringUtil.isEmpty(groupColumnName)) {
if (!hasColumnOrAlia(StringUtil.removeBackQuote(groupColumnName), selectColumns)) {
pushItem.add(new SQLSelectItem(groupByItem));
}
}
}
return pushItem;
}
private boolean hasColumnOrAlia(String columnName, List<Pair<String, String>> selectColumns) {
return selectColumns.stream().anyMatch(s -> s.getKey().equalsIgnoreCase(columnName) || s.getValue().equalsIgnoreCase(columnName));
}
private boolean hasShardingColumn(TableConfig tc, String columnName) {
return tc.getPartitionColumn() != null && columnName.equalsIgnoreCase(tc.getPartitionColumn());
}
private boolean hasShardingColumnWithAlia(TableConfig tc, String columnName, Map<String, String> aliaColumns) {
String shardingColumn = tc.getPartitionColumn();
boolean isShardingColumn = tc.getPartitionColumn() != null && columnName.equalsIgnoreCase(shardingColumn);
if (!isShardingColumn) {
Optional<Map.Entry<String, String>> alias = aliaColumns.entrySet().stream().filter(c -> c.getKey().toUpperCase().equals(shardingColumn)).findFirst();
if (alias.isPresent()) {
isShardingColumn = tc.getPartitionColumn() != null && alias.get().getValue().equalsIgnoreCase(columnName);
}
}
return isShardingColumn;
}
private boolean isSumFuncOrSubQuery(String schema, SQLExpr itemExpr) {
MySQLItemVisitor ev = new MySQLItemVisitor(schema, CharsetUtil.getCharsetDefaultIndex("utf8mb4"), ProxyMeta.getInstance().getTmManager(), null);
itemExpr.accept(ev);
Item selItem = ev.getItem();
return contaisSumFuncOrSubquery(selItem);
return containSumFuncOrSubQuery(selItem);
}
private boolean contaisSumFuncOrSubquery(Item selItem) {
private boolean containSumFuncOrSubQuery(Item selItem) {
if (selItem.isWithSumFunc()) {
return true;
}
@@ -460,7 +506,7 @@ public class DruidSelectParser extends DefaultDruidParser {
}
if (selItem.getArgCount() > 0) {
for (Item child : selItem.arguments()) {
if (contaisSumFuncOrSubquery(child)) {
if (containSumFuncOrSubQuery(child)) {
return true;
}
}
@@ -475,13 +521,16 @@ public class DruidSelectParser extends DefaultDruidParser {
return !(expr instanceof SQLPropertyExpr) && !(expr instanceof SQLIdentifierExpr);
}
private void addToAliaColumn(Map<String, String> aliaColumns, SQLSelectItem item) {
private void addToAliaColumn(List<Pair<String, String>> selectColumns, Map<String, String> aliaColumns, SQLSelectItem item) {
String alia = item.getAlias();
String field = getFieldName(item);
if (alia == null) {
alia = field;
}
aliaColumns.put(field, alia);
Pair<String, String> selectCol = new Pair<String, String>(alia, field);
selectColumns.add(selectCol);
}
private String getFieldName(SQLSelectItem item) {
@@ -493,11 +542,12 @@ public class DruidSelectParser extends DefaultDruidParser {
}
}
private void parseAggGroupCommon(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
private void parseAggGroupCommon(ServerConnection sc, SchemaConfig schema, SQLStatement stmt, RouteResultset rrs,
MySqlSelectQueryBlock mysqlSelectQuery, TableConfig tc) throws SQLException {
Map<String, String> aliaColumns = new HashMap<>();
List<Pair<String, String>> selectColumns = new LinkedList<>();
boolean isDistinct = (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCT) || (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCTROW);
parseAggExprCommon(schema, rrs, mysqlSelectQuery, aliaColumns, tc, isDistinct);
parseAggExprCommon(schema, rrs, mysqlSelectQuery, selectColumns, aliaColumns, tc, isDistinct);
if (rrs.isNeedOptimizer()) {
tryAddLimit(schema, tc, mysqlSelectQuery, rrs);
rrs.setSqlStatement(stmt);
@@ -514,18 +564,57 @@ public class DruidSelectParser extends DefaultDruidParser {
mysqlSelectQuery.setGroupBy(groupBy);
}
// setGroupByCols
if (mysqlSelectQuery.getGroupBy() != null) {
List<SQLExpr> groupByItems = mysqlSelectQuery.getGroupBy().getItems();
String[] groupByCols = buildGroupByCols(groupByItems, aliaColumns);
rrs.setGroupByCols(groupByCols);
}
boolean isGroupByColPushSelectList = tryGroupColumnPushSelectList(aliaColumns, selectColumns,
mysqlSelectQuery, rrs, sc.getCharset().getResultsIndex());
if (isDistinct) {
rrs.changeNodeSqlAfterAddLimit(statementToString(stmt), 0, -1);
} else if (isGroupByColPushSelectList) {
rrs.changeNodeSqlAfterAddLimit(statementToString(stmt), rrs.getLimitStart(), rrs.getLimitSize());
}
}
/**
* when fakeMysqlVersion is 8.0, in 'group by' no longer has the semantics of 'order by'
*/
private boolean tryGroupColumnPushSelectList(Map<String, String> aliaColumns, List<Pair<String, String>> selectColumns,
MySqlSelectQueryBlock mysqlSelectQuery, RouteResultset rrs, int charsetIndex) {
boolean isGroupByColPushSelectList = false;
if (!VersionUtil.isMysql8() &&
rrs.isGroupByColsHasShardingCols()) { // && mysqlSelectQuery.getGroupBy() != null
Set<SQLSelectItem> pushSelectList = groupColumnPushSelectList(mysqlSelectQuery.getGroupBy().getItems(), selectColumns);
if (!CollectionUtil.isEmpty(pushSelectList)) {
for (SQLSelectItem e : pushSelectList) {
mysqlSelectQuery.getSelectList().add(e);
}
isGroupByColPushSelectList = true;
}
// setGroupByCols
List<SQLExpr> groupByItems = mysqlSelectQuery.getGroupBy().getItems();
String[] groupByCols = buildGroupByCols(groupByItems, aliaColumns);
rrs.setGroupByCols(groupByCols);
if (isGroupByColPushSelectList) {
rrs.setSelectCols(
handleSelectItems(selectColumns, rrs, charsetIndex));
}
}
return isGroupByColPushSelectList;
}
private LinkedList<Item> handleSelectItems(List<Pair<String, String>> selectList, RouteResultset rrs, int charsetIndex) {
LinkedList<Item> selectItems = new LinkedList<>();
for (Pair<String, String> sel : selectList) {
ItemField selItem = new ItemField(rrs.getSchema(), rrs.getTable(), StringUtil.removeBackQuote(sel.getValue()), charsetIndex);
selItem.setAlias(StringUtil.removeBackQuote(sel.getKey()));
selItem.setCharsetIndex(charsetIndex);
selectItems.add(selItem);
}
return selectItems;
}
private String getAliaColumn(Map<String, String> aliaColumns, String column) {
String alia = aliaColumns.get(column);
if (alia == null) {