diff --git a/src/main/java/com/actiontech/dble/DbleServer.java b/src/main/java/com/actiontech/dble/DbleServer.java index d836f6e58..2d16ab97d 100644 --- a/src/main/java/com/actiontech/dble/DbleServer.java +++ b/src/main/java/com/actiontech/dble/DbleServer.java @@ -233,7 +233,7 @@ public final class DbleServer { generalLogProcessor.start(); } - SequenceManager.init(ClusterConfig.getInstance().getSequenceHandlerType()); + SequenceManager.init(); LOGGER.info("===================================Sequence manager init finish==================================="); @@ -412,7 +412,9 @@ public final class DbleServer { private void reviseSchemas() { if (systemVariables.isLowerCaseTableNames()) { - config.reviseLowerCase(DbleTempConfig.getInstance().getSequenceConfig()); + config.reviseLowerCase(); + config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig()); + config.selfChecking0(); ConfigUtil.setSchemasForPool(config.getDbGroups(), config.getShardingNodes()); } else { config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig()); diff --git a/src/main/java/com/actiontech/dble/config/ServerConfig.java b/src/main/java/com/actiontech/dble/config/ServerConfig.java index b9896f496..bc02fe192 100644 --- a/src/main/java/com/actiontech/dble/config/ServerConfig.java +++ b/src/main/java/com/actiontech/dble/config/ServerConfig.java @@ -464,7 +464,7 @@ public class ServerConfig { return sb.toString(); } - public void reviseLowerCase(String sequenceJson) { + public void reviseLowerCase() { //user sharding for (UserConfig uc : users.values()) { @@ -506,21 +506,18 @@ public class ServerConfig { erRelations = newErMap; } - loadSequence(sequenceJson); - selfChecking0(); - } - private void loadSequence() { - SequenceManager.load(DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()); + public void reloadSequence(String sequenceJson) { + SequenceManager.reload(sequenceJson); } public void loadSequence(String sequenceJson) { - if (StringUtil.isEmpty(sequenceJson)) { - loadSequence(); - } else { - SequenceManager.load(DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames(), sequenceJson); - } + SequenceManager.load(sequenceJson); + } + + public void tryLoadSequence(String sequenceJson) { + SequenceManager.tryLoad(sequenceJson); } public void selfChecking0() throws ConfigException { diff --git a/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java b/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java index eb225a061..08011d29d 100644 --- a/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java +++ b/src/main/java/com/actiontech/dble/config/converter/ShardingConverter.java @@ -19,14 +19,13 @@ import com.actiontech.dble.config.ConfigFileName; import com.actiontech.dble.config.ErrorInfo; import com.actiontech.dble.config.ProblemReporter; import com.actiontech.dble.config.Versions; -import com.actiontech.dble.config.model.ClusterConfig; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.ShardingNodeConfig; import com.actiontech.dble.config.model.sharding.table.*; import com.actiontech.dble.config.util.ConfigException; import com.actiontech.dble.config.util.ParameterMapping; import com.actiontech.dble.route.function.*; -import com.actiontech.dble.route.sequence.handler.IncrSequenceMySQLHandler; +import com.actiontech.dble.singleton.SequenceManager; import com.actiontech.dble.util.SplitUtil; import com.actiontech.dble.util.StringUtil; import com.google.common.collect.Lists; @@ -441,11 +440,7 @@ public class ShardingConverter { } // add global sequence node when it is some dedicated servers */ - if (ClusterConfig.getInstance().getSequenceHandlerType() == ClusterConfig.SEQUENCE_HANDLER_MYSQL && !StringUtil.isBlank(sequenceJson)) { - IncrSequenceMySQLHandler redundancy = new IncrSequenceMySQLHandler(); - redundancy.loadByJson(false, sequenceJson); - allUseShardingNode.addAll(redundancy.getShardingNodes()); - } + allUseShardingNode.addAll(SequenceManager.getShardingNodes(sequenceJson)); //delete redundancy shardingNode Iterator> iterator = this.shardingNodeMap.entrySet().iterator(); diff --git a/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java b/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java index 0d596c7cf..780f42754 100644 --- a/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java +++ b/src/main/java/com/actiontech/dble/route/sequence/handler/DistributedSequenceHandler.java @@ -61,7 +61,8 @@ public class DistributedSequenceHandler implements Closeable, SequenceHandler { return DistributedSequenceHandler.instance; } - public void load(boolean isLowerCaseTableNames) { + @Override + public void load(String sequenceJson, boolean isLowerCaseTableNames) { if (ClusterConfig.getInstance().isSequenceInstanceByZk()) { initializeZK(); loadInstanceIdByZK(); diff --git a/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceMySQLHandler.java b/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceMySQLHandler.java index 6179b6c8e..29799c869 100644 --- a/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceMySQLHandler.java +++ b/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceMySQLHandler.java @@ -22,48 +22,43 @@ public class IncrSequenceMySQLHandler implements SequenceHandler { protected static final String ERR_SEQ_RESULT = "-999999999,null"; protected static final Map LATEST_ERRORS = new ConcurrentHashMap<>(); private final FetchMySQLSequenceHandler mysqlSeqFetcher = new FetchMySQLSequenceHandler(); - private static Set shardingNodes = new HashSet<>(); - public void load(boolean isLowerCaseTableNames) { - // load sequence properties - Properties props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_DB_FILE_NAME, isLowerCaseTableNames); - removeDesertedSequenceVals(props); - putNewSequenceVals(props); + @Override + public void load(String sequenceJson, boolean isLowerCaseTableNames) { + Properties props; + if (sequenceJson != null) { + // load cluster properties + SequenceConverter sequenceConverter = new SequenceConverter(); + props = sequenceConverter.jsonToProperties(sequenceJson); + props = PropertiesUtil.handleLowerCase(props, isLowerCaseTableNames); + } else { + // load sequence properties + props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_DB_FILE_NAME, isLowerCaseTableNames); + } + loadContext(props); } @Override - public void loadByJson(boolean isLowerCaseTableNames, String sequenceJson) { - SequenceConverter sequenceConverter = new SequenceConverter(); - Properties props = sequenceConverter.jsonToProperties(sequenceJson); - props = PropertiesUtil.handleLowerCase(props, isLowerCaseTableNames); - removeDesertedSequenceVals(props); - putNewSequenceVals(props); + public void tryLoad(String sequenceJson, boolean isLowerCaseTableNames) { + load(sequenceJson, isLowerCaseTableNames); } - public Set getShardingNodes() { - return shardingNodes; - } - - private void removeDesertedSequenceVals(Properties props) { - Iterator> i = seqValueMap.entrySet().iterator(); - while (i.hasNext()) { - Map.Entry entry = i.next(); - if (!props.containsKey(entry.getKey())) { - i.remove(); - } - } - } - - private void putNewSequenceVals(Properties props) { - for (Map.Entry entry : props.entrySet()) { + public void loadContext(Properties props) { + seqValueMap.clear(); + props.entrySet().stream().forEach(entry -> { String seqName = (String) entry.getKey(); String shardingNode = (String) entry.getValue(); - SequenceVal value = seqValueMap.putIfAbsent(seqName, new SequenceVal(seqName, shardingNode)); - if (value != null) { - value.shardingNode = shardingNode; - } - shardingNodes.add(shardingNode); - } + seqValueMap.putIfAbsent(seqName, new SequenceVal(seqName, shardingNode)); + }); + } + + public static Set getShardingNodes(String sequenceJson) { + Set shardingNodes = new HashSet<>(); + Properties propsTmp = (new SequenceConverter()).jsonToProperties(sequenceJson); + propsTmp.entrySet().stream().forEach(entry -> { + shardingNodes.add((String) entry.getValue()); + }); + return shardingNodes; } /** diff --git a/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceTimeHandler.java b/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceTimeHandler.java index 8dc284cdd..824fce88c 100644 --- a/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceTimeHandler.java +++ b/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceTimeHandler.java @@ -17,7 +17,8 @@ public final class IncrSequenceTimeHandler implements SequenceHandler { private IdWorker workey; - public void load(boolean isLowerCaseTableNames) { + @Override + public void load(String sequenceJson, boolean isLowerCaseTableNames) { long startTimeMilliseconds = ClusterConfig.getInstance().sequenceStartTime(); workey = new IdWorker(startTimeMilliseconds); } @@ -30,9 +31,9 @@ public final class IncrSequenceTimeHandler implements SequenceHandler { /** * @author sw - *

- * Now: - * 64 bit ID 30 (millisecond high 30 )+10(instance_ID)+12(autoincrement)+12 (millisecond low 12) + *

+ * Now: + * 64 bit ID 30 (millisecond high 30 )+10(instance_ID)+12(autoincrement)+12 (millisecond low 12) */ static class IdWorker { private static final long TIMESTAMP_LOW_BITS = 12L; diff --git a/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceZKHandler.java b/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceZKHandler.java index 026d0dd1e..f55d99a9b 100644 --- a/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceZKHandler.java +++ b/src/main/java/com/actiontech/dble/route/sequence/handler/IncrSequenceZKHandler.java @@ -50,8 +50,16 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler { private Properties props; @Override - public synchronized void load(boolean isLowerCaseTableNames) { - this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME, isLowerCaseTableNames); + public synchronized void load(String sequenceJson, boolean isLowerCaseTableNames) { + if (sequenceJson != null) { + // load cluster properties + SequenceConverter sequenceConverter = new SequenceConverter(); + this.props = sequenceConverter.jsonToProperties(sequenceJson); + this.props = PropertiesUtil.handleLowerCase(this.props, isLowerCaseTableNames); + } else { + // load local properties + this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME, isLowerCaseTableNames); + } String zkAddress = ClusterConfig.getInstance().getClusterIP(); if (zkAddress == null) { throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" ."); @@ -64,18 +72,10 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler { } @Override - public void loadByJson(boolean isLowerCaseTableNames, String sequenceJson) { - SequenceConverter sequenceConverter = new SequenceConverter(); - this.props = sequenceConverter.jsonToProperties(sequenceJson); - this.props = PropertiesUtil.handleLowerCase(this.props, isLowerCaseTableNames); - String zkAddress = ClusterConfig.getInstance().getClusterIP(); - if (zkAddress == null) { - throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" ."); - } - try { - initializeZK(this.props, zkAddress); - } catch (Exception e) { - LOGGER.warn("Error caught while initializing ZK:" + e.getCause()); + public void tryLoad(String sequenceJson, boolean isLowerCaseTableNames) { + load(sequenceJson, isLowerCaseTableNames); + if (client != null) { + client.close(); } } diff --git a/src/main/java/com/actiontech/dble/route/sequence/handler/SequenceHandler.java b/src/main/java/com/actiontech/dble/route/sequence/handler/SequenceHandler.java index 0e1026b19..4ca390197 100644 --- a/src/main/java/com/actiontech/dble/route/sequence/handler/SequenceHandler.java +++ b/src/main/java/com/actiontech/dble/route/sequence/handler/SequenceHandler.java @@ -5,6 +5,8 @@ */ package com.actiontech.dble.route.sequence.handler; +import com.actiontech.dble.config.util.ConfigException; + import java.sql.SQLNonTransientException; /** @@ -16,9 +18,9 @@ public interface SequenceHandler { long nextId(String prefixName) throws SQLNonTransientException; - void load(boolean isLowerCaseTableNames); - - default void loadByJson(boolean isLowerCaseTableNames, String sequenceJson) { + default void tryLoad(String sequenceJson, boolean isLowerCaseTableNames) throws ConfigException { } + void load(String sequenceJson, boolean isLowerCaseTableNames); + } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java index 3a9c88146..7fb0ae51e 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java @@ -125,11 +125,11 @@ public final class DryRun { } else { try { if (newSystemVariables.isLowerCaseTableNames()) { - serverConfig.reviseLowerCase(loader.getSequenceConfig()); - } else { - serverConfig.loadSequence(loader.getSequenceConfig()); - serverConfig.selfChecking0(); + serverConfig.reviseLowerCase(); } + serverConfig.tryLoadSequence(loader.getSequenceConfig()); + serverConfig.selfChecking0(); + Map> schemaMap = getExistSchemas(serverConfig); //table exists check ,if the vars can not be touch ,the table check has no meaning tableExistsCheck(list, serverConfig, newSystemVariables.isLowerCaseTableNames(), schemaMap); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index f56fafb60..403f1a43d 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -318,12 +318,11 @@ public final class ReloadConfig { if (loader.isFullyConfigured()) { if (newSystemVariables.isLowerCaseTableNames()) { ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER); - serverConfig.reviseLowerCase(loader.getSequenceConfig()); + serverConfig.reviseLowerCase(); ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER); - } else { - serverConfig.loadSequence(loader.getSequenceConfig()); - serverConfig.selfChecking0(); } + serverConfig.reloadSequence(loader.getSequenceConfig()); + serverConfig.selfChecking0(); } checkTestConnIfNeed(loadAllMode, loader); @@ -428,12 +427,11 @@ public final class ReloadConfig { if (loader.isFullyConfigured()) { if (newSystemVariables.isLowerCaseTableNames()) { ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties start", LOGGER); - serverConfig.reviseLowerCase(loader.getSequenceConfig()); + serverConfig.reviseLowerCase(); ReloadLogHelper.info("reload config: dbGroup's lowerCaseTableNames=1, lower the config properties end", LOGGER); - } else { - serverConfig.loadSequence(loader.getSequenceConfig()); - serverConfig.selfChecking0(); } + serverConfig.reloadSequence(loader.getSequenceConfig()); + serverConfig.selfChecking0(); } checkTestConnIfNeed(loadAllMode, loader); diff --git a/src/main/java/com/actiontech/dble/singleton/SequenceManager.java b/src/main/java/com/actiontech/dble/singleton/SequenceManager.java index 8cac49d9a..c0e237d83 100644 --- a/src/main/java/com/actiontech/dble/singleton/SequenceManager.java +++ b/src/main/java/com/actiontech/dble/singleton/SequenceManager.java @@ -1,7 +1,11 @@ package com.actiontech.dble.singleton; +import com.actiontech.dble.DbleServer; import com.actiontech.dble.config.model.ClusterConfig; import com.actiontech.dble.route.sequence.handler.*; +import com.google.common.collect.Sets; + +import java.util.Set; /** * Created by szf on 2019/9/19. @@ -14,41 +18,66 @@ public final class SequenceManager { } - public static void init(int seqHandlerType) { + public static void init() { + int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType(); + INSTANCE.handler = newSequenceHandler(seqHandlerType); + } + + private static SequenceHandler newSequenceHandler(int seqHandlerType) { switch (seqHandlerType) { case ClusterConfig.SEQUENCE_HANDLER_MYSQL: - INSTANCE.handler = new IncrSequenceMySQLHandler(); - break; + return new IncrSequenceMySQLHandler(); case ClusterConfig.SEQUENCE_HANDLER_LOCAL_TIME: - INSTANCE.handler = new IncrSequenceTimeHandler(); - break; + return new IncrSequenceTimeHandler(); case ClusterConfig.SEQUENCE_HANDLER_ZK_DISTRIBUTED: if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) { - INSTANCE.handler = new DistributedSequenceHandler(); + return new DistributedSequenceHandler(); } else { throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster"); } - break; case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT: if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) { - INSTANCE.handler = new IncrSequenceZKHandler(); + return new IncrSequenceZKHandler(); } else { throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster"); } - break; default: throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType); } } - public static void load(boolean lowerCaseTableNames) { - INSTANCE.handler.load(lowerCaseTableNames); + public static void load(String sequenceJson) { + if (INSTANCE.handler == null) + return; + INSTANCE.handler.load(sequenceJson, DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()); } - public static void load(boolean lowerCaseTableNames, String sequenceJson) { - INSTANCE.handler.loadByJson(lowerCaseTableNames, sequenceJson); + public static void reload(String sequenceJson) { + if (INSTANCE.handler == null) + return; + int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType(); + switch (seqHandlerType) { + case ClusterConfig.SEQUENCE_HANDLER_MYSQL: + case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT: + INSTANCE.handler.load(sequenceJson, DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()); + break; + default: + break; + } } + public static void tryLoad(String sequenceJson) { + int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType(); + switch (seqHandlerType) { + case ClusterConfig.SEQUENCE_HANDLER_MYSQL: + case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT: + SequenceHandler tmpHandler = newSequenceHandler(seqHandlerType); + tmpHandler.tryLoad(sequenceJson, DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()); + break; + default: + break; + } + } public static SequenceManager getInstance() { return INSTANCE; } @@ -56,4 +85,11 @@ public final class SequenceManager { public static SequenceHandler getHandler() { return INSTANCE.handler; } + + public static Set getShardingNodes(String sequenceJson) { + if (ClusterConfig.getInstance().getSequenceHandlerType() == ClusterConfig.SEQUENCE_HANDLER_MYSQL && sequenceJson != null) { + return IncrSequenceMySQLHandler.getShardingNodes(sequenceJson); + } + return Sets.newHashSet(); + } }