Merge pull request #3299 from actiontech/fix/1542

perf: refine the effective scope of the reload configuration
This commit is contained in:
LUA
2022-07-12 10:46:04 +08:00
committed by GitHub
64 changed files with 1709 additions and 464 deletions

View File

@@ -76,7 +76,6 @@
<module name="TypecastParenPad"/>
<module name="WhitespaceAfter"/>
<module name="WhitespaceAround"/>
<module name="SeparatorWrap"/>
<module name="NoLineWrap"/>
<module name="EmptyForInitializerPad"/>

View File

@@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat;
import com.actiontech.dble.backend.mysql.nio.MySQLInstance;
import com.actiontech.dble.cluster.JsonFactory;
import com.actiontech.dble.cluster.values.DbInstanceStatus;
@@ -19,12 +20,14 @@ import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables;
import com.actiontech.dble.config.helper.KeyVariables;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.Session;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.singleton.HaConfigManager;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
@@ -43,20 +46,17 @@ public class PhysicalDbGroup {
public static final String JSON_LIST = "dbInstance";
// rw split
public static final int RW_SPLIT_OFF = 0;
public static final int RW_SPLIT_ALL_SLAVES = 1;
public static final int RW_SPLIT_ALL = 2;
public static final int RW_SPLIT_ALL_SLAVES_MAY_MASTER = 3;
// weight
public static final int WEIGHT = 0;
private final List<PhysicalDbInstance> writeInstanceList;
private List<PhysicalDbInstance> writeInstanceList;
private final String groupName;
private final DbGroupConfig dbGroupConfig;
private String groupName;
private DbGroupConfig dbGroupConfig;
private volatile PhysicalDbInstance writeDbInstance;
private Map<String, PhysicalDbInstance> allSourceMap = new HashMap<>();
private final int rwSplitMode;
protected String[] schemas;
private int rwSplitMode;
protected List<String> schemas = Lists.newArrayList();
private final LoadBalancer loadBalancer = new RandomLoadBalancer();
private final LocalReadLoadBalancer localReadLoadBalancer = new LocalReadLoadBalancer();
private final ReentrantReadWriteLock adjustLock = new ReentrantReadWriteLock();
@@ -107,14 +107,22 @@ public class PhysicalDbGroup {
return groupName;
}
public String[] getSchemas() {
public List<String> getSchemas() {
return schemas;
}
public void setSchemas(String[] mySchemas) {
public void setSchemas(List<String> mySchemas) {
this.schemas = mySchemas;
}
public void addSchema(String schema) {
this.schemas.add(schema);
}
public void removeSchema(String schema) {
this.schemas.remove(schema);
}
public DbGroupConfig getDbGroupConfig() {
return dbGroupConfig;
}
@@ -154,14 +162,14 @@ public class PhysicalDbGroup {
return shardingUseless;
}
public boolean isRwSplitUseless() {
return rwSplitUseless;
}
public void setShardingUseless(boolean shardingUseless) {
this.shardingUseless = shardingUseless;
}
public boolean isRwSplitUseless() {
return rwSplitUseless;
}
public void setRwSplitUseless(boolean rwSplitUseless) {
this.rwSplitUseless = rwSplitUseless;
}
@@ -184,8 +192,11 @@ public class PhysicalDbGroup {
}
public void init(String reason) {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("init new group :{},reason:{}", LOGGER, this.toString(), reason);
}
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason);
entry.getValue().init(reason, true);
}
}
@@ -198,12 +209,7 @@ public class PhysicalDbGroup {
}
private boolean checkState() {
if (getBindingCount() != 0) {
state = STATE_DELETING;
IOProcessor.BACKENDS_OLD_GROUP.add(this);
return false;
}
if (state.intValue() != INITIAL) {
if (isStop()) {
return false;
}
if (getBindingCount() != 0) {
@@ -220,19 +226,23 @@ public class PhysicalDbGroup {
}
public void stop(String reason, boolean closeFront) {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("recycle old group :{},reason:{},is close front:{}", LOGGER, this.toString(), reason, closeFront);
}
boolean flag = checkState();
if (!flag) {
return;
}
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, closeFront);
dbInstance.stopDirectly(reason, closeFront);
}
}
public void stopOfFresh(List<String> sourceNames, String reason, boolean closeFront) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).stop(reason, closeFront, false);
PhysicalDbInstance dbInstance = allSourceMap.get(sourceName);
dbInstance.stop(reason, closeFront, false, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || writeDbInstance == dbInstance);
}
}
@@ -255,7 +265,7 @@ public class PhysicalDbGroup {
public boolean stopOfBackground(String reason) {
if (state.intValue() == STATE_DELETING && getBindingCount() == 0) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stop(reason, false);
dbInstance.stopDirectly(reason, false);
}
return true;
}
@@ -267,6 +277,40 @@ public class PhysicalDbGroup {
return state.intValue() != INITIAL;
}
public void stopPool(String reason, boolean closeFront, boolean closeWrite) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
if (!closeWrite && writeDbInstance == dbInstance) {
continue;
}
dbInstance.stopPool(reason, closeFront);
}
}
public void startPool(String reason, boolean startWrite) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
if (!startWrite && writeDbInstance == dbInstance) {
continue;
}
dbInstance.startPool(reason);
}
}
public void stopHeartbeat(String reason) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stopHeartbeat(reason);
}
}
public void startHeartbeat() {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
if (dbInstance.heartbeat.isStop()) {
dbInstance.heartbeat = new MySQLHeartbeat(dbInstance);
}
dbInstance.startHeartbeat();
}
}
public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
return writeInstanceList;
@@ -637,6 +681,14 @@ public class PhysicalDbGroup {
}
}
public void setState(Integer state) {
this.state = state;
}
public Integer getState() {
return state;
}
public int getBindingCount() {
return rwSplitSessionSet.size();
}
@@ -649,15 +701,58 @@ public class PhysicalDbGroup {
return this.rwSplitSessionSet.remove(session);
}
public void setDbInstance(PhysicalDbInstance dbInstance) {
dbInstance.setDbGroup(this);
if (dbInstance.getConfig().isPrimary()) {
this.writeDbInstance = dbInstance;
this.writeInstanceList = Collections.singletonList(dbInstance);
}
this.allSourceMap.put(dbInstance.getName(), dbInstance);
}
public boolean equalsBaseInfo(PhysicalDbGroup pool) {
return pool.getDbGroupConfig().getName().equals(this.dbGroupConfig.getName()) &&
pool.getDbGroupConfig().getHeartbeatSQL().equals(this.dbGroupConfig.getHeartbeatSQL()) &&
return pool.dbGroupConfig.equalsBaseInfo(this.dbGroupConfig) &&
pool.rwSplitMode == this.rwSplitMode &&
pool.getGroupName().equals(this.groupName) &&
pool.isUseless() == this.isUseless();
}
public boolean equalsForConnectionPool(PhysicalDbGroup pool) {
boolean rwSplitModeFlag1 = pool.getDbGroupConfig().getRwSplitMode() != 0 && this.dbGroupConfig.getRwSplitMode() != 0;
boolean rwSplitModeFlag2 = pool.getDbGroupConfig().getRwSplitMode() == 0 && this.dbGroupConfig.getRwSplitMode() == 0;
return (rwSplitModeFlag1 || rwSplitModeFlag2) && pool.isUseless() == this.isUseless();
}
public boolean equalsForHeartbeat(PhysicalDbGroup pool) {
return pool.getDbGroupConfig().getHeartbeatSQL().equals(this.dbGroupConfig.getHeartbeatSQL()) &&
pool.getDbGroupConfig().getHeartbeatTimeout() == this.dbGroupConfig.getHeartbeatTimeout() &&
pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount() &&
pool.getDbGroupConfig().getRwSplitMode() == this.dbGroupConfig.getRwSplitMode() &&
pool.getDbGroupConfig().getDelayThreshold() == this.dbGroupConfig.getDelayThreshold() &&
pool.getDbGroupConfig().isDisableHA() == this.dbGroupConfig.isDisableHA() &&
pool.getGroupName().equals(this.groupName) && pool.isShardingUseless() == this.isShardingUseless() && pool.isRwSplitUseless() == this.isRwSplitUseless() &&
pool.isAnalysisUseless() == this.isAnalysisUseless();
pool.getDbGroupConfig().getErrorRetryCount() == this.dbGroupConfig.getErrorRetryCount();
}
public void copyBaseInfo(PhysicalDbGroup physicalDbGroup) {
this.dbGroupConfig = physicalDbGroup.dbGroupConfig;
this.groupName = physicalDbGroup.groupName;
this.rwSplitMode = physicalDbGroup.rwSplitMode;
this.schemas = physicalDbGroup.schemas;
this.rwSplitUseless = physicalDbGroup.rwSplitUseless;
this.shardingUseless = physicalDbGroup.shardingUseless;
for (PhysicalDbInstance dbInstance : this.allSourceMap.values()) {
dbInstance.setDbGroupConfig(physicalDbGroup.dbGroupConfig);
}
}
@Override
public String toString() {
return "PhysicalDbGroup{" +
"groupName='" + groupName + '\'' +
", dbGroupConfig=" + dbGroupConfig +
", writeDbInstance=" + writeDbInstance +
", allSourceMap=" + allSourceMap +
", rwSplitMode=" + rwSplitMode +
", schemas=" + schemas +
", shardingUseless=" + shardingUseless +
", rwSplitUseless=" + rwSplitUseless +
'}';
}
}

View File

@@ -13,6 +13,8 @@ import com.actiontech.dble.backend.pool.ConnectionPool;
import com.actiontech.dble.backend.pool.ReadTimeStatusInstance;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.net.factory.MySQLConnectionFactory;
@@ -24,6 +26,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
@@ -35,10 +39,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
private static final Logger LOGGER = LoggerFactory.getLogger(PhysicalDbInstance.class);
private final String name;
private final DbInstanceConfig config;
private DbInstanceConfig config;
private volatile boolean readInstance;
private final DbGroupConfig dbGroupConfig;
private DbGroupConfig dbGroupConfig;
private PhysicalDbGroup dbGroup;
private final AtomicBoolean disabled;
private String dsVersion;
@@ -75,10 +79,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
this.disabled = new AtomicBoolean(org.disabled.get());
}
public void init(String reason) {
init(reason, true);
}
public void init(String reason, boolean isInitHeartbeat) {
if (disabled.get() || fakeNode) {
LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
@@ -89,10 +89,17 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
LOGGER.info("init dbInstance[{}] because {}, but it has been initialized, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
return;
}
//minCon/maxCon/numOfShardingNodes
checkPoolSize();
LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
}
protected void checkPoolSize() {
int size = config.getMinCon();
String[] physicalSchemas = dbGroup.getSchemas();
int initSize = physicalSchemas.length;
List<String> physicalSchemas = dbGroup.getSchemas();
int initSize = physicalSchemas.size();
if (size < initSize) {
LOGGER.warn("For db instance[{}], minIdle is less than (the count of shardingNodes), so dble will create at least 1 conn for every schema, " +
"minCon size before:{}, now:{}", this.dbGroup.getGroupName() + "." + name, size, initSize);
@@ -105,9 +112,6 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
LOGGER.warn("For db instance[{}], maxTotal[{}] is less than the minCon or the count of shardingNodes,change the maxCon into {}", this.dbGroup.getGroupName() + "." + name, size, initSize);
config.setMaxCon(initSize);
}
LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
}
public void createConnectionSkipPool(String schema, ResponseHandler handler) {
@@ -284,6 +288,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
return dbGroupConfig;
}
public void setDbGroupConfig(DbGroupConfig dbGroupConfig) {
this.dbGroupConfig = dbGroupConfig;
}
public boolean isFakeNode() {
return fakeNode;
}
@@ -368,7 +376,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
return isSync && isNotDelay;
}
private void startHeartbeat() {
public void startHeartbeat() {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("start heartbeat :{}", LOGGER, this.toString());
}
if (this.isDisabled() || this.isFakeNode()) {
LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name);
return;
@@ -378,29 +389,94 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
}
void start(String reason, boolean isStartHeartbeat) {
if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) {
this.connectionPool.startEvictor(this.dbGroup.getGroupName() + "." + name, reason);
}
startPool(reason);
if (isStartHeartbeat) {
startHeartbeat();
}
}
public void stop(String reason, boolean closeFront) {
stop(reason, closeFront, true);
public void startPool(String reason) {
if (disabled.get() || fakeNode) {
LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
return;
}
if ((dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) && !dbGroup.isUseless()) {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("start connection pool :{},reason:{}", LOGGER, this.toString(), reason);
}
this.connectionPool.startEvictor(this.dbGroup.getGroupName() + "." + name, reason);
}
}
public void stop(String reason, boolean closeFront, boolean isStopHeartbeat) {
if (isStopHeartbeat) {
heartbeat.stop(reason);
private boolean checkState() {
if (dbGroup.getBindingCount() != 0) {
dbGroup.setState(PhysicalDbGroup.STATE_DELETING);
IOProcessor.BACKENDS_OLD_INSTANCE.add(this);
return false;
}
if (dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this) {
LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
connectionPool.stop(reason, closeFront);
if (dbGroup.isStop()) {
return false;
}
if (dbGroup.getBindingCount() != 0) {
dbGroup.setState(PhysicalDbGroup.STATE_DELETING);
IOProcessor.BACKENDS_OLD_INSTANCE.add(this);
return false;
}
return true;
}
public boolean stopOfBackground(String reason) {
if (dbGroup.getState() == PhysicalDbGroup.STATE_DELETING && dbGroup.getBindingCount() == 0) {
stopDirectly(reason, false);
return true;
}
return false;
}
public void stopDirectly(String reason, boolean closeFront) {
stop(reason, closeFront, true, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this);
}
public void stop(String reason, boolean closeFront) {
boolean flag = checkState();
if (!flag) {
return;
}
stopDirectly(reason, closeFront);
}
protected void stop(String reason, boolean closeFront, boolean isStopHeartbeat, boolean isStopPool) {
if (isStopHeartbeat) {
stopHeartbeat(reason);
}
if (isStopPool) {
stopPool(reason, closeFront);
}
isInitial.set(false);
}
public void stopHeartbeat(String reason) {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("stop heartbeat :{},reason:{}", LOGGER, this.toString(), reason);
}
heartbeat.stop(reason);
}
public void stopPool(String reason, boolean closeFront) {
LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("stop connection pool :{},reason:{},is close front:{}", LOGGER, this.toString(), reason, closeFront);
}
connectionPool.stop(reason, closeFront);
}
public void updatePoolCapacity() {
//minCon/maxCon/numOfShardingNodes
checkPoolSize();
connectionPool.evictImmediately();
connectionPool.fillPool();
}
public void closeAllConnection(String reason) {
this.needSkipEvit = true;
this.connectionPool.forceCloseAllConnection(reason);
@@ -460,29 +536,55 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
if (!(other instanceof PhysicalDbInstance)) {
return false;
}
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PhysicalDbInstance that = (PhysicalDbInstance) o;
PhysicalDbInstance dbInstance = (PhysicalDbInstance) other;
DbInstanceConfig otherConfig = dbInstance.getConfig();
DbInstanceConfig thisConfig = this.getConfig();
return otherConfig.getUser().equals(thisConfig.getUser()) && otherConfig.getUrl().equals(thisConfig.getUrl()) &&
otherConfig.getMaxCon() == thisConfig.getMaxCon() && otherConfig.getMinCon() == thisConfig.getMinCon() &&
otherConfig.getPassword().equals(thisConfig.getPassword()) && otherConfig.getInstanceName().equals(thisConfig.getInstanceName()) &&
dbInstance.isDisabled() == this.isDisabled() && otherConfig.getReadWeight() == thisConfig.getReadWeight() &&
otherConfig.getPoolConfig().equals(thisConfig.getPoolConfig()) && otherConfig.isUsingDecrypt() == thisConfig.isUsingDecrypt() &&
otherConfig.getDataBaseType() == thisConfig.getDataBaseType() && StringUtil.equalsIgnoreCase(otherConfig.getDbDistrict(), thisConfig.getDbDistrict()) &&
StringUtil.equalsIgnoreCase(otherConfig.getDbDataCenter(), thisConfig.getDbDataCenter());
return readInstance == that.readInstance &&
Objects.equals(name, that.name) &&
Objects.equals(config, that.config) &&
dbGroupConfig.equalsBaseInfo(that.dbGroupConfig) &&
dbGroup.equalsBaseInfo(that.dbGroup) &&
Objects.equals(disabled.get(), that.disabled.get());
}
@Override
public int hashCode() {
return super.hashCode();
return Objects.hash(name, config, readInstance, dbGroupConfig, dbGroup, disabled, heartbeat);
}
public boolean equalsForConnectionPool(PhysicalDbInstance dbInstance) {
return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) &&
this.config.getPort() == dbInstance.getConfig().getPort() &&
this.config.getUser().equals(dbInstance.getConfig().getUser()) &&
this.config.getPassword().equals(dbInstance.getConfig().getPassword()) &&
this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() &&
this.config.getPoolConfig().getTimeBetweenEvictionRunsMillis() == dbInstance.getConfig().getPoolConfig().getTimeBetweenEvictionRunsMillis() &&
this.disabled.get() == dbInstance.isDisabled();
}
public boolean equalsForPoolCapacity(PhysicalDbInstance dbInstance) {
return this.config.getMinCon() == dbInstance.getConfig().getMinCon() &&
this.config.getMaxCon() == dbInstance.getConfig().getMaxCon();
}
public boolean equalsForHeartbeat(PhysicalDbInstance dbInstance) {
return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) &&
this.config.getPort() == dbInstance.getConfig().getPort() &&
this.config.getUser().equals(dbInstance.getConfig().getUser()) &&
this.config.getPassword().equals(dbInstance.getConfig().getPassword()) &&
this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() &&
this.config.getPoolConfig().getHeartbeatPeriodMillis() == dbInstance.getConfig().getPoolConfig().getHeartbeatPeriodMillis() &&
this.disabled.get() == dbInstance.isDisabled();
}
public boolean equalsForTestConn(PhysicalDbInstance dbInstance) {
return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) &&
this.config.getPort() == dbInstance.getConfig().getPort() &&
this.config.getUser().equals(dbInstance.getConfig().getUser()) &&
this.config.getPassword().equals(dbInstance.getConfig().getPassword()) &&
this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt();
}
@Override
@@ -493,4 +595,9 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
",minCon=" + config.getMinCon() + "]";
}
public void copyBaseInfo(PhysicalDbInstance physicalDbInstance) {
this.config = physicalDbInstance.getConfig();
this.connectionPool.copyBaseInfo(physicalDbInstance.connectionPool);
}
}

