From 43e9c9058a33da560ee3772072ba8fae704a6488 Mon Sep 17 00:00:00 2001 From: guoaomen Date: Thu, 3 Aug 2023 15:11:23 +0800 Subject: [PATCH] [inner-2315] fix: handle clickhouse incompatibilities The fields returned in the clickhouse-mysql protocol do not contain information about database/table, etc. --- .../nio/handler/builder/HandlerBuilder.java | 4 +- .../handler/query/impl/BaseSelectHandler.java | 65 +++++++++++++++++++ .../mysql/nio/handler/util/HandlerTool.java | 9 +++ .../dble/route/RouteResultsetNode.java | 17 ++++- .../parser/druid/impl/DefaultDruidParser.java | 4 +- .../dble/route/util/ConditionUtil.java | 2 +- .../dble/route/util/RouterUtil.java | 9 ++- .../dble/server/handler/ExplainHandler.java | 4 +- 8 files changed, 101 insertions(+), 13 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java index 955709188..f77bd099c 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/HandlerBuilder.java @@ -162,7 +162,7 @@ public class HandlerBuilder { routeNode = routes[0]; } } - if (routeNode == null || routeNode.isApNode()) return null; + if (routeNode == null) return null; Set tableSet = Sets.newHashSet(); for (RouteResultsetNode routeResultsetNode : rrsNodes) { @@ -181,7 +181,7 @@ public class HandlerBuilder { sql = sql.replace(tableToSimple.getKey(), tableToSimple.getValue()); } } - return new RouteResultsetNode(routeNode.getName(), ServerParse.SELECT, sql, tableSet, routeNode.isApNode()); + return new RouteResultsetNode(routeNode.getName(), ServerParse.SELECT, sql, tableSet, routeNode.getTableAliasMap(), routeNode.isApNode()); } /** diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java index a99bc7dac..d1ee1841b 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/BaseSelectHandler.java @@ -17,9 +17,12 @@ import com.actiontech.dble.net.mysql.RowDataPacket; import com.actiontech.dble.net.service.AbstractService; import com.actiontech.dble.plan.common.exception.MySQLOutPutException; import com.actiontech.dble.route.RouteResultsetNode; +import com.actiontech.dble.route.parser.util.Pair; +import com.actiontech.dble.route.util.ConditionUtil; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; import com.actiontech.dble.singleton.TraceManager; +import com.actiontech.dble.util.StringUtil; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,6 +31,8 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Optional; /** * for execute Sql,transform the response data to next handler @@ -114,11 +119,71 @@ public class BaseSelectHandler extends BaseDMLHandler { for (byte[] field1 : fields) { FieldPacket field = new FieldPacket(); field.read(field1); + if (rrss.isApNode()) { + handleFieldsOfOLAP(field); + } fieldPackets.add(field); } nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, service); } + private void handleFieldsOfOLAP(FieldPacket field) { + String charset = CharsetUtil.getJavaCharset(field.getCharsetIndex()); + try { + String column = new String(field.getName(), charset); + int separatorIndex = column.indexOf(StringUtil.TABLE_COLUMN_SEPARATOR); + if (rrss.getTableAliasMap().isEmpty() || rrss.getTableSet().isEmpty()) { + throw new RuntimeException("parse error: table name should not be empty."); + } + if (separatorIndex < 0) { + //first table-clickhouse rules + Map.Entry firstTable = rrss.getTableAliasMap().entrySet().iterator().next(); + populateTableInfo(field, column, firstTable, charset); + } else { + //Only clickhouse will return table.column format + String tableName = column.substring(0, separatorIndex); + String columnName = column.substring(++separatorIndex); + //first: key:alias value:tableName + //second: key:tableName value:tableName + Optional> tableOptional = rrss.getTableAliasMap().entrySet().stream().filter(entity -> StringUtil.equals(entity.getKey(), tableName)).findFirst(); + tableOptional.ifPresent(table -> populateTableInfo(field, columnName, table, charset)); + + } + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("parser error ,charset :" + charset); + } + } + + private void populateTableInfo(FieldPacket field, String columnName, Map.Entry table, String charset) { + try { + String tableAlias = StringUtil.removeBackQuote(table.getKey()); + String tableName = StringUtil.removeBackQuote(table.getValue()); + int tableIndex; + if ((tableIndex = tableName.indexOf(StringUtil.TABLE_COLUMN_SEPARATOR)) >= 0) { + tableName = tableName.substring(++tableIndex); + } + field.setName(columnName.getBytes(charset)); + field.setTable(tableAlias.getBytes(charset)); + field.setOrgTable(tableName.getBytes(charset)); + + for (String tableFullName : rrss.getTableSet()) { + //key:schemaName value:tableName + Pair tableInfo = ConditionUtil.getTableInfo(rrss.getTableAliasMap(), tableFullName, null); + if (StringUtil.equals(tableName, tableInfo.getValue())) { + if (!DbleServer.getInstance().getConfig().getSchemas().containsKey(tableInfo.getKey())) { + throw new RuntimeException("schema not found:" + tableInfo.getKey()); + } + String defaultApNode = DbleServer.getInstance().getConfig().getSchemas().get(tableInfo.getKey()).getDefaultApNode(); + String database = DbleServer.getInstance().getConfig().getApNodes().get(defaultApNode).getDatabase(); + field.setDb(database.getBytes(charset)); + break; + } + } + } catch (UnsupportedEncodingException e) { + throw new RuntimeException("parser error ,charset :" + charset); + } + } + @Override public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, @NotNull AbstractService conn) { if (terminate.get()) diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/HandlerTool.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/HandlerTool.java index a275f1b2b..3579e4d62 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/HandlerTool.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/HandlerTool.java @@ -6,6 +6,7 @@ package com.actiontech.dble.backend.mysql.nio.handler.util; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.datasource.ApNode; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.MysqlVisitor; import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler; @@ -350,6 +351,14 @@ public final class HandlerTool { ShardingNode dbNode = DbleServer.getInstance().getConfig().getShardingNodes().get(schemaConfig.getDefaultSingleNode()); return dbNode.getDatabase().equals(sourceSchema); } + //for ap node + String defaultApNode = schemaConfig.getDefaultApNode(); + if (!StringUtil.isEmpty(defaultApNode)) { + ApNode apNode = DbleServer.getInstance().getConfig().getApNodes().get(defaultApNode); + if (null != apNode && apNode.getDatabase().equals(sourceSchema)) { + return true; + } + } BaseTableConfig tbConfig = schemaConfig.getTables().get(table); if (tbConfig == null) { ShardingNode dbNode = DbleServer.getInstance().getConfig().getShardingNodes().get(schemaConfig.getDefaultSingleNode()); diff --git a/src/main/java/com/actiontech/dble/route/RouteResultsetNode.java b/src/main/java/com/actiontech/dble/route/RouteResultsetNode.java index b21e7bbd1..3994b3e36 100644 --- a/src/main/java/com/actiontech/dble/route/RouteResultsetNode.java +++ b/src/main/java/com/actiontech/dble/route/RouteResultsetNode.java @@ -9,6 +9,8 @@ import com.actiontech.dble.server.parser.ServerParse; import com.actiontech.dble.sqlengine.mpp.LoadData; import java.io.Serializable; +import java.util.LinkedHashMap; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -32,6 +34,10 @@ public class RouteResultsetNode implements Serializable, Comparable tableSet; + /** + * key table alias, value table real name; + */ + private Map tableAliasMap = new LinkedHashMap<>(); private AtomicLong repeatTableIndex; private boolean isForUpdate = false; private volatile byte loadDataRrnStatus; @@ -65,7 +71,7 @@ public class RouteResultsetNode implements Serializable, Comparable tableSet, boolean isApNode) { + public RouteResultsetNode(String name, int sqlType, String srcStatement, Set tableSet, Map tableAliasMap, boolean isApNode) { this.name = name; this.limitStart = 0; this.limitSize = -1; @@ -77,9 +83,14 @@ public class RouteResultsetNode implements Serializable, Comparable getTableAliasMap() { + return tableAliasMap; } @Override diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java index 52ff15df8..7e184ee2f 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DefaultDruidParser.java @@ -81,7 +81,7 @@ public class DefaultDruidParser implements DruidParser { if (isAggregate) { Set tableSet = ctx.getTables().stream().map(tableEntry -> tableEntry.getKey() + "." + tableEntry.getValue()).collect(Collectors.toSet()); rrs.setNeedOptimizer(false); - RouterUtil.routeToApNode(rrs, schema.getDefaultApNode(), tableSet); + RouterUtil.routeToApNode(rrs, schema.getDefaultApNode(), tableSet, ctx.getTableAliasMap()); } } } @@ -117,7 +117,7 @@ public class DefaultDruidParser implements DruidParser { return null; } - Map tableAliasMap = new HashMap<>(originTableAliasMap); + Map tableAliasMap = new LinkedHashMap<>(originTableAliasMap); for (Map.Entry entry : originTableAliasMap.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); diff --git a/src/main/java/com/actiontech/dble/route/util/ConditionUtil.java b/src/main/java/com/actiontech/dble/route/util/ConditionUtil.java index d4df00f60..63162c83f 100644 --- a/src/main/java/com/actiontech/dble/route/util/ConditionUtil.java +++ b/src/main/java/com/actiontech/dble/route/util/ConditionUtil.java @@ -199,7 +199,7 @@ public final class ConditionUtil { return table; } - private static Pair getTableInfo(Map tableAliasMap, String tableFullName, String defaultSchema) { + public static Pair getTableInfo(Map tableAliasMap, String tableFullName, String defaultSchema) { if (tableAliasMap != null && tableAliasMap.get(tableFullName) != null && !tableAliasMap.get(tableFullName).equals(tableFullName)) { tableFullName = tableAliasMap.get(tableFullName); diff --git a/src/main/java/com/actiontech/dble/route/util/RouterUtil.java b/src/main/java/com/actiontech/dble/route/util/RouterUtil.java index 916cdd770..880dcdcf0 100644 --- a/src/main/java/com/actiontech/dble/route/util/RouterUtil.java +++ b/src/main/java/com/actiontech/dble/route/util/RouterUtil.java @@ -221,13 +221,12 @@ public final class RouterUtil { return rrs; } - public static RouteResultset routeToApNode(RouteResultset rrs, String apNode, Set tableSet) { + public static RouteResultset routeToApNode(RouteResultset rrs, String apNode, Set tableSet, Map tableAliasMap) { if (apNode == null) { return rrs; } RouteResultsetNode[] nodes = new RouteResultsetNode[1]; - nodes[0] = new RouteResultsetNode(apNode, rrs.getSqlType(), rrs.getStatement(), tableSet); - nodes[0].setApNode(true); + nodes[0] = new RouteResultsetNode(apNode, rrs.getSqlType(), rrs.getStatement(), tableSet, tableAliasMap, true); rrs.setNodes(nodes); rrs.setFinishedRoute(true); return rrs; @@ -330,6 +329,10 @@ public final class RouterUtil { } RouteResultsetNode[] nodes = new RouteResultsetNode[1]; nodes[0] = new RouteResultsetNode(shardingNode, rrs.getSqlType(), rrs.getStatement(), tableSet); + if (rrs.getSqlType() == ServerParse.SELECT) { + boolean isApNode = DbleServer.getInstance().getConfig().getApNodes().containsKey(shardingNode); + nodes[0].setApNode(isApNode); + } rrs.setNodes(nodes); rrs.setFinishedRoute(true); if (rrs.getCanRunInReadDB() != null) { diff --git a/src/main/java/com/actiontech/dble/server/handler/ExplainHandler.java b/src/main/java/com/actiontech/dble/server/handler/ExplainHandler.java index 7a073de52..b5ef7af3e 100644 --- a/src/main/java/com/actiontech/dble/server/handler/ExplainHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/ExplainHandler.java @@ -253,7 +253,7 @@ public final class ExplainHandler { routeNode = routes[0]; } } - if (routeNode == null || routeNode.isApNode()) return null; + if (routeNode == null) return null; PlanNode node = builder.getNode(); String sql = rrs.isHaveHintPlan2Inner() ? routeNode.getStatement() : node.getSql(); @@ -266,7 +266,7 @@ public final class ExplainHandler { sql = sql.replace(tableToSimple.getKey(), tableToSimple.getValue()); } } - return new RouteResultsetNode(routeNode.getName(), rrs.getSqlType(), sql); + return new RouteResultsetNode(routeNode.getName(), rrs.getSqlType(), sql, routeNode.getTableSet(), routeNode.getTableAliasMap(), routeNode.isApNode()); } public static void writeOutHeadAndEof(ShardingService service, RouteResultset rrs) {