diff --git a/src/main/java/com/actiontech/dble/backend/mysql/VersionUtil.java b/src/main/java/com/actiontech/dble/backend/mysql/VersionUtil.java index 4cddc7f48..9be9678d9 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/VersionUtil.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/VersionUtil.java @@ -5,7 +5,12 @@ package com.actiontech.dble.backend.mysql; +import javax.annotation.Nullable; + public final class VersionUtil { + private static final String VERSION_8 = "8."; + private static final String VERSION_5 = "5."; + private VersionUtil() { } @@ -23,4 +28,20 @@ public final class VersionUtil { return TX_ISOLATION; } } + + @Nullable + public static Integer getMajorVersionWithoutDefaultValue(String version) { + if (version.startsWith(VERSION_8)) { + return 8; + } else if (version.startsWith(VERSION_5) || version.contains("MariaDB")) { + return 5; + } else { + return null; + } + } + + public static boolean isMysql8(String version) { + final Integer versionNumber = VersionUtil.getMajorVersionWithoutDefaultValue(version); + return versionNumber == 8; + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java index c5338604e..ff6222ba6 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java @@ -6,7 +6,9 @@ package com.actiontech.dble.backend.mysql.nio.handler; import com.actiontech.dble.DbleServer; import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder; +import com.actiontech.dble.backend.mysql.nio.handler.query.BaseDMLHandler; import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler; +import com.actiontech.dble.backend.mysql.nio.handler.query.impl.SendMakeHandler; import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation; import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap; import com.actiontech.dble.backend.mysql.nio.handler.util.HandlerTool; @@ -22,6 +24,7 @@ import com.actiontech.dble.route.RouteResultset; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.singleton.TraceManager; +import com.actiontech.dble.util.CollectionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,32 +42,44 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { private final int queueSize; private Map> queues; private RowDataComparator rowComparator; - private OutputHandler outputHandler; + private BaseDMLHandler nextHandler; private volatile boolean noNeedRows = false; public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) { super(rrs, session, false); this.queueSize = SystemConfig.getInstance().getMergeQueueSize(); this.queues = new ConcurrentHashMap<>(); - outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session); + if (CollectionUtil.isEmpty(rrs.getSelectCols())) { + nextHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session); + } else { + nextHandler = new SendMakeHandler(BaseHandlerBuilder.getSequenceId(), session, rrs.getSelectCols(), rrs.getSchema(), rrs.getTable(), rrs.getTableAlias()); + nextHandler.setNextHandler(new OutputHandler(BaseHandlerBuilder.getSequenceId(), session)); + } + } + + void nextHandlerCleanBuffer() { + if (nextHandler instanceof OutputHandler) { + ((OutputHandler) nextHandler).cleanBuffer(); + } else if (nextHandler instanceof SendMakeHandler) { + ((SendMakeHandler) nextHandler).cleanBuffer(); + } } @Override public void connectionClose(AbstractService service, String reason) { - outputHandler.cleanBuffer(); + nextHandlerCleanBuffer(); super.connectionClose(service, reason); } - @Override public void connectionError(Throwable e, Object attachment) { - outputHandler.cleanBuffer(); + nextHandlerCleanBuffer(); super.connectionError(e, attachment); } @Override public void errorResponse(byte[] data, AbstractService service) { - outputHandler.cleanBuffer(); + nextHandlerCleanBuffer(); super.errorResponse(data, service); } @@ -83,7 +98,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { } @Override - public void fieldEofResponse(byte[] header, List fields, List fieldPacketsNull, byte[] eof, + public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof, boolean isLeft, AbstractService service) { queues.put((MySQLResponseService) service, new LinkedBlockingQueue<>(queueSize)); lock.lock(); @@ -166,7 +181,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { orderBys.add(new Order(itemField)); } rowComparator = new RowDataComparator(HandlerTool.createFields(fieldPackets), orderBys); - outputHandler.fieldEofResponse(null, null, fieldPackets, null, false, service); + nextHandler.fieldEofResponse(null, null, fieldPackets, null, false, service); } private void startOwnThread() { @@ -223,7 +238,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { continue; } } - outputHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex()); + nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex()); } } Iterator>> iterator = this.queues.entrySet().iterator(); @@ -234,7 +249,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { iterator.remove(); } doSqlStat(); - outputHandler.rowEofResponse(null, false, null); + nextHandler.rowEofResponse(null, false, null); } catch (Exception e) { String msg = "Merge thread error, " + e.getLocalizedMessage(); LOGGER.info(msg, e); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/SendMakeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/SendMakeHandler.java index a8e1e1400..ad8aabeeb 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/SendMakeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/SendMakeHandler.java @@ -125,5 +125,9 @@ public class SendMakeHandler extends BaseDMLHandler { public void onTerminate() { } - + public void cleanBuffer() { + if (nextHandler instanceof OutputHandler) { + ((OutputHandler) nextHandler).cleanBuffer(); + } + } } diff --git a/src/main/java/com/actiontech/dble/route/RouteResultset.java b/src/main/java/com/actiontech/dble/route/RouteResultset.java index e6d93523c..689d95eb0 100644 --- a/src/main/java/com/actiontech/dble/route/RouteResultset.java +++ b/src/main/java/com/actiontech/dble/route/RouteResultset.java @@ -6,6 +6,7 @@ package com.actiontech.dble.route; import com.actiontech.dble.cluster.values.DDLInfo; +import com.actiontech.dble.plan.common.item.Item; import com.actiontech.dble.util.FormatUtil; import com.alibaba.druid.sql.ast.SQLStatement; import org.slf4j.Logger; @@ -61,7 +62,11 @@ public final class RouteResultset implements Serializable { // if force master,set canRunInReadDB=false // if force slave set runOnSlave,default null means not effect private Boolean runOnSlave = null; + private String[] groupByCols; + private transient List selectCols; + + private boolean groupByColsHasShardingCols; private boolean routePenetration = false; @@ -83,6 +88,22 @@ public final class RouteResultset implements Serializable { this.groupByCols = groupByCols; } + public List getSelectCols() { + return selectCols; + } + + public void setSelectCols(List selectCols) { + this.selectCols = selectCols; + } + + public boolean isGroupByColsHasShardingCols() { + return groupByColsHasShardingCols; + } + + public void setGroupByColsHasShardingCols(boolean groupByColsHasShardingCols) { + this.groupByColsHasShardingCols = groupByColsHasShardingCols; + } + public boolean isNeedOptimizer() { return needOptimizer; } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java index 5e78a1668..2a7709f8f 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidSelectParser.java @@ -7,7 +7,9 @@ package com.actiontech.dble.route.parser.druid.impl; import com.actiontech.dble.DbleServer; import com.actiontech.dble.backend.mysql.CharsetUtil; +import com.actiontech.dble.backend.mysql.VersionUtil; import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.table.BaseTableConfig; import com.actiontech.dble.config.model.sharding.table.GlobalTableConfig; @@ -17,6 +19,7 @@ import com.actiontech.dble.config.privileges.ShardingPrivileges.CheckType; import com.actiontech.dble.meta.ColumnMeta; import com.actiontech.dble.meta.TableMeta; import com.actiontech.dble.plan.common.item.Item; +import com.actiontech.dble.plan.common.item.ItemField; import com.actiontech.dble.plan.common.item.function.ItemCreate; import com.actiontech.dble.plan.common.ptr.StringPtr; import com.actiontech.dble.plan.visitor.MySQLItemVisitor; @@ -33,6 +36,7 @@ import com.actiontech.dble.server.util.SchemaUtil.SchemaInfo; import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.singleton.ProxyMeta; import com.actiontech.dble.sqlengine.mpp.ColumnRoute; +import com.actiontech.dble.util.CollectionUtil; import com.actiontech.dble.util.StringUtil; import com.alibaba.druid.sql.ast.*; import com.alibaba.druid.sql.ast.expr.*; @@ -184,7 +188,7 @@ public class DruidSelectParser extends DefaultDruidParser { } } //if the sql involved node more than 1 ,Aggregate function/Group by/Order by should use complexQuery - parseOrderAggGroupMysql(schema, selectStmt, rrs, mysqlSelectQuery, tc); + parseOrderAggGroupMysql(service, schema, selectStmt, rrs, mysqlSelectQuery, tc); if (rrs.isNeedOptimizer()) { rrs.setNodes(null); return; @@ -336,7 +340,7 @@ public class DruidSelectParser extends DefaultDruidParser { } - private void parseOrderAggGroupMysql(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, + private void parseOrderAggGroupMysql(ShardingService service, SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, BaseTableConfig tc) throws SQLException { //simple merge of ORDER BY has bugs,so optimizer here if (mysqlSelectQuery.getOrderBy() != null) { @@ -345,10 +349,10 @@ public class DruidSelectParser extends DefaultDruidParser { rrs.setNeedOptimizer(true); return; } - parseAggGroupCommon(schema, stmt, rrs, mysqlSelectQuery, tc); + parseAggGroupCommon(service, schema, stmt, rrs, mysqlSelectQuery, tc); } - private void parseAggExprCommon(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, Map aliaColumns, BaseTableConfig tc, boolean isDistinct) throws SQLException { + private void parseAggExprCommon(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, List> selectColumns, Map aliaColumns, BaseTableConfig tc, boolean isDistinct) throws SQLException { List selectList = mysqlSelectQuery.getSelectList(); boolean hasPartitionColumn = false; for (SQLSelectItem selectItem : selectList) { @@ -373,7 +377,7 @@ public class DruidSelectParser extends DefaultDruidParser { rrs.setNeedOptimizer(true); return; } else { - addToAliaColumn(aliaColumns, selectItem); + addToAliaColumn(selectColumns, aliaColumns, selectItem); } } else if (itemExpr instanceof SQLAllColumnExpr) { TableMeta tbMeta = ProxyMeta.getInstance().getTmManager().getSyncTableMeta(schema.getName(), tc.getName()); @@ -384,23 +388,26 @@ public class DruidSelectParser extends DefaultDruidParser { } for (ColumnMeta column : tbMeta.getColumns()) { aliaColumns.put(column.getName(), column.getName()); + + Pair selectCol = new Pair<>(column.getName(), column.getName()); + selectColumns.add(selectCol); } } else { if (isDistinct && !isNeedOptimizer(itemExpr)) { if (itemExpr instanceof SQLIdentifierExpr) { SQLIdentifierExpr item = (SQLIdentifierExpr) itemExpr; if (hasShardingColumn(tc, item.getSimpleName())) hasPartitionColumn = true; - addToAliaColumn(aliaColumns, selectItem); + addToAliaColumn(selectColumns, aliaColumns, selectItem); } else if (itemExpr instanceof SQLPropertyExpr) { SQLPropertyExpr item = (SQLPropertyExpr) itemExpr; if (hasShardingColumn(tc, item.getSimpleName())) hasPartitionColumn = true; - addToAliaColumn(aliaColumns, selectItem); + addToAliaColumn(selectColumns, aliaColumns, selectItem); } } else if (isSumFuncOrSubQuery(schema.getName(), itemExpr)) { rrs.setNeedOptimizer(true); return; } else { - addToAliaColumn(aliaColumns, selectItem); + addToAliaColumn(selectColumns, aliaColumns, selectItem); } } } @@ -408,48 +415,91 @@ public class DruidSelectParser extends DefaultDruidParser { rrs.setNeedOptimizer(true); return; } - parseGroupCommon(rrs, mysqlSelectQuery, tc); + parseGroupCommon(rrs, mysqlSelectQuery, aliaColumns, tc); } - private void parseGroupCommon(RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, BaseTableConfig tc) { + private void parseGroupCommon(RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, Map aliaColumns, BaseTableConfig tc) { if (mysqlSelectQuery.getGroupBy() != null) { SQLSelectGroupByClause groupBy = mysqlSelectQuery.getGroupBy(); - boolean hasPartitionColumn = false; + SQLExpr partitionColumn = null; for (SQLExpr groupByItem : groupBy.getItems()) { if (isNeedOptimizer(groupByItem)) { rrs.setNeedOptimizer(true); return; } else if (groupByItem instanceof SQLIdentifierExpr) { SQLIdentifierExpr item = (SQLIdentifierExpr) groupByItem; - hasPartitionColumn = hasShardingColumn(tc, item.getSimpleName()); + if (hasShardingColumnWithAlia(tc, StringUtil.removeBackQuote(item.getSimpleName()), aliaColumns)) { + partitionColumn = item; + break; + } } else if (groupByItem instanceof SQLPropertyExpr) { SQLPropertyExpr item = (SQLPropertyExpr) groupByItem; - hasPartitionColumn = hasShardingColumn(tc, item.getSimpleName()); + if (hasShardingColumnWithAlia(tc, StringUtil.removeBackQuote(item.getSimpleName()), aliaColumns)) { + partitionColumn = item; + break; + } } } - if (groupBy.getItems().size() > 0 && !hasPartitionColumn) { + if (groupBy.getItems().size() > 0 && partitionColumn == null) { rrs.setNeedOptimizer(true); return; } if (groupBy.getItems().size() == 0 && groupBy.getHaving() != null) { // only having filter need optimizer rrs.setNeedOptimizer(true); + return; + } + rrs.setGroupByColsHasShardingCols(partitionColumn != null); + } + } + + private Set groupColumnPushSelectList(List groupByItemList, List> selectColumns) { + Set pushItem = new HashSet<>(); + + for (SQLExpr groupByItem : groupByItemList) { + String groupColumnName = null; + if (groupByItem instanceof SQLIdentifierExpr) { + groupColumnName = ((SQLIdentifierExpr) groupByItem).getSimpleName(); + } else if (groupByItem instanceof SQLPropertyExpr) { + groupColumnName = ((SQLPropertyExpr) groupByItem).getSimpleName(); + } + if (!StringUtil.isEmpty(groupColumnName)) { + if (!hasColumnOrAlia(StringUtil.removeBackQuote(groupColumnName), selectColumns)) { + pushItem.add(new SQLSelectItem(groupByItem)); + } } } + return pushItem; + } + + private boolean hasColumnOrAlia(String columnName, List> selectColumns) { + return selectColumns.stream().anyMatch(s -> s.getKey().equalsIgnoreCase(columnName) || s.getValue().equalsIgnoreCase(columnName)); } private boolean hasShardingColumn(BaseTableConfig tc, String columnName) { return tc instanceof ShardingTableConfig && columnName.equalsIgnoreCase(((ShardingTableConfig) tc).getShardingColumn()); } + private boolean hasShardingColumnWithAlia(BaseTableConfig tc, String columnName, Map aliaColumns) { + String shardingColumn = ((ShardingTableConfig) tc).getShardingColumn(); + boolean isShardingColumn = tc instanceof ShardingTableConfig && columnName.equalsIgnoreCase(shardingColumn); + if (!isShardingColumn) { + Optional> alias = aliaColumns.entrySet().stream().filter(c -> c.getKey().toUpperCase().equals(shardingColumn)).findFirst(); + if (alias.isPresent()) { + isShardingColumn = tc instanceof ShardingTableConfig && alias.get().getValue().equalsIgnoreCase(columnName); + } + } + return isShardingColumn; + } + private boolean isSumFuncOrSubQuery(String schema, SQLExpr itemExpr) { MySQLItemVisitor ev = new MySQLItemVisitor(schema, CharsetUtil.getCharsetDefaultIndex("utf8mb4"), ProxyMeta.getInstance().getTmManager(), null); itemExpr.accept(ev); Item selItem = ev.getItem(); - return contaisSumFuncOrSubquery(selItem); + return containSumFuncOrSubQuery(selItem); } - private boolean contaisSumFuncOrSubquery(Item selItem) { + private boolean containSumFuncOrSubQuery(Item selItem) { if (selItem.isWithSumFunc()) { return true; } @@ -458,7 +508,7 @@ public class DruidSelectParser extends DefaultDruidParser { } if (selItem.getArgCount() > 0) { for (Item child : selItem.arguments()) { - if (contaisSumFuncOrSubquery(child)) { + if (containSumFuncOrSubQuery(child)) { return true; } } @@ -473,13 +523,16 @@ public class DruidSelectParser extends DefaultDruidParser { return !(expr instanceof SQLPropertyExpr) && !(expr instanceof SQLIdentifierExpr); } - private void addToAliaColumn(Map aliaColumns, SQLSelectItem item) { + private void addToAliaColumn(List> selectColumns, Map aliaColumns, SQLSelectItem item) { String alia = item.getAlias(); String field = getFieldName(item); if (alia == null) { alia = field; } aliaColumns.put(field, alia); + + Pair selectCol = new Pair(alia, field); + selectColumns.add(selectCol); } private String getFieldName(SQLSelectItem item) { @@ -491,11 +544,12 @@ public class DruidSelectParser extends DefaultDruidParser { } } - private void parseAggGroupCommon(SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, + private void parseAggGroupCommon(ShardingService service, SchemaConfig schema, SQLStatement stmt, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery, BaseTableConfig tc) throws SQLException { Map aliaColumns = new HashMap<>(); + List> selectColumns = new LinkedList<>(); boolean isDistinct = (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCT) || (mysqlSelectQuery.getDistionOption() == SQLSetQuantifier.DISTINCTROW); - parseAggExprCommon(schema, rrs, mysqlSelectQuery, aliaColumns, tc, isDistinct); + parseAggExprCommon(schema, rrs, mysqlSelectQuery, selectColumns, aliaColumns, tc, isDistinct); if (rrs.isNeedOptimizer()) { tryAddLimit(tc, mysqlSelectQuery); rrs.setSqlStatement(stmt); @@ -512,18 +566,57 @@ public class DruidSelectParser extends DefaultDruidParser { mysqlSelectQuery.setGroupBy(groupBy); } - // setGroupByCols - if (mysqlSelectQuery.getGroupBy() != null) { - List groupByItems = mysqlSelectQuery.getGroupBy().getItems(); - String[] groupByCols = buildGroupByCols(groupByItems, aliaColumns); - rrs.setGroupByCols(groupByCols); - } + boolean isGroupByColPushSelectList = tryGroupColumnPushSelectList(aliaColumns, selectColumns, + mysqlSelectQuery, rrs, service.getCharset().getResultsIndex()); if (isDistinct) { rrs.changeNodeSqlAfterAddLimit(statementToString(stmt), 0, -1); + } else if (isGroupByColPushSelectList) { + rrs.changeNodeSqlAfterAddLimit(statementToString(stmt), rrs.getLimitStart(), rrs.getLimitSize()); } } + /** + * when fakeMysqlVersion is 8.0, in 'group by' no longer has the semantics of 'order by' + */ + private boolean tryGroupColumnPushSelectList(Map aliaColumns, List> selectColumns, + MySqlSelectQueryBlock mysqlSelectQuery, RouteResultset rrs, int charsetIndex) { + boolean isGroupByColPushSelectList = false; + if (!VersionUtil.isMysql8(SystemConfig.getInstance().getFakeMySQLVersion()) && + rrs.isGroupByColsHasShardingCols()) { // && mysqlSelectQuery.getGroupBy() != null + + Set pushSelectList = groupColumnPushSelectList(mysqlSelectQuery.getGroupBy().getItems(), selectColumns); + if (!CollectionUtil.isEmpty(pushSelectList)) { + for (SQLSelectItem e : pushSelectList) { + mysqlSelectQuery.getSelectList().add(e); + } + isGroupByColPushSelectList = true; + } + + // setGroupByCols + List groupByItems = mysqlSelectQuery.getGroupBy().getItems(); + String[] groupByCols = buildGroupByCols(groupByItems, aliaColumns); + rrs.setGroupByCols(groupByCols); + + if (isGroupByColPushSelectList) { + rrs.setSelectCols( + handleSelectItems(selectColumns, rrs, charsetIndex)); + } + } + return isGroupByColPushSelectList; + } + + private LinkedList handleSelectItems(List> selectList, RouteResultset rrs, int charsetIndex) { + LinkedList selectItems = new LinkedList<>(); + for (Pair sel : selectList) { + ItemField selItem = new ItemField(rrs.getSchema(), rrs.getTable(), StringUtil.removeBackQuote(sel.getValue()), charsetIndex); + selItem.setAlias(StringUtil.removeBackQuote(sel.getKey())); + selItem.setCharsetIndex(charsetIndex); + selectItems.add(selItem); + } + return selectItems; + } + private String getAliaColumn(Map aliaColumns, String column) { String alia = aliaColumns.get(column); if (alia == null) { @@ -582,7 +675,7 @@ public class DruidSelectParser extends DefaultDruidParser { // get column from table.column column = column.substring(dotIndex + 1); } - groupByCols[i] = getAliaColumn(aliaColumns, column); // column; + groupByCols[i] = getAliaColumn(aliaColumns, StringUtil.removeBackQuote(column)); // column; } return groupByCols; }