[inner-2256] fix: narrow down the scope of syncing dbGroup state
(cherry picked from commit 539b3c76d6)
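In short, a hedged reading of the hunks below: reload() in ReloadConfig now returns a ReloadResult that records which dbGroups a reload added/changed and which it recycled, and the HA-status sync to the cluster only touches those groups instead of rewriting the status key of every dbGroup. A minimal sketch of the resulting call flow (locking, cluster notification and error handling omitted; loadAllMode stands for the caller's reload mode):

    // Sketch of the narrowed flow this patch introduces (not a verbatim excerpt).
    ReloadConfig.ReloadResult reloadResult = ReloadConfig.reloadByConfig(loadAllMode, true);
    if (reloadResult.isSuccess()) {
        // writeConfToCluster forwards the result to HAClusterLogic.syncDbGroupStatusToCluster,
        // which deletes HA-status keys of recycled dbGroups and rewrites keys of added/changed ones.
        ClusterHelper.writeConfToCluster(reloadResult);
    }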
ClusterHelper.java
@@ -11,6 +11,7 @@ import com.actiontech.dble.cluster.logic.ClusterOperation;
 import com.actiontech.dble.cluster.path.ChildPathMeta;
 import com.actiontech.dble.cluster.path.PathMeta;
 import com.actiontech.dble.cluster.values.*;
+import com.actiontech.dble.services.manager.response.ReloadConfig;
 import org.apache.logging.log4j.util.Strings;
 
 import javax.annotation.Nonnull;
@@ -114,12 +115,12 @@ public final class ClusterHelper {
         return ClusterGeneralConfig.getInstance().getClusterSender().getOnlineMap();
     }
 
-    public static void writeConfToCluster() throws Exception {
+    public static void writeConfToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception {
         ClusterLogic.forConfig().syncSequenceJsonToCluster();
         ClusterLogic.forConfig().syncDbJsonToCluster();
         ClusterLogic.forConfig().syncShardingJsonToCluster();
         ClusterLogic.forConfig().syncUseJsonToCluster();
-        ClusterLogic.forHA().syncDbGroupStatusToCluster();
+        ClusterLogic.forHA().syncDbGroupStatusToCluster(reloadResult);
     }
 
     @Nullable

ConfigClusterLogic.java
@@ -86,8 +86,8 @@ public class ConfigClusterLogic extends AbstractClusterLogic {
             return;
         }
         try {
-            boolean result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false);
-            if (!checkLocalResult(result)) {
+            ReloadConfig.ReloadResult result = ReloadConfig.reloadByConfig(Integer.parseInt(params), false);
+            if (!checkLocalResult(result.isSuccess())) {
                 return;
             }
         } catch (Exception e) {

HAClusterLogic.java
@@ -16,6 +16,7 @@ import com.actiontech.dble.cluster.zkprocess.entity.DbGroups;
 import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBGroup;
 import com.actiontech.dble.cluster.zkprocess.entity.dbGroups.DBInstance;
 import com.actiontech.dble.config.model.SystemConfig;
+import com.actiontech.dble.services.manager.response.ReloadConfig;
 import com.actiontech.dble.singleton.HaConfigManager;
 import com.google.gson.Gson;
 import com.google.gson.JsonElement;
@@ -86,6 +87,30 @@ public class HAClusterLogic extends AbstractClusterLogic {
         LOGGER.info("syncDbGroupStatusToCluster success");
     }
 
+    public void syncDbGroupStatusToCluster(ReloadConfig.ReloadResult reloadResult) throws Exception {
+        LOGGER.info("syncDbGroupStatusToCluster start");
+        HaConfigManager.getInstance().init(true);
+        Map<String, RawJson> dbGroupStatusMap = HaConfigManager.getInstance().getSourceJsonList();
+
+        Map<String, PhysicalDbGroup> recycleHostMap = reloadResult.getRecycleHostMap();
+        if (recycleHostMap != null) {
+            for (Map.Entry<String, PhysicalDbGroup> groupEntry : recycleHostMap.entrySet()) {
+                String dbGroupName = groupEntry.getKey();
+                LOGGER.debug("delete dbGroup_status:{}", dbGroupName);
+                clusterHelper.cleanKV(ClusterMetaUtil.getHaStatusPath(dbGroupName));
+            }
+        }
+        Map<String, PhysicalDbGroup> addOrChangeHostMap = reloadResult.getAddOrChangeHostMap();
+        if (addOrChangeHostMap != null) {
+            for (Map.Entry<String, PhysicalDbGroup> groupEntry : addOrChangeHostMap.entrySet()) {
+                RawJson dbGroupStatusJson = dbGroupStatusMap.get(groupEntry.getKey());
+                LOGGER.debug("add dbGroup_status:{}---{}", groupEntry.getKey(), dbGroupStatusJson);
+                clusterHelper.setKV(ClusterMetaUtil.getHaStatusPath(groupEntry.getKey()), dbGroupStatusJson);
+            }
+        }
+        LOGGER.info("syncDbGroupStatusToCluster success");
+    }
+
     void syncHaStatusFromCluster(Gson gson, DbGroups dbs, List<DBGroup> dbGroupList) {
         try {
             List<ClusterEntry<RawJson>> statusKVList = this.getKVBeanOfChildPath(ClusterChildMetaUtil.getHaStatusPath());
@@ -138,21 +138,21 @@ public final class ReloadConfig {
|
||||
try {
|
||||
ReloadLogHelper.briefInfo("added configLock");
|
||||
//step 2 reload the local config file
|
||||
boolean reloadResult;
|
||||
ReloadResult reloadResult;
|
||||
if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) ||
|
||||
confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) {
|
||||
reloadResult = reloadByConfig(loadAllMode, true);
|
||||
} else {
|
||||
reloadResult = reloadByLocalXml(loadAllMode);
|
||||
}
|
||||
if (!reloadResult) {
|
||||
if (!reloadResult.isSuccess()) {
|
||||
throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others, config should be reload");
|
||||
}
|
||||
ReloadLogHelper.briefInfo("single instance(self) finished");
|
||||
ClusterDelayProvider.delayAfterMasterLoad();
|
||||
|
||||
//step 3 if the reload with no error ,than write the config file into cluster center remote
|
||||
ClusterHelper.writeConfToCluster();
|
||||
ClusterHelper.writeConfToCluster(reloadResult);
|
||||
ReloadLogHelper.briefInfo("sent config file to cluster center");
|
||||
|
||||
//step 4 write the reload flag and self reload result into cluster center,notify the other dble to reload
|
||||
@@ -196,17 +196,17 @@ public final class ReloadConfig {
|
||||
if (!ReloadManager.startReload(TRIGGER_TYPE_COMMAND, confStatus)) {
|
||||
throw new ReloadException(ErrorCode.ER_YES, "reload status error ,other client or cluster may in reload");
|
||||
}
|
||||
boolean reloadResult;
|
||||
ReloadResult reloadResult;
|
||||
if (confStatus.getStatus().equals(ConfStatus.Status.MANAGER_INSERT) || confStatus.getStatus().equals(ConfStatus.Status.MANAGER_UPDATE) ||
|
||||
confStatus.getStatus().equals(ConfStatus.Status.MANAGER_DELETE)) {
|
||||
reloadResult = reloadByConfig(loadAllMode, true);
|
||||
} else {
|
||||
reloadResult = reloadByLocalXml(loadAllMode);
|
||||
}
|
||||
if (reloadResult && returnFlag) {
|
||||
if (reloadResult.isSuccess() && returnFlag) {
|
||||
// ok package
|
||||
return;
|
||||
} else if (!reloadResult) {
|
||||
} else if (!reloadResult.isSuccess()) {
|
||||
throw new ReloadException(ErrorCode.ER_RELOAD_INTERRUPUTED, "reload interruputed by others,metadata should be reload");
|
||||
}
|
||||
} finally {
|
||||
@@ -223,11 +223,11 @@ public final class ReloadConfig {
|
||||
c.writeErrMessage(ErrorCode.ER_YES, sb);
|
||||
}
|
||||
|
||||
public static boolean reloadByLocalXml(final int loadAllMode) throws Exception {
|
||||
public static ReloadResult reloadByLocalXml(final int loadAllMode) throws Exception {
|
||||
return reload(loadAllMode, null, null, null, null);
|
||||
}
|
||||
|
||||
public static boolean reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception {
|
||||
public static ReloadResult reloadByConfig(final int loadAllMode, boolean isWriteToLocal) throws Exception {
|
||||
RawJson userConfig = DbleTempConfig.getInstance().getUserConfig();
|
||||
userConfig = userConfig == null ? DbleServer.getInstance().getConfig().getUserConfig() : userConfig;
|
||||
RawJson dbConfig = DbleTempConfig.getInstance().getDbConfig();
|
||||
@@ -236,7 +236,7 @@ public final class ReloadConfig {
|
||||
shardingConfig = shardingConfig == null ? DbleServer.getInstance().getConfig().getShardingConfig() : shardingConfig;
|
||||
RawJson sequenceConfig = DbleTempConfig.getInstance().getSequenceConfig();
|
||||
sequenceConfig = sequenceConfig == null ? DbleServer.getInstance().getConfig().getSequenceConfig() : sequenceConfig;
|
||||
final boolean reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig);
|
||||
final ReloadResult reloadResult = reload(loadAllMode, userConfig, dbConfig, shardingConfig, sequenceConfig);
|
||||
|
||||
ReloadLogHelper.briefInfo("clean temp config ...");
|
||||
DbleTempConfig.getInstance().clean();
|
||||
@@ -247,7 +247,7 @@ public final class ReloadConfig {
|
||||
return reloadResult;
|
||||
}
|
||||
|
||||
private static boolean reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
|
||||
private static ReloadResult reload(final int loadAllMode, RawJson userConfig, RawJson dbConfig, RawJson shardingConfig, RawJson sequenceConfig) throws Exception {
|
||||
TraceManager.TraceObject traceObject = TraceManager.threadTrace("self-reload");
|
||||
try {
|
||||
// load configuration
|
||||
@@ -316,7 +316,7 @@ public final class ReloadConfig {
|
||||
if (!loader.isFullyConfigured()) {
|
||||
recycleServerConnections();
|
||||
}
|
||||
return result;
|
||||
return packReloadResult(result, changeItemList, forceAllReload, newDbGroups, oldConfig.getDbGroups());
|
||||
} catch (Exception e) {
|
||||
initFailed(newDbGroups);
|
||||
throw e;
|
||||
@@ -326,6 +326,35 @@ public final class ReloadConfig {
|
||||
}
|
||||
}
|
||||
|
||||
private static ReloadResult packReloadResult(boolean result, List<ChangeItem> changeItemList,
|
||||
boolean forceAllReload,
|
||||
Map<String, PhysicalDbGroup> newDbGroups,
|
||||
Map<String, PhysicalDbGroup> oldDbGroups) {
|
||||
if (forceAllReload) {
|
||||
return new ReloadResult(result, newDbGroups, oldDbGroups);
|
||||
} else {
|
||||
Map<String, PhysicalDbGroup> addOrChangeMap0 = new HashMap<>();
|
||||
Map<String, PhysicalDbGroup> recycleMap0 = new HashMap<>();
|
||||
for (ChangeItem changeItem : changeItemList) {
|
||||
if (changeItem.getItemType() == ChangeItemType.PHYSICAL_DB_GROUP) {
|
||||
PhysicalDbGroup dbGroup = ((PhysicalDbGroup) changeItem.getItem());
|
||||
switch (changeItem.getType()) {
|
||||
case ADD:
|
||||
case UPDATE:
|
||||
addOrChangeMap0.put(dbGroup.getGroupName(), dbGroup);
|
||||
break;
|
||||
case DELETE:
|
||||
recycleMap0.put(dbGroup.getGroupName(), dbGroup);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return new ReloadResult(result, addOrChangeMap0, recycleMap0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* check version/packetSize/lowerCase
|
||||
* get system variables
|
||||
@@ -650,4 +679,28 @@ public final class ReloadConfig {
|
||||
service.writeErrMessage(errorCode, "Reload Failure, The reason is " + errorMsg);
|
||||
}
|
||||
}
|
||||
|
||||
public static class ReloadResult { // dbGroup
|
||||
private final boolean success;
|
||||
private final Map<String, PhysicalDbGroup> addOrChangeHostMap;
|
||||
private final Map<String, PhysicalDbGroup> recycleHostMap;
|
||||
|
||||
public ReloadResult(boolean success, Map<String, PhysicalDbGroup> addOrChangeHostMap, Map<String, PhysicalDbGroup> recycleHostMap) {
|
||||
this.success = success;
|
||||
this.addOrChangeHostMap = addOrChangeHostMap;
|
||||
this.recycleHostMap = recycleHostMap;
|
||||
}
|
||||
|
||||
public boolean isSuccess() {
|
||||
return success;
|
||||
}
|
||||
|
||||
public Map<String, PhysicalDbGroup> getAddOrChangeHostMap() {
|
||||
return addOrChangeHostMap;
|
||||
}
|
||||
|
||||
public Map<String, PhysicalDbGroup> getRecycleHostMap() {
|
||||
return recycleHostMap;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
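
A follow-up note on the new ReloadResult API, as a hypothetical consumer-side sketch (the group names ha_group1/ha_group2 are made up for illustration): with forceAllReload disabled, packReloadResult classifies only PHYSICAL_DB_GROUP change items, so an UPDATE of ha_group1 plus a DELETE of ha_group2 produces maps containing exactly those two groups.

    // Illustration only; loadAllMode is assumed to be supplied by the caller.
    ReloadConfig.ReloadResult result = ReloadConfig.reloadByLocalXml(loadAllMode);
    if (result.isSuccess()) {
        // e.g. contains "ha_group1" -> its HA-status key is rewritten in the cluster
        Map<String, PhysicalDbGroup> addedOrChanged = result.getAddOrChangeHostMap();
        // e.g. contains "ha_group2" -> its HA-status key is removed from the cluster
        Map<String, PhysicalDbGroup> recycled = result.getRecycleHostMap();
    }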