diff --git a/dble_checkstyle.xml b/dble_checkstyle.xml index 4805185e4..58f3da03c 100644 --- a/dble_checkstyle.xml +++ b/dble_checkstyle.xml @@ -76,7 +76,6 @@ - diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java index 48a681ec8..ff14f4e6c 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java @@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer; import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; +import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat; import com.actiontech.dble.backend.mysql.nio.MySQLInstance; import com.actiontech.dble.cluster.JsonFactory; import com.actiontech.dble.cluster.values.DbInstanceStatus; @@ -19,12 +20,14 @@ import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables; import com.actiontech.dble.config.helper.KeyVariables; import com.actiontech.dble.config.model.db.DbGroupConfig; import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.Session; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession; import com.actiontech.dble.singleton.HaConfigManager; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.gson.Gson; import com.google.gson.JsonObject; @@ -43,20 +46,17 @@ public class PhysicalDbGroup { public static final String JSON_LIST = "dbInstance"; // rw split public static final int RW_SPLIT_OFF = 0; - public static final int RW_SPLIT_ALL_SLAVES = 1; public static final int RW_SPLIT_ALL = 2; public static final int RW_SPLIT_ALL_SLAVES_MAY_MASTER = 3; - // weight - public static final int WEIGHT = 0; - private final List writeInstanceList; + private List writeInstanceList; - private final String groupName; - private final DbGroupConfig dbGroupConfig; + private String groupName; + private DbGroupConfig dbGroupConfig; private volatile PhysicalDbInstance writeDbInstance; private Map allSourceMap = new HashMap<>(); - private final int rwSplitMode; - protected String[] schemas; + private int rwSplitMode; + protected List schemas = Lists.newArrayList(); private final LoadBalancer loadBalancer = new RandomLoadBalancer(); private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock(); @@ -106,14 +106,22 @@ public class PhysicalDbGroup { return groupName; } - public String[] getSchemas() { + public List getSchemas() { return schemas; } - public void setSchemas(String[] mySchemas) { + public void setSchemas(List mySchemas) { this.schemas = mySchemas; } + public void addSchema(String schema) { + this.schemas.add(schema); + } + + public void removeSchema(String schema) { + this.schemas.remove(schema); + } + public DbGroupConfig getDbGroupConfig() { return dbGroupConfig; } @@ -153,10 +161,6 @@ public class PhysicalDbGroup { return shardingUseless; } - public boolean isRwSplitUseless() { - return rwSplitUseless; - } - public void setShardingUseless(boolean shardingUseless) { this.shardingUseless = shardingUseless; } @@ -183,8 +187,11 @@ public class PhysicalDbGroup { } public void init(String reason) { + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("init new group :{},reason:{}", LOGGER, this.toString(), reason); + } for (Map.Entry entry : allSourceMap.entrySet()) { - entry.getValue().init(reason); + entry.getValue().init(reason, true); } } @@ -219,19 +226,23 @@ public class PhysicalDbGroup { } public void stop(String reason, boolean closeFront) { + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("recycle old group :{},reason:{},is close front:{}", LOGGER, this.toString(), reason, closeFront); + } boolean flag = checkState(); if (!flag) { return; } for (PhysicalDbInstance dbInstance : allSourceMap.values()) { - dbInstance.stop(reason, closeFront); + dbInstance.stopDirectly(reason, closeFront); } } public void stopOfFresh(List sourceNames, String reason, boolean closeFront) { for (String sourceName : sourceNames) { if (allSourceMap.containsKey(sourceName)) { - allSourceMap.get(sourceName).stop(reason, closeFront, false); + PhysicalDbInstance dbInstance = allSourceMap.get(sourceName); + dbInstance.stop(reason, closeFront, false, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || writeDbInstance == dbInstance); } } @@ -254,7 +265,7 @@ public class PhysicalDbGroup { public boolean stopOfBackground(String reason) { if (state.intValue() == STATE_DELETING && getBindingCount() == 0) { for (PhysicalDbInstance dbInstance : allSourceMap.values()) { - dbInstance.stop(reason, false); + dbInstance.stopDirectly(reason, false); } return true; } @@ -266,6 +277,40 @@ public class PhysicalDbGroup { return state.intValue() != INITIAL; } + public void stopPool(String reason, boolean closeFront, boolean closeWrite) { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + if (!closeWrite && writeDbInstance == dbInstance) { + continue; + } + dbInstance.stopPool(reason, closeFront); + } + } + + public void startPool(String reason, boolean startWrite) { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + if (!startWrite && writeDbInstance == dbInstance) { + continue; + } + dbInstance.startPool(reason); + } + } + + public void stopHeartbeat(String reason) { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + dbInstance.stopHeartbeat(reason); + } + } + + public void startHeartbeat() { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + if (dbInstance.heartbeat.isStop()) { + dbInstance.heartbeat = new MySQLHeartbeat(dbInstance); + } + dbInstance.startHeartbeat(); + } + } + + public Collection getDbInstances(boolean isAll) { if (!isAll && rwSplitMode == RW_SPLIT_OFF) { return writeInstanceList; @@ -594,6 +639,14 @@ public class PhysicalDbGroup { } } + public void setState(Integer state) { + this.state = state; + } + + public Integer getState() { + return state; + } + public int getBindingCount() { return rwSplitSessionSet.size(); } @@ -606,15 +659,58 @@ public class PhysicalDbGroup { return this.rwSplitSessionSet.remove(session); } + public void setDbInstance(PhysicalDbInstance dbInstance) { + dbInstance.setDbGroup(this); + if (dbInstance.getConfig().isPrimary()) { + this.writeDbInstance = dbInstance; + this.writeInstanceList = Collections.singletonList(dbInstance); + } + this.allSourceMap.put(dbInstance.getName(), dbInstance); + } + public boolean equalsBaseInfo(PhysicalDbGroup pool) { - return pool.getDbGroupConfig().getName().equals(this.dbGroupConfig.getName()) && - pool.getDbGroupConfig().getHeartbeatSQL().equals(this.dbGroupConfig.getHeartbeatSQL()) && + return pool.dbGroupConfig.equalsBaseInfo(this.dbGroupConfig) && + pool.rwSplitMode == this.rwSplitMode && + pool.getGroupName().equals(this.groupName) && + pool.isUseless() == this.isUseless(); + } + + public boolean equalsForConnectionPool(PhysicalDbGroup pool) { + boolean rwSplitModeFlag1 = pool.getDbGroupConfig().getRwSplitMode() != 0 && this.dbGroupConfig.getRwSplitMode() != 0; + boolean rwSplitModeFlag2 = pool.getDbGroupConfig().getRwSplitMode() == 0 && this.dbGroupConfig.getRwSplitMode() == 0; + return (rwSplitModeFlag1 || rwSplitModeFlag2) && pool.isUseless() == this.isUseless(); + } + + public boolean equalsForHeartbeat(PhysicalDbGroup pool) { + return pool.getDbGroupConfig().getHeartbeatSQL().equals(this.dbGroupConfig.getHeartbeatSQL()) && pool.getDbGroupConfig().getHeartbeatTimeout() == this.dbGroupConfig.getHeartbeatTimeout() && - pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount() && - pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() && - pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() && - pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() && - pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless() && - pool.isAnalysisUseless() == this.isAnalysisUseless(); + pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount(); + } + + + public void copyBaseInfo(PhysicalDbGroup physicalDbGroup) { + this.dbGroupConfig = physicalDbGroup.dbGroupConfig; + this.groupName = physicalDbGroup.groupName; + this.rwSplitMode = physicalDbGroup.rwSplitMode; + this.schemas = physicalDbGroup.schemas; + this.rwSplitUseless = physicalDbGroup.rwSplitUseless; + this.shardingUseless = physicalDbGroup.shardingUseless; + for (PhysicalDbInstance dbInstance : this.allSourceMap.values()) { + dbInstance.setDbGroupConfig(physicalDbGroup.dbGroupConfig); + } + } + + @Override + public String toString() { + return "PhysicalDbGroup{" + + "groupName='" + groupName + '\'' + + ", dbGroupConfig=" + dbGroupConfig + + ", writeDbInstance=" + writeDbInstance + + ", allSourceMap=" + allSourceMap + + ", rwSplitMode=" + rwSplitMode + + ", schemas=" + schemas + + ", shardingUseless=" + shardingUseless + + ", rwSplitUseless=" + rwSplitUseless + + '}'; } } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java index 6746c454f..da8367696 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java @@ -13,6 +13,8 @@ import com.actiontech.dble.backend.pool.ConnectionPool; import com.actiontech.dble.backend.pool.ReadTimeStatusInstance; import com.actiontech.dble.config.model.db.DbGroupConfig; import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.meta.ReloadLogHelper; +import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.net.factory.MySQLConnectionFactory; @@ -24,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; @@ -35,10 +39,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalDbInstance.class); private final String name; - private final DbInstanceConfig config; + private DbInstanceConfig config; private volatile boolean readInstance; - private final DbGroupConfig dbGroupConfig; + private DbGroupConfig dbGroupConfig; private PhysicalDbGroup dbGroup; private final AtomicBoolean disabled; private String dsVersion; @@ -75,10 +79,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { this.disabled = new AtomicBoolean(org.disabled.get()); } - public void init(String reason) { - init(reason, true); - } - public void init(String reason, boolean isInitHeartbeat) { if (disabled.get() || fakeNode) { LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason); @@ -89,10 +89,17 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { LOGGER.info("init dbInstance[{}] because {}, but it has been initialized, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason); return; } + //minCon/maxCon/numOfShardingNodes + checkPoolSize(); + LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name); + start(reason, isInitHeartbeat); + } + + protected void checkPoolSize() { int size = config.getMinCon(); - String[] physicalSchemas = dbGroup.getSchemas(); - int initSize = physicalSchemas.length; + List physicalSchemas = dbGroup.getSchemas(); + int initSize = physicalSchemas.size(); if (size < initSize) { LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " + "minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize); @@ -105,9 +112,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize); config.setMaxCon(initSize); } - - LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name); - start(reason, isInitHeartbeat); } public void createConnectionSkipPool(String schema, ResponseHandler handler) { @@ -284,6 +288,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { return dbGroupConfig; } + public void setDbGroupConfig(DbGroupConfig dbGroupConfig) { + this.dbGroupConfig = dbGroupConfig; + } + public boolean isFakeNode() { return fakeNode; } @@ -368,7 +376,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { return isSync && isNotDelay; } - private void startHeartbeat() { + public void startHeartbeat() { + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("start heartbeat :{}", LOGGER, this.toString()); + } if (this.isDisabled() || this.isFakeNode()) { LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name); return; @@ -378,29 +389,94 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { } void start(String reason, boolean isStartHeartbeat) { - if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) { - this.connectionPool.startEvictor(this.dbGroup.getGroupName() + "." + name, reason); - } + startPool(reason); if (isStartHeartbeat) { startHeartbeat(); } } - public void stop(String reason, boolean closeFront) { - stop(reason, closeFront, true); + public void startPool(String reason) { + if (disabled.get() || fakeNode) { + LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason); + return; + } + if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) { + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("start connection pool :{},reason:{}", LOGGER, this.toString(), reason); + } + this.connectionPool.startEvictor(this.dbGroup.getGroupName() + "." + name, reason); + } } - public void stop(String reason, boolean closeFront, boolean isStopHeartbeat) { - if (isStopHeartbeat) { - heartbeat.stop(reason); + private boolean checkState() { + if (dbGroup.getBindingCount() != 0) { + dbGroup.setState(PhysicalDbGroup.STATE_DELETING); + IOProcessor.BACKENDS_OLD_INSTANCE.add(this); + return false; } - if (dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) { - LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason); - connectionPool.stop(reason, closeFront); + if (dbGroup.isStop()) { + return false; + } + if (dbGroup.getBindingCount() != 0) { + dbGroup.setState(PhysicalDbGroup.STATE_DELETING); + IOProcessor.BACKENDS_OLD_INSTANCE.add(this); + return false; + } + return true; + } + + public boolean stopOfBackground(String reason) { + if (dbGroup.getState() == PhysicalDbGroup.STATE_DELETING && dbGroup.getBindingCount() == 0) { + stopDirectly(reason, false); + return true; + } + return false; + } + + public void stopDirectly(String reason, boolean closeFront) { + stop(reason, closeFront, true, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this); + } + + public void stop(String reason, boolean closeFront) { + boolean flag = checkState(); + if (!flag) { + return; + } + stopDirectly(reason, closeFront); + } + + protected void stop(String reason, boolean closeFront, boolean isStopHeartbeat, boolean isStopPool) { + if (isStopHeartbeat) { + stopHeartbeat(reason); + } + if (isStopPool) { + stopPool(reason, closeFront); } isInitial.set(false); } + public void stopHeartbeat(String reason) { + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("stop heartbeat :{},reason:{}", LOGGER, this.toString(), reason); + } + heartbeat.stop(reason); + } + + public void stopPool(String reason, boolean closeFront) { + LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason); + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("stop connection pool :{},reason:{},is close front:{}", LOGGER, this.toString(), reason, closeFront); + } + connectionPool.stop(reason, closeFront); + } + + public void updatePoolCapacity() { + //minCon/maxCon/numOfShardingNodes + checkPoolSize(); + connectionPool.evictImmediately(); + connectionPool.fillPool(); + } + public void closeAllConnection(String reason) { this.needSkipEvit = true; this.connectionPool.forceCloseAllConnection(reason); @@ -460,27 +536,53 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { } @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof PhysicalDbInstance)) { - return false; - } + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + PhysicalDbInstance that = (PhysicalDbInstance) o; - PhysicalDbInstance dbInstance = (PhysicalDbInstance) other; - DbInstanceConfig otherConfig = dbInstance.getConfig(); - DbInstanceConfig thisConfig = this.getConfig(); - return otherConfig.getUser().equals(thisConfig.getUser()) && otherConfig.getUrl().equals(thisConfig.getUrl()) && - otherConfig.getMaxCon() == thisConfig.getMaxCon() && otherConfig.getMinCon() == thisConfig.getMinCon() && - otherConfig.getPassword().equals(thisConfig.getPassword()) && otherConfig.getInstanceName().equals(thisConfig.getInstanceName()) && - dbInstance.isDisabled() == this.isDisabled() && otherConfig.getReadWeight() == thisConfig.getReadWeight() && - otherConfig.getPoolConfig().equals(thisConfig.getPoolConfig()) && otherConfig.isUsingDecrypt() == thisConfig.isUsingDecrypt(); + return readInstance == that.readInstance && + Objects.equals(name, that.name) && + Objects.equals(config, that.config) && + dbGroupConfig.equalsBaseInfo(that.dbGroupConfig) && + dbGroup.equalsBaseInfo(that.dbGroup) && + Objects.equals(disabled.get(), that.disabled.get()); } @Override public int hashCode() { - return super.hashCode(); + return Objects.hash(name, config, readInstance, dbGroupConfig, dbGroup, disabled, heartbeat); + } + + public boolean equalsForConnectionPool(PhysicalDbInstance dbInstance) { + return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) && + this.config.getPort() == dbInstance.getConfig().getPort() && + this.config.getUser().equals(dbInstance.getConfig().getUser()) && + this.config.getPassword().equals(dbInstance.getConfig().getPassword()) && + this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() && + this.config.getPoolConfig().getTimeBetweenEvictionRunsMillis() == dbInstance.getConfig().getPoolConfig().getTimeBetweenEvictionRunsMillis(); + } + + public boolean equalsForPoolCapacity(PhysicalDbInstance dbInstance) { + return this.config.getMinCon() == dbInstance.getConfig().getMinCon() && + this.config.getMaxCon() == dbInstance.getConfig().getMaxCon(); + } + + public boolean equalsForHeartbeat(PhysicalDbInstance dbInstance) { + return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) && + this.config.getPort() == dbInstance.getConfig().getPort() && + this.config.getUser().equals(dbInstance.getConfig().getUser()) && + this.config.getPassword().equals(dbInstance.getConfig().getPassword()) && + this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() && + this.config.getPoolConfig().getHeartbeatPeriodMillis() == dbInstance.getConfig().getPoolConfig().getHeartbeatPeriodMillis(); + } + + public boolean equalsForTestConn(PhysicalDbInstance dbInstance) { + return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) && + this.config.getPort() == dbInstance.getConfig().getPort() && + this.config.getUser().equals(dbInstance.getConfig().getUser()) && + this.config.getPassword().equals(dbInstance.getConfig().getPassword()) && + this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt(); } @Override @@ -491,4 +593,9 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { ",minCon=" + config.getMinCon() + "]"; } + public void copyBaseInfo(PhysicalDbInstance physicalDbInstance) { + this.config = physicalDbInstance.getConfig(); + this.connectionPool.copyBaseInfo(physicalDbInstance.connectionPool); + } + } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java b/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java index 8a79e7558..57eee649a 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/ShardingNode.java @@ -14,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Objects; public class ShardingNode { protected static final Logger LOGGER = LoggerFactory.getLogger(ShardingNode.class); @@ -144,4 +145,19 @@ public class ShardingNode { StringUtil.equalsWithEmpty(this.database, shardingNode.getDatabase()) && this.dbGroup.equalsBaseInfo(shardingNode.getDbGroup()); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ShardingNode that = (ShardingNode) o; + return Objects.equals(name, that.name) && + Objects.equals(dbGroupName, that.dbGroupName) && + Objects.equals(database, that.database); + } + + @Override + public int hashCode() { + return Objects.hash(name, dbGroupName, database); + } } diff --git a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java index 254f76c8b..8bcca7e71 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java +++ b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java @@ -179,7 +179,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } } - private void fillPool() { + public void fillPool() { final int idleCount = getCount(STATE_NOT_IN_USE, STATE_HEARTBEAT); int tmpTotalConnections = totalConnections.get(); if (tmpTotalConnections < 0) { @@ -385,6 +385,33 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } + public void evictImmediately() { + + final ArrayList idleList = new ArrayList<>(allConnections.size()); + for (final PooledConnection entry : allConnections) { + if (entry.getState() == STATE_NOT_IN_USE) { + idleList.add(entry); + } + } + + int removable = idleList.size() - config.getMinCon(); + if (removable <= 0) { + return; + } + + // Sort pool entries on lastAccessed + idleList.sort(LAST_ACCESS_COMPARABLE); + + logPoolState("before cleanup "); + for (PooledConnection conn : idleList) { + if (removable > 0 && conn.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED)) { + conn.close("connection has passed idleTimeout"); + removable--; + } + } + + } + public ReadTimeStatusInstance getInstance() { return instance; } @@ -435,6 +462,10 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } } + public void copyBaseInfo(ConnectionPool connectionPool) { + this.config = connectionPool.config; + } + /** * The idle object evictor. */ diff --git a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java index aacd9b834..724ed4b15 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java +++ b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java @@ -13,7 +13,7 @@ import java.io.IOException; public class PoolBase { - protected final DbInstanceConfig config; + protected DbInstanceConfig config; protected final ReadTimeStatusInstance instance; protected final PooledConnectionFactory factory; diff --git a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java index 4edb27488..e1298a63d 100644 --- a/src/main/java/com/actiontech/dble/config/ConfigInitializer.java +++ b/src/main/java/com/actiontech/dble/config/ConfigInitializer.java @@ -20,11 +20,16 @@ import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.config.model.db.type.DataBaseType; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.table.ERTable; -import com.actiontech.dble.config.model.user.*; +import com.actiontech.dble.config.model.user.AnalysisUserConfig; +import com.actiontech.dble.config.model.user.RwSplitUserConfig; +import com.actiontech.dble.config.model.user.UserConfig; +import com.actiontech.dble.config.model.user.UserName; import com.actiontech.dble.config.util.ConfigException; +import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.plan.common.ptr.BoolPtr; import com.actiontech.dble.route.function.AbstractPartitionAlgorithm; import com.actiontech.dble.route.parser.util.Pair; +import com.actiontech.dble.services.manager.response.ReloadConfig; import com.actiontech.dble.singleton.TraceManager; import com.google.common.collect.Maps; import org.slf4j.Logger; @@ -122,7 +127,7 @@ public class ConfigInitializer implements ProblemReporter { this.dbGroups = dbConverter.getDbGroupMap(); this.dbConfig = dbJson; } else { - this.dbGroups = new HashMap<>(); + this.dbGroups = Maps.newLinkedHashMap(); this.dbConfig = null; } @@ -183,7 +188,7 @@ public class ConfigInitializer implements ProblemReporter { } private void checkWriteDbInstance() { - if (this.dbGroups.isEmpty()) { + if (this.dbGroups == null || this.dbGroups.isEmpty()) { return; } //Mark all dbInstance whether they are fake or not @@ -229,6 +234,116 @@ public class ConfigInitializer implements ProblemReporter { } } + public void testConnection(List changeItemList) { + TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection"); + try { + Map>> hostSchemaMap = genDbInstanceSchemaMap(); + Set errDbInstanceNames = new HashSet<>(); + boolean isAllDbInstanceConnected = true; + // check whether dbInstance is connected + String dbGroupName; + PhysicalDbGroup dbGroup; + + for (ReloadConfig.ChangeItem changeItem : changeItemList) { + int type = changeItem.getType(); + Object item = changeItem.getItem(); + switch (type) { + case 1: + if (item instanceof PhysicalDbGroup) { + //test dbGroup + dbGroup = (PhysicalDbGroup) item; + dbGroupName = dbGroup.getGroupName(); + + // sharding group + List> schemaList = checkDbGroupMaxConn(hostSchemaMap, dbGroup); + + for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) { + //test dbInstance + boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList); + if (!testResult) { + isAllDbInstanceConnected = false; + errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]"); + } + } + } else if (item instanceof PhysicalDbInstance) { + PhysicalDbInstance ds = (PhysicalDbInstance) item; + dbGroupName = ds.getDbGroupConfig().getName(); + // sharding group + List> schemaList = checkDbInstanceMaxConn(hostSchemaMap, ds); + //test dbInstance + boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList); + if (!testResult) { + isAllDbInstanceConnected = false; + errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]"); + } + } else if (item instanceof ShardingNode) { + ShardingNode shardingNode = (ShardingNode) item; + dbGroup = shardingNode.getDbGroup(); + dbGroupName = dbGroup.getGroupName(); + // sharding group + List> schemaList = checkDbGroupMaxConn(hostSchemaMap, dbGroup); + + for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) { + //test dbInstance + boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList); + if (!testResult) { + isAllDbInstanceConnected = false; + errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]"); + } + } + } + break; + case 2: + if (item instanceof PhysicalDbInstance && changeItem.isAffectTestConn()) { + PhysicalDbInstance ds = (PhysicalDbInstance) item; + dbGroupName = ds.getDbGroupConfig().getName(); + // sharding group + List> schemaList = checkDbInstanceMaxConn(hostSchemaMap, ds); + //test dbInstance + //test dbInstance + boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList); + if (!testResult) { + isAllDbInstanceConnected = false; + errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]"); + } + } else if (item instanceof ShardingNode) { + ShardingNode shardingNode = (ShardingNode) item; + dbGroup = shardingNode.getDbGroup(); + dbGroupName = dbGroup.getGroupName(); + // sharding group + List> schemaList = checkDbGroupMaxConn(hostSchemaMap, dbGroup); + + for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) { + //test dbInstance + boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList); + if (!testResult) { + isAllDbInstanceConnected = false; + errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]"); + } + } + } + break; + default: + break; + } + } + + if (!isAllDbInstanceConnected) { + StringBuilder sb = new StringBuilder("SelfCheck### there are some dbInstance connection failed, pls check these dbInstance:"); + for (String key : errDbInstanceNames) { + sb.append("{"); + sb.append(key); + sb.append("},"); + } + throw new ConfigException(sb.toString()); + } + + } finally { + TraceManager.finishSpan(traceObject); + } + } + + public void testConnection() { TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection"); try { @@ -283,17 +398,55 @@ public class ConfigInitializer implements ProblemReporter { } } + private List> checkDbInstanceMaxConn(Map>> hostSchemaMap, PhysicalDbInstance ds) { + List> schemaList = null; + if (hostSchemaMap.containsKey(ds.getDbGroupConfig().getName())) { + schemaList = hostSchemaMap.get(ds.getDbGroupConfig().getName()); + checkMaxCon(ds, schemaList.size()); + } + return schemaList; + } + + private List> checkDbGroupMaxConn(Map>> hostSchemaMap, PhysicalDbGroup dbGroup) { + List> schemaList = null; + if (hostSchemaMap.containsKey(dbGroup.getGroupName())) { + schemaList = hostSchemaMap.get(dbGroup.getGroupName()); + checkMaxCon(dbGroup, schemaList.size()); + } + return schemaList; + } + + private boolean checkAndTestDbInstance(PhysicalDbInstance ds, String dbGroupName, List> schemaList) { + if (ds.getConfig().isDisabled()) { + errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is disabled")); + LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it"); + ds.setTestConnSuccess(false); + return true; + } else if (ds.isFakeNode()) { + errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is fake Node")); + LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it"); + ds.setTestConnSuccess(false); + return true; + } + return testDbInstance(dbGroupName, ds, schemaList); + } + + private void checkMaxCon(PhysicalDbGroup pool, int schemasCount) { for (PhysicalDbInstance dbInstance : pool.getDbInstances(true)) { - if (dbInstance.getConfig().getMaxCon() < Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())) { - errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + pool.getGroupName() + "." + dbInstance.getConfig().getInstanceName() + "] maxCon too little,would be change to " + - Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon()))); - } + checkMaxCon(dbInstance, schemasCount); + } + } - if (Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon()) != dbInstance.getConfig().getMinCon()) { - errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + pool.getGroupName() + "] minCon too little, Dble would init dbGroup" + - " with " + (schemasCount + 1) + " connections")); - } + private void checkMaxCon(PhysicalDbInstance dbInstance, int schemasCount) { + if (dbInstance.getConfig().getMaxCon() < Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())) { + errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + dbInstance.getDbGroupConfig().getName() + "." + dbInstance.getConfig().getInstanceName() + "] maxCon too little,would be change to " + + Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon()))); + } + + if (Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon()) != dbInstance.getConfig().getMinCon()) { + errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + dbInstance.getDbGroupConfig().getName() + "] minCon too little, Dble would init dbGroup" + + " with " + (schemasCount + 1) + " connections")); } } @@ -322,6 +475,8 @@ public class ConfigInitializer implements ProblemReporter { errorInfos.add(new ErrorInfo("Backend", "WARNING", "Can't connect to [" + dbInstanceKey + "]")); LOGGER.warn("SelfCheck### can't connect to [" + dbInstanceKey + "]"); isConnectivity = false; + } finally { + ReloadLogHelper.debug("test connection dbInstance:{},is connect:{},schemaList:{}", LOGGER, ds, isConnectivity, schemaList); } return isConnectivity; } diff --git a/src/main/java/com/actiontech/dble/config/DbleTempConfig.java b/src/main/java/com/actiontech/dble/config/DbleTempConfig.java index d3aa255b8..2df1b84b4 100644 --- a/src/main/java/com/actiontech/dble/config/DbleTempConfig.java +++ b/src/main/java/com/actiontech/dble/config/DbleTempConfig.java @@ -17,6 +17,7 @@ public final class DbleTempConfig { private RawJson shardingConfig; private RawJson userConfig; private RawJson sequenceConfig; + private boolean lowerCase; public RawJson getDbConfig() { return dbConfig; @@ -50,6 +51,14 @@ public final class DbleTempConfig { this.sequenceConfig = sequenceConfig; } + public boolean isLowerCase() { + return lowerCase; + } + + public void setLowerCase(boolean lowerCase) { + this.lowerCase = lowerCase; + } + public static DbleTempConfig getInstance() { return INSTANCE; } diff --git a/src/main/java/com/actiontech/dble/config/ServerConfig.java b/src/main/java/com/actiontech/dble/config/ServerConfig.java index 973f61a48..a948f4da2 100644 --- a/src/main/java/com/actiontech/dble/config/ServerConfig.java +++ b/src/main/java/com/actiontech/dble/config/ServerConfig.java @@ -11,6 +11,7 @@ import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbGroupDiff; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.cluster.JsonFactory; import com.actiontech.dble.cluster.logic.ClusterLogic; @@ -34,12 +35,14 @@ import com.actiontech.dble.route.function.AbstractPartitionAlgorithm; import com.actiontech.dble.route.parser.ManagerParseConfig; import com.actiontech.dble.route.parser.util.Pair; import com.actiontech.dble.server.variables.SystemVariables; +import com.actiontech.dble.services.manager.response.ReloadConfig; import com.actiontech.dble.singleton.CacheService; import com.actiontech.dble.singleton.HaConfigManager; import com.actiontech.dble.singleton.ProxyMeta; import com.actiontech.dble.singleton.SequenceManager; import com.actiontech.dble.util.StringUtil; import com.actiontech.dble.util.TimeUtil; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +76,7 @@ public class ServerConfig { private RawJson shardingConfig; private RawJson userConfig; private RawJson sequenceConfig; + private boolean lowerCase; public ServerConfig() { //read sharding.xml,db.xml and user.xml @@ -220,15 +224,15 @@ public class ServerConfig { public boolean reload(Map newUsers, Map newSchemas, Map newShardingNodes, Map newDbGroups, - Map recycleDbGroups, + Map oldDbGroups, Map> newErRelations, Map> newFuncNodeERMap, SystemVariables newSystemVariables, boolean isFullyConfigured, final int loadAllMode, Map newBlacklistConfig, Map newFunctions, - RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig) throws SQLNonTransientException { - boolean result = apply(newUsers, newSchemas, newShardingNodes, newDbGroups, recycleDbGroups, newErRelations, newFuncNodeERMap, + RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig, List changeItemList) throws SQLNonTransientException { + boolean result = apply(newUsers, newSchemas, newShardingNodes, newDbGroups, oldDbGroups, newErRelations, newFuncNodeERMap, newSystemVariables, isFullyConfigured, loadAllMode, newBlacklistConfig, newFunctions, userJsonConfig, - sequenceJsonConfig, shardingJsonConfig, dbJsonConfig); + sequenceJsonConfig, shardingJsonConfig, dbJsonConfig, changeItemList); this.reloadTime = TimeUtil.currentTimeMillis(); return result; } @@ -342,12 +346,12 @@ public class ServerConfig { Map newSchemas, Map newShardingNodes, Map newDbGroups, - Map recycleDbGroups, + Map oldDbGroups, Map> newErRelations, Map> newFuncNodeERMap, SystemVariables newSystemVariables, boolean isFullyConfigured, final int loadAllMode, Map newBlacklistConfig, Map newFunctions, - RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig) throws SQLNonTransientException { + RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig, List changeItemList) throws SQLNonTransientException { List> delTables = new ArrayList<>(); List> reloadTables = new ArrayList<>(); List delSchema = new ArrayList<>(); @@ -369,23 +373,29 @@ public class ServerConfig { } catch (Exception e) { throw new SQLNonTransientException("HaConfigManager init failed", "HY000", ErrorCode.ER_YES); } - // old dbGroup - // 1 stop heartbeat - // 2 backup - //-------------------------------------------- - if (recycleDbGroups != null) { - String recycleGroupName; - PhysicalDbGroup recycleGroup; - for (Map.Entry entry : recycleDbGroups.entrySet()) { - recycleGroupName = entry.getKey(); - recycleGroup = entry.getValue(); - // avoid recycleGroup == newGroup, can't stop recycleGroup - if (newDbGroups.get(recycleGroupName) != recycleGroup) { - ReloadLogHelper.info("reload config, recycle old group. old active backend conn will be close", LOGGER); - recycleGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0)); + + + ReloadLogHelper.info("reload config: init new dbGroup start", LOGGER); + if ((loadAllMode & ManagerParseConfig.OPTR_MODE) != 0) { + //all dbGroup reload & recycle + initDbGroupByMap(oldDbGroups, newDbGroups, newShardingNodes, isFullyConfigured, loadAllMode); + } else { + //replace dbGroup reference + for (Map.Entry shardingNodeEntry : newShardingNodes.entrySet()) { + ShardingNode shardingNode = shardingNodeEntry.getValue(); + PhysicalDbGroup oldDbGroup = oldDbGroups.get(shardingNode.getDbGroupName()); + if (null == oldDbGroup) { + oldDbGroup = newDbGroups.get(shardingNode.getDbGroupName()); } + shardingNode.setDbGroup(oldDbGroup); } + //only change dbGroup reload & recycle + initDbGroupByMap(changeItemList, oldDbGroups, newShardingNodes, isFullyConfigured, loadAllMode); + newDbGroups = oldDbGroups; } + ReloadLogHelper.info("reload config: init new dbGroup end", LOGGER); + + this.shardingNodes = newShardingNodes; this.dbGroups = newDbGroups; this.fullyConfigured = isFullyConfigured; @@ -401,6 +411,7 @@ public class ServerConfig { this.dbConfig = dbJsonConfig; this.shardingConfig = shardingJsonConfig; this.sequenceConfig = sequenceJsonConfig; + this.lowerCase = DbleTempConfig.getInstance().isLowerCase(); CacheService.getInstance().clearCache(); this.changing = false; if (isFullyConfigured) { @@ -413,6 +424,194 @@ public class ServerConfig { return true; } + private static void initDbGroupByMap(Map oldDbGroups, Map newDbGroups, + Map newShardingNodes, boolean fullyConfigured, int loadAllMode) { + if (oldDbGroups != null) { + //Only -r uses this method to recycle the connection pool + String recycleGroupName; + PhysicalDbGroup recycleGroup; + for (Map.Entry entry : oldDbGroups.entrySet()) { + recycleGroupName = entry.getKey(); + recycleGroup = entry.getValue(); + // avoid recycleGroup == newGroup, can't stop recycleGroup + if (newDbGroups.get(recycleGroupName) != recycleGroup) { + ReloadLogHelper.info("reload config, recycle old group. old active backend conn will be close", LOGGER); + recycleGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0)); + } + } + } + for (PhysicalDbGroup dbGroup : newDbGroups.values()) { + String hostName = dbGroup.getGroupName(); + // set schemas + ArrayList dnSchemas = new ArrayList<>(30); + for (ShardingNode dn : newShardingNodes.values()) { + if (dn.getDbGroup().getGroupName().equals(hostName)) { + dn.setDbGroup(dbGroup); + dnSchemas.add(dn.getDatabase()); + } + } + dbGroup.setSchemas(dnSchemas); + if (fullyConfigured) { + dbGroup.init("reload config"); + } else { + LOGGER.info("dbGroup[" + hostName + "] is not fullyConfigured, so doing nothing"); + } + } + } + + + private void initDbGroupByMap(List changeItemList, Map oldDbGroupMap, Map newShardingNodes, + boolean isFullyConfigured, int loadAllMode) { + List updateDbGroupList = Lists.newArrayList(); + for (ReloadConfig.ChangeItem changeItem : changeItemList) { + Object item = changeItem.getItem(); + switch (changeItem.getType()) { + case 1: + addItem(item, oldDbGroupMap, newShardingNodes, isFullyConfigured); + break; + case 2: + updateItem(item, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupList, loadAllMode); + break; + case 3: + deleteItem(item, oldDbGroupMap, loadAllMode); + break; + default: + break; + } + } + for (PhysicalDbGroup physicalDbGroup : updateDbGroupList) { + physicalDbGroup.startHeartbeat(); + } + } + + private void deleteItem(Object item, Map oldDbGroupMap, int loadAllMode) { + if (item instanceof PhysicalDbGroup) { + //delete dbGroup + PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item; + PhysicalDbGroup oldDbGroup = oldDbGroupMap.remove(physicalDbGroup.getGroupName()); + oldDbGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0)); + oldDbGroup = null; + } else if (item instanceof PhysicalDbInstance) { + PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item; + //delete slave instance + PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName()); + PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().remove(physicalDbInstance.getName()); + oldDbInstance.stop("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0)); + oldDbInstance = null; + } else if (item instanceof ShardingNode) { + ShardingNode shardingNode = (ShardingNode) item; + if (shardingNode.getDbGroup() != null) { + shardingNode.getDbGroup().removeSchema(shardingNode.getDatabase()); + } + } + } + + private void updateItem(Object item, Map oldDbGroupMap, Map newShardingNodes, ReloadConfig.ChangeItem changeItem, + List updateDbGroupList, int loadAllMode) { + if (item instanceof PhysicalDbGroup) { + //change dbGroup + PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item; + PhysicalDbGroup oldDbGroup = oldDbGroupMap.get(physicalDbGroup.getGroupName()); + if (changeItem.isAffectHeartbeat()) { + oldDbGroup.stopHeartbeat("reload config, stop group heartbeat"); + oldDbGroup.copyBaseInfo(physicalDbGroup); + //create a new heartbeat in the follow-up + updateDbGroupList.add(oldDbGroup); + } else { + oldDbGroup.copyBaseInfo(physicalDbGroup); + } + reloadSchema(oldDbGroup, newShardingNodes); + if (changeItem.isAffectConnectionPool()) { + if (physicalDbGroup.getRwSplitMode() == 0) { + oldDbGroup.stopPool("reload config, recycle read instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), false); + } else { + oldDbGroup.startPool("reload config, init read instance", false); + } + if (physicalDbGroup.isUseless()) { + oldDbGroup.stopPool("reload config, recycle all instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), true); + } else { + oldDbGroup.startPool("reload config, init all instance", true); + } + } + oldDbGroupMap.put(physicalDbGroup.getGroupName(), oldDbGroup); + } else if (item instanceof PhysicalDbInstance) { + if (changeItem.isAffectHeartbeat() || changeItem.isAffectConnectionPool()) { + PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item; + PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName()); + PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().get(physicalDbInstance.getName()); + oldDbInstance.stop("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0)); + oldDbInstance = null; + physicalDbInstance.init("reload config", true); + physicalDbGroup.setDbInstance(physicalDbInstance); + } else { + PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item; + PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName()); + PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().get(physicalDbInstance.getName()); + //copy base config + oldDbInstance.copyBaseInfo(physicalDbInstance); + if (changeItem.isAffectPoolCapacity()) { + oldDbInstance.updatePoolCapacity(); + } + } + } else if (item instanceof ShardingNode) { + ShardingNode shardingNode = (ShardingNode) item; + ShardingNode oldShardingNode = this.shardingNodes.get(shardingNode.getName()); + if (oldShardingNode != null && oldShardingNode.getDbGroup() != null) { + oldShardingNode.getDbGroup().removeSchema(oldShardingNode.getDatabase()); + } + if (shardingNode.getDbGroup() != null) { + shardingNode.getDbGroup().addSchema(shardingNode.getDatabase()); + } + } + } + + private void addItem(Object item, Map oldDbGroupMap, Map newShardingNodes, boolean isFullyConfigured) { + if (item instanceof PhysicalDbGroup) { + //add dbGroup+dbInstance + PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item; + initDbGroup(physicalDbGroup, newShardingNodes, isFullyConfigured); + oldDbGroupMap.put(physicalDbGroup.getGroupName(), physicalDbGroup); + } else if (item instanceof PhysicalDbInstance) { + //add dbInstance + PhysicalDbInstance dbInstance = (PhysicalDbInstance) item; + PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(dbInstance.getDbGroupConfig().getName()); + if (isFullyConfigured) { + dbInstance.init("reload config", true); + physicalDbGroup.setDbInstance(dbInstance); + } else { + LOGGER.info("dbGroup[" + dbInstance.getDbGroupConfig().getName() + "] is not fullyConfigured, so doing nothing"); + } + } else if (item instanceof ShardingNode) { + ShardingNode shardingNode = (ShardingNode) item; + if (shardingNode.getDbGroup() != null) { + shardingNode.getDbGroup().addSchema(shardingNode.getDatabase()); + } + } + } + + public static void reloadSchema(PhysicalDbGroup dbGroup, Map newShardingNodes) { + String hostName = dbGroup.getGroupName(); + // set schemas + ArrayList dnSchemas = new ArrayList<>(30); + for (ShardingNode dn : newShardingNodes.values()) { + if (dn.getDbGroup().getGroupName().equals(hostName)) { + dn.setDbGroup(dbGroup); + dnSchemas.add(dn.getDatabase()); + } + } + dbGroup.setSchemas(dnSchemas); + } + + + private static void initDbGroup(PhysicalDbGroup dbGroup, Map newShardingNodes, boolean fullyConfigured) { + reloadSchema(dbGroup, newShardingNodes); + if (fullyConfigured) { + dbGroup.init("reload config"); + } else { + LOGGER.info("dbGroup[" + dbGroup.getGroupName() + "] is not fullyConfigured, so doing nothing"); + } + } + private boolean reloadMetaData(List> delTables, List> reloadTables, List delSchema, List reloadSchema) { boolean reloadResult = true; if (delSchema.size() > 0) { @@ -626,6 +825,10 @@ public class ServerConfig { public RawJson getSequenceConfig() { return sequenceConfig; } + + public boolean isLowerCase() { + return lowerCase; + } } diff --git a/src/main/java/com/actiontech/dble/config/converter/UserConverter.java b/src/main/java/com/actiontech/dble/config/converter/UserConverter.java index cd6ed60b4..c10bb485d 100644 --- a/src/main/java/com/actiontech/dble/config/converter/UserConverter.java +++ b/src/main/java/com/actiontech/dble/config/converter/UserConverter.java @@ -124,7 +124,7 @@ public class UserConverter { } if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Xml to Shardings is :" + usersBean); + LOGGER.debug("Xml to Users is :" + usersBean); } if (null == usersBean.getUser() || !usersBean.getUser().stream().anyMatch(userObj -> userObj instanceof ManagerUser)) { throw new ConfigException("user.xml contains at least one managerUser"); @@ -172,10 +172,10 @@ public class UserConverter { UserName userName = new UserName(userConfig.getName(), tenant); if (this.userConfigMap.containsKey(userName)) { - throw new ConfigException("User [" + userName + "] has already existed"); + throw new ConfigException("User [" + userName.getFullName() + "] has already existed"); } if (StringUtil.isEmpty(dbGroup)) { - throw new ConfigException("User [" + userName + "]'s dbGroup is empty"); + throw new ConfigException("User [" + userName.getFullName() + "]'s dbGroup is empty"); } WallProvider wallProvider = getWallProvider(blackListMap, problemReporter, blacklistStr, userName); @@ -191,10 +191,10 @@ public class UserConverter { UserName userName = new UserName(userConfig.getName(), tenant); if (this.userConfigMap.containsKey(userName)) { - throw new ConfigException("User [" + userName + "] has already existed"); + throw new ConfigException("User [" + userName.getFullName() + "] has already existed"); } if (StringUtil.isEmpty(dbGroup)) { - throw new ConfigException("User [" + userName + "]'s dbGroup is empty"); + throw new ConfigException("User [" + userName.getFullName() + "]'s dbGroup is empty"); } WallProvider wallProvider = getWallProvider(blackListMap, problemReporter, blacklistStr, userName); @@ -211,10 +211,10 @@ public class UserConverter { UserName userName = new UserName(userConfig.getName(), tenant); if (this.userConfigMap.containsKey(userName)) { - throw new ConfigException("User [" + userName + "] has already existed"); + throw new ConfigException("User [" + userName.getFullName() + "] has already existed"); } if (StringUtil.isEmpty(schemas)) { - throw new ConfigException("User [" + userName + "]'s schemas is empty"); + throw new ConfigException("User [" + userName.getFullName() + "]'s schemas is empty"); } String[] strArray = SplitUtil.split(schemas, ',', true); diff --git a/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java b/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java index 3630b0493..12c580ab4 100644 --- a/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java +++ b/src/main/java/com/actiontech/dble/config/helper/GetAndSyncDbInstanceKeyVariables.java @@ -9,6 +9,7 @@ import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.mysql.VersionUtil; import com.actiontech.dble.config.Isolations; import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; import com.actiontech.dble.sqlengine.OneTimeConnJob; import com.actiontech.dble.sqlengine.SQLQueryResult; @@ -79,6 +80,7 @@ public class GetAndSyncDbInstanceKeyVariables implements Callable LOGGER.warn("test conn Interrupted:", e); } finally { lock.unlock(); + ReloadLogHelper.debug("get key variables :{},dbInstance:{},result:{}", LOGGER, sql.toString(), ds, keyVariables); } return keyVariables; } diff --git a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java index 186f495ce..ac248f9ff 100644 --- a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java @@ -10,6 +10,8 @@ import com.actiontech.dble.config.model.db.type.DataBaseType; import com.actiontech.dble.config.util.ConfigException; import com.actiontech.dble.util.StringUtil; +import java.util.Arrays; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -140,4 +142,37 @@ public class DbGroupConfig { public DataBaseType instanceDatabaseType() { return writeInstanceConfig.getDataBaseType(); } + + public boolean equalsBaseInfo(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DbGroupConfig that = (DbGroupConfig) o; + + return rwSplitMode == that.rwSplitMode && + isShowSlaveSql == that.isShowSlaveSql && + isSelectReadOnlySql == that.isSelectReadOnlySql && + delayThreshold == that.delayThreshold && + heartbeatTimeout == that.heartbeatTimeout && + errorRetryCount == that.errorRetryCount && + disableHA == that.disableHA && + Objects.equals(name, that.name) && + Objects.equals(heartbeatSQL, that.heartbeatSQL); + } + + @Override + public String toString() { + return "DbGroupConfig{" + + "name='" + name + '\'' + + ", rwSplitMode=" + rwSplitMode + + ", writeInstanceConfig=" + writeInstanceConfig + + ", readInstanceConfigs=" + Arrays.toString(readInstanceConfigs) + + ", heartbeatSQL='" + heartbeatSQL + '\'' + + ", isShowSlaveSql=" + isShowSlaveSql + + ", isSelectReadOnlySql=" + isSelectReadOnlySql + + ", delayThreshold=" + delayThreshold + + ", heartbeatTimeout=" + heartbeatTimeout + + ", errorRetryCount=" + errorRetryCount + + ", disableHA=" + disableHA + + '}'; + } } diff --git a/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java b/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java index 9f0d1f425..00fc1e421 100644 --- a/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/db/DbInstanceConfig.java @@ -7,6 +7,8 @@ package com.actiontech.dble.config.model.db; import com.actiontech.dble.config.model.db.type.DataBaseType; +import java.util.Objects; + public class DbInstanceConfig { private final String instanceName; @@ -155,4 +157,30 @@ public class DbInstanceConfig { return "DbInstanceConfig [hostName=" + instanceName + ", url=" + url + "]"; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DbInstanceConfig that = (DbInstanceConfig) o; + return port == that.port && + readWeight == that.readWeight && + disabled == that.disabled && + primary == that.primary && + maxCon == that.maxCon && + minCon == that.minCon && + usingDecrypt == that.usingDecrypt && + dataBaseType.equals(((DbInstanceConfig) o).dataBaseType) && + Objects.equals(instanceName, that.instanceName) && + Objects.equals(ip, that.ip) && + Objects.equals(url, that.url) && + Objects.equals(user, that.user) && + Objects.equals(password, that.password) && + Objects.equals(id, that.id) && + Objects.equals(poolConfig, that.poolConfig); + } + + @Override + public int hashCode() { + return Objects.hash(instanceName, ip, port, url, user, password, readWeight, id, disabled, primary, maxCon, minCon, poolConfig, usingDecrypt, dataBaseType); + } } diff --git a/src/main/java/com/actiontech/dble/config/model/user/ManagerUserConfig.java b/src/main/java/com/actiontech/dble/config/model/user/ManagerUserConfig.java index dea631af6..be68745ba 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/ManagerUserConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/user/ManagerUserConfig.java @@ -11,6 +11,7 @@ import com.actiontech.dble.services.manager.information.ManagerSchemaInfo; import com.actiontech.dble.util.StringUtil; import java.sql.SQLException; +import java.util.Objects; public class ManagerUserConfig extends UserConfig { private final boolean readOnly; @@ -52,6 +53,20 @@ public class ManagerUserConfig extends UserConfig { return 0; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + ManagerUserConfig that = (ManagerUserConfig) o; + return readOnly == that.readOnly; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), readOnly); + } + public boolean equalsBaseInfo(ManagerUserConfig managerUserConfig) { return super.equalsBaseInfo(managerUserConfig) && this.readOnly == managerUserConfig.isReadOnly(); diff --git a/src/main/java/com/actiontech/dble/config/model/user/RwSplitUserConfig.java b/src/main/java/com/actiontech/dble/config/model/user/RwSplitUserConfig.java index 955ac11f2..e538bb983 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/RwSplitUserConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/user/RwSplitUserConfig.java @@ -11,6 +11,7 @@ import com.actiontech.dble.services.mysqlauthenticate.MysqlDatabaseHandler; import com.actiontech.dble.util.StringUtil; import com.alibaba.druid.wall.WallProvider; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -48,4 +49,17 @@ public class RwSplitUserConfig extends SingleDbGroupUserConfig { return exist ? 0 : ErrorCode.ER_BAD_DB_ERROR; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + RwSplitUserConfig that = (RwSplitUserConfig) o; + return Objects.equals(dbGroup, that.dbGroup); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), dbGroup); + } } diff --git a/src/main/java/com/actiontech/dble/config/model/user/ServerUserConfig.java b/src/main/java/com/actiontech/dble/config/model/user/ServerUserConfig.java index befa9e212..c86176c24 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/ServerUserConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/user/ServerUserConfig.java @@ -8,6 +8,8 @@ package com.actiontech.dble.config.model.user; import com.actiontech.dble.util.StringUtil; import com.alibaba.druid.wall.WallProvider; +import java.util.Objects; + public abstract class ServerUserConfig extends UserConfig { private final String tenant; private final WallProvider blacklist; @@ -26,6 +28,20 @@ public abstract class ServerUserConfig extends UserConfig { return blacklist; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + ServerUserConfig that = (ServerUserConfig) o; + return Objects.equals(tenant, that.tenant) && + isEquals(this.blacklist, that.blacklist); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), tenant, blacklist); + } public boolean equalsBaseInfo(ServerUserConfig serverUserConfig) { return super.equalsBaseInfo(serverUserConfig) && diff --git a/src/main/java/com/actiontech/dble/config/model/user/ShardingUserConfig.java b/src/main/java/com/actiontech/dble/config/model/user/ShardingUserConfig.java index ff3a61dac..08bcc2ed8 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/ShardingUserConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/user/ShardingUserConfig.java @@ -14,6 +14,7 @@ import com.alibaba.druid.wall.WallProvider; import java.sql.SQLException; import java.util.HashSet; +import java.util.Objects; import java.util.Set; public class ShardingUserConfig extends ServerUserConfig { @@ -59,7 +60,7 @@ public class ShardingUserConfig extends ServerUserConfig { throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE); } if (!schemas.contains(schemaInfo.getSchema())) { - String msg = "Access denied for user '" + user + "' to database '" + schemaInfo.getSchema() + "'"; + String msg = "Access denied for user '" + user.getFullName() + "' to database '" + schemaInfo.getSchema() + "'"; throw new SQLException(msg, "HY000", ErrorCode.ER_DBACCESS_DENIED_ERROR); } schemaInfo.setSchemaConfig(schemaConfig); @@ -98,4 +99,19 @@ public class ShardingUserConfig extends ServerUserConfig { } } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + ShardingUserConfig that = (ShardingUserConfig) o; + return readOnly == that.readOnly && + Objects.equals(schemas, that.schemas) && + isEquals(privilegesConfig, that.privilegesConfig); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), readOnly, schemas, privilegesConfig); + } } diff --git a/src/main/java/com/actiontech/dble/config/model/user/UserConfig.java b/src/main/java/com/actiontech/dble/config/model/user/UserConfig.java index 419adb784..e2ea5251c 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/UserConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/user/UserConfig.java @@ -12,6 +12,7 @@ import com.actiontech.dble.util.StringUtil; import java.sql.SQLException; import java.util.Arrays; import java.util.HashSet; +import java.util.Objects; import java.util.Set; public class UserConfig { @@ -90,6 +91,23 @@ public class UserConfig { return 0; } + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UserConfig that = (UserConfig) o; + return isEncrypt == that.isEncrypt && + maxCon == that.maxCon && + Objects.equals(name, that.name) && + Objects.equals(password, that.password) && + Objects.equals(whiteIPs, that.whiteIPs); + } + + @Override + public int hashCode() { + return Objects.hash(name, password, isEncrypt, whiteIPs, maxCon); + } + public boolean equalsBaseInfo(UserConfig userConfig) { return StringUtil.equalsWithEmpty(this.name, userConfig.getName()) && StringUtil.equalsWithEmpty(this.password, userConfig.getPassword()) && diff --git a/src/main/java/com/actiontech/dble/config/model/user/UserName.java b/src/main/java/com/actiontech/dble/config/model/user/UserName.java index 050b437cd..11c0daf21 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/UserName.java +++ b/src/main/java/com/actiontech/dble/config/model/user/UserName.java @@ -28,6 +28,13 @@ public class UserName { @Override public String toString() { + return "UserName{" + + "name='" + name + '\'' + + ", tenant='" + tenant + '\'' + + '}'; + } + + public String getFullName() { if (tenant == null) { return name; } diff --git a/src/main/java/com/actiontech/dble/config/model/user/UserPrivilegesConfig.java b/src/main/java/com/actiontech/dble/config/model/user/UserPrivilegesConfig.java index 63e3bd30a..003c7d002 100644 --- a/src/main/java/com/actiontech/dble/config/model/user/UserPrivilegesConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/user/UserPrivilegesConfig.java @@ -5,10 +5,7 @@ package com.actiontech.dble.config.model.user; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; +import java.util.*; /** @@ -52,8 +49,9 @@ public class UserPrivilegesConfig { schemaPrivileges = newSchemaPrivileges; } - public boolean equalsBaseInfo(UserPrivilegesConfig userPrivilegesConfig) { + if (this == userPrivilegesConfig) return true; + if (userPrivilegesConfig == null || getClass() != userPrivilegesConfig.getClass()) return false; boolean equalTableInfo1 = this.schemaPrivileges.entrySet().stream().allMatch(schemaPrivilegeEntry -> null != userPrivilegesConfig.getSchemaPrivilege(schemaPrivilegeEntry.getKey()) && userPrivilegesConfig.getSchemaPrivilege(schemaPrivilegeEntry.getKey()).equalsBaseInfo(schemaPrivilegeEntry.getValue())); boolean equalTableInfo2 = userPrivilegesConfig.getSchemaPrivileges().entrySet().stream().allMatch(schemaPrivilegeEntry -> null != this.schemaPrivileges.get(schemaPrivilegeEntry.getKey()) && this.schemaPrivileges.get(schemaPrivilegeEntry.getKey()).equalsBaseInfo(schemaPrivilegeEntry.getValue())); return this.check == userPrivilegesConfig.isCheck() && diff --git a/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java b/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java index 30d8f6d6e..1aefb9dd0 100644 --- a/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java +++ b/src/main/java/com/actiontech/dble/config/util/ConfigUtil.java @@ -5,15 +5,20 @@ */ package com.actiontech.dble.config.util; +import com.actiontech.dble.DbleServer; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.backend.mysql.VersionUtil; +import com.actiontech.dble.config.DbleTempConfig; import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables; import com.actiontech.dble.config.helper.KeyVariables; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.config.model.db.type.DataBaseType; +import com.actiontech.dble.services.manager.response.ReloadConfig; import com.actiontech.dble.singleton.TraceManager; +import com.actiontech.dble.util.StringUtil; +import com.google.common.collect.Maps; import org.apache.logging.log4j.util.Strings; import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; @@ -22,6 +27,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.*; import java.util.concurrent.*; +import java.util.stream.Collectors; public final class ConfigUtil { private static final Logger LOGGER = LoggerFactory.getLogger(ConfigUtil.class); @@ -68,14 +74,92 @@ public final class ConfigUtil { } } - private static String[] getShardingNodeSchemasOfDbGroup(String dbGroup, Map shardingNodeMap) { + private static ArrayList getShardingNodeSchemasOfDbGroup(String dbGroup, Map shardingNodeMap) { ArrayList schemaList = new ArrayList<>(30); for (ShardingNode dn : shardingNodeMap.values()) { if (dn.getDbGroup() != null && dn.getDbGroup().getGroupName().equals(dbGroup)) { schemaList.add(dn.getDatabase()); } } - return schemaList.toArray(new String[schemaList.size()]); + return schemaList; + } + + public static String getAndSyncKeyVariables(List changeItemList, boolean needSync) throws Exception { + TraceManager.TraceObject traceObject = TraceManager.threadTrace("sync-key-variables"); + try { + String msg = null; + List needCheckItemList = changeItemList.stream() + //add dbInstance or add dbGroup or (update dbInstance and need testConn) + .filter(changeItem -> ((changeItem.getItem() instanceof PhysicalDbInstance || changeItem.getItem() instanceof PhysicalDbGroup) && changeItem.getType() == 1) || + (changeItem.getItem() instanceof PhysicalDbInstance && changeItem.getType() == 2 && changeItem.isAffectTestConn())) + .collect(Collectors.toList()); + if (changeItemList.size() == 0 || needCheckItemList == null || needCheckItemList.isEmpty()) { + //with no dbGroups, do not check the variables + return null; + } + Map> keyVariablesTaskMap = Maps.newHashMap(); + getAndSyncKeyVariablesForDataSources(needCheckItemList, keyVariablesTaskMap, needSync); + + boolean lowerCase = false; + boolean isFirst = true; + Set firstGroup = new HashSet<>(); + Set secondGroup = new HashSet<>(); + int minNodePacketSize = Integer.MAX_VALUE; + int minVersion = Integer.parseInt(SystemConfig.getInstance().getFakeMySQLVersion().substring(0, 1)); + for (Map.Entry> entry : keyVariablesTaskMap.entrySet()) { + String dataSourceName = entry.getKey(); + Future future = entry.getValue(); + KeyVariables keyVariables = future.get(); + if (keyVariables != null) { + if (isFirst) { + lowerCase = keyVariables.isLowerCase(); + if (lowerCase != DbleServer.getInstance().getConfig().isLowerCase()) { + secondGroup.add(dataSourceName); + } + isFirst = false; + firstGroup.add(dataSourceName); + } else if (keyVariables.isLowerCase() != lowerCase) { + secondGroup.add(dataSourceName); + } + minNodePacketSize = Math.min(minNodePacketSize, keyVariables.getMaxPacketSize()); + int version = Integer.parseInt(keyVariables.getVersion().substring(0, 1)); + minVersion = Math.min(minVersion, version); + } + } + if (minNodePacketSize < SystemConfig.getInstance().getMaxPacketSize() + KeyVariables.MARGIN_PACKET_SIZE) { + SystemConfig.getInstance().setMaxPacketSize(minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE); + msg = "dble's maxPacketSize will be set to (the min of all dbGroup's max_allowed_packet) - " + KeyVariables.MARGIN_PACKET_SIZE + ":" + (minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE); + LOGGER.warn(msg); + } + if (minVersion < Integer.parseInt(SystemConfig.getInstance().getFakeMySQLVersion().substring(0, 1))) { + throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend mysql node,pls check the backend mysql node."); + } + DbleTempConfig.getInstance().setLowerCase(lowerCase); + if (secondGroup.size() != 0) { + // if all datasoure's lower case are not equal, throw exception + StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different."); + String firstGroupValue; + String secondGroupValue; + if (lowerCase) { + firstGroupValue = " not 0 :"; + secondGroupValue = " 0 :"; + } else { + firstGroupValue = " 0 :"; + secondGroupValue = " not 0 :"; + } + sb.append("These MySQL's value is"); + sb.append(firstGroupValue); + sb.append(Strings.join(firstGroup, ',')); + sb.append(".And these MySQL's value is"); + sb.append(secondGroupValue); + sb.append(Strings.join(secondGroup, ',')); + sb.append("."); + throw new IOException(sb.toString()); + } + return msg; + } finally { + TraceManager.finishSpan(traceObject); + } } public static String getAndSyncKeyVariables(Map dbGroups, boolean needSync) throws Exception { @@ -91,10 +175,15 @@ public final class ConfigUtil { clickHouseDbGroups.put(k, v); } }); - - sb.append(getMysqlSyncKeyVariables(mysqlDbGroups, needSync)); - sb.append(getClickHouseSyncKeyVariables(clickHouseDbGroups, needSync)); - return sb.toString(); + String mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbGroups, needSync); + if (!StringUtil.isEmpty(mysqlSyncKeyVariables)) { + sb.append(mysqlSyncKeyVariables); + } + String clickHouseSyncKeyVariables = getClickHouseSyncKeyVariables(clickHouseDbGroups, needSync); + if (!StringUtil.isEmpty(clickHouseSyncKeyVariables)) { + sb.append(clickHouseSyncKeyVariables); + } + return sb.length() == 0 ? null : sb.toString(); } finally { TraceManager.finishSpan(traceObject); } @@ -145,6 +234,7 @@ public final class ConfigUtil { if (minVersion < VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion())) { throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend mysql node,pls check the backend mysql node."); } + DbleTempConfig.getInstance().setLowerCase(lowerCase); if (secondGroup.size() != 0) { // if all datasoure's lower case are not equal, throw exception StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different."); @@ -239,6 +329,41 @@ public final class ConfigUtil { } + private static void getAndSyncKeyVariablesForDataSources(List changeItemList, Map> keyVariablesTaskMap, + boolean needSync) throws InterruptedException { + ExecutorService service = Executors.newFixedThreadPool(changeItemList.size()); + for (ReloadConfig.ChangeItem changeItem : changeItemList) { + if (changeItem.getItem() instanceof PhysicalDbInstance) { + PhysicalDbInstance ds = (PhysicalDbInstance) changeItem.getItem(); + if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) { + continue; + } + getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync); + } else if (changeItem.getItem() instanceof PhysicalDbGroup) { + PhysicalDbGroup dbGroup = (PhysicalDbGroup) changeItem.getItem(); + for (PhysicalDbInstance ds : dbGroup.getAllDbInstanceMap().values()) { + if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) { + continue; + } + getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync); + } + } + } + service.shutdown(); + int i = 0; + while (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) { + if (LOGGER.isDebugEnabled()) { + if (i == 0) { + LOGGER.debug("wait to get all dbInstances's get key variable"); + } + i++; + if (i == 100) { //log every 10 seconds + i = 0; + } + } + } + } + private static void getAndSyncKeyVariablesForDataSources(Map dbGroups, Map> keyVariablesTaskMap, boolean needSync) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(dbGroups.size()); for (Map.Entry entry : dbGroups.entrySet()) { diff --git a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java index 526cd108d..14f2be368 100644 --- a/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java +++ b/src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java @@ -42,7 +42,7 @@ public class SlowQueryLogEntry { sb.append("# User@Host: "); sb.append(user); sb.append("["); - sb.append(user); + sb.append(user.getFullName()); sb.append("] @ ["); sb.append(clientIp); sb.append("] Id: "); diff --git a/src/main/java/com/actiontech/dble/meta/ReloadLogHelper.java b/src/main/java/com/actiontech/dble/meta/ReloadLogHelper.java index cb7a8c637..b3658cc36 100644 --- a/src/main/java/com/actiontech/dble/meta/ReloadLogHelper.java +++ b/src/main/java/com/actiontech/dble/meta/ReloadLogHelper.java @@ -54,6 +54,39 @@ public class ReloadLogHelper { LOGGER.info(getStage() + message); } + public static void debug(String message, Logger logger) { + if (!logger.isDebugEnabled()) { + return; + } + if (ReloadManager.getReloadInstance().getStatus() != null) { + logger.debug(ReloadManager.getReloadInstance().getStatus().getLogStage() + message); + } else { + logger.debug(message); + } + } + + public static void debug(String message, Logger logger, Object val) { + if (!logger.isDebugEnabled()) { + return; + } + if (ReloadManager.getReloadInstance().getStatus() != null) { + logger.debug(ReloadManager.getReloadInstance().getStatus().getLogStage() + message, val); + } else { + logger.debug(message, val); + } + } + + public static void debug(String message, Logger logger, Object... val) { + if (!logger.isDebugEnabled()) { + return; + } + if (ReloadManager.getReloadInstance().getStatus() != null) { + logger.debug(ReloadManager.getReloadInstance().getStatus().getLogStage() + message, val); + } else { + logger.debug(message, val); + } + } + public static void warn(String message, Logger logger) { if (ReloadManager.getReloadInstance().getStatus() != null) { logger.info(ReloadManager.getReloadInstance().getStatus().getLogStage() + message); @@ -79,7 +112,6 @@ public class ReloadLogHelper { } - private String getStage() { return reload == null ? "" : reload.getLogStage(); } diff --git a/src/main/java/com/actiontech/dble/net/IOProcessor.java b/src/main/java/com/actiontech/dble/net/IOProcessor.java index 47993cd14..612817a8b 100644 --- a/src/main/java/com/actiontech/dble/net/IOProcessor.java +++ b/src/main/java/com/actiontech/dble/net/IOProcessor.java @@ -7,6 +7,7 @@ package com.actiontech.dble.net; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage; import com.actiontech.dble.backend.mysql.xa.TxState; import com.actiontech.dble.buffer.BufferPool; @@ -48,6 +49,7 @@ public final class IOProcessor { // after reload @@config_all ,old back ends connections stored in backends_old public static final ConcurrentLinkedQueue BACKENDS_OLD = new ConcurrentLinkedQueue<>(); public static final ConcurrentLinkedQueue BACKENDS_OLD_GROUP = new ConcurrentLinkedQueue<>(); + public static final ConcurrentLinkedQueue BACKENDS_OLD_INSTANCE = new ConcurrentLinkedQueue<>(); private AtomicInteger frontEndsLength = new AtomicInteger(0); diff --git a/src/main/java/com/actiontech/dble/net/ssl/GMSslWrapper.java b/src/main/java/com/actiontech/dble/net/ssl/GMSslWrapper.java index e2dc21555..b0b1b0bf9 100644 --- a/src/main/java/com/actiontech/dble/net/ssl/GMSslWrapper.java +++ b/src/main/java/com/actiontech/dble/net/ssl/GMSslWrapper.java @@ -24,8 +24,6 @@ public class GMSslWrapper extends OpenSSLWrapper { public boolean initContext() { try { - Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jce.provider.GMJCE").newInstance(), 1); - Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jsse.provider.GMJSSE").newInstance(), 2); String pfxfile = SystemConfig.getInstance().getGmsslBothPfx(); String pwd = SystemConfig.getInstance().getGmsslBothPfxPwd(); @@ -42,6 +40,8 @@ public class GMSslWrapper extends OpenSSLWrapper { LOGGER.warn("Neither [gmsslRcaPem] nor [gmsslOcaPem] are empty."); return false; } + Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jce.provider.GMJCE").newInstance(), 1); + Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jsse.provider.GMJSSE").newInstance(), 2); // load key pair KeyManager[] kms = createKeyManagers(pfxfile, pwd); diff --git a/src/main/java/com/actiontech/dble/server/handler/FieldListHandler.java b/src/main/java/com/actiontech/dble/server/handler/FieldListHandler.java index 4ac611795..c64aa9354 100644 --- a/src/main/java/com/actiontech/dble/server/handler/FieldListHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/FieldListHandler.java @@ -41,7 +41,7 @@ public final class FieldListHandler { ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(service.getUser())); if (user == null || !user.getSchemas().contains(cSchema)) { - service.writeErrMessage("42000", "Access denied for user '" + service.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); + service.writeErrMessage("42000", "Access denied for user '" + service.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); return; } diff --git a/src/main/java/com/actiontech/dble/server/handler/SelectInformationSchemaColumnsHandler.java b/src/main/java/com/actiontech/dble/server/handler/SelectInformationSchemaColumnsHandler.java index 446691884..d14fdaf56 100644 --- a/src/main/java/com/actiontech/dble/server/handler/SelectInformationSchemaColumnsHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/SelectInformationSchemaColumnsHandler.java @@ -108,7 +108,7 @@ public class SelectInformationSchemaColumnsHandler { ShardingUserConfig userConfig = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(shardingService.getUser())); if (userConfig == null || !userConfig.getSchemas().contains(cSchema)) { - shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); + shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); return; } diff --git a/src/main/java/com/actiontech/dble/server/handler/UseHandler.java b/src/main/java/com/actiontech/dble/server/handler/UseHandler.java index bb6370cd8..a93128719 100644 --- a/src/main/java/com/actiontech/dble/server/handler/UseHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/UseHandler.java @@ -29,7 +29,7 @@ public final class UseHandler { } ShardingUserConfig userConfig = service.getUserConfig(); if (!userConfig.getSchemas().contains(schema)) { - String msg = "Access denied for user '" + service.getUser() + "' to database '" + schema + "'"; + String msg = "Access denied for user '" + service.getUser().getFullName() + "' to database '" + schema + "'"; service.writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, msg); return; } diff --git a/src/main/java/com/actiontech/dble/server/response/FieldList.java b/src/main/java/com/actiontech/dble/server/response/FieldList.java index 0df67910f..bb3487f8a 100644 --- a/src/main/java/com/actiontech/dble/server/response/FieldList.java +++ b/src/main/java/com/actiontech/dble/server/response/FieldList.java @@ -38,7 +38,7 @@ public final class FieldList { ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(service.getUser())); if (user == null || !user.getSchemas().contains(cSchema)) { - service.writeErrMessage("42000", "Access denied for user '" + service.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); + service.writeErrMessage("42000", "Access denied for user '" + service.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); return; } diff --git a/src/main/java/com/actiontech/dble/server/response/SelectCurrentUser.java b/src/main/java/com/actiontech/dble/server/response/SelectCurrentUser.java index 2d6da7bcb..8f0d0e115 100644 --- a/src/main/java/com/actiontech/dble/server/response/SelectCurrentUser.java +++ b/src/main/java/com/actiontech/dble/server/response/SelectCurrentUser.java @@ -54,7 +54,7 @@ public final class SelectCurrentUser implements InnerFuncResponse { } private static byte[] getUser(ShardingService service) { - return StringUtil.encode(service.getUser() + "@%", service.getCharset().getResults()); + return StringUtil.encode(service.getUser().getFullName() + "@%", service.getCharset().getResults()); } public static byte setCurrentPacket(ShardingService service) { diff --git a/src/main/java/com/actiontech/dble/server/response/SelectUser.java b/src/main/java/com/actiontech/dble/server/response/SelectUser.java index 56beea907..93eaffe71 100644 --- a/src/main/java/com/actiontech/dble/server/response/SelectUser.java +++ b/src/main/java/com/actiontech/dble/server/response/SelectUser.java @@ -54,7 +54,7 @@ public final class SelectUser implements InnerFuncResponse { } private static byte[] getUser(ShardingService service) { - return StringUtil.encode(service.getUser().toString() + '@' + service.getConnection().getHost(), service.getCharset().getResults()); + return StringUtil.encode(service.getUser().getFullName() + '@' + service.getConnection().getHost(), service.getCharset().getResults()); } diff --git a/src/main/java/com/actiontech/dble/server/response/ShowDbleProcessList.java b/src/main/java/com/actiontech/dble/server/response/ShowDbleProcessList.java index 9780d1e4d..92ea7b0ad 100644 --- a/src/main/java/com/actiontech/dble/server/response/ShowDbleProcessList.java +++ b/src/main/java/com/actiontech/dble/server/response/ShowDbleProcessList.java @@ -192,7 +192,7 @@ public final class ShowDbleProcessList { // BconnID row.add(threadId == null ? StringUtil.encode(NULL_VAL, charset) : LongUtil.toBytes(threadId)); // User - row.add(StringUtil.encode(fc.getFrontEndService().getUser().toString(), charset)); + row.add(StringUtil.encode(fc.getFrontEndService().getUser().getFullName(), charset)); // Front_Host row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort(), charset)); // db diff --git a/src/main/java/com/actiontech/dble/server/response/ShowTables.java b/src/main/java/com/actiontech/dble/server/response/ShowTables.java index 72f239256..77c4dfd3c 100644 --- a/src/main/java/com/actiontech/dble/server/response/ShowTables.java +++ b/src/main/java/com/actiontech/dble/server/response/ShowTables.java @@ -78,7 +78,7 @@ public final class ShowTables { ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(shardingService.getUser())); if (user == null || !user.getSchemas().contains(cSchema)) { - shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); + shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); return; } //if sharding has default single node ,show tables will send to backend diff --git a/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java b/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java index d40a9a8f9..0068628fd 100644 --- a/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java +++ b/src/main/java/com/actiontech/dble/server/variables/VarsExtractorHandler.java @@ -7,6 +7,7 @@ package com.actiontech.dble.server.variables; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.singleton.TraceManager; import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; import com.actiontech.dble.sqlengine.OneTimeConnJob; @@ -30,6 +31,7 @@ public class VarsExtractorHandler { private Lock lock; private Condition done; private Map dbGroups; + private PhysicalDbInstance physicalDbInstance; private volatile SystemVariables systemVariables = null; public VarsExtractorHandler(Map dbGroups) { @@ -39,13 +41,23 @@ public class VarsExtractorHandler { this.done = lock.newCondition(); } + public VarsExtractorHandler(PhysicalDbInstance physicalDbInstance) { + this.physicalDbInstance = physicalDbInstance; + this.extracting = false; + this.lock = new ReentrantLock(); + this.done = lock.newCondition(); + } + + public SystemVariables execute() { TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-system-variables-from-backend"); try { OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(MYSQL_SHOW_VARIABLES_COLS, new MysqlVarsListener(this)); - PhysicalDbInstance ds = getPhysicalDbInstance(); - if (ds != null) { - OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_VARIABLES, null, resultHandler, ds); + if (null == this.physicalDbInstance) { + this.physicalDbInstance = getPhysicalDbInstance(); + } + if (physicalDbInstance != null) { + OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_VARIABLES, null, resultHandler, physicalDbInstance); sqlJob.run(); waitDone(); } else { @@ -54,11 +66,16 @@ public class VarsExtractorHandler { } return systemVariables; } finally { + ReloadLogHelper.debug("get system variables :{},dbInstance:{},result:{}", LOGGER, MYSQL_SHOW_VARIABLES, physicalDbInstance, systemVariables); + this.physicalDbInstance = null; TraceManager.finishSpan(traceObject); } } private PhysicalDbInstance getPhysicalDbInstance() { + if (dbGroups == null || dbGroups.isEmpty()) { + return null; + } PhysicalDbInstance ds = null; List dbGroupList = dbGroups.values().stream().filter(dbGroup -> dbGroup.getDbGroupConfig().existInstanceProvideVars()).collect(Collectors.toList()); for (PhysicalDbGroup dbGroup : dbGroupList) { @@ -126,7 +143,7 @@ public class VarsExtractorHandler { } private void tryInitVars() { - if (dbGroups.isEmpty()) { + if (dbGroups == null || dbGroups.isEmpty()) { return; } List dbGroupList = dbGroups.values().stream().filter(dbGroup -> dbGroup.getDbGroupConfig().existInstanceProvideVars()).collect(Collectors.toList()); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java b/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java index 4888bf885..8747ec727 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java @@ -74,6 +74,7 @@ public final class ManagerSchemaInfo { registerTable(new DbleFrontConnectionsAssociateThread()); registerTable(new DbleBackendConnectionsAssociateThread()); registerTable(new DbleClusterRenewThread()); + registerTable(new RecyclingResource()); } private void initViews() { diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java index 9611ac37c..f8548afdc 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java @@ -221,7 +221,7 @@ public class DbleDbGroup extends ManagerWritableTable { DbleRwSplitEntry dbleRwSplitEntry = (DbleRwSplitEntry) ManagerSchemaInfo.getInstance().getTables().get(DbleRwSplitEntry.TABLE_NAME); boolean existUser = dbleRwSplitEntry.getRows().stream().anyMatch(entry -> entry.get(DbleRwSplitEntry.COLUMN_DB_GROUP).equals(affectPk.get(COLUMN_NAME))); if (existUser) { - throw new ConfigException("Cannot delete or update a parent row: a foreign key constraint fails `dble_db_user`(`db_group`) REFERENCES `dble_db_group`(`name`)"); + throw new ConfigException("Cannot delete or update a parent row: a foreign key constraint fails `dble_rw_split_entry`(`db_group`) REFERENCES `dble_db_group`(`name`)"); } //check instance-group DbleDbInstance dbleDbInstance = (DbleDbInstance) ManagerSchemaInfo.getInstance().getTables().get(DbleDbInstance.TABLE_NAME); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java index fe20decca..04eb5d2ed 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleFlowControl.java @@ -72,7 +72,7 @@ public class DbleFlowControl extends ManagerBaseTable { LinkedHashMap row = Maps.newLinkedHashMap(); row.put(COLUMN_CONNECTION_TYPE, "ServerConnection"); row.put(COLUMN_CONNECTION_ID, Long.toString(fc.getId())); - row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) service).getSchema() + " user = " + ((FrontendService) service).getUser()); + row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) service).getSchema() + " user = " + ((FrontendService) service).getUser().getFullName()); row.put(COLUMN_WRITING_QUEUE_BYTES, Integer.toString(size)); row.put(COLUMN_READING_QUEUE_BYTES, null); row.put(COLUMN_FLOW_CONTROLLED, fc.isFrontWriteFlowControlled() ? "true" : "false"); diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/ProcessList.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/ProcessList.java index f11703d2a..bcc9cc7bb 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/ProcessList.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/ProcessList.java @@ -153,7 +153,7 @@ public class ProcessList extends ManagerBaseTable { // BconnID row.put(COLUMN_MYSQL_ID, conn.getThreadId() + ""); // User - row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().toString()); + row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().getFullName()); // Front_Host row.put(COLUMN_FRONT_HOST, frontConn.getHost() + ":" + frontConn.getLocalPort()); // time @@ -168,7 +168,7 @@ public class ProcessList extends ManagerBaseTable { // Front_Id row.put(COLUMN_FRONT_ID, frontConn.getId() + ""); // User - row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().toString()); + row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().getFullName()); // Front_Host row.put(COLUMN_FRONT_HOST, frontConn.getHost() + ":" + frontConn.getLocalPort()); // time diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/RecyclingResource.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/RecyclingResource.java new file mode 100644 index 000000000..4a4cfee27 --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/RecyclingResource.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.services.manager.information.tables; + +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.config.Fields; +import com.actiontech.dble.meta.ColumnMeta; +import com.actiontech.dble.net.IOProcessor; +import com.actiontech.dble.net.connection.PooledConnection; +import com.actiontech.dble.services.manager.information.ManagerBaseTable; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.LinkedHashMap; +import java.util.List; + +public class RecyclingResource extends ManagerBaseTable { + + private static final String TABLE_NAME = "recycling_resource"; + + private static final String COLUMN_TYPE = "type"; + private static final String COLUMN_INFO = "info"; + + public RecyclingResource() { + super(TABLE_NAME, 2); + } + + @Override + protected void initColumnAndType() { + columns.put(COLUMN_TYPE, new ColumnMeta(COLUMN_TYPE, "varchar(64)", false)); + columnsType.put(COLUMN_TYPE, Fields.FIELD_TYPE_VAR_STRING); + columns.put(COLUMN_INFO, new ColumnMeta(COLUMN_INFO, "text", false)); + columnsType.put(COLUMN_INFO, Fields.FIELD_TYPE_STRING); + } + + @Override + protected List> getRows() { + List> result = Lists.newArrayList(); + //dbGroup + for (PhysicalDbGroup dbGroup : IOProcessor.BACKENDS_OLD_GROUP) { + LinkedHashMap map = Maps.newLinkedHashMap(); + map.put(COLUMN_TYPE, "dbGroup"); + map.put(COLUMN_INFO, dbGroup.toString()); + result.add(map); + } + //dbInstance + for (PhysicalDbInstance dbInstance : IOProcessor.BACKENDS_OLD_INSTANCE) { + LinkedHashMap map = Maps.newLinkedHashMap(); + map.put(COLUMN_TYPE, "dbInstance"); + map.put(COLUMN_INFO, dbInstance.toString()); + result.add(map); + } + //backendConnection + for (PooledConnection connection : IOProcessor.BACKENDS_OLD) { + LinkedHashMap map = Maps.newLinkedHashMap(); + map.put(COLUMN_TYPE, "backendConnection"); + map.put(COLUMN_INFO, connection.toString()); + result.add(map); + } + return result; + } +} diff --git a/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java b/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java index ea58d56c1..8fda1fde5 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/FlowControlList.java @@ -133,7 +133,7 @@ public final class FlowControlList { RowDataPacket row = new RowDataPacket(FIELD_COUNT); row.add(StringUtil.encode("ServerConnection", service.getCharset().getResults())); row.add(LongUtil.toBytes(fc.getId())); - row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) fcService).getSchema() + " user = " + ((FrontendService) fcService).getUser(), service.getCharset().getResults())); + row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) fcService).getSchema() + " user = " + ((FrontendService) fcService).getUser().getFullName(), service.getCharset().getResults())); row.add(LongUtil.toBytes(size)); row.add(null); // not support row.add(fc.isFrontWriteFlowControlled() ? "true".getBytes() : "false".getBytes()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index e4761cb74..a12a56a88 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -7,7 +7,7 @@ package com.actiontech.dble.services.manager.response; import com.actiontech.dble.DbleServer; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; -import com.actiontech.dble.backend.datasource.PhysicalDbGroupDiff; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.btrace.provider.ClusterDelayProvider; import com.actiontech.dble.cluster.ClusterHelper; @@ -27,8 +27,10 @@ import com.actiontech.dble.config.model.ClusterConfig; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.table.ERTable; +import com.actiontech.dble.config.model.user.RwSplitUserConfig; import com.actiontech.dble.config.model.user.UserConfig; import com.actiontech.dble.config.model.user.UserName; +import com.actiontech.dble.config.util.ConfigException; import com.actiontech.dble.config.util.ConfigUtil; import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.meta.ReloadManager; @@ -45,6 +47,9 @@ import com.actiontech.dble.services.manager.handler.PacketResult; import com.actiontech.dble.singleton.CronScheduler; import com.actiontech.dble.singleton.FrontendUserManager; import com.actiontech.dble.singleton.TraceManager; +import com.google.common.collect.Lists; +import com.google.common.collect.MapDifference; +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -175,7 +180,6 @@ public final class ReloadConfig { } else { packetResult.setErrorCode(ErrorCode.ER_CLUSTER_RELOAD); } - return; } } finally { lock.writeLock().unlock(); @@ -231,7 +235,6 @@ public final class ReloadConfig { c.writeErrMessage(ErrorCode.ER_YES, sb); } - @Deprecated public static boolean reloadByLocalXml(final int loadAllMode) throws Exception { return reload(loadAllMode, null, null, null, null); } @@ -255,140 +258,348 @@ public final class ReloadConfig { private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception { TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload"); try { - /* - * 1 load new conf - * 1.1 ConfigInitializer init adn check itself - * 1.2 ShardingNode/dbGroup test connection - */ - ReloadLogHelper.info("reload config: load all xml info start", LOGGER); - ConfigInitializer loader; - try { - if (null == userConfig && null == dbConfig && null == shardingConfig && null == sequenceConfig) { - loader = new ConfigInitializer(); - } else { - loader = new ConfigInitializer(userConfig, dbConfig, shardingConfig, sequenceConfig); - } - } catch (Exception e) { - throw new Exception(e.getMessage() == null ? e.toString() : e.getMessage(), e); - } - ReloadLogHelper.info("reload config: load all xml info end", LOGGER); + // load configuration + ConfigInitializer loader = loadConfig(userConfig, dbConfig, shardingConfig, sequenceConfig); - ReloadLogHelper.info("reload config: get variables from random alive dbGroup start", LOGGER); + // compare changes + List changeItemList = compareChange(loader); - try { - loader.testConnection(); - } catch (Exception e) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("just test ,not stop reload, catch exception", e); - } - } + // user in use cannot be deleted + checkUser(changeItemList); boolean forceAllReload = false; - if ((loadAllMode & ManagerParseConfig.OPTR_MODE) != 0) { + //-r forceAllReload = true; } - if (forceAllReload) { - return forceReloadAll(loadAllMode, loader); - } else { - return intelligentReloadAll(loadAllMode, loader); - } - } finally { - TraceManager.finishSpan(traceObject); - } - } + // test connection + testConnection(loader, changeItemList, forceAllReload, loadAllMode); - private static boolean intelligentReloadAll(int loadAllMode, ConfigInitializer loader) throws Exception { - TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-intelligent-reload"); - try { - /* 2.1.1 get diff of dbGroups */ - ServerConfig config = DbleServer.getInstance().getConfig(); - Map addOrChangeHosts = new LinkedHashMap<>(); - Map noChangeHosts = new LinkedHashMap<>(); - Map recycleHosts = new HashMap<>(); - distinguishDbGroup(loader.getDbGroups(), config.getDbGroups(), addOrChangeHosts, noChangeHosts, recycleHosts); + ServerConfig newConfig = new ServerConfig(loader); + Map newDbGroups = newConfig.getDbGroups(); - Map mergedDbGroups = new LinkedHashMap<>(); - mergedDbGroups.putAll(noChangeHosts); - mergedDbGroups.putAll(addOrChangeHosts); + // check version/packetSize/lowerCase && get system variables + SystemVariables newSystemVariables = checkVersionAGetSystemVariables(loader, newDbGroups, changeItemList, forceAllReload); - ConfigUtil.getAndSyncKeyVariables(mergedDbGroups, true); + // recycle old active conn + recycleOldBackendConnections(forceAllReload, (loadAllMode & ManagerParseConfig.OPTF_MODE) != 0); - SystemVariables newSystemVariables = getSystemVariablesFromdbGroup(loader, mergedDbGroups); - ReloadLogHelper.info("reload config: get variables from random node end", LOGGER); - ServerConfig serverConfig = new ServerConfig(loader); + // lowerCase && load sequence if (loader.isFullyConfigured()) { if (newSystemVariables.isLowerCaseTableNames()) { ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER); - serverConfig.reviseLowerCase(loader.getSequenceConfig()); + newConfig.reviseLowerCase(loader.getSequenceConfig()); ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER); } else { - serverConfig.loadSequence(loader.getSequenceConfig()); - serverConfig.selfChecking0(); + newConfig.loadSequence(loader.getSequenceConfig()); + newConfig.selfChecking0(); } } - checkTestConnIfNeed(loadAllMode, loader); - Map newUsers = serverConfig.getUsers(); - Map newSchemas = serverConfig.getSchemas(); - Map newShardingNodes = serverConfig.getShardingNodes(); - Map> newErRelations = serverConfig.getErRelations(); - Map> newFuncNodeERMap = serverConfig.getFuncNodeERMap(); - Map newDbGroups = serverConfig.getDbGroups(); - Map newBlacklistConfig = serverConfig.getBlacklistConfig(); - Map newFunctions = serverConfig.getFunctions(); - - /* - * 2 transform - * 2.1 old lDbInstance continue to work - * 2.1.1 define the diff of new & old dbGroup config - * 2.1.2 create new init plan for the reload - * 2.2 init the new lDbInstance - * 2.3 transform - * 2.4 put the old connection into a queue - */ + Map newUsers = newConfig.getUsers(); + Map newSchemas = newConfig.getSchemas(); + Map newShardingNodes = newConfig.getShardingNodes(); + Map> newErRelations = newConfig.getErRelations(); + Map> newFuncNodeERMap = newConfig.getFuncNodeERMap(); + Map newBlacklistConfig = newConfig.getBlacklistConfig(); + Map newFunctions = newConfig.getFunctions(); - /* 2.2 init the lDbInstance with diff*/ - ReloadLogHelper.info("reload config: init new dbGroup start", LOGGER); - String reasonMsg = initDbGroupByMap(mergedDbGroups, newShardingNodes, loader.isFullyConfigured()); - ReloadLogHelper.info("reload config: init new dbGroup end", LOGGER); - if (reasonMsg == null) { - /* 2.3 apply new conf */ - ReloadLogHelper.info("reload config: apply new config start", LOGGER); - boolean result; - try { - result = config.reload(newUsers, newSchemas, newShardingNodes, mergedDbGroups, recycleHosts, newErRelations, newFuncNodeERMap, - newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions, - loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig()); - CronScheduler.getInstance().init(config.getSchemas()); - if (!result) { - initFailed(newDbGroups); - } - FrontendUserManager.getInstance().initForLatest(newUsers, SystemConfig.getInstance().getMaxCon()); - ReloadLogHelper.info("reload config: apply new config end", LOGGER); - recycleOldBackendConnections((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0); - if (!loader.isFullyConfigured()) { - recycleServerConnections(); - } - return result; - } catch (Exception e) { + // start/stop connection pool && heartbeat + // replace config + ReloadLogHelper.info("reload config: apply new config start", LOGGER); + ServerConfig oldConfig = DbleServer.getInstance().getConfig(); + boolean result; + try { + result = oldConfig.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, oldConfig.getDbGroups(), newErRelations, newFuncNodeERMap, + newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions, + loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig(), changeItemList); + CronScheduler.getInstance().init(oldConfig.getSchemas()); + if (!result) { initFailed(newDbGroups); - throw e; } - } else { + FrontendUserManager.getInstance().changeUser(changeItemList, SystemConfig.getInstance().getMaxCon()); + ReloadLogHelper.info("reload config: apply new config end", LOGGER); + // recycle old active conn + recycleOldBackendConnections(!forceAllReload, (loadAllMode & ManagerParseConfig.OPTF_MODE) != 0); + if (!loader.isFullyConfigured()) { + recycleServerConnections(); + } + return result; + } catch (Exception e) { initFailed(newDbGroups); - throw new Exception(reasonMsg); + throw e; } } finally { TraceManager.finishSpan(traceObject); } } - private static void recycleOldBackendConnections(boolean closeFrontCon) { - if (closeFrontCon) { + /** + * check version/packetSize/lowerCase + * get system variables + */ + private static SystemVariables checkVersionAGetSystemVariables(ConfigInitializer loader, Map newDbGroups, List changeItemList, boolean forceAllReload) throws Exception { + ReloadLogHelper.info("reload config: check and get system variables from random node start", LOGGER); + SystemVariables newSystemVariables; + if (forceAllReload) { + //check version/packetSize/lowerCase + ConfigUtil.getAndSyncKeyVariables(newDbGroups, true); + //system variables + newSystemVariables = getSystemVariablesFromdbGroup(loader, newDbGroups); + } else { + //check version/packetSize/lowerCase + ConfigUtil.getAndSyncKeyVariables(changeItemList, true); + //random one node + //system variables + PhysicalDbInstance physicalDbInstance = getPhysicalDbInstance(loader); + newSystemVariables = getSystemVariablesFromDbInstance(loader.isFullyConfigured(), physicalDbInstance); + } + ReloadLogHelper.info("reload config: check and get system variables from random node end", LOGGER); + return newSystemVariables; + } + + /** + * test connection + */ + private static void testConnection(ConfigInitializer loader, List changeItemList, boolean forceAllReload, int loadAllMode) throws Exception { + ReloadLogHelper.info("reload config: test connection start", LOGGER); + try { + //test connection + if (forceAllReload && loader.isFullyConfigured()) { + loader.testConnection(); + } else { + syncShardingNode(loader); + loader.testConnection(changeItemList); + } + } catch (Exception e) { + if ((loadAllMode & ManagerParseConfig.OPTS_MODE) == 0 && loader.isFullyConfigured()) { + //default/-f/-r + throw new Exception(e); + } else { + //-s + ReloadLogHelper.debug("just test ,not stop reload, catch exception", LOGGER, e); + } + } + ReloadLogHelper.info("reload config: test connection end", LOGGER); + } + + /** + * compare change + */ + private static List compareChange(ConfigInitializer loader) { + ReloadLogHelper.info("reload config: compare changes start", LOGGER); + //todo 测试效率 + List changeItemList = differentiateChanges(loader); + ReloadLogHelper.debug("change items :{}", LOGGER, changeItemList); + ReloadLogHelper.info("reload config: compare changes end", LOGGER); + return changeItemList; + } + + /** + * load configuration + * xml:db.xml/user.xml/sharding.xml/sequence + * memory:dble_information/cluster synchronization + */ + private static ConfigInitializer loadConfig(RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception { + ConfigInitializer loader; + try { + if (null == userConfig && null == dbConfig && null == shardingConfig && null == sequenceConfig) { + ReloadLogHelper.info("reload config: load config start [local xml]", LOGGER); + loader = new ConfigInitializer(); + } else { + ReloadLogHelper.info("reload config: load info start [memory]", LOGGER); + ReloadLogHelper.debug("memory to Users is :{}\r\n" + + "memory to DbGroups is :{}\r\n" + + "memory to Shardings is :{}\r\n" + + "memory to sequence is :{}", LOGGER, userConfig, dbConfig, shardingConfig, sequenceConfig); + loader = new ConfigInitializer(userConfig, dbConfig, shardingConfig, sequenceConfig); + } + ReloadLogHelper.info("reload config: load config end", LOGGER); + return loader; + } catch (Exception e) { + throw new Exception(e.getMessage() == null ? e.toString() : e.getMessage(), e); + } + } + + private static void syncShardingNode(ConfigInitializer loader) { + Map oldShardingNodeMap = DbleServer.getInstance().getConfig().getShardingNodes(); + Map newShardingNodeMap = loader.getShardingNodes(); + for (Map.Entry shardingNodeEntry : newShardingNodeMap.entrySet()) { + //sync schema_exists,only testConn can update schema_exists + if (oldShardingNodeMap.containsKey(shardingNodeEntry.getKey())) { + shardingNodeEntry.getValue().setSchemaExists(oldShardingNodeMap.get(shardingNodeEntry.getKey()).isSchemaExists()); + } + } + + } + + /** + * user in use cannot be deleted + */ + private static void checkUser(List changeItemList) { + for (ChangeItem changeItem : changeItemList) { + int type = changeItem.getType(); + Object item = changeItem.getItem(); + if (type == 3 && item instanceof UserName) { + //check is it in use + Integer count = FrontendUserManager.getInstance().getUserConnectionMap().get(item); + if (null != count && count > 0) { + throw new ConfigException("user['" + item.toString() + "'] is being used."); + } + } else if (type == 2 && changeItem.isAffectEntryDbGroup() && item instanceof UserName) { + //check is it in use + Integer count = FrontendUserManager.getInstance().getUserConnectionMap().get(item); + if (null != count && count > 0) { + throw new ConfigException("user['" + item.toString() + "'] is being used."); + } + } + } + } + + private static List differentiateChanges(ConfigInitializer newLoader) { + List changeItemList = Lists.newArrayList(); + //user + //old + ServerConfig oldServerConfig = DbleServer.getInstance().getConfig(); + Map oldUserMap = oldServerConfig.getUsers(); + //new + Map newUserMap = newLoader.getUsers(); + MapDifference userMapDifference = Maps.difference(newUserMap, oldUserMap); + //delete + userMapDifference.entriesOnlyOnRight().keySet().stream().map(userConfig -> new ChangeItem(3, userConfig)).forEach(changeItemList::add); + //add + userMapDifference.entriesOnlyOnLeft().keySet().stream().map(userConfig -> new ChangeItem(1, userConfig)).forEach(changeItemList::add); + //update + userMapDifference.entriesDiffering().entrySet().stream().map(differenceEntry -> { + UserConfig newUserConfig = differenceEntry.getValue().leftValue(); + UserConfig oldUserConfig = differenceEntry.getValue().rightValue(); + ChangeItem changeItem = new ChangeItem(2, differenceEntry.getKey()); + if (newUserConfig instanceof RwSplitUserConfig && oldUserConfig instanceof RwSplitUserConfig) { + if (!((RwSplitUserConfig) newUserConfig).getDbGroup().equals(((RwSplitUserConfig) oldUserConfig).getDbGroup())) { + changeItem.setAffectEntryDbGroup(true); + } + } + return changeItem; + }).forEach(changeItemList::add); + + //shardingNode + Map oldShardingNodeMap = oldServerConfig.getShardingNodes(); + Map newShardingNodeMap = newLoader.getShardingNodes(); + MapDifference shardingNodeMapDiff = Maps.difference(newShardingNodeMap, oldShardingNodeMap); + //delete + shardingNodeMapDiff.entriesOnlyOnRight().values().stream().map(sharingNode -> new ChangeItem(3, sharingNode)).forEach(changeItemList::add); + //add + shardingNodeMapDiff.entriesOnlyOnLeft().values().stream().map(sharingNode -> new ChangeItem(1, sharingNode)).forEach(changeItemList::add); + //update + shardingNodeMapDiff.entriesDiffering().entrySet().stream().map(differenceEntry -> { + ShardingNode newShardingNode = differenceEntry.getValue().leftValue(); + ChangeItem changeItem = new ChangeItem(2, newShardingNode); + return changeItem; + }).forEach(changeItemList::add); + + //dbGroup + Map oldDbGroupMap = oldServerConfig.getDbGroups(); + Map newDbGroupMap = newLoader.getDbGroups(); + Map removeDbGroup = new LinkedHashMap<>(oldDbGroupMap); + for (Map.Entry newDbGroupEntry : newDbGroupMap.entrySet()) { + PhysicalDbGroup oldDbGroup = oldDbGroupMap.get(newDbGroupEntry.getKey()); + PhysicalDbGroup newDbGroup = newDbGroupEntry.getValue(); + + if (null == oldDbGroup) { + //add dbGroup + changeItemList.add(new ChangeItem(1, newDbGroup)); + } else { + removeDbGroup.remove(newDbGroupEntry.getKey()); + //change dbGroup + if (!newDbGroup.equalsBaseInfo(oldDbGroup)) { + ChangeItem changeItem = new ChangeItem(2, newDbGroup); + if (!newDbGroup.equalsForConnectionPool(oldDbGroup)) { + changeItem.setAffectConnectionPool(true); + } + if (!newDbGroup.equalsForHeartbeat(oldDbGroup)) { + changeItem.setAffectHeartbeat(true); + } + changeItemList.add(changeItem); + } + + //dbInstance + Map newDbInstanceMap = newDbGroup.getAllDbInstanceMap(); + Map oldDbInstanceMap = oldDbGroup.getAllDbInstanceMap(); + + MapDifference dbInstanceMapDifference = Maps.difference(newDbInstanceMap, oldDbInstanceMap); + //delete + dbInstanceMapDifference.entriesOnlyOnRight().values().stream().map(dbInstance -> new ChangeItem(3, dbInstance)).forEach(changeItemList::add); + //add + dbInstanceMapDifference.entriesOnlyOnLeft().values().stream().map(dbInstance -> new ChangeItem(1, dbInstance)).forEach(changeItemList::add); + //update + dbInstanceMapDifference.entriesDiffering().values().stream().map(physicalDbInstanceValueDifference -> { + PhysicalDbInstance newDbInstance = physicalDbInstanceValueDifference.leftValue(); + PhysicalDbInstance oldDbInstance = physicalDbInstanceValueDifference.rightValue(); + ChangeItem changeItem = new ChangeItem(2, newDbInstance); + if (!newDbInstance.equalsForConnectionPool(oldDbInstance)) { + changeItem.setAffectConnectionPool(true); + } + if (!newDbInstance.equalsForPoolCapacity(oldDbInstance)) { + changeItem.setAffectPoolCapacity(true); + } + if (!newDbInstance.equalsForHeartbeat(oldDbInstance)) { + changeItem.setAffectHeartbeat(true); + } + if (!newDbInstance.equalsForTestConn(oldDbInstance)) { + changeItem.setAffectTestConn(true); + } else { + newDbInstance.setTestConnSuccess(oldDbInstance.isTestConnSuccess()); + } + return changeItem; + }).forEach(changeItemList::add); + //testConnSuccess with both + for (Map.Entry dbInstanceEntry : dbInstanceMapDifference.entriesInCommon().entrySet()) { + dbInstanceEntry.getValue().setTestConnSuccess(oldDbInstanceMap.get(dbInstanceEntry.getKey()).isTestConnSuccess()); + } + + } + } + for (Map.Entry entry : removeDbGroup.entrySet()) { + PhysicalDbGroup value = entry.getValue(); + changeItemList.add(new ChangeItem(3, value)); + } + + return changeItemList; + } + + private static PhysicalDbInstance getPhysicalDbInstance(ConfigInitializer loader) { + PhysicalDbInstance ds = null; + for (PhysicalDbGroup dbGroup : loader.getDbGroups().values()) { + PhysicalDbInstance dsTest = dbGroup.getWriteDbInstance(); + if (dsTest.isTestConnSuccess()) { + ds = dsTest; + } + if (ds != null) { + break; + } + } + if (ds == null) { + for (PhysicalDbGroup dbGroup : loader.getDbGroups().values()) { + for (PhysicalDbInstance dsTest : dbGroup.getDbInstances(false)) { + if (dsTest.isTestConnSuccess()) { + ds = dsTest; + break; + } + } + if (ds != null) { + break; + } + } + } + return ds; + } + + private static void recycleOldBackendConnections(boolean forceAllReload, boolean closeFrontCon) { + ReloadLogHelper.info("reload config: recycle old active backend [frontend] connections start", LOGGER); + if (forceAllReload && closeFrontCon) { for (IOProcessor processor : DbleServer.getInstance().getBackendProcessors()) { for (BackendConnection con : processor.getBackends().values()) { if (con.getPoolDestroyedTime() != 0) { @@ -397,6 +608,7 @@ public final class ReloadConfig { } } } + ReloadLogHelper.info("reload config: recycle old active backend [frontend] connections end", LOGGER); } @@ -408,93 +620,19 @@ public final class ReloadConfig { } } - private static boolean forceReloadAll(final int loadAllMode, ConfigInitializer loader) throws Exception { - TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-force-reload"); - try { - ServerConfig config = DbleServer.getInstance().getConfig(); - ServerConfig serverConfig = new ServerConfig(loader); - Map newDbGroups = serverConfig.getDbGroups(); - - ConfigUtil.getAndSyncKeyVariables(newDbGroups, true); - - SystemVariables newSystemVariables = getSystemVariablesFromdbGroup(loader, newDbGroups); - ReloadLogHelper.info("reload config: get variables from random node end", LOGGER); - // recycle old active conn - if ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0) { - for (IOProcessor processor : DbleServer.getInstance().getBackendProcessors()) { - for (BackendConnection con : processor.getBackends().values()) { - if (con.getPoolDestroyedTime() != 0) { - con.close("old active backend conn will be forced closed by closing front conn"); - } - } - } - } - - if (loader.isFullyConfigured()) { - if (newSystemVariables.isLowerCaseTableNames()) { - ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER); - serverConfig.reviseLowerCase(loader.getSequenceConfig()); - ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER); - } else { - serverConfig.loadSequence(loader.getSequenceConfig()); - serverConfig.selfChecking0(); - } - } - checkTestConnIfNeed(loadAllMode, loader); - - Map newUsers = serverConfig.getUsers(); - Map newSchemas = serverConfig.getSchemas(); - Map newShardingNodes = serverConfig.getShardingNodes(); - Map> newErRelations = serverConfig.getErRelations(); - Map> newFuncNodeERMap = serverConfig.getFuncNodeERMap(); - Map newBlacklistConfig = serverConfig.getBlacklistConfig(); - Map newFunctions = serverConfig.getFunctions(); - - - ReloadLogHelper.info("reload config: init new dbGroup start", LOGGER); - String reasonMsg = initDbGroupByMap(newDbGroups, newShardingNodes, loader.isFullyConfigured()); - ReloadLogHelper.info("reload config: init new dbGroup end", LOGGER); - if (reasonMsg == null) { - /* 2.3 apply new conf */ - ReloadLogHelper.info("reload config: apply new config start", LOGGER); - boolean result; - try { - result = config.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, config.getDbGroups(), newErRelations, newFuncNodeERMap, - newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions, - loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig()); - CronScheduler.getInstance().init(config.getSchemas()); - if (!result) { - initFailed(newDbGroups); - } - FrontendUserManager.getInstance().initForLatest(newUsers, SystemConfig.getInstance().getMaxCon()); - ReloadLogHelper.info("reload config: apply new config end", LOGGER); - if (!loader.isFullyConfigured()) { - recycleServerConnections(); - } - return result; - } catch (Exception e) { - initFailed(newDbGroups); - throw e; - } + private static SystemVariables getSystemVariablesFromDbInstance(boolean fullyConfigured, PhysicalDbInstance physicalDbInstance) throws Exception { + VarsExtractorHandler handler = new VarsExtractorHandler(physicalDbInstance); + SystemVariables newSystemVariables; + newSystemVariables = handler.execute(); + if (newSystemVariables == null) { + if (fullyConfigured) { + throw new Exception("Can't get variables from any dbInstance, because all of dbGroup can't connect to MySQL correctly"); } else { - initFailed(newDbGroups); - throw new Exception(reasonMsg); - } - } finally { - TraceManager.finishSpan(traceObject); - } - } - - private static void checkTestConnIfNeed(int loadAllMode, ConfigInitializer loader) throws Exception { - if ((loadAllMode & ManagerParseConfig.OPTS_MODE) == 0 && loader.isFullyConfigured()) { - try { - ReloadLogHelper.info("reload config: test all shardingNodes start", LOGGER); - loader.testConnection(); - ReloadLogHelper.info("reload config: test all shardingNodes end", LOGGER); - } catch (Exception e) { - throw new Exception(e); + ReloadLogHelper.info("reload config: no valid dbGroup ,keep variables as old", LOGGER); + newSystemVariables = DbleServer.getInstance().getSystemVariables(); } } + return newSystemVariables; } private static SystemVariables getSystemVariablesFromdbGroup(ConfigInitializer loader, Map newDbGroups) throws Exception { @@ -513,91 +651,23 @@ public final class ReloadConfig { } private static void recycleServerConnections() { + ReloadLogHelper.info("reload config: recycle front connection start", LOGGER); TraceManager.TraceObject traceObject = TraceManager.threadTrace("recycle-sharding-connections"); try { for (IOProcessor processor : DbleServer.getInstance().getFrontProcessors()) { for (FrontendConnection fcon : processor.getFrontends().values()) { if (!fcon.isManager()) { + ReloadLogHelper.debug("recycle front connection:{}", LOGGER, fcon); fcon.close("Reload causes the service to stop"); } } } + ReloadLogHelper.info("reload config: recycle front connection end", LOGGER); } finally { TraceManager.finishSpan(traceObject); } } - private static void distinguishDbGroup(Map newDbGroups, Map oldDbGroups, - Map addOrChangeDbGroups, Map noChangeDbGroups, - Map recycleHosts) { - - for (Map.Entry entry : newDbGroups.entrySet()) { - PhysicalDbGroup oldPool = oldDbGroups.get(entry.getKey()); - PhysicalDbGroup newPool = entry.getValue(); - if (oldPool == null) { - addOrChangeDbGroups.put(newPool.getGroupName(), newPool); - } else { - calcChangedDbGroups(addOrChangeDbGroups, noChangeDbGroups, recycleHosts, entry, oldPool); - } - } - - for (Map.Entry entry : oldDbGroups.entrySet()) { - PhysicalDbGroup newPool = newDbGroups.get(entry.getKey()); - - if (newPool == null) { - PhysicalDbGroup oldPool = entry.getValue(); - recycleHosts.put(oldPool.getGroupName(), oldPool); - } - } - } - - private static void calcChangedDbGroups(Map addOrChangeHosts, Map noChangeHosts, Map recycleHosts, Map.Entry entry, PhysicalDbGroup oldPool) { - PhysicalDbGroupDiff toCheck = new PhysicalDbGroupDiff(entry.getValue(), oldPool); - switch (toCheck.getChangeType()) { - case PhysicalDbGroupDiff.CHANGE_TYPE_CHANGE: - recycleHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getOrgPool()); - addOrChangeHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getNewPool()); - break; - case PhysicalDbGroupDiff.CHANGE_TYPE_ADD: - //when the type is change,just delete the old one and use the new one - addOrChangeHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getNewPool()); - break; - case PhysicalDbGroupDiff.CHANGE_TYPE_NO: - //add old dbGroup into the new mergeddbGroups - noChangeHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getOrgPool()); - break; - case PhysicalDbGroupDiff.CHANGE_TYPE_DELETE: - recycleHosts.put(toCheck.getOrgPool().getGroupName(), toCheck.getOrgPool()); - break; - //do not add into old one - default: - break; - } - } - - - private static String initDbGroupByMap(Map newDbGroups, Map newShardingNodes, boolean fullyConfigured) { - for (PhysicalDbGroup dbGroup : newDbGroups.values()) { - ReloadLogHelper.info("try to init dbGroup : " + dbGroup.getGroupName(), LOGGER); - String hostName = dbGroup.getGroupName(); - // set schemas - ArrayList dnSchemas = new ArrayList<>(30); - for (ShardingNode dn : newShardingNodes.values()) { - if (dn.getDbGroup().getGroupName().equals(hostName)) { - dn.setDbGroup(dbGroup); - dnSchemas.add(dn.getDatabase()); - } - } - dbGroup.setSchemas(dnSchemas.toArray(new String[dnSchemas.size()])); - if (fullyConfigured) { - dbGroup.init("reload config"); - } else { - LOGGER.info("dbGroup[" + hostName + "] is not fullyConfigured, so doing nothing"); - } - } - return null; - } - private static void writePacket(boolean isSuccess, ManagerService service, String errorMsg, int errorCode) { if (isSuccess) { if (LOGGER.isInfoEnabled()) { @@ -614,4 +684,90 @@ public final class ReloadConfig { service.writeErrMessage(errorCode, errorMsg); } } + + public static class ChangeItem { + //1:add 2:update 3:delete + private int type; + private Object item; + private boolean affectHeartbeat; + private boolean affectConnectionPool; + private boolean affectTestConn; + private boolean affectEntryDbGroup; + //connection pool capacity + private boolean affectPoolCapacity; + + public ChangeItem(int type, Object item) { + this.type = type; + this.item = item; + } + + public boolean isAffectHeartbeat() { + return affectHeartbeat; + } + + public void setAffectHeartbeat(boolean affectHeartbeat) { + this.affectHeartbeat = affectHeartbeat; + } + + public boolean isAffectConnectionPool() { + return affectConnectionPool; + } + + public void setAffectConnectionPool(boolean affectConnectionPool) { + this.affectConnectionPool = affectConnectionPool; + } + + public boolean isAffectPoolCapacity() { + return affectPoolCapacity; + } + + public void setAffectPoolCapacity(boolean affectPoolCapacity) { + this.affectPoolCapacity = affectPoolCapacity; + } + + public boolean isAffectTestConn() { + return affectTestConn; + } + + public void setAffectTestConn(boolean affectTestConn) { + this.affectTestConn = affectTestConn; + } + + public boolean isAffectEntryDbGroup() { + return affectEntryDbGroup; + } + + public void setAffectEntryDbGroup(boolean affectEntryDbGroup) { + this.affectEntryDbGroup = affectEntryDbGroup; + } + + public int getType() { + return type; + } + + public void setType(int type) { + this.type = type; + } + + public Object getItem() { + return item; + } + + public void setItem(Object item) { + this.item = item; + } + + @Override + public String toString() { + return "ChangeItem{" + + "type=" + type + + ", item=" + item + + ", affectHeartbeat=" + affectHeartbeat + + ", affectConnectionPool=" + affectConnectionPool + + ", affectTestConn=" + affectTestConn + + ", affectEntryDbGroup=" + affectEntryDbGroup + + ", affectPoolCapacity=" + affectPoolCapacity + + '}'; + } + } } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java index 3b0a2cde1..ba2693413 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnection.java @@ -215,7 +215,7 @@ public final class ShowConnection { isMatch = fc.getHost().equals(whereInfo.get("host")); } if (whereInfo.get("user") != null) { - isMatch = isMatch && fc.getFrontEndService().getUser().toString().equals(whereInfo.get("user")); + isMatch = isMatch && fc.getFrontEndService().getUser().getFullName().equals(whereInfo.get("user")); } return isMatch; } @@ -228,7 +228,7 @@ public final class ShowConnection { row.add(StringUtil.encode(c.getHost(), charset)); row.add(IntegerUtil.toBytes(c.getPort())); row.add(IntegerUtil.toBytes(c.getLocalPort())); - row.add(StringUtil.encode(service.getUser().toString(), charset)); + row.add(StringUtil.encode(service.getUser().getFullName(), charset)); row.add(StringUtil.encode(service.getSchema(), charset)); row.add(StringUtil.encode(service.getCharset().getClient(), charset)); row.add(StringUtil.encode(service.getCharset().getCollation(), charset)); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQL.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQL.java index 00a9c086e..3fe8908ae 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQL.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowConnectionSQL.java @@ -106,7 +106,7 @@ public final class ShowConnectionSQL { RowDataPacket row = new RowDataPacket(FIELD_COUNT); row.add(LongUtil.toBytes(c.getId())); row.add(StringUtil.encode(c.getHost(), charset)); - row.add(StringUtil.encode(c.getFrontEndService().getUser().toString(), charset)); + row.add(StringUtil.encode(c.getFrontEndService().getUser().getFullName(), charset)); AbstractService service = c.getService(); String executeSql = c.getFrontEndService().getExecuteSql(); if (executeSql != null) { diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowProcessList.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowProcessList.java index 79a0d5748..3f3c389c6 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowProcessList.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowProcessList.java @@ -188,7 +188,7 @@ public final class ShowProcessList { // BconnID row.add(threadId == null ? StringUtil.encode(NULL_VAL, charset) : LongUtil.toBytes(threadId)); // User - row.add(StringUtil.encode(fc.getFrontEndService().getUser().toString(), charset)); + row.add(StringUtil.encode(fc.getFrontEndService().getUser().getFullName(), charset)); // Front_Host row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort(), charset)); // db diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQL.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQL.java index 3984b0ad3..43d5bc2e6 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQL.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQL.java @@ -108,7 +108,7 @@ public final class ShowSQL { RowDataPacket row = new RowDataPacket(FIELD_COUNT); row.add(LongUtil.toBytes(idx)); - row.add(StringUtil.encode(user.toString(), charset)); + row.add(StringUtil.encode(user.getFullName(), charset)); row.add(StringUtil.encode(FormatUtil.formatDate(sql.getStartTime()), charset)); row.add(LongUtil.toBytes(sql.getExecuteTime())); row.add(StringUtil.encode(sql.getSql(), charset)); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLHigh.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLHigh.java index 3bbd6967e..4d4ed8668 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLHigh.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLHigh.java @@ -116,7 +116,7 @@ public final class ShowSQLHigh { long minTime, long executeTime, long lastTime, String charset) { RowDataPacket row = new RowDataPacket(FIELD_COUNT); row.add(LongUtil.toBytes(i)); - row.add(StringUtil.encode(user.toString(), charset)); + row.add(StringUtil.encode(user.getFullName(), charset)); row.add(LongUtil.toBytes(count)); row.add(LongUtil.toBytes(avgTime)); row.add(LongUtil.toBytes(maxTime)); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLLarge.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLLarge.java index 2dc0d6d8c..422c01814 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLLarge.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLLarge.java @@ -102,7 +102,7 @@ public final class ShowSQLLarge { private static RowDataPacket getRow(UserName user, UserSqlLargeStat.SqlLarge sql, String charset) { RowDataPacket row = new RowDataPacket(FIELD_COUNT); - row.add(StringUtil.encode(user.toString(), charset)); + row.add(StringUtil.encode(user.getFullName(), charset)); row.add(LongUtil.toBytes(sql.getSqlRows())); row.add(StringUtil.encode(FormatUtil.formatDate(sql.getStartTime()), charset)); row.add(LongUtil.toBytes(sql.getExecuteTime())); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSlow.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSlow.java index 82882c8cf..2a5123e3a 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSlow.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSlow.java @@ -99,7 +99,7 @@ public final class ShowSQLSlow { private static RowDataPacket getRow(UserName user, SQLRecord sql, String charset) { RowDataPacket row = new RowDataPacket(FIELD_COUNT); - row.add(StringUtil.encode(user.toString(), charset)); + row.add(StringUtil.encode(user.getFullName(), charset)); row.add(StringUtil.encode(FormatUtil.formatDate(sql.getStartTime()), charset)); row.add(LongUtil.toBytes(sql.getExecuteTime())); row.add(StringUtil.encode(sql.getStatement(), charset)); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSumUser.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSumUser.java index 93bcda707..03cd67dd4 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSumUser.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSQLSumUser.java @@ -138,7 +138,7 @@ public final class ShowSQLSumUser { String rStr = decimalFormat.format(1.0D * r / (r + w)); int max = rwStat.getConcurrentMax(); - row.add(StringUtil.encode(user.toString(), charset)); + row.add(StringUtil.encode(user.getFullName(), charset)); row.add(LongUtil.toBytes(r)); row.add(LongUtil.toBytes(w)); row.add(StringUtil.encode(String.valueOf(rStr), charset)); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowSqlResultSet.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowSqlResultSet.java index 5c172a1d4..a253291ae 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowSqlResultSet.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowSqlResultSet.java @@ -97,7 +97,7 @@ public final class ShowSqlResultSet { private static RowDataPacket getRow(int i, UserName user, String sql, int count, long resultSetSize, String charset) { RowDataPacket row = new RowDataPacket(FIELD_COUNT); row.add(LongUtil.toBytes(i)); - row.add(StringUtil.encode(user.toString(), charset)); + row.add(StringUtil.encode(user.getFullName(), charset)); row.add(LongUtil.toBytes(count)); row.add(StringUtil.encode(sql, charset)); row.add(LongUtil.toBytes(resultSetSize)); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowUserPrivilege.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowUserPrivilege.java index 1b58e8da3..39a01b04b 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowUserPrivilege.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowUserPrivilege.java @@ -93,7 +93,7 @@ public final class ShowUserPrivilege { if (noNeedCheck || userPrivilegesConfig.getSchemaPrivilege(schema) == null) { tableName = "*"; pri = ALL_PRIVILEGES; - RowDataPacket row = getRow(userName.toString(), schema, tableName, pri, service.getCharset().getResults()); + RowDataPacket row = getRow(userName.getFullName(), schema, tableName, pri, service.getCharset().getResults()); row.setPacketId(++packetId); buffer = row.write(buffer, service, true); } else { @@ -102,13 +102,13 @@ public final class ShowUserPrivilege { for (String tn : tables) { tableName = tn; pri = schemaPrivilege.getTablePrivilege(tn).getDml(); - RowDataPacket row = getRow(userName.toString(), schema, tableName, pri, service.getCharset().getResults()); + RowDataPacket row = getRow(userName.getFullName(), schema, tableName, pri, service.getCharset().getResults()); row.setPacketId(++packetId); buffer = row.write(buffer, service, true); } tableName = "*"; pri = schemaPrivilege.getDml(); - RowDataPacket row = getRow(userName.toString(), schema, tableName, pri, service.getCharset().getResults()); + RowDataPacket row = getRow(userName.getFullName(), schema, tableName, pri, service.getCharset().getResults()); row.setPacketId(++packetId); buffer = row.write(buffer, service, true); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ShowWhiteHost.java b/src/main/java/com/actiontech/dble/services/manager/response/ShowWhiteHost.java index 059111c81..4109b49d3 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ShowWhiteHost.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ShowWhiteHost.java @@ -62,7 +62,7 @@ public final class ShowWhiteHost { for (Map.Entry entry : map.entrySet()) { if (entry.getValue().getWhiteIPs().size() > 0) { for (String whiteIP : entry.getValue().getWhiteIPs()) { - RowDataPacket row = getRow(whiteIP, entry.getKey().toString(), service.getCharset().getResults()); + RowDataPacket row = getRow(whiteIP, entry.getKey().getFullName(), service.getCharset().getResults()); row.setPacketId(++packetId); buffer = row.write(buffer, service, true); } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ha/DbGroupHaEnable.java b/src/main/java/com/actiontech/dble/services/manager/response/ha/DbGroupHaEnable.java index fe717a0e8..91c057990 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ha/DbGroupHaEnable.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ha/DbGroupHaEnable.java @@ -17,6 +17,8 @@ import com.actiontech.dble.config.model.ClusterConfig; import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.services.manager.handler.PacketResult; import com.actiontech.dble.singleton.HaConfigManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; @@ -27,6 +29,8 @@ import java.util.regex.Matcher; */ public final class DbGroupHaEnable { + private static final Logger LOGGER = LoggerFactory.getLogger(DbGroupHaEnable.class); + private DbGroupHaEnable() { } @@ -83,6 +87,7 @@ public final class DbGroupHaEnable { HaConfigManager.getInstance().haFinish(id, e.getMessage(), null); packetResult.setSuccess(false); packetResult.setErrorMsg("enable dbGroup with error, use show @@dbInstance to check latest status. Error:" + e.getMessage()); + LOGGER.warn("enable dbGroup with error, use show @@dbInstance to check latest status. Error:", e); return; } } diff --git a/src/main/java/com/actiontech/dble/services/mysqlauthenticate/util/AuthUtil.java b/src/main/java/com/actiontech/dble/services/mysqlauthenticate/util/AuthUtil.java index c643e675c..121c3338d 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlauthenticate/util/AuthUtil.java +++ b/src/main/java/com/actiontech/dble/services/mysqlauthenticate/util/AuthUtil.java @@ -31,24 +31,24 @@ public final class AuthUtil { UserName user = new UserName(authPacket.getUser(), authPacket.getTenant()); UserConfig userConfig = DbleServer.getInstance().getConfig().getUsers().get(user); if (userConfig == null) { - return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'"); } //normal user login into manager port if (fconn.isManager() && !(userConfig instanceof ManagerUserConfig)) { - return new AuthResultInfo("Access denied for user '" + user + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "'"); } else if (!fconn.isManager() && userConfig instanceof ManagerUserConfig) { //manager user login into server port - return new AuthResultInfo("Access denied for manager user '" + user + "'"); + return new AuthResultInfo("Access denied for manager user '" + user.getFullName() + "'"); } if (!checkWhiteIPs(fconn.getHost(), userConfig.getWhiteIPs())) { - return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'"); } // check password if (!checkPassword(seed, authPacket.getPassword(), userConfig.getPassword(), plugin)) { - return new AuthResultInfo("Access denied for user '" + user + "', because password is incorrect"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "', because password is incorrect"); } if (!DbleServer.getInstance().getConfig().isFullyConfigured() && !fconn.isManager()) { - return new AuthResultInfo("Access denied for user '" + user + "', because there are some empty dbGroup/fake dbInstance"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "', because there are some empty dbGroup/fake dbInstance"); } // check schema final String schema = authPacket.getDatabase(); @@ -56,16 +56,16 @@ public final class AuthUtil { case ErrorCode.ER_BAD_DB_ERROR: return new AuthResultInfo("Unknown database '" + schema + "'"); case ErrorCode.ER_DBACCESS_DENIED_ERROR: - return new AuthResultInfo("Access denied for user '" + user + "' to database '" + schema + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' to database '" + schema + "'"); default: break; } //check max connection switch (FrontendUserManager.getInstance().maxConnectionCheck(user, userConfig.getMaxCon(), fconn.isManager())) { case SERVER_MAX: - return new AuthResultInfo("Access denied for user '" + user + "',too many connections for dble server"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "',too many connections for dble server"); case USER_MAX: - return new AuthResultInfo("Access denied for user '" + user + "',too many connections for this user"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "',too many connections for this user"); default: break; } @@ -76,21 +76,21 @@ public final class AuthUtil { UserName user = new UserName(changeUserPacket.getUser(), changeUserPacket.getTenant()); UserConfig userConfig = DbleServer.getInstance().getConfig().getUsers().get(user); if (userConfig == null) { - return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'"); } //normal user login into manager port if (fconn.isManager() && !(userConfig instanceof ManagerUserConfig)) { - return new AuthResultInfo("Access denied for user '" + user + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "'"); } else if (!fconn.isManager() && userConfig instanceof ManagerUserConfig) { //manager user login into server port - return new AuthResultInfo("Access denied for manager user '" + user + "'"); + return new AuthResultInfo("Access denied for manager user '" + user.getFullName() + "'"); } if (!checkWhiteIPs(fconn.getHost(), userConfig.getWhiteIPs())) { - return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'"); } // check password if (!checkPassword(seed, changeUserPacket.getPassword(), userConfig.getPassword(), plugin)) { - return new AuthResultInfo("Access denied for user '" + user + "', because password is incorrect"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "', because password is incorrect"); } // check schema final String schema = changeUserPacket.getDatabase(); @@ -98,7 +98,7 @@ public final class AuthUtil { case ErrorCode.ER_BAD_DB_ERROR: return new AuthResultInfo("Unknown database '" + schema + "'"); case ErrorCode.ER_DBACCESS_DENIED_ERROR: - return new AuthResultInfo("Access denied for user '" + user + "' to database '" + schema + "'"); + return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' to database '" + schema + "'"); default: break; } diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLProtoLogicHandler.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLProtoLogicHandler.java index b4dbed46c..a0554812a 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLProtoLogicHandler.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLProtoLogicHandler.java @@ -50,7 +50,7 @@ public class MySQLProtoLogicHandler { return; } if (!service.getUserConfig().getSchemas().contains(db)) { - String s = "Access denied for user '" + service.getUser() + "' to database '" + db + "'"; + String s = "Access denied for user '" + service.getUser().getFullName() + "' to database '" + db + "'"; service.writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s); return; } diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLShardingSQLHandler.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLShardingSQLHandler.java index ea90a774d..c3bc997ab 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLShardingSQLHandler.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/MySQLShardingSQLHandler.java @@ -72,7 +72,7 @@ public class MySQLShardingSQLHandler { public void routeSystemInfoAndExecuteSQL(String stmt, SchemaUtil.SchemaInfo schemaInfo, int sqlType) { ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(service.getUser())); if (user == null || !user.getSchemas().contains(schemaInfo.getSchema())) { - service.writeErrMessage("42000", "Access denied for user '" + service.getUser() + "' to database '" + schemaInfo.getSchema() + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); + service.writeErrMessage("42000", "Access denied for user '" + service.getUser().getFullName() + "' to database '" + schemaInfo.getSchema() + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR); return; } RouteResultset rrs = new RouteResultset(stmt, sqlType); diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java index 6f1073cd3..79d10711b 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java @@ -193,7 +193,7 @@ public class ShardingService extends BusinessService { result.getViolations().get(0).getMessage()); } else { String violation = "[" + WallErrorCode.get(result.getViolations().get(0)) + "]"; - String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user + "', so it is considered unsafe SQL"; + String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user.getFullName() + "', so it is considered unsafe SQL"; LOGGER.warn("Firewall message:{}, {}", result.getViolations().get(0).getMessage(), msg); writeErrMessage(ErrorCode.ERR_WRONG_USED, msg); diff --git a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java index 00072df07..696191ef5 100644 --- a/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java +++ b/src/main/java/com/actiontech/dble/services/rwsplit/RWSplitService.java @@ -258,7 +258,7 @@ public class RWSplitService extends BusinessService { LOGGER.warn("{}", "druid not support sql syntax, the reason is " + result.getViolations().get(0).getMessage()); } else { String violation = "[" + WallErrorCode.get(result.getViolations().get(0)) + "]"; - String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user + "', so it is considered unsafe SQL"; + String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user.getFullName() + "', so it is considered unsafe SQL"; LOGGER.warn("Firewall message:{}, {}", result.getViolations().get(0).getMessage(), msg); writeErrMessage(ErrorCode.ERR_WRONG_USED, msg); diff --git a/src/main/java/com/actiontech/dble/singleton/FrontendUserManager.java b/src/main/java/com/actiontech/dble/singleton/FrontendUserManager.java index f562f68da..61d6ebd89 100644 --- a/src/main/java/com/actiontech/dble/singleton/FrontendUserManager.java +++ b/src/main/java/com/actiontech/dble/singleton/FrontendUserManager.java @@ -7,9 +7,11 @@ package com.actiontech.dble.singleton; import com.actiontech.dble.config.model.user.UserConfig; import com.actiontech.dble.config.model.user.UserName; +import com.actiontech.dble.services.manager.response.ReloadConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; @@ -67,6 +69,36 @@ public final class FrontendUserManager { } } + public void changeUser(List changeItemList, int serverLimit) { + TraceManager.TraceObject traceObject = TraceManager.threadTrace("init-for-user-manager"); + try { + serverMaxConnection = serverLimit; + + for (ReloadConfig.ChangeItem changeItem : changeItemList) { + int type = changeItem.getType(); + Object item = changeItem.getItem(); + boolean isUser = item instanceof UserName; + if (!isUser) { + continue; + } + UserName userName = (UserName) item; + if (type == 1) { + //add + if (!userConnectionMap.containsKey(userName)) { + userConnectionMap.put(userName, 0); + } + } else if (type == 3) { + //delete + if (userConnectionMap.containsKey(userName)) { + userConnectionMap.remove(userName); + } + } + } + } finally { + TraceManager.finishSpan(traceObject); + } + } + public CheckStatus maxConnectionCheck(UserName user, int userLimit, boolean isManager) { @@ -99,6 +131,10 @@ public final class FrontendUserManager { return OK; } + public Map getUserConnectionMap() { + return userConnectionMap; + } + public enum CheckStatus { OK, SERVER_MAX, USER_MAX } diff --git a/src/main/java/com/actiontech/dble/singleton/RouteService.java b/src/main/java/com/actiontech/dble/singleton/RouteService.java index e1e8b9c8e..86a49d707 100644 --- a/src/main/java/com/actiontech/dble/singleton/RouteService.java +++ b/src/main/java/com/actiontech/dble/singleton/RouteService.java @@ -43,7 +43,7 @@ public final class RouteService { String cacheKey = null; if (sqlType == ServerParse.SELECT && !LOGGER.isDebugEnabled() && CacheService.getSqlRouteCache() != null) { - cacheKey = (schema == null ? "NULL" : schema.getName()) + "_" + service.getUser() + "_" + stmt; + cacheKey = (schema == null ? "NULL" : schema.getName()) + "_" + service.getUser().getFullName() + "_" + stmt; rrs = (RouteResultset) CacheService.getSqlRouteCache().get(cacheKey); if (rrs != null) { service.getSession2().endParse(); diff --git a/src/main/java/com/actiontech/dble/singleton/Scheduler.java b/src/main/java/com/actiontech/dble/singleton/Scheduler.java index e86c9cf9d..5ed406811 100644 --- a/src/main/java/com/actiontech/dble/singleton/Scheduler.java +++ b/src/main/java/com/actiontech/dble/singleton/Scheduler.java @@ -6,8 +6,9 @@ package com.actiontech.dble.singleton; import com.actiontech.dble.DbleServer; -import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XAAnalysisHandler; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XAAnalysisHandler; import com.actiontech.dble.backend.mysql.xa.XAStateLog; import com.actiontech.dble.buffer.BufferPool; import com.actiontech.dble.config.model.SystemConfig; @@ -52,6 +53,7 @@ public final class Scheduler { scheduledExecutor.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS); scheduledExecutor.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS); scheduledExecutor.scheduleAtFixedRate(oldDbGroupClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS); + scheduledExecutor.scheduleAtFixedRate(oldDbInstanceClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS); scheduledExecutor.scheduleWithFixedDelay(xaSessionCheck(), 0L, SystemConfig.getInstance().getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS); scheduledExecutor.scheduleWithFixedDelay(xaLogClean(), 0L, SystemConfig.getInstance().getXaLogCleanPeriod(), TimeUnit.MILLISECONDS); scheduledExecutor.scheduleWithFixedDelay(resultSetMapClear(), 0L, SystemConfig.getInstance().getClearBigSQLResultSetMapMs(), TimeUnit.MILLISECONDS); @@ -116,16 +118,15 @@ public final class Scheduler { } /** - * after reload @@config_all ,clean old connection + * after reload @@config_all ,clean old dbGroup */ private Runnable oldDbGroupClear() { return () -> timerExecutor.execute(() -> { - Iterator iterator = IOProcessor.BACKENDS_OLD_GROUP.iterator(); while (iterator.hasNext()) { PhysicalDbGroup dbGroup = iterator.next(); boolean isStop = dbGroup.stopOfBackground("[background task]reload config, recycle old group"); - LOGGER.info("[background task]recycle old group`{}` result{}", dbGroup.getGroupName(), isStop); + LOGGER.info("[background task]recycle old group:{},result:{}", dbGroup.getGroupName(), isStop); if (isStop) { iterator.remove(); } @@ -133,6 +134,24 @@ public final class Scheduler { }); } + /** + * after reload @@config_all ,clean old dbInstance + */ + private Runnable oldDbInstanceClear() { + return () -> timerExecutor.execute(() -> { + Iterator iterator = IOProcessor.BACKENDS_OLD_INSTANCE.iterator(); + while (iterator.hasNext()) { + PhysicalDbInstance dbInstance = iterator.next(); + boolean isStop = dbInstance.stopOfBackground("[background task]reload config, recycle old dbInstance"); + LOGGER.info("[background task]recycle old dbInstance:{},result:{}", dbInstance, isStop); + if (isStop) { + iterator.remove(); + dbInstance.getDbGroup().setState(PhysicalDbGroup.INITIAL); + } + } + }); + } + // XA session check job private Runnable xaSessionCheck() {