mirror of
https://github.com/actiontech/dble.git
synced 2026-01-06 04:40:17 -06:00
[inner-1576&2161] adjust the trigger global sequence's load
This commit is contained in:
@@ -245,7 +245,7 @@ public final class DbleServer {
|
||||
}
|
||||
SqlDumpLogHelper.init();
|
||||
|
||||
SequenceManager.init(ClusterConfig.getInstance().getSequenceHandlerType());
|
||||
SequenceManager.init();
|
||||
LOGGER.info("===================================Sequence manager init finish===================================");
|
||||
|
||||
LOGGER.info("==============================Pull metaData from MySQL start======================================");
|
||||
@@ -451,12 +451,12 @@ public final class DbleServer {
|
||||
private void reviseSchemas() {
|
||||
if (systemVariables.isLowerCaseTableNames()) {
|
||||
config.reviseLowerCase();
|
||||
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
|
||||
config.selfChecking0();
|
||||
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
|
||||
ConfigUtil.setSchemasForPool(config.getDbGroups(), config.getShardingNodes());
|
||||
} else {
|
||||
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
|
||||
config.selfChecking0();
|
||||
config.loadSequence(DbleTempConfig.getInstance().getSequenceConfig());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -787,16 +787,16 @@ public class ServerConfig {
|
||||
}
|
||||
}
|
||||
|
||||
private void loadSequence() {
|
||||
SequenceManager.load(DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames());
|
||||
public void reloadSequence(RawJson sequenceJson) {
|
||||
SequenceManager.reload(sequenceJson, this.getShardingNodes().keySet());
|
||||
}
|
||||
|
||||
public void loadSequence(RawJson sequenceJson) {
|
||||
if (sequenceJson == null) {
|
||||
loadSequence();
|
||||
} else {
|
||||
SequenceManager.load(DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames(), sequenceJson);
|
||||
}
|
||||
SequenceManager.load(sequenceJson, this.getShardingNodes().keySet());
|
||||
}
|
||||
|
||||
public void tryLoadSequence(RawJson sequenceJson) {
|
||||
SequenceManager.tryLoad(sequenceJson, this.getShardingNodes().keySet());
|
||||
}
|
||||
|
||||
public void selfChecking0() throws ConfigException {
|
||||
|
||||
@@ -22,7 +22,6 @@ import com.actiontech.dble.config.ConfigFileName;
|
||||
import com.actiontech.dble.config.ErrorInfo;
|
||||
import com.actiontech.dble.config.ProblemReporter;
|
||||
import com.actiontech.dble.config.Versions;
|
||||
import com.actiontech.dble.config.model.ClusterConfig;
|
||||
import com.actiontech.dble.config.model.db.type.DataBaseType;
|
||||
import com.actiontech.dble.config.model.sharding.SchemaConfig;
|
||||
import com.actiontech.dble.config.model.sharding.ShardingNodeConfig;
|
||||
@@ -30,7 +29,7 @@ import com.actiontech.dble.config.model.sharding.table.*;
|
||||
import com.actiontech.dble.config.util.ConfigException;
|
||||
import com.actiontech.dble.config.util.ParameterMapping;
|
||||
import com.actiontech.dble.route.function.*;
|
||||
import com.actiontech.dble.route.sequence.handler.IncrSequenceMySQLHandler;
|
||||
import com.actiontech.dble.singleton.SequenceManager;
|
||||
import com.actiontech.dble.util.SplitUtil;
|
||||
import com.actiontech.dble.util.StringUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
@@ -469,11 +468,7 @@ public class ShardingConverter {
|
||||
}
|
||||
|
||||
// add global sequence node when it is some dedicated servers */
|
||||
if (ClusterConfig.getInstance().getSequenceHandlerType() == ClusterConfig.SEQUENCE_HANDLER_MYSQL && sequenceJson != null) {
|
||||
IncrSequenceMySQLHandler redundancy = new IncrSequenceMySQLHandler();
|
||||
redundancy.loadByJson(false, sequenceJson);
|
||||
allUseShardingNode.addAll(redundancy.getShardingNodes());
|
||||
}
|
||||
allUseShardingNode.addAll(SequenceManager.getShardingNodes(sequenceJson));
|
||||
|
||||
//delete redundancy shardingNode
|
||||
Iterator<Map.Entry<String, com.actiontech.dble.backend.datasource.ShardingNode>> iterator = this.shardingNodeMap.entrySet().iterator();
|
||||
|
||||
@@ -439,7 +439,7 @@ public class DruidInsertParser extends DruidInsertReplaceParser {
|
||||
int iValue = 0;
|
||||
for (int i = 0; i < colSize; i++) {
|
||||
if (i == autoIncrement) {
|
||||
long id = SequenceManager.getHandler().nextId(tableKey, service);
|
||||
long id = SequenceManager.nextId(tableKey, service);
|
||||
sb.append(id);
|
||||
} else {
|
||||
String value = SQLUtils.toMySqlString(values.get(iValue++));
|
||||
|
||||
@@ -262,7 +262,7 @@ public class DruidReplaceParser extends DruidInsertReplaceParser {
|
||||
sb.append(new Date().getTime());
|
||||
} else if (i == autoIncrement) {
|
||||
if (checkSize > size) {
|
||||
long id = SequenceManager.getHandler().nextId(tableKey, service);
|
||||
long id = SequenceManager.nextId(tableKey, service);
|
||||
sb.append(id);
|
||||
} else {
|
||||
String value = SQLUtils.toMySqlString(values.get(iValue++));
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package com.actiontech.dble.route.sequence.handler;
|
||||
|
||||
|
||||
import com.actiontech.dble.cluster.values.RawJson;
|
||||
import com.actiontech.dble.config.model.ClusterConfig;
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.services.FrontendService;
|
||||
@@ -22,6 +23,7 @@ import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
@@ -62,7 +64,7 @@ public class DistributedSequenceHandler implements Closeable, SequenceHandler {
|
||||
return DistributedSequenceHandler.instance;
|
||||
}
|
||||
|
||||
public void load(boolean isLowerCaseTableNames) {
|
||||
public void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
if (ClusterConfig.getInstance().isSequenceInstanceByZk()) {
|
||||
initializeZK();
|
||||
loadInstanceIdByZK();
|
||||
@@ -73,6 +75,13 @@ public class DistributedSequenceHandler implements Closeable, SequenceHandler {
|
||||
this.deadline = startTimeMilliseconds + (1L << 39);
|
||||
}
|
||||
|
||||
public void tryLoad(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
load(sequenceJson, currentShardingNodes);
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
private void loadInstanceIdByConfig() {
|
||||
this.instanceId = SystemConfig.getInstance().getInstanceId();
|
||||
long maxInstanceId = ~(-1L << instanceIdBits);
|
||||
|
||||
@@ -21,6 +21,7 @@ import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -29,7 +30,7 @@ import java.util.List;
|
||||
public class FetchMySQLSequenceHandler implements ResponseHandler {
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(FetchMySQLSequenceHandler.class);
|
||||
|
||||
public void execute(SequenceVal seqVal) {
|
||||
public void execute(SequenceVal seqVal) throws SQLNonTransientException {
|
||||
ServerConfig conf = DbleServer.getInstance().getConfig();
|
||||
ShardingNode mysqlDN = conf.getShardingNodes().get(seqVal.shardingNode);
|
||||
try {
|
||||
@@ -43,6 +44,7 @@ public class FetchMySQLSequenceHandler implements ResponseHandler {
|
||||
seqVal.sql), this, seqVal);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("get connection err: " + e);
|
||||
throw new SQLNonTransientException(e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ public abstract class IncrSequenceHandler implements SequenceHandler {
|
||||
public static final String KEY_MAX_NAME = ".MAXID"; // 10000
|
||||
public static final String KEY_CUR_NAME = ".CURID"; // 888
|
||||
|
||||
public abstract Map<String, String> getParaValMap(String prefixName);
|
||||
public abstract Object[] getParaValMap(String prefixName);
|
||||
|
||||
public abstract Boolean updateCurIDVal(String prefixName, Long val);
|
||||
|
||||
@@ -36,19 +36,21 @@ public abstract class IncrSequenceHandler implements SequenceHandler {
|
||||
|
||||
@Override
|
||||
public synchronized long nextId(String prefixName, @Nullable FrontendService frontendService) {
|
||||
Map<String, String> paraMap = this.getParaValMap(prefixName);
|
||||
if (null == paraMap) {
|
||||
Object[] objects = this.getParaValMap(prefixName);
|
||||
if (null == objects) {
|
||||
String msg = "can't find definition for sequence :" + prefixName;
|
||||
LOGGER.info(msg);
|
||||
throw new ConfigException(msg);
|
||||
}
|
||||
long nextId = Long.parseLong(paraMap.get(prefixName + KEY_CUR_NAME)) + 1;
|
||||
long maxId = Long.parseLong(paraMap.get(prefixName + KEY_MAX_NAME));
|
||||
String prefixTable = (String) objects[0];
|
||||
Map<String, String> paraMap = (Map<String, String>) objects[1];
|
||||
long nextId = Long.parseLong(paraMap.get(prefixTable + KEY_CUR_NAME)) + 1;
|
||||
long maxId = Long.parseLong(paraMap.get(prefixTable + KEY_MAX_NAME));
|
||||
if (nextId > maxId) {
|
||||
fetchNextPeriod(prefixName);
|
||||
return nextId(prefixName, frontendService);
|
||||
fetchNextPeriod(prefixTable);
|
||||
return nextId(prefixTable, frontendService);
|
||||
}
|
||||
updateCurIDVal(prefixName, nextId);
|
||||
updateCurIDVal(prefixTable, nextId);
|
||||
return nextId;
|
||||
|
||||
}
|
||||
|
||||
@@ -5,12 +5,13 @@
|
||||
|
||||
package com.actiontech.dble.route.sequence.handler;
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.cluster.values.RawJson;
|
||||
import com.actiontech.dble.config.ConfigFileName;
|
||||
import com.actiontech.dble.config.converter.SequenceConverter;
|
||||
import com.actiontech.dble.config.util.ConfigException;
|
||||
import com.actiontech.dble.route.util.PropertiesUtil;
|
||||
import com.actiontech.dble.services.FrontendService;
|
||||
import org.apache.logging.log4j.util.Strings;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -18,54 +19,59 @@ import java.sql.SQLNonTransientException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import static com.actiontech.dble.config.ConfigFileName.SEQUENCE_DB_FILE_NAME;
|
||||
|
||||
public class IncrSequenceMySQLHandler implements SequenceHandler {
|
||||
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(IncrSequenceMySQLHandler.class);
|
||||
protected static final String ERR_SEQ_RESULT = "-999999999,null";
|
||||
protected static final Map<String, String> LATEST_ERRORS = new ConcurrentHashMap<>();
|
||||
private final FetchMySQLSequenceHandler mysqlSeqFetcher = new FetchMySQLSequenceHandler();
|
||||
private static Set<String> shardingNodes = new HashSet<>();
|
||||
|
||||
public void load(boolean isLowerCaseTableNames) {
|
||||
// load sequence properties
|
||||
Properties props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_DB_FILE_NAME, isLowerCaseTableNames);
|
||||
removeDesertedSequenceVals(props);
|
||||
putNewSequenceVals(props);
|
||||
public void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
Properties props;
|
||||
if (sequenceJson != null) {
|
||||
// load cluster properties
|
||||
SequenceConverter sequenceConverter = new SequenceConverter();
|
||||
props = sequenceConverter.jsonToProperties(sequenceJson);
|
||||
} else {
|
||||
// load local properties
|
||||
props = PropertiesUtil.loadProps(SEQUENCE_DB_FILE_NAME);
|
||||
}
|
||||
|
||||
loadCheck(props, currentShardingNodes);
|
||||
loadContext(props);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadByJson(boolean isLowerCaseTableNames, RawJson sequenceJson) {
|
||||
SequenceConverter sequenceConverter = new SequenceConverter();
|
||||
Properties props = sequenceConverter.jsonToProperties(sequenceJson);
|
||||
props = PropertiesUtil.handleLowerCase(props, isLowerCaseTableNames);
|
||||
removeDesertedSequenceVals(props);
|
||||
putNewSequenceVals(props);
|
||||
}
|
||||
|
||||
public Set<String> getShardingNodes() {
|
||||
return shardingNodes;
|
||||
}
|
||||
|
||||
private void removeDesertedSequenceVals(Properties props) {
|
||||
Iterator<Map.Entry<String, SequenceVal>> i = seqValueMap.entrySet().iterator();
|
||||
while (i.hasNext()) {
|
||||
Map.Entry<String, SequenceVal> entry = i.next();
|
||||
if (!props.containsKey(entry.getKey())) {
|
||||
i.remove();
|
||||
private void loadCheck(Properties props, Set<String> currentShardingNodes) {
|
||||
Set<String> noExistShardingNodes = new HashSet<>();
|
||||
props.entrySet().stream().forEach(entry -> {
|
||||
String shardingNode = (String) entry.getValue();
|
||||
if (!currentShardingNodes.contains(shardingNode)) {
|
||||
noExistShardingNodes.add(shardingNode);
|
||||
}
|
||||
});
|
||||
if (!noExistShardingNodes.isEmpty()) {
|
||||
throw new ConfigException("the shardingNodes[" + Strings.join(noExistShardingNodes, ',') + "] of the " + SEQUENCE_DB_FILE_NAME + " in sharding.xml does not exist");
|
||||
}
|
||||
}
|
||||
|
||||
private void putNewSequenceVals(Properties props) {
|
||||
for (Map.Entry<Object, Object> entry : props.entrySet()) {
|
||||
public void loadContext(Properties props) {
|
||||
seqValueMap.clear();
|
||||
props.entrySet().stream().forEach(entry -> {
|
||||
String seqName = (String) entry.getKey();
|
||||
String shardingNode = (String) entry.getValue();
|
||||
SequenceVal value = seqValueMap.putIfAbsent(seqName, new SequenceVal(seqName, shardingNode));
|
||||
if (value != null) {
|
||||
value.shardingNode = shardingNode;
|
||||
}
|
||||
shardingNodes.add(shardingNode);
|
||||
}
|
||||
seqValueMap.putIfAbsent(seqName, new SequenceVal(seqName, shardingNode));
|
||||
});
|
||||
}
|
||||
|
||||
public static Set<String> getShardingNodes(RawJson sequenceJson) {
|
||||
Set<String> shardingNodes = new HashSet<>();
|
||||
Properties propsTmp = (new SequenceConverter()).jsonToProperties(sequenceJson);
|
||||
propsTmp.entrySet().stream().forEach(entry -> {
|
||||
shardingNodes.add((String) entry.getValue());
|
||||
});
|
||||
return shardingNodes;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -75,7 +81,7 @@ public class IncrSequenceMySQLHandler implements SequenceHandler {
|
||||
|
||||
@Override
|
||||
public long nextId(String seqName, FrontendService frontendService) throws SQLNonTransientException {
|
||||
SequenceVal seqVal = seqValueMap.get(seqName);
|
||||
SequenceVal seqVal = matching(seqName);
|
||||
if (seqVal == null) {
|
||||
throw new ConfigException("can't find definition for sequence :" + seqName);
|
||||
}
|
||||
@@ -87,6 +93,19 @@ public class IncrSequenceMySQLHandler implements SequenceHandler {
|
||||
|
||||
}
|
||||
|
||||
private SequenceVal matching(String key) {
|
||||
if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
|
||||
Optional<Map.Entry<String, SequenceVal>> result = seqValueMap.entrySet().stream().filter(m -> m.getKey().equalsIgnoreCase(key)).findFirst();
|
||||
if (result.isPresent()) {
|
||||
return result.get().getValue();
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
return seqValueMap.get(key);
|
||||
}
|
||||
}
|
||||
|
||||
private Long getNextValidSeqVal(SequenceVal seqVal) throws SQLNonTransientException {
|
||||
long nexVal = seqVal.counter.getNext();
|
||||
if (nexVal != -1) {
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
|
||||
package com.actiontech.dble.route.sequence.handler;
|
||||
|
||||
import com.actiontech.dble.cluster.values.RawJson;
|
||||
import com.actiontech.dble.config.model.ClusterConfig;
|
||||
import com.actiontech.dble.config.model.SystemConfig;
|
||||
import com.actiontech.dble.services.FrontendService;
|
||||
@@ -12,13 +13,14 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.Set;
|
||||
|
||||
public final class IncrSequenceTimeHandler implements SequenceHandler {
|
||||
protected static final Logger LOGGER = LoggerFactory.getLogger(IncrSequenceTimeHandler.class);
|
||||
|
||||
private IdWorker workey;
|
||||
|
||||
public void load(boolean isLowerCaseTableNames) {
|
||||
public void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
long startTimeMilliseconds = ClusterConfig.getInstance().sequenceStartTime();
|
||||
workey = new IdWorker(startTimeMilliseconds);
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
package com.actiontech.dble.route.sequence.handler;
|
||||
|
||||
|
||||
import com.actiontech.dble.DbleServer;
|
||||
import com.actiontech.dble.cluster.values.RawJson;
|
||||
import com.actiontech.dble.config.ConfigFileName;
|
||||
import com.actiontech.dble.config.converter.SequenceConverter;
|
||||
@@ -23,10 +24,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@@ -53,8 +51,15 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
private ThreadLocal<InterProcessSemaphoreMutex> interProcessSemaphoreMutexThreadLocal = new ThreadLocal<>();
|
||||
private Properties props;
|
||||
|
||||
public void load(boolean isLowerCaseTableNames) {
|
||||
this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME, isLowerCaseTableNames);
|
||||
public void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
if (sequenceJson != null) {
|
||||
// load cluster properties
|
||||
SequenceConverter sequenceConverter = new SequenceConverter();
|
||||
this.props = sequenceConverter.jsonToProperties(sequenceJson);
|
||||
} else {
|
||||
// load local properties
|
||||
this.props = PropertiesUtil.loadProps(ConfigFileName.SEQUENCE_FILE_NAME);
|
||||
}
|
||||
String zkAddress = ClusterConfig.getInstance().getClusterIP();
|
||||
if (zkAddress == null) {
|
||||
throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" .");
|
||||
@@ -66,19 +71,10 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadByJson(boolean isLowerCaseTableNames, RawJson sequenceJson) {
|
||||
SequenceConverter sequenceConverter = new SequenceConverter();
|
||||
this.props = sequenceConverter.jsonToProperties(sequenceJson);
|
||||
this.props = PropertiesUtil.handleLowerCase(this.props, isLowerCaseTableNames);
|
||||
String zkAddress = ClusterConfig.getInstance().getClusterIP();
|
||||
if (zkAddress == null) {
|
||||
throw new RuntimeException("please check ClusterIP is correct in config file \"cluster.cnf\" .");
|
||||
}
|
||||
try {
|
||||
initializeZK(this.props, zkAddress);
|
||||
} catch (Exception e) {
|
||||
LOGGER.warn("Error caught while initializing ZK:" + e.getCause());
|
||||
public void tryLoad(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
load(sequenceJson, currentShardingNodes);
|
||||
if (client != null) {
|
||||
client.close();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -138,7 +134,7 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getParaValMap(String prefixName) {
|
||||
public Object[] getParaValMap(String prefixName) {
|
||||
Map<String, Map<String, String>> tableParaValMap = tableParaValMapThreadLocal.get();
|
||||
if (tableParaValMap == null) {
|
||||
try {
|
||||
@@ -148,7 +144,19 @@ public class IncrSequenceZKHandler extends IncrSequenceHandler {
|
||||
}
|
||||
tableParaValMap = tableParaValMapThreadLocal.get();
|
||||
}
|
||||
return tableParaValMap.get(prefixName);
|
||||
return matching(prefixName, tableParaValMap);
|
||||
}
|
||||
|
||||
private Object[] matching(String key, Map<String, Map<String, String>> map) {
|
||||
if (DbleServer.getInstance().getSystemVariables().isLowerCaseTableNames()) {
|
||||
Optional<Map.Entry<String, Map<String, String>>> result = map.entrySet().stream().filter(m -> m.getKey().equalsIgnoreCase(key)).findFirst();
|
||||
if (result.isPresent())
|
||||
return new Object[]{result.get().getKey(), result.get().getValue()};
|
||||
} else {
|
||||
if (map.containsKey(key))
|
||||
return new Object[]{key, map.get(key)};
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -6,10 +6,12 @@
|
||||
package com.actiontech.dble.route.sequence.handler;
|
||||
|
||||
import com.actiontech.dble.cluster.values.RawJson;
|
||||
import com.actiontech.dble.config.util.ConfigException;
|
||||
import com.actiontech.dble.services.FrontendService;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* @author <a href="http://www.micmiu.com">Michael</a>
|
||||
@@ -18,11 +20,11 @@ import java.sql.SQLNonTransientException;
|
||||
*/
|
||||
public interface SequenceHandler {
|
||||
|
||||
long nextId(String prefixName, @Nullable FrontendService frontendService) throws SQLNonTransientException;
|
||||
void load(RawJson sequenceJson, Set<String> currentShardingNodes) throws ConfigException;
|
||||
|
||||
void load(boolean isLowerCaseTableNames);
|
||||
|
||||
default void loadByJson(boolean isLowerCaseTableNames, RawJson sequenceJson) {
|
||||
default void tryLoad(RawJson sequenceJson, Set<String> currentShardingNodes) throws ConfigException {
|
||||
}
|
||||
|
||||
long nextId(String prefixName, @Nullable FrontendService frontendService) throws SQLNonTransientException;
|
||||
|
||||
}
|
||||
|
||||
@@ -30,10 +30,6 @@ public final class PropertiesUtil {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(PropertiesUtil.class);
|
||||
|
||||
public static Properties loadProps(String propsFile, boolean isLowerCaseTableNames) {
|
||||
Properties props = loadProps(propsFile);
|
||||
return handleLowerCase(props, isLowerCaseTableNames);
|
||||
}
|
||||
|
||||
public static Properties loadProps(String propsFile) {
|
||||
Properties props = new Properties();
|
||||
|
||||
@@ -448,12 +448,12 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler
|
||||
String[] newLine = new String[line.length + 1];
|
||||
System.arraycopy(line, 0, newLine, 0, line.length);
|
||||
String tableKey = StringUtil.getFullName(schema.getName(), tableName);
|
||||
newLine[line.length] = String.valueOf(SequenceManager.getHandler().nextId(tableKey, service));
|
||||
newLine[line.length] = String.valueOf(SequenceManager.nextId(tableKey, service));
|
||||
line = newLine;
|
||||
} else {
|
||||
if (StringUtil.isEmpty(line[autoIncrementIndex])) {
|
||||
String tableKey = StringUtil.getFullName(schema.getName(), tableName);
|
||||
line[autoIncrementIndex] = String.valueOf(SequenceManager.getHandler().nextId(tableKey, service));
|
||||
line[autoIncrementIndex] = String.valueOf(SequenceManager.nextId(tableKey, service));
|
||||
} else if (!appendAutoIncrementColumn) {
|
||||
throw new Exception("you can't set value for Autoincrement column!");
|
||||
}
|
||||
|
||||
@@ -25,7 +25,7 @@ public class ShardingValuesHandler extends DefaultValuesHandler {
|
||||
long incrementColumnVal = 0;
|
||||
if (context.getIncrementColumnIndex() != -1) {
|
||||
String tableKey = StringUtil.getFullName(context.getSchema(), context.getTable());
|
||||
incrementColumnVal = SequenceManager.getHandler().nextId(tableKey, null);
|
||||
incrementColumnVal = SequenceManager.nextId(tableKey, null);
|
||||
}
|
||||
context.getWriter().writeInsertHeader(shardingNode, new InsertQuery(insertQueryPos, valuePair, context.getIncrementColumnIndex(), incrementColumnVal));
|
||||
}
|
||||
|
||||
@@ -137,8 +137,8 @@ public final class DryRun {
|
||||
if (newSystemVariables.isLowerCaseTableNames()) {
|
||||
serverConfig.reviseLowerCase();
|
||||
}
|
||||
serverConfig.loadSequence(loader.getSequenceConfig());
|
||||
serverConfig.selfChecking0();
|
||||
serverConfig.tryLoadSequence(loader.getSequenceConfig());
|
||||
|
||||
Map<String, Set<String>> schemaMap = getExistSchemas(serverConfig);
|
||||
//table exists check ,if the vars can not be touch ,the table check has no meaning
|
||||
|
||||
@@ -274,17 +274,18 @@ public final class ReloadConfig {
|
||||
recycleOldBackendConnections(forceAllReload, (loadAllMode & ManagerParseConfig.OPTF_MODE) != 0);
|
||||
|
||||
|
||||
// lowerCase && load sequence
|
||||
// lowerCase
|
||||
if (loader.isFullyConfigured()) {
|
||||
if (newSystemVariables.isLowerCaseTableNames()) {
|
||||
ReloadLogHelper.briefInfo("dbGroup's lowerCaseTableNames=1, lower the config properties ...");
|
||||
newConfig.reviseLowerCase();
|
||||
}
|
||||
ReloadLogHelper.briefInfo("loadSequence ...");
|
||||
newConfig.loadSequence(loader.getSequenceConfig());
|
||||
ReloadLogHelper.briefInfo("selfChecking0 ...");
|
||||
newConfig.selfChecking0();
|
||||
}
|
||||
// load sequence
|
||||
ReloadLogHelper.briefInfo("loadSequence ...");
|
||||
newConfig.reloadSequence(loader.getSequenceConfig());
|
||||
|
||||
Map<UserName, UserConfig> newUsers = newConfig.getUsers();
|
||||
Map<String, SchemaConfig> newSchemas = newConfig.getSchemas();
|
||||
|
||||
@@ -7,59 +7,100 @@ package com.actiontech.dble.singleton;
|
||||
|
||||
import com.actiontech.dble.cluster.values.RawJson;
|
||||
import com.actiontech.dble.config.model.ClusterConfig;
|
||||
import com.actiontech.dble.config.util.ConfigException;
|
||||
import com.actiontech.dble.route.sequence.handler.*;
|
||||
import com.actiontech.dble.services.FrontendService;
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import java.sql.SQLNonTransientException;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Created by szf on 2019/9/19.
|
||||
*/
|
||||
public final class SequenceManager {
|
||||
private static final SequenceManager INSTANCE = new SequenceManager();
|
||||
private volatile SequenceHandler handler;
|
||||
private SequenceHandler handler;
|
||||
|
||||
private SequenceManager() {
|
||||
|
||||
}
|
||||
|
||||
public static void init(int seqHandlerType) {
|
||||
public static void init() {
|
||||
int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType();
|
||||
INSTANCE.handler = newSequenceHandler(seqHandlerType);
|
||||
}
|
||||
|
||||
private static SequenceHandler newSequenceHandler(int seqHandlerType) {
|
||||
switch (seqHandlerType) {
|
||||
case ClusterConfig.SEQUENCE_HANDLER_MYSQL:
|
||||
INSTANCE.handler = new IncrSequenceMySQLHandler();
|
||||
break;
|
||||
return new IncrSequenceMySQLHandler();
|
||||
case ClusterConfig.SEQUENCE_HANDLER_LOCAL_TIME:
|
||||
INSTANCE.handler = new IncrSequenceTimeHandler();
|
||||
break;
|
||||
return new IncrSequenceTimeHandler();
|
||||
case ClusterConfig.SEQUENCE_HANDLER_ZK_DISTRIBUTED:
|
||||
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
|
||||
INSTANCE.handler = new DistributedSequenceHandler();
|
||||
return new DistributedSequenceHandler();
|
||||
} else {
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk clusetr");
|
||||
}
|
||||
break;
|
||||
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
|
||||
if (ClusterConfig.getInstance().isClusterEnable() && ClusterConfig.getInstance().useZkMode()) {
|
||||
INSTANCE.handler = new IncrSequenceZKHandler();
|
||||
return new IncrSequenceZKHandler();
|
||||
} else {
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequence handler type " + seqHandlerType + " for no-zk clusetr");
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new java.lang.IllegalArgumentException("Invalid sequnce handler type " + seqHandlerType);
|
||||
}
|
||||
}
|
||||
|
||||
public static void load(boolean lowerCaseTableNames) {
|
||||
INSTANCE.handler.load(lowerCaseTableNames);
|
||||
public static void load(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
if (INSTANCE.handler == null)
|
||||
return;
|
||||
INSTANCE.handler.load(sequenceJson, currentShardingNodes);
|
||||
}
|
||||
|
||||
public static void load(boolean lowerCaseTableNames, RawJson sequenceJson) {
|
||||
INSTANCE.handler.loadByJson(lowerCaseTableNames, sequenceJson);
|
||||
public static void reload(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
if (INSTANCE.handler == null)
|
||||
return;
|
||||
int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType();
|
||||
switch (seqHandlerType) {
|
||||
case ClusterConfig.SEQUENCE_HANDLER_MYSQL:
|
||||
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
|
||||
INSTANCE.handler.load(sequenceJson, currentShardingNodes);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public static SequenceManager getInstance() {
|
||||
return INSTANCE;
|
||||
public static void tryLoad(RawJson sequenceJson, Set<String> currentShardingNodes) {
|
||||
int seqHandlerType = ClusterConfig.getInstance().getSequenceHandlerType();
|
||||
switch (seqHandlerType) {
|
||||
case ClusterConfig.SEQUENCE_HANDLER_MYSQL:
|
||||
case ClusterConfig.SEQUENCE_HANDLER_ZK_GLOBAL_INCREMENT:
|
||||
SequenceHandler tmpHandler = newSequenceHandler(seqHandlerType);
|
||||
tmpHandler.tryLoad(sequenceJson, currentShardingNodes);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public static long nextId(String prefixName, @Nullable FrontendService frontendService) throws SQLNonTransientException {
|
||||
if (INSTANCE.handler == null)
|
||||
throw new ConfigException("sequence is not init");
|
||||
return INSTANCE.handler.nextId(prefixName, frontendService);
|
||||
}
|
||||
|
||||
public static SequenceHandler getHandler() {
|
||||
return INSTANCE.handler;
|
||||
}
|
||||
|
||||
public static Set<String> getShardingNodes(RawJson sequenceJson) {
|
||||
if (ClusterConfig.getInstance().getSequenceHandlerType() == ClusterConfig.SEQUENCE_HANDLER_MYSQL && sequenceJson != null) {
|
||||
return IncrSequenceMySQLHandler.getShardingNodes(sequenceJson);
|
||||
}
|
||||
return Sets.newHashSet();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user