mirror of
https://github.com/actiontech/dble.git
synced 2026-02-14 01:29:01 -06:00
Merge branch 'master' into fix/1542
This commit is contained in:
@@ -347,6 +347,7 @@ public final class CharsetUtil {
|
||||
CHARSET_TO_JAVA.put("cp932", "MS932");
|
||||
CHARSET_TO_JAVA.put("eucjpms", "EUC_JP_Solaris");
|
||||
CHARSET_TO_JAVA.put("gb18030", "GB18030");
|
||||
CHARSET_TO_JAVA.put("iso-8859-1", "ISO-8859-1");
|
||||
}
|
||||
|
||||
public static String getCharset(int index) {
|
||||
|
||||
@@ -595,7 +595,11 @@ public abstract class BaseHandlerBuilder {
|
||||
boolean tryRouteToOneNode(List<DMLResponseHandler> pres) {
|
||||
List<DMLResponseHandler> merges = Lists.newArrayList();
|
||||
for (DMLResponseHandler preHandler : pres) {
|
||||
merges.addAll(preHandler.getMerges());
|
||||
List<DMLResponseHandler> baseMerges = preHandler.getMerges();
|
||||
if (HandlerBuilder.nestLoopCheck(baseMerges)) {
|
||||
return false;
|
||||
}
|
||||
merges.addAll(baseMerges);
|
||||
}
|
||||
Set<String> routeNodes = HandlerBuilder.canRouteToNodes(merges);
|
||||
if (routeNodes != null && routeNodes.size() > 0) {
|
||||
|
||||
@@ -128,15 +128,19 @@ public class HandlerBuilder {
|
||||
}
|
||||
|
||||
public boolean canAsWholeToSingle(List<DMLResponseHandler> merges) {
|
||||
if (merges.size() != 1)
|
||||
if (merges.size() != 1 || nestLoopCheck(merges))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
public static boolean nestLoopCheck(List<DMLResponseHandler> merges) {
|
||||
DMLResponseHandler next = merges.get(0).getNextHandler();
|
||||
while (next != null) {
|
||||
if (next instanceof TempTableHandler || (next instanceof SendMakeHandler && !((SendMakeHandler) next).getTableHandlers().isEmpty()))
|
||||
return false;
|
||||
return true;
|
||||
next = next.getNextHandler();
|
||||
}
|
||||
return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
// check whether the SQL can be directly sent to a single node
|
||||
|
||||
@@ -30,6 +30,9 @@ public class ChildTable implements Named {
|
||||
@XmlAttribute
|
||||
protected Integer sqlMaxLimit;
|
||||
|
||||
@XmlAttribute
|
||||
protected boolean specifyCharset;
|
||||
|
||||
protected List<ChildTable> childTable;
|
||||
|
||||
public String getName() {
|
||||
@@ -83,6 +86,13 @@ public class ChildTable implements Named {
|
||||
this.sqlMaxLimit = sqlMaxLimit;
|
||||
}
|
||||
|
||||
public boolean isSpecifyCharset() {
|
||||
return specifyCharset;
|
||||
}
|
||||
|
||||
public void setSpecifyCharset(boolean specifyCharset) {
|
||||
this.specifyCharset = specifyCharset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
@@ -96,6 +106,8 @@ public class ChildTable implements Named {
|
||||
incrementColumn +
|
||||
", sqlMaxLimit=" +
|
||||
sqlMaxLimit +
|
||||
", specifyCharset=" +
|
||||
specifyCharset +
|
||||
", childTable=" +
|
||||
childTable +
|
||||
"]";
|
||||
|
||||
@@ -21,6 +21,8 @@ public abstract class Table implements Named {
|
||||
protected String shardingNode;
|
||||
@XmlAttribute
|
||||
protected Integer sqlMaxLimit;
|
||||
@XmlAttribute
|
||||
protected boolean specifyCharset;
|
||||
|
||||
public String getName() {
|
||||
return name;
|
||||
@@ -48,6 +50,15 @@ public abstract class Table implements Named {
|
||||
this.sqlMaxLimit = sqlMaxLimit;
|
||||
}
|
||||
|
||||
public boolean getSpecifyCharset() {
|
||||
return specifyCharset;
|
||||
}
|
||||
|
||||
public void setSpecifyCharset(boolean specifyCharset) {
|
||||
this.specifyCharset = specifyCharset;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "name=" +
|
||||
@@ -55,6 +66,8 @@ public abstract class Table implements Named {
|
||||
", shardingNode=" +
|
||||
shardingNode +
|
||||
", sqlMaxLimit=" +
|
||||
sqlMaxLimit;
|
||||
sqlMaxLimit +
|
||||
", specifyCharset=" +
|
||||
specifyCharset;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -294,6 +294,7 @@ public class ShardingConverter {
|
||||
String singleTableName = singleTable.getName();
|
||||
String singleTableSqlMaxLimitStr = null == singleTable.getSqlMaxLimit() ? null : String.valueOf(singleTable.getSqlMaxLimit());
|
||||
String singleTableShardingNode = singleTable.getShardingNode();
|
||||
boolean specifyCharset = singleTable.getSpecifyCharset();
|
||||
|
||||
if (StringUtil.isBlank(singleTableName)) {
|
||||
throw new ConfigException("one of tables' name is empty");
|
||||
@@ -316,7 +317,7 @@ public class ShardingConverter {
|
||||
if (StringUtil.isBlank(tableName)) {
|
||||
throw new ConfigException("one of table name of " + singleTableName + " is empty");
|
||||
}
|
||||
SingleTableConfig table = new SingleTableConfig(tableName, tableSqlMaxLimit, Arrays.asList(theShardingNodes));
|
||||
SingleTableConfig table = new SingleTableConfig(tableName, tableSqlMaxLimit, Arrays.asList(theShardingNodes), specifyCharset);
|
||||
checkShardingNodeExists(table.getShardingNodes(), shardingNodeConfigMap);
|
||||
if (tableConfigMap.containsKey(table.getName())) {
|
||||
throw new ConfigException("table " + tableName + " duplicated!");
|
||||
@@ -333,6 +334,7 @@ public class ShardingConverter {
|
||||
String globalTableCheckClass = Optional.ofNullable(globalTable.getCheckClass()).orElse(GLOBAL_TABLE_CHECK_DEFAULT);
|
||||
String globalTableCron = Optional.ofNullable(globalTable.getCron()).orElse(GLOBAL_TABLE_CHECK_DEFAULT_CRON).toUpperCase();
|
||||
boolean globalCheck = !StringUtil.isBlank(globalTable.getCheckClass());
|
||||
boolean specifyCharset = globalTable.getSpecifyCharset();
|
||||
|
||||
if (StringUtil.isBlank(globalTableName)) {
|
||||
throw new ConfigException("one of tables' name is empty");
|
||||
@@ -361,7 +363,7 @@ public class ShardingConverter {
|
||||
throw new ConfigException("one of table name of " + globalTableName + " is empty");
|
||||
}
|
||||
GlobalTableConfig table = new GlobalTableConfig(tableName, tableSqlMaxLimit, Arrays.asList(theShardingNodes),
|
||||
globalTableCron, globalTableCheckClass, globalCheck);
|
||||
globalTableCron, globalTableCheckClass, globalCheck, specifyCharset);
|
||||
checkShardingNodeExists(table.getShardingNodes(), shardingNodeConfigMap);
|
||||
if (tableConfigMap.containsKey(table.getName())) {
|
||||
throw new ConfigException("table " + tableName + " duplicated!");
|
||||
@@ -376,6 +378,7 @@ public class ShardingConverter {
|
||||
String shardingTableSqlMaxLimitStr = null == shardingTable.getSqlMaxLimit() ? null : String.valueOf(shardingTable.getSqlMaxLimit());
|
||||
String shardingTableShardingColumn = shardingTable.getShardingColumn();
|
||||
boolean shardingTableSqlRequiredSharding = Optional.ofNullable(shardingTable.getSqlRequiredSharding()).orElse(false);
|
||||
boolean specifyCharset = shardingTable.getSpecifyCharset();
|
||||
|
||||
if (StringUtil.isBlank(shardingTableName)) {
|
||||
throw new ConfigException("one of tables' name is empty");
|
||||
@@ -420,7 +423,7 @@ public class ShardingConverter {
|
||||
throw new ConfigException("one of table name of " + shardingTableName + " is empty");
|
||||
}
|
||||
ShardingTableConfig table = new ShardingTableConfig(tableName, tableSqlMaxLimit,
|
||||
lstShardingNode, shardingTableIncrementColumn, algorithm, shardingTableShardingColumn, shardingTableSqlRequiredSharding);
|
||||
lstShardingNode, shardingTableIncrementColumn, algorithm, shardingTableShardingColumn, shardingTableSqlRequiredSharding, specifyCharset);
|
||||
checkShardingNodeExists(table.getShardingNodes(), shardingNodeConfigMap);
|
||||
checkRuleSuitTable(table, shardingTableFunction, problemReporter);
|
||||
if (tableConfigMap.containsKey(table.getName())) {
|
||||
@@ -534,7 +537,7 @@ public class ShardingConverter {
|
||||
int tableSqlMaxLimit = getSqlMaxLimit(childTableSqlMaxLimitStr, schemaSqlMaxLimit);
|
||||
|
||||
ChildTableConfig table = new ChildTableConfig(childTableName, tableSqlMaxLimit, lstShardingNode,
|
||||
parentTable, childTableJoinColumn, childTableParentColumn, childTableIncrementColumn);
|
||||
parentTable, childTableJoinColumn, childTableParentColumn, childTableIncrementColumn, childTable.isSpecifyCharset());
|
||||
|
||||
if (tables.containsKey(table.getName())) {
|
||||
throw new ConfigException("table " + table.getName() + " duplicated!");
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
|
||||
*/
|
||||
|
||||
package com.actiontech.dble.services.mysqlauthenticate;
|
||||
package com.actiontech.dble.config.helper;
|
||||
|
||||
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
|
||||
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
|
||||
@@ -16,17 +16,20 @@ import java.util.*;
|
||||
import java.util.concurrent.locks.Condition;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class MysqlDatabaseHandler {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MysqlDatabaseHandler.class);
|
||||
private static final String MYSQL_SHOW_DATABASES = "show databases";
|
||||
public class ShowDatabaseHandler {
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(ShowDatabaseHandler.class);
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
private Map<String, PhysicalDbGroup> dbGroups;
|
||||
private Set<String> databases = new HashSet<>();
|
||||
private final Condition finishCond = lock.newCondition();
|
||||
private boolean isFinish = false;
|
||||
private String showDatabases = "show databases";
|
||||
private String showDataBasesCols;
|
||||
|
||||
public MysqlDatabaseHandler(Map<String, PhysicalDbGroup> dbGroups) {
|
||||
|
||||
public ShowDatabaseHandler(Map<String, PhysicalDbGroup> dbGroups, String showDataBasesCols) {
|
||||
this.dbGroups = dbGroups;
|
||||
this.showDataBasesCols = showDataBasesCols;
|
||||
}
|
||||
|
||||
private void reset() {
|
||||
@@ -36,11 +39,10 @@ public class MysqlDatabaseHandler {
|
||||
|
||||
public Set<String> execute(String dbGroupName) {
|
||||
reset();
|
||||
String mysqlShowDataBasesCols = "Database";
|
||||
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{mysqlShowDataBasesCols}, new MySQLShowDatabasesListener(mysqlShowDataBasesCols));
|
||||
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{showDataBasesCols}, new ShowDatabasesListener(showDataBasesCols));
|
||||
PhysicalDbInstance ds = getPhysicalDbInstance(dbGroupName);
|
||||
if (ds != null) {
|
||||
SQLJob sqlJob = new SQLJob(MYSQL_SHOW_DATABASES, null, resultHandler, ds);
|
||||
SQLJob sqlJob = new SQLJob(showDatabases, null, resultHandler, ds);
|
||||
sqlJob.run();
|
||||
waitDone();
|
||||
} else {
|
||||
@@ -49,13 +51,13 @@ public class MysqlDatabaseHandler {
|
||||
return new HashSet<>(databases);
|
||||
}
|
||||
|
||||
|
||||
// for dryrun
|
||||
public Set<String> execute(PhysicalDbInstance ds) {
|
||||
reset();
|
||||
String mysqlShowDataBasesCols = "Database";
|
||||
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{mysqlShowDataBasesCols}, new MySQLShowDatabasesListener(mysqlShowDataBasesCols));
|
||||
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{showDataBasesCols}, new ShowDatabasesListener(showDataBasesCols));
|
||||
if (ds != null) {
|
||||
OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_DATABASES, null, resultHandler, ds);
|
||||
OneTimeConnJob sqlJob = new OneTimeConnJob(showDatabases, null, resultHandler, ds);
|
||||
sqlJob.run();
|
||||
waitDone();
|
||||
} else {
|
||||
@@ -96,17 +98,17 @@ public class MysqlDatabaseHandler {
|
||||
finishCond.await();
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOGGER.info("[MysqlDatabaseHandler] conn Interrupted: " + e);
|
||||
LOGGER.info("[ClickHouseDatabaseHandler] conn Interrupted: " + e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private class MySQLShowDatabasesListener implements SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> {
|
||||
private String mysqlShowDataBasesCol;
|
||||
private class ShowDatabasesListener implements SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> {
|
||||
private String showDataBasesCol;
|
||||
|
||||
MySQLShowDatabasesListener(String mysqlShowDataBasesCol) {
|
||||
this.mysqlShowDataBasesCol = mysqlShowDataBasesCol;
|
||||
ShowDatabasesListener(String clickhouseShowDataBasesCol) {
|
||||
this.showDataBasesCol = clickhouseShowDataBasesCol;
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -114,7 +116,7 @@ public class MysqlDatabaseHandler {
|
||||
if (result.isSuccess()) {
|
||||
List<Map<String, String>> rows = result.getResult();
|
||||
for (Map<String, String> row : rows) {
|
||||
String databaseName = row.get(mysqlShowDataBasesCol);
|
||||
String databaseName = row.get(showDataBasesCol);
|
||||
databases.add(databaseName);
|
||||
}
|
||||
}
|
||||
@@ -15,11 +15,13 @@ public abstract class BaseTableConfig {
|
||||
protected final String name;
|
||||
protected final int maxLimit;
|
||||
protected final List<String> shardingNodes;
|
||||
protected boolean specifyCharset;
|
||||
|
||||
BaseTableConfig(String name, int maxLimit, List<String> shardingNodes) {
|
||||
BaseTableConfig(String name, int maxLimit, List<String> shardingNodes, boolean specifyCharset) {
|
||||
this.name = name;
|
||||
this.maxLimit = maxLimit;
|
||||
this.shardingNodes = shardingNodes;
|
||||
this.specifyCharset = specifyCharset;
|
||||
}
|
||||
|
||||
public int getId() {
|
||||
@@ -43,6 +45,10 @@ public abstract class BaseTableConfig {
|
||||
return shardingNodes;
|
||||
}
|
||||
|
||||
public boolean isSpecifyCharset() {
|
||||
return specifyCharset;
|
||||
}
|
||||
|
||||
public abstract BaseTableConfig lowerCaseCopy(BaseTableConfig parent);
|
||||
|
||||
|
||||
|
||||
@@ -17,8 +17,8 @@ public class ChildTableConfig extends BaseTableConfig {
|
||||
private final String locateRTableKeySql;
|
||||
private final ShardingTableConfig directRouteTC;
|
||||
|
||||
public ChildTableConfig(String name, int maxLimit, List<String> shardingNodes, BaseTableConfig parentTC, String joinColumn, String parentColumn, String incrementColumn) {
|
||||
super(name, maxLimit, shardingNodes);
|
||||
public ChildTableConfig(String name, int maxLimit, List<String> shardingNodes, BaseTableConfig parentTC, String joinColumn, String parentColumn, String incrementColumn, boolean specifyCharset) {
|
||||
super(name, maxLimit, shardingNodes, specifyCharset);
|
||||
this.parentTC = parentTC;
|
||||
this.joinColumn = joinColumn;
|
||||
this.parentColumn = parentColumn;
|
||||
@@ -34,7 +34,7 @@ public class ChildTableConfig extends BaseTableConfig {
|
||||
@Override
|
||||
public BaseTableConfig lowerCaseCopy(BaseTableConfig parent) {
|
||||
ChildTableConfig config = new ChildTableConfig(this.name.toLowerCase(), this.maxLimit, this.shardingNodes,
|
||||
parent, this.joinColumn, this.parentColumn, this.incrementColumn);
|
||||
parent, this.joinColumn, this.parentColumn, this.incrementColumn, this.specifyCharset);
|
||||
config.setId(this.getId());
|
||||
return config;
|
||||
}
|
||||
|
||||
@@ -15,13 +15,14 @@ public class GlobalTableConfig extends BaseTableConfig {
|
||||
private final String cron;
|
||||
private final String checkClass;
|
||||
|
||||
public GlobalTableConfig(String name, int maxLimit, List<String> shardingNodes, String cron, String checkClass, boolean globalCheck) {
|
||||
super(name, maxLimit, shardingNodes);
|
||||
public GlobalTableConfig(String name, int maxLimit, List<String> shardingNodes, String cron, String checkClass, boolean globalCheck, boolean specifyCharset) {
|
||||
super(name, maxLimit, shardingNodes, specifyCharset);
|
||||
this.cron = cron;
|
||||
this.checkClass = checkClass;
|
||||
this.globalCheck = globalCheck;
|
||||
}
|
||||
|
||||
|
||||
public boolean isGlobalCheck() {
|
||||
return globalCheck;
|
||||
}
|
||||
@@ -36,7 +37,7 @@ public class GlobalTableConfig extends BaseTableConfig {
|
||||
|
||||
@Override
|
||||
public BaseTableConfig lowerCaseCopy(BaseTableConfig parent) {
|
||||
GlobalTableConfig config = new GlobalTableConfig(this.name.toLowerCase(), this.maxLimit, this.shardingNodes, this.cron, this.checkClass, this.globalCheck);
|
||||
GlobalTableConfig config = new GlobalTableConfig(this.name.toLowerCase(), this.maxLimit, this.shardingNodes, this.cron, this.checkClass, this.globalCheck, this.specifyCharset);
|
||||
config.setId(this.getId());
|
||||
return config;
|
||||
}
|
||||
|
||||
@@ -17,8 +17,8 @@ public class ShardingTableConfig extends BaseTableConfig {
|
||||
private final boolean sqlRequiredSharding;
|
||||
|
||||
public ShardingTableConfig(String name, int maxLimit, List<String> shardingNodes, String incrementColumn,
|
||||
AbstractPartitionAlgorithm function, String shardingColumn, boolean sqlRequiredSharding) {
|
||||
super(name, maxLimit, shardingNodes);
|
||||
AbstractPartitionAlgorithm function, String shardingColumn, boolean sqlRequiredSharding, boolean specifyCharset) {
|
||||
super(name, maxLimit, shardingNodes, specifyCharset);
|
||||
this.incrementColumn = incrementColumn;
|
||||
this.function = function;
|
||||
this.shardingColumn = shardingColumn;
|
||||
@@ -45,7 +45,7 @@ public class ShardingTableConfig extends BaseTableConfig {
|
||||
@Override
|
||||
public BaseTableConfig lowerCaseCopy(BaseTableConfig parent) {
|
||||
ShardingTableConfig config = new ShardingTableConfig(this.name.toLowerCase(), this.maxLimit, this.shardingNodes,
|
||||
this.incrementColumn, this.function, this.shardingColumn, this.sqlRequiredSharding);
|
||||
this.incrementColumn, this.function, this.shardingColumn, this.sqlRequiredSharding, this.specifyCharset);
|
||||
config.setId(this.getId());
|
||||
return config;
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ public class ShardingTableFakeConfig extends ShardingTableConfig {
|
||||
|
||||
public ShardingTableFakeConfig(String name, int maxLimit, List<String> shardingNodes, AbstractPartitionAlgorithm function, String createSql) {
|
||||
super(name, maxLimit, shardingNodes, null, function,
|
||||
MetaHelper.electionShardingColumn(createSql).toUpperCase(), false);
|
||||
MetaHelper.electionShardingColumn(createSql).toUpperCase(), false, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -9,13 +9,13 @@ import java.util.List;
|
||||
|
||||
public final class SingleTableConfig extends BaseTableConfig {
|
||||
|
||||
public SingleTableConfig(String name, int maxLimit, List<String> shardingNodes) {
|
||||
super(name, maxLimit, shardingNodes);
|
||||
public SingleTableConfig(String name, int maxLimit, List<String> shardingNodes, boolean specifyCharset) {
|
||||
super(name, maxLimit, shardingNodes, specifyCharset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BaseTableConfig lowerCaseCopy(BaseTableConfig parent) {
|
||||
SingleTableConfig config = new SingleTableConfig(this.name.toLowerCase(), this.maxLimit, this.shardingNodes);
|
||||
SingleTableConfig config = new SingleTableConfig(this.name.toLowerCase(), this.maxLimit, this.shardingNodes, this.specifyCharset);
|
||||
config.setId(getId());
|
||||
return config;
|
||||
}
|
||||
|
||||
@@ -7,7 +7,7 @@ package com.actiontech.dble.config.model.user;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.services.mysqlauthenticate.MysqlDatabaseHandler;
|
||||
import com.actiontech.dble.config.helper.ShowDatabaseHandler;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.wall.WallProvider;
|
||||
|
||||
@@ -31,7 +31,7 @@ public class AnalysisUserConfig extends SingleDbGroupUserConfig {
|
||||
return 0;
|
||||
}
|
||||
boolean exist;
|
||||
Set<String> schemas = new MysqlDatabaseHandler(DbleServer.getInstance().getConfig().getDbGroups()).execute(dbGroup);
|
||||
Set<String> schemas = new ShowDatabaseHandler(DbleServer.getInstance().getConfig().getDbGroups(), "name").execute(dbGroup);
|
||||
if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
|
||||
Optional<String> result = schemas.stream().filter(item -> StringUtil.equals(item.toLowerCase(), schema.toLowerCase())).findFirst();
|
||||
exist = result.isPresent();
|
||||
|
||||
@@ -7,7 +7,7 @@ package com.actiontech.dble.config.model.user;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.services.mysqlauthenticate.MysqlDatabaseHandler;
|
||||
import com.actiontech.dble.config.helper.ShowDatabaseHandler;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.wall.WallProvider;
|
||||
|
||||
@@ -39,7 +39,7 @@ public class RwSplitUserConfig extends SingleDbGroupUserConfig {
|
||||
return 0;
|
||||
}
|
||||
boolean exist;
|
||||
Set<String> schemas = new MysqlDatabaseHandler(DbleServer.getInstance().getConfig().getDbGroups()).execute(dbGroup);
|
||||
Set<String> schemas = new ShowDatabaseHandler(DbleServer.getInstance().getConfig().getDbGroups(), "Database").execute(dbGroup);
|
||||
if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
|
||||
Optional<String> result = schemas.stream().filter(item -> StringUtil.equals(item.toLowerCase(), schema.toLowerCase())).findFirst();
|
||||
exist = result.isPresent();
|
||||
|
||||
@@ -22,7 +22,6 @@ import com.actiontech.dble.route.RouteResultsetNode;
|
||||
import com.actiontech.dble.util.CollectionUtil;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public final class ComplexQueryPlanUtil {
|
||||
private ComplexQueryPlanUtil() {
|
||||
@@ -82,7 +81,7 @@ public final class ComplexQueryPlanUtil {
|
||||
if (!dependenciesSet.isEmpty()) {
|
||||
dependencies = dependenciesSet;
|
||||
} else if (!CollectionUtil.isEmpty(dependencies)) {
|
||||
dependencies.stream().filter(dependency -> dependency.startsWith(JoinNode.Strategy.HINT_NEST_LOOP.name())).collect(Collectors.toSet()).clear();
|
||||
dependencies.removeIf(dependency -> dependency.startsWith(JoinNode.Strategy.HINT_NEST_LOOP.name()));
|
||||
}
|
||||
String mergeName = getMergeType(mergeHandler);
|
||||
List<BaseSelectHandler> mergeList = new ArrayList<>();
|
||||
|
||||
@@ -1007,6 +1007,10 @@ public class ServerSchemaStatVisitor extends MySqlSchemaStatVisitor {
|
||||
return currentTable;
|
||||
}
|
||||
|
||||
public void setCurrentTable(String currentTable) {
|
||||
this.currentTable = currentTable;
|
||||
}
|
||||
|
||||
private static void mergeOuterRelations(WhereUnit whereUnit) {
|
||||
if (whereUnit.getSubWhereUnit().size() > 0) {
|
||||
for (WhereUnit sub : whereUnit.getSubWhereUnit()) {
|
||||
|
||||
@@ -59,6 +59,7 @@ public class DruidInsertParser extends DruidInsertReplaceParser {
|
||||
}
|
||||
|
||||
schema = schemaInfo.getSchemaConfig();
|
||||
visitor.setCurrentTable(schemaInfo.getTable());
|
||||
if (insert.getQuery() != null) {
|
||||
tryRouteInsertQuery(service, rrs, stmt, visitor, schemaInfo);
|
||||
return schema;
|
||||
|
||||
@@ -134,6 +134,7 @@ abstract class DruidInsertReplaceParser extends DruidModifyParser {
|
||||
if (shardingValue == null && !(valueExpr instanceof SQLNullExpr)) {
|
||||
throw new SQLNonTransientException("Not Supported of Sharding Value EXPR :" + valueExpr.toString());
|
||||
}
|
||||
shardingValue = StringUtil.isoCharsetReplace(clientCharset, shardingValue);
|
||||
return shardingValue;
|
||||
}
|
||||
|
||||
|
||||
@@ -570,7 +570,7 @@ public final class RouterUtil {
|
||||
value = HexFormatUtil.fromHex(((SQLHexExpr) originValue).getHex(), CharsetUtil.getJavaCharset(clientCharset));
|
||||
}
|
||||
} else {
|
||||
value = originValue.toString();
|
||||
value = StringUtil.isoCharsetReplace(clientCharset, originValue.toString());
|
||||
}
|
||||
Integer nodeIndex = tc.getFunction().calculate(value);
|
||||
if (nodeIndex == null) {
|
||||
|
||||
@@ -549,21 +549,21 @@ public class DbleDbInstance extends ManagerWritableTable {
|
||||
}
|
||||
|
||||
private void checkChineseProperty(String val, String name) {
|
||||
if (StringUtil.isBlank(val)) {
|
||||
throw new ConfigException("Column [ " + name + " ] " + val + " is illegal, the value not be null or empty");
|
||||
}
|
||||
int length = 11;
|
||||
if (val.length() > length) {
|
||||
throw new ConfigException("Column [ " + name + " ] " + val + " is illegal, the value contains a maximum of " + length + " characters");
|
||||
}
|
||||
|
||||
String chinese = val.replaceAll(DBConverter.PATTERN_DB.toString(), "");
|
||||
if (Strings.isNullOrEmpty(chinese)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!StringUtil.isChinese(chinese)) {
|
||||
throw new ConfigException("Column [ " + name + " ] " + val + " is illegal,the " + Charset.defaultCharset().name() + " encoding is recommended, Column [ " + name + " ] show be use u4E00-u9FA5a-zA-Z_0-9\\-\\.");
|
||||
if (Objects.nonNull(val)) {
|
||||
if (StringUtil.isBlank(val)) {
|
||||
throw new ConfigException("Column [ " + name + " ] " + val + " is illegal, the value not be null or empty");
|
||||
}
|
||||
int length = 11;
|
||||
if (val.length() > length) {
|
||||
throw new ConfigException("Column [ " + name + " ] " + val + " is illegal, the value contains a maximum of " + length + " characters");
|
||||
}
|
||||
String chinese = val.replaceAll(DBConverter.PATTERN_DB.toString(), "");
|
||||
if (Strings.isNullOrEmpty(chinese)) {
|
||||
return;
|
||||
}
|
||||
if (!StringUtil.isChinese(chinese)) {
|
||||
throw new ConfigException("Column [ " + name + " ] " + val + " is illegal,the " + Charset.defaultCharset().name() + " encoding is recommended, Column [ " + name + " ] show be use u4E00-u9FA5a-zA-Z_0-9\\-\\.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
package com.actiontech.dble.services.manager.response;
|
||||
|
||||
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
|
||||
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
|
||||
import com.actiontech.dble.backend.datasource.ShardingNode;
|
||||
import com.actiontech.dble.backend.mysql.PacketUtil;
|
||||
@@ -16,8 +17,10 @@ import com.actiontech.dble.config.converter.DBConverter;
|
||||
import com.actiontech.dble.config.converter.SequenceConverter;
|
||||
import com.actiontech.dble.config.converter.ShardingConverter;
|
||||
import com.actiontech.dble.config.converter.UserConverter;
|
||||
import com.actiontech.dble.config.helper.ShowDatabaseHandler;
|
||||
import com.actiontech.dble.config.model.ClusterConfig;
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.config.model.db.type.DataBaseType;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
|
||||
import com.actiontech.dble.config.model.user.ManagerUserConfig;
|
||||
@@ -30,9 +33,9 @@ import com.actiontech.dble.net.mysql.*;
|
||||
import com.actiontech.dble.server.variables.SystemVariables;
|
||||
import com.actiontech.dble.server.variables.VarsExtractorHandler;
|
||||
import com.actiontech.dble.services.manager.ManagerService;
|
||||
import com.actiontech.dble.services.mysqlauthenticate.MysqlDatabaseHandler;
|
||||
import com.actiontech.dble.singleton.TraceManager;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -160,8 +163,32 @@ public final class DryRun {
|
||||
}
|
||||
|
||||
private static Map<String, Set<String>> getExistSchemas(ServerConfig serverConfig) {
|
||||
Map<String, Set<String>> schemaMap = Maps.newHashMap();
|
||||
Map<String, PhysicalDbGroup> dbGroups = serverConfig.getDbGroups();
|
||||
Map<String, PhysicalDbGroup> mysqlDbGroups = Maps.newHashMap();
|
||||
Map<String, PhysicalDbGroup> clickHouseDbGroups = Maps.newHashMap();
|
||||
dbGroups.forEach((k, v) -> {
|
||||
DataBaseType dataBaseType = v.getDbGroupConfig().instanceDatabaseType();
|
||||
if (dataBaseType == DataBaseType.MYSQL) {
|
||||
mysqlDbGroups.put(k, v);
|
||||
} else {
|
||||
clickHouseDbGroups.put(k, v);
|
||||
}
|
||||
});
|
||||
|
||||
if (!mysqlDbGroups.isEmpty()) {
|
||||
ShowDatabaseHandler mysqlShowDatabaseHandler = new ShowDatabaseHandler(mysqlDbGroups, "Database");
|
||||
schemaMap.putAll(getSchemaMap(mysqlShowDatabaseHandler));
|
||||
}
|
||||
if (!clickHouseDbGroups.isEmpty()) {
|
||||
ShowDatabaseHandler clickHouseDatabaseHandler = new ShowDatabaseHandler(clickHouseDbGroups, "name");
|
||||
schemaMap.putAll(getSchemaMap(clickHouseDatabaseHandler));
|
||||
}
|
||||
return schemaMap;
|
||||
}
|
||||
|
||||
private static Map<String, Set<String>> getSchemaMap(ShowDatabaseHandler databaseHandler) {
|
||||
Map<String, Set<String>> schemaMap = new HashMap<>();
|
||||
MysqlDatabaseHandler databaseHandler = new MysqlDatabaseHandler(serverConfig.getDbGroups());
|
||||
List<PhysicalDbInstance> physicalDbInstances = databaseHandler.getPhysicalDbInstances();
|
||||
physicalDbInstances.forEach(ds -> {
|
||||
Set<String> schemaSet = databaseHandler.execute(ds);
|
||||
|
||||
@@ -8,12 +8,24 @@ package com.actiontech.dble.services.mysqlsharding;
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.backend.mysql.MySQLMessage;
|
||||
import com.actiontech.dble.config.ErrorCode;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
|
||||
import com.actiontech.dble.net.mysql.OkPacket;
|
||||
import com.actiontech.dble.route.RouteResultset;
|
||||
import com.actiontech.dble.route.parser.druid.DruidParser;
|
||||
import com.actiontech.dble.route.parser.druid.DruidParserFactory;
|
||||
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
|
||||
import com.actiontech.dble.route.parser.util.DruidUtil;
|
||||
import com.actiontech.dble.server.response.FieldList;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.alibaba.druid.sql.ast.SQLStatement;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Created by szf on 2020/7/2.
|
||||
@@ -62,11 +74,19 @@ public class MySQLProtoLogicHandler {
|
||||
|
||||
public void query(byte[] data) {
|
||||
this.multiQueryData = data;
|
||||
String sql = null;
|
||||
String sql;
|
||||
int position = 5;
|
||||
MySQLMessage mm;
|
||||
try {
|
||||
MySQLMessage mm = new MySQLMessage(data);
|
||||
mm.position(5);
|
||||
sql = mm.readString(service.getCharset().getClient());
|
||||
mm = new MySQLMessage(data);
|
||||
mm.position(position);
|
||||
String clientCharset = service.getCharset().getClient();
|
||||
sql = mm.readString(clientCharset);
|
||||
if (!StringUtil.byteEqual(data, sql.getBytes(clientCharset), position)) {
|
||||
mm = new MySQLMessage(data);
|
||||
mm.position(position);
|
||||
sql = getSql(sql, mm);
|
||||
}
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
service.writeErrMessage(ErrorCode.ER_UNKNOWN_CHARACTER_SET, "Unknown charset '" + service.getCharset().getClient() + "'");
|
||||
return;
|
||||
@@ -101,4 +121,33 @@ public class MySQLProtoLogicHandler {
|
||||
public byte[] getMultiQueryData() {
|
||||
return multiQueryData;
|
||||
}
|
||||
|
||||
|
||||
private String getSql(String sql, MySQLMessage mm) {
|
||||
try {
|
||||
SQLStatement statement = DruidUtil.parseSQL(sql);
|
||||
SchemaConfig schemaConfig = DbleServer.getInstance().getConfig().getSchemas().get(service.getSchema());
|
||||
RouteResultset rrs = new RouteResultset(sql, -1);
|
||||
DruidParser druidParser = DruidParserFactory.create(statement, rrs.getSqlType(), service);
|
||||
ServerSchemaStatVisitor visitor = new ServerSchemaStatVisitor(schemaConfig.getName());
|
||||
druidParser.parser(schemaConfig, rrs, statement, visitor, service, true);
|
||||
Map<String, BaseTableConfig> tables = schemaConfig.getTables();
|
||||
HashSet<String> tableSet = Sets.newHashSet(visitor.getAliasMap().values());
|
||||
tableSet.add(visitor.getCurrentTable());
|
||||
boolean specifyCharset;
|
||||
for (String tableName : tableSet) {
|
||||
specifyCharset = tables.get(tableName).isSpecifyCharset();
|
||||
if (specifyCharset) {
|
||||
sql = mm.readString(StringUtil.ISO_8859_1);
|
||||
LOGGER.warn("Enforces {} to String, clientCharset sql is {}", StringUtil.ISO_8859_1, sql);
|
||||
return sql;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// ignore exception
|
||||
return sql;
|
||||
}
|
||||
return sql;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ import com.actiontech.dble.services.mysqlauthenticate.MySQLBackAuthService;
|
||||
import com.actiontech.dble.services.rwsplit.RWSplitService;
|
||||
import com.actiontech.dble.singleton.TraceManager;
|
||||
import com.actiontech.dble.statistic.sql.StatisticListener;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.actiontech.dble.util.TimeUtil;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
@@ -42,6 +43,7 @@ import java.nio.ByteBuffer;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
@@ -333,7 +335,7 @@ public class MySQLResponseService extends BackendService {
|
||||
packet.setPacketId(0);
|
||||
packet.setCommand(MySQLPacket.COM_QUERY);
|
||||
try {
|
||||
packet.setArg(query.getBytes(CharsetUtil.getJavaCharset(clientCharset.getClient())));
|
||||
packet.setArg(query.getBytes(getCharset(query, clientCharset)));
|
||||
} catch (UnsupportedEncodingException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
@@ -347,6 +349,19 @@ public class MySQLResponseService extends BackendService {
|
||||
}
|
||||
}
|
||||
|
||||
private String getCharset(String sql, CharsetNames clientCharset) throws UnsupportedEncodingException {
|
||||
String javaCharset = CharsetUtil.getJavaCharset(clientCharset.getClient());
|
||||
if (Objects.isNull(session)) {
|
||||
return javaCharset;
|
||||
}
|
||||
String clientSql = new String(sql.getBytes(javaCharset), javaCharset);
|
||||
if (StringUtil.equals(sql, clientSql)) {
|
||||
return javaCharset;
|
||||
} else {
|
||||
return StringUtil.ISO_8859_1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean innerRelease() {
|
||||
if (isRowDataFlowing) {
|
||||
|
||||
@@ -40,6 +40,7 @@ import java.io.IOException;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
@@ -436,9 +437,9 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {
|
||||
setLockTable(false);
|
||||
inLoadData = false;
|
||||
txStarted = false;
|
||||
this.tmpTableSet.clear();
|
||||
this.sysVariables.clear();
|
||||
this.usrVariables.clear();
|
||||
Optional.ofNullable(tmpTableSet).ifPresent((tmpTables) -> tmpTables.clear());
|
||||
Optional.ofNullable(sysVariables).ifPresent((sysVariableMap) -> sysVariableMap.clear());
|
||||
Optional.ofNullable(usrVariables).ifPresent((usrVariableMap) -> usrVariableMap.clear());
|
||||
autocommit = SystemConfig.getInstance().getAutocommit() == 1;
|
||||
txIsolation = SystemConfig.getInstance().getTxIsolation();
|
||||
setCharacterSet(SystemConfig.getInstance().getCharset());
|
||||
|
||||
@@ -11,7 +11,12 @@ import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.CharsetDecoder;
|
||||
import java.nio.charset.CodingErrorAction;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
@@ -24,6 +29,7 @@ public final class StringUtil {
|
||||
}
|
||||
|
||||
public static final String TABLE_COLUMN_SEPARATOR = ".";
|
||||
public static final String ISO_8859_1 = "ISO-8859-1";
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(StringUtil.class);
|
||||
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
|
||||
@@ -612,6 +618,64 @@ public final class StringUtil {
|
||||
return orgStr;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Refer to String source code handling code,Whether the encoding can parse String properly
|
||||
*
|
||||
* @param charsetEncode
|
||||
* @param values
|
||||
* @return
|
||||
*/
|
||||
public static boolean charsetParseString(String charsetEncode, String values) {
|
||||
try {
|
||||
Charset charset = Charset.forName(charsetEncode);
|
||||
CharsetDecoder decoder = charset.newDecoder().onMalformedInput(CodingErrorAction.REPORT).onUnmappableCharacter(CodingErrorAction.REPORT);
|
||||
decoder.decode(ByteBuffer.wrap(values.getBytes(charsetEncode)));
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* iso Charset may need to be replaced with the correct Charset
|
||||
*
|
||||
* @param clientCharset
|
||||
* @param value
|
||||
* @return
|
||||
*/
|
||||
public static String isoCharsetReplace(String clientCharset, String value) {
|
||||
try {
|
||||
String isoValues = new String(value.getBytes(ISO_8859_1), ISO_8859_1);
|
||||
String clientValues = new String(value.getBytes(clientCharset), clientCharset);
|
||||
if (!equals(isoValues, clientValues) && equals(value, clientValues)) {
|
||||
return value;
|
||||
}
|
||||
if (charsetParseString(clientCharset, value)) {
|
||||
value = new String(value.getBytes(ISO_8859_1), clientCharset);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return value;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public static boolean byteEqual(byte[] a, byte[] b, int i) {
|
||||
if (Objects.isNull(a) || Objects.isNull(b)) {
|
||||
return false;
|
||||
}
|
||||
int length = a.length - i;
|
||||
if (length != b.length || length <= 0) {
|
||||
return false;
|
||||
}
|
||||
for (int len = 0; len < length; len++) {
|
||||
if (a[len + i] != b[len]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public static boolean isDoubleOrFloat(String str) {
|
||||
Pattern pattern = Pattern.compile("^[-\\+]?[.\\d]*$");
|
||||
return pattern.matcher(str).matches();
|
||||
|
||||
@@ -23,6 +23,7 @@
|
||||
<xs:attribute name="parentColumn" type="xs:string" use="required"/>
|
||||
<xs:attribute name="incrementColumn" type="xs:string"/>
|
||||
<xs:attribute name="sqlMaxLimit" type="xs:integer"/>
|
||||
<xs:attribute name="specifyCharset" type="xs:boolean"/>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
</xs:sequence>
|
||||
@@ -41,6 +42,7 @@
|
||||
<xs:attribute name="sqlRequiredSharding" type="xs:boolean"/>
|
||||
<xs:attribute name="incrementColumn" type="xs:string"/>
|
||||
<xs:attribute name="sqlMaxLimit" type="xs:integer"/>
|
||||
<xs:attribute name="specifyCharset" type="xs:boolean"/>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element maxOccurs="unbounded" name="globalTable">
|
||||
@@ -50,6 +52,7 @@
|
||||
<xs:attribute name="sqlMaxLimit" type="xs:integer"/>
|
||||
<xs:attribute name="cron" type="xs:string"/>
|
||||
<xs:attribute name="checkClass" type="xs:string"/>
|
||||
<xs:attribute name="specifyCharset" type="xs:boolean"/>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
<xs:element maxOccurs="unbounded" name="singleTable">
|
||||
@@ -57,6 +60,7 @@
|
||||
<xs:attribute name="name" type="xs:string" use="required"/>
|
||||
<xs:attribute name="shardingNode" type="xs:string" use="required"/>
|
||||
<xs:attribute name="sqlMaxLimit" type="xs:integer"/>
|
||||
<xs:attribute name="specifyCharset" type="xs:boolean"/>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
</xs:choice>
|
||||
|
||||
Reference in New Issue
Block a user