[inner-2281&2301&2302] fix: when reloading, the consistency of system parameters between comparison instances is not comprehensive

This commit is contained in:
wenyh
2023-07-21 17:32:01 +08:00
committed by wenyh1
parent 1ce6c9786c
commit eaaa18981f
6 changed files with 241 additions and 251 deletions
@@ -683,7 +683,8 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
this.config.getPort() == dbInstance.getConfig().getPort() &&
this.config.getUser().equals(dbInstance.getConfig().getUser()) &&
this.config.getPassword().equals(dbInstance.getConfig().getPassword()) &&
this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt();
this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() &&
this.disabled.get() == dbInstance.isDisabled();
}
@Override
@@ -151,10 +151,6 @@ public class DbGroupConfig {
this.disableHA = disableHA;
}
public boolean existInstanceProvideVars() {
return writeInstanceConfig.provideVars();
}
public DataBaseType instanceDatabaseType() {
return writeInstanceConfig.getDataBaseType();
}
@@ -147,13 +147,6 @@ public class DbInstanceConfig {
return dataBaseType;
}
public boolean provideVars() {
if (dataBaseType == DataBaseType.MYSQL) {
return true;
}
return false;
}
public String getDbDistrict() {
return dbDistrict;
}
@@ -19,17 +19,15 @@ import com.actiontech.dble.services.manager.response.ChangeItem;
import com.actiontech.dble.services.manager.response.ChangeItemType;
import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.CollectionUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.util.Strings;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
public final class ConfigUtil {
private static final Logger LOGGER = LoggerFactory.getLogger(ConfigUtil.class);
@@ -86,111 +84,156 @@ public final class ConfigUtil {
return schemaList;
}
public static String getAndSyncKeyVariables(List<ChangeItem> changeItemList, boolean needSync) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("sync-key-variables");
try {
String msg = null;
List<ChangeItem> needCheckItemList = changeItemList.stream()
//add dbInstance or add dbGroup or (update dbInstance and need testConn)
.filter(changeItem -> ((changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE || changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) &&
changeItem.getType() == ChangeType.ADD) ||
(changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE && changeItem.getType() == ChangeType.UPDATE && changeItem.isAffectTestConn()))
.collect(Collectors.toList());
if (changeItemList.size() == 0 || needCheckItemList == null || needCheckItemList.isEmpty()) {
//with no dbGroups, do not check the variables
return null;
}
Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap = Maps.newHashMap();
List<PhysicalDbInstance> dbInstanceList = Lists.newArrayList();
getAndSyncKeyVariablesForDataSources(needCheckItemList, keyVariablesTaskMap, needSync, dbInstanceList);
public static boolean isAllDbInstancesChange(List<ChangeItem> changeItemList) {
if (changeItemList.size() == 0) return false;
Set<String> diffGroup = new HashSet<>();
int minNodePacketSize = Integer.MAX_VALUE;
int minVersion = Integer.parseInt(SystemConfig.getInstance().getFakeMySQLVersion().substring(0, 1));
Boolean lowerCase = DbleServer.getInstance().getConfig().isLowerCase();
for (Map.Entry<VariableMapKey, Future<KeyVariables>> entry : keyVariablesTaskMap.entrySet()) {
VariableMapKey variableMapKey = entry.getKey();
Future<KeyVariables> future = entry.getValue();
KeyVariables keyVariables = future.get();
if (keyVariables != null) {
if (dbInstanceList.size() == 1 || lowerCase == null) {
lowerCase = keyVariables.isLowerCase();
} else if (keyVariables.isLowerCase() != lowerCase && dbInstanceList.size() > 1) {
diffGroup.add(variableMapKey.getDataSourceName());
Map<String, PhysicalDbInstance> oldDbInstanceMaps = new HashMap<>();
DbleServer.getInstance().getConfig().getDbGroups()
.values().stream().forEach(group -> group.getAllDbInstanceMap()
.values().stream().forEach(db -> {
oldDbInstanceMaps.put(genDataSourceKey(group.getGroupName(), db.getName()), db);
}));
if (CollectionUtil.isEmpty(oldDbInstanceMaps)) return false;
for (ChangeItem changeItem : changeItemList) {
switch (changeItem.getItemType()) {
case PHYSICAL_DB_GROUP:
if (changeItem.getType() == ChangeType.DELETE) {
((PhysicalDbGroup) changeItem.getItem()).getAllDbInstanceMap().values().stream().forEach(db -> {
oldDbInstanceMaps.remove(genDataSourceKey(db.getDbGroupConfig().getName(), db.getName()));
});
}
minNodePacketSize = Math.min(minNodePacketSize, keyVariables.getMaxPacketSize());
int version = Integer.parseInt(keyVariables.getVersion().substring(0, 1));
minVersion = Math.min(minVersion, version);
}
break;
case PHYSICAL_DB_INSTANCE:
if ((changeItem.getType() == ChangeType.UPDATE || changeItem.getType() == ChangeType.DELETE)) {
PhysicalDbInstance db = ((PhysicalDbInstance) changeItem.getItem());
oldDbInstanceMaps.remove(genDataSourceKey(db.getDbGroupConfig().getName(), db.getName()));
}
break;
default:
break;
}
if (minNodePacketSize < SystemConfig.getInstance().getMaxPacketSize() + KeyVariables.MARGIN_PACKET_SIZE) {
SystemConfig.getInstance().setMaxPacketSize(minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
msg = "dble's maxPacketSize will be set to (the min of all dbGroup's max_allowed_packet) - " + KeyVariables.MARGIN_PACKET_SIZE + ":" + (minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
LOGGER.warn(msg);
}
if (minVersion < Integer.parseInt(SystemConfig.getInstance().getFakeMySQLVersion().substring(0, 1))) {
throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend mysql node,pls check the backend mysql node.");
}
if (diffGroup.size() != 0) {
// if all datasoure's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different.");
sb.append("These previous MySQL's value is");
sb.append(DbleServer.getInstance().getConfig().isLowerCase() ? " not 0" : " 0");
sb.append(".but these MySQL's [");
sb.append(Strings.join(diffGroup, ','));
sb.append("] value is");
sb.append(DbleServer.getInstance().getConfig().isLowerCase() ? " 0" : " not 0");
sb.append(".");
throw new IOException(sb.toString());
}
dbInstanceList.forEach(dbInstance -> dbInstance.setNeedSkipHeartTest(true));
DbleTempConfig.getInstance().setLowerCase(lowerCase);
return msg;
} finally {
TraceManager.finishSpan(traceObject);
}
oldDbInstanceMaps.values().removeIf(db -> db.isDisabled() || !db.isTestConnSuccess() || db.isFakeNode());
return CollectionUtil.isEmpty(oldDbInstanceMaps);
}
public static List<String> getAndSyncKeyVariables(Map<String, PhysicalDbGroup> dbGroups, boolean needSync) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("sync-key-variables");
try {
Map<String, PhysicalDbGroup> mysqlDbGroups = new HashMap<>();
Map<String, PhysicalDbGroup> clickHouseDbGroups = new HashMap<>();
dbGroups.forEach((k, v) -> {
if (v.getDbGroupConfig().instanceDatabaseType() == DataBaseType.MYSQL) {
mysqlDbGroups.put(k, v);
} else {
clickHouseDbGroups.put(k, v);
Set<PhysicalDbInstance> mysqlDbInstances = new HashSet<>();
Set<PhysicalDbInstance> ckDbInstances = new HashSet<>();
for (Map.Entry<String, PhysicalDbGroup> entry : dbGroups.entrySet()) {
for (PhysicalDbInstance ds : entry.getValue().getDbInstances(true)) {
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
if (ds.getConfig().getDataBaseType() == DataBaseType.MYSQL) {
mysqlDbInstances.add(ds);
} else {
ckDbInstances.add(ds);
}
}
});
}
List<String> syncKeyVariables = Lists.newArrayList();
List<String> mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbGroups, needSync);
Optional.ofNullable(mysqlSyncKeyVariables).ifPresent(syncKeyVariables::addAll);
List<String> clickHouseSyncKeyVariables = getClickHouseSyncKeyVariables(clickHouseDbGroups, needSync);
Optional.ofNullable(clickHouseSyncKeyVariables).ifPresent(syncKeyVariables::addAll);
List<String> clickHouseSyncKeyVariables = getClickHouseSyncKeyVariables(ckDbInstances, true, needSync, !CollectionUtil.isEmpty(mysqlDbInstances));
syncKeyVariables.addAll(clickHouseSyncKeyVariables);
List<String> mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbInstances, true, needSync, !CollectionUtil.isEmpty(ckDbInstances));
syncKeyVariables.addAll(mysqlSyncKeyVariables);
return syncKeyVariables;
} finally {
TraceManager.finishSpan(traceObject);
}
}
@Nullable
private static List<String> getMysqlSyncKeyVariables(Map<String, PhysicalDbGroup> dbGroups, boolean needSync) throws InterruptedException, ExecutionException, IOException {
String msg = null;
List<String> list = new ArrayList<>();
if (dbGroups.size() == 0) {
//with no dbGroups, do not check the variables
return list;
}
Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap = new HashMap<>(dbGroups.size());
List<PhysicalDbInstance> dbInstanceList = Lists.newArrayList();
getAndSyncKeyVariablesForDataSources(dbGroups, keyVariablesTaskMap, needSync, dbInstanceList);
public static List<String> getAndSyncKeyVariables(List<ChangeItem> changeItemList, boolean needSync) throws Exception {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("sync-key-variables");
try {
Set<PhysicalDbInstance> mysqlDbInstances = new HashSet<>();
Set<PhysicalDbInstance> ckDbInstances = new HashSet<>();
boolean lowerCase = false;
boolean isFirst = true;
Set<String> firstGroup = new HashSet<>();
Set<String> secondGroup = new HashSet<>();
for (ChangeItem changeItem : changeItemList) {
// add dbInstance or add dbGroup or (update dbInstance and need testConn)
if (((changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE || changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) &&
changeItem.getType() == ChangeType.ADD) ||
(changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE && changeItem.getType() == ChangeType.UPDATE && changeItem.isAffectTestConn())) {
// filter not available
if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE) {
PhysicalDbInstance ds = (PhysicalDbInstance) changeItem.getItem();
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
if (ds.getConfig().getDataBaseType() == DataBaseType.MYSQL) {
mysqlDbInstances.add(ds);
} else {
ckDbInstances.add(ds);
}
} else if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) {
PhysicalDbGroup dbGroup = (PhysicalDbGroup) changeItem.getItem();
for (PhysicalDbInstance ds : dbGroup.getAllDbInstanceMap().values()) {
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
if (ds.getConfig().getDataBaseType() == DataBaseType.MYSQL) {
mysqlDbInstances.add(ds);
} else {
ckDbInstances.add(ds);
}
}
}
}
}
List<String> syncKeyVariables = Lists.newArrayList();
List<String> clickHouseSyncKeyVariables = getClickHouseSyncKeyVariables(ckDbInstances, false, needSync, !CollectionUtil.isEmpty(mysqlDbInstances));
syncKeyVariables.addAll(clickHouseSyncKeyVariables);
List<String> mysqlSyncKeyVariables = getMysqlSyncKeyVariables(mysqlDbInstances, false, needSync, !CollectionUtil.isEmpty(ckDbInstances));
syncKeyVariables.addAll(mysqlSyncKeyVariables);
return syncKeyVariables;
} finally {
TraceManager.finishSpan(traceObject);
}
}
private static List<String> getMysqlSyncKeyVariables(Set<PhysicalDbInstance> mysqlDbInstances, boolean isAllChange, boolean needSync, boolean existClickHouse) throws InterruptedException, ExecutionException, IOException {
if (mysqlDbInstances.size() == 0)
return new ArrayList<>();
Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap = new HashMap<>(mysqlDbInstances.size());
getAndSyncKeyVariablesForDataSources(mysqlDbInstances, keyVariablesTaskMap, needSync);
return diffMysqlKeyVariables(keyVariablesTaskMap, mysqlDbInstances, isAllChange, existClickHouse);
}
private static List<String> getClickHouseSyncKeyVariables(Set<PhysicalDbInstance> ckDbInstances, boolean isAllChange, boolean needSync, boolean existMysql) throws InterruptedException, ExecutionException, IOException {
if (ckDbInstances.size() == 0)
return new ArrayList<>();
Boolean lowerCase = (isAllChange && !existMysql) ? null : DbleServer.getInstance().getConfig().isLowerCase();
if (lowerCase != null && lowerCase) {
StringBuilder sb = new StringBuilder();
sb.append("The configuration add Clickhouse. Since clickhouse is not case sensitive, so the values of lower_case_table_names for previous dbInstances must be 0.");
throw new IOException(sb.toString());
}
Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap = new HashMap<>(ckDbInstances.size());
getAndSyncKeyVariablesForDataSources(ckDbInstances, keyVariablesTaskMap, needSync);
return diffClickHouseKeyVariables(keyVariablesTaskMap, ckDbInstances);
}
private static List<String> diffMysqlKeyVariables(Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap,
Set<PhysicalDbInstance> dbInstanceList,
boolean isAllChange,
boolean existClickHouse) throws InterruptedException, ExecutionException, IOException { // Mysql
List<String> msgList = new ArrayList<>();
if (CollectionUtil.isEmpty(keyVariablesTaskMap))
return msgList;
String msg;
Boolean lowerCase = isAllChange ? null : DbleServer.getInstance().getConfig().isLowerCase();
boolean reInitLowerCase = false;
Set<String> leftGroup = new HashSet<>();
Set<String> rightGroup = new HashSet<>();
int minNodePacketSize = Integer.MAX_VALUE;
int minVersion = VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion());
for (Map.Entry<VariableMapKey, Future<KeyVariables>> entry : keyVariablesTaskMap.entrySet()) {
@@ -198,95 +241,102 @@ public final class ConfigUtil {
Future<KeyVariables> future = entry.getValue();
KeyVariables keyVariables = future.get();
if (keyVariables != null) {
if (isFirst) {
// lowerCase
if (lowerCase == null) {
reInitLowerCase = true;
lowerCase = keyVariables.isLowerCase();
isFirst = false;
firstGroup.add(variableMapKey.getDataSourceName());
} else if (keyVariables.isLowerCase() != lowerCase) {
secondGroup.add(variableMapKey.getDataSourceName());
leftGroup.add(variableMapKey.getDataSourceName());
} else if (keyVariables.isLowerCase() == lowerCase) {
leftGroup.add(variableMapKey.getDataSourceName());
} else {
rightGroup.add(variableMapKey.getDataSourceName());
}
minNodePacketSize = Math.min(minNodePacketSize, keyVariables.getMaxPacketSize());
// version
Integer majorVersion = VersionUtil.getMajorVersionWithoutDefaultValue(keyVariables.getVersion());
if (majorVersion == null) {
LOGGER.warn("the backend mysql server version [{}] is unrecognized, we will treat as default official mysql version 5.*. ", keyVariables.getVersion());
majorVersion = 5;
}
minVersion = Math.min(minVersion, majorVersion);
PhysicalDbInstance instance = variableMapKey.getDbInstance();
// The back_log value indicates how many requests can be stacked during this short time before MySQL momentarily stops answering new requests
PhysicalDbInstance instance = variableMapKey.getDbInstance();
int minCon = instance.getConfig().getMinCon();
int backLog = keyVariables.getBackLog();
if (backLog < minCon) {
msg = "dbGroup[" + instance.getDbGroup().getGroupName() + "," + instance.getName() + "] the value of back_log may too small, current value is " + backLog + ", recommended value is " + minCon;
list.add(msg);
msgList.add(msg);
LOGGER.warn(msg);
}
}
}
// maxPacketSize
if (minNodePacketSize < SystemConfig.getInstance().getMaxPacketSize() + KeyVariables.MARGIN_PACKET_SIZE) {
SystemConfig.getInstance().setMaxPacketSize(minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
msg = "dble's maxPacketSize will be set to (the min of all dbGroup's max_allowed_packet) - " + KeyVariables.MARGIN_PACKET_SIZE + ":" + (minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
list.add(msg);
msgList.add(msg);
LOGGER.warn(msg);
}
if (minVersion < VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion())) {
throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend mysql node,pls check the backend mysql node.");
}
if (secondGroup.size() != 0) {
// if all datasoure's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend MySQLs are different.");
String firstGroupValue;
String secondGroupValue;
if (lowerCase) {
firstGroupValue = " not 0 :";
secondGroupValue = " 0 :";
// version
checkVersion(minVersion, "mysql");
// if all dbInstance's lower case are not equal, throw exception
if (rightGroup.size() != 0) {
StringBuilder sb = new StringBuilder();
if (existClickHouse) {
sb.append("The configuration contains Clickhouse. Since clickhouse is not case sensitive, so the values of lower_case_table_names for dbInstances must be 0. ");
} else {
firstGroupValue = " 0 :";
secondGroupValue = " not 0 :";
sb.append("The values of lower_case_table_names for dbInstances are different. ");
}
if (reInitLowerCase) {
sb.append("These dbInstances's [");
sb.append(Strings.join(leftGroup, ','));
sb.append("] value is");
sb.append(lowerCase ? " not 0" : " 0");
sb.append(". And these dbInstances's [");
sb.append(Strings.join(rightGroup, ','));
sb.append("] value is");
sb.append(lowerCase ? " 0" : " not 0");
sb.append(".");
} else {
sb.append("These previous dbInstances's value is");
sb.append(lowerCase ? " not 0" : " 0");
sb.append(".but these dbInstances's [");
sb.append(Strings.join(rightGroup, ','));
sb.append("] value is");
sb.append(lowerCase ? " 0" : " not 0");
sb.append(".");
}
sb.append("These MySQL's value is");
sb.append(firstGroupValue);
sb.append(Strings.join(firstGroup, ','));
sb.append(".And these MySQL's value is");
sb.append(secondGroupValue);
sb.append(Strings.join(secondGroup, ','));
sb.append(".");
throw new IOException(sb.toString());
}
if (existClickHouse && (lowerCase != null && lowerCase)) {
throw new IOException("The configuration contains Clickhouse. Since clickhouse is not case sensitive, so the values of lower_case_table_names for all dbInstances must be 0. Current all dbInstances are 1.");
}
dbInstanceList.forEach(dbInstance -> dbInstance.setNeedSkipHeartTest(true));
DbleTempConfig.getInstance().setLowerCase(lowerCase);
return list;
return msgList;
}
@Nullable
private static List<String> getClickHouseSyncKeyVariables(Map<String, PhysicalDbGroup> dbGroups, boolean needSync) throws InterruptedException, ExecutionException, IOException {
String msg = null;
List<String> list = new ArrayList<>();
if (dbGroups.size() == 0) {
//with no dbGroups, do not check the variables
return list;
}
Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap = new HashMap<>(dbGroups.size());
List<PhysicalDbInstance> dbInstanceList = Lists.newArrayList();
getAndSyncKeyVariablesForDataSources(dbGroups, keyVariablesTaskMap, needSync, dbInstanceList);
private static List<String> diffClickHouseKeyVariables(Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap,
Set<PhysicalDbInstance> dbInstanceList) throws InterruptedException, ExecutionException, IOException { // Mysql
List<String> msgList = new ArrayList<>();
if (CollectionUtil.isEmpty(keyVariablesTaskMap))
return msgList;
boolean lowerCase = false;
boolean isFirst = true;
Set<String> firstGroup = new HashSet<>();
Set<String> secondGroup = new HashSet<>();
String msg;
final boolean lowerCaseA = false; // clickhouse is not case sensitive
int minNodePacketSize = Integer.MAX_VALUE;
int minVersion = VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion());
for (Map.Entry<VariableMapKey, Future<KeyVariables>> entry : keyVariablesTaskMap.entrySet()) {
VariableMapKey variableMapKey = entry.getKey();
Future<KeyVariables> future = entry.getValue();
KeyVariables keyVariables = future.get();
if (keyVariables != null) {
if (isFirst) {
lowerCase = keyVariables.isLowerCase();
isFirst = false;
firstGroup.add(variableMapKey.getDataSourceName());
} else if (keyVariables.isLowerCase() != lowerCase) {
secondGroup.add(variableMapKey.getDataSourceName());
}
minNodePacketSize = Math.min(minNodePacketSize, keyVariables.getMaxPacketSize());
Integer majorVersion = VersionUtil.getMajorVersionWithoutDefaultValue(keyVariables.getVersion());
if (majorVersion == null) {
@@ -296,94 +346,36 @@ public final class ConfigUtil {
minVersion = Math.min(minVersion, majorVersion);
}
}
// maxPacketSize
if (minNodePacketSize < SystemConfig.getInstance().getMaxPacketSize() + KeyVariables.MARGIN_PACKET_SIZE) {
SystemConfig.getInstance().setMaxPacketSize(minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
msg = "dble's maxPacketSize will be set to (the min of all dbGroup's max_allowed_packet) - " + KeyVariables.MARGIN_PACKET_SIZE + ":" + (minNodePacketSize - KeyVariables.MARGIN_PACKET_SIZE);
list.add(msg);
msgList.add(msg);
LOGGER.warn(msg);
}
if (minVersion < VersionUtil.getMajorVersion(SystemConfig.getInstance().getFakeMySQLVersion())) {
throw new ConfigException("the dble version[=" + SystemConfig.getInstance().getFakeMySQLVersion() + "] cannot be higher than the minimum version of the backend clickHouse node,pls check the backend clickHouse node.");
}
if (secondGroup.size() != 0) {
// if all datasoure's lower case are not equal, throw exception
StringBuilder sb = new StringBuilder("The values of lower_case_table_names for backend clickHouse are different.");
String firstGroupValue;
String secondGroupValue;
if (lowerCase) {
firstGroupValue = " not 0 :";
secondGroupValue = " 0 :";
} else {
firstGroupValue = " 0 :";
secondGroupValue = " not 0 :";
}
sb.append("These clickHouse's value is");
sb.append(firstGroupValue);
sb.append(Strings.join(firstGroup, ','));
sb.append(".And these clickHouse's value is");
sb.append(secondGroupValue);
sb.append(Strings.join(secondGroup, ','));
sb.append(".");
throw new IOException(sb.toString());
}
// version
checkVersion(minVersion, "clickHouse");
dbInstanceList.forEach(dbInstance -> dbInstance.setNeedSkipHeartTest(true));
DbleTempConfig.getInstance().setLowerCase(lowerCase);
return list;
DbleTempConfig.getInstance().setLowerCase(lowerCaseA);
return msgList;
}
private static void getAndSyncKeyVariablesForDataSources(List<ChangeItem> changeItemList, Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap,
boolean needSync, List<PhysicalDbInstance> dbInstanceList) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(changeItemList.size());
for (ChangeItem changeItem : changeItemList) {
Object item = changeItem.getItem();
if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_INSTANCE) {
PhysicalDbInstance ds = (PhysicalDbInstance) item;
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync);
dbInstanceList.add(ds);
} else if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) {
PhysicalDbGroup dbGroup = (PhysicalDbGroup) item;
for (PhysicalDbInstance ds : dbGroup.getAllDbInstanceMap().values()) {
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync);
dbInstanceList.add(ds);
}
}
}
service.shutdown();
int i = 0;
while (!service.awaitTermination(100, TimeUnit.MILLISECONDS)) {
if (LOGGER.isDebugEnabled()) {
if (i == 0) {
LOGGER.info("wait to get all dbInstances's get key variable");
}
i++;
if (i == 100) { //log every 10 seconds
i = 0;
}
}
private static void checkVersion(int currentMinVersion, String databaseType) throws ConfigException {
String fakeMySQLVersion = SystemConfig.getInstance().getFakeMySQLVersion();
if (currentMinVersion < VersionUtil.getMajorVersion(fakeMySQLVersion)) {
throw new ConfigException("the dble version[=" + fakeMySQLVersion + "] cannot be higher than the minimum version of the backend " + databaseType + " node,pls check the backend " + databaseType + " node.");
}
}
private static void getAndSyncKeyVariablesForDataSources(Map<String, PhysicalDbGroup> dbGroups, Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap,
boolean needSync, List<PhysicalDbInstance> dbInstanceList) throws InterruptedException {
ExecutorService service = Executors.newFixedThreadPool(dbGroups.size());
for (Map.Entry<String, PhysicalDbGroup> entry : dbGroups.entrySet()) {
String hostName = entry.getKey();
PhysicalDbGroup pool = entry.getValue();
private static void getAndSyncKeyVariablesForDataSources(Set<PhysicalDbInstance> availableDbInstances,
Map<VariableMapKey, Future<KeyVariables>> keyVariablesTaskMap,
boolean needSync) throws InterruptedException {
for (PhysicalDbInstance ds : pool.getDbInstances(true)) {
if (ds.isDisabled() || !ds.isTestConnSuccess() || ds.isFakeNode()) {
continue;
}
getKeyVariablesForDataSource(service, ds, hostName, keyVariablesTaskMap, needSync);
dbInstanceList.add(ds);
}
ExecutorService service = Executors.newFixedThreadPool(availableDbInstances.size());
for (PhysicalDbInstance ds : availableDbInstances) {
getKeyVariablesForDataSource(service, ds, ds.getDbGroupConfig().getName(), keyVariablesTaskMap, needSync);
}
service.shutdown();
int i = 0;
@@ -7,6 +7,7 @@ package com.actiontech.dble.server.variables;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
@@ -33,6 +34,7 @@ public class VarsExtractorHandler {
private Map<String, PhysicalDbGroup> dbGroups;
private volatile SystemVariables systemVariables = null;
private volatile boolean fetchFailed = false;
private boolean isExistClickHouse = false;
public VarsExtractorHandler(Map<String, PhysicalDbGroup> dbGroups) {
this.dbGroups = dbGroups;
@@ -44,7 +46,11 @@ public class VarsExtractorHandler {
public SystemVariables execute() {
TraceManager.TraceObject traceObject = TraceManager.threadTrace("get-system-variables-from-backend");
PhysicalDbInstance physicalDbInstance = getPhysicalDbInstance();
/**
* get system Variables only from available Mysql instances (does not contain ClickHouse instance);
* if there is only ClickHouse in the instances, just fake a copy of the system parameters, see tryInitVars().
*/
PhysicalDbInstance physicalDbInstance = getMysqlPhysicalDbInstance();
try {
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(MYSQL_SHOW_VARIABLES_COLS, new MysqlVarsListener(this));
if (physicalDbInstance != null) {
@@ -53,7 +59,7 @@ public class VarsExtractorHandler {
waitOrRetry(physicalDbInstance);
} else {
tryInitVars();
LOGGER.warn("No dbInstance is alive, server can not get 'show variables' result");
LOGGER.warn("No Mysql dbInstance is alive, server can not get 'show variables' result");
}
return systemVariables;
} finally {
@@ -62,32 +68,37 @@ public class VarsExtractorHandler {
}
}
private PhysicalDbInstance getPhysicalDbInstance() {
private PhysicalDbInstance getMysqlPhysicalDbInstance() {
if (dbGroups == null || dbGroups.isEmpty()) {
return null;
}
PhysicalDbInstance ds = null;
List<PhysicalDbGroup> dbGroupList = dbGroups.values().stream().filter(dbGroup -> dbGroup.getDbGroupConfig().existInstanceProvideVars()).collect(Collectors.toList());
List<PhysicalDbGroup> dbGroupList = dbGroups.values().stream().collect(Collectors.toList());
for (PhysicalDbGroup dbGroup : dbGroupList) {
PhysicalDbInstance dsTest = dbGroup.getWriteDbInstance();
if (dsTest.isTestConnSuccess()) {
ds = dsTest;
}
if (ds != null) {
break;
if (dsTest.getConfig().getDataBaseType() == DataBaseType.MYSQL) {
ds = dsTest;
break;
} else {
isExistClickHouse = true;
}
}
}
if (ds == null) {
outLoop:
for (PhysicalDbGroup dbGroup : dbGroupList) {
for (PhysicalDbInstance dsTest : dbGroup.getDbInstances(false)) {
if (dsTest.isTestConnSuccess()) {
ds = dsTest;
break;
if (dsTest.getConfig().getDataBaseType() == DataBaseType.MYSQL) {
ds = dsTest;
break outLoop;
} else {
isExistClickHouse = true;
}
}
}
if (ds != null) {
break;
}
}
}
return ds;
@@ -144,11 +155,7 @@ public class VarsExtractorHandler {
}
private void tryInitVars() {
if (dbGroups == null || dbGroups.isEmpty()) {
return;
}
List<PhysicalDbGroup> dbGroupList = dbGroups.values().stream().filter(dbGroup -> dbGroup.getDbGroupConfig().existInstanceProvideVars()).collect(Collectors.toList());
if (dbGroupList.isEmpty()) {
if (isExistClickHouse) {
systemVariables = new SystemVariables();
systemVariables.setDefaultValue("lower_case_table_names", "0");
}
@@ -58,6 +58,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.actiontech.dble.cluster.path.ClusterPathUtil.SEPARATOR;
import static com.actiontech.dble.meta.ReloadStatus.TRIGGER_TYPE_COMMAND;
import static com.actiontech.dble.services.manager.response.ChangeItemType.PHYSICAL_DB_INSTANCE;
public final class ReloadConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ReloadConfig.class);
@@ -332,7 +333,7 @@ public final class ReloadConfig {
private static SystemVariables checkVersionAGetSystemVariables(ConfigInitializer loader, Map<String, PhysicalDbGroup> newDbGroups, List<ChangeItem> changeItemList, boolean forceAllReload) throws Exception {
ReloadLogHelper.briefInfo("check and get system variables from random node start");
SystemVariables newSystemVariables;
if (forceAllReload) {
if (forceAllReload || ConfigUtil.isAllDbInstancesChange(changeItemList)) {
//check version/packetSize/lowerCase
ConfigUtil.getAndSyncKeyVariables(newDbGroups, true);
//system variables
@@ -496,14 +497,14 @@ public final class ReloadConfig {
MapDifference<String, PhysicalDbInstance> dbInstanceMapDifference = Maps.difference(newDbInstanceMap, oldDbInstanceMap);
//delete
dbInstanceMapDifference.entriesOnlyOnRight().values().stream().map(dbInstance -> new ChangeItem(ChangeType.DELETE, dbInstance, ChangeItemType.PHYSICAL_DB_INSTANCE)).forEach(changeItemList::add);
dbInstanceMapDifference.entriesOnlyOnRight().values().stream().map(dbInstance -> new ChangeItem(ChangeType.DELETE, dbInstance, PHYSICAL_DB_INSTANCE)).forEach(changeItemList::add);
//add
dbInstanceMapDifference.entriesOnlyOnLeft().values().stream().map(dbInstance -> new ChangeItem(ChangeType.ADD, dbInstance, ChangeItemType.PHYSICAL_DB_INSTANCE)).forEach(changeItemList::add);
dbInstanceMapDifference.entriesOnlyOnLeft().values().stream().map(dbInstance -> new ChangeItem(ChangeType.ADD, dbInstance, PHYSICAL_DB_INSTANCE)).forEach(changeItemList::add);
//update
dbInstanceMapDifference.entriesDiffering().values().stream().map(physicalDbInstanceValueDifference -> {
PhysicalDbInstance newDbInstance = physicalDbInstanceValueDifference.leftValue();
PhysicalDbInstance oldDbInstance = physicalDbInstanceValueDifference.rightValue();
ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, newDbInstance, ChangeItemType.PHYSICAL_DB_INSTANCE);
ChangeItem changeItem = new ChangeItem(ChangeType.UPDATE, newDbInstance, PHYSICAL_DB_INSTANCE);
if (!newDbInstance.equalsForConnectionPool(oldDbInstance)) {
changeItem.setAffectConnectionPool(true);
}