View File

@@ -14,6 +14,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Objects;
public class ShardingNode {
protected static final Logger LOGGER = LoggerFactory.getLogger(ShardingNode.class);
@@ -144,4 +145,19 @@ public class ShardingNode {
StringUtil.equalsWithEmpty(this.database, shardingNode.getDatabase()) &&
this.dbGroup.equalsBaseInfo(shardingNode.getDbGroup());
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardingNode that = (ShardingNode) o;
return Objects.equals(name, that.name) &&
Objects.equals(dbGroupName, that.dbGroupName) &&
Objects.equals(database, that.database);
}
@Override
public int hashCode() {
return Objects.hash(name, dbGroupName, database);
}
}

View File

@@ -179,7 +179,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
}
private void fillPool() {
public void fillPool() {
final int idleCount = getCount(STATE_NOT_IN_USE, STATE_HEARTBEAT);
int tmpTotalConnections = totalConnections.get();
if (tmpTotalConnections < 0) {
@@ -385,6 +385,33 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
public void evictImmediately() {
final ArrayList<PooledConnection> idleList = new ArrayList<>(allConnections.size());
for (final PooledConnection entry : allConnections) {
if (entry.getState() == STATE_NOT_IN_USE) {
idleList.add(entry);
}
}
int removable = idleList.size() - config.getMinCon();
if (removable <= 0) {
return;
}
// Sort pool entries on lastAccessed
idleList.sort(LAST_ACCESS_COMPARABLE);
logPoolState("before cleanup ");
for (PooledConnection conn : idleList) {
if (removable > 0 && conn.compareAndSet(STATE_NOT_IN_USE, STATE_RESERVED)) {
conn.close("connection has passed idleTimeout");
removable--;
}
}
}
public ReadTimeStatusInstance getInstance() {
return instance;
}
@@ -435,6 +462,10 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
}
public void copyBaseInfo(ConnectionPool connectionPool) {
this.config = connectionPool.config;
}
/**
* The idle object evictor.
*/

View File

@@ -13,7 +13,7 @@ import java.io.IOException;
public class PoolBase {
protected final DbInstanceConfig config;
protected DbInstanceConfig config;
protected final ReadTimeStatusInstance instance;
protected final PooledConnectionFactory factory;

View File

@@ -20,11 +20,18 @@ import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.ERTable;
import com.actiontech.dble.config.model.user.*;
import com.actiontech.dble.config.model.user.AnalysisUserConfig;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.plan.common.ptr.BoolPtr;
import com.actiontech.dble.route.function.AbstractPartitionAlgorithm;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.TraceManager;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -122,7 +129,7 @@ public class ConfigInitializer implements ProblemReporter {
this.dbGroups = dbConverter.getDbGroupMap();
this.dbConfig = dbJson;
} else {
this.dbGroups = new HashMap<>();
this.dbGroups = Maps.newLinkedHashMap();
this.dbConfig = null;
}
@@ -183,7 +190,7 @@ public class ConfigInitializer implements ProblemReporter {
}
private void checkWriteDbInstance() {
if (this.dbGroups.isEmpty()) {
if (this.dbGroups == null || this.dbGroups.isEmpty()) {
return;
}
//Mark all dbInstance whether they are fake or not
@@ -229,6 +236,117 @@ public class ConfigInitializer implements ProblemReporter {
}
}
public void testConnection(List<ChangeItem> changeItemList) {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection");
try {
Map<String, List<Pair<String, String>>> hostSchemaMap = genDbInstanceSchemaMap();
Set<String> errDbInstanceNames = new HashSet<>();
boolean isAllDbInstanceConnected = true;
// check whether dbInstance is connected
String dbGroupName;
PhysicalDbGroup dbGroup;
for (ChangeItem changeItem : changeItemList) {
ChangeType type = changeItem.getType();
Object item = changeItem.getItem();
ChangeItemType itemType = changeItem.getItemType();
switch (type) {
case ADD:
if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
//test dbGroup
dbGroup = (PhysicalDbGroup) item;
dbGroupName = dbGroup.getGroupName();
// sharding group
List<Pair<String, String>> schemaList = checkDbGroupMaxConn(hostSchemaMap, dbGroup);
for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) {
//test dbInstance
boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList);
if (!testResult) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
}
} else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) {
PhysicalDbInstance ds = (PhysicalDbInstance) item;
dbGroupName = ds.getDbGroupConfig().getName();
// sharding group
List<Pair<String, String>> schemaList = checkDbInstanceMaxConn(hostSchemaMap, ds);
//test dbInstance
boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList);
if (!testResult) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
} else if (itemType == ChangeItemType.SHARDING_NODE) {
ShardingNode shardingNode = (ShardingNode) item;
dbGroup = shardingNode.getDbGroup();
dbGroupName = dbGroup.getGroupName();
// sharding group
List<Pair<String, String>> schemaList = checkDbGroupMaxConn(hostSchemaMap, dbGroup);
for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) {
//test dbInstance
boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList);
if (!testResult) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
}
}
break;
case UPDATE:
if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE && changeItem.isAffectTestConn()) {
PhysicalDbInstance ds = (PhysicalDbInstance) item;
dbGroupName = ds.getDbGroupConfig().getName();
// sharding group
List<Pair<String, String>> schemaList = checkDbInstanceMaxConn(hostSchemaMap, ds);
//test dbInstance
//test dbInstance
boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList);
if (!testResult) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
} else if (itemType == ChangeItemType.SHARDING_NODE) {
ShardingNode shardingNode = (ShardingNode) item;
dbGroup = shardingNode.getDbGroup();
dbGroupName = dbGroup.getGroupName();
// sharding group
List<Pair<String, String>> schemaList = checkDbGroupMaxConn(hostSchemaMap, dbGroup);
for (PhysicalDbInstance ds : dbGroup.getDbInstances(true)) {
//test dbInstance
boolean testResult = checkAndTestDbInstance(ds, dbGroupName, schemaList);
if (!testResult) {
isAllDbInstanceConnected = false;
errDbInstanceNames.add("dbInstance[" + dbGroupName + "." + ds.getName() + "]");
}
}
}
break;
default:
break;
}
}
if (!isAllDbInstanceConnected) {
StringBuilder sb = new StringBuilder("SelfCheck### there are some dbInstance connection failed, pls check these dbInstance:");
for (String key : errDbInstanceNames) {
sb.append("{");
sb.append(key);
sb.append("},");
}
throw new ConfigException(sb.toString());
}
} finally {
TraceManager.finishSpan(traceObject);
}
}
public void testConnection() {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("test-connection");
try {
@@ -283,17 +401,55 @@ public class ConfigInitializer implements ProblemReporter {
}
}
private List<Pair<String, String>> checkDbInstanceMaxConn(Map<String, List<Pair<String, String>>> hostSchemaMap, PhysicalDbInstance ds) {
List<Pair<String, String>> schemaList = null;
if (hostSchemaMap.containsKey(ds.getDbGroupConfig().getName())) {
schemaList = hostSchemaMap.get(ds.getDbGroupConfig().getName());
checkMaxCon(ds, schemaList.size());
}
return schemaList;
}
private List<Pair<String, String>> checkDbGroupMaxConn(Map<String, List<Pair<String, String>>> hostSchemaMap, PhysicalDbGroup dbGroup) {
List<Pair<String, String>> schemaList = null;
if (hostSchemaMap.containsKey(dbGroup.getGroupName())) {
schemaList = hostSchemaMap.get(dbGroup.getGroupName());
checkMaxCon(dbGroup, schemaList.size());
}
return schemaList;
}
private boolean checkAndTestDbInstance(PhysicalDbInstance ds, String dbGroupName, List<Pair<String, String>> schemaList) {
if (ds.getConfig().isDisabled()) {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is disabled"));
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it");
ds.setTestConnSuccess(false);
return true;
} else if (ds.isFakeNode()) {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "dbGroup[" + dbGroupName + "," + ds.getName() + "] is fake Node"));
LOGGER.info("dbGroup[" + ds.getDbGroupConfig().getName() + "] is disabled,just mark testing failed and skip it");
ds.setTestConnSuccess(false);
return true;
}
return testDbInstance(dbGroupName, ds, schemaList);
}
private void checkMaxCon(PhysicalDbGroup pool, int schemasCount) {
for (PhysicalDbInstance dbInstance : pool.getDbInstances(true)) {
if (dbInstance.getConfig().getMaxCon() < Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())) {
errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + pool.getGroupName() + "." + dbInstance.getConfig().getInstanceName() + "] maxCon too little,would be change to " +
Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())));
}
checkMaxCon(dbInstance, schemasCount);
}
}
if (Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon()) != dbInstance.getConfig().getMinCon()) {
errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + pool.getGroupName() + "] minCon too little, Dble would init dbGroup" +
" with " + (schemasCount + 1) + " connections"));
}
private void checkMaxCon(PhysicalDbInstance dbInstance, int schemasCount) {
if (dbInstance.getConfig().getMaxCon() < Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())) {
errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + dbInstance.getDbGroupConfig().getName() + "." + dbInstance.getConfig().getInstanceName() + "] maxCon too little,would be change to " +
Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon())));
}
if (Math.max(schemasCount + 1, dbInstance.getConfig().getMinCon()) != dbInstance.getConfig().getMinCon()) {
errorInfos.add(new ErrorInfo("Xml", "NOTICE", "dbGroup[" + dbInstance.getDbGroupConfig().getName() + "] minCon too little, Dble would init dbGroup" +
" with " + (schemasCount + 1) + " connections"));
}
}
@@ -322,6 +478,8 @@ public class ConfigInitializer implements ProblemReporter {
errorInfos.add(new ErrorInfo("Backend", "WARNING", "Can't connect to [" + dbInstanceKey + "]"));
LOGGER.warn("SelfCheck### can't connect to [" + dbInstanceKey + "]");
isConnectivity = false;
} finally {
ReloadLogHelper.debug("test connection dbInstance:{},is connect:{},schemaList:{}", LOGGER, ds, isConnectivity, schemaList);
}
return isConnectivity;
}

View File

