inner-912: fixed:lost connection when dble doing cluster ddl (#2506)

Signed-off-by: dcy <dcy10000@gmail.com>
(cherry picked from commit 7e53d8473f)
This commit is contained in:
Rico
2021-03-10 16:16:26 +08:00
committed by dcy10000
parent b0e6e85026
commit e659650db8
3 changed files with 122 additions and 58 deletions
@@ -171,6 +171,30 @@ public final class ClusterLogic {
}
}
public static void processStatusEvent(String keyName, DDLInfo ddlInfo, DDLInfo.DDLStatus status) {
try {
switch (status) {
case INIT:
ClusterLogic.initDDLEvent(keyName, ddlInfo);
break;
case SUCCESS:
// just release local lock
ClusterLogic.ddlUpdateEvent(keyName, ddlInfo);
break;
case FAILED:
// just release local lock
ClusterLogic.ddlFailedEvent(keyName);
break;
default:
break;
}
} catch (Exception e) {
LOGGER.info("Error when update the meta data of the DDL " + ddlInfo.toString());
}
}
public static void deleteDDLNodeEvent(DDLInfo ddlInfo, String path) throws Exception {
LOGGER.info("DDL node " + path + " removed , and DDL info is " + ddlInfo.toString());
@@ -45,21 +45,26 @@ public class DdlChildResponse implements ClusterXmlLoader {
return; //self node
}
if (KvBean.DELETE.equals(configValue.getChangeType())) {
ClusterLogic.deleteDDLNodeEvent(ddlInfo, path);
return;
switch (configValue.getChangeType()) {
case KvBean.ADD: {
String key = paths[paths.length - 1];
initMeta(key, ddlInfo);
break;
}
case KvBean.UPDATE: {
String key = paths[paths.length - 1];
updateMeta(key, ddlInfo);
break;
}
case KvBean.DELETE:
ClusterLogic.deleteDDLNodeEvent(ddlInfo, path);
break;
default:
break;
}
String key = paths[paths.length - 1];
//if the start node is preparing to do the ddl
if (ddlInfo.getStatus() == DDLInfo.DDLStatus.INIT) {
ClusterLogic.initDDLEvent(key, ddlInfo);
} else if (ddlInfo.getStatus() == DDLInfo.DDLStatus.SUCCESS) {
ClusterLogic.ddlUpdateEvent(key, ddlInfo);
} else if (ddlInfo.getStatus() == DDLInfo.DDLStatus.FAILED) {
ClusterLogic.ddlFailedEvent(key);
}
}
@@ -67,4 +72,35 @@ public class DdlChildResponse implements ClusterXmlLoader {
public void notifyCluster() throws Exception {
}
private void initMeta(String keyName, DDLInfo ddlInfo) {
ClusterLogic.processStatusEvent(keyName, ddlInfo, DDLInfo.DDLStatus.INIT);
if (DDLInfo.DDLStatus.INIT != ddlInfo.getStatus()) {
LOGGER.warn("get a special CREATE event when doing cluster ddl , status:{}, data is {}", ddlInfo.getStatus(), ddlInfo.toString());
ClusterLogic.processStatusEvent(keyName, ddlInfo, ddlInfo.getStatus());
}
}
private void updateMeta(String keyName, DDLInfo ddlInfo) {
if (DDLInfo.DDLStatus.INIT == ddlInfo.getStatus()) {
//missing DELETE event.
LOGGER.warn("get a special UPDATE event when doing cluster ddl , status:{}, data is {}", ddlInfo.getStatus(), ddlInfo.toString());
ClusterLogic.processStatusEvent(keyName, ddlInfo, ddlInfo.getStatus());
} else {
// just release local lock
ClusterLogic.processStatusEvent(keyName, ddlInfo, ddlInfo.getStatus());
}
}
}
@@ -32,74 +32,78 @@ public class DDLChildListener implements PathChildrenCacheListener {
}
ClusterDelayProvider.delayAfterGetDdlNotice();
ChildData childData = event.getData();
final String ddlInfoStr = new String(childData.getData(), StandardCharsets.UTF_8);
DDLInfo ddlInfo = new DDLInfo(ddlInfoStr);
switch (event.getType()) {
case CHILD_ADDED:
try {
initDDL(childData);
} catch (Exception e) {
LOGGER.warn("CHILD_ADDED error", e);
}
LOGGER.info("DDL node " + childData.getPath() + " created , and data is " + ddlInfoStr);
break;
case CHILD_UPDATED:
updateMeta(childData);
LOGGER.info("DDL node " + childData.getPath() + " updated , and data is " + ddlInfoStr);
break;
case CHILD_REMOVED:
deleteNode(childData);
LOGGER.info("DDL node " + childData.getPath() + " deleted , and data is " + ddlInfoStr);
break;
default:
break;
}
if (ddlInfo.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
LOGGER.info("DDL node " + childData.getPath() + " is from myself ,so just return ,and data is " + ddlInfo.toString());
return; //self node
}
switch (event.getType()) {
case CHILD_ADDED:
initMeta(childData, ddlInfo);
break;
case CHILD_UPDATED:
updateMeta(childData, ddlInfo);
break;
case CHILD_REMOVED:
deleteNode(childData, ddlInfo);
break;
default:
break;
}
}
private void initDDL(ChildData childData) throws Exception {
String childPath = childData.getPath();
String data = new String(childData.getData(), StandardCharsets.UTF_8);
DDLInfo ddlInfo = new DDLInfo(data);
private void initMeta(ChildData childData, DDLInfo ddlInfo) {
final String childPath = childData.getPath();
String keyName = childPath.substring(childPath.lastIndexOf("/") + 1);
ClusterLogic.processStatusEvent(keyName, ddlInfo, DDLStatus.INIT);
LOGGER.info("DDL node " + childData.getPath() + " created , and data is " + data);
if (ddlInfo.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
LOGGER.info("DDL node " + childPath + " is from myself ,so just return ,and data is " + data);
return; //self node
}
if (DDLStatus.INIT != ddlInfo.getStatus()) {
return;
LOGGER.warn("get a special CREATE event of zk when doing cluster ddl , status:{}, data is {}", ddlInfo.getStatus(), ddlInfo.toString());
ClusterLogic.processStatusEvent(keyName, ddlInfo, ddlInfo.getStatus());
}
String keyName = childPath.substring(childPath.lastIndexOf("/") + 1);
ClusterLogic.initDDLEvent(keyName, ddlInfo);
}
private void updateMeta(ChildData childData) {
String childPath = childData.getPath();
String data = new String(childData.getData(), StandardCharsets.UTF_8);
DDLInfo ddlInfo = new DDLInfo(data);
if (ddlInfo.getFrom().equals(SystemConfig.getInstance().getInstanceName())) {
LOGGER.info("DDL node " + childData.getPath() + " is from myself ,so just return ,and data is " + data);
return; //self node
}
private void updateMeta(ChildData childData, DDLInfo ddlInfo) {
final String childPath = childData.getPath();
String keyName = childPath.substring(childPath.lastIndexOf("/") + 1);
if (DDLStatus.INIT == ddlInfo.getStatus()) {
return;
}
String keyName = childPath.substring(childPath.lastIndexOf("/") + 1);
LOGGER.info("DDL node " + childPath + " updated , and data is " + ddlInfo);
// just release local lock
if (ddlInfo.getStatus() == DDLStatus.FAILED) {
try {
ClusterLogic.ddlFailedEvent(keyName);
} catch (Exception e) {
LOGGER.info("Error when update the meta data of the DDL " + ddlInfo.toString());
}
//missing DELETE event.
LOGGER.warn("get a special UPDATE event of zk when doing cluster ddl , status:{}, data is {}", ddlInfo.getStatus(), ddlInfo.toString());
ClusterLogic.processStatusEvent(keyName, ddlInfo, ddlInfo.getStatus());
} else {
try {
ClusterLogic.ddlUpdateEvent(keyName, ddlInfo);
} catch (Exception e) {
LOGGER.info("Error when update the meta data of the DDL " + ddlInfo.toString());
}
// just release local lock
ClusterLogic.processStatusEvent(keyName, ddlInfo, ddlInfo.getStatus());
}
}
private void deleteNode(ChildData childData) throws Exception {
String data = new String(childData.getData(), StandardCharsets.UTF_8);
DDLInfo ddlInfo = new DDLInfo(data);
private void deleteNode(ChildData childData, DDLInfo ddlInfo) throws Exception {
ClusterLogic.deleteDDLNodeEvent(ddlInfo, childData.getPath());
}
}