From 79b7dbefb612e276c64c5b66748bd7f5ea18bc16 Mon Sep 17 00:00:00 2001 From: wenyh <44251917+wenyh1@users.noreply.github.com> Date: Mon, 30 Nov 2020 21:43:46 +0800 Subject: [PATCH] slave dble reload failure(cherry pick from DMP-6128) (#2348) --- .../dble/cluster/general/kVtoXml/ClusterToXml.java | 7 +------ .../cluster/general/listener/ClusterClearKeyListener.java | 6 +++--- .../cluster/general/response/ConfigStatusResponse.java | 7 +++++-- 3 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/actiontech/dble/cluster/general/kVtoXml/ClusterToXml.java b/src/main/java/com/actiontech/dble/cluster/general/kVtoXml/ClusterToXml.java index 746b4eaf5..0aa9f29d1 100644 --- a/src/main/java/com/actiontech/dble/cluster/general/kVtoXml/ClusterToXml.java +++ b/src/main/java/com/actiontech/dble/cluster/general/kVtoXml/ClusterToXml.java @@ -50,10 +50,9 @@ public final class ClusterToXml { xmlProcess.initJaxbClass(); //add listener to watch the Prefix of the keys + new ConfigStatusResponse(listener); new PauseShardingNodeResponse(listener); - final ClusterSingleKeyListener configStatusListener = new ClusterSingleKeyListener(ClusterPathUtil.getConfStatusPath() + SEPARATOR, new ConfigStatusResponse(), sender); - final ClusterSingleKeyListener binlogPauseListener = new ClusterSingleKeyListener(ClusterPathUtil.getBinlogPause() + SEPARATOR, new BinlogPauseStatusResponse(), sender); final ClusterSingleKeyListener ddlListener = new ClusterSingleKeyListener(ClusterPathUtil.getDDLPath() + SEPARATOR, new DdlChildResponse(), sender); @@ -75,10 +74,6 @@ public final class ClusterToXml { thread6.setName("BINLOG_PAUSE_UCORE_LISTENER"); thread6.start(); - Thread thread7 = new Thread(configStatusListener); - thread7.setName("CONFIG_STATUS_UCORE_LISTENER"); - thread7.start(); - Thread thread2 = new Thread(ddlListener); thread2.setName("DDL_UCORE_LISTENER"); threads.add(thread2); diff --git a/src/main/java/com/actiontech/dble/cluster/general/listener/ClusterClearKeyListener.java b/src/main/java/com/actiontech/dble/cluster/general/listener/ClusterClearKeyListener.java index 29b43f0ca..151b2f8b7 100644 --- a/src/main/java/com/actiontech/dble/cluster/general/listener/ClusterClearKeyListener.java +++ b/src/main/java/com/actiontech/dble/cluster/general/listener/ClusterClearKeyListener.java @@ -72,14 +72,14 @@ public class ClusterClearKeyListener implements Runnable { newKeyMap.put(output.getKeys(i), output.getValues(i)); if (cache.get(output.getKeys(i)) != null) { if (!cache.get(output.getKeys(i)).equals(output.getValues(i))) { - if (output.getKeys(i).equalsIgnoreCase(ClusterPathUtil.getConfStatusPath())) { + if (output.getKeys(i).equalsIgnoreCase(ClusterPathUtil.getConfStatusOperatorPath())) { reloadKv = new KvBean(output.getKeys(i), output.getValues(i), KvBean.UPDATE); } else { diffMap.put(output.getKeys(i), new KvBean(output.getKeys(i), output.getValues(i), KvBean.UPDATE)); } } } else { - if (output.getKeys(i).equalsIgnoreCase(ClusterPathUtil.getConfStatusPath())) { + if (output.getKeys(i).equalsIgnoreCase(ClusterPathUtil.getConfStatusOperatorPath())) { reloadKv = new KvBean(output.getKeys(i), output.getValues(i), KvBean.ADD); } else { diffMap.put(output.getKeys(i), new KvBean(output.getKeys(i), output.getValues(i), KvBean.ADD)); @@ -90,7 +90,7 @@ public class ClusterClearKeyListener implements Runnable { //find out the deleted Key for (Map.Entry entry : cache.entrySet()) { if (!newKeyMap.containsKey(entry.getKey())) { - if (entry.getKey().equalsIgnoreCase(ClusterPathUtil.getConfStatusPath())) { + if (entry.getKey().equalsIgnoreCase(ClusterPathUtil.getConfStatusOperatorPath())) { reloadKv = new KvBean(entry.getKey(), entry.getValue(), KvBean.DELETE); } else { diffMap.put(entry.getKey(), new KvBean(entry.getKey(), entry.getValue(), KvBean.DELETE)); diff --git a/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java b/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java index c9a3315f7..fc47de8e4 100644 --- a/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java +++ b/src/main/java/com/actiontech/dble/cluster/general/response/ConfigStatusResponse.java @@ -10,6 +10,7 @@ import com.actiontech.dble.btrace.provider.ClusterDelayProvider; import com.actiontech.dble.cluster.ClusterLogic; import com.actiontech.dble.cluster.ClusterPathUtil; import com.actiontech.dble.cluster.general.bean.KvBean; +import com.actiontech.dble.cluster.general.listener.ClusterClearKeyListener; import com.actiontech.dble.cluster.values.ConfStatus; import com.actiontech.dble.config.model.SystemConfig; import org.slf4j.Logger; @@ -22,8 +23,10 @@ public class ConfigStatusResponse implements ClusterXmlLoader { private static final Logger LOGGER = LoggerFactory.getLogger(BinlogPauseStatusResponse.class); + private static final String CONFIG_STATUS_OPERATOR_PATH = ClusterPathUtil.getConfStatusOperatorPath(); - public ConfigStatusResponse() { + public ConfigStatusResponse(ClusterClearKeyListener confListener) { + confListener.addChild(this, CONFIG_STATUS_OPERATOR_PATH); } @Override @@ -36,7 +39,7 @@ public class ConfigStatusResponse implements ClusterXmlLoader { LOGGER.info("notify " + configValue.getKey() + " " + configValue.getValue() + " " + configValue.getChangeType()); String path = configValue.getKey(); String[] paths = path.split(ClusterPathUtil.SEPARATOR); - if (paths.length != ClusterLogic.getPathHeight(ClusterPathUtil.getConfStatusPath()) + 1) { + if (paths.length != ClusterLogic.getPathHeight(CONFIG_STATUS_OPERATOR_PATH)) { return; } if ("".equals(configValue.getValue())) {