@@ -17,6 +17,7 @@ public final class DbleTempConfig {
private RawJson shardingConfig;
private RawJson userConfig;
private RawJson sequenceConfig;
private boolean lowerCase;
public RawJson getDbConfig() {
return dbConfig;
@@ -50,6 +51,14 @@ public final class DbleTempConfig {
this.sequenceConfig = sequenceConfig;
}
    // Lower-case flag carried across a reload — presumably the backend's
    // lower_case_table_names setting; TODO confirm against the setter's caller.
    public boolean isLowerCase() {
        return lowerCase;
    }
    // Records the lower-case flag for the pending config so ServerConfig can
    // pick it up after the reload is applied.
    public void setLowerCase(boolean lowerCase) {
        this.lowerCase = lowerCase;
    }
    // Accessor for the shared INSTANCE — this class is used as a process-wide
    // holder for the not-yet-applied ("temp") configuration.
    public static DbleTempConfig getInstance() {
        return INSTANCE;
    }

View File

@@ -11,6 +11,7 @@ import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbGroupDiff;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.cluster.JsonFactory;
import com.actiontech.dble.cluster.logic.ClusterLogic;
@@ -34,12 +35,13 @@ import com.actiontech.dble.route.function.AbstractPartitionAlgorithm;
import com.actiontech.dble.route.parser.ManagerParseConfig;
import com.actiontech.dble.route.parser.util.Pair;
import com.actiontech.dble.server.variables.SystemVariables;
import com.actiontech.dble.singleton.CacheService;
import com.actiontech.dble.singleton.HaConfigManager;
import com.actiontech.dble.singleton.ProxyMeta;
import com.actiontech.dble.singleton.SequenceManager;
import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.*;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +52,7 @@ import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author mycat
*/
@@ -73,6 +76,7 @@ public class ServerConfig {
private RawJson shardingConfig;
private RawJson userConfig;
private RawJson sequenceConfig;
private boolean lowerCase;
public ServerConfig() {
//read sharding.xml,db.xml and user.xml
@@ -220,15 +224,15 @@ public class ServerConfig {
public boolean reload(Map<UserName, UserConfig> newUsers, Map<String, SchemaConfig> newSchemas,
Map<String, ShardingNode> newShardingNodes, Map<String, PhysicalDbGroup> newDbGroups,
Map<String, PhysicalDbGroup> recycleDbGroups,
Map<String, PhysicalDbGroup> oldDbGroups,
Map<ERTable, Set<ERTable>> newErRelations,
Map<String, Set<ERTable>> newFuncNodeERMap,
SystemVariables newSystemVariables, boolean isFullyConfigured,
final int loadAllMode, Map<String, Properties> newBlacklistConfig, Map<String, AbstractPartitionAlgorithm> newFunctions,
RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig) throws SQLNonTransientException {
boolean result = apply(newUsers, newSchemas, newShardingNodes, newDbGroups, recycleDbGroups, newErRelations, newFuncNodeERMap,
RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig, List<ChangeItem> changeItemList) throws SQLNonTransientException {
boolean result = apply(newUsers, newSchemas, newShardingNodes, newDbGroups, oldDbGroups, newErRelations, newFuncNodeERMap,
newSystemVariables, isFullyConfigured, loadAllMode, newBlacklistConfig, newFunctions, userJsonConfig,
sequenceJsonConfig, shardingJsonConfig, dbJsonConfig);
sequenceJsonConfig, shardingJsonConfig, dbJsonConfig, changeItemList);
this.reloadTime = TimeUtil.currentTimeMillis();
return result;
}
@@ -342,12 +346,12 @@ public class ServerConfig {
Map<String, SchemaConfig> newSchemas,
Map<String, ShardingNode> newShardingNodes,
Map<String, PhysicalDbGroup> newDbGroups,
Map<String, PhysicalDbGroup> recycleDbGroups,
Map<String, PhysicalDbGroup> oldDbGroups,
Map<ERTable, Set<ERTable>> newErRelations,
Map<String, Set<ERTable>> newFuncNodeERMap,
SystemVariables newSystemVariables,
boolean isFullyConfigured, final int loadAllMode, Map<String, Properties> newBlacklistConfig, Map<String, AbstractPartitionAlgorithm> newFunctions,
RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig) throws SQLNonTransientException {
RawJson userJsonConfig, RawJson sequenceJsonConfig, RawJson shardingJsonConfig, RawJson dbJsonConfig, List<ChangeItem> changeItemList) throws SQLNonTransientException {
List<Pair<String, String>> delTables = new ArrayList<>();
List<Pair<String, String>> reloadTables = new ArrayList<>();
List<String> delSchema = new ArrayList<>();
@@ -359,6 +363,8 @@ public class ServerConfig {
metaLock.lock();
this.changing = true;
try {
// user in use cannot be deleted
checkUser(changeItemList);
String checkResult = ProxyMeta.getInstance().getTmManager().metaCountCheck();
if (checkResult != null) {
LOGGER.warn(checkResult);
@@ -369,23 +375,29 @@ public class ServerConfig {
} catch (Exception e) {
throw new SQLNonTransientException("HaConfigManager init failed", "HY000", ErrorCode.ER_YES);
}
// old dbGroup
// 1 stop heartbeat
// 2 backup
//--------------------------------------------
if (recycleDbGroups != null) {
String recycleGroupName;
PhysicalDbGroup recycleGroup;
for (Map.Entry<String, PhysicalDbGroup> entry : recycleDbGroups.entrySet()) {
recycleGroupName = entry.getKey();
recycleGroup = entry.getValue();
// avoid recycleGroup == newGroup, can't stop recycleGroup
if (newDbGroups.get(recycleGroupName) != recycleGroup) {
ReloadLogHelper.info("reload config, recycle old group. old active backend conn will be close", LOGGER);
recycleGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
ReloadLogHelper.info("reload config: init new dbGroup start", LOGGER);
if ((loadAllMode & ManagerParseConfig.OPTR_MODE) != 0) {
//all dbGroup reload & recycle
initDbGroupByMap(oldDbGroups, newDbGroups, newShardingNodes, isFullyConfigured, loadAllMode);
} else {
//replace dbGroup reference
for (Map.Entry<String, ShardingNode> shardingNodeEntry : newShardingNodes.entrySet()) {
ShardingNode shardingNode = shardingNodeEntry.getValue();
PhysicalDbGroup oldDbGroup = oldDbGroups.get(shardingNode.getDbGroupName());
if (null == oldDbGroup) {
oldDbGroup = newDbGroups.get(shardingNode.getDbGroupName());
}
shardingNode.setDbGroup(oldDbGroup);
}
//only change dbGroup reload & recycle
initDbGroupByMap(changeItemList, oldDbGroups, newShardingNodes, isFullyConfigured, loadAllMode);
newDbGroups = oldDbGroups;
}
ReloadLogHelper.info("reload config: init new dbGroup end", LOGGER);
this.shardingNodes = newShardingNodes;
this.dbGroups = newDbGroups;
this.fullyConfigured = isFullyConfigured;
@@ -401,6 +413,7 @@ public class ServerConfig {
this.dbConfig = dbJsonConfig;
this.shardingConfig = shardingJsonConfig;
this.sequenceConfig = sequenceJsonConfig;
this.lowerCase = DbleTempConfig.getInstance().isLowerCase();
CacheService.getInstance().clearCache();
this.changing = false;
if (isFullyConfigured) {
@@ -413,6 +426,221 @@ public class ServerConfig {
return true;
}
/**
* user in use cannot be deleted
*/
private static void checkUser(List<ChangeItem> changeItemList) {
for (ChangeItem changeItem : changeItemList) {
ChangeType type = changeItem.getType();
Object item = changeItem.getItem();
ChangeItemType itemType = changeItem.getItemType();
if (type == ChangeType.DELETE && itemType == ChangeItemType.USERNAME) {
//check is it in use
Integer count = FrontendUserManager.getInstance().getUserConnectionMap().get(item);
if (null != count && count > 0) {
throw new ConfigException("user['" + item.toString() + "'] is being used.");
}
} else if (type == ChangeType.UPDATE && changeItem.isAffectEntryDbGroup() && itemType == ChangeItemType.USERNAME) {
//check is it in use
Integer count = FrontendUserManager.getInstance().getUserConnectionMap().get(item);
if (null != count && count > 0) {
throw new ConfigException("user['" + item.toString() + "'] is being used.");
}
}
}
}
private static void initDbGroupByMap(Map<String, PhysicalDbGroup> oldDbGroups, Map<String, PhysicalDbGroup> newDbGroups,
Map<String, ShardingNode> newShardingNodes, boolean fullyConfigured, int loadAllMode) {
if (oldDbGroups != null) {
//Only -r uses this method to recycle the connection pool
String recycleGroupName;
PhysicalDbGroup recycleGroup;
for (Map.Entry<String, PhysicalDbGroup> entry : oldDbGroups.entrySet()) {
recycleGroupName = entry.getKey();
recycleGroup = entry.getValue();
// avoid recycleGroup == newGroup, can't stop recycleGroup
if (newDbGroups.get(recycleGroupName) != recycleGroup) {
ReloadLogHelper.info("reload config, recycle old group. old active backend conn will be close", LOGGER);
recycleGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
}
}
}
for (PhysicalDbGroup dbGroup : newDbGroups.values()) {
String hostName = dbGroup.getGroupName();
// set schemas
ArrayList<String> dnSchemas = new ArrayList<>(30);
for (ShardingNode dn : newShardingNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
}
}
dbGroup.setSchemas(dnSchemas);
if (fullyConfigured) {
dbGroup.init("reload config");
} else {
LOGGER.info("dbGroup[" + hostName + "] is not fullyConfigured, so doing nothing");
}
}
}
private void initDbGroupByMap(List<ChangeItem> changeItemList, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes,
boolean isFullyConfigured, int loadAllMode) {
List<PhysicalDbGroup> updateDbGroupList = Lists.newArrayList();
for (ChangeItem changeItem : changeItemList) {
Object item = changeItem.getItem();
ChangeItemType itemType = changeItem.getItemType();
switch (changeItem.getType()) {
case ADD:
addItem(item, itemType, oldDbGroupMap, newShardingNodes, isFullyConfigured);
break;
case UPDATE:
updateItem(item, itemType, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupList, loadAllMode);
break;
case DELETE:
deleteItem(item, itemType, oldDbGroupMap, loadAllMode);
break;
default:
break;
}
}
for (PhysicalDbGroup physicalDbGroup : updateDbGroupList) {
physicalDbGroup.startHeartbeat();
}
}
private void deleteItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, int loadAllMode) {
if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
//delete dbGroup
PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item;
PhysicalDbGroup oldDbGroup = oldDbGroupMap.remove(physicalDbGroup.getGroupName());
oldDbGroup.stop("reload config, recycle old group", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
oldDbGroup = null;
} else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) {
PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item;
//delete slave instance
PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName());
PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().remove(physicalDbInstance.getName());
oldDbInstance.stop("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
oldDbInstance = null;
} else if (itemType == ChangeItemType.SHARDING_NODE) {
ShardingNode shardingNode = (ShardingNode) item;
if (shardingNode.getDbGroup() != null) {
shardingNode.getDbGroup().removeSchema(shardingNode.getDatabase());
}
}
}
    /**
     * Apply an in-place UPDATE for one changed config item. The old runtime
     * object is kept (so live state survives) and only the affected facets —
     * heartbeat, connection pools, pool capacity, schema bindings — are rebuilt.
     * <p>
     * Ordering inside each branch matters: heartbeats are stopped before base
     * info is copied, and pools are stopped/started only after schemas rebound.
     *
     * @param item              the updated config object from the new configuration
     * @param itemType          runtime-type discriminator for {@code item}
     * @param oldDbGroupMap     live group map, mutated in place
     * @param newShardingNodes  sharding nodes from the new configuration
     * @param changeItem        carries the "what is affected" flags for this item
     * @param updateDbGroupList out-param: groups whose heartbeat must be restarted by the caller
     * @param loadAllMode       reload flags; OPTF_MODE also closes front connections
     */
    private void updateItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes, ChangeItem changeItem,
                            List<PhysicalDbGroup> updateDbGroupList, int loadAllMode) {
        if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
            //change dbGroup
            PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item;
            PhysicalDbGroup oldDbGroup = oldDbGroupMap.get(physicalDbGroup.getGroupName());
            if (changeItem.isAffectHeartbeat()) {
                // stop the heartbeat before mutating the group it observes
                oldDbGroup.stopHeartbeat("reload config, stop group heartbeat");
                oldDbGroup.copyBaseInfo(physicalDbGroup);
                //create a new heartbeat in the follow-up
                updateDbGroupList.add(oldDbGroup);
            } else {
                oldDbGroup.copyBaseInfo(physicalDbGroup);
            }
            // rebind sharding nodes/schemas onto the (updated) old group object
            reloadSchema(oldDbGroup, newShardingNodes);
            if (changeItem.isAffectConnectionPool()) {
                // read pool: rwSplitMode==0 means reads go to master only, so the
                // read-instance pool is recycled; otherwise (re)started
                if (physicalDbGroup.getRwSplitMode() == 0) {
                    oldDbGroup.stopPool("reload config, recycle read instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), false);
                } else {
                    oldDbGroup.startPool("reload config, init read instance", false);
                }
                // whole group: a "useless" group has all pools recycled
                if (physicalDbGroup.isUseless()) {
                    oldDbGroup.stopPool("reload config, recycle all instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), true);
                } else {
                    oldDbGroup.startPool("reload config, init all instance", true);
                }
            }
            oldDbGroupMap.put(physicalDbGroup.getGroupName(), oldDbGroup);
        } else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) {
            if (changeItem.isAffectHeartbeat() || changeItem.isAffectConnectionPool()) {
                // heartbeat/pool-affecting change: replace the old instance with
                // the freshly initialized new one
                PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item;
                PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName());
                PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().get(physicalDbInstance.getName());
                oldDbInstance.stop("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0));
                oldDbInstance = null;
                physicalDbInstance.init("reload config", true);
                physicalDbGroup.setDbInstance(physicalDbInstance);
            } else {
                // cosmetic change: keep the old runtime instance, copy config over
                PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item;
                PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName());
                PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().get(physicalDbInstance.getName());
                //copy base config
                oldDbInstance.copyBaseInfo(physicalDbInstance);
                if (changeItem.isAffectPoolCapacity()) {
                    oldDbInstance.updatePoolCapacity();
                }
            }
        } else if (itemType == ChangeItemType.SHARDING_NODE) {
            // sharding node moved: de-register its schema from the old group,
            // register it with the new one
            ShardingNode shardingNode = (ShardingNode) item;
            ShardingNode oldShardingNode = this.shardingNodes.get(shardingNode.getName());
            if (oldShardingNode != null && oldShardingNode.getDbGroup() != null) {
                oldShardingNode.getDbGroup().removeSchema(oldShardingNode.getDatabase());
            }
            if (shardingNode.getDbGroup() != null) {
                shardingNode.getDbGroup().addSchema(shardingNode.getDatabase());
            }
        }
    }
private void addItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes, boolean isFullyConfigured) {
if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
//add dbGroup+dbInstance
PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item;
initDbGroup(physicalDbGroup, newShardingNodes, isFullyConfigured);
oldDbGroupMap.put(physicalDbGroup.getGroupName(), physicalDbGroup);
} else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) {
//add dbInstance
PhysicalDbInstance dbInstance = (PhysicalDbInstance) item;
PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(dbInstance.getDbGroupConfig().getName());
if (isFullyConfigured) {
dbInstance.init("reload config", true);
physicalDbGroup.setDbInstance(dbInstance);
} else {
LOGGER.info("dbGroup[" + dbInstance.getDbGroupConfig().getName() + "] is not fullyConfigured, so doing nothing");
}
} else if (itemType == ChangeItemType.SHARDING_NODE) {
ShardingNode shardingNode = (ShardingNode) item;
if (shardingNode.getDbGroup() != null) {
shardingNode.getDbGroup().addSchema(shardingNode.getDatabase());
}
}
}
public static void reloadSchema(PhysicalDbGroup dbGroup, Map<String, ShardingNode> newShardingNodes) {
String hostName = dbGroup.getGroupName();
// set schemas
ArrayList<String> dnSchemas = new ArrayList<>(30);
for (ShardingNode dn : newShardingNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
}
}
dbGroup.setSchemas(dnSchemas);
}
private static void initDbGroup(PhysicalDbGroup dbGroup, Map<String, ShardingNode> newShardingNodes, boolean fullyConfigured) {
reloadSchema(dbGroup, newShardingNodes);
if (fullyConfigured) {
dbGroup.init("reload config");
} else {
LOGGER.info("dbGroup[" + dbGroup.getGroupName() + "] is not fullyConfigured, so doing nothing");
}
}
private boolean reloadMetaData(List<Pair<String, String>> delTables, List<Pair<String, String>> reloadTables, List<String> delSchema, List<String> reloadSchema) {
boolean reloadResult = true;
if (delSchema.size() > 0) {
@@ -626,6 +854,10 @@ public class ServerConfig {
public RawJson getSequenceConfig() {
return sequenceConfig;
}
public boolean isLowerCase() {
return lowerCase;
}
}

View File

@@ -124,7 +124,7 @@ public class UserConverter {
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Xml to Shardings is :" + usersBean);
LOGGER.debug("Xml to Users is :" + usersBean);
}
if (null == usersBean.getUser() || !usersBean.getUser().stream().anyMatch(userObj -> userObj instanceof ManagerUser)) {
throw new ConfigException("user.xml contains at least one managerUser");
@@ -172,10 +172,10 @@ public class UserConverter {
UserName userName = new UserName(userConfig.getName(), tenant);
if (this.userConfigMap.containsKey(userName)) {
throw new ConfigException("User [" + userName + "] has already existed");
throw new ConfigException("User [" + userName.getFullName() + "] has already existed");
}
if (StringUtil.isEmpty(dbGroup)) {
throw new ConfigException("User [" + userName + "]'s dbGroup is empty");
throw new ConfigException("User [" + userName.getFullName() + "]'s dbGroup is empty");
}
WallProvider wallProvider = getWallProvider(blackListMap, problemReporter, blacklistStr, userName);
@@ -191,10 +191,10 @@ public class UserConverter {
UserName userName = new UserName(userConfig.getName(), tenant);
if (this.userConfigMap.containsKey(userName)) {
throw new ConfigException("User [" + userName + "] has already existed");
throw new ConfigException("User [" + userName.getFullName() + "] has already existed");
}
if (StringUtil.isEmpty(dbGroup)) {
throw new ConfigException("User [" + userName + "]'s dbGroup is empty");
throw new ConfigException("User [" + userName.getFullName() + "]'s dbGroup is empty");
}
WallProvider wallProvider = getWallProvider(blackListMap, problemReporter, blacklistStr, userName);
@@ -211,10 +211,10 @@ public class UserConverter {
UserName userName = new UserName(userConfig.getName(), tenant);
if (this.userConfigMap.containsKey(userName)) {
throw new ConfigException("User [" + userName + "] has already existed");
throw new ConfigException("User [" + userName.getFullName() + "] has already existed");
}
if (StringUtil.isEmpty(schemas)) {
throw new ConfigException("User [" + userName + "]'s schemas is empty");
throw new ConfigException("User [" + userName.getFullName() + "]'s schemas is empty");
}
String[] strArray = SplitUtil.split(schemas, ',', true);

View File

@@ -9,6 +9,7 @@ import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.VersionUtil;
import com.actiontech.dble.config.Isolations;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.OneTimeConnJob;
import com.actiontech.dble.sqlengine.SQLQueryResult;
@@ -79,6 +80,7 @@ public class GetAndSyncDbInstanceKeyVariables implements Callable<KeyVariables>
LOGGER.warn("test conn Interrupted:", e);
} finally {
lock.unlock();
ReloadLogHelper.debug("get key variables :{},dbInstance:{},result:{}", LOGGER, sql.toString(), ds, keyVariables);
}
return keyVariables;
}

View File

@@ -10,6 +10,8 @@ import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.util.StringUtil;
import java.util.Arrays;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -39,18 +41,6 @@ public class DbGroupConfig {
this.disableHA = disableHA;
}
public DbGroupConfig(String name, int rwSplitMode, DbInstanceConfig writeInstanceConfig, DbInstanceConfig[] readInstanceConfigs, String heartbeatSQL, int delayThreshold, int heartbeatTimeout, int errorRetryCount, boolean disableHA) {
this.name = name;
this.rwSplitMode = rwSplitMode;
this.writeInstanceConfig = writeInstanceConfig;
this.readInstanceConfigs = readInstanceConfigs;
this.heartbeatSQL = heartbeatSQL;
this.delayThreshold = delayThreshold;
this.heartbeatTimeout = heartbeatTimeout;
this.errorRetryCount = errorRetryCount;
this.disableHA = disableHA;
}
public int getDelayThreshold() {
return delayThreshold;
}
@@ -140,4 +130,37 @@ public class DbGroupConfig {
public DataBaseType instanceDatabaseType() {
return writeInstanceConfig.getDataBaseType();
}
public boolean equalsBaseInfo(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DbGroupConfig that = (DbGroupConfig) o;
return rwSplitMode == that.rwSplitMode &&
isShowSlaveSql == that.isShowSlaveSql &&
isSelectReadOnlySql == that.isSelectReadOnlySql &&
delayThreshold == that.delayThreshold &&
heartbeatTimeout == that.heartbeatTimeout &&
errorRetryCount == that.errorRetryCount &&
disableHA == that.disableHA &&
Objects.equals(name, that.name) &&
Objects.equals(heartbeatSQL, that.heartbeatSQL);
}
@Override
public String toString() {
return "DbGroupConfig{" +
"name='" + name + '\'' +
", rwSplitMode=" + rwSplitMode +
", writeInstanceConfig=" + writeInstanceConfig +
", readInstanceConfigs=" + Arrays.toString(readInstanceConfigs) +
", heartbeatSQL='" + heartbeatSQL + '\'' +
", isShowSlaveSql=" + isShowSlaveSql +
", isSelectReadOnlySql=" + isSelectReadOnlySql +
", delayThreshold=" + delayThreshold +
", heartbeatTimeout=" + heartbeatTimeout +
", errorRetryCount=" + errorRetryCount +
", disableHA=" + disableHA +
'}';
}
}

View File

@@ -6,6 +6,9 @@
package com.actiontech.dble.config.model.db;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.util.StringUtil;
import java.util.Objects;
public class DbInstanceConfig {
@@ -172,4 +175,34 @@ public class DbInstanceConfig {
return "DbInstanceConfig [hostName=" + instanceName + ", url=" + url + "]";
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
DbInstanceConfig that = (DbInstanceConfig) o;
return port == that.port &&
readWeight == that.readWeight &&
disabled == that.disabled &&
primary == that.primary &&
maxCon == that.maxCon &&
minCon == that.minCon &&
usingDecrypt == that.usingDecrypt &&
dataBaseType.equals(((DbInstanceConfig) o).dataBaseType) &&
Objects.equals(instanceName, that.instanceName) &&
Objects.equals(ip, that.ip) &&
Objects.equals(url, that.url) &&
Objects.equals(user, that.user) &&
Objects.equals(password, that.password) &&
Objects.equals(id, that.id) &&
Objects.equals(poolConfig, that.poolConfig) &&
StringUtil.equalsIgnoreCase(dbDistrict, that.getDbDistrict()) &&
StringUtil.equalsIgnoreCase(dbDataCenter, that.getDbDataCenter());
}
@Override
public int hashCode() {
return Objects.hash(instanceName, ip, port, url, user, password, readWeight,
id, disabled, primary, maxCon, minCon, poolConfig, usingDecrypt,
dataBaseType, dbDistrict, dbDataCenter);
}
}

View File

@@ -11,6 +11,7 @@ import com.actiontech.dble.services.manager.information.ManagerSchemaInfo;
import com.actiontech.dble.util.StringUtil;
import java.sql.SQLException;
import java.util.Objects;
public class ManagerUserConfig extends UserConfig {
private final boolean readOnly;
@@ -52,6 +53,20 @@ public class ManagerUserConfig extends UserConfig {
return 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ManagerUserConfig that = (ManagerUserConfig) o;
return readOnly == that.readOnly;
}
    // Consistent with equals(): folds the read-only flag into the superclass hash.
    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), readOnly);
    }
public boolean equalsBaseInfo(ManagerUserConfig managerUserConfig) {
return super.equalsBaseInfo(managerUserConfig) &&
this.readOnly == managerUserConfig.isReadOnly();

View File

@@ -11,6 +11,7 @@ import com.actiontech.dble.config.helper.ShowDatabaseHandler;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.wall.WallProvider;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -48,4 +49,17 @@ public class RwSplitUserConfig extends SingleDbGroupUserConfig {
return exist ? 0 : ErrorCode.ER_BAD_DB_ERROR;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
RwSplitUserConfig that = (RwSplitUserConfig) o;
return Objects.equals(dbGroup, that.dbGroup);
}
    // Consistent with equals(): folds the dbGroup name into the superclass hash.
    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), dbGroup);
    }
}

View File

@@ -8,6 +8,8 @@ package com.actiontech.dble.config.model.user;
import com.actiontech.dble.util.StringUtil;
import com.alibaba.druid.wall.WallProvider;
import java.util.Objects;
public abstract class ServerUserConfig extends UserConfig {
private final String tenant;
private final WallProvider blacklist;
@@ -26,6 +28,20 @@ public abstract class ServerUserConfig extends UserConfig {
return blacklist;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ServerUserConfig that = (ServerUserConfig) o;
return Objects.equals(tenant, that.tenant) &&
isEquals(this.blacklist, that.blacklist);
}
    // NOTE(review): equals() compares blacklist via the custom isEquals(), but the
    // hash uses blacklist's own hashCode — confirm WallProvider's hashCode agrees
    // with isEquals, otherwise the equals/hashCode contract may be violated.
    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), tenant, blacklist);
    }
public boolean equalsBaseInfo(ServerUserConfig serverUserConfig) {
return super.equalsBaseInfo(serverUserConfig) &&

View File

@@ -14,6 +14,7 @@ import com.alibaba.druid.wall.WallProvider;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class ShardingUserConfig extends ServerUserConfig {
@@ -59,7 +60,7 @@ public class ShardingUserConfig extends ServerUserConfig {
throw new SQLException(msg, "42S02", ErrorCode.ER_NO_SUCH_TABLE);
}
if (!schemas.contains(schemaInfo.getSchema())) {
String msg = "Access denied for user '" + user + "' to database '" + schemaInfo.getSchema() + "'";
String msg = "Access denied for user '" + user.getFullName() + "' to database '" + schemaInfo.getSchema() + "'";
throw new SQLException(msg, "HY000", ErrorCode.ER_DBACCESS_DENIED_ERROR);
}
schemaInfo.setSchemaConfig(schemaConfig);
@@ -98,4 +99,19 @@ public class ShardingUserConfig extends ServerUserConfig {
}
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (!super.equals(o)) return false;
ShardingUserConfig that = (ShardingUserConfig) o;
return readOnly == that.readOnly &&
Objects.equals(schemas, that.schemas) &&
isEquals(privilegesConfig, that.privilegesConfig);
}
    // NOTE(review): equals() compares privilegesConfig via the custom isEquals(),
    // while the hash uses its own hashCode — confirm the two agree, otherwise the
    // equals/hashCode contract may be violated.
    @Override
    public int hashCode() {
        return Objects.hash(super.hashCode(), readOnly, schemas, privilegesConfig);
    }
}

View File

@@ -12,6 +12,7 @@ import com.actiontech.dble.util.StringUtil;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
public class UserConfig {
@@ -90,6 +91,23 @@ public class UserConfig {
return 0;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
UserConfig that = (UserConfig) o;
return isEncrypt == that.isEncrypt &&
maxCon == that.maxCon &&
Objects.equals(name, that.name) &&
Objects.equals(password, that.password) &&
Objects.equals(whiteIPs, that.whiteIPs);
}
    // Consistent with equals(): hashes exactly the fields equals() compares.
    @Override
    public int hashCode() {
        return Objects.hash(name, password, isEncrypt, whiteIPs, maxCon);
    }
public boolean equalsBaseInfo(UserConfig userConfig) {
return StringUtil.equalsWithEmpty(this.name, userConfig.getName()) &&
StringUtil.equalsWithEmpty(this.password, userConfig.getPassword()) &&

View File

@@ -28,6 +28,13 @@ public class UserName {
@Override
public String toString() {
return "UserName{" +
"name='" + name + '\'' +
", tenant='" + tenant + '\'' +
'}';
}
public String getFullName() {
if (tenant == null) {
return name;
}

View File

@@ -5,10 +5,7 @@
package com.actiontech.dble.config.model.user;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.*;
/**
@@ -52,8 +49,9 @@ public class UserPrivilegesConfig {
schemaPrivileges = newSchemaPrivileges;
}
public boolean equalsBaseInfo(UserPrivilegesConfig userPrivilegesConfig) {
if (this == userPrivilegesConfig) return true;
if (userPrivilegesConfig == null || getClass() != userPrivilegesConfig.getClass()) return false;
boolean equalTableInfo1 = this.schemaPrivileges.entrySet().stream().allMatch(schemaPrivilegeEntry -> null != userPrivilegesConfig.getSchemaPrivilege(schemaPrivilegeEntry.getKey()) && userPrivilegesConfig.getSchemaPrivilege(schemaPrivilegeEntry.getKey()).equalsBaseInfo(schemaPrivilegeEntry.getValue()));
boolean equalTableInfo2 = userPrivilegesConfig.getSchemaPrivileges().entrySet().stream().allMatch(schemaPrivilegeEntry -> null != this.schemaPrivileges.get(schemaPrivilegeEntry.getKey()) && this.schemaPrivileges.get(schemaPrivilegeEntry.getKey()).equalsBaseInfo(schemaPrivilegeEntry.getValue()));
return this.check == userPrivilegesConfig.isCheck() &&

View File

@@ -5,15 +5,22 @@
*/
package com.actiontech.dble.config.util;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.backend.mysql.VersionUtil;
import com.actiontech.dble.config.DbleTempConfig;
import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables;
import com.actiontech.dble.config.helper.KeyVariables;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.util.Strings;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -22,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public final class ConfigUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigUtil.class);
@@ -68,14 +76,93 @@ public final class ConfigUtil {
}
}
private static String[] getShardingNodeSchemasOfDbGroup(String dbGroup, Map<String, ShardingNode> shardingNodeMap) {
private static ArrayList<String> getShardingNodeSchemasOfDbGroup(String dbGroup, Map<String, ShardingNode> shardingNodeMap) {
ArrayList<String> schemaList = new ArrayList<>(30);
for (ShardingNode dn : shardingNodeMap.values()) {
if (dn.getDbGroup() != null && dn.getDbGroup().getGroupName().equals(dbGroup)) {
schemaList.add(dn.getDatabase());
}
}
return schemaList.toArray(new String[schemaList.size()]);
return schemaList;
}
/**
 * Fetch the key variables (lower_case_table_names, max_allowed_packet, server version)
 * from every backend dbInstance affected by the given change items, validate them against
 * dble's own configuration, and optionally synchronize them on the backend.
 *
 * Side effects: may lower SystemConfig.maxPacketSize, and records the detected lowerCase
 * value in DbleTempConfig.
 *
 * @param changeItemList the configuration changes computed for this reload
 * @param needSync       forwarded to getAndSyncKeyVariablesForDataSources; whether backend
 *                       variables should be synchronized as well as read
 * @return a warning message when dble's maxPacketSize had to be lowered, otherwise null
 * @throws Exception ConfigException when a backend's major version is lower than the
 *                   configured fake MySQL version; IOException when the backends disagree
 *                   on lower_case_table_names; or any failure of the variable queries
 */
public static String getAndSyncKeyVariables(List<ChangeItem> changeItemList, boolean needSync) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("sync-key-variables");
try {
String msg = null;
// Only these kinds of change require re-reading backend variables:
List<ChangeItem> needCheckItemList = changeItemList.stream()
//add dbInstance or add dbGroup or (update dbInstance and need testConn)
.filter(changeItem -> ((changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE || changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) &&
changeItem.getType() == ChangeType.ADD) ||
(changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE && changeItem.getType() == ChangeType.UPDATE && changeItem.isAffectTestConn()))
.collect(Collectors.toList());
// NOTE(review): needCheckItemList cannot be null here (Collectors.toList() never
// returns null) — the null check is purely defensive.
if (changeItemList.size() == 0 || needCheckItemList == null || needCheckItemList.isEmpty()) {
//with no dbGroups, do not check the variables
return null;
}
Map<String, Future<KeyVariables>> keyVariablesTaskMap = Maps.newHashMap();
getAndSyncKeyVariablesForDataSources(needCheckItemList, keyVariablesTaskMap, needSync);
// The first instance seen becomes the reference for lower_case_table_names;
// firstGroup collects the reference instance, secondGroup collects dissenters
// (the first instance is itself flagged when it differs from dble's current setting).
boolean lowerCase = false;
boolean isFirst = true;
Set<String> firstGroup = new HashSet<>();
Set<String> secondGroup = new HashSet<>();
int minNodePacketSize = Integer.MAX_VALUE;
// Only the first character of the version string is compared (major version digit).
int minVersion = Integer.parseInt(SystemConfig.getInstance().getFakeMySQLVersion().substring(0, 1));
for (Map.Entry<String, Future<KeyVariables>> entry : keyVariablesTaskMap.entrySet()) {
String dataSourceName = entry.getKey();
Future<KeyVariables> future = entry.getValue();
KeyVariables keyVariables = future.get();
if (keyVariables != null) {
if (isFirst) {
lowerCase = keyVariables.isLowerCase();
if (lowerCase != DbleServer.getInstance().getConfig().isLowerCase()) {
secondGroup.add(dataSourceName);
}
isFirst = false;
firstGroup.add(dataSourceName);
} else if (keyVariables.isLowerCase() != lowerCase) {
secondGroup.add(dataSourceName);
}
minNodePacketSize = Math.min(minNodePacketSize, keyVariables.getMaxPacketSize());
int version = Integer.parseInt(keyVariables.getVersion().substring(0, 1));
minVersion = Math.min(minVersion, version);
}
}
// Shrink dble's maxPacketSize so it always fits within the smallest backend limit.
if (minNodePacketSize < SystemConfig.getInstance().getMaxPacketSize() + KeyVariables.MARGIN_PACKET_SIZE) {
SystemConfig.getInstance().setMaxPacketSize(minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
msg = "dble's maxPacketSize will be set to (the min of all dbGroup's max_allowed_packet) - " + KeyVariables.MARGIN_PACKET_SIZE + ":" + (minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
LOGGER.warn(msg);
}
if (minVersion < Integer.parseInt(SystemConfig.getInstance().getFakeMySQLVersion().substring(0, 1))) {
throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend mysql node,pls check the backend mysql node.");
}
// Remember the agreed lowerCase value for the rest of the reload.
DbleTempConfig.getInstance().setLowerCase(lowerCase);
if (secondGroup.size() != 0) {
// if all datasource's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different.");
String firstGroupValue;
String secondGroupValue;
if (lowerCase) {
firstGroupValue = " not 0 :";
secondGroupValue = " 0 :";
} else {
firstGroupValue = " 0 :";
secondGroupValue = " not 0 :";
}
sb.append("These MySQL's value is");
sb.append(firstGroupValue);
sb.append(Strings.join(firstGroup, ','));
sb.append(".And these MySQL's value is");
sb.append(secondGroupValue);
sb.append(Strings.join(secondGroup, ','));
sb.append(".");
throw new IOException(sb.toString());
}
return msg;
} finally {
TraceManager.finishSpan(traceObject);
}
}
public static String getAndSyncKeyVariables(Map<String, PhysicalDbGroup> dbGroups, boolean needSync) throws Exception {
@@ -91,10 +178,15 @@ public final class ConfigUtil {
clickHouseDbGroups.put(k, v);
}
});
sb.append(getMysqlSyncKeyVariables(mysqlDbGroups, needSync));
sb.append(getClickHouseSyncKeyVariables(clickHouseDbGroups, needSync));
return sb.toString();
String mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbGroups, needSync);
if (!StringUtil.isEmpty(mysqlSyncKeyVariables)) {
sb.append(mysqlSyncKeyVariables);
}
String clickHouseSyncKeyVariables = getClickHouseSyncKeyVariables(clickHouseDbGroups, needSync);
if (!StringUtil.isEmpty(clickHouseSyncKeyVariables)) {
sb.append(clickHouseSyncKeyVariables);
}
return sb.length() == 0 ? null : sb.toString();
} finally {
TraceManager.finishSpan(traceObject);
}
@@ -145,6 +237,7 @@ public final class ConfigUtil {
if (minVersion < VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion())) {
throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend mysql node,pls check the backend mysql node.");
}
DbleTempConfig.getInstance().setLowerCase(lowerCase);
if (secondGroup.size() != 0) {
// if all datasource's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different.");
@@ -239,6 +332,42 @@ public final class ConfigUtil {
}
/**
 * Submit a "get key variables" task for each reachable dbInstance referenced by the change
 * items (a single instance, or every instance of an added dbGroup), then block until all
 * submitted tasks have completed.
 *
 * @param changeItemList      change items to inspect; only PHYSICAL_DB_INSTANCE and
 *                            PHYSICAL_DB_GROUP items are acted on
 * @param keyVariablesTaskMap out-param filled by getKeyVariablesForDataSource
 *                            (presumably keyed per dbInstance — confirm in that helper)
 * @param needSync            forwarded to getKeyVariablesForDataSource
 * @throws InterruptedException if interrupted while awaiting task completion
 */
private static void getAndSyncKeyVariablesForDataSources(List<ChangeItem> changeItemList, Map<String, Future<KeyVariables>> keyVariablesTaskMap,
boolean needSync) throws InterruptedException {
// NOTE(review): the pool is sized by the raw item count; callers must pass a non-empty
// list, otherwise Executors.newFixedThreadPool(0) throws IllegalArgumentException.
ExecutorService service = Executors.newFixedThreadPool(changeItemList.size());
for (ChangeItem changeItem : changeItemList) {
Object item = changeItem.getItem();
if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE) {
PhysicalDbInstance ds = (PhysicalDbInstance) item;
// skip instances that cannot be queried: disabled, failed connection test, or fake node
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync);
} else if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) {
PhysicalDbGroup dbGroup = (PhysicalDbGroup) item;
// for a whole dbGroup, query every usable instance it contains
for (PhysicalDbInstance ds : dbGroup.getAllDbInstanceMap().values()) {
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync);
}
}
}
// no further tasks will be submitted; poll every 100ms until all of them finish
service.shutdown();
int i = 0;
while (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
if (i == 0) {
// NOTE(review): logs at INFO although guarded by isDebugEnabled — confirm this mix is intended
LOGGER.info("wait to get all dbInstances's get key variable");
}
i++;
if (i == 100) { //log every 10 seconds
i = 0;
}
}
}
}
private static void getAndSyncKeyVariablesForDataSources(Map<String, PhysicalDbGroup> dbGroups, Map<String, Future<KeyVariables>> keyVariablesTaskMap, boolean needSync) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(dbGroups.size());
for (Map.Entry<String, PhysicalDbGroup> entry : dbGroups.entrySet()) {
@@ -257,7 +386,7 @@ public final class ConfigUtil {
while (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
if (i == 0) {
LOGGER.debug("wait to get all dbInstances's get key variable");
LOGGER.info("wait to get all dbInstances's get key variable");
}
i++;
if (i == 100) { //log every 10 seconds

View File

@@ -42,7 +42,7 @@ public class SlowQueryLogEntry {
sb.append("# User@Host: ");
sb.append(user);
sb.append("[");
sb.append(user);
sb.append(user.getFullName());
sb.append("] @ [");
sb.append(clientIp);
sb.append("] Id: ");

View File

@@ -54,6 +54,39 @@ public class ReloadLogHelper {
LOGGER.info(getStage() + message);
}
/**
 * Log a debug message, prefixed with the current reload stage when a reload is running.
 *
 * @param message text to log
 * @param logger  destination logger
 */
public static void debug(String message, Logger logger) {
    // Cheap guard: skip the reload-status lookup entirely when debug logging is off.
    if (!logger.isDebugEnabled()) {
        return;
    }
    String stage = ReloadManager.getReloadInstance().getStatus() == null ? "" : ReloadManager.getReloadInstance().getStatus().getLogStage();
    logger.debug(stage + message);
}
/**
 * Log a parameterized debug message (single argument), prefixed with the current reload
 * stage when a reload is running.
 *
 * @param message SLF4J format string
 * @param logger  destination logger
 * @param val     argument substituted into the format string
 */
public static void debug(String message, Logger logger, Object val) {
    // Cheap guard: skip the reload-status lookup entirely when debug logging is off.
    if (!logger.isDebugEnabled()) {
        return;
    }
    String stage = ReloadManager.getReloadInstance().getStatus() == null ? "" : ReloadManager.getReloadInstance().getStatus().getLogStage();
    logger.debug(stage + message, val);
}
/**
 * Log a parameterized debug message (varargs), prefixed with the current reload stage
 * when a reload is running.
 *
 * @param message SLF4J format string
 * @param logger  destination logger
 * @param val     arguments substituted into the format string
 */
public static void debug(String message, Logger logger, Object... val) {
    // Cheap guard: skip the reload-status lookup entirely when debug logging is off.
    if (!logger.isDebugEnabled()) {
        return;
    }
    String stage = ReloadManager.getReloadInstance().getStatus() == null ? "" : ReloadManager.getReloadInstance().getStatus().getLogStage();
    logger.debug(stage + message, val);
}
public static void warn(String message, Logger logger) {
if (ReloadManager.getReloadInstance().getStatus() != null) {
logger.info(ReloadManager.getReloadInstance().getStatus().getLogStage() + message);
@@ -79,7 +112,6 @@ public class ReloadLogHelper {
}
private String getStage() {
return reload == null ? "" : reload.getLogStage();
}

View File

@@ -7,6 +7,7 @@ package com.actiontech.dble.net;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage;
import com.actiontech.dble.backend.mysql.xa.TxState;
import com.actiontech.dble.buffer.BufferPool;
@@ -48,6 +49,7 @@ public final class IOProcessor {
// after reload @@config_all ,old back ends connections stored in backends_old
public static final ConcurrentLinkedQueue<PooledConnection> BACKENDS_OLD = new ConcurrentLinkedQueue<>();
public static final ConcurrentLinkedQueue<PhysicalDbGroup> BACKENDS_OLD_GROUP = new ConcurrentLinkedQueue<>();
public static final ConcurrentLinkedQueue<PhysicalDbInstance> BACKENDS_OLD_INSTANCE = new ConcurrentLinkedQueue<>();
private AtomicInteger frontEndsLength = new AtomicInteger(0);

View File

@@ -24,8 +24,6 @@ public class GMSslWrapper extends OpenSSLWrapper {
public boolean initContext() {
try {
Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jce.provider.GMJCE").newInstance(), 1);
Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jsse.provider.GMJSSE").newInstance(), 2);
String pfxfile = SystemConfig.getInstance().getGmsslBothPfx();
String pwd = SystemConfig.getInstance().getGmsslBothPfxPwd();
@@ -42,6 +40,8 @@ public class GMSslWrapper extends OpenSSLWrapper {
LOGGER.warn("Neither [gmsslRcaPem] nor [gmsslOcaPem] are empty.");
return false;
}
Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jce.provider.GMJCE").newInstance(), 1);
Security.insertProviderAt((Provider) Class.forName("cn.gmssl.jsse.provider.GMJSSE").newInstance(), 2);
// load key pair
KeyManager[] kms = createKeyManagers(pfxfile, pwd);

View File

@@ -41,7 +41,7 @@ public final class FieldListHandler {
ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(service.getUser()));
if (user == null || !user.getSchemas().contains(cSchema)) {
service.writeErrMessage("42000", "Access denied for user '" + service.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
service.writeErrMessage("42000", "Access denied for user '" + service.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
return;
}

View File

@@ -108,7 +108,7 @@ public class SelectInformationSchemaColumnsHandler {
ShardingUserConfig userConfig = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(shardingService.getUser()));
if (userConfig == null || !userConfig.getSchemas().contains(cSchema)) {
shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
return;
}

View File

@@ -29,7 +29,7 @@ public final class UseHandler {
}
ShardingUserConfig userConfig = service.getUserConfig();
if (!userConfig.getSchemas().contains(schema)) {
String msg = "Access denied for user '" + service.getUser() + "' to database '" + schema + "'";
String msg = "Access denied for user '" + service.getUser().getFullName() + "' to database '" + schema + "'";
service.writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, msg);
return;
}

View File

@@ -38,7 +38,7 @@ public final class FieldList {
ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(service.getUser()));
if (user == null || !user.getSchemas().contains(cSchema)) {
service.writeErrMessage("42000", "Access denied for user '" + service.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
service.writeErrMessage("42000", "Access denied for user '" + service.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
return;
}

View File

@@ -54,7 +54,7 @@ public final class SelectCurrentUser implements InnerFuncResponse {
}
private static byte[] getUser(ShardingService service) {
return StringUtil.encode(service.getUser() + "@%", service.getCharset().getResults());
return StringUtil.encode(service.getUser().getFullName() + "@%", service.getCharset().getResults());
}
public static byte setCurrentPacket(ShardingService service) {

View File

@@ -54,7 +54,7 @@ public final class SelectUser implements InnerFuncResponse {
}
private static byte[] getUser(ShardingService service) {
return StringUtil.encode(service.getUser().toString() + '@' + service.getConnection().getHost(), service.getCharset().getResults());
return StringUtil.encode(service.getUser().getFullName() + '@' + service.getConnection().getHost(), service.getCharset().getResults());
}

View File

@@ -192,7 +192,7 @@ public final class ShowDbleProcessList {
// BconnID
row.add(threadId == null ? StringUtil.encode(NULL_VAL, charset) : LongUtil.toBytes(threadId));
// User
row.add(StringUtil.encode(fc.getFrontEndService().getUser().toString(), charset));
row.add(StringUtil.encode(fc.getFrontEndService().getUser().getFullName(), charset));
// Front_Host
row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort(), charset));
// db

View File

@@ -78,7 +78,7 @@ public final class ShowTables {
ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(shardingService.getUser()));
if (user == null || !user.getSchemas().contains(cSchema)) {
shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
shardingService.writeErrMessage("42000", "Access denied for user '" + shardingService.getUser().getFullName() + "' to database '" + cSchema + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
return;
}
//if sharding has default single node ,show tables will send to backend

View File

@@ -7,6 +7,7 @@ package com.actiontech.dble.server.variables;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.OneTimeConnJob;
@@ -30,6 +31,7 @@ public class VarsExtractorHandler {
private Lock lock;
private Condition done;
private Map<String, PhysicalDbGroup> dbGroups;
private PhysicalDbInstance physicalDbInstance;
private volatile SystemVariables systemVariables = null;
public VarsExtractorHandler(Map<String, PhysicalDbGroup> dbGroups) {
@@ -39,13 +41,23 @@ public class VarsExtractorHandler {
this.done = lock.newCondition();
}
public VarsExtractorHandler(PhysicalDbInstance physicalDbInstance) {
this.physicalDbInstance = physicalDbInstance;
this.extracting = false;
this.lock = new ReentrantLock();
this.done = lock.newCondition();
}
public SystemVariables execute() {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-system-variables-from-backend");
try {
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(MYSQL_SHOW_VARIABLES_COLS, new MysqlVarsListener(this));
PhysicalDbInstance ds = getPhysicalDbInstance();
if (ds != null) {
OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_VARIABLES, null, resultHandler, ds);
if (null == this.physicalDbInstance) {
this.physicalDbInstance = getPhysicalDbInstance();
}
if (physicalDbInstance != null) {
OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_VARIABLES, null, resultHandler, physicalDbInstance);
sqlJob.run();
waitDone();
} else {
@@ -54,11 +66,16 @@ public class VarsExtractorHandler {
}
return systemVariables;
} finally {
ReloadLogHelper.debug("get system variables :{},dbInstance:{},result:{}", LOGGER, MYSQL_SHOW_VARIABLES, physicalDbInstance, systemVariables);
this.physicalDbInstance = null;
TraceManager.finishSpan(traceObject);
}
}
private PhysicalDbInstance getPhysicalDbInstance() {
if (dbGroups == null || dbGroups.isEmpty()) {
return null;
}
PhysicalDbInstance ds = null;
List<PhysicalDbGroup> dbGroupList = dbGroups.values().stream().filter(dbGroup -> dbGroup.getDbGroupConfig().existInstanceProvideVars()).collect(Collectors.toList());
for (PhysicalDbGroup dbGroup : dbGroupList) {
@@ -126,7 +143,7 @@ public class VarsExtractorHandler {
}
private void tryInitVars() {
if (dbGroups.isEmpty()) {
if (dbGroups == null || dbGroups.isEmpty()) {
return;
}
List<PhysicalDbGroup> dbGroupList = dbGroups.values().stream().filter(dbGroup -> dbGroup.getDbGroupConfig().existInstanceProvideVars()).collect(Collectors.toList());

View File

@@ -74,6 +74,7 @@ public final class ManagerSchemaInfo {
registerTable(new DbleFrontConnectionsAssociateThread());
registerTable(new DbleBackendConnectionsAssociateThread());
registerTable(new DbleClusterRenewThread());
registerTable(new RecyclingResource());
}
private void initViews() {

View File

@@ -221,7 +221,7 @@ public class DbleDbGroup extends ManagerWritableTable {
DbleRwSplitEntry dbleRwSplitEntry = (DbleRwSplitEntry) ManagerSchemaInfo.getInstance().getTables().get(DbleRwSplitEntry.TABLE_NAME);
boolean existUser = dbleRwSplitEntry.getRows().stream().anyMatch(entry -> entry.get(DbleRwSplitEntry.COLUMN_DB_GROUP).equals(affectPk.get(COLUMN_NAME)));
if (existUser) {
throw new ConfigException("Cannot delete or update a parent row: a foreign key constraint fails `dble_db_user`(`db_group`) REFERENCES `dble_db_group`(`name`)");
throw new ConfigException("Cannot delete or update a parent row: a foreign key constraint fails `dble_rw_split_entry`(`db_group`) REFERENCES `dble_db_group`(`name`)");
}
//check instance-group
DbleDbInstance dbleDbInstance = (DbleDbInstance) ManagerSchemaInfo.getInstance().getTables().get(DbleDbInstance.TABLE_NAME);

View File

@@ -72,7 +72,7 @@ public class DbleFlowControl extends ManagerBaseTable {
LinkedHashMap<String, String> row = Maps.newLinkedHashMap();
row.put(COLUMN_CONNECTION_TYPE, "ServerConnection");
row.put(COLUMN_CONNECTION_ID, Long.toString(fc.getId()));
row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) service).getSchema() + " user = " + ((FrontendService) service).getUser());
row.put(COLUMN_CONNECTION_INFO, fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) service).getSchema() + " user = " + ((FrontendService) service).getUser().getFullName());
row.put(COLUMN_WRITING_QUEUE_BYTES, Integer.toString(size));
row.put(COLUMN_READING_QUEUE_BYTES, null);
row.put(COLUMN_FLOW_CONTROLLED, fc.isFrontWriteFlowControlled() ? "true" : "false");

View File

@@ -153,7 +153,7 @@ public class ProcessList extends ManagerBaseTable {
// BconnID
row.put(COLUMN_MYSQL_ID, conn.getThreadId() + "");
// User
row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().toString());
row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().getFullName());
// Front_Host
row.put(COLUMN_FRONT_HOST, frontConn.getHost() + ":" + frontConn.getLocalPort());
// time
@@ -168,7 +168,7 @@ public class ProcessList extends ManagerBaseTable {
// Front_Id
row.put(COLUMN_FRONT_ID, frontConn.getId() + "");
// User
row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().toString());
row.put(COLUMN_USER, frontConn.getFrontEndService().getUser().getFullName());
// Front_Host
row.put(COLUMN_FRONT_HOST, frontConn.getHost() + ":" + frontConn.getLocalPort());
// time

View File

@@ -0,0 +1,65 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.services.manager.information.tables;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.meta.ColumnMeta;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.services.manager.information.ManagerBaseTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.LinkedHashMap;
import java.util.List;
/**
 * Read-only manager table "recycling_resource": lists the resources parked for recycling
 * after a reload — old dbGroups, old dbInstances, and old backend connections held in the
 * IOProcessor BACKENDS_OLD* queues.
 */
public class RecyclingResource extends ManagerBaseTable {
    private static final String TABLE_NAME = "recycling_resource";
    private static final String COLUMN_TYPE = "type";
    private static final String COLUMN_INFO = "info";

    public RecyclingResource() {
        super(TABLE_NAME, 2);
    }

    @Override
    protected void initColumnAndType() {
        columns.put(COLUMN_TYPE, new ColumnMeta(COLUMN_TYPE, "varchar(64)", false));
        columnsType.put(COLUMN_TYPE, Fields.FIELD_TYPE_VAR_STRING);
        columns.put(COLUMN_INFO, new ColumnMeta(COLUMN_INFO, "text", false));
        columnsType.put(COLUMN_INFO, Fields.FIELD_TYPE_STRING);
    }

    @Override
    protected List<LinkedHashMap<String, String>> getRows() {
        List<LinkedHashMap<String, String>> rows = Lists.newArrayList();
        // one row per pending dbGroup awaiting recycling
        for (PhysicalDbGroup dbGroup : IOProcessor.BACKENDS_OLD_GROUP) {
            rows.add(buildRow("dbGroup", dbGroup.toString()));
        }
        // one row per pending dbInstance
        for (PhysicalDbInstance dbInstance : IOProcessor.BACKENDS_OLD_INSTANCE) {
            rows.add(buildRow("dbInstance", dbInstance.toString()));
        }
        // one row per old backend connection not yet released
        for (PooledConnection connection : IOProcessor.BACKENDS_OLD) {
            rows.add(buildRow("backendConnection", connection.toString()));
        }
        return rows;
    }

    /** Build a single {type, info} row for the result set. */
    private static LinkedHashMap<String, String> buildRow(String type, String info) {
        LinkedHashMap<String, String> row = Maps.newLinkedHashMap();
        row.put(COLUMN_TYPE, type);
        row.put(COLUMN_INFO, info);
        return row;
    }
}

View File

@@ -0,0 +1,92 @@
package com.actiontech.dble.services.manager.response;
/**
 * Describes one configuration difference detected during reload: the kind of change
 * (add/update/delete), the kind of object it concerns, the object itself, and flags
 * for which runtime aspects (heartbeat, connection pool, ...) the change touches.
 */
public class ChangeItem {
    private ChangeType type;
    private Object item;
    private ChangeItemType itemType;
    // flags describing which runtime aspects this change affects
    private boolean affectHeartbeat;
    private boolean affectConnectionPool;
    private boolean affectTestConn;
    private boolean affectEntryDbGroup;
    //connection pool capacity
    private boolean affectPoolCapacity;

    public ChangeItem(ChangeType type, Object item, ChangeItemType itemType) {
        this.type = type;
        this.item = item;
        this.itemType = itemType;
    }

    public ChangeType getType() {
        return type;
    }

    public void setType(ChangeType type) {
        this.type = type;
    }

    public Object getItem() {
        return item;
    }

    public void setItem(Object item) {
        this.item = item;
    }

    public ChangeItemType getItemType() {
        return itemType;
    }

    public boolean isAffectHeartbeat() {
        return affectHeartbeat;
    }

    public void setAffectHeartbeat(boolean affectHeartbeat) {
        this.affectHeartbeat = affectHeartbeat;
    }

    public boolean isAffectConnectionPool() {
        return affectConnectionPool;
    }

    public void setAffectConnectionPool(boolean affectConnectionPool) {
        this.affectConnectionPool = affectConnectionPool;
    }

    public boolean isAffectTestConn() {
        return affectTestConn;
    }

    public void setAffectTestConn(boolean affectTestConn) {
        this.affectTestConn = affectTestConn;
    }

    public boolean isAffectEntryDbGroup() {
        return affectEntryDbGroup;
    }

    public void setAffectEntryDbGroup(boolean affectEntryDbGroup) {
        this.affectEntryDbGroup = affectEntryDbGroup;
    }

    public boolean isAffectPoolCapacity() {
        return affectPoolCapacity;
    }

    public void setAffectPoolCapacity(boolean affectPoolCapacity) {
        this.affectPoolCapacity = affectPoolCapacity;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("ChangeItem{");
        sb.append("type=").append(type);
        sb.append(", item=").append(item);
        sb.append(", affectHeartbeat=").append(affectHeartbeat);
        sb.append(", affectConnectionPool=").append(affectConnectionPool);
        sb.append(", affectTestConn=").append(affectTestConn);
        sb.append(", affectEntryDbGroup=").append(affectEntryDbGroup);
        sb.append(", affectPoolCapacity=").append(affectPoolCapacity);
        sb.append('}');
        return sb.toString();
    }
}

View File

@@ -0,0 +1,5 @@
package com.actiontech.dble.services.manager.response;
/**
 * The kind of configuration object a change item refers to.
 */
public enum ChangeItemType {
    PHYSICAL_DB_GROUP, PHYSICAL_DB_INSTANCE, SHARDING_NODE, USERNAME
}

View File

@@ -0,0 +1,5 @@
package com.actiontech.dble.services.manager.response;
/**
 * The kind of configuration change detected during reload.
 */
public enum ChangeType {
    ADD, UPDATE, DELETE
}

View File

@@ -133,7 +133,7 @@ public final class FlowControlList {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode("ServerConnection", service.getCharset().getResults()));
row.add(LongUtil.toBytes(fc.getId()));
row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) fcService).getSchema() + " user = " + ((FrontendService) fcService).getUser(), service.getCharset().getResults()));
row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort() + "/" + ((FrontendService) fcService).getSchema() + " user = " + ((FrontendService) fcService).getUser().getFullName(), service.getCharset().getResults()));
row.add(LongUtil.toBytes(size));
row.add(null); // not support
row.add(fc.isFrontWriteFlowControlled() ? "true".getBytes() : "false".getBytes());

View File

@@ -7,7 +7,7 @@ package com.actiontech.dble.services.manager.response;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbGroupDiff;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.btrace.provider.ClusterDelayProvider;
import com.actiontech.dble.cluster.ClusterHelper;
@@ -27,6 +27,7 @@ import com.actiontech.dble.config.model.ClusterConfig;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.ERTable;
import com.actiontech.dble.config.model.user.RwSplitUserConfig;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.config.util.ConfigUtil;
@@ -45,6 +46,9 @@ import com.actiontech.dble.services.manager.handler.PacketResult;
import com.actiontech.dble.singleton.CronScheduler;
import com.actiontech.dble.singleton.FrontendUserManager;
import com.actiontech.dble.singleton.TraceManager;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -175,7 +179,6 @@ public final class ReloadConfig {
} else {
packetResult.setErrorCode(ErrorCode.ER_CLUSTER_RELOAD);
}
return;
}
} finally {
lock.writeLock().unlock();
@@ -231,7 +234,6 @@ public final class ReloadConfig {
c.writeErrMessage(ErrorCode.ER_YES, sb);
}
@Deprecated
public static boolean reloadByLocalXml(final int loadAllMode) throws Exception {
return reload(loadAllMode, null, null, null, null);
}
@@ -255,140 +257,321 @@ public final class ReloadConfig {
private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload");
try {
/*
* 1 load new conf
* 1.1 ConfigInitializer init adn check itself
* 1.2 ShardingNode/dbGroup test connection
*/
ReloadLogHelper.info("reload config: load all xml info start", LOGGER);
ConfigInitializer loader;
try {
if (null == userConfig && null == dbConfig && null == shardingConfig && null == sequenceConfig) {
loader = new ConfigInitializer();
} else {
loader = new ConfigInitializer(userConfig, dbConfig, shardingConfig, sequenceConfig);
}
} catch (Exception e) {
throw new Exception(e.getMessage() == null ? e.toString() : e.getMessage(), e);
}
ReloadLogHelper.info("reload config: load all xml info end", LOGGER);
// load configuration
ConfigInitializer loader = loadConfig(userConfig, dbConfig, shardingConfig, sequenceConfig);
ReloadLogHelper.info("reload config: get variables from random alive dbGroup start", LOGGER);
try {
loader.testConnection();
} catch (Exception e) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("just test ,not stop reload, catch exception", e);
}
}
// compare changes
List<ChangeItem> changeItemList = compareChange(loader);
boolean forceAllReload = false;
if ((loadAllMode & ManagerParseConfig.OPTR_MODE) != 0) {
//-r
forceAllReload = true;
}
if (forceAllReload) {
return forceReloadAll(loadAllMode, loader);
} else {
return intelligentReloadAll(loadAllMode, loader);
}
} finally {
TraceManager.finishSpan(traceObject);
}
}
// test connection
testConnection(loader, changeItemList, forceAllReload, loadAllMode);
private static boolean intelligentReloadAll(int loadAllMode, ConfigInitializer loader) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-intelligent-reload");
try {
/* 2.1.1 get diff of dbGroups */
ServerConfig config = DbleServer.getInstance().getConfig();
Map<String, PhysicalDbGroup> addOrChangeHosts = new LinkedHashMap<>();
Map<String, PhysicalDbGroup> noChangeHosts = new LinkedHashMap<>();
Map<String, PhysicalDbGroup> recycleHosts = new HashMap<>();
distinguishDbGroup(loader.getDbGroups(), config.getDbGroups(), addOrChangeHosts, noChangeHosts, recycleHosts);
ServerConfig newConfig = new ServerConfig(loader);
Map<String, PhysicalDbGroup> newDbGroups = newConfig.getDbGroups();
Map<String, PhysicalDbGroup> mergedDbGroups = new LinkedHashMap<>();
mergedDbGroups.putAll(noChangeHosts);
mergedDbGroups.putAll(addOrChangeHosts);
// check version/packetSize/lowerCase && get system variables
SystemVariables newSystemVariables = checkVersionAGetSystemVariables(loader, newDbGroups, changeItemList, forceAllReload);
ConfigUtil.getAndSyncKeyVariables(mergedDbGroups, true);
// recycle old active conn
recycleOldBackendConnections(forceAllReload, (loadAllMode & ManagerParseConfig.OPTF_MODE) != 0);
SystemVariables newSystemVariables = getSystemVariablesFromdbGroup(loader, mergedDbGroups);
ReloadLogHelper.info("reload config: get variables from random node end", LOGGER);
ServerConfig serverConfig = new ServerConfig(loader);
// lowerCase && load sequence
if (loader.isFullyConfigured()) {
if (newSystemVariables.isLowerCaseTableNames()) {
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER);
serverConfig.reviseLowerCase(loader.getSequenceConfig());
newConfig.reviseLowerCase(loader.getSequenceConfig());
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER);
} else {
serverConfig.loadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
newConfig.loadSequence(loader.getSequenceConfig());
newConfig.selfChecking0();
}
}
checkTestConnIfNeed(loadAllMode, loader);
Map<UserName, UserConfig> newUsers = serverConfig.getUsers();
Map<String, SchemaConfig> newSchemas = serverConfig.getSchemas();
Map<String, ShardingNode> newShardingNodes = serverConfig.getShardingNodes();
Map<ERTable, Set<ERTable>> newErRelations = serverConfig.getErRelations();
Map<String, Set<ERTable>> newFuncNodeERMap = serverConfig.getFuncNodeERMap();
Map<String, PhysicalDbGroup> newDbGroups = serverConfig.getDbGroups();
Map<String, Properties> newBlacklistConfig = serverConfig.getBlacklistConfig();
Map<String, AbstractPartitionAlgorithm> newFunctions = serverConfig.getFunctions();
/*
* 2 transform
* 2.1 old lDbInstance continue to work
* 2.1.1 define the diff of new & old dbGroup config
* 2.1.2 create new init plan for the reload
* 2.2 init the new lDbInstance
* 2.3 transform
* 2.4 put the old connection into a queue
*/
Map<UserName, UserConfig> newUsers = newConfig.getUsers();
Map<String, SchemaConfig> newSchemas = newConfig.getSchemas();
Map<String, ShardingNode> newShardingNodes = newConfig.getShardingNodes();
Map<ERTable, Set<ERTable>> newErRelations = newConfig.getErRelations();
Map<String, Set<ERTable>> newFuncNodeERMap = newConfig.getFuncNodeERMap();
Map<String, Properties> newBlacklistConfig = newConfig.getBlacklistConfig();
Map<String, AbstractPartitionAlgorithm> newFunctions = newConfig.getFunctions();
/* 2.2 init the lDbInstance with diff*/
ReloadLogHelper.info("reload config: init new dbGroup start", LOGGER);
String reasonMsg = initDbGroupByMap(mergedDbGroups, newShardingNodes, loader.isFullyConfigured());
ReloadLogHelper.info("reload config: init new dbGroup end", LOGGER);
if (reasonMsg == null) {
/* 2.3 apply new conf */
ReloadLogHelper.info("reload config: apply new config start", LOGGER);
boolean result;
try {
result = config.reload(newUsers, newSchemas, newShardingNodes, mergedDbGroups, recycleHosts, newErRelations, newFuncNodeERMap,
newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions,
loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig());
CronScheduler.getInstance().init(config.getSchemas());
if (!result) {
initFailed(newDbGroups);
}
FrontendUserManager.getInstance().initForLatest(newUsers, SystemConfig.getInstance().getMaxCon());
ReloadLogHelper.info("reload config: apply new config end", LOGGER);
recycleOldBackendConnections((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0);
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
return result;
} catch (Exception e) {
// start/stop connection pool && heartbeat
// replace config
ReloadLogHelper.info("reload config: apply new config start", LOGGER);
ServerConfig oldConfig = DbleServer.getInstance().getConfig();
boolean result;
try {
result = oldConfig.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, oldConfig.getDbGroups(), newErRelations, newFuncNodeERMap,
newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions,
loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig(), changeItemList);
CronScheduler.getInstance().init(oldConfig.getSchemas());
if (!result) {
initFailed(newDbGroups);
throw e;
}
} else {
FrontendUserManager.getInstance().changeUser(changeItemList, SystemConfig.getInstance().getMaxCon());
ReloadLogHelper.info("reload config: apply new config end", LOGGER);
// recycle old active conn
recycleOldBackendConnections(!forceAllReload, (loadAllMode & ManagerParseConfig.OPTF_MODE) != 0);
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
return result;
} catch (Exception e) {
initFailed(newDbGroups);
throw new Exception(reasonMsg);
throw e;
}
} finally {
TraceManager.finishSpan(traceObject);
}
}
private static void recycleOldBackendConnections(boolean closeFrontCon) {
if (closeFrontCon) {
/**
* check version/packetSize/lowerCase
* get system variables
*/
private static SystemVariables checkVersionAGetSystemVariables(ConfigInitializer loader, Map<String, PhysicalDbGroup> newDbGroups, List<ChangeItem> changeItemList, boolean forceAllReload) throws Exception {
ReloadLogHelper.info("reload config: check and get system variables from random node start", LOGGER);
SystemVariables newSystemVariables;
if (forceAllReload) {
//check version/packetSize/lowerCase
ConfigUtil.getAndSyncKeyVariables(newDbGroups, true);
//system variables
newSystemVariables = getSystemVariablesFromdbGroup(loader, newDbGroups);
} else {
//check version/packetSize/lowerCase
ConfigUtil.getAndSyncKeyVariables(changeItemList, true);
//random one node
//system variables
PhysicalDbInstance physicalDbInstance = getPhysicalDbInstance(loader);
newSystemVariables = getSystemVariablesFromDbInstance(loader.isFullyConfigured(), physicalDbInstance);
}
ReloadLogHelper.info("reload config: check and get system variables from random node end", LOGGER);
return newSystemVariables;
}
/**
* test connection
*/
private static void testConnection(ConfigInitializer loader, List<ChangeItem> changeItemList, boolean forceAllReload, int loadAllMode) throws Exception {
ReloadLogHelper.info("reload config: test connection start", LOGGER);
try {
//test connection
if (forceAllReload && loader.isFullyConfigured()) {
loader.testConnection();
} else {
syncShardingNode(loader);
loader.testConnection(changeItemList);
}
} catch (Exception e) {
if ((loadAllMode & ManagerParseConfig.OPTS_MODE) == 0 && loader.isFullyConfigured()) {
//default/-f/-r
throw new Exception(e);
} else {
//-s
ReloadLogHelper.debug("just test ,not stop reload, catch exception", LOGGER, e);
}
}
ReloadLogHelper.info("reload config: test connection end", LOGGER);
}
/**
* compare change
*/
private static List<ChangeItem> compareChange(ConfigInitializer loader) {
ReloadLogHelper.info("reload config: compare changes start", LOGGER);
List<ChangeItem> changeItemList = differentiateChanges(loader);
ReloadLogHelper.debug("change items :{}", LOGGER, changeItemList);
ReloadLogHelper.info("reload config: compare changes end", LOGGER);
return changeItemList;
}
/**
* load configuration
* xml:db.xml/user.xml/sharding.xml/sequence
* memory:dble_information/cluster synchronization
*/
private static ConfigInitializer loadConfig(RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
ConfigInitializer loader;
try {
if (null == userConfig && null == dbConfig && null == shardingConfig && null == sequenceConfig) {
ReloadLogHelper.info("reload config: load config start [local xml]", LOGGER);
loader = new ConfigInitializer();
} else {
ReloadLogHelper.info("reload config: load info start [memory]", LOGGER);
ReloadLogHelper.debug("memory to Users is :{}\r\n" +
"memory to DbGroups is :{}\r\n" +
"memory to Shardings is :{}\r\n" +
"memory to sequence is :{}", LOGGER, userConfig, dbConfig, shardingConfig, sequenceConfig);
loader = new ConfigInitializer(userConfig, dbConfig, shardingConfig, sequenceConfig);
}
ReloadLogHelper.info("reload config: load config end", LOGGER);
return loader;
} catch (Exception e) {
throw new Exception(e.getMessage() == null ? e.toString() : e.getMessage(), e);
}
}
private static void syncShardingNode(ConfigInitializer loader) {
Map<String, ShardingNode> oldShardingNodeMap = DbleServer.getInstance().getConfig().getShardingNodes();
Map<String, ShardingNode> newShardingNodeMap = loader.getShardingNodes();
for (Map.Entry<String, ShardingNode> shardingNodeEntry : newShardingNodeMap.entrySet()) {
//sync schema_exists,only testConn can update schema_exists
if (oldShardingNodeMap.containsKey(shardingNodeEntry.getKey())) {
shardingNodeEntry.getValue().setSchemaExists(oldShardingNodeMap.get(shardingNodeEntry.getKey()).isSchemaExists());
}
}
}
    /**
     * Diff the freshly loaded configuration against the currently running one and
     * produce the list of {@link ChangeItem}s (ADD/UPDATE/DELETE) for users,
     * shardingNodes, dbGroups and dbInstances. The items drive the scoped reload
     * instead of a full rebuild.
     * <p>
     * Note: as a side effect this also copies {@code testConnSuccess} from old
     * dbInstances onto their unchanged (or test-equivalent) new counterparts.
     *
     * @param newLoader the loader holding the newly parsed configuration
     * @return every detected change, in the order: users, shardingNodes, dbGroups/dbInstances
     */
    private static List<ChangeItem> differentiateChanges(ConfigInitializer newLoader) {
        List<ChangeItem> changeItemList = Lists.newArrayList();
        //user
        //old
        ServerConfig oldServerConfig = DbleServer.getInstance().getConfig();
        Map<UserName, UserConfig> oldUserMap = oldServerConfig.getUsers();
        //new
        Map<UserName, UserConfig> newUserMap = newLoader.getUsers();
        // difference is computed as (new, old): entriesOnlyOnLeft = added, entriesOnlyOnRight = deleted
        MapDifference<UserName, UserConfig> userMapDifference = Maps.difference(newUserMap, oldUserMap);
        //delete
        userMapDifference.entriesOnlyOnRight().keySet().stream().map(username -> new ChangeItem(ChangeType.DELETE, username, ChangeItemType.USERNAME)).forEach(changeItemList::add);
        //add
        userMapDifference.entriesOnlyOnLeft().keySet().stream().map(username -> new ChangeItem(ChangeType.ADD, username, ChangeItemType.USERNAME)).forEach(changeItemList::add);
        //update
        userMapDifference.entriesDiffering().entrySet().stream().map(differenceEntry -> {
            UserConfig newUserConfig = differenceEntry.getValue().leftValue();
            UserConfig oldUserConfig = differenceEntry.getValue().rightValue();
            ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, differenceEntry.getKey(), ChangeItemType.USERNAME);
            // an rw-split user pointing at a different dbGroup affects its group binding
            if (newUserConfig instanceof RwSplitUserConfig && oldUserConfig instanceof RwSplitUserConfig) {
                if (!((RwSplitUserConfig) newUserConfig).getDbGroup().equals(((RwSplitUserConfig) oldUserConfig).getDbGroup())) {
                    changeItem.setAffectEntryDbGroup(true);
                }
            }
            return changeItem;
        }).forEach(changeItemList::add);
        //shardingNode
        Map<String, ShardingNode> oldShardingNodeMap = oldServerConfig.getShardingNodes();
        Map<String, ShardingNode> newShardingNodeMap = newLoader.getShardingNodes();
        MapDifference<String, ShardingNode> shardingNodeMapDiff = Maps.difference(newShardingNodeMap, oldShardingNodeMap);
        //delete
        shardingNodeMapDiff.entriesOnlyOnRight().values().stream().map(sharingNode -> new ChangeItem(ChangeType.DELETE, sharingNode, ChangeItemType.SHARDING_NODE)).forEach(changeItemList::add);
        //add
        shardingNodeMapDiff.entriesOnlyOnLeft().values().stream().map(sharingNode -> new ChangeItem(ChangeType.ADD, sharingNode, ChangeItemType.SHARDING_NODE)).forEach(changeItemList::add);
        //update
        shardingNodeMapDiff.entriesDiffering().entrySet().stream().map(differenceEntry -> {
            ShardingNode newShardingNode = differenceEntry.getValue().leftValue();
            ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, newShardingNode, ChangeItemType.SHARDING_NODE);
            return changeItem;
        }).forEach(changeItemList::add);
        //dbGroup
        // dbGroups are diffed by hand (not Maps.difference) because equality here is
        // equalsBaseInfo/equalsForConnectionPool/... rather than equals()
        Map<String, PhysicalDbGroup> oldDbGroupMap = oldServerConfig.getDbGroups();
        Map<String, PhysicalDbGroup> newDbGroupMap = newLoader.getDbGroups();
        // start from all old groups; whatever is not matched below is a DELETE
        Map<String, PhysicalDbGroup> removeDbGroup = new LinkedHashMap<>(oldDbGroupMap);
        for (Map.Entry<String, PhysicalDbGroup> newDbGroupEntry : newDbGroupMap.entrySet()) {
            PhysicalDbGroup oldDbGroup = oldDbGroupMap.get(newDbGroupEntry.getKey());
            PhysicalDbGroup newDbGroup = newDbGroupEntry.getValue();
            if (null == oldDbGroup) {
                //add dbGroup
                changeItemList.add(new ChangeItem(ChangeType.ADD, newDbGroup, ChangeItemType.PHYSICAL_DB_GROUP));
            } else {
                removeDbGroup.remove(newDbGroupEntry.getKey());
                //change dbGroup
                if (!newDbGroup.equalsBaseInfo(oldDbGroup)) {
                    ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, newDbGroup, ChangeItemType.PHYSICAL_DB_GROUP);
                    if (!newDbGroup.equalsForConnectionPool(oldDbGroup)) {
                        changeItem.setAffectConnectionPool(true);
                    }
                    if (!newDbGroup.equalsForHeartbeat(oldDbGroup)) {
                        changeItem.setAffectHeartbeat(true);
                    }
                    changeItemList.add(changeItem);
                }
                //dbInstance
                // a surviving group still needs its instances diffed individually
                Map<String, PhysicalDbInstance> newDbInstanceMap = newDbGroup.getAllDbInstanceMap();
                Map<String, PhysicalDbInstance> oldDbInstanceMap = oldDbGroup.getAllDbInstanceMap();
                MapDifference<String, PhysicalDbInstance> dbInstanceMapDifference = Maps.difference(newDbInstanceMap, oldDbInstanceMap);
                //delete
                dbInstanceMapDifference.entriesOnlyOnRight().values().stream().map(dbInstance -> new ChangeItem(ChangeType.DELETE, dbInstance, ChangeItemType.PHYSICAL_DB_INSTANCE)).forEach(changeItemList::add);
                //add
                dbInstanceMapDifference.entriesOnlyOnLeft().values().stream().map(dbInstance -> new ChangeItem(ChangeType.ADD, dbInstance, ChangeItemType.PHYSICAL_DB_INSTANCE)).forEach(changeItemList::add);
                //update
                dbInstanceMapDifference.entriesDiffering().values().stream().map(physicalDbInstanceValueDifference -> {
                    PhysicalDbInstance newDbInstance = physicalDbInstanceValueDifference.leftValue();
                    PhysicalDbInstance oldDbInstance = physicalDbInstanceValueDifference.rightValue();
                    ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, newDbInstance, ChangeItemType.PHYSICAL_DB_INSTANCE);
                    if (!newDbInstance.equalsForConnectionPool(oldDbInstance)) {
                        changeItem.setAffectConnectionPool(true);
                    }
                    if (!newDbInstance.equalsForPoolCapacity(oldDbInstance)) {
                        changeItem.setAffectPoolCapacity(true);
                    }
                    if (!newDbInstance.equalsForHeartbeat(oldDbInstance)) {
                        changeItem.setAffectHeartbeat(true);
                    }
                    if (!newDbInstance.equalsForTestConn(oldDbInstance)) {
                        changeItem.setAffectTestConn(true);
                    } else {
                        // connection parameters unchanged: the old test result is still valid
                        newDbInstance.setTestConnSuccess(oldDbInstance.isTestConnSuccess());
                    }
                    return changeItem;
                }).forEach(changeItemList::add);
                //testConnSuccess with both
                // instances equal in both configs also keep their old test result
                for (Map.Entry<String, PhysicalDbInstance> dbInstanceEntry : dbInstanceMapDifference.entriesInCommon().entrySet()) {
                    dbInstanceEntry.getValue().setTestConnSuccess(oldDbInstanceMap.get(dbInstanceEntry.getKey()).isTestConnSuccess());
                }
            }
        }
        for (Map.Entry<String, PhysicalDbGroup> entry : removeDbGroup.entrySet()) {
            PhysicalDbGroup value = entry.getValue();
            changeItemList.add(new ChangeItem(ChangeType.DELETE, value, ChangeItemType.PHYSICAL_DB_GROUP));
        }
        return changeItemList;
    }
private static PhysicalDbInstance getPhysicalDbInstance(ConfigInitializer loader) {
PhysicalDbInstance ds = null;
for (PhysicalDbGroup dbGroup : loader.getDbGroups().values()) {
PhysicalDbInstance dsTest = dbGroup.getWriteDbInstance();
if (dsTest.isTestConnSuccess()) {
ds = dsTest;
}
if (ds != null) {
break;
}
}
if (ds == null) {
for (PhysicalDbGroup dbGroup : loader.getDbGroups().values()) {
for (PhysicalDbInstance dsTest : dbGroup.getDbInstances(false)) {
if (dsTest.isTestConnSuccess()) {
ds = dsTest;
break;
}
}
if (ds != null) {
break;
}
}
}
return ds;
}
private static void recycleOldBackendConnections(boolean forceAllReload, boolean closeFrontCon) {
ReloadLogHelper.info("reload config: recycle old active backend [frontend] connections start", LOGGER);
if (forceAllReload && closeFrontCon) {
for (IOProcessor processor : DbleServer.getInstance().getBackendProcessors()) {
for (BackendConnection con : processor.getBackends().values()) {
if (con.getPoolDestroyedTime() != 0) {
@@ -397,6 +580,7 @@ public final class ReloadConfig {
}
}
}
ReloadLogHelper.info("reload config: recycle old active backend [frontend] connections end", LOGGER);
}
@@ -408,93 +592,19 @@ public final class ReloadConfig {
}
}
private static boolean forceReloadAll(final int loadAllMode, ConfigInitializer loader) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-force-reload");
try {
ServerConfig config = DbleServer.getInstance().getConfig();
ServerConfig serverConfig = new ServerConfig(loader);
Map<String, PhysicalDbGroup> newDbGroups = serverConfig.getDbGroups();
ConfigUtil.getAndSyncKeyVariables(newDbGroups, true);
SystemVariables newSystemVariables = getSystemVariablesFromdbGroup(loader, newDbGroups);
ReloadLogHelper.info("reload config: get variables from random node end", LOGGER);
// recycle old active conn
if ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0) {
for (IOProcessor processor : DbleServer.getInstance().getBackendProcessors()) {
for (BackendConnection con : processor.getBackends().values()) {
if (con.getPoolDestroyedTime() != 0) {
con.close("old active backend conn will be forced closed by closing front conn");
}
}
}
}
if (loader.isFullyConfigured()) {
if (newSystemVariables.isLowerCaseTableNames()) {
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER);
serverConfig.reviseLowerCase(loader.getSequenceConfig());
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER);
} else {
serverConfig.loadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
}
}
checkTestConnIfNeed(loadAllMode, loader);
Map<UserName, UserConfig> newUsers = serverConfig.getUsers();
Map<String, SchemaConfig> newSchemas = serverConfig.getSchemas();
Map<String, ShardingNode> newShardingNodes = serverConfig.getShardingNodes();
Map<ERTable, Set<ERTable>> newErRelations = serverConfig.getErRelations();
Map<String, Set<ERTable>> newFuncNodeERMap = serverConfig.getFuncNodeERMap();
Map<String, Properties> newBlacklistConfig = serverConfig.getBlacklistConfig();
Map<String, AbstractPartitionAlgorithm> newFunctions = serverConfig.getFunctions();
ReloadLogHelper.info("reload config: init new dbGroup start", LOGGER);
String reasonMsg = initDbGroupByMap(newDbGroups, newShardingNodes, loader.isFullyConfigured());
ReloadLogHelper.info("reload config: init new dbGroup end", LOGGER);
if (reasonMsg == null) {
/* 2.3 apply new conf */
ReloadLogHelper.info("reload config: apply new config start", LOGGER);
boolean result;
try {
result = config.reload(newUsers, newSchemas, newShardingNodes, newDbGroups, config.getDbGroups(), newErRelations, newFuncNodeERMap,
newSystemVariables, loader.isFullyConfigured(), loadAllMode, newBlacklistConfig, newFunctions,
loader.getUserConfig(), loader.getSequenceConfig(), loader.getShardingConfig(), loader.getDbConfig());
CronScheduler.getInstance().init(config.getSchemas());
if (!result) {
initFailed(newDbGroups);
}
FrontendUserManager.getInstance().initForLatest(newUsers, SystemConfig.getInstance().getMaxCon());
ReloadLogHelper.info("reload config: apply new config end", LOGGER);
if (!loader.isFullyConfigured()) {
recycleServerConnections();
}
return result;
} catch (Exception e) {
initFailed(newDbGroups);
throw e;
}
private static SystemVariables getSystemVariablesFromDbInstance(boolean fullyConfigured, PhysicalDbInstance physicalDbInstance) throws Exception {
VarsExtractorHandler handler = new VarsExtractorHandler(physicalDbInstance);
SystemVariables newSystemVariables;
newSystemVariables = handler.execute();
if (newSystemVariables == null) {
if (fullyConfigured) {
throw new Exception("Can't get variables from any dbInstance, because all of dbGroup can't connect to MySQL correctly");
} else {
initFailed(newDbGroups);
throw new Exception(reasonMsg);
}
} finally {
TraceManager.finishSpan(traceObject);
}
}
private static void checkTestConnIfNeed(int loadAllMode, ConfigInitializer loader) throws Exception {
if ((loadAllMode & ManagerParseConfig.OPTS_MODE) == 0 && loader.isFullyConfigured()) {
try {
ReloadLogHelper.info("reload config: test all shardingNodes start", LOGGER);
loader.testConnection();
ReloadLogHelper.info("reload config: test all shardingNodes end", LOGGER);
} catch (Exception e) {
throw new Exception(e);
ReloadLogHelper.info("reload config: no valid dbGroup ,keep variables as old", LOGGER);
newSystemVariables = DbleServer.getInstance().getSystemVariables();
}
}
return newSystemVariables;
}
private static SystemVariables getSystemVariablesFromdbGroup(ConfigInitializer loader, Map<String, PhysicalDbGroup> newDbGroups) throws Exception {
@@ -513,91 +623,23 @@ public final class ReloadConfig {
}
private static void recycleServerConnections() {
ReloadLogHelper.info("reload config: recycle front connection start", LOGGER);
TraceManager.TraceObject traceObject = TraceManager.threadTrace("recycle-sharding-connections");
try {
for (IOProcessor processor : DbleServer.getInstance().getFrontProcessors()) {
for (FrontendConnection fcon : processor.getFrontends().values()) {
if (!fcon.isManager()) {
ReloadLogHelper.debug("recycle front connection:{}", LOGGER, fcon);
fcon.close("Reload causes the service to stop");
}
}
}
ReloadLogHelper.info("reload config: recycle front connection end", LOGGER);
} finally {
TraceManager.finishSpan(traceObject);
}
}
private static void distinguishDbGroup(Map<String, PhysicalDbGroup> newDbGroups, Map<String, PhysicalDbGroup> oldDbGroups,
Map<String, PhysicalDbGroup> addOrChangeDbGroups, Map<String, PhysicalDbGroup> noChangeDbGroups,
Map<String, PhysicalDbGroup> recycleHosts) {
for (Map.Entry<String, PhysicalDbGroup> entry : newDbGroups.entrySet()) {
PhysicalDbGroup oldPool = oldDbGroups.get(entry.getKey());
PhysicalDbGroup newPool = entry.getValue();
if (oldPool == null) {
addOrChangeDbGroups.put(newPool.getGroupName(), newPool);
} else {
calcChangedDbGroups(addOrChangeDbGroups, noChangeDbGroups, recycleHosts, entry, oldPool);
}
}
for (Map.Entry<String, PhysicalDbGroup> entry : oldDbGroups.entrySet()) {
PhysicalDbGroup newPool = newDbGroups.get(entry.getKey());
if (newPool == null) {
PhysicalDbGroup oldPool = entry.getValue();
recycleHosts.put(oldPool.getGroupName(), oldPool);
}
}
}
    /**
     * Classify one dbGroup that exists in both configurations, routing it into
     * the changed/unchanged/recycled maps according to the computed diff type.
     *
     * @param addOrChangeHosts groups to (re)initialize with the reload
     * @param noChangeHosts    groups kept as-is (the running instance is reused)
     * @param recycleHosts     old group instances whose resources must be released
     * @param entry            the group entry from the NEW configuration
     * @param oldPool          the matching group from the RUNNING configuration
     */
    private static void calcChangedDbGroups(Map<String, PhysicalDbGroup> addOrChangeHosts, Map<String, PhysicalDbGroup> noChangeHosts, Map<String, PhysicalDbGroup> recycleHosts, Map.Entry<String, PhysicalDbGroup> entry, PhysicalDbGroup oldPool) {
        PhysicalDbGroupDiff toCheck = new PhysicalDbGroupDiff(entry.getValue(), oldPool);
        switch (toCheck.getChangeType()) {
            case PhysicalDbGroupDiff.CHANGE_TYPE_CHANGE:
                //when the type is change,just delete the old one and use the new one
                recycleHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getOrgPool());
                addOrChangeHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getNewPool());
                break;
            case PhysicalDbGroupDiff.CHANGE_TYPE_ADD:
                //newly added group: initialize the new pool, nothing to recycle
                addOrChangeHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getNewPool());
                break;
            case PhysicalDbGroupDiff.CHANGE_TYPE_NO:
                //add old dbGroup into the new mergeddbGroups
                noChangeHosts.put(toCheck.getNewPool().getGroupName(), toCheck.getOrgPool());
                break;
            case PhysicalDbGroupDiff.CHANGE_TYPE_DELETE:
                //removed group: recycle the old pool, do not add into old one
                recycleHosts.put(toCheck.getOrgPool().getGroupName(), toCheck.getOrgPool());
                break;
            default:
                break;
        }
    }
private static String initDbGroupByMap(Map<String, PhysicalDbGroup> newDbGroups, Map<String, ShardingNode> newShardingNodes, boolean fullyConfigured) {
for (PhysicalDbGroup dbGroup : newDbGroups.values()) {
ReloadLogHelper.info("try to init dbGroup : " + dbGroup.getGroupName(), LOGGER);
String hostName = dbGroup.getGroupName();
// set schemas
ArrayList<String> dnSchemas = new ArrayList<>(30);
for (ShardingNode dn : newShardingNodes.values()) {
if (dn.getDbGroup().getGroupName().equals(hostName)) {
dn.setDbGroup(dbGroup);
dnSchemas.add(dn.getDatabase());
}
}
dbGroup.setSchemas(dnSchemas.toArray(new String[dnSchemas.size()]));
if (fullyConfigured) {
dbGroup.init("reload config");
} else {
LOGGER.info("dbGroup[" + hostName + "] is not fullyConfigured, so doing nothing");
}
}
return null;
}
private static void writePacket(boolean isSuccess, ManagerService service, String errorMsg, int errorCode) {
if (isSuccess) {
if (LOGGER.isInfoEnabled()) {
@@ -614,4 +656,5 @@ public final class ReloadConfig {
service.writeErrMessage(errorCode, errorMsg);
}
}
}

View File

@@ -215,7 +215,7 @@ public final class ShowConnection {
isMatch = fc.getHost().equals(whereInfo.get("host"));
}
if (whereInfo.get("user") != null) {
isMatch = isMatch && fc.getFrontEndService().getUser().toString().equals(whereInfo.get("user"));
isMatch = isMatch && fc.getFrontEndService().getUser().getFullName().equals(whereInfo.get("user"));
}
return isMatch;
}
@@ -228,7 +228,7 @@ public final class ShowConnection {
row.add(StringUtil.encode(c.getHost(), charset));
row.add(IntegerUtil.toBytes(c.getPort()));
row.add(IntegerUtil.toBytes(c.getLocalPort()));
row.add(StringUtil.encode(service.getUser().toString(), charset));
row.add(StringUtil.encode(service.getUser().getFullName(), charset));
row.add(StringUtil.encode(service.getSchema(), charset));
row.add(StringUtil.encode(service.getCharset().getClient(), charset));
row.add(StringUtil.encode(service.getCharset().getCollation(), charset));

View File

@@ -106,7 +106,7 @@ public final class ShowConnectionSQL {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(LongUtil.toBytes(c.getId()));
row.add(StringUtil.encode(c.getHost(), charset));
row.add(StringUtil.encode(c.getFrontEndService().getUser().toString(), charset));
row.add(StringUtil.encode(c.getFrontEndService().getUser().getFullName(), charset));
AbstractService service = c.getService();
String executeSql = c.getFrontEndService().getExecuteSql();
if (executeSql != null) {

View File

@@ -188,7 +188,7 @@ public final class ShowProcessList {
// BconnID
row.add(threadId == null ? StringUtil.encode(NULL_VAL, charset) : LongUtil.toBytes(threadId));
// User
row.add(StringUtil.encode(fc.getFrontEndService().getUser().toString(), charset));
row.add(StringUtil.encode(fc.getFrontEndService().getUser().getFullName(), charset));
// Front_Host
row.add(StringUtil.encode(fc.getHost() + ":" + fc.getLocalPort(), charset));
// db

View File

@@ -108,7 +108,7 @@ public final class ShowSQL {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(LongUtil.toBytes(idx));
row.add(StringUtil.encode(user.toString(), charset));
row.add(StringUtil.encode(user.getFullName(), charset));
row.add(StringUtil.encode(FormatUtil.formatDate(sql.getStartTime()), charset));
row.add(LongUtil.toBytes(sql.getExecuteTime()));
row.add(StringUtil.encode(sql.getSql(), charset));

View File

@@ -116,7 +116,7 @@ public final class ShowSQLHigh {
long minTime, long executeTime, long lastTime, String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(LongUtil.toBytes(i));
row.add(StringUtil.encode(user.toString(), charset));
row.add(StringUtil.encode(user.getFullName(), charset));
row.add(LongUtil.toBytes(count));
row.add(LongUtil.toBytes(avgTime));
row.add(LongUtil.toBytes(maxTime));

View File

@@ -102,7 +102,7 @@ public final class ShowSQLLarge {
private static RowDataPacket getRow(UserName user, UserSqlLargeStat.SqlLarge sql, String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(user.toString(), charset));
row.add(StringUtil.encode(user.getFullName(), charset));
row.add(LongUtil.toBytes(sql.getSqlRows()));
row.add(StringUtil.encode(FormatUtil.formatDate(sql.getStartTime()), charset));
row.add(LongUtil.toBytes(sql.getExecuteTime()));

View File

@@ -99,7 +99,7 @@ public final class ShowSQLSlow {
private static RowDataPacket getRow(UserName user, SQLRecord sql, String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(StringUtil.encode(user.toString(), charset));
row.add(StringUtil.encode(user.getFullName(), charset));
row.add(StringUtil.encode(FormatUtil.formatDate(sql.getStartTime()), charset));
row.add(LongUtil.toBytes(sql.getExecuteTime()));
row.add(StringUtil.encode(sql.getStatement(), charset));

View File

@@ -138,7 +138,7 @@ public final class ShowSQLSumUser {
String rStr = decimalFormat.format(1.0D * r / (r + w));
int max = rwStat.getConcurrentMax();
row.add(StringUtil.encode(user.toString(), charset));
row.add(StringUtil.encode(user.getFullName(), charset));
row.add(LongUtil.toBytes(r));
row.add(LongUtil.toBytes(w));
row.add(StringUtil.encode(String.valueOf(rStr), charset));

View File

@@ -97,7 +97,7 @@ public final class ShowSqlResultSet {
private static RowDataPacket getRow(int i, UserName user, String sql, int count, long resultSetSize, String charset) {
RowDataPacket row = new RowDataPacket(FIELD_COUNT);
row.add(LongUtil.toBytes(i));
row.add(StringUtil.encode(user.toString(), charset));
row.add(StringUtil.encode(user.getFullName(), charset));
row.add(LongUtil.toBytes(count));
row.add(StringUtil.encode(sql, charset));
row.add(LongUtil.toBytes(resultSetSize));

View File

@@ -93,7 +93,7 @@ public final class ShowUserPrivilege {
if (noNeedCheck || userPrivilegesConfig.getSchemaPrivilege(schema) == null) {
tableName = "*";
pri = ALL_PRIVILEGES;
RowDataPacket row = getRow(userName.toString(), schema, tableName, pri, service.getCharset().getResults());
RowDataPacket row = getRow(userName.getFullName(), schema, tableName, pri, service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
} else {
@@ -102,13 +102,13 @@ public final class ShowUserPrivilege {
for (String tn : tables) {
tableName = tn;
pri = schemaPrivilege.getTablePrivilege(tn).getDml();
RowDataPacket row = getRow(userName.toString(), schema, tableName, pri, service.getCharset().getResults());
RowDataPacket row = getRow(userName.getFullName(), schema, tableName, pri, service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}
tableName = "*";
pri = schemaPrivilege.getDml();
RowDataPacket row = getRow(userName.toString(), schema, tableName, pri, service.getCharset().getResults());
RowDataPacket row = getRow(userName.getFullName(), schema, tableName, pri, service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}

View File

@@ -62,7 +62,7 @@ public final class ShowWhiteHost {
for (Map.Entry<UserName, UserConfig> entry : map.entrySet()) {
if (entry.getValue().getWhiteIPs().size() > 0) {
for (String whiteIP : entry.getValue().getWhiteIPs()) {
RowDataPacket row = getRow(whiteIP, entry.getKey().toString(), service.getCharset().getResults());
RowDataPacket row = getRow(whiteIP, entry.getKey().getFullName(), service.getCharset().getResults());
row.setPacketId(++packetId);
buffer = row.write(buffer, service, true);
}

View File

@@ -17,6 +17,8 @@ import com.actiontech.dble.config.model.ClusterConfig;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.services.manager.handler.PacketResult;
import com.actiontech.dble.singleton.HaConfigManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
@@ -27,6 +29,8 @@ import java.util.regex.Matcher;
*/
public final class DbGroupHaEnable {
private static final Logger LOGGER = LoggerFactory.getLogger(DbGroupHaEnable.class);
private DbGroupHaEnable() {
}
@@ -83,6 +87,7 @@ public final class DbGroupHaEnable {
HaConfigManager.getInstance().haFinish(id, e.getMessage(), null);
packetResult.setSuccess(false);
packetResult.setErrorMsg("enable dbGroup with error, use show @@dbInstance to check latest status. Error:" + e.getMessage());
LOGGER.warn("enable dbGroup with error, use show @@dbInstance to check latest status. Error:", e);
return;
}
}

View File

@@ -31,24 +31,24 @@ public final class AuthUtil {
UserName user = new UserName(authPacket.getUser(), authPacket.getTenant());
UserConfig userConfig = DbleServer.getInstance().getConfig().getUsers().get(user);
if (userConfig == null) {
return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'");
}
//normal user login into manager port
if (fconn.isManager() && !(userConfig instanceof ManagerUserConfig)) {
return new AuthResultInfo("Access denied for user '" + user + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "'");
} else if (!fconn.isManager() && userConfig instanceof ManagerUserConfig) {
//manager user login into server port
return new AuthResultInfo("Access denied for manager user '" + user + "'");
return new AuthResultInfo("Access denied for manager user '" + user.getFullName() + "'");
}
if (!checkWhiteIPs(fconn.getHost(), userConfig.getWhiteIPs())) {
return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'");
}
// check password
if (!checkPassword(seed, authPacket.getPassword(), userConfig.getPassword(), plugin)) {
return new AuthResultInfo("Access denied for user '" + user + "', because password is incorrect");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "', because password is incorrect");
}
if (!DbleServer.getInstance().getConfig().isFullyConfigured() && !fconn.isManager()) {
return new AuthResultInfo("Access denied for user '" + user + "', because there are some empty dbGroup/fake dbInstance");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "', because there are some empty dbGroup/fake dbInstance");
}
// check schema
final String schema = authPacket.getDatabase();
@@ -56,16 +56,16 @@ public final class AuthUtil {
case ErrorCode.ER_BAD_DB_ERROR:
return new AuthResultInfo("Unknown database '" + schema + "'");
case ErrorCode.ER_DBACCESS_DENIED_ERROR:
return new AuthResultInfo("Access denied for user '" + user + "' to database '" + schema + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' to database '" + schema + "'");
default:
break;
}
//check max connection
switch (FrontendUserManager.getInstance().maxConnectionCheck(user, userConfig.getMaxCon(), fconn.isManager())) {
case SERVER_MAX:
return new AuthResultInfo("Access denied for user '" + user + "',too many connections for dble server");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "',too many connections for dble server");
case USER_MAX:
return new AuthResultInfo("Access denied for user '" + user + "',too many connections for this user");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "',too many connections for this user");
default:
break;
}
@@ -76,21 +76,21 @@ public final class AuthUtil {
UserName user = new UserName(changeUserPacket.getUser(), changeUserPacket.getTenant());
UserConfig userConfig = DbleServer.getInstance().getConfig().getUsers().get(user);
if (userConfig == null) {
return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'");
}
//normal user login into manager port
if (fconn.isManager() && !(userConfig instanceof ManagerUserConfig)) {
return new AuthResultInfo("Access denied for user '" + user + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "'");
} else if (!fconn.isManager() && userConfig instanceof ManagerUserConfig) {
//manager user login into server port
return new AuthResultInfo("Access denied for manager user '" + user + "'");
return new AuthResultInfo("Access denied for manager user '" + user.getFullName() + "'");
}
if (!checkWhiteIPs(fconn.getHost(), userConfig.getWhiteIPs())) {
return new AuthResultInfo("Access denied for user '" + user + "' with host '" + fconn.getHost() + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' with host '" + fconn.getHost() + "'");
}
// check password
if (!checkPassword(seed, changeUserPacket.getPassword(), userConfig.getPassword(), plugin)) {
return new AuthResultInfo("Access denied for user '" + user + "', because password is incorrect");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "', because password is incorrect");
}
// check schema
final String schema = changeUserPacket.getDatabase();
@@ -98,7 +98,7 @@ public final class AuthUtil {
case ErrorCode.ER_BAD_DB_ERROR:
return new AuthResultInfo("Unknown database '" + schema + "'");
case ErrorCode.ER_DBACCESS_DENIED_ERROR:
return new AuthResultInfo("Access denied for user '" + user + "' to database '" + schema + "'");
return new AuthResultInfo("Access denied for user '" + user.getFullName() + "' to database '" + schema + "'");
default:
break;
}

View File

@@ -62,7 +62,7 @@ public class MySQLProtoLogicHandler {
return;
}
if (!service.getUserConfig().getSchemas().contains(db)) {
String s = "Access denied for user '" + service.getUser() + "' to database '" + db + "'";
String s = "Access denied for user '" + service.getUser().getFullName() + "' to database '" + db + "'";
service.writeErrMessage(ErrorCode.ER_DBACCESS_DENIED_ERROR, s);
return;
}

View File

@@ -72,7 +72,7 @@ public class MySQLShardingSQLHandler {
public void routeSystemInfoAndExecuteSQL(String stmt, SchemaUtil.SchemaInfo schemaInfo, int sqlType) {
ShardingUserConfig user = (ShardingUserConfig) (DbleServer.getInstance().getConfig().getUsers().get(service.getUser()));
if (user == null || !user.getSchemas().contains(schemaInfo.getSchema())) {
service.writeErrMessage("42000", "Access denied for user '" + service.getUser() + "' to database '" + schemaInfo.getSchema() + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
service.writeErrMessage("42000", "Access denied for user '" + service.getUser().getFullName() + "' to database '" + schemaInfo.getSchema() + "'", ErrorCode.ER_DBACCESS_DENIED_ERROR);
return;
}
RouteResultset rrs = new RouteResultset(stmt, sqlType);

View File

@@ -193,7 +193,7 @@ public class ShardingService extends BusinessService<ShardingUserConfig> {
result.getViolations().get(0).getMessage());
} else {
String violation = "[" + WallErrorCode.get(result.getViolations().get(0)) + "]";
String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user + "', so it is considered unsafe SQL";
String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user.getFullName() + "', so it is considered unsafe SQL";
LOGGER.warn("Firewall message:{}, {}",
result.getViolations().get(0).getMessage(), msg);
writeErrMessage(ErrorCode.ERR_WRONG_USED, msg);

View File

@@ -265,7 +265,7 @@ public class RWSplitService extends BusinessService<SingleDbGroupUserConfig> {
LOGGER.warn("{}", "druid not support sql syntax, the reason is " + result.getViolations().get(0).getMessage());
} else {
String violation = "[" + WallErrorCode.get(result.getViolations().get(0)) + "]";
String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user + "', so it is considered unsafe SQL";
String msg = "Intercepted by suspected configuration " + violation + " in the blacklist of user '" + user.getFullName() + "', so it is considered unsafe SQL";
LOGGER.warn("Firewall message:{}, {}",
result.getViolations().get(0).getMessage(), msg);
writeErrMessage(ErrorCode.ERR_WRONG_USED, msg);

View File

@@ -7,9 +7,13 @@ package com.actiontech.dble.singleton;
import com.actiontech.dble.config.model.user.UserConfig;
import com.actiontech.dble.config.model.user.UserName;
import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -67,12 +71,45 @@ public final class FrontendUserManager {
}
}
/**
 * Incrementally adjust the per-user connection counters after a reload,
 * instead of rebuilding the whole map: added users start counting from
 * zero, deleted users are dropped, all other change items are ignored.
 *
 * @param changeItemList change items produced by the reload diff; only
 *                       items whose type is {@code ChangeItemType.USERNAME}
 *                       affect the counters
 * @param serverLimit    new server-wide max connection limit
 */
public void changeUser(List<ChangeItem> changeItemList, int serverLimit) {
    TraceManager.TraceObject traceObject = TraceManager.threadTrace("init-for-user-manager");
    try {
        serverMaxConnection = serverLimit;
        for (ChangeItem changeItem : changeItemList) {
            // only USERNAME items carry a UserName payload we track here
            if (changeItem.getItemType() != ChangeItemType.USERNAME) {
                continue;
            }
            UserName userName = (UserName) changeItem.getItem();
            ChangeType type = changeItem.getType();
            if (type == ChangeType.ADD) {
                // start counting at zero; leave an existing counter untouched
                userConnectionMap.putIfAbsent(userName, 0);
            } else if (type == ChangeType.DELETE) {
                // remove() is already a no-op when the user is absent
                userConnectionMap.remove(userName);
            }
        }
    } finally {
        TraceManager.finishSpan(traceObject);
    }
}
public CheckStatus maxConnectionCheck(UserName user, int userLimit, boolean isManager) {
maxConLock.lock();
try {
int userConnection = userConnectionMap.get(user);
Integer userConnection = userConnectionMap.get(user);
if (null == userConnection) {
userConnection = 0;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("user:" + user + ",userLimit=" + userLimit + ",userConnection=" + userConnection);
}
@@ -99,6 +136,10 @@ public final class FrontendUserManager {
return OK;
}
/**
 * Returns the live per-user connection counter map.
 * NOTE(review): this exposes the internal mutable map directly, so callers
 * can modify the counters — confirm that is intended; otherwise wrap it in
 * Collections.unmodifiableMap before returning.
 */
public Map<UserName, Integer> getUserConnectionMap() {
    return userConnectionMap;
}
// Outcome of maxConnectionCheck: OK = connection allowed, SERVER_MAX =
// server-wide limit reached, USER_MAX = per-user limit reached.
public enum CheckStatus {
    OK, SERVER_MAX, USER_MAX
}

View File

@@ -43,7 +43,7 @@ public final class RouteService {
String cacheKey = null;
if (sqlType == ServerParse.SELECT && !LOGGER.isDebugEnabled() && CacheService.getSqlRouteCache() != null) {
cacheKey = (schema == null ? "NULL" : schema.getName()) + "_" + service.getUser() + "_" + stmt;
cacheKey = (schema == null ? "NULL" : schema.getName()) + "_" + service.getUser().getFullName() + "_" + stmt;
rrs = (RouteResultset) CacheService.getSqlRouteCache().get(cacheKey);
if (rrs != null) {
service.getSession2().endParse();

View File

@@ -6,8 +6,9 @@
package com.actiontech.dble.singleton;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XAAnalysisHandler;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.XAAnalysisHandler;
import com.actiontech.dble.backend.mysql.xa.XAStateLog;
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.config.model.SystemConfig;
@@ -52,6 +53,7 @@ public final class Scheduler {
scheduledExecutor.scheduleWithFixedDelay(DbleServer.getInstance().processorCheck(), 0L, SystemConfig.getInstance().getProcessorCheckPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleAtFixedRate(dbInstanceOldConsClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleAtFixedRate(oldDbGroupClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleAtFixedRate(oldDbInstanceClear(), 0L, DEFAULT_OLD_CONNECTION_CLEAR_PERIOD, TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(xaSessionCheck(), 0L, SystemConfig.getInstance().getXaSessionCheckPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(xaLogClean(), 0L, SystemConfig.getInstance().getXaLogCleanPeriod(), TimeUnit.MILLISECONDS);
scheduledExecutor.scheduleWithFixedDelay(resultSetMapClear(), 0L, SystemConfig.getInstance().getClearBigSQLResultSetMapMs(), TimeUnit.MILLISECONDS);
@@ -116,16 +118,15 @@ public final class Scheduler {
}
/**
* after reload @@config_all ,clean old connection
* after reload @@config_all ,clean old dbGroup
*/
private Runnable oldDbGroupClear() {
return () -> timerExecutor.execute(() -> {
Iterator<PhysicalDbGroup> iterator = IOProcessor.BACKENDS_OLD_GROUP.iterator();
while (iterator.hasNext()) {
PhysicalDbGroup dbGroup = iterator.next();
boolean isStop = dbGroup.stopOfBackground("[background task]reload config, recycle old group");
LOGGER.info("[background task]recycle old group`{}` result{}", dbGroup.getGroupName(), isStop);
LOGGER.info("[background task]recycle old group:{},result:{}", dbGroup.getGroupName(), isStop);
if (isStop) {
iterator.remove();
}
@@ -133,6 +134,24 @@ public final class Scheduler {
});
}
/**
* after reload @@config_all ,clean old dbInstance
*/
/**
 * Background task scheduled after reload @@config_all: walks the list of
 * replaced dbInstances (IOProcessor.BACKENDS_OLD_INSTANCE) and tries to stop
 * each one. An instance that stops cleanly is removed from the list and its
 * owning dbGroup state is reset to PhysicalDbGroup.INITIAL; one that cannot
 * stop yet stays in the list and is retried on the next tick.
 */
private Runnable oldDbInstanceClear() {
    return () -> timerExecutor.execute(() -> {
        Iterator<PhysicalDbInstance> iterator = IOProcessor.BACKENDS_OLD_INSTANCE.iterator();
        while (iterator.hasNext()) {
            PhysicalDbInstance dbInstance = iterator.next();
            boolean isStop = dbInstance.stopOfBackground("[background task]reload config, recycle old dbInstance");
            LOGGER.info("[background task]recycle old dbInstance:{},result:{}", dbInstance, isStop);
            if (isStop) {
                // Iterator.remove is the only safe way to drop the entry mid-traversal
                iterator.remove();
                // allow the surviving group to be (re)initialized later
                dbInstance.getDbGroup().setState(PhysicalDbGroup.INITIAL);
            }
        }
    });
}
// XA session check job
private Runnable xaSessionCheck() {