Merge remote-tracking branch 'origin/load-data' into load-data

This commit is contained in:
collapsar
2019-03-05 10:09:09 +08:00
6 changed files with 27 additions and 2 deletions
@@ -6,6 +6,7 @@
package com.actiontech.dble.config.loader.zkprocess.zktoxml.listen;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.btrace.provider.ClusterDelayProvider;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.loader.zkprocess.comm.NotifyService;
import com.actiontech.dble.config.loader.zkprocess.comm.ZkConfig;
@@ -45,6 +46,7 @@ public class ConfigStatusListener extends ZkMultiLoader implements NotifyService
@Override
public boolean notifyProcess() throws Exception {
ClusterDelayProvider.delayAfterGetNotice();
if (DbleServer.getInstance().getFrontProcessors() != null) {
String value = this.getDataToString(currZkPath);
ConfStatus status = new ConfStatus(value);
@@ -54,7 +56,9 @@ public class ConfigStatusListener extends ZkMultiLoader implements NotifyService
LOGGER.info("ConfigStatusListener notifyProcess zk to object :" + status);
if (status.getStatus() == ConfStatus.Status.ROLLBACK) {
try {
ClusterDelayProvider.delayBeforeSlaveRollback();
RollbackConfig.rollback();
ClusterDelayProvider.delayAfterSlaveRollback();
LOGGER.info("rollback config: sent config status success to zk start");
ZKUtils.createTempNode(KVPathUtil.getConfStatusPath(), ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID),
SUCCESS.getBytes(StandardCharsets.UTF_8));
@@ -77,12 +81,14 @@ public class ConfigStatusListener extends ZkMultiLoader implements NotifyService
}
}
try {
ClusterDelayProvider.delayBeforeSlaveReload();
LOGGER.info("reload config: ready to reload config");
if (status.getStatus() == ConfStatus.Status.RELOAD_ALL) {
ReloadConfig.reloadAll(Integer.parseInt(status.getParams()));
} else {
ReloadConfig.reload();
}
ClusterDelayProvider.delayAfterSlaveReload();
LOGGER.info("reload config: sent config status success to zk start");
ZKUtils.createTempNode(KVPathUtil.getConfStatusPath(), ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), SUCCESS.getBytes(StandardCharsets.UTF_8));
LOGGER.info("reload config: sent config status success to zk end");
@@ -88,6 +88,7 @@ public final class ReloadConfig {
return;
}
LOGGER.info("reload config: added distributeLock " + KVPathUtil.getConfChangeLockPath() + " to zk");
ClusterDelayProvider.delayAfterReloadLock();
try {
reloadWithZookeeper(loadAll, loadAllMode, zkConn, c);
} finally {
@@ -192,6 +193,8 @@ public final class ReloadConfig {
try {
load(loadAll, loadAllMode);
LOGGER.info("reload config: single instance(self) finished");
ClusterDelayProvider.delayAfterMasterLoad();
XmltoZkMain.writeConfFileToZK(loadAll, loadAllMode);
LOGGER.info("reload config: sent config status to ucore");
//tell zk this instance has prepared
@@ -208,6 +211,7 @@ public final class ReloadConfig {
preparedList = zkConn.getChildren().forPath(KVPathUtil.getConfStatusPath());
}
LOGGER.info("reload config: all instances finished ");
ClusterDelayProvider.delayBeforeDeleteReloadLock();
StringBuilder sbErrorInfo = new StringBuilder();
for (String child : preparedList) {
String childPath = ZKPaths.makePath(KVPathUtil.getConfStatusPath(), child);
@@ -57,6 +57,7 @@ public final class RollbackConfig {
if (!distributeLock.acquire(100, TimeUnit.MILLISECONDS)) {
c.writeErrMessage(ErrorCode.ER_YES, "Other instance is reloading/rollbacking, please try again later.");
} else {
ClusterDelayProvider.delayAfterReloadLock();
try {
rollbackWithZk(zkConn, c);
} finally {
@@ -148,6 +149,8 @@ public final class RollbackConfig {
lock.lock();
try {
rollback();
ClusterDelayProvider.delayAfterMasterRollback();
XmltoZkMain.rollbackConf();
//tell zk this instance has prepared
ZKUtils.createTempNode(KVPathUtil.getConfStatusPath(), ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
@@ -160,6 +163,7 @@ public final class RollbackConfig {
onlineList = zkConn.getChildren().forPath(KVPathUtil.getOnlinePath());
preparedList = zkConn.getChildren().forPath(KVPathUtil.getConfStatusPath());
}
ClusterDelayProvider.delayBeforeDeleterollbackLock();
for (String child : preparedList) {
zkConn.delete().forPath(ZKPaths.makePath(KVPathUtil.getConfStatusPath(), child));
}
@@ -6,6 +6,7 @@
package com.actiontech.dble.meta;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.btrace.provider.ClusterDelayProvider;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.loader.zkprocess.comm.ZkConfig;
import com.actiontech.dble.config.loader.zkprocess.zookeeper.process.DDLInfo;
@@ -28,6 +29,7 @@ public class DDLChildListener implements PathChildrenCacheListener {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ClusterDelayProvider.delayAfterGetDdlNotice();
ChildData childData = event.getData();
switch (event.getType()) {
case CHILD_ADDED:
@@ -63,6 +65,7 @@ public class DDLChildListener implements PathChildrenCacheListener {
String[] tableInfo = nodeName.split("\\.");
final String schema = StringUtil.removeBackQuote(tableInfo[0]);
final String table = StringUtil.removeBackQuote(tableInfo[1]);
ClusterDelayProvider.delayBeforeUpdateMeta();
try {
DbleServer.getInstance().getTmManager().addMetaLock(schema, table, ddlInfo.getSql());
} catch (Exception t) {
@@ -85,12 +88,14 @@ public class DDLChildListener implements PathChildrenCacheListener {
if (DDLStatus.INIT == ddlInfo.getStatus()) {
return;
}
ClusterDelayProvider.delayBeforeUpdateMeta();
//to judge the table is be drop
if (ddlInfo.getType() == DDLInfo.DDLType.DROP_TABLE) {
DbleServer.getInstance().getTmManager().updateMetaData(ddlInfo.getSchema(), table, ddlInfo.getSql(), DDLInfo.DDLStatus.SUCCESS.equals(ddlInfo.getStatus()), false, ddlInfo.getType());
} else {
//else get the lastest table meta from db
//else get the latest table meta from db
DbleServer.getInstance().getTmManager().updateOnetableWithBackData(DbleServer.getInstance().getConfig(), ddlInfo.getSchema(), table);
ClusterDelayProvider.delayBeforeDdlResponse();
try {
DbleServer.getInstance().getTmManager().notifyResponseClusterDDL(ddlInfo.getSchema(), table, ddlInfo.getSql(), DDLInfo.DDLStatus.SUCCESS, ddlInfo.getType(), false);
} catch (Exception e) {
@@ -495,6 +495,7 @@ public class ProxyMetaManager {
String nodeName = StringUtil.getFullName(schema, table);
String nodePath = ZKPaths.makePath(KVPathUtil.getDDLPath(), nodeName);
zkConn.create().forPath(nodePath, ddlInfo.toString().getBytes(StandardCharsets.UTF_8));
ClusterDelayProvider.delayAfterDdlLockMeta();
} else if (DbleServer.getInstance().isUseUcore()) {
DDLInfo ddlInfo = new DDLInfo(schema, sql, UcoreConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), DDLInfo.DDLStatus.INIT, DDLInfo.DDLType.UNKNOWN);
String nodeName = StringUtil.getUFullName(schema, table);
@@ -513,10 +514,10 @@ public class ProxyMetaManager {
public void notifyResponseClusterDDL(String schema, String table, String sql, DDLInfo.DDLStatus ddlStatus, DDLInfo.DDLType ddlType, boolean needNotifyOther) throws Exception {
ClusterDelayProvider.delayAfterDdlExecuted();
if (DbleServer.getInstance().isUseZK()) {
notifyResponseZKDdl(schema, table, sql, ddlStatus, ddlType, needNotifyOther);
} else if (DbleServer.getInstance().isUseUcore()) {
ClusterDelayProvider.delayAfterDdlExecuted();
notifyReponseUcoreDDL(schema, table, sql, ddlStatus, ddlType, needNotifyOther);
}
}
@@ -531,11 +532,14 @@ public class ProxyMetaManager {
String thisNode = ZkConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID);
ZKUtils.createTempNode(instancePath, thisNode);
if (needNotifyOther) {
ClusterDelayProvider.delayBeforeDdlNotice();
zkConn.setData().forPath(nodePath, ddlInfo.toString().getBytes(StandardCharsets.UTF_8));
ClusterDelayProvider.delayAfterDdlNotice();
while (true) {
List<String> preparedList = zkConn.getChildren().forPath(instancePath);
List<String> onlineList = zkConn.getChildren().forPath(KVPathUtil.getOnlinePath());
if (preparedList.size() >= onlineList.size()) {
ClusterDelayProvider.delayBeforeDdlNoticeDeleted();
zkConn.delete().deletingChildrenIfNeeded().forPath(nodePath);
break;
}
@@ -6,6 +6,7 @@
package com.actiontech.dble.meta;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.btrace.provider.ClusterDelayProvider;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.config.loader.zkprocess.comm.ZkConfig;
import com.alibaba.fastjson.JSONObject;
@@ -27,6 +28,7 @@ public class ViewChildListener implements PathChildrenCacheListener {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
ChildData childData = event.getData();
ClusterDelayProvider.delayWhenReponseViewNotic();
switch (event.getType()) {
case CHILD_ADDED:
createOrUpdateViewMeta(childData, false);