[inner-2315] fix: handle clickhouse incompatibilities

The fields returned in the clickhouse-mysql protocol do not contain information about database/table, etc.
This commit is contained in:
guoaomen
2023-08-03 15:11:23 +08:00
parent bb99c4d6ae
commit 43e9c9058a
8 changed files with 101 additions and 13 deletions

View File

@@ -162,7 +162,7 @@ public class HandlerBuilder {
routeNode = routes[0];
}
}
if (routeNode == null || routeNode.isApNode()) return null;
if (routeNode == null) return null;
Set<String> tableSet = Sets.newHashSet();
for (RouteResultsetNode routeResultsetNode : rrsNodes) {
@@ -181,7 +181,7 @@ public class HandlerBuilder {
sql = sql.replace(tableToSimple.getKey(), tableToSimple.getValue());
}
}
return new RouteResultsetNode(routeNode.getName(), ServerParse.SELECT, sql, tableSet, routeNode.isApNode());
return new RouteResultsetNode(routeNode.getName(), ServerParse.SELECT, sql, tableSet, routeNode.getTableAliasMap(), routeNode.isApNode());
}
/**

View File

@@ -17,9 +17,12 @@ import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.plan.common.exception.MySQLOutPutException;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.route.util.ConditionUtil;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,6 +31,8 @@ import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
/**
* for execute Sql,transform the response data to next handler
@@ -114,11 +119,71 @@ public class BaseSelectHandler extends BaseDMLHandler {
for (byte[] field1 : fields) {
FieldPacket field = new FieldPacket();
field.read(field1);
if (rrss.isApNode()) {
handleFieldsOfOLAP(field);
}
fieldPackets.add(field);
}
nextHandler.fieldEofResponse(null, null, fieldPackets, null, this.isLeft, service);
}
private void handleFieldsOfOLAP(FieldPacket field) {
String charset = CharsetUtil.getJavaCharset(field.getCharsetIndex());
try {
String column = new String(field.getName(), charset);
int separatorIndex = column.indexOf(StringUtil.TABLE_COLUMN_SEPARATOR);
if (rrss.getTableAliasMap().isEmpty() || rrss.getTableSet().isEmpty()) {
throw new RuntimeException("parse error: table name should not be empty.");
}
if (separatorIndex < 0) {
//first table-clickhouse rules
Map.Entry<String, String> firstTable = rrss.getTableAliasMap().entrySet().iterator().next();
populateTableInfo(field, column, firstTable, charset);
} else {
//Only clickhouse will return table.column format
String tableName = column.substring(0, separatorIndex);
String columnName = column.substring(++separatorIndex);
//first: key:alias value:tableName
//second: key:tableName value:tableName
Optional<Map.Entry<String, String>> tableOptional = rrss.getTableAliasMap().entrySet().stream().filter(entity -> StringUtil.equals(entity.getKey(), tableName)).findFirst();
tableOptional.ifPresent(table -> populateTableInfo(field, columnName, table, charset));
}
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("parser error ,charset :" + charset);
}
}
private void populateTableInfo(FieldPacket field, String columnName, Map.Entry<String, String> table, String charset) {
try {
String tableAlias = StringUtil.removeBackQuote(table.getKey());
String tableName = StringUtil.removeBackQuote(table.getValue());
int tableIndex;
if ((tableIndex = tableName.indexOf(StringUtil.TABLE_COLUMN_SEPARATOR)) >= 0) {
tableName = tableName.substring(++tableIndex);
}
field.setName(columnName.getBytes(charset));
field.setTable(tableAlias.getBytes(charset));
field.setOrgTable(tableName.getBytes(charset));
for (String tableFullName : rrss.getTableSet()) {
//key:schemaName value:tableName
Pair<String, String> tableInfo = ConditionUtil.getTableInfo(rrss.getTableAliasMap(), tableFullName, null);
if (StringUtil.equals(tableName, tableInfo.getValue())) {
if (!DbleServer.getInstance().getConfig().getSchemas().containsKey(tableInfo.getKey())) {
throw new RuntimeException("schema not found:" + tableInfo.getKey());
}
String defaultApNode = DbleServer.getInstance().getConfig().getSchemas().get(tableInfo.getKey()).getDefaultApNode();
String database = DbleServer.getInstance().getConfig().getApNodes().get(defaultApNode).getDatabase();
field.setDb(database.getBytes(charset));
break;
}
}
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("parser error ,charset :" + charset);
}
}
@Override
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, @NotNull AbstractService conn) {
if (terminate.get())

View File

@@ -6,6 +6,7 @@
package com.actiontech.dble.backend.mysql.nio.handler.util;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.backend.mysql.nio.handler.builder.sqlvisitor.MysqlVisitor;
import com.actiontech.dble.backend.mysql.nio.handler.query.DMLResponseHandler;
@@ -350,6 +351,14 @@ public final class HandlerTool {
ShardingNode dbNode = DbleServer.getInstance().getConfig().getShardingNodes().get(schemaConfig.getDefaultSingleNode());
return dbNode.getDatabase().equals(sourceSchema);
}
//for ap node
String defaultApNode = schemaConfig.getDefaultApNode();
if (!StringUtil.isEmpty(defaultApNode)) {
ApNode apNode = DbleServer.getInstance().getConfig().getApNodes().get(defaultApNode);
if (null != apNode && apNode.getDatabase().equals(sourceSchema)) {
return true;
}
}
BaseTableConfig tbConfig = schemaConfig.getTables().get(table);
if (tbConfig == null) {
ShardingNode dbNode = DbleServer.getInstance().getConfig().getShardingNodes().get(schemaConfig.getDefaultSingleNode());

View File

@@ -9,6 +9,8 @@ import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.sqlengine.mpp.LoadData;
import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -32,6 +34,10 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
private AtomicLong multiplexNum;
//included table
private Set<String> tableSet;
/**
* key table alias, value table real name;
*/
private Map<String, String> tableAliasMap = new LinkedHashMap<>();
private AtomicLong repeatTableIndex;
private boolean isForUpdate = false;
private volatile byte loadDataRrnStatus;
@@ -65,7 +71,7 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
this.tableSet = tableSet;
}
public RouteResultsetNode(String name, int sqlType, String srcStatement, Set<String> tableSet, boolean isApNode) {
public RouteResultsetNode(String name, int sqlType, String srcStatement, Set<String> tableSet, Map<String, String> tableAliasMap, boolean isApNode) {
this.name = name;
this.limitStart = 0;
this.limitSize = -1;
@@ -77,9 +83,14 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
this.repeatTableIndex = new AtomicLong(0);
loadDataRrnStatus = 0;
this.tableSet = tableSet;
this.tableAliasMap = tableAliasMap;
this.isApNode = isApNode;
}
public void setApNode(boolean apNode) {
isApNode = apNode;
}
public byte getLoadDataRrnStatus() {
return loadDataRrnStatus;
}
@@ -198,8 +209,8 @@ public class RouteResultsetNode implements Serializable, Comparable<RouteResults
return isApNode;
}
public void setApNode(boolean apNode) {
isApNode = apNode;
public Map<String, String> getTableAliasMap() {
return tableAliasMap;
}
@Override

View File

@@ -81,7 +81,7 @@ public class DefaultDruidParser implements DruidParser {
if (isAggregate) {
Set<String> tableSet = ctx.getTables().stream().map(tableEntry -> tableEntry.getKey() + "." + tableEntry.getValue()).collect(Collectors.toSet());
rrs.setNeedOptimizer(false);
RouterUtil.routeToApNode(rrs, schema.getDefaultApNode(), tableSet);
RouterUtil.routeToApNode(rrs, schema.getDefaultApNode(), tableSet, ctx.getTableAliasMap());
}
}
}
@@ -117,7 +117,7 @@ public class DefaultDruidParser implements DruidParser {
return null;
}
Map<String, String> tableAliasMap = new HashMap<>(originTableAliasMap);
Map<String, String> tableAliasMap = new LinkedHashMap<>(originTableAliasMap);
for (Map.Entry<String, String> entry : originTableAliasMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();

View File

@@ -199,7 +199,7 @@ public final class ConditionUtil {
return table;
}
private static Pair<String, String> getTableInfo(Map<String, String> tableAliasMap, String tableFullName, String defaultSchema) {
public static Pair<String, String> getTableInfo(Map<String, String> tableAliasMap, String tableFullName, String defaultSchema) {
if (tableAliasMap != null && tableAliasMap.get(tableFullName) != null &&
!tableAliasMap.get(tableFullName).equals(tableFullName)) {
tableFullName = tableAliasMap.get(tableFullName);

View File

@@ -221,13 +221,12 @@ public final class RouterUtil {
return rrs;
}
public static RouteResultset routeToApNode(RouteResultset rrs, String apNode, Set<String> tableSet) {
public static RouteResultset routeToApNode(RouteResultset rrs, String apNode, Set<String> tableSet, Map<String, String> tableAliasMap) {
if (apNode == null) {
return rrs;
}
RouteResultsetNode[] nodes = new RouteResultsetNode[1];
nodes[0] = new RouteResultsetNode(apNode, rrs.getSqlType(), rrs.getStatement(), tableSet);
nodes[0].setApNode(true);
nodes[0] = new RouteResultsetNode(apNode, rrs.getSqlType(), rrs.getStatement(), tableSet, tableAliasMap, true);
rrs.setNodes(nodes);
rrs.setFinishedRoute(true);
return rrs;
@@ -330,6 +329,10 @@ public final class RouterUtil {
}
RouteResultsetNode[] nodes = new RouteResultsetNode[1];
nodes[0] = new RouteResultsetNode(shardingNode, rrs.getSqlType(), rrs.getStatement(), tableSet);
if (rrs.getSqlType() == ServerParse.SELECT) {
boolean isApNode = DbleServer.getInstance().getConfig().getApNodes().containsKey(shardingNode);
nodes[0].setApNode(isApNode);
}
rrs.setNodes(nodes);
rrs.setFinishedRoute(true);
if (rrs.getCanRunInReadDB() != null) {

View File

@@ -253,7 +253,7 @@ public final class ExplainHandler {
routeNode = routes[0];
}
}
if (routeNode == null || routeNode.isApNode()) return null;
if (routeNode == null) return null;
PlanNode node = builder.getNode();
String sql = rrs.isHaveHintPlan2Inner() ? routeNode.getStatement() : node.getSql();
@@ -266,7 +266,7 @@ public final class ExplainHandler {
sql = sql.replace(tableToSimple.getKey(), tableToSimple.getValue());
}
}
return new RouteResultsetNode(routeNode.getName(), rrs.getSqlType(), sql);
return new RouteResultsetNode(routeNode.getName(), rrs.getSqlType(), sql, routeNode.getTableSet(), routeNode.getTableAliasMap(), routeNode.isApNode());
}
public static void writeOutHeadAndEof(ShardingService service, RouteResultset rrs) {