Merge pull request #3761 from actiontech/inner-2218_2

[inner-2218] during startup/dry-run/reload, the apNode and its associated configuration need to be detected
This commit is contained in:
wenyh
2023-07-18 16:22:19 +08:00
committed by GitHub
20 changed files with 723 additions and 310 deletions
@@ -469,7 +469,7 @@ public final class DbleServer {
config.reviseLowerCase();
config.selfChecking0();
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
ConfigUtil.setSchemasForPool(config.getDbGroups(), config.getShardingNodes());
ConfigUtil.setSchemasForPool(config.getDbGroups(), config.getAllNodes());
} else {
config.selfChecking0();
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
@@ -39,6 +39,7 @@ public final class AlarmCode {
public static final String TEST_CONN_FAIL = "DBLE_TEST_CONN_FAIL";
public static final String HEARTBEAT_FAIL = "DBLE_HEARTBEAT_FAIL"; //Resolve by trigger
public static final String SHARDING_NODE_LACK = "DBLE_SHARDING_NODE_LACK"; //Resolve by trigger
public static final String AP_NODE_LACK = "DBLE_AP_NODE_LACK"; //Resolve by trigger
public static final String DB_INSTANCE_LOWER_CASE_ERROR = "DBLE_DB_INSTANCE_LOWER_CASE_ERROR"; //Resolve by trigger
public static final String DB_SLAVE_INSTANCE_DELAY = "DBLE_DB_SLAVE_INSTANCE_DELAY"; //Resolve by trigger
public static final String DB_MASTER_INSTANCE_DELAY_FAIL = "DB_MASTER_INSTANCE_DELAY_FAIL";
@@ -92,4 +92,8 @@ public final class AlertUtil {
/**
 * Builds the alert-resolution key that identifies a missing table on a sharding node.
 *
 * @param node  sharding node name
 * @param table table name
 * @return key of the form {@code shardingNode[<node>]:Table[<table>]}
 */
public static String getTableLackKey(String node, String table) {
    StringBuilder key = new StringBuilder("shardingNode[");
    key.append(node).append("]:Table[").append(table).append(']');
    return key.toString();
}
/**
 * Builds the alert-resolution key that identifies a missing table on an AP node.
 *
 * @param node  AP node name
 * @param table table name
 * @return key of the form {@code apNode[<node>]:Table[<table>]}
 */
public static String getTableLackKey2(String node, String table) {
    StringBuilder key = new StringBuilder("apNode[");
    key.append(node).append("]:Table[").append(table).append(']');
    return key.toString();
}
}
@@ -18,6 +18,7 @@ public final class ToResolveContainer {
public static final Set<String> TABLE_LACK = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> GLOBAL_TABLE_CONSISTENCY = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> SHARDING_NODE_LACK = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> AP_NODE_LACK = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> CREATE_CONN_FAIL = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> REACH_MAX_CON = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
public static final Set<String> XA_WRITE_CHECK_POINT_FAIL = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
@@ -1,8 +1,9 @@
package com.actiontech.dble.backend.datasource;
public class ApNode extends ShardingNode {
/**
 * An analytical-processing (AP) node: binds a logical node name to a physical
 * database on a dbGroup, analogous to a sharding node but for AP workloads
 * (presumably non-MySQL backends such as ClickHouse, judging by the
 * TestSchemasTaskForClickHouse usage of apNodes — TODO confirm).
 */
public class ApNode extends BaseNode {
    public ApNode(String dbGroupName, String hostName, String database, PhysicalDbGroup dbGroup) {
        super(dbGroupName, hostName, database, dbGroup);
        // tag consumed by BaseNode trace spans ("get-connection-from-ap-node")
        nodeType = "ap";
    }
}
@@ -0,0 +1,168 @@
package com.actiontech.dble.backend.datasource;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
/**
 * Common base for logical data nodes (ShardingNode / ApNode): binds a logical
 * node name to a physical database hosted on a {@link PhysicalDbGroup} and
 * provides backend-connection acquisition that selects the proper read/write
 * dbInstance for a routed statement.
 */
public abstract class BaseNode {
    protected static final Logger LOGGER = LoggerFactory.getLogger(BaseNode.class);

    // logical node name (key in the node maps)
    protected final String name;
    // name of the dbGroup this node lives on
    protected final String dbGroupName;
    // physical database (schema) name; may be lower-cased via toLowerCase()
    protected String database;
    // backing dbGroup; volatile because a config reload may swap the reference
    protected volatile PhysicalDbGroup dbGroup;
    // whether the physical schema was confirmed to exist on the backend
    protected volatile boolean isSchemaExists = false;
    // subclass tag ("sharding" / "ap"); used in trace span names
    protected String nodeType;

    public BaseNode(String dbGroupName, String hostName, String database, PhysicalDbGroup dbGroup) {
        this.dbGroupName = dbGroupName;
        this.name = hostName;
        this.database = database;
        this.dbGroup = dbGroup;
    }

    public String getNodeType() {
        return nodeType;
    }

    public boolean isSchemaExists() {
        return isSchemaExists;
    }

    public void setSchemaExists(boolean schemaExists) {
        isSchemaExists = schemaExists;
    }

    public String getName() {
        return name;
    }

    public String getDbGroupName() {
        return dbGroupName;
    }

    public PhysicalDbGroup getDbGroup() {
        return dbGroup;
    }

    public void setDbGroup(PhysicalDbGroup dbGroup) {
        this.dbGroup = dbGroup;
    }

    public String getDatabase() {
        return database;
    }

    /** Lower-cases the database name in place. */
    public void toLowerCase() {
        this.database = database.toLowerCase();
    }

    /**
     * get connection from the same dbInstance that {@code exitsCon} belongs to
     *
     * @throws RuntimeException if the instance owning the connection cannot be found
     */
    public void getConnectionFromSameSource(String schema, BackendConnection exitsCon, ResponseHandler handler,
                                            Object attachment) throws Exception {
        PhysicalDbInstance ds = this.dbGroup.findDbInstance(exitsCon);
        if (ds == null) {
            throw new RuntimeException("can't find exits connection, maybe finished " + exitsCon);
        } else {
            ds.getConnection(schema, handler, attachment, false);
        }
    }

    // reject requests whose schema does not match this node's database
    private void checkRequest(String schema) {
        if (schema != null && !schema.equals(this.database)) {
            throw new RuntimeException("invalid param ,connection request db is :" + schema +
                    " and schema db is " + this.database);
        }
    }

    /** Synchronously obtain a backend connection on the selected dbInstance. */
    public void syncGetConnection(String schema, boolean isMustWrite, boolean autoCommit, RouteResultsetNode rrs,
                                  ResponseHandler handler, Object attachment) throws Exception {
        TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-" + nodeType + "-node");
        try {
            checkRequest(schema);
            PhysicalDbInstance instance = selectInstance(rrs, !isMustWrite && autoCommit);
            instance.syncGetConnection(schema, handler, attachment, isMustWrite);
        } finally {
            TraceManager.finishSpan(traceObject);
        }
    }

    /** Asynchronously obtain a backend connection on the selected dbInstance. */
    public void getConnection(String schema, boolean isMustWrite, boolean autoCommit, RouteResultsetNode rrs,
                              ResponseHandler handler, Object attachment) throws Exception {
        TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-" + nodeType + "-node");
        try {
            checkRequest(schema);
            PhysicalDbInstance instance = selectInstance(rrs, !isMustWrite && autoCommit);
            instance.getConnection(schema, handler, attachment, isMustWrite);
        } finally {
            TraceManager.finishSpan(traceObject);
        }
    }

    /**
     * Obtain a backend connection; {@code attachment} must be the
     * {@link RouteResultsetNode} of the routed statement.
     */
    public BackendConnection getConnection(String schema, boolean autocommit, Object attachment) throws IOException {
        checkRequest(schema);
        RouteResultsetNode rrs = (RouteResultsetNode) attachment;
        PhysicalDbInstance instance = selectInstance(rrs, autocommit);
        return instance.getConnection(schema, attachment);
    }

    // single point for read/write instance selection (was duplicated verbatim
    // in the three connection-acquisition methods above)
    private PhysicalDbInstance selectInstance(RouteResultsetNode rrs, boolean autoCommit) {
        return dbGroup.select(canRunOnMaster(rrs, autoCommit), rrs.isForUpdate(), localRead(rrs.getSqlType()));
    }

    // if force master,set canRunInReadDB=false
    // if force slave set runOnSlave,default null means not effect
    private Boolean canRunOnMaster(RouteResultsetNode rrs, boolean autoCommit) {
        Boolean master = null;
        if (rrs.getRunOnSlave() == null) {
            if (!rrs.canRunINReadDB(autoCommit)) {
                master = true;
            }
        } else {
            // force slave
            if (rrs.getRunOnSlave()) {
                master = false;
            } else {
                rrs.setCanRunInReadDB(false);
                master = true;
            }
        }
        return master;
    }

    // only SELECT statements are eligible for local-read routing
    private boolean localRead(int sqlType) {
        return sqlType == ServerParse.SELECT;
    }

    /**
     * Configuration-level equality used by reload diffing: same name, dbGroup
     * name, database, and dbGroup base info. Unlike {@link #equals}, this also
     * compares the backing dbGroup.
     * NOTE(review): dereferences this.dbGroup without a null check — assumes the
     * node is fully wired before diffing; confirm against reload callers.
     */
    public boolean equalsBaseInfo(BaseNode node) {
        return StringUtil.equalsWithEmpty(this.name, node.getName()) &&
                StringUtil.equalsWithEmpty(this.dbGroupName, node.getDbGroupName()) &&
                StringUtil.equalsWithEmpty(this.database, node.getDatabase()) &&
                this.dbGroup.equalsBaseInfo(node.getDbGroup());
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        BaseNode that = (BaseNode) o;
        return Objects.equals(name, that.name) &&
                Objects.equals(dbGroupName, that.dbGroupName) &&
                Objects.equals(database, that.database);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, dbGroupName, database);
    }
}
@@ -97,7 +97,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
LOGGER.info("init dbInstance[{}] because {}, but it has been initialized, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
return;
}
//minCon/maxCon/numOfShardingNodes
//minCon/maxCon/(numOfShardingNodes/numOfApNodes)
checkPoolSize();
LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
@@ -112,7 +112,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
List<String> physicalSchemas = dbGroup.getSchemas();
int initSize = physicalSchemas.size();
if (size < initSize) {
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " +
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes/apNodes), so dble will create at least 1 conn for every schema, " +
"minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMinCon(initSize);
}
@@ -120,7 +120,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
initSize = Math.max(initSize, config.getMinCon());
size = config.getMaxCon();
if (size < initSize) {
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes/apNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMaxCon(initSize);
}
}
@@ -530,7 +530,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
}
public void updatePoolCapacity() {
//minCon/maxCon/numOfShardingNodes
//minCon/maxCon/(numOfShardingNodes/numOfApNodes)
if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) {
checkPoolSize();
connectionPool.evictImmediately();
@@ -5,164 +5,10 @@
*/
package com.actiontech.dble.backend.datasource;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
public class ShardingNode {
protected static final Logger LOGGER = LoggerFactory.getLogger(ShardingNode.class);
protected final String name;
private final String dbGroupName;
protected String database;
protected volatile PhysicalDbGroup dbGroup;
private volatile boolean isSchemaExists = false;
public class ShardingNode extends BaseNode {
public ShardingNode(String dbGroupName, String hostName, String database, PhysicalDbGroup dbGroup) {
this.dbGroupName = dbGroupName;
this.name = hostName;
this.database = database;
this.dbGroup = dbGroup;
}
public boolean isSchemaExists() {
return isSchemaExists;
}
public void setSchemaExists(boolean schemaExists) {
isSchemaExists = schemaExists;
}
public String getName() {
return name;
}
public String getDbGroupName() {
return dbGroupName;
}
public PhysicalDbGroup getDbGroup() {
return dbGroup;
}
public void setDbGroup(PhysicalDbGroup dbGroup) {
this.dbGroup = dbGroup;
}
public String getDatabase() {
return database;
}
public void toLowerCase() {
this.database = database.toLowerCase();
}
/**
* get connection from the same dbInstance
*/
public void getConnectionFromSameSource(String schema, BackendConnection exitsCon, ResponseHandler handler,
Object attachment) throws Exception {
PhysicalDbInstance ds = this.dbGroup.findDbInstance(exitsCon);
if (ds == null) {
throw new RuntimeException("can't find exits connection, maybe finished " + exitsCon);
} else {
ds.getConnection(schema, handler, attachment, false);
}
}
private void checkRequest(String schema) {
if (schema != null && !schema.equals(this.database)) {
throw new RuntimeException("invalid param ,connection request db is :" + schema +
" and schema db is " + this.database);
}
}
public void syncGetConnection(String schema, boolean isMustWrite, boolean autoCommit, RouteResultsetNode rrs,
ResponseHandler handler, Object attachment) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-sharding-node");
try {
checkRequest(schema);
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, !isMustWrite && autoCommit), rrs.isForUpdate(), localRead(rrs.getSqlType()));
instance.syncGetConnection(schema, handler, attachment, isMustWrite);
} finally {
TraceManager.finishSpan(traceObject);
}
}
public void getConnection(String schema, boolean isMustWrite, boolean autoCommit, RouteResultsetNode rrs,
ResponseHandler handler, Object attachment) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-connection-from-sharding-node");
try {
checkRequest(schema);
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, !isMustWrite && autoCommit), rrs.isForUpdate(), localRead(rrs.getSqlType()));
instance.getConnection(schema, handler, attachment, isMustWrite);
} finally {
TraceManager.finishSpan(traceObject);
}
}
public BackendConnection getConnection(String schema, boolean autocommit, Object attachment) throws IOException {
checkRequest(schema);
RouteResultsetNode rrs = (RouteResultsetNode) attachment;
PhysicalDbInstance instance = dbGroup.select(canRunOnMaster(rrs, autocommit), rrs.isForUpdate(), localRead(rrs.getSqlType()));
return instance.getConnection(schema, attachment);
}
// if force master,set canRunInReadDB=false
// if force slave set runOnSlave,default null means not effect
private Boolean canRunOnMaster(RouteResultsetNode rrs, boolean autoCommit) {
Boolean master = null;
if (rrs.getRunOnSlave() == null) {
if (!rrs.canRunINReadDB(autoCommit)) {
master = true;
}
} else {
// force slave
if (rrs.getRunOnSlave()) {
master = false;
} else {
rrs.setCanRunInReadDB(false);
master = true;
}
}
return master;
}
private boolean localRead(int sqlType) {
return sqlType == ServerParse.SELECT;
}
public boolean equalsBaseInfo(ShardingNode shardingNode) {
return StringUtil.equalsWithEmpty(this.name, shardingNode.getName()) &&
StringUtil.equalsWithEmpty(this.dbGroupName, shardingNode.getDbGroupName()) &&
StringUtil.equalsWithEmpty(this.database, shardingNode.getDatabase()) &&
this.dbGroup.equalsBaseInfo(shardingNode.getDbGroup());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardingNode that = (ShardingNode) o;
return Objects.equals(name, that.name) &&
Objects.equals(dbGroupName, that.dbGroupName) &&
Objects.equals(database, that.database);
}
@Override
public int hashCode() {
return Objects.hash(name, dbGroupName, database);
super(dbGroupName, hostName, database, dbGroup);
nodeType = "sharding";
}
}
@@ -63,4 +63,5 @@ public interface ResponseHandler {
*/
void connectionClose(@Nonnull AbstractService service, String reason);
}
@@ -5,10 +5,7 @@
*/
package com.actiontech.dble.config;
import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.backend.datasource.*;
import com.actiontech.dble.cluster.values.RawJson;
import com.actiontech.dble.cluster.zkprocess.entity.Shardings;
import com.actiontech.dble.config.converter.DBConverter;
@@ -16,6 +13,7 @@ import com.actiontech.dble.config.converter.SequenceConverter;
import com.actiontech.dble.config.converter.ShardingConverter;
import com.actiontech.dble.config.converter.UserConverter;
import com.actiontech.dble.config.helper.TestSchemasTask;
import com.actiontech.dble.config.helper.TestSchemasTaskForClickHouse;
import com.actiontech.dble.config.helper.TestTask;
import com.actiontech.dble.config.model.ClusterConfig;
import com.actiontech.dble.config.model.SystemConfig;
@@ -32,6 +30,7 @@ import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.CollectionUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.slf4j.Logger;
@@ -373,7 +372,7 @@ public class ConfigInitializer implements ProblemReporter {
dbGroup = entry.getValue();
dbGroupName = entry.getKey();
// sharding group
// db group
List<Pair<String, String>> schemaList = null;
if (hostSchemaMap.containsKey(dbGroupName)) {
schemaList = hostSchemaMap.get(entry.getKey());
@@ -392,7 +391,7 @@ public class ConfigInitializer implements ProblemReporter {
ds.setTestConnSuccess(false);
continue;
}
if (!testDbInstance(dbGroupName, ds, schemaList)) {
if (!testDbInstance(dbGroupName, ds, schemaList, ds.getConfig().getDataBaseType())) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
@@ -463,7 +462,7 @@ public class ConfigInitializer implements ProblemReporter {
ds.setTestConnSuccess(false);
return true;
}
return testDbInstance(dbGroupName, ds, schemaList);
return testDbInstance(dbGroupName, ds, schemaList, ds.getConfig().getDataBaseType());
}
@@ -485,7 +484,7 @@ public class ConfigInitializer implements ProblemReporter {
}
}
private boolean testDbInstance(String dbGroupName, PhysicalDbInstance ds, List<Pair<String, String>> schemaList) {
private boolean testDbInstance(String dbGroupName, PhysicalDbInstance ds, List<Pair<String, String>> schemaList, DataBaseType dataBaseType) {
boolean isConnectivity = true;
String dbInstanceKey = "dbInstance[" + dbGroupName + "." + ds.getName() + "]";
try {
@@ -499,10 +498,16 @@ public class ConfigInitializer implements ProblemReporter {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "Can't connect to [" + dbInstanceKey + "]"));
LOGGER.warn("SelfCheck### can't connect to [" + dbInstanceKey + "]");
isConnectivity = false;
} else if (schemaList != null) {
TestSchemasTask testSchemaTask = new TestSchemasTask(shardingNodes, ds, schemaList, !ds.isReadInstance());
testSchemaTask.start();
testSchemaTask.join(3000);
} else if (!CollectionUtil.isEmpty(schemaList)) {
if (dataBaseType == DataBaseType.MYSQL) {
TestSchemasTask testSchemaTask = new TestSchemasTask(shardingNodes, ds, schemaList, !ds.isReadInstance());
testSchemaTask.start();
testSchemaTask.join(3000);
} else {
TestSchemasTaskForClickHouse testSchemaTask2 = new TestSchemasTaskForClickHouse(apNodes, ds, schemaList, !ds.isReadInstance());
testSchemaTask2.start();
testSchemaTask2.join(3000);
}
} else {
LOGGER.info("SelfCheck### connect to [" + dbInstanceKey + "] successfully.");
}
@@ -524,6 +529,12 @@ public class ConfigInitializer implements ProblemReporter {
nodes.add(new Pair<>(shardingNode.getName(), shardingNode.getDatabase()));
}
}
if (apNodes != null) {
for (ApNode apNode : apNodes.values()) {
List<Pair<String, String>> nodes = dbInstanceSchemaMap.computeIfAbsent(apNode.getDbGroupName(), k -> new ArrayList<>(8));
nodes.add(new Pair<>(apNode.getName(), apNode.getDatabase()));
}
}
return dbInstanceSchemaMap;
}
@@ -17,6 +17,7 @@ import com.actiontech.dble.cluster.zkprocess.entity.DbGroups;
import com.actiontech.dble.cluster.zkprocess.entity.Shardings;
import com.actiontech.dble.cluster.zkprocess.entity.Users;
import com.actiontech.dble.cluster.zkprocess.parse.XmlProcessBase;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
import com.actiontech.dble.config.model.sharding.table.ERTable;
@@ -37,6 +38,7 @@ import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.*;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.collect.Maps;
@@ -90,7 +92,7 @@ public class ServerConfig {
this.funcNodeERMap = confInitNew.getFuncNodeERMap();
this.functions = confInitNew.getFunctions();
this.fullyConfigured = confInitNew.isFullyConfigured();
ConfigUtil.setSchemasForPool(dbGroups, shardingNodes);
ConfigUtil.setSchemasForPool(dbGroups, getAllNodes());
this.reloadTime = TimeUtil.currentTimeMillis();
@@ -114,7 +116,7 @@ public class ServerConfig {
this.funcNodeERMap = confInit.getFuncNodeERMap();
this.functions = confInit.getFunctions();
this.fullyConfigured = confInit.isFullyConfigured();
ConfigUtil.setSchemasForPool(dbGroups, shardingNodes);
ConfigUtil.setSchemasForPool(dbGroups, getAllNodes());
this.reloadTime = TimeUtil.currentTimeMillis();
@@ -134,7 +136,7 @@ public class ServerConfig {
this.funcNodeERMap = confInitNew.getFuncNodeERMap();
this.functions = confInitNew.getFunctions();
this.fullyConfigured = confInitNew.isFullyConfigured();
ConfigUtil.setSchemasForPool(dbGroups, shardingNodes);
ConfigUtil.setSchemasForPool(dbGroups, getAllNodes());
this.reloadTime = TimeUtil.currentTimeMillis();
@@ -207,6 +209,14 @@ public class ServerConfig {
return apNodes;
}
/**
 * Returns a merged snapshot of every configured node — sharding nodes plus AP
 * nodes — keyed by node name. The result is a fresh map; mutating it does not
 * affect the configuration.
 */
public Map<String, BaseNode> getAllNodes() {
    waitIfChanging();
    Map<String, BaseNode> merged = new HashMap<>();
    merged.putAll(shardingNodes);
    if (!CollectionUtil.isEmpty(apNodes)) {
        merged.putAll(apNodes);
    }
    return merged;
}
public Map<String, PhysicalDbGroup> getDbGroups() {
waitIfChanging();
return dbGroups;
@@ -232,21 +242,21 @@ public class ServerConfig {
}
public boolean reload(Map<UserName, UserConfig> newUsers, Map<String, SchemaConfig> newSchemas,
Map<String, ShardingNode> newShardingNodes, Map<String, PhysicalDbGroup> newDbGroups,
Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, Map<String, PhysicalDbGroup> newDbGroups,
Map<String, PhysicalDbGroup> oldDbGroups,
Map<ERTable, Set<ERTable>> newErRelations,
Map<String, Set<ERTable>> newFuncNodeERMap,
SystemVariables newSystemVariables, boolean isFullyConfigured,
final int loadAllMode, Map<String, Properties> newBlacklistConfig, Map<String, AbstractPartitionAlgorithm> newFunctions,
RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig, List<ChangeItem> changeItemList) throws SQLNonTransientException {
boolean result = apply(newUsers, newSchemas, newShardingNodes, newDbGroups, oldDbGroups, newErRelations, newFuncNodeERMap,
boolean result = apply(newUsers, newSchemas, newShardingNodes, newApNodes, newDbGroups, oldDbGroups, newErRelations, newFuncNodeERMap,
newSystemVariables, isFullyConfigured, loadAllMode, newBlacklistConfig, newFunctions, userJsonConfig,
sequenceJsonConfig, shardingJsonConfig, dbJsonConfig, changeItemList);
this.reloadTime = TimeUtil.currentTimeMillis();
return result;
}
private void calcDiffForMetaData(Map<String, SchemaConfig> newSchemas, Map<String, ShardingNode> newShardingNodes, int loadAllMode,
private void calcDiffForMetaData(Map<String, SchemaConfig> newSchemas, Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, int loadAllMode,
List<Pair<String, String>> delTables, List<Pair<String, String>> reloadTables,
List<String> delSchema, List<String> reloadSchema) {
for (Map.Entry<String, SchemaConfig> schemaEntry : this.schemas.entrySet()) {
@@ -260,20 +270,22 @@ public class ServerConfig {
delSchema.add(oldSchema);
reloadSchema.add(oldSchema);
} else {
if (newSchemaConfig.getDefaultShardingNodes() != null) { // reload config_all
if (newSchemaConfig.getDefaultShardingNodes() != null || !StringUtil.isBlank(newSchemaConfig.getDefaultApNode())) { // reload config_all
//check shardingNode and dbGroup change
List<String> strShardingNodes = newSchemaConfig.getDefaultShardingNodes();
if (isShardingNodeChanged(strShardingNodes, newShardingNodes)) {
String strApNode = newSchemaConfig.getDefaultApNode();
if (isShardingNodeChanged(strShardingNodes, newShardingNodes) || isApNodeChanged(strApNode, newApNodes)) {
delSchema.add(oldSchema);
reloadSchema.add(oldSchema);
continue;
} else if ((loadAllMode & ManagerParseConfig.OPTS_MODE) == 0 && isDbGroupChanged(strShardingNodes, newShardingNodes)) { // reload @@config_all not contains -s
} else if ((loadAllMode & ManagerParseConfig.OPTS_MODE) == 0 &&
(isDbGroupChanged(strShardingNodes, newShardingNodes) || isDbGroupChangedByApNode(strApNode, newApNodes))) { // reload @@config_all not contains -s
delSchema.add(oldSchema);
reloadSchema.add(oldSchema);
continue;
}
}
calcTableDiffForMetaData(newShardingNodes, loadAllMode, delTables, reloadTables, oldSchema, newSchemaConfig, oldSchemaConfig);
calcTableDiffForMetaData(newShardingNodes, newApNodes, loadAllMode, delTables, reloadTables, oldSchema, newSchemaConfig, oldSchemaConfig);
}
}
}
@@ -287,7 +299,8 @@ public class ServerConfig {
}
}
private void calcTableDiffForMetaData(Map<String, ShardingNode> newShardingNodes, int loadAllMode, List<Pair<String, String>> delTables, List<Pair<String, String>> reloadTables, String oldSchema, SchemaConfig newSchemaConfig, SchemaConfig oldSchemaConfig) {
private void calcTableDiffForMetaData(Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, int loadAllMode,
List<Pair<String, String>> delTables, List<Pair<String, String>> reloadTables, String oldSchema, SchemaConfig newSchemaConfig, SchemaConfig oldSchemaConfig) {
for (Map.Entry<String, BaseTableConfig> tableEntry : oldSchemaConfig.getTables().entrySet()) {
String oldTable = tableEntry.getKey();
BaseTableConfig newTableConfig = newSchemaConfig.getTables().get(oldTable);
@@ -339,6 +352,17 @@ public class ServerConfig {
return false;
}
/**
 * Checks whether the schema's default AP node changed between the running and
 * the newly loaded configuration: a different backing database, or a missing /
 * different dbGroup binding.
 *
 * @param strApNode  the schema's default AP node name; may be null/blank when
 *                   the schema only uses sharding nodes (the caller invokes
 *                   this unconditionally once either default is present)
 * @param newApNodes AP nodes of the configuration being loaded
 * @return true if the AP node's database or dbGroup binding differs
 */
private boolean isApNodeChanged(String strApNode, Map<String, ApNode> newApNodes) {
    if (StringUtil.isBlank(strApNode)) {
        // schema has no default AP node; nothing to compare
        return false;
    }
    ApNode newDBNode = newApNodes.get(strApNode);
    ApNode oldDBNode = apNodes.get(strApNode);
    if (newDBNode == null || oldDBNode == null) {
        // node appeared or disappeared entirely: treat as changed (avoids the
        // NPE the unguarded lookups produced)
        return newDBNode != oldDBNode;
    }
    if (!oldDBNode.getDatabase().equals(newDBNode.getDatabase()) ||
            oldDBNode.getDbGroup() == null ||
            !oldDBNode.getDbGroup().getGroupName().equals(newDBNode.getDbGroup().getGroupName())) {
        return true;
    }
    return false;
}
private boolean isDbGroupChanged(List<String> strShardingNodes, Map<String, ShardingNode> newShardingNodes) {
for (String strShardingNode : strShardingNodes) {
PhysicalDbGroup newDBPool = newShardingNodes.get(strShardingNode).getDbGroup();
@@ -351,9 +375,20 @@ public class ServerConfig {
return false;
}
/**
 * Checks whether the dbGroup backing the schema's default AP node changed in a
 * way that requires the schema's meta data to be reloaded.
 *
 * @param strApNodes the schema's default AP node name; may be null/blank when
 *                   the schema only uses sharding nodes
 * @param newApNodes AP nodes of the configuration being loaded
 * @return true if the old and new dbGroup differ per {@link PhysicalDbGroupDiff}
 */
private boolean isDbGroupChangedByApNode(String strApNodes, Map<String, ApNode> newApNodes) {
    if (StringUtil.isBlank(strApNodes)) {
        // no default AP node configured; nothing to diff
        return false;
    }
    ApNode newNode = newApNodes.get(strApNodes);
    ApNode oldNode = apNodes.get(strApNodes);
    if (newNode == null || oldNode == null) {
        // node appeared or disappeared entirely: treat as a dbGroup change
        // (avoids the NPE the unguarded lookups produced)
        return newNode != oldNode;
    }
    PhysicalDbGroupDiff diff = new PhysicalDbGroupDiff(oldNode.getDbGroup(), newNode.getDbGroup());
    return !PhysicalDbGroupDiff.CHANGE_TYPE_NO.equals(diff.getChangeType());
}
private boolean apply(Map<UserName, UserConfig> newUsers,
Map<String, SchemaConfig> newSchemas,
Map<String, ShardingNode> newShardingNodes,
Map<String, ApNode> newApNodes,
Map<String, PhysicalDbGroup> newDbGroups,
Map<String, PhysicalDbGroup> oldDbGroups,
Map<ERTable, Set<ERTable>> newErRelations,
@@ -367,7 +402,7 @@ public class ServerConfig {
List<String> reloadSchema = new ArrayList<>();
if (isFullyConfigured) {
ReloadLogHelper.briefInfo("calcDiffForMetaData ...");
calcDiffForMetaData(newSchemas, newShardingNodes, loadAllMode, delTables, reloadTables, delSchema, reloadSchema);
calcDiffForMetaData(newSchemas, newShardingNodes, newApNodes, loadAllMode, delTables, reloadTables, delSchema, reloadSchema);
}
final ReentrantLock metaLock = ProxyMeta.getInstance().getTmManager().getMetaLock();
metaLock.lock();
@@ -386,7 +421,7 @@ public class ServerConfig {
ReloadLogHelper.briefInfo("init new dbGroup start");
if ((loadAllMode & ManagerParseConfig.OPTR_MODE) != 0) {
//all dbGroup reload & recycle
initDbGroupByMap(oldDbGroups, newDbGroups, newShardingNodes, isFullyConfigured, loadAllMode);
initDbGroupByMap(oldDbGroups, newDbGroups, newShardingNodes, newApNodes, isFullyConfigured, loadAllMode);
} else {
//replace dbGroup reference
for (Map.Entry<String, ShardingNode> shardingNodeEntry : newShardingNodes.entrySet()) {
@@ -397,13 +432,22 @@ public class ServerConfig {
}
shardingNode.setDbGroup(oldDbGroup);
}
for (Map.Entry<String, ApNode> apNodeEntry : newApNodes.entrySet()) {
ApNode apNode = apNodeEntry.getValue();
PhysicalDbGroup oldDbGroup = oldDbGroups.get(apNode.getDbGroupName());
if (null == oldDbGroup) {
oldDbGroup = newDbGroups.get(apNode.getDbGroupName());
}
apNode.setDbGroup(oldDbGroup);
}
//only change dbGroup reload & recycle
initDbGroupByMap(changeItemList, oldDbGroups, newShardingNodes, isFullyConfigured, loadAllMode);
initDbGroupByMap(changeItemList, oldDbGroups, newShardingNodes, newApNodes, isFullyConfigured, loadAllMode);
newDbGroups = oldDbGroups;
}
ReloadLogHelper.briefInfo("init new dbGroup end");
ReloadLogHelper.briefInfo("config the transformation ...");
this.shardingNodes = newShardingNodes;
this.apNodes = newApNodes;
this.dbGroups = newDbGroups;
this.fullyConfigured = isFullyConfigured;
DbleServer.getInstance().reloadSystemVariables(newSystemVariables);
@@ -468,7 +512,7 @@ public class ServerConfig {
}
private static void initDbGroupByMap(Map<String, PhysicalDbGroup> oldDbGroups, Map<String, PhysicalDbGroup> newDbGroups,
Map<String, ShardingNode> newShardingNodes, boolean fullyConfigured, int loadAllMode) {
Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, boolean fullyConfigured, int loadAllMode) {
if (oldDbGroups != null) {
//Only -r uses this method to recycle the connection pool
String recycleGroupName;
@@ -487,10 +531,19 @@ public class ServerConfig {
String hostName = dbGroup.getGroupName();
// set schemas
ArrayList<String> dnSchemas = new ArrayList<>(30);
for (ShardingNode dn : newShardingNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
if (dbGroup.getDbGroupConfig().instanceDatabaseType() == DataBaseType.MYSQL) {
for (ShardingNode dn : newShardingNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
}
}
} else {
for (ApNode dn : newApNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
}
}
}
dbGroup.setSchemas(dnSchemas);
@@ -503,7 +556,8 @@ public class ServerConfig {
}
private void initDbGroupByMap(List<ChangeItem> changeItemList, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes,
private void initDbGroupByMap(List<ChangeItem> changeItemList, Map<String, PhysicalDbGroup> oldDbGroupMap,
Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes,
boolean isFullyConfigured, int loadAllMode) {
Map<ChangeItem, PhysicalDbGroup> updateDbGroupMap = Maps.newHashMap();
for (ChangeItem changeItem : changeItemList) {
@@ -511,10 +565,10 @@ public class ServerConfig {
ChangeItemType itemType = changeItem.getItemType();
switch (changeItem.getType()) {
case ADD:
addItem(item, itemType, oldDbGroupMap, newShardingNodes, isFullyConfigured);
addItem(item, itemType, oldDbGroupMap, newShardingNodes, newApNodes, isFullyConfigured);
break;
case UPDATE:
updateItem(item, itemType, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupMap, loadAllMode);
updateItem(item, itemType, oldDbGroupMap, newShardingNodes, newApNodes, changeItem, updateDbGroupMap, loadAllMode);
break;
case DELETE:
deleteItem(item, itemType, oldDbGroupMap, loadAllMode);
@@ -561,7 +615,8 @@ public class ServerConfig {
}
}
private void updateItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes, ChangeItem changeItem,
private void updateItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap,
Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, ChangeItem changeItem,
Map<ChangeItem, PhysicalDbGroup> updateDbGroupMap, int loadAllMode) {
if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
//change dbGroup
@@ -586,7 +641,7 @@ public class ServerConfig {
if (!dbGroupCopy) {
oldDbGroup.copyBaseInfo(physicalDbGroup);
}
reloadSchema(oldDbGroup, newShardingNodes);
reloadSchema(oldDbGroup, newShardingNodes, newApNodes);
if (changeItem.isAffectConnectionPool()) {
if (physicalDbGroup.getRwSplitMode() == 0) {
oldDbGroup.stopPool("reload config, recycle read instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), false);
@@ -654,11 +709,12 @@ public class ServerConfig {
return dbInstance;
}
private void addItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes, boolean isFullyConfigured) {
private void addItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap,
Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, boolean isFullyConfigured) {
if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
//add dbGroup+dbInstance
PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item;
initDbGroup(physicalDbGroup, newShardingNodes, isFullyConfigured);
initDbGroup(physicalDbGroup, newShardingNodes, newApNodes, isFullyConfigured);
oldDbGroupMap.put(physicalDbGroup.getGroupName(), physicalDbGroup);
} else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) {
//add dbInstance
@@ -683,7 +739,7 @@ public class ServerConfig {
}
}
public static void reloadSchema(PhysicalDbGroup dbGroup, Map<String, ShardingNode> newShardingNodes) {
public static void reloadSchema(PhysicalDbGroup dbGroup, Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes) {
String hostName = dbGroup.getGroupName();
// set schemas
ArrayList<String> dnSchemas = new ArrayList<>(30);
@@ -693,12 +749,18 @@ public class ServerConfig {
dnSchemas.add(dn.getDatabase());
}
}
for (ApNode dn : newApNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
}
}
dbGroup.setSchemas(dnSchemas);
}
private static void initDbGroup(PhysicalDbGroup dbGroup, Map<String, ShardingNode> newShardingNodes, boolean fullyConfigured) {
reloadSchema(dbGroup, newShardingNodes);
private static void initDbGroup(PhysicalDbGroup dbGroup, Map<String, ShardingNode> newShardingNodes, Map<String, ApNode> newApNodes, boolean fullyConfigured) {
reloadSchema(dbGroup, newShardingNodes, newApNodes);
if (fullyConfigured) {
dbGroup.init("reload config");
} else {
@@ -787,6 +849,10 @@ public class ServerConfig {
physicalDBNode.toLowerCase();
}
for (ApNode physicalDBNode : apNodes.values()) {
physicalDBNode.toLowerCase();
}
//schemas
Map<String, SchemaConfig> newSchemas = new HashMap<>();
for (Map.Entry<String, SchemaConfig> entry : schemas.entrySet()) {
@@ -0,0 +1,119 @@
/*
* Copyright (C) 2016-2023 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.config.helper;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.sqlengine.MultiRowSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.OneTimeConnJob;
import com.actiontech.dble.sqlengine.SQLQueryResult;
import com.actiontech.dble.sqlengine.SQLQueryResultListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Self-check task run at startup/reload: verifies that every apNode's backing
 * database actually exists on the given ClickHouse dbInstance.
 * <p>
 * Issues a "show databases" through a one-time connection, marks each
 * {@link ApNode} whose database was found via {@code setSchemaExists(true)},
 * and logs a warning for every configured apNode database the instance lacks.
 * {@link #run()} blocks until the asynchronous query result has been handled.
 */
public class TestSchemasTaskForClickHouse extends Thread {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestSchemasTaskForClickHouse.class);

    // the ClickHouse instance whose databases are listed
    private final PhysicalDbInstance ds;
    // database name -> apNode name; found databases are removed by the result
    // listener, so whatever remains afterwards is missing on the instance
    private final Map<String, String> nodes = new HashMap<>();
    private final Map<String, ApNode> apNodes;
    private final boolean needAlert;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition finishCond = lock.newCondition();
    // guarded by lock; set once the query result has been fully processed
    private boolean isFinish = false;

    public TestSchemasTaskForClickHouse(Map<String, ApNode> apNodes, PhysicalDbInstance ds,
                                        List<Pair<String, String>> nodeList, boolean needAlert) {
        this.ds = ds;
        this.needAlert = needAlert;
        this.apNodes = apNodes;
        for (Pair<String, String> node : nodeList) {
            // pair key is the apNode name, pair value is its database;
            // index by database so the listener can look nodes up by schema
            nodes.put(node.getValue(), node.getKey());
        }
    }

    public Map<String, String> getNodes() {
        return nodes;
    }

    @Override
    public void run() {
        String mysqlShowDataBasesCols = "name";
        MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{mysqlShowDataBasesCols}, new MySQLShowDatabasesListener(mysqlShowDataBasesCols));
        OneTimeConnJob sqlJob = new OneTimeConnJob("show databases", null, resultHandler, ds);
        sqlJob.run();
        // block until the listener signals that the result has been handled
        lock.lock();
        try {
            while (!isFinish) {
                finishCond.await();
            }
        } catch (InterruptedException e) {
            LOGGER.warn("test conn Interrupted:", e);
            // preserve the interrupt status so callers can observe it
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consumes the "show databases" result: marks every expected apNode whose
     * database exists, then reports the leftovers and wakes the waiting thread.
     */
    private class MySQLShowDatabasesListener implements SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> {
        private final String mysqlShowDataBasesCol;

        MySQLShowDatabasesListener(String mysqlShowDataBasesCol) {
            this.mysqlShowDataBasesCol = mysqlShowDataBasesCol;
        }

        @Override
        public void onResult(SQLQueryResult<List<Map<String, String>>> result) {
            if (result.isSuccess()) {
                List<Map<String, String>> rows = result.getResult();
                for (Map<String, String> row : rows) {
                    String schema = row.get(mysqlShowDataBasesCol);
                    if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
                        // normalize to match lower-cased configuration names
                        schema = schema.toLowerCase();
                    }
                    String nodeName = nodes.remove(schema);
                    if (nodeName != null) {
                        apNodes.get(nodeName).setSchemaExists(true);
                        String key = "dbGroup[" + ds.getDbGroupConfig().getName() + "." + ds.getConfig().getInstanceName() + "],apNode[" + nodeName + "],schema[" + schema + "]";
                        LOGGER.info("SelfCheck### test " + key + " database connection success ");
                    }
                }
            }
            reportSchemaNotFound();
            handleFinished();
        }

        // warn about every configured apNode database the instance lacks
        private void reportSchemaNotFound() {
            for (Map.Entry<String, String> node : nodes.entrySet()) {
                String nodeName = node.getValue();
                String key = "dbInstance[" + ds.getDbGroupConfig().getName() + "." + ds.getConfig().getInstanceName() + "],ap_node[" + nodeName + "],schema[" + node.getKey() + "]";
                LOGGER.warn("SelfCheck### test " + key + " database connection fail ");
                if (needAlert) {
                    // TODO(review): AP_NODE_LACK alerting is stubbed out — either
                    // enable the block below (plus the matching resolve path) or
                    // remove the needAlert flag from this task.
                    /* Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", ds.getDbGroupConfig().getName() + "-" + ds.getConfig().getInstanceName());
                    labels.put("ap_node", nodeName);
                    AlertUtil.alert(AlarmCode.AP_NODE_LACK, Alert.AlertLevel.WARN, "{" + key + "} is lack", "mysql", ds.getConfig().getId(), labels);
                    ToResolveContainer.AP_NODE_LACK.add(key);*/
                }
            }
        }

        // signal the thread blocked in run() that processing is complete
        private void handleFinished() {
            lock.lock();
            try {
                isFinish = true;
                finishCond.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}
@@ -6,10 +6,7 @@
package com.actiontech.dble.config.util;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.backend.datasource.*;
import com.actiontech.dble.backend.mysql.VersionUtil;
import com.actiontech.dble.config.ConfigInitializer;
import com.actiontech.dble.config.DbleTempConfig;
@@ -24,6 +21,7 @@ import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -76,15 +74,15 @@ public final class ConfigUtil {
return s.append(text.substring(cur)).toString();
}
public static void setSchemasForPool(Map<String, PhysicalDbGroup> dbGroupMap, Map<String, ShardingNode> shardingNodeMap) {
public static void setSchemasForPool(Map<String, PhysicalDbGroup> dbGroupMap, Map<String, BaseNode> baseNodeMap) {
for (PhysicalDbGroup dbGroup : dbGroupMap.values()) {
dbGroup.setSchemas(getShardingNodeSchemasOfDbGroup(dbGroup.getGroupName(), shardingNodeMap));
dbGroup.setSchemas(getBaseNodeSchemasOfDbGroup(dbGroup.getGroupName(), baseNodeMap));
}
}
private static ArrayList<String> getShardingNodeSchemasOfDbGroup(String dbGroup, Map<String, ShardingNode> shardingNodeMap) {
private static ArrayList<String> getBaseNodeSchemasOfDbGroup(String dbGroup, Map<String, BaseNode> baseNodeMap) {
ArrayList<String> schemaList = new ArrayList<>(30);
for (ShardingNode dn : shardingNodeMap.values()) {
for (BaseNode dn : baseNodeMap.values()) {
if (dn.getDbGroup() != null && dn.getDbGroup().getGroupName().equals(dbGroup)) {
schemaList.add(dn.getDatabase());
}
@@ -166,7 +164,7 @@ public final class ConfigUtil {
});
List<String> syncKeyVariables = Lists.newArrayList();
List<String> mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbGroups, needSync);
List<String> mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbGroups, needSync, !CollectionUtil.isEmpty(clickHouseDbGroups));
Optional.ofNullable(mysqlSyncKeyVariables).ifPresent(syncKeyVariables::addAll);
List<String> clickHouseSyncKeyVariables = getClickHouseSyncKeyVariables(clickHouseDbGroups, needSync);
Optional.ofNullable(clickHouseSyncKeyVariables).ifPresent(syncKeyVariables::addAll);
@@ -177,7 +175,7 @@ public final class ConfigUtil {
}
@Nullable
private static List<String> getMysqlSyncKeyVariables(Map<String, PhysicalDbGroup> dbGroups, boolean needSync) throws InterruptedException, ExecutionException, IOException {
private static List<String> getMysqlSyncKeyVariables(Map<String, PhysicalDbGroup> dbGroups, boolean needSync, boolean existClickHouse) throws InterruptedException, ExecutionException, IOException {
String msg = null;
List<String> list = new ArrayList<>();
if (dbGroups.size() == 0) {
@@ -225,7 +223,12 @@ public final class ConfigUtil {
}
if (secondGroup.size() != 0) {
// if all datasoure's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different.");
StringBuilder sb = new StringBuilder();
if (existClickHouse) {
sb.append("The configuration contains Clickhouse. Since clickhouse is case-sensitive by default, The values of lower_case_table_names for backend MySQLs must be 0. ");
} else {
sb.append("The values of lower_case_table_names for backend MySQLs are different. ");
}
String firstGroupValue;
String secondGroupValue;
if (lowerCase) {
@@ -238,12 +241,16 @@ public final class ConfigUtil {
sb.append("These MySQL's value is");
sb.append(firstGroupValue);
sb.append(Strings.join(firstGroup, ','));
sb.append(".And these MySQL's value is");
sb.append(". And these MySQL's value is");
sb.append(secondGroupValue);
sb.append(Strings.join(secondGroup, ','));
sb.append(".");
throw new IOException(sb.toString());
}
if (existClickHouse && lowerCase) {
StringBuilder sb = new StringBuilder("The configuration contains Clickhouse. Since clickhouse is case-sensitive by default, The values of lower_case_table_names for backend MySQLs must be 0. All current backend mysql are 1.");
throw new IOException(sb.toString());
}
dbInstanceList.forEach(dbInstance -> dbInstance.setNeedSkipHeartTest(true));
DbleTempConfig.getInstance().setLowerCase(lowerCase);
return list;
@@ -311,23 +318,12 @@ public final class ConfigUtil {
List<PhysicalDbInstance> dbInstanceList = Lists.newArrayList();
getAndSyncKeyVariablesForDataSources(dbGroups, keyVariablesTaskMap, needSync, dbInstanceList);
boolean lowerCase = false;
boolean isFirst = true;
Set<String> firstGroup = new HashSet<>();
Set<String> secondGroup = new HashSet<>();
final boolean lowerCase = false; // ClickHouse is case sensitive by default
int minNodePacketSize = Integer.MAX_VALUE;
for (Map.Entry<VariableMapKey, Future<KeyVariables>> entry : keyVariablesTaskMap.entrySet()) {
VariableMapKey variableMapKey = entry.getKey();
Future<KeyVariables> future = entry.getValue();
KeyVariables keyVariables = future.get();
if (keyVariables != null) {
if (isFirst) {
lowerCase = keyVariables.isLowerCase();
isFirst = false;
firstGroup.add(variableMapKey.getDataSourceName());
} else if (keyVariables.isLowerCase() != lowerCase) {
secondGroup.add(variableMapKey.getDataSourceName());
}
minNodePacketSize = Math.min(minNodePacketSize, keyVariables.getMaxPacketSize());
}
}
@@ -337,27 +333,6 @@ public final class ConfigUtil {
list.add(msg);
LOGGER.warn(msg);
}
if (secondGroup.size() != 0) {
// if all datasoure's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend clickHouse are different.");
String firstGroupValue;
String secondGroupValue;
if (lowerCase) {
firstGroupValue = " not 0 :";
secondGroupValue = " 0 :";
} else {
firstGroupValue = " 0 :";
secondGroupValue = " not 0 :";
}
sb.append("These clickHouse's value is");
sb.append(firstGroupValue);
sb.append(Strings.join(firstGroup, ','));
sb.append(".And these clickHouse's value is");
sb.append(secondGroupValue);
sb.append(Strings.join(secondGroup, ','));
sb.append(".");
throw new IOException(sb.toString());
}
dbInstanceList.forEach(dbInstance -> dbInstance.setNeedSkipHeartTest(true));
DbleTempConfig.getInstance().setLowerCase(lowerCase);
return list;
@@ -13,6 +13,7 @@ import com.actiontech.dble.meta.ProxyMetaManager;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.meta.TableMeta;
import com.actiontech.dble.meta.ViewMeta;
import com.actiontech.dble.util.StringUtil;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -46,6 +47,10 @@ public abstract class AbstractSchemaMetaHandler {
handlers.add(new FakeConfigTableHandler(this, getTmManager()));
// tables config
handlers.add(new ConfigTableHandler(this));
if (!StringUtil.isBlank(schemaConfig.getDefaultApNode())) {
// check tables exists in ClickHouse
handlers.add(new ClickHouseTableHandler(this, getTmManager()));
}
boolean existTable = false;
for (ModeTableHandler handler : handlers) {
@@ -0,0 +1,186 @@
package com.actiontech.dble.meta.table;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.alarm.ToResolveContainer;
import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.meta.ProxyMetaManager;
import com.actiontech.dble.meta.SchemaMeta;
import com.actiontech.dble.sqlengine.MultiRowSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.SQLJob;
import com.actiontech.dble.sqlengine.SQLQueryResult;
import com.actiontech.dble.sqlengine.SQLQueryResultListener;
import com.actiontech.dble.util.CollectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Table-existence check for schemas backed by a ClickHouse apNode.
 * <p>
 * Collects the expected table names (tables already known to the meta manager
 * plus every table declared for the schema in the sharding config), runs
 * "show tables" against the apNode's backend, and raises a TABLE_LACK alert
 * for every expected table the backend does not contain.
 */
public class ClickHouseTableHandler extends ModeTableHandler {
    protected static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseTableHandler.class);
    private final AbstractSchemaMetaHandler operationalHandler;
    protected final String schema;
    protected final SchemaConfig schemaConfig;
    private final ProxyMetaManager tmManager;
    protected boolean isFinished = false;
    protected Lock lock = new ReentrantLock();
    protected Condition notify = lock.newCondition();

    public ClickHouseTableHandler(AbstractSchemaMetaHandler operationalHandler, ProxyMetaManager tmManager) {
        this.operationalHandler = operationalHandler;
        this.schemaConfig = operationalHandler.getSchemaConfig();
        this.schema = operationalHandler.getSchema();
        this.tmManager = tmManager;
    }

    /**
     * Runs the existence check synchronously and alerts on missing tables.
     *
     * @return always {@code true}; missing tables are reported via alerts,
     *         they do not fail the load
     */
    @Override
    boolean loadMetaData() {
        // expected tables = current metadata tables + configured tables
        Set<String> tables = new HashSet<>();
        SchemaMeta schemaMeta = tmManager.getCatalogs().get(schema);
        if (schemaMeta != null) {
            schemaMeta.getTableMetas().values().forEach(f -> tables.add(f.getTableName()));
        }
        schemaConfig.getTables().values().forEach(f -> tables.add(f.getName()));
        if (!CollectionUtil.isEmpty(tables)) {
            ShowTableByNodeUnitHandler unitHandler = new ShowTableByNodeUnitHandler(tables, schemaConfig.getDefaultApNode());
            unitHandler.execute();
            Set<String> noExistTable = unitHandler.getNotExistTable();
            if (!CollectionUtil.isEmpty(noExistTable)) {
                for (String tableName : noExistTable) {
                    String tableLackKey = AlertUtil.getTableLackKey2(schemaConfig.getDefaultApNode(), tableName);
                    String warnMsg = "Can't get table " + tableName + "'s config from apNode:" + schemaConfig.getDefaultApNode() + "! Maybe the table is not initialized!";
                    LOGGER.warn(warnMsg);
                    AlertUtil.alertSelf(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, warnMsg, AlertUtil.genSingleLabel("TABLE", tableLackKey));
                    ToResolveContainer.TABLE_LACK.add(tableLackKey);
                }
            }
        }
        return true;
    }

    // no-op: this handler only validates table existence; per-table metadata
    // is presumably produced by the other ModeTableHandlers — TODO confirm
    @Override
    void handleTable(String table, String apNode, boolean isView, String sql) {
    }

    // no-op: see handleTable
    @Override
    void countdown(String apNode, Set<String> remainingTables) {
    }

    // no-op: see handleTable
    @Override
    void tryComplete(String apNode, boolean isLastApNode) {
    }

    /**
     * Executes "show tables" on the apNode's write instance and computes which
     * of the expected tables are absent from the backend.
     */
    static class ShowTableByNodeUnitHandler extends GetNodeTablesHandler {
        private final Set<String> expectedTables;
        // tables actually present on the ClickHouse backend
        private final Set<String> ckTables = new HashSet<>();
        private final String apNode;

        ShowTableByNodeUnitHandler(Set<String> expectedTables, String apNode) {
            super(apNode);
            this.sql = CLICKHOUSE_SQL;
            this.apNode = apNode;
            this.expectedTables = expectedTables;
        }

        public void execute() {
            ApNode dn = DbleServer.getInstance().getConfig().getApNodes().get(apNode);
            String showTableCol = "name";
            String[] showTableCols = new String[]{showTableCol};
            PhysicalDbInstance ds = dn.getDbGroup().getWriteDbInstance();
            if (ds.isAlive()) {
                MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(showTableCols, new ClickHouseShowTablesListener(showTableCol, dn.getDatabase(), ds));
                SQLJob sqlJob = new SQLJob(sql, dn.getDatabase(), resultHandler, ds);
                sqlJob.run();
            } else {
                // no live write instance: run through the generic path so the
                // listener still fires (with a failed result) and alerts
                MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(showTableCols, new ClickHouseShowTablesListener(showTableCol, dn.getDatabase(), null));
                SQLJob sqlJob = new SQLJob(sql, apNode, resultHandler, false);
                sqlJob.run();
            }
        }

        @Override
        protected void handleTable(String table, String tableType) {
            ckTables.add(table);
        }

        /**
         * Blocks until the query result has been processed, then returns the
         * expected tables that were not found on the backend.
         */
        private Set<String> getNotExistTable() {
            lock.lock();
            try {
                while (!isFinished) {
                    notify.await();
                }
            } catch (InterruptedException e) {
                LOGGER.warn("getNotExistTable() is interrupted.");
                // preserve the interrupt status so callers can observe it
                Thread.currentThread().interrupt();
                return Collections.emptySet();
            } finally {
                lock.unlock();
            }
            // whatever was not seen in "show tables" is missing
            expectedTables.removeAll(ckTables);
            return expectedTables;
        }

        /**
         * Consumes the "show tables" result: records found tables, resolves
         * previously-raised AP_NODE_LACK/TABLE_LACK alerts, and raises
         * AP_NODE_LACK when the query itself fails.
         */
        protected class ClickHouseShowTablesListener implements SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> {
            private final String showTableCol;
            private final PhysicalDbInstance ds;
            private final String schema;

            ClickHouseShowTablesListener(String showTableCol, String schema, PhysicalDbInstance ds) {
                this.showTableCol = showTableCol;
                this.ds = ds;
                this.schema = schema;
            }

            @Override
            public void onResult(SQLQueryResult<List<Map<String, String>>> result) {
                String key = null;
                if (ds != null) {
                    key = "dbInstance[" + ds.getDbGroupConfig().getName() + "." + ds.getConfig().getInstanceName() + "],ap_node[" + apNode + "],schema[" + schema + "]";
                }
                if (!result.isSuccess()) {
                    //not thread safe
                    String warnMsg = "Can't show tables from apNode:" + apNode + "! Maybe the apNode is not initialized!";
                    LOGGER.warn(warnMsg);
                    if (ds != null) {
                        Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", ds.getDbGroupConfig().getName() + "-" + ds.getConfig().getInstanceName());
                        labels.put("ap_node", apNode);
                        AlertUtil.alert(AlarmCode.AP_NODE_LACK, Alert.AlertLevel.WARN, "{" + key + "} is lack", "mysql", ds.getConfig().getId(), labels);
                        ToResolveContainer.AP_NODE_LACK.add(key);
                    }
                    handleFinished();
                    return;
                }
                // query succeeded: clear a previously-raised AP_NODE_LACK alert
                if (ds != null && ToResolveContainer.AP_NODE_LACK.contains(key)) {
                    Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", ds.getDbGroupConfig().getName() + "-" + ds.getConfig().getInstanceName());
                    labels.put("ap_node", apNode);
                    AlertUtil.alertResolve(AlarmCode.AP_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels,
                            ToResolveContainer.AP_NODE_LACK, key);
                }
                List<Map<String, String>> rows = result.getResult();
                for (Map<String, String> row : rows) {
                    String table = row.get(showTableCol);
                    if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
                        // normalize to match lower-cased configuration names
                        table = table.toLowerCase();
                    }
                    String tableLackKey = AlertUtil.getTableLackKey2(apNode, table);
                    if (ToResolveContainer.TABLE_LACK.contains(tableLackKey)) {
                        AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableLackKey),
                                ToResolveContainer.TABLE_LACK, tableLackKey);
                    }
                    handleTable(table, null);
                }
                handleFinished();
            }
        }
    }
}
@@ -5,9 +5,10 @@
package com.actiontech.dble.meta.table;
import com.actiontech.dble.backend.datasource.BaseNode;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.config.ErrorInfo;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.sqlengine.MultiRowSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.SQLQueryResult;
import com.actiontech.dble.sqlengine.SQLQueryResultListener;
@@ -25,15 +26,15 @@ import java.util.concurrent.atomic.AtomicInteger;
public class DryRunGetNodeTablesHandler extends GetNodeTablesHandler {
private final AtomicInteger counter;
private final ShardingNode phyShardingNode;
private final BaseNode baseNode;
private final Map<String, Set<String>> returnMap;
private final boolean isLowerCase;
private final List<ErrorInfo> list;
public DryRunGetNodeTablesHandler(AtomicInteger counter, ShardingNode phyShardingNode, Map<String, Set<String>> returnMap, boolean isLowerCase, List<ErrorInfo> list) {
super(phyShardingNode.getName());
public DryRunGetNodeTablesHandler(AtomicInteger counter, BaseNode baseNode, Map<String, Set<String>> returnMap, boolean isLowerCase, List<ErrorInfo> list) {
super(baseNode.getName());
this.counter = counter;
this.phyShardingNode = phyShardingNode;
this.baseNode = baseNode;
this.returnMap = returnMap;
this.isLowerCase = isLowerCase;
this.list = list;
@@ -41,9 +42,17 @@ public class DryRunGetNodeTablesHandler extends GetNodeTablesHandler {
@Override
public void execute() {
String mysqlShowTableCol = "Tables_in_" + phyShardingNode.getDatabase();
String mysqlShowTableCol;
String executeSql;
if (baseNode.getDbGroup().getDbGroupConfig().instanceDatabaseType() == DataBaseType.MYSQL) {
executeSql = SQL;
mysqlShowTableCol = "Tables_in_" + baseNode.getDatabase();
} else {
executeSql = CLICKHOUSE_SQL;
mysqlShowTableCol = "name";
}
String[] mysqlShowTableCols = new String[]{mysqlShowTableCol};
PhysicalDbInstance tds = phyShardingNode.getDbGroup().getWriteDbInstance();
PhysicalDbInstance tds = baseNode.getDbGroup().getWriteDbInstance();
PhysicalDbInstance ds = null;
if (tds != null) {
if (tds.isTestConnSuccess()) {
@@ -52,17 +61,17 @@ public class DryRunGetNodeTablesHandler extends GetNodeTablesHandler {
}
if (ds != null) {
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(mysqlShowTableCols, new MySQLShowTablesListener(mysqlShowTableCol));
SpecialSqlJob sqlJob = new SpecialSqlJob(SQL, phyShardingNode.getDatabase(), resultHandler, ds, list);
SpecialSqlJob sqlJob = new SpecialSqlJob(executeSql, baseNode.getDatabase(), resultHandler, ds, list);
sqlJob.run();
} else {
list.add(new ErrorInfo("Backend", "WARNING", "shardingNode[" + phyShardingNode.getName() + "] has no available primary dbinstance,The table in this shardingNode has not checked"));
list.add(new ErrorInfo("Backend", "WARNING", baseNode.getNodeType() + "Node[" + baseNode.getName() + "] has no available primary dbinstance,The table in this " + baseNode.getNodeType() + "Node has not checked"));
handleFinished();
}
}
@Override
protected void handleTable(String table, String tableType) {
returnMap.get(phyShardingNode.getName()).add(table);
returnMap.get(baseNode.getName()).add(table);
}
@Override
@@ -81,12 +90,12 @@ public class DryRunGetNodeTablesHandler extends GetNodeTablesHandler {
@Override
public void onResult(SQLQueryResult<List<Map<String, String>>> result) {
if (!result.isSuccess()) {
String warnMsg = "Can't show tables from shardingNode:" + phyShardingNode.getName() + "! Maybe the shardingNode is not initialized!";
String warnMsg = "Can't show tables from " + baseNode.getNodeType() + "Node:" + baseNode.getName() + "! Maybe the " + baseNode.getNodeType() + "Node is not initialized!";
LOGGER.warn(warnMsg);
handleFinished();
return;
}
returnMap.put(phyShardingNode.getName(), new HashSet<>());
returnMap.put(baseNode.getName(), new HashSet<>());
List<Map<String, String>> rows = result.getResult();
for (Map<String, String> row : rows) {
String table = row.get(mysqlShowTableCol);
@@ -28,12 +28,13 @@ import java.util.concurrent.locks.ReentrantLock;
public abstract class GetNodeTablesHandler {
protected static final Logger LOGGER = LoggerFactory.getLogger(GetNodeTablesHandler.class);
protected static final String SQL = "show full tables where Table_type ='BASE TABLE' ";
protected static final String CLICKHOUSE_SQL = "show tables ";
private static final String SQL_WITH_VIEW = "show full tables ";
protected String shardingNode;
protected boolean isFinished = false;
protected Lock lock = new ReentrantLock();
protected Condition notify = lock.newCondition();
private String sql = SQL;
protected String sql = SQL;
GetNodeTablesHandler(String shardingNode, boolean skipView) {
this.shardingNode = shardingNode;
@@ -74,7 +75,7 @@ public abstract class GetNodeTablesHandler {
}
}
private class MySQLShowTablesListener implements SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> {
protected class MySQLShowTablesListener implements SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> {
private String mysqlShowTableCol;
private PhysicalDbInstance ds;
private String schema;
@@ -5,6 +5,7 @@
package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.backend.datasource.ApNode;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
@@ -23,10 +24,7 @@ import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
import com.actiontech.dble.config.model.user.ManagerUserConfig;
import com.actiontech.dble.config.model.user.ShardingUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.config.model.user.*;
import com.actiontech.dble.config.util.ConfigUtil;
import com.actiontech.dble.meta.table.DryRunGetNodeTablesHandler;
import com.actiontech.dble.net.mysql.*;
@@ -242,7 +240,6 @@ public final class DryRun {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("table-exists-check");
try {
Map<String, Set<String>> tableMap = showShardingNodeTable(serverConfig, isLowerCase, list, schemaMap);
for (SchemaConfig schema : serverConfig.getSchemas().values()) {
for (BaseTableConfig table : schema.getTables().values()) {
StringBuilder sb = new StringBuilder(100);
@@ -258,6 +255,19 @@ public final class DryRun {
}
}
}
Map<String, Set<String>> tableMap2 = showApNodeTable(serverConfig, isLowerCase, list, schemaMap);
for (SchemaConfig schema : serverConfig.getSchemas().values()) {
String apDn = schema.getDefaultApNode();
if (!StringUtil.isBlank(apDn)) {
for (BaseTableConfig table : schema.getTables().values()) {
if (tableMap2.get(apDn) != null && !tableMap2.get(apDn).contains(table.getName())) {
list.add(new ErrorInfo("Meta", "WARNING", "Table " + schema.getName() + "." + table.getName() + " doesn't exists in apNode[" + apDn + "]"));
}
}
}
}
} finally {
TraceManager.finishSpan(traceObject);
}
@@ -296,6 +306,38 @@ public final class DryRun {
return result;
}
private static Map<String, Set<String>> showApNodeTable(ServerConfig serverConfig, boolean isLowerCase, List<ErrorInfo> list, Map<String, Set<String>> schemaMap) {
Map<String, Set<String>> result = new ConcurrentHashMap<>();
AtomicInteger counter = new AtomicInteger(serverConfig.getApNodes().size());
for (ApNode apNode : serverConfig.getApNodes().values()) {
String dbGroupName = apNode.getDbGroupName();
String databaseName = apNode.getDatabase();
if (schemaMap.containsKey(dbGroupName)) {
Set<String> schemaSet = schemaMap.get(dbGroupName);
boolean exist;
if (isLowerCase) {
Optional<String> existSchema = schemaSet.stream().filter(schema -> StringUtil.equals(schema.toLowerCase(), databaseName)).findFirst();
exist = existSchema.isPresent();
} else {
exist = schemaSet.contains(databaseName);
}
if (exist) {
DryRunGetNodeTablesHandler showTablesHandler = new DryRunGetNodeTablesHandler(counter, apNode, result, isLowerCase, list);
showTablesHandler.execute();
} else {
counter.decrementAndGet();
list.add(new ErrorInfo("Meta", "WARNING", "Database " + apNode.getDatabase() + " doesn't exists in dbGroup[" + apNode.getDbGroupName() + "]"));
}
} else {
counter.decrementAndGet();
list.add(new ErrorInfo("Meta", "WARNING", "Database " + apNode.getDatabase() + " doesn't exists in dbGroup[" + apNode.getDbGroupName() + "]"));
}
}
while (counter.get() != 0) {
LockSupport.parkNanos(1000L);
}
return result;
}
private static void printResult(ManagerService service, List<ErrorInfo> list) {
ByteBuffer buffer = service.allocate();
@@ -357,7 +399,10 @@ public final class DryRun {
for (UserConfig user : userMap.values()) {
if (user instanceof ManagerUserConfig) {
hasManagerUser = true;
} else if (user instanceof ShardingUserConfig) { // contains HybridTAUserConfig
} else if (user instanceof HybridTAUserConfig) {
hasShardingUser = true;
schema.addAll(((HybridTAUserConfig) user).getSchemas());
} else if (user instanceof ShardingUserConfig) {
hasShardingUser = true;
schema.addAll(((ShardingUserConfig) user).getSchemas());
} else {
@@ -366,9 +411,9 @@ public final class DryRun {
}
if (!hasShardingUser) {
if (serverConfig.getSchemas().size() > 0) {
list.add(new ErrorInfo("Xml", "WARNING", "There is No Sharding User"));
list.add(new ErrorInfo("Xml", "WARNING", "There is No Sharding/HybridTA User"));
} else {
list.add(new ErrorInfo("Xml", "NOTICE", "There is No Sharding User"));
list.add(new ErrorInfo("Xml", "NOTICE", "There is No Sharding/HybridTA User"));
}
} else if (schema.size() <= serverConfig.getSchemas().size()) {
for (String schemaName : serverConfig.getSchemas().keySet()) {
@@ -290,6 +290,7 @@ public final class ReloadConfig {
Map<UserName, UserConfig> newUsers = newConfig.getUsers();
Map<String, SchemaConfig> newSchemas = newConfig.getSchemas();
Map<String, ShardingNode> newShardingNodes = newConfig.getShardingNodes();
Map<String, ApNode> newApNodes = newConfig.getApNodes();
Map<ERTable, Set<ERTable>> newErRelations = newConfig.getErRelations();
Map<String, Set<ERTable>> newFuncNodeERMap = newConfig.getFuncNodeERMap();
Map<String, Properties> newBlacklistConfig = newConfig.getBlacklistConfig();
@@ -302,7 +303,7 @@ public final class ReloadConfig {
ServerConfig oldConfig = DbleServer.getInstance().getConfig();
boolean result;
try {
result = oldConfig.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, oldConfig.getDbGroups(), newErRelations, newFuncNodeERMap,
result = oldConfig.reload(newUsers, newSchemas, newShardingNodes, newApNodes, newDbGroups, oldConfig.getDbGroups(), newErRelations, newFuncNodeERMap,
newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions,
loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig(), changeItemList);
CronScheduler.getInstance().init(oldConfig.getSchemas());
@@ -568,33 +569,6 @@ public final class ReloadConfig {
return changeItemList;
}
/**
 * Picks a dbInstance that has already passed its connectivity test.
 * Write instances are preferred; if no write instance tested OK, any
 * instance (read included) with a successful test is accepted.
 *
 * @param loader config initializer holding the candidate dbGroups
 * @return a connectable dbInstance, or {@code null} if none tested OK
 */
private static PhysicalDbInstance getPhysicalDbInstance(ConfigInitializer loader) {
    // First pass: prefer a write instance whose connection test succeeded.
    for (PhysicalDbGroup dbGroup : loader.getDbGroups().values()) {
        PhysicalDbInstance candidate = dbGroup.getWriteDbInstance();
        if (candidate.isTestConnSuccess()) {
            return candidate;
        }
    }
    // Second pass: fall back to any instance (read or write) that tested OK.
    for (PhysicalDbGroup dbGroup : loader.getDbGroups().values()) {
        for (PhysicalDbInstance candidate : dbGroup.getDbInstances(false)) {
            if (candidate.isTestConnSuccess()) {
                return candidate;
            }
        }
    }
    return null;
}
private static void recycleOldBackendConnections(boolean forceAllReload, boolean closeFrontCon) {
if (forceAllReload && closeFrontCon) {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("recycle-activeBackend-connections");
@@ -6,8 +6,8 @@
package com.actiontech.dble.sqlengine;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.BaseNode;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.btrace.provider.GeneralProvider;
import com.actiontech.dble.config.ErrorCode;
@@ -39,7 +39,7 @@ public class SQLJob implements ResponseHandler, Runnable, Cloneable {
public static final Logger LOGGER = LoggerFactory.getLogger(SQLJob.class);
private final String sql;
private final String shardingNode;
private final String baseNode;
private final String schema;
private BackendConnection connection;
private final SQLJobHandler jobHandler;
@@ -54,15 +54,15 @@ public class SQLJob implements ResponseHandler, Runnable, Cloneable {
this.jobHandler = jobHandler;
this.ds = ds;
this.schema = schema;
this.shardingNode = null;
this.baseNode = null;
}
/**
 * Builds a node-targeted SQL job: the query will be routed to the given
 * base node (sharding node or AP node) rather than a specific dbInstance.
 *
 * @param sql             the statement to execute (origin metadata is prepended)
 * @param baseNode        name of the target node; looked up at run time
 * @param jobHandler      callback receiving rows / completion notification
 * @param isMustWriteNode whether the job must run on the write instance
 */
public SQLJob(String sql, String baseNode, SQLJobHandler jobHandler, boolean isMustWriteNode) {
    super();
    // Tag the SQL with a timestamp/instance marker so backend logs can trace internal jobs.
    this.sql = "/*#timestamp=" + System.currentTimeMillis() + " from=" + SystemConfig.getInstance().getInstanceName() + " reason=sql job*/" + sql;
    this.jobHandler = jobHandler;
    // ds stays null: routing is resolved from baseNode when the job runs.
    this.ds = null;
    this.baseNode = baseNode;
    this.schema = null;
    this.isMustWriteNode = isMustWriteNode;
}
@@ -72,9 +72,8 @@ public class SQLJob implements ResponseHandler, Runnable, Cloneable {
TraceManager.log(ImmutableMap.of("sql", sql), traceObject);
try {
if (ds == null) {
RouteResultsetNode node = new RouteResultsetNode(shardingNode, ServerParse.SELECT, sql);
// create new connection
ShardingNode dn = DbleServer.getInstance().getConfig().getShardingNodes().get(node.getName());
RouteResultsetNode node = new RouteResultsetNode(baseNode, ServerParse.SELECT, sql);
BaseNode dn = DbleServer.getInstance().getConfig().getAllNodes().get(node.getName());
dn.getConnection(dn.getDatabase(), isMustWriteNode, true, node, this, node);
} else {
ds.getConnection(schema, this, null, isMustWriteNode);
@@ -121,7 +120,7 @@ public class SQLJob implements ResponseHandler, Runnable, Cloneable {
protected boolean doFinished(boolean failed) {
if (finished.compareAndSet(false, true)) {
GeneralProvider.sqlJobDoFinished();
jobHandler.finished(shardingNode == null ? schema : shardingNode, failed);
jobHandler.finished(baseNode == null ? schema : baseNode, failed);
return true;
}
return false;
@@ -199,10 +198,11 @@ public class SQLJob implements ResponseHandler, Runnable, Cloneable {
doFinished(true);
}
@Override
public String toString() {
return "SQLJob [shardingNode=" +
shardingNode + ",schema=" +
return "SQLJob [baseNode=" +
baseNode + ",schema=" +
schema + ",sql=" + sql + ", jobHandler=" +
jobHandler + "]";
}