mirror of
https://github.com/actiontech/dble.git
synced 2026-01-01 18:30:43 -06:00
[inner-2205] should update the latest sequence_conf.properties when sequenceHandlerType=4 and executing 'reload @@config'
This commit is contained in:
@@ -138,7 +138,7 @@ public final class ClusterConfig {
|
||||
if (sequenceHandlerType >= 1 && sequenceHandlerType <= 4) {
|
||||
this.sequenceHandlerType = sequenceHandlerType;
|
||||
} else {
|
||||
problemReporter.warn("sequenceHandlerType value is " + sequenceHandlerType + ", it will use default value:" + this.sequenceHandlerType);
|
||||
problemReporter.warn("sequenceHandlerType value is " + sequenceHandlerType + ", you can use default value:" + this.sequenceHandlerType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -46,12 +46,14 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
private static final String SEQ = "/seq";
|
||||
|
||||
private ThreadLocal<Map<String, Map<String, String>>> tableParaValMapThreadLocal = new ThreadLocal<>();
|
||||
private Set<Thread> threadList = new HashSet<>();
|
||||
private Set<Thread> removeThreadList = new HashSet<>();
|
||||
|
||||
private CuratorFramework client;
|
||||
private ThreadLocal<InterProcessSemaphoreMutex> interProcessSemaphoreMutexThreadLocal = new ThreadLocal<>();
|
||||
private Properties props;
|
||||
|
||||
public void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
public synchronized void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
if (sequenceJson != null) {
|
||||
// load cluster properties
|
||||
SequenceConverter sequenceConverter = new SequenceConverter();
|
||||
@@ -95,6 +97,11 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
|
||||
this.client.start();
|
||||
this.props = properties;
|
||||
this.tableParaValMapThreadLocal.remove();
|
||||
this.interProcessSemaphoreMutexThreadLocal.remove();
|
||||
this.removeThreadList.addAll(threadList);
|
||||
this.threadList.clear();
|
||||
this.removeThreadList.remove(Thread.currentThread());
|
||||
}
|
||||
|
||||
private void handle(String key) throws Exception {
|
||||
@@ -135,14 +142,20 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
|
||||
@Override
|
||||
public Object[] getParaValMap(String prefixName) {
|
||||
if (this.removeThreadList.remove(Thread.currentThread())) {
|
||||
this.interProcessSemaphoreMutexThreadLocal.remove();
|
||||
this.tableParaValMapThreadLocal.remove();
|
||||
}
|
||||
if (props.entrySet().isEmpty()) return null;
|
||||
Map<String, Map<String, String>> tableParaValMap = tableParaValMapThreadLocal.get();
|
||||
if (tableParaValMap == null) {
|
||||
try {
|
||||
threadLocalLoad();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Error caught while loding configuration within current thread:" + e.getCause());
|
||||
LOGGER.warn("Error caught while loading configuration within current thread:" + e.getCause());
|
||||
}
|
||||
tableParaValMap = tableParaValMapThreadLocal.get();
|
||||
threadList.add(Thread.currentThread());
|
||||
}
|
||||
return matching(prefixName, tableParaValMap);
|
||||
}
|
||||
@@ -227,13 +240,13 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
return super.nextId(prefixName, frontendService);
|
||||
}
|
||||
|
||||
public void detach() throws Exception {
|
||||
public synchronized void detach() throws Exception {
|
||||
if (this.client != null) {
|
||||
this.client.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void attach() throws Exception {
|
||||
public synchronized void attach() throws Exception {
|
||||
String zkAddress = ClusterConfig.getInstance().getClusterIP();
|
||||
if (zkAddress == null) {
|
||||
throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" .");
|
||||
|
||||
@@ -43,16 +43,16 @@ public final class SequenceManager {
|
||||
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
|
||||
return new DistributedSequenceHandler();
|
||||
} else {
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk clusetr");
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster");
|
||||
}
|
||||
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
|
||||
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
|
||||
return new IncrSequenceZKHandler();
|
||||
} else {
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk clusetr");
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster");
|
||||
}
|
||||
default:
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequnce handler type " + seqHandlerType);
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user