mirror of
https://github.com/actiontech/dble.git
synced 2026-05-04 13:30:16 -05:00
[inner-2205] should update the latest sequence_conf.properties when sequenceHandlerType=4 and executing 'reload @@config'
This commit is contained in:
@@ -128,7 +128,7 @@ public final class ClusterConfig {
|
||||
if (sequenceHandlerType >= 1 && sequenceHandlerType <= 4) {
|
||||
this.sequenceHandlerType = sequenceHandlerType;
|
||||
} else {
|
||||
problemReporter.warn("sequenceHandlerType value is " + sequenceHandlerType + ", it will use default value:" + this.sequenceHandlerType);
|
||||
problemReporter.warn("sequenceHandlerType value is " + sequenceHandlerType + ", you can use default value:" + this.sequenceHandlerType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+17
-8
@@ -23,10 +23,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@@ -48,12 +45,14 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
private static final String SEQ = "/seq";
|
||||
|
||||
private ThreadLocal<Map<String, Map<String, String>>> tableParaValMapThreadLocal = new ThreadLocal<>();
|
||||
private Set<Thread> threadList = new HashSet<>();
|
||||
private Set<Thread> removeThreadList = new HashSet<>();
|
||||
|
||||
private CuratorFramework client;
|
||||
private ThreadLocal<InterProcessSemaphoreMutex> interProcessSemaphoreMutexThreadLocal = new ThreadLocal<>();
|
||||
private Properties props;
|
||||
|
||||
public void load(boolean isLowerCaseTableNames) {
|
||||
public synchronized void load(boolean isLowerCaseTableNames) {
|
||||
this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME, isLowerCaseTableNames);
|
||||
String zkAddress = ClusterConfig.getInstance().getClusterIP();
|
||||
if (zkAddress == null) {
|
||||
@@ -99,6 +98,11 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
this.client = CuratorFrameworkFactory.newClient(zkAddress, new ExponentialBackoffRetry(1000, 3));
|
||||
this.client.start();
|
||||
this.props = properties;
|
||||
this.tableParaValMapThreadLocal.remove();
|
||||
this.interProcessSemaphoreMutexThreadLocal.remove();
|
||||
this.removeThreadList.addAll(threadList);
|
||||
this.threadList.clear();
|
||||
this.removeThreadList.remove(Thread.currentThread());
|
||||
}
|
||||
|
||||
private void handle(String key) throws Exception {
|
||||
@@ -139,14 +143,19 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
|
||||
@Override
|
||||
public Map<String, String> getParaValMap(String prefixName) {
|
||||
if (this.removeThreadList.remove(Thread.currentThread())) {
|
||||
this.interProcessSemaphoreMutexThreadLocal.remove();
|
||||
this.tableParaValMapThreadLocal.remove();
|
||||
}
|
||||
Map<String, Map<String, String>> tableParaValMap = tableParaValMapThreadLocal.get();
|
||||
if (tableParaValMap == null) {
|
||||
try {
|
||||
threadLocalLoad();
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Error caught while loding configuration within current thread:" + e.getCause());
|
||||
LOGGER.warn("Error caught while loading configuration within current thread:" + e.getCause());
|
||||
}
|
||||
tableParaValMap = tableParaValMapThreadLocal.get();
|
||||
threadList.add(Thread.currentThread());
|
||||
}
|
||||
return tableParaValMap.get(prefixName);
|
||||
}
|
||||
@@ -219,13 +228,13 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
return super.nextId(prefixName, frontendService);
|
||||
}
|
||||
|
||||
public void detach() throws Exception {
|
||||
public synchronized void detach() throws Exception {
|
||||
if (this.client != null) {
|
||||
this.client.close();
|
||||
}
|
||||
}
|
||||
|
||||
public void attach() throws Exception {
|
||||
public synchronized void attach() throws Exception {
|
||||
String zkAddress = ClusterConfig.getInstance().getClusterIP();
|
||||
if (zkAddress == null) {
|
||||
throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" .");
|
||||
|
||||
@@ -32,18 +32,18 @@ public final class SequenceManager {
|
||||
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
|
||||
INSTANCE.handler = new DistributedSequenceHandler();
|
||||
} else {
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk clusetr");
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster");
|
||||
}
|
||||
break;
|
||||
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
|
||||
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
|
||||
INSTANCE.handler = new IncrSequenceZKHandler();
|
||||
} else {
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk clusetr");
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk cluster");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequnce handler type " + seqHandlerType);
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user