[inner-2161] sequence configuration should not be instantiated when dryrun (cherry pick from #3685)

This commit is contained in:
wenyh
2023-05-08 18:39:01 +08:00
committed by wenyh1
parent 2cfb81980a
commit efb67e07ec
11 changed files with 128 additions and 101 deletions

View File

@@ -233,7 +233,7 @@ public final class DbleServer {
generalLogProcessor.start();
}
SequenceManager.init(ClusterConfig.getInstance().getSequenceHandlerType());
SequenceManager.init();
LOGGER.info("===================================Sequence manager init finish===================================");
@@ -412,7 +412,9 @@ public final class DbleServer {
private void reviseSchemas() {
if (systemVariables.isLowerCaseTableNames()) {
config.reviseLowerCase(DbleTempConfig.getInstance().getSequenceConfig());
config.reviseLowerCase();
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
config.selfChecking0();
ConfigUtil.setSchemasForPool(config.getDbGroups(), config.getShardingNodes());
} else {
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());

View File

@@ -464,7 +464,7 @@ public class ServerConfig {
return sb.toString();
}
public void reviseLowerCase(String sequenceJson) {
public void reviseLowerCase() {
//user sharding
for (UserConfig uc : users.values()) {
@@ -506,21 +506,18 @@ public class ServerConfig {
erRelations = newErMap;
}
loadSequence(sequenceJson);
selfChecking0();
}
private void loadSequence() {
SequenceManager.load(DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames());
public void reloadSequence(String sequenceJson) {
SequenceManager.reload(sequenceJson);
}
public void loadSequence(String sequenceJson) {
if (StringUtil.isEmpty(sequenceJson)) {
loadSequence();
} else {
SequenceManager.load(DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames(), sequenceJson);
}
SequenceManager.load(sequenceJson);
}
public void tryLoadSequence(String sequenceJson) {
SequenceManager.tryLoad(sequenceJson);
}
public void selfChecking0() throws ConfigException {

View File

@@ -19,14 +19,13 @@ import com.actiontech.dble.config.ConfigFileName;
import com.actiontech.dble.config.ErrorInfo;
import com.actiontech.dble.config.ProblemReporter;
import com.actiontech.dble.config.Versions;
import com.actiontech.dble.config.model.ClusterConfig;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.ShardingNodeConfig;
import com.actiontech.dble.config.model.sharding.table.*;
import com.actiontech.dble.config.util.ConfigException;
import com.actiontech.dble.config.util.ParameterMapping;
import com.actiontech.dble.route.function.*;
import com.actiontech.dble.route.sequence.handler.IncrSequenceMySQLHandler;
import com.actiontech.dble.singleton.SequenceManager;
import com.actiontech.dble.util.SplitUtil;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Lists;
@@ -441,11 +440,7 @@ public class ShardingConverter {
}
// add global sequence node when it is some dedicated servers */
if (ClusterConfig.getInstance().getSequenceHandlerType() == ClusterConfig.SEQUENCE_HANDLER_MYSQL && !StringUtil.isBlank(sequenceJson)) {
IncrSequenceMySQLHandler redundancy = new IncrSequenceMySQLHandler();
redundancy.loadByJson(false, sequenceJson);
allUseShardingNode.addAll(redundancy.getShardingNodes());
}
allUseShardingNode.addAll(SequenceManager.getShardingNodes(sequenceJson));
//delete redundancy shardingNode
Iterator<Map.Entry<String, com.actiontech.dble.backend.datasource.ShardingNode>> iterator = this.shardingNodeMap.entrySet().iterator();

View File

@@ -61,7 +61,8 @@ public class DistributedSequenceHandler implements Closeable, SequenceHandler {
return DistributedSequenceHandler.instance;
}
public void load(boolean isLowerCaseTableNames) {
@Override
public void load(String sequenceJson, boolean isLowerCaseTableNames) {
if (ClusterConfig.getInstance().isSequenceInstanceByZk()) {
initializeZK();
loadInstanceIdByZK();

View File

@@ -22,48 +22,43 @@ public class IncrSequenceMySQLHandler implements SequenceHandler {
protected static final String ERR_SEQ_RESULT = "-999999999,null";
protected static final Map<String, String> LATEST_ERRORS = new ConcurrentHashMap<>();
private final FetchMySQLSequenceHandler mysqlSeqFetcher = new FetchMySQLSequenceHandler();
private static Set<String> shardingNodes = new HashSet<>();
public void load(boolean isLowerCaseTableNames) {
// load sequence properties
Properties props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_DB_FILE_NAME, isLowerCaseTableNames);
removeDesertedSequenceVals(props);
putNewSequenceVals(props);
@Override
public void load(String sequenceJson, boolean isLowerCaseTableNames) {
Properties props;
if (sequenceJson != null) {
// load cluster properties
SequenceConverter sequenceConverter = new SequenceConverter();
props = sequenceConverter.jsonToProperties(sequenceJson);
props = PropertiesUtil.handleLowerCase(props, isLowerCaseTableNames);
} else {
// load sequence properties
props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_DB_FILE_NAME, isLowerCaseTableNames);
}
loadContext(props);
}
@Override
public void loadByJson(boolean isLowerCaseTableNames, String sequenceJson) {
SequenceConverter sequenceConverter = new SequenceConverter();
Properties props = sequenceConverter.jsonToProperties(sequenceJson);
props = PropertiesUtil.handleLowerCase(props, isLowerCaseTableNames);
removeDesertedSequenceVals(props);
putNewSequenceVals(props);
public void tryLoad(String sequenceJson, boolean isLowerCaseTableNames) {
load(sequenceJson, isLowerCaseTableNames);
}
public Set<String> getShardingNodes() {
return shardingNodes;
}
private void removeDesertedSequenceVals(Properties props) {
Iterator<Map.Entry<String, SequenceVal>> i = seqValueMap.entrySet().iterator();
while (i.hasNext()) {
Map.Entry<String, SequenceVal> entry = i.next();
if (!props.containsKey(entry.getKey())) {
i.remove();
}
}
}
private void putNewSequenceVals(Properties props) {
for (Map.Entry<Object, Object> entry : props.entrySet()) {
public void loadContext(Properties props) {
seqValueMap.clear();
props.entrySet().stream().forEach(entry -> {
String seqName = (String) entry.getKey();
String shardingNode = (String) entry.getValue();
SequenceVal value = seqValueMap.putIfAbsent(seqName, new SequenceVal(seqName, shardingNode));
if (value != null) {
value.shardingNode = shardingNode;
}
shardingNodes.add(shardingNode);
}
seqValueMap.putIfAbsent(seqName, new SequenceVal(seqName, shardingNode));
});
}
public static Set<String> getShardingNodes(String sequenceJson) {
Set<String> shardingNodes = new HashSet<>();
Properties propsTmp = (new SequenceConverter()).jsonToProperties(sequenceJson);
propsTmp.entrySet().stream().forEach(entry -> {
shardingNodes.add((String) entry.getValue());
});
return shardingNodes;
}
/**

View File

@@ -17,7 +17,8 @@ public final class IncrSequenceTimeHandler implements SequenceHandler {
private IdWorker workey;
public void load(boolean isLowerCaseTableNames) {
@Override
public void load(String sequenceJson, boolean isLowerCaseTableNames) {
long startTimeMilliseconds = ClusterConfig.getInstance().sequenceStartTime();
workey = new IdWorker(startTimeMilliseconds);
}
@@ -30,9 +31,9 @@ public final class IncrSequenceTimeHandler implements SequenceHandler {
/**
* @author sw
* <p>
* Now:
* 64 bit ID 30 (millisecond high 30 )+10(instance_ID)+12(autoincrement)+12 (millisecond low 12)
* <p>
* Now:
* 64 bit ID 30 (millisecond high 30 )+10(instance_ID)+12(autoincrement)+12 (millisecond low 12)
*/
static class IdWorker {
private static final long TIMESTAMP_LOW_BITS = 12L;

View File

@@ -50,8 +50,16 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
private Properties props;
@Override
public synchronized void load(boolean isLowerCaseTableNames) {
this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME, isLowerCaseTableNames);
public synchronized void load(String sequenceJson, boolean isLowerCaseTableNames) {
if (sequenceJson != null) {
// load cluster properties
SequenceConverter sequenceConverter = new SequenceConverter();
this.props = sequenceConverter.jsonToProperties(sequenceJson);
this.props = PropertiesUtil.handleLowerCase(this.props, isLowerCaseTableNames);
} else {
// load local properties
this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME, isLowerCaseTableNames);
}
String zkAddress = ClusterConfig.getInstance().getClusterIP();
if (zkAddress == null) {
throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" .");
@@ -64,18 +72,10 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
}
@Override
public void loadByJson(boolean isLowerCaseTableNames, String sequenceJson) {
SequenceConverter sequenceConverter = new SequenceConverter();
this.props = sequenceConverter.jsonToProperties(sequenceJson);
this.props = PropertiesUtil.handleLowerCase(this.props, isLowerCaseTableNames);
String zkAddress = ClusterConfig.getInstance().getClusterIP();
if (zkAddress == null) {
throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" .");
}
try {
initializeZK(this.props, zkAddress);
} catch (Exception e) {
LOGGER.warn("Error caught while initializing ZK:" + e.getCause());
public void tryLoad(String sequenceJson, boolean isLowerCaseTableNames) {
load(sequenceJson, isLowerCaseTableNames);
if (client != null) {
client.close();
}
}

View File

@@ -5,6 +5,8 @@
*/
package com.actiontech.dble.route.sequence.handler;
import com.actiontech.dble.config.util.ConfigException;
import java.sql.SQLNonTransientException;
/**
@@ -16,9 +18,9 @@ public interface SequenceHandler {
long nextId(String prefixName) throws SQLNonTransientException;
void load(boolean isLowerCaseTableNames);
default void loadByJson(boolean isLowerCaseTableNames, String sequenceJson) {
default void tryLoad(String sequenceJson, boolean isLowerCaseTableNames) throws ConfigException {
}
void load(String sequenceJson, boolean isLowerCaseTableNames);
}

View File

@@ -125,11 +125,11 @@ public final class DryRun {
} else {
try {
if (newSystemVariables.isLowerCaseTableNames()) {
serverConfig.reviseLowerCase(loader.getSequenceConfig());
} else {
serverConfig.loadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
serverConfig.reviseLowerCase();
}
serverConfig.tryLoadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
Map<String, Set<String>> schemaMap = getExistSchemas(serverConfig);
//table exists check ,if the vars can not be touch ,the table check has no meaning
tableExistsCheck(list, serverConfig, newSystemVariables.isLowerCaseTableNames(), schemaMap);

View File

@@ -318,12 +318,11 @@ public final class ReloadConfig {
if (loader.isFullyConfigured()) {
if (newSystemVariables.isLowerCaseTableNames()) {
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER);
serverConfig.reviseLowerCase(loader.getSequenceConfig());
serverConfig.reviseLowerCase();
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER);
} else {
serverConfig.loadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
}
serverConfig.reloadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
}
checkTestConnIfNeed(loadAllMode, loader);
@@ -428,12 +427,11 @@ public final class ReloadConfig {
if (loader.isFullyConfigured()) {
if (newSystemVariables.isLowerCaseTableNames()) {
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER);
serverConfig.reviseLowerCase(loader.getSequenceConfig());
serverConfig.reviseLowerCase();
ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER);
} else {
serverConfig.loadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
}
serverConfig.reloadSequence(loader.getSequenceConfig());
serverConfig.selfChecking0();
}
checkTestConnIfNeed(loadAllMode, loader);

View File

@@ -1,7 +1,11 @@
package com.actiontech.dble.singleton;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.config.model.ClusterConfig;
import com.actiontech.dble.route.sequence.handler.*;
import com.google.common.collect.Sets;
import java.util.Set;
/**
* Created by szf on 2019/9/19.
@@ -14,41 +18,66 @@ public final class SequenceManager {
}
public static void init(int seqHandlerType) {
public static void init() {
int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType();
INSTANCE.handler = newSequenceHandler(seqHandlerType);
}
private static SequenceHandler newSequenceHandler(int seqHandlerType) {
switch (seqHandlerType) {
case ClusterConfig.SEQUENCE_HANDLER_MYSQL:
INSTANCE.handler = new IncrSequenceMySQLHandler();
break;
return new IncrSequenceMySQLHandler();
case ClusterConfig.SEQUENCE_HANDLER_LOCAL_TIME:
INSTANCE.handler = new IncrSequenceTimeHandler();
break;
return new IncrSequenceTimeHandler();
case ClusterConfig.SEQUENCE_HANDLER_ZK_DISTRIBUTED:
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
INSTANCE.handler = new DistributedSequenceHandler();
return new DistributedSequenceHandler();
} else {
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster");
}
break;
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
INSTANCE.handler = new IncrSequenceZKHandler();
return new IncrSequenceZKHandler();
} else {
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster");
}
break;
default:
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType);
}
}
public static void load(boolean lowerCaseTableNames) {
INSTANCE.handler.load(lowerCaseTableNames);
public static void load(String sequenceJson) {
if (INSTANCE.handler == null)
return;
INSTANCE.handler.load(sequenceJson, DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames());
}
public static void load(boolean lowerCaseTableNames, String sequenceJson) {
INSTANCE.handler.loadByJson(lowerCaseTableNames, sequenceJson);
public static void reload(String sequenceJson) {
if (INSTANCE.handler == null)
return;
int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType();
switch (seqHandlerType) {
case ClusterConfig.SEQUENCE_HANDLER_MYSQL:
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
INSTANCE.handler.load(sequenceJson, DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames());
break;
default:
break;
}
}
public static void tryLoad(String sequenceJson) {
int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType();
switch (seqHandlerType) {
case ClusterConfig.SEQUENCE_HANDLER_MYSQL:
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
SequenceHandler tmpHandler = newSequenceHandler(seqHandlerType);
tmpHandler.tryLoad(sequenceJson, DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames());
break;
default:
break;
}
}
public static SequenceManager getInstance() {
return INSTANCE;
}
@@ -56,4 +85,11 @@ public final class SequenceManager {
public static SequenceHandler getHandler() {
return INSTANCE.handler;
}
public static Set<String> getShardingNodes(String sequenceJson) {
if (ClusterConfig.getInstance().getSequenceHandlerType() == ClusterConfig.SEQUENCE_HANDLER_MYSQL && sequenceJson != null) {
return IncrSequenceMySQLHandler.getShardingNodes(sequenceJson);
}
return Sets.newHashSet();
}
}