diff --git a/src/main/java/com/actiontech/dble/alarm/AlarmCode.java b/src/main/java/com/actiontech/dble/alarm/AlarmCode.java index 1d33a6fd9..2c1e20b6a 100644 --- a/src/main/java/com/actiontech/dble/alarm/AlarmCode.java +++ b/src/main/java/com/actiontech/dble/alarm/AlarmCode.java @@ -40,4 +40,5 @@ public final class AlarmCode { public static final String SHARDING_NODE_LACK = "DBLE_SHARDING_NODE_LACK"; //Resolve by trigger public static final String DB_INSTANCE_LOWER_CASE_ERROR = "DBLE_DB_INSTANCE_LOWER_CASE_ERROR"; //Resolve by trigger public static final String DB_SLAVE_INSTANCE_DELAY = "DBLE_DB_SLAVE_INSTANCE_DELAY"; //Resolve by trigger + public static final String DB_MASTER_INSTANCE_DELAY_FAIL = "DB_MASTER_INSTANCE_DELAY_FAIL"; } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java index 76d3e98cc..72543d770 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbGroup.java @@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer; import com.actiontech.dble.alarm.AlarmCode; import com.actiontech.dble.alarm.Alert; import com.actiontech.dble.alarm.AlertUtil; +import com.actiontech.dble.backend.delyDetection.DelayDetection; import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat; import com.actiontech.dble.backend.mysql.nio.MySQLInstance; import com.actiontech.dble.cluster.JsonFactory; @@ -20,6 +21,7 @@ import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables; import com.actiontech.dble.config.helper.KeyVariables; import com.actiontech.dble.config.model.db.DbGroupConfig; import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.config.model.db.type.DataBaseType; import com.actiontech.dble.meta.ReloadLogHelper; import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.Session; @@ -27,6 +29,8 @@ import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession; import com.actiontech.dble.singleton.HaConfigManager; +import com.actiontech.dble.util.StringUtil; +import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.gson.Gson; @@ -38,6 +42,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.lang.reflect.Type; import java.util.*; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; public class PhysicalDbGroup { @@ -67,6 +72,9 @@ public class PhysicalDbGroup { private Set rwSplitSessionSet = Sets.newConcurrentHashSet(); private volatile Integer state = Integer.valueOf(INITIAL); + //delayDetection + private AtomicLong logicTimestamp = new AtomicLong(); + public static final int STATE_DELETING = 2; public static final int STATE_ABANDONED = 1; @@ -182,11 +190,12 @@ public class PhysicalDbGroup { this.analysisUseless = analysisUseless; } - private boolean checkSlaveSynStatus() { - return (dbGroupConfig.getDelayThreshold() != -1) && - (dbGroupConfig.isShowSlaveSql()); + private boolean checkSlaveSynStatus(PhysicalDbInstance ds) { + return (dbGroupConfig.getDelayThreshold() != -1 && + dbGroupConfig.isShowSlaveSql()) || ds.getDbGroup().isDelayDetectionStart() ; } + public PhysicalDbInstance getWriteDbInstance() { return writeDbInstance; } @@ -196,14 +205,14 @@ public class PhysicalDbGroup { ReloadLogHelper.debug("init new group :{},reason:{}", LOGGER, this.toString(), reason); } for (Map.Entry entry : allSourceMap.entrySet()) { - entry.getValue().init(reason, true); + entry.getValue().init(reason, true, true); } } public void startOfFresh(List sourceNames, String reason) { for (String sourceName : sourceNames) { if (allSourceMap.containsKey(sourceName)) { - allSourceMap.get(sourceName).init(reason, false); + allSourceMap.get(sourceName).init(reason, false, false); } } } @@ -242,7 +251,7 @@ public class PhysicalDbGroup { for (String sourceName : sourceNames) { if (allSourceMap.containsKey(sourceName)) { PhysicalDbInstance dbInstance = allSourceMap.get(sourceName); - dbInstance.stop(reason, closeFront, false, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || writeDbInstance == dbInstance); + dbInstance.stop(reason, closeFront, false, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || writeDbInstance == dbInstance, false); } } @@ -310,6 +319,21 @@ public class PhysicalDbGroup { } } + public void startDelayDetection() { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + if (dbInstance.delayDetection.isStop()) { + dbInstance.delayDetection = new DelayDetection(dbInstance); + } + dbInstance.startDelayDetection(); + } + } + + public void stopDelayDetection(String reason) { + for (PhysicalDbInstance dbInstance : allSourceMap.values()) { + dbInstance.stopDelayDetection(reason); + } + } + public Collection getDbInstances(boolean isAll) { if (!isAll && rwSplitMode == RW_SPLIT_OFF) { @@ -455,7 +479,7 @@ public class PhysicalDbGroup { continue; } - if (ds.isAlive() && (!checkSlaveSynStatus() || ds.canSelectAsReadNode())) { + if (ds.isAlive() && (!checkSlaveSynStatus(ds) || ds.canSelectAsReadNode(ds))) { if (ds.getLogCount() != 0) { ds.setLogCount(0); } @@ -562,7 +586,7 @@ public class PhysicalDbGroup { newWriteHost.setReadInstance(false); String oldWriteInstance = writeDbInstance.getName(); writeDbInstance = newWriteHost; - newWriteHost.start("switch master from " + oldWriteInstance + " to the instance", false); + newWriteHost.start("switch master from " + oldWriteInstance + " to the instance", false, false); return this.getClusterHaJson(); } catch (Exception e) { LOGGER.warn("switchMaster Exception ", e); @@ -734,6 +758,19 @@ public class PhysicalDbGroup { this.allSourceMap.put(dbInstance.getName(), dbInstance); } + public AtomicLong getLogicTimestamp() { + return logicTimestamp; + } + + public void setLogicTimestamp(AtomicLong logicTimestamp) { + this.logicTimestamp = logicTimestamp; + } + + public boolean isDelayDetectionStart() { + return !Strings.isNullOrEmpty(dbGroupConfig.getDelayDatabase()) && + dbGroupConfig.getDelayThreshold() > 0 && dbGroupConfig.getDelayPeriodMillis() > 0 && getDbGroupConfig().getWriteInstanceConfig().getDataBaseType() == DataBaseType.MYSQL; + } + public boolean equalsBaseInfo(PhysicalDbGroup pool) { return pool.dbGroupConfig.equalsBaseInfo(this.dbGroupConfig) && pool.rwSplitMode == this.rwSplitMode && @@ -754,6 +791,13 @@ public class PhysicalDbGroup { pool.getDbGroupConfig().getKeepAlive() == this.getDbGroupConfig().getKeepAlive(); } + public boolean equalsForDelayDetection(PhysicalDbGroup pool) { + return pool.getDbGroupConfig().getDelayThreshold() == (this.dbGroupConfig.getDelayThreshold()) && + pool.getDbGroupConfig().getDelayPeriodMillis() == this.dbGroupConfig.getDelayPeriodMillis() && + StringUtil.equals(pool.getDbGroupConfig().getDelayDatabase(), this.dbGroupConfig.getDelayDatabase()) && + pool.getDbGroupConfig().getKeepAlive() == this.getDbGroupConfig().getKeepAlive(); + } + public void copyBaseInfo(PhysicalDbGroup physicalDbGroup) { this.dbGroupConfig = physicalDbGroup.dbGroupConfig; @@ -780,4 +824,6 @@ public class PhysicalDbGroup { ", rwSplitUseless=" + rwSplitUseless + '}'; } + + } diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java index a2f8109f7..077758a17 100644 --- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java +++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDbInstance.java @@ -6,6 +6,8 @@ package com.actiontech.dble.backend.datasource; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.delyDetection.DelayDetection; +import com.actiontech.dble.backend.delyDetection.DelayDetectionStatus; import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat; import com.actiontech.dble.backend.mysql.nio.handler.ConnectionHeartBeatHandler; import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; @@ -53,6 +55,9 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { private volatile boolean fakeNode = false; private final LongAdder readCount = new LongAdder(); private final LongAdder writeCount = new LongAdder(); + private volatile DelayDetectionStatus delayDetectionStatus = DelayDetectionStatus.STOP; + protected DelayDetection delayDetection; + private final AtomicBoolean isInitial = new AtomicBoolean(false); @@ -63,6 +68,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { private volatile boolean needSkipHeartTest = false; private volatile int logCount; + public PhysicalDbInstance(DbInstanceConfig config, DbGroupConfig dbGroupConfig, boolean isReadNode) { this.config = config; this.name = config.getInstanceName(); @@ -81,7 +87,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { this.disabled = new AtomicBoolean(org.disabled.get()); } - public void init(String reason, boolean isInitHeartbeat) { + public void init(String reason, boolean isInitHeartbeat, boolean delayDetectionStart) { if (disabled.get() || fakeNode) { LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason); return; @@ -95,7 +101,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { checkPoolSize(); LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name); - start(reason, isInitHeartbeat); + if (delayDetectionStart) { + delayDetection = new DelayDetection(this); + } + start(reason, isInitHeartbeat, delayDetectionStart); } protected void checkPoolSize() { @@ -367,7 +376,14 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { return config; } - boolean canSelectAsReadNode() { + boolean canSelectAsReadNode(PhysicalDbInstance ds) { + if (dbGroup.isDelayDetectionStart()) { + DelayDetectionStatus status = ds.getDelayDetectionStatus(); + if (status == DelayDetectionStatus.ERROR || status == DelayDetectionStatus.TIMEOUT) { + return false; + } + return true; + } Integer slaveBehindMaster = heartbeat.getSlaveBehindMaster(); int dbSynStatus = heartbeat.getDbSynStatus(); if (slaveBehindMaster == null || dbSynStatus == MySQLHeartbeat.DB_SYN_ERROR) { @@ -387,11 +403,41 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { heartbeat.start(config.getPoolConfig().getHeartbeatPeriodMillis()); } - void start(String reason, boolean isStartHeartbeat) { + public void startDelayDetection() { + if (this.isDisabled() || this.isFakeNode()) { + LOGGER.info("the instance[{}] is disabled or fake node, skip to start delayDetection.", this.dbGroup.getGroupName() + "." + name); + return; + } + if (!dbGroup.isDelayDetectionStart()) { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("this instance does not require delay detection to be enabled"); + } + return; + } + long initialDelay = 0; + if (readInstance) { + initialDelay = dbGroupConfig.getDelayPeriodMillis(); + } + delayDetection.start(initialDelay); + } + + public void start() { + if (this.isDisabled() || this.isFakeNode()) { + LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name); + return; + } + + heartbeat.start(config.getPoolConfig().getHeartbeatPeriodMillis()); + } + + void start(String reason, boolean isStartHeartbeat, boolean delayDetectionStart) { startPool(reason); if (isStartHeartbeat) { startHeartbeat(); } + if (delayDetectionStart && dbGroup.isDelayDetectionStart()) { + startDelayDetection(); + } } public void startPool(String reason) { @@ -430,7 +476,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { } public void stopDirectly(String reason, boolean closeFront, boolean isStopPool) { - stop(reason, closeFront, true, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this || isStopPool); + stop(reason, closeFront, true, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this || isStopPool, true); } public void stop(String reason, boolean closeFront, boolean isStopPool) { @@ -449,10 +495,13 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { stopDirectly(reason, closeFront, false); } - protected void stop(String reason, boolean closeFront, boolean isStopHeartbeat, boolean isStopPool) { + protected void stop(String reason, boolean closeFront, boolean isStopHeartbeat, boolean isStopPool, boolean delayDetectionStop) { if (isStopHeartbeat) { stopHeartbeat(reason); } + if (delayDetectionStop) { + stopDelayDetection(reason); + } if (isStopPool) { stopPool(reason, closeFront); } @@ -466,6 +515,15 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { heartbeat.stop(reason); } + public void stopDelayDetection(String reason) { + if (LOGGER.isDebugEnabled()) { + ReloadLogHelper.debug("stop delayDetection :{},reason:{}", LOGGER, this.toString(), reason); + } + if (Objects.nonNull(delayDetection)) { + delayDetection.stop(reason); + } + } + public void stopPool(String reason, boolean closeFront) { LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason); connectionPool.stop(reason, closeFront); @@ -512,7 +570,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { public boolean enable() { if (disabled.compareAndSet(true, false)) { - start("execute manager cmd of enable", true); + start("execute manager cmd of enable", true, true); return true; } return false; @@ -554,6 +612,17 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { this.logCount = logCount; } + public DelayDetectionStatus getDelayDetectionStatus() { + return delayDetectionStatus; + } + + public void setDelayDetectionStatus(DelayDetectionStatus delayDetectionStatus) { + this.delayDetectionStatus = delayDetectionStatus; + } + + public DelayDetection getDelayDetection() { + return delayDetection; + } @Override public boolean equals(Object o) { @@ -569,6 +638,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { Objects.equals(disabled.get(), that.disabled.get()); } + @Override public int hashCode() { return Objects.hash(name, config, readInstance, dbGroupConfig, dbGroup, disabled, heartbeat); @@ -599,6 +669,15 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance { this.disabled.get() == dbInstance.isDisabled(); } + public boolean equalsForDelayDetection(PhysicalDbInstance dbInstance) { + return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) && + this.config.getPort() == dbInstance.getConfig().getPort() && + this.config.getUser().equals(dbInstance.getConfig().getUser()) && + this.config.getPassword().equals(dbInstance.getConfig().getPassword()) && + this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() && + this.disabled.get() == dbInstance.isDisabled(); + } + public boolean equalsForTestConn(PhysicalDbInstance dbInstance) { return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) && this.config.getPort() == dbInstance.getConfig().getPort() && diff --git a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java new file mode 100644 index 000000000..8d0fc41f7 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetection.java @@ -0,0 +1,295 @@ +package com.actiontech.dble.backend.delyDetection; + +import com.actiontech.dble.alarm.AlarmCode; +import com.actiontech.dble.alarm.Alert; +import com.actiontech.dble.alarm.AlertUtil; +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.config.model.SystemConfig; +import com.actiontech.dble.config.model.db.DbGroupConfig; +import com.actiontech.dble.net.connection.BackendConnection; +import com.actiontech.dble.singleton.Scheduler; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +public class DelayDetection { + public static final Logger LOGGER = LoggerFactory.getLogger(DelayDetection.class); + + private boolean tableExists; + private volatile boolean stop = true; + private volatile BackendConnection conn; + private volatile ScheduledFuture scheduledFuture; + private final PhysicalDbInstance source; + private final DbGroupConfig dbGroupConfig; + private DelayDetectionStatus delayDetectionStatus; + private DelayDetectionTask delayDetectionTask; + private int delayThreshold; + private int delayPeriodMillis; + private AtomicLong version = new AtomicLong(); + private AtomicBoolean isChecking = new AtomicBoolean(); + private volatile LocalDateTime lastSendQryTime = LocalDateTime.now(); + private volatile LocalDateTime lastReceivedQryTime = LocalDateTime.now(); + private volatile long delayVal = 0; + private volatile int logicUpdate = 0; + + //table source field + private String sourceName; + private String sqlTableName; + private String updateSQL; + private String selectSQL; + private String createTableSQL; + private String errorMessage; + + + public DelayDetection(PhysicalDbInstance source) { + this.source = source; + this.dbGroupConfig = source.getDbGroupConfig(); + delayThreshold = dbGroupConfig.getDelayThreshold(); + delayPeriodMillis = dbGroupConfig.getDelayPeriodMillis(); + delayDetectionStatus = DelayDetectionStatus.INIT; + delayDetectionTask = new DelayDetectionTask(this); + synSql(); + } + + private void synSql() { + String[] str = {"dble", SystemConfig.getInstance().getInstanceName(), dbGroupConfig.getName()}; + sourceName = Joiner.on("_").join(str); + String schema = dbGroupConfig.getDelayDatabase(); + String tableName = ".u_delay "; + sqlTableName = schema + tableName; + StringBuilder select = new StringBuilder("select logic_timestamp from ? where source = '?'"); + selectSQL = convert(select, Lists.newArrayList(sqlTableName, sourceName)); + StringBuilder create = new StringBuilder("create table if not exists ? (source VARCHAR(256) primary key,real_timestamp varchar(26) NOT NULL,logic_timestamp BIGINT default 0)"); + createTableSQL = convert(create, Lists.newArrayList(sqlTableName)); + } + + private String convert(StringBuilder template, List list) { + StringBuilder sb = new StringBuilder(template); + String replace = "?"; + for (String str : list) { + int index = sb.indexOf(replace); + sb.replace(index, index + 1, str); + } + return sb.toString(); + } + + public void start(long initialDelay) { + LOGGER.info("start delayDetection of instance[{}]", source); + if (Objects.nonNull(scheduledFuture)) { + stop("the legacy thread is not closed"); + } + stop = false; + this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> execute(), + initialDelay, delayPeriodMillis, TimeUnit.MILLISECONDS); + } + + public void execute() { + if (isChecking.compareAndSet(false, true)) { + if (delayDetectionTask.isQuit()) { + delayDetectionTask = new DelayDetectionTask(this); + } + if (!source.isReadInstance()) { + StringBuilder update = new StringBuilder("replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)"); + List strings = Lists.newArrayList(sqlTableName, sourceName, String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet())); + updateSQL = convert(update, strings); + } + delayDetectionTask.execute(); + } else { + LocalDateTime result = lastReceivedQryTime; + if (lastSendQryTime.getNano() > lastReceivedQryTime.getNano()) { + result = lastSendQryTime; + } + Duration duration = Duration.between(result, LocalDateTime.now()); + if (duration.toMillis() > delayThreshold) { + if (source.isReadInstance()) { + delayVal = -1; + } + errorMessage = "connection did not respond after the delayThreshold time was exceeded"; + setResult(DelayDetectionStatus.TIMEOUT); + } + } + } + + public void stop(String reason) { + LOGGER.info("stop delayDetection of instance[{}], due to {}", source, reason); + stop = true; + if (Objects.nonNull(scheduledFuture)) { + scheduledFuture.cancel(true); + delayDetectionStatus = DelayDetectionStatus.STOP; + source.setDelayDetectionStatus(delayDetectionStatus); + cancel(reason); + scheduledFuture = null; + } + } + + public void setResult(DelayDetectionStatus result) { + isChecking.set(false); + switch (result) { + case OK: + setOk(); + break; + case TIMEOUT: + setTimeout(); + break; + case ERROR: + setError(); + break; + default: + break; + } + errorMessage = null; + } + + public void cancel(String reason) { + LOGGER.warn("delayDetection need cancel ,reason is {}", reason); + delayDetectionTask.close(); + updateLastReceivedQryTime(); + version.set(0); + final BackendConnection connection = conn; + if (Objects.nonNull(connection) && !connection.isClosed()) { + connection.businessClose(reason); + } + conn = null; + errorMessage = reason; + setResult(DelayDetectionStatus.ERROR); + } + + public void delayCal(long delay) { + PhysicalDbGroup dbGroup = source.getDbGroup(); + long logic = dbGroup.getLogicTimestamp().get(); + long result = logic - delay; + DelayDetectionStatus writeDbStatus = dbGroup.getWriteDbInstance().getDelayDetectionStatus(); + delayVal = result * delayPeriodMillis; + + //writeDbStatus is error,salve was considered normal + if (writeDbStatus == DelayDetectionStatus.ERROR || writeDbStatus == DelayDetectionStatus.TIMEOUT) { + setResult(DelayDetectionStatus.OK); + return; + } + if (delayThreshold > delayVal) { + setResult(DelayDetectionStatus.OK); + } else { + errorMessage = "found MySQL master/slave Replication delay" + source.getConfig() + ",sync time delay: " + delayVal + " ms"; + setResult(DelayDetectionStatus.TIMEOUT); + } + } + + public LocalDateTime getLastReceivedQryTime() { + return lastReceivedQryTime; + } + + public void delayDetectionRetry() { + execute(); + } + + public void updateLastSendQryTime() { + lastSendQryTime = LocalDateTime.now(); + } + + public void updateLastReceivedQryTime() { + lastReceivedQryTime = LocalDateTime.now(); + } + + private void setTimeout() { + LOGGER.warn("delayDetection to [" + source.getConfig().getUrl() + "] setTimeout"); + delayDetectionStatus = DelayDetectionStatus.TIMEOUT; + source.setDelayDetectionStatus(delayDetectionStatus); + alert(AlarmCode.DB_SLAVE_INSTANCE_DELAY, errorMessage, dbGroupConfig.instanceDatabaseType().name()); + } + + private void setError() { + LOGGER.warn("delayDetection to [" + source.getConfig().getUrl() + "] setError"); + delayDetectionStatus = DelayDetectionStatus.ERROR; + source.setDelayDetectionStatus(delayDetectionStatus); + if (!source.isReadInstance()) { + alert(AlarmCode.DB_MASTER_INSTANCE_DELAY_FAIL, "reason is " + errorMessage + "delayDetection status:" + delayDetectionStatus, dbGroupConfig.instanceDatabaseType().name()); + } + } + + private void setOk() { + LOGGER.debug("delayDetection to [" + source.getConfig().getUrl() + "] setOK"); + delayDetectionStatus = DelayDetectionStatus.OK; + source.setDelayDetectionStatus(delayDetectionStatus); + } + + private void alert(String alarmCode, String errMsg, String databaseType) { + String alertKey = source.getDbGroupConfig().getName() + "-" + source.getConfig().getInstanceName(); + Map labels = AlertUtil.genSingleLabel("dbInstance", alertKey); + AlertUtil.alert(alarmCode, Alert.AlertLevel.WARN, errMsg, databaseType, source.getConfig().getId(), labels); + } + + public PhysicalDbInstance getSource() { + return source; + } + + public BackendConnection getConn() { + return conn; + } + + public void setConn(BackendConnection conn) { + this.conn = conn; + } + + public AtomicLong getVersion() { + return version; + } + + public void setVersion(AtomicLong version) { + this.version = version; + } + + public String getUpdateSQL() { + return updateSQL; + } + + public String getSelectSQL() { + return selectSQL; + } + + public boolean isStop() { + return stop; + } + + public boolean isTableExists() { + return tableExists; + } + + public void setTableExists(boolean tableExists) { + this.tableExists = tableExists; + } + + public String getCreateTableSQL() { + return createTableSQL; + } + + public long getDelayVal() { + return delayVal; + } + + public String getErrorMessage() { + return errorMessage; + } + + public int getLogicUpdate() { + return logicUpdate; + } + + public void setLogicUpdate(int logicUpdate) { + this.logicUpdate = logicUpdate; + } +} + + + diff --git a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionSqlJob.java b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionSqlJob.java new file mode 100644 index 000000000..8552190f1 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionSqlJob.java @@ -0,0 +1,167 @@ +package com.actiontech.dble.backend.delyDetection; + +import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; +import com.actiontech.dble.net.connection.BackendConnection; +import com.actiontech.dble.net.mysql.ErrorPacket; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.net.service.AbstractService; +import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import com.actiontech.dble.sqlengine.SQLJobHandler; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DelayDetectionSqlJob implements ResponseHandler { + + public static final Logger LOGGER = LoggerFactory.getLogger(DelayDetectionSqlJob.class); + + private final DelayDetection delayDetection; + private final SQLJobHandler jobHandler; + private String sql; + private long versionVal; + private AtomicBoolean finished = new AtomicBoolean(false); + private LocalDateTime responseTime; + private long keepAlive; + + public DelayDetectionSqlJob(DelayDetection delayDetection, OneRawSQLQueryResultHandler jobHandler) { + this.delayDetection = delayDetection; + this.jobHandler = jobHandler; + int delayPeriodMillis = delayDetection.getSource().getDbGroupConfig().getDelayPeriodMillis(); + this.keepAlive = delayDetection.getSource().getDbGroupConfig().getKeepAlive() + delayPeriodMillis; + sql = delayDetection.getSelectSQL(); + updateResponseTime(); + } + + public void execute() { + updateResponseTime(); + finished.set(false); + if (!delayDetection.isTableExists()) { + sql = delayDetection.getCreateTableSQL(); + } else if (delayDetection.getSource().isReadInstance()) { + sql = delayDetection.getSelectSQL(); + } else { + sql = delayDetection.getUpdateSQL(); + } + BackendConnection conn = delayDetection.getConn(); + if (Objects.isNull(conn)) { + LOGGER.warn("[delayDetection]connection establishment timeout,please pay attention to network latency or packet loss."); + delayDetection.cancel("connection establishment timeout"); + doFinished(true); + return; + } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("[delayDetection]do delayDetection,conn is " + conn); + } + try { + LocalDateTime now = LocalDateTime.now(); + Duration duration = Duration.between(responseTime, now); + if (duration.toMillis() > keepAlive) { + LOGGER.warn("[delayDetection]connection execution timeout {},please pay attention to network latency or packet loss.", duration.toMillis()); + delayDetection.cancel("connection execution timeout,"); + doFinished(true); + } + conn.getBackendService().query(sql); + } catch (Exception e) { + LOGGER.warn("[delayDetection]send delayDetection error", e); + delayDetection.cancel("send delayDetection error, because of [" + e.getMessage() + "]"); + doFinished(true); + } + } + + public void setVersionVal(long versionVal) { + this.versionVal = versionVal; + } + + private void doFinished(boolean failed) { + if (finished.compareAndSet(false, true)) { + jobHandler.finished(null, failed); + } + } + + private void updateResponseTime() { + responseTime = LocalDateTime.now(); + } + + @Override + public void connectionError(Throwable e, Object attachment) { + LOGGER.warn("[delayDetection]can't get connection for sql :" + sql, e); + updateResponseTime(); + delayDetection.cancel("delayDetection connection Error"); + doFinished(true); + } + + @Override + public void connectionAcquired(BackendConnection connection) { + if (delayDetection.getVersion().get() == versionVal) { + updateResponseTime(); + connection.getBackendService().setResponseHandler(this); + connection.getBackendService().setComplexQuery(true); + delayDetection.setConn(connection); + execute(); + } + } + + @Override + public void errorResponse(byte[] err, @NotNull AbstractService service) { + ErrorPacket errPg = new ErrorPacket(); + errPg.read(err); + MySQLResponseService responseService = (MySQLResponseService) service; + LOGGER.warn("[delayDetection]error response errNo: {}, {} from of sql: {} at con: {} db user = {}", + errPg.getErrNo(), new String(errPg.getMessage()), sql, service, + responseService.getConnection().getInstance().getConfig().getUser()); + updateResponseTime(); + delayDetection.cancel(new String(errPg.getMessage())); + if (!((MySQLResponseService) service).syncAndExecute()) { + service.getConnection().businessClose("[delayDetection]unfinished sync"); + doFinished(true); + return; + } + doFinished(true); + } + + @Override + public void okResponse(byte[] ok, @NotNull AbstractService service) { + updateResponseTime(); + if (((MySQLResponseService) service).syncAndExecute()) { + doFinished(false); + } + } + + @Override + public void fieldEofResponse(byte[] header, List fields, List fieldPackets, byte[] eof, boolean isLeft, @NotNull AbstractService service) { + updateResponseTime(); + jobHandler.onHeader(fields); + } + + @Override + public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, @NotNull AbstractService service) { + updateResponseTime(); + jobHandler.onRowData(row); + return false; + } + + @Override + public void rowEofResponse(byte[] eof, boolean isLeft, @NotNull AbstractService service) { + updateResponseTime(); + doFinished(false); + } + + @Override + public void connectionClose(@NotNull AbstractService service, String reason) { + updateResponseTime(); + LOGGER.warn("[delayDetection]conn for sql[" + sql + "] is closed, due to " + reason + ", we will try again immediately"); + delayDetection.cancel("delayDetection conn for sql[" + sql + "] is closed, due to " + reason); + delayDetection.delayDetectionRetry(); + doFinished(true); + } + + +} diff --git a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionStatus.java b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionStatus.java new file mode 100644 index 000000000..50c6c120b --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionStatus.java @@ -0,0 +1,11 @@ +package com.actiontech.dble.backend.delyDetection; + +public enum DelayDetectionStatus { + INIT(), OK(), TIMEOUT(), ERROR(), STOP(); + + @Override + public String toString() { + return super.toString().toLowerCase(); + } + +} diff --git a/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionTask.java b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionTask.java new file mode 100644 index 000000000..bf86022b4 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/delyDetection/DelayDetectionTask.java @@ -0,0 +1,75 @@ +package com.actiontech.dble.backend.delyDetection; + +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler; +import com.actiontech.dble.sqlengine.SQLQueryResult; +import com.actiontech.dble.sqlengine.SQLQueryResultListener; + +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; + +public class DelayDetectionTask { + private DelayDetection delayDetection; + private DelayDetectionSqlJob sqlJob; + private static final String LOGIC_TIMESTAMP = "logic_timestamp"; + private AtomicBoolean quit = new AtomicBoolean(false); + private static final String[] MYSQL_DELAY_DETECTION_COLS = new String[]{ + "logic_timestamp", + }; + + public DelayDetectionTask(DelayDetection delayDetection) { + this.delayDetection = delayDetection; + } + + public void execute() { + delayDetection.updateLastSendQryTime(); + if (Objects.isNull(sqlJob)) { + String[] fetchCols = {}; + if (delayDetection.getSource().isReadInstance()) { + fetchCols = MYSQL_DELAY_DETECTION_COLS; + } + OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, new DelayDetectionListener()); + sqlJob = new DelayDetectionSqlJob(delayDetection, resultHandler); + sqlJob.setVersionVal(delayDetection.getVersion().incrementAndGet()); + delayDetection.getSource().createConnectionSkipPool(null, sqlJob); + } else { + sqlJob.execute(); + } + } + + public boolean isQuit() { + return quit.get(); + } + + public void close() { + if (quit.compareAndSet(false, true)) { + sqlJob = null; + } + } + + private class DelayDetectionListener implements SQLQueryResultListener>> { + @Override + public void onResult(SQLQueryResult> result) { + delayDetection.updateLastReceivedQryTime(); + if (!result.isSuccess()) { + return; + } + PhysicalDbInstance source = delayDetection.getSource(); + if (source.isReadInstance()) { + Map resultResult = result.getResult(); + String logicTimestamp = Optional.ofNullable(resultResult.get(LOGIC_TIMESTAMP)).orElse(String.valueOf(source.getDbGroup().getLogicTimestamp().get())); + long logic = Long.parseLong(logicTimestamp); + delayDetection.delayCal(logic); + } else { + delayDetection.setResult(DelayDetectionStatus.OK); + } + //after the CREATE statement is successfully executed, the update statement needs to be executed + if (!delayDetection.isTableExists()) { + delayDetection.setTableExists(true); + delayDetection.execute(); + } + } + } +} diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java index cf0e1f262..d052b1ace 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLDetector.java @@ -95,7 +95,7 @@ public class MySQLDetector implements SQLQueryResultListener resultResult = result.getResult(); - if (source.getDbGroupConfig().isShowSlaveSql()) { + if (source.getDbGroupConfig().isShowSlaveSql() && !source.getDbGroup().isDelayDetectionStart()) { setStatusBySlave(source, resultResult); } else if (source.getDbGroupConfig().isSelectReadOnlySql()) { setStatusByReadOnly(source, resultResult); @@ -190,7 +190,7 @@ public class MySQLDetector implements SQLQueryResultListener 0) { String alertKey = source.getDbGroupConfig().getName() + "-" + source.getConfig().getInstanceName(); if (behindMaster > delayThreshold) { diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index 4a6f21532..a48745f7e 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -231,7 +231,8 @@ public class MySQLHeartbeat { AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels); } //after the heartbeat changes from failure to success, it needs to be expanded immediately - if (source.getTotalConnections() == 0) { + if (source.getTotalConnections() == 0 && !status.equals(MySQLHeartbeatStatus.INIT) && !status.equals(MySQLHeartbeatStatus.OK)) { + LOGGER.debug("[updatePoolCapacity] heartbeat to [{}] setOk, previous status is {}", source, status); source.updatePoolCapacity(); } if (isStop) { diff --git a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java index 391774e04..c8d8cf68e 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java +++ b/src/main/java/com/actiontech/dble/backend/pool/ConnectionPool.java @@ -80,9 +80,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } try { ConnectionPoolProvider.getConnGetFrenshLocekAfter(); + int waiting = waiters.get(); for (PooledConnection conn : allConnections) { if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { - newPooledEntry(schema, waiters.get()); + if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) { + newPooledEntry(schema, waiting, true); + } return conn; } } @@ -100,11 +103,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener try { final int waiting = waiterNum; ConnectionPoolProvider.getConnGetFrenshLocekAfter(); + ConnectionPoolProvider.borrowConnectionBefore(); for (PooledConnection conn : allConnections) { if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { // If we may have stolen another waiter's connection, request another bag add. - if (waiting > 1) { - newPooledEntry(schema, waiting - 1); + if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) { + newPooledEntry(schema, waiting, true); } return conn; } @@ -112,14 +116,18 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener waiterNum = waiters.incrementAndGet(); try { - newPooledEntry(schema, waiterNum); + newPooledEntry(schema, waiterNum, true); + ConnectionPoolProvider.newConnectionAfter(); timeout = timeUnit.toNanos(timeout); do { final long start = System.nanoTime(); final PooledConnection bagEntry = handoffQueue.poll(timeout, NANOSECONDS); if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) { + if (bagEntry != null) { + bagEntry.getCreateByWaiter().set(false); + } return bagEntry; } @@ -135,7 +143,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } } - private void newPooledEntry(final String schema, final int waiting) { + private void newPooledEntry(final String schema, final int waiting, boolean createByWaiter) { if (instance.isDisabled() || isClosed.get()) { return; } @@ -144,7 +152,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener Map labels = AlertUtil.genSingleLabel("dbInstance", alertKey); if (waiting > 0) { if (totalConnections.incrementAndGet() <= config.getMaxCon()) { - newConnection(schema, ConnectionPool.this); + newConnection(schema, ConnectionPool.this, createByWaiter); if (ToResolveContainer.REACH_MAX_CON.contains(alertKey)) { AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", config.getId(), labels, ToResolveContainer.REACH_MAX_CON, alertKey); @@ -194,7 +202,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener } for (int i = 0; i < connectionsToAdd; i++) { // newPooledEntry(schemas[i % schemas.length]); - newPooledEntry(null, 1); + newPooledEntry(null, 1, false); } } @@ -208,6 +216,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener return; } + LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn); + conn.lazySet(STATE_NOT_IN_USE); // spin until a thread takes it or none are waiting while (waiters.get() > 0 && conn.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(conn)) { @@ -224,6 +234,9 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener @Override public void onCreateFail(PooledConnection conn, Throwable e) { if (conn == null || conn.getIsCreateFail().compareAndSet(false, true)) { + if (conn != null) { + LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn); + } LOGGER.warn("create connection fail " + e.getMessage()); totalConnections.decrementAndGet(); // conn can be null if newChannel crashed (eg SocketException("too many open files")) diff --git a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java index 724ed4b15..9b0f51acf 100644 --- a/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java +++ b/src/main/java/com/actiontech/dble/backend/pool/PoolBase.java @@ -7,6 +7,7 @@ package com.actiontech.dble.backend.pool; import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.net.connection.PooledConnection; import com.actiontech.dble.net.factory.PooledConnectionFactory; import java.io.IOException; @@ -36,9 +37,10 @@ public class PoolBase { } } - void newConnection(String schema, PooledConnectionListener listener) { + void newConnection(String schema, PooledConnectionListener listener, boolean createByWaiter) { try { - factory.make(instance, listener, schema); + PooledConnection pooledConnection = factory.make(instance, listener, schema); + pooledConnection.getCreateByWaiter().set(createByWaiter); } catch (IOException ioe) { listener.onCreateFail(null, ioe); } diff --git a/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java b/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java index c366d2000..f236498ec 100644 --- a/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java +++ b/src/main/java/com/actiontech/dble/btrace/provider/ConnectionPoolProvider.java @@ -22,5 +22,13 @@ public final class ConnectionPoolProvider { } + public static void newConnectionAfter() { + + } + + public static void borrowConnectionBefore() { + + } + } diff --git a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java index 5de8c4026..4dd42d7e5 100644 --- a/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java +++ b/src/main/java/com/actiontech/dble/cluster/zkprocess/entity/dbGroups/DBGroup.java @@ -30,6 +30,12 @@ public class DBGroup implements Named { @XmlAttribute protected String disableHA; + @XmlAttribute + protected Integer delayPeriodMillis; + + @XmlAttribute + protected String delayDatabase; + protected HeartBeat heartbeat; protected List dbInstance; @@ -106,6 +112,21 @@ public class DBGroup implements Named { this.disableHA = disableHA; } + public Integer getDelayPeriodMillis() { + return delayPeriodMillis; + } + + public void setDelayPeriodMillis(Integer delayPeriodMillis) { + this.delayPeriodMillis = delayPeriodMillis; + } + + public String getDelayDatabase() { + return delayDatabase; + } + + public void setDelayDatabase(String delayDatabase) { + this.delayDatabase = delayDatabase; + } @Override public String toString() { @@ -115,6 +136,10 @@ public class DBGroup implements Named { name + ", delayThreshold=" + delayThreshold + + ", delayPeriodMillis=" + + delayPeriodMillis + + ", delayDatabase=" + + delayDatabase + ", disableHA=" + disableHA + ", heartbeat=" + diff --git a/src/main/java/com/actiontech/dble/config/ServerConfig.java b/src/main/java/com/actiontech/dble/config/ServerConfig.java index 3fe60631d..40e6dd55c 100644 --- a/src/main/java/com/actiontech/dble/config/ServerConfig.java +++ b/src/main/java/com/actiontech/dble/config/ServerConfig.java @@ -41,7 +41,7 @@ import com.actiontech.dble.services.manager.response.ChangeType; import com.actiontech.dble.singleton.*; import com.actiontech.dble.util.StringUtil; import com.actiontech.dble.util.TimeUtil; -import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -489,7 +489,7 @@ public class ServerConfig { private void initDbGroupByMap(List changeItemList, Map oldDbGroupMap, Map newShardingNodes, boolean isFullyConfigured, int loadAllMode) { - List updateDbGroupList = Lists.newArrayList(); + Map updateDbGroupMap = Maps.newHashMap(); for (ChangeItem changeItem : changeItemList) { Object item = changeItem.getItem(); ChangeItemType itemType = changeItem.getItemType(); @@ -498,7 +498,7 @@ public class ServerConfig { addItem(item, itemType, oldDbGroupMap, newShardingNodes, isFullyConfigured); break; case UPDATE: - updateItem(item, itemType, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupList, loadAllMode); + updateItem(item, itemType, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupMap, loadAllMode); break; case DELETE: deleteItem(item, itemType, oldDbGroupMap, loadAllMode); @@ -507,9 +507,14 @@ public class ServerConfig { break; } } - for (PhysicalDbGroup physicalDbGroup : updateDbGroupList) { - physicalDbGroup.startHeartbeat(); - } + updateDbGroupMap.forEach((changeItem, dbGroup) -> { + if (changeItem.isAffectHeartbeat()) { + dbGroup.startHeartbeat(); + } + if (changeItem.isAffectDelayDetection() && dbGroup.isDelayDetectionStart()) { + dbGroup.startDelayDetection(); + } + }); } private void deleteItem(Object item, ChangeItemType itemType, Map oldDbGroupMap, int loadAllMode) { @@ -536,17 +541,28 @@ public class ServerConfig { } private void updateItem(Object item, ChangeItemType itemType, Map oldDbGroupMap, Map newShardingNodes, ChangeItem changeItem, - List updateDbGroupList, int loadAllMode) { + Map updateDbGroupMap, int loadAllMode) { if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) { //change dbGroup PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item; PhysicalDbGroup oldDbGroup = oldDbGroupMap.get(physicalDbGroup.getGroupName()); + boolean dbGroupCopy = false; if (changeItem.isAffectHeartbeat()) { oldDbGroup.stopHeartbeat("reload config, stop group heartbeat"); oldDbGroup.copyBaseInfo(physicalDbGroup); + dbGroupCopy = true; //create a new heartbeat in the follow-up - updateDbGroupList.add(oldDbGroup); - } else { + updateDbGroupMap.put(changeItem, oldDbGroup); + } + if (changeItem.isAffectDelayDetection()) { + oldDbGroup.stopDelayDetection("reload config, stop group delayDetection"); + if (!dbGroupCopy) { + oldDbGroup.copyBaseInfo(physicalDbGroup); + dbGroupCopy = true; + } + updateDbGroupMap.put(changeItem, oldDbGroup); + } + if (!dbGroupCopy) { oldDbGroup.copyBaseInfo(physicalDbGroup); } reloadSchema(oldDbGroup, newShardingNodes); @@ -564,15 +580,15 @@ public class ServerConfig { } oldDbGroupMap.put(physicalDbGroup.getGroupName(), oldDbGroup); } else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) { - if (changeItem.isAffectHeartbeat() || changeItem.isAffectConnectionPool()) { + if (changeItem.isAffectHeartbeat() || changeItem.isAffectConnectionPool() || changeItem.isAffectDelayDetection()) { PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item; PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName()); PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().get(physicalDbInstance.getName()); - oldDbInstance.stop("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), true); + oldDbInstance.stopDirectly("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), true); oldDbInstance = null; removeDbInstance(physicalDbGroup, physicalDbInstance.getName()); physicalDbGroup.setDbInstance(physicalDbInstance); - physicalDbInstance.init("reload config", true); + physicalDbInstance.init("reload config", true, true); } else { PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item; PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName()); @@ -620,7 +636,7 @@ public class ServerConfig { PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(dbInstance.getDbGroupConfig().getName()); if (isFullyConfigured) { physicalDbGroup.setDbInstance(dbInstance); - dbInstance.init("reload config", true); + dbInstance.init("reload config", true, true); } else { LOGGER.info("dbGroup[" + dbInstance.getDbGroupConfig().getName() + "] is not fullyConfigured, so doing nothing"); } diff --git a/src/main/java/com/actiontech/dble/config/converter/DBConverter.java b/src/main/java/com/actiontech/dble/config/converter/DBConverter.java index c635909c5..c0cac88b9 100644 --- a/src/main/java/com/actiontech/dble/config/converter/DBConverter.java +++ b/src/main/java/com/actiontech/dble/config/converter/DBConverter.java @@ -148,9 +148,13 @@ public class DBConverter { throw new ConfigException("dbGroup[" + dbGroupName + "]'s child database type must be consistent"); } } + int delayPeriodMillis = Optional.ofNullable(dbGroup.getDelayPeriodMillis()).orElse(-1); + String delayDatabase = dbGroup.getDelayDatabase(); DbGroupConfig dbGroupConf = new DbGroupConfig(dbGroupName, writeDbConf, readInstanceConfigList, delayThreshold, disableHA); dbGroupConf.setRwSplitMode(rwSplitMode); dbGroupConf.setHeartbeatSQL(heartbeatSQL); + dbGroupConf.setDelayDatabase(delayDatabase); + dbGroupConf.setDelayPeriodMillis(delayPeriodMillis); int heartbeatTimeout = Optional.ofNullable(heartbeat.getTimeout()).orElse(0); dbGroupConf.setHeartbeatTimeout(heartbeatTimeout * 1000); int heartbeatErrorRetryCount = Optional.ofNullable(heartbeat.getErrorRetryCount()).orElse(1); diff --git a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java index cecdec29b..c2ffe609a 100644 --- a/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/db/DbGroupConfig.java @@ -26,6 +26,9 @@ public class DbGroupConfig { private boolean isShowSlaveSql = false; private boolean isSelectReadOnlySql = false; private int delayThreshold; + private int delayPeriodMillis; + private String delayDatabase; + private int heartbeatTimeout = 0; private int errorRetryCount = 1; @@ -156,6 +159,22 @@ public class DbGroupConfig { return writeInstanceConfig.getDataBaseType(); } + public int getDelayPeriodMillis() { + return delayPeriodMillis; + } + + public void setDelayPeriodMillis(int delayPeriodMillis) { + this.delayPeriodMillis = delayPeriodMillis; + } + + public String getDelayDatabase() { + return delayDatabase; + } + + public void setDelayDatabase(String delayDatabase) { + this.delayDatabase = delayDatabase; + } + public boolean equalsBaseInfo(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; @@ -169,6 +188,8 @@ public class DbGroupConfig { errorRetryCount == that.errorRetryCount && keepAlive == that.keepAlive && disableHA == that.disableHA && + delayPeriodMillis == that.delayPeriodMillis && + delayDatabase == that.delayDatabase && Objects.equals(name, that.name) && Objects.equals(heartbeatSQL, that.heartbeatSQL); } @@ -184,6 +205,8 @@ public class DbGroupConfig { ", isShowSlaveSql=" + isShowSlaveSql + ", isSelectReadOnlySql=" + isSelectReadOnlySql + ", delayThreshold=" + delayThreshold + + ", delayPeriodMillis=" + delayPeriodMillis + + ", delayDatabase=" + delayDatabase + ", heartbeatTimeout=" + heartbeatTimeout + ", errorRetryCount=" + errorRetryCount + ", keepAlive=" + keepAlive + diff --git a/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java b/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java index a1629842b..a3e1a251e 100644 --- a/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/PooledConnection.java @@ -34,6 +34,8 @@ public abstract class PooledConnection extends AbstractConnection { public static final Comparator LAST_ACCESS_COMPARABLE; + private AtomicBoolean createByWaiter = new AtomicBoolean(false); + static { LAST_ACCESS_COMPARABLE = Comparator.comparingLong(entryOne -> entryOne.lastTime); } @@ -122,4 +124,8 @@ public abstract class PooledConnection extends AbstractConnection { public AtomicBoolean getIsCreateFail() { return isCreateFail; } + + public AtomicBoolean getCreateByWaiter() { + return createByWaiter; + } } diff --git a/src/main/java/com/actiontech/dble/plan/common/item/function/ItemCreate.java b/src/main/java/com/actiontech/dble/plan/common/item/function/ItemCreate.java index 5d9c26e2b..437454686 100644 --- a/src/main/java/com/actiontech/dble/plan/common/item/function/ItemCreate.java +++ b/src/main/java/com/actiontech/dble/plan/common/item/function/ItemCreate.java @@ -14,6 +14,8 @@ import com.actiontech.dble.plan.common.item.Item; import com.actiontech.dble.plan.common.item.function.bitfunc.ItemFuncBitCount; import com.actiontech.dble.plan.common.item.function.castfunc.*; import com.actiontech.dble.plan.common.item.function.convertfunc.*; +import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonExtract; +import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonUnQuote; import com.actiontech.dble.plan.common.item.function.mathsfunc.*; import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.*; import com.actiontech.dble.plan.common.item.function.operator.controlfunc.ItemFuncIfnull; diff --git a/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/CustomJsonWriter.java b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/CustomJsonWriter.java new file mode 100644 index 000000000..878f4e165 --- /dev/null +++ b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/CustomJsonWriter.java @@ -0,0 +1,561 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.plan.common.item.function.jsonfunc; + +import java.io.Closeable; +import java.io.Flushable; +import java.io.IOException; +import java.io.Writer; +import java.util.Arrays; + +import static com.actiontech.dble.plan.common.item.function.jsonfunc.JsonScope.*; + +/** + * migrate from com.google.gson.stream.JsonWriter class in package gson 2.8.9 + * for issue inner-1940 + *

+ * migrate reason: "keep the same format as mysql" + * improve: add space for ":"&"," + */ +public class CustomJsonWriter implements Closeable, Flushable { + + /* + * From RFC 7159, "All Unicode characters may be placed within the + * quotation marks except for the characters that must be escaped: + * quotation mark, reverse solidus, and the control characters + * (U+0000 through U+001F)." + * + * We also escape '\u2028' and '\u2029', which JavaScript interprets as + * newline characters. This prevents eval() from failing with a syntax + * error. http://code.google.com/p/google-gson/issues/detail?id=341 + */ + private static final String[] REPLACEMENT_CHARS; + private static final String[] HTML_SAFE_REPLACEMENT_CHARS; + + static { + REPLACEMENT_CHARS = new String[128]; + for (int i = 0; i <= 0x1f; i++) { + REPLACEMENT_CHARS[i] = String.format("\\u%04x", (int) i); + } + REPLACEMENT_CHARS['"'] = "\\\""; + REPLACEMENT_CHARS['\\'] = "\\\\"; + REPLACEMENT_CHARS['\t'] = "\\t"; + REPLACEMENT_CHARS['\b'] = "\\b"; + REPLACEMENT_CHARS['\n'] = "\\n"; + REPLACEMENT_CHARS['\r'] = "\\r"; + REPLACEMENT_CHARS['\f'] = "\\f"; + HTML_SAFE_REPLACEMENT_CHARS = REPLACEMENT_CHARS.clone(); + HTML_SAFE_REPLACEMENT_CHARS['<'] = "\\u003c"; + HTML_SAFE_REPLACEMENT_CHARS['>'] = "\\u003e"; + HTML_SAFE_REPLACEMENT_CHARS['&'] = "\\u0026"; + HTML_SAFE_REPLACEMENT_CHARS['='] = "\\u003d"; + HTML_SAFE_REPLACEMENT_CHARS['\''] = "\\u0027"; + } + + /** + * The output data, containing at most one top-level array or object. + */ + private final Writer out; + + private int[] stack = new int[32]; + private int stackSize = 0; + + { + push(EMPTY_DOCUMENT); + } + + /** + * A string containing a full set of spaces for a single level of + * indentation, or null for no pretty printing. + */ + private String indent; + + /** + * The name/value separator; either ":" or ": ". + */ + private String separator = ": "; + + private boolean lenient; + + private boolean htmlSafe; + + private String deferredName; + + private boolean serializeNulls = true; + + /** + * Creates a new instance that writes a JSON-encoded stream to {@code out}. + * For best performance, ensure {@link Writer} is buffered; wrapping in + * {@link java.io.BufferedWriter BufferedWriter} if necessary. + */ + public CustomJsonWriter(Writer out) { + if (out == null) { + throw new NullPointerException("out == null"); + } + this.out = out; + } + + /** + * Sets the indentation string to be repeated for each level of indentation + * in the encoded document. If {@code indent.isEmpty()} the encoded document + * will be compact. Otherwise the encoded document will be more + * human-readable. + * + * @param indent a string containing only whitespace. + */ + public final void setIndent(String indent) { + if (indent.length() == 0) { + this.indent = null; + this.separator = ":"; + } else { + this.indent = indent; + this.separator = ": "; + } + } + + /** + * Configure this writer to relax its syntax rules. By default, this writer + * only emits well-formed JSON as specified by RFC 7159. Setting the writer + * to lenient permits the following: + *

    + *
  • Top-level values of any type. With strict writing, the top-level + * value must be an object or an array. + *
  • Numbers may be {@link Double#isNaN() NaNs} or {@link + * Double#isInfinite() infinities}. + *
+ */ + public final void setLenient(boolean lenient) { + this.lenient = lenient; + } + + /** + * Returns true if this writer has relaxed syntax rules. + */ + public boolean isLenient() { + return lenient; + } + + /** + * Configure this writer to emit JSON that's safe for direct inclusion in HTML + * and XML documents. This escapes the HTML characters {@code <}, {@code >}, + * {@code &} and {@code =} before writing them to the stream. Without this + * setting, your XML/HTML encoder should replace these characters with the + * corresponding escape sequences. + */ + public final void setHtmlSafe(boolean htmlSafe) { + this.htmlSafe = htmlSafe; + } + + /** + * Returns true if this writer writes JSON that's safe for inclusion in HTML + * and XML documents. + */ + public final boolean isHtmlSafe() { + return htmlSafe; + } + + /** + * Sets whether object members are serialized when their value is null. + * This has no impact on array elements. The default is true. + */ + public final void setSerializeNulls(boolean serializeNulls) { + this.serializeNulls = serializeNulls; + } + + /** + * Returns true if object members are serialized when their value is null. + * This has no impact on array elements. The default is true. + */ + public final boolean getSerializeNulls() { + return serializeNulls; + } + + /** + * Begins encoding a new array. Each call to this method must be paired with + * a call to {@link #endArray}. + * + * @return this writer. + */ + public CustomJsonWriter beginArray() throws IOException { + writeDeferredName(); + return open(EMPTY_ARRAY, '['); + } + + /** + * Ends encoding the current array. + * + * @return this writer. + */ + public CustomJsonWriter endArray() throws IOException { + return close(EMPTY_ARRAY, NONEMPTY_ARRAY, ']'); + } + + /** + * Begins encoding a new object. Each call to this method must be paired + * with a call to {@link #endObject}. + * + * @return this writer. + */ + public CustomJsonWriter beginObject() throws IOException { + writeDeferredName(); + return open(EMPTY_OBJECT, '{'); + } + + /** + * Ends encoding the current object. + * + * @return this writer. + */ + public CustomJsonWriter endObject() throws IOException { + return close(EMPTY_OBJECT, NONEMPTY_OBJECT, '}'); + } + + /** + * Enters a new scope by appending any necessary whitespace and the given + * bracket. + */ + private CustomJsonWriter open(int empty, char openBracket) throws IOException { + beforeValue(); + push(empty); + out.write(openBracket); + return this; + } + + /** + * Flushes and closes this writer and the underlying {@link Writer}. + * + * @throws IOException if the JSON document is incomplete. + */ + public void close() throws IOException { + out.close(); + + int size = stackSize; + if (size > 1 || size == 1 && stack[size - 1] != NONEMPTY_DOCUMENT) { + throw new IOException("Incomplete document"); + } + stackSize = 0; + } + + /** + * Closes the current scope by appending any necessary whitespace and the + * given bracket. + */ + private CustomJsonWriter close(int empty, int nonempty, char closeBracket) + throws IOException { + int context = peek(); + if (context != nonempty && context != empty) { + throw new IllegalStateException("Nesting problem."); + } + if (deferredName != null) { + throw new IllegalStateException("Dangling name: " + deferredName); + } + + stackSize--; + if (context == nonempty) { + newline(); + } + out.write(closeBracket); + return this; + } + + private void push(int newTop) { + if (stackSize == stack.length) { + stack = Arrays.copyOf(stack, stackSize * 2); + } + stack[stackSize++] = newTop; + } + + /** + * Returns the value on the top of the stack. + */ + private int peek() { + if (stackSize == 0) { + throw new IllegalStateException("JsonWriter is closed."); + } + return stack[stackSize - 1]; + } + + /** + * Replace the value on the top of the stack with the given value. + */ + private void replaceTop(int topOfStack) { + stack[stackSize - 1] = topOfStack; + } + + /** + * Encodes the property name. + * + * @param name the name of the forthcoming value. May not be null. + * @return this writer. + */ + public CustomJsonWriter name(String name) throws IOException { + if (name == null) { + throw new NullPointerException("name == null"); + } + if (deferredName != null) { + throw new IllegalStateException(); + } + if (stackSize == 0) { + throw new IllegalStateException("JsonWriter is closed."); + } + deferredName = name; + return this; + } + + private void writeDeferredName() throws IOException { + if (deferredName != null) { + beforeName(); + string(deferredName); + deferredName = null; + } + } + + + /** + * Writes {@code value} directly to the writer without quoting or + * escaping. + * + * @param value the literal string value, or null to encode a null literal. + * @return this writer. + */ + public CustomJsonWriter jsonValue(String value) throws IOException { + if (value == null) { + return nullValue(); + } + writeDeferredName(); + beforeValue(); + out.append(value); + return this; + } + + /** + * Encodes {@code null}. + * + * @return this writer. + */ + public CustomJsonWriter nullValue() throws IOException { + if (deferredName != null) { + if (serializeNulls) { + writeDeferredName(); + } else { + deferredName = null; + return this; // skip the name and the value + } + } + beforeValue(); + out.write("null"); + return this; + } + + + /** + * Encodes {@code value}. + * + * @param value the literal string value, or null to encode a null literal. + * @return this writer. + */ + public CustomJsonWriter value(String value) throws IOException { + if (value == null) { + return nullValue(); + } + writeDeferredName(); + beforeValue(); + string(value); + return this; + } + + /** + * Encodes {@code value}. + * + * @return this writer. + */ + public CustomJsonWriter value(boolean value) throws IOException { + writeDeferredName(); + beforeValue(); + out.write(value ? "true" : "false"); + return this; + } + + /** + * Encodes {@code value}. + * + * @return this writer. + */ + public CustomJsonWriter value(Boolean value) throws IOException { + if (value == null) { + return nullValue(); + } + writeDeferredName(); + beforeValue(); + out.write(value ? "true" : "false"); + return this; + } + + /** + * Encodes {@code value}. + * + * @param value a finite value. May not be {@link Double#isNaN() NaNs} or + * {@link Double#isInfinite() infinities}. + * @return this writer. + */ + public CustomJsonWriter value(double value) throws IOException { + writeDeferredName(); + if (!lenient && (Double.isNaN(value) || Double.isInfinite(value))) { + throw new IllegalArgumentException("Numeric values must be finite, but was " + value); + } + beforeValue(); + out.append(Double.toString(value)); + return this; + } + + /** + * Encodes {@code value}. + * + * @return this writer. + */ + public CustomJsonWriter value(long value) throws IOException { + writeDeferredName(); + beforeValue(); + out.write(Long.toString(value)); + return this; + } + + /** + * Encodes {@code value}. + * + * @param value a finite value. May not be {@link Double#isNaN() NaNs} or + * {@link Double#isInfinite() infinities}. + * @return this writer. + */ + public CustomJsonWriter value(Number value) throws IOException { + if (value == null) { + return nullValue(); + } + + writeDeferredName(); + String string = value.toString(); + if (!lenient && (string.equals("-Infinity") || string.equals("Infinity") || string.equals("NaN"))) { + throw new IllegalArgumentException("Numeric values must be finite, but was " + value); + } + beforeValue(); + out.append(string); + return this; + } + + /** + * Ensures all buffered data is written to the underlying {@link Writer} + * and flushes that writer. + */ + public void flush() throws IOException { + if (stackSize == 0) { + throw new IllegalStateException("JsonWriter is closed."); + } + out.flush(); + } + + + private void string(String value) throws IOException { + String[] replacements = htmlSafe ? HTML_SAFE_REPLACEMENT_CHARS : REPLACEMENT_CHARS; + out.write('\"'); + int last = 0; + int length = value.length(); + // CHECKSTYLE:OFF + for (int i = 0; i < length; i++) { + char c = value.charAt(i); + String replacement; + if (c < 128) { + replacement = replacements[c]; + if (replacement == null) { + continue; + } + + } else if (c == '\u2028') { + replacement = "\\u2028"; + } else if (c == '\u2029') { + replacement = "\\u2029"; + } else { + continue; + } + if (last < i) { + out.write(value, last, i - last); + } + out.write(replacement); + last = i + 1; + } + // CHECKSTYLE:ON + if (last < length) { + out.write(value, last, length - last); + } + out.write('\"'); + } + + + private void newline() throws IOException { + if (indent == null) { + return; + } + + out.write('\n'); + for (int i = 1, size = stackSize; i < size; i++) { + out.write(indent); + } + } + + /** + * Inserts any necessary separators and whitespace before a name. Also + * adjusts the stack to expect the name's value. + */ + private void beforeName() throws IOException { + int context = peek(); + if (context == NONEMPTY_OBJECT) { // first in object + out.write(", "); + } else if (context != EMPTY_OBJECT) { // not in an object! + throw new IllegalStateException("Nesting problem."); + } + newline(); + replaceTop(DANGLING_NAME); + } + + /** + * Inserts any necessary separators and whitespace before a literal value, + * inline array, or inline object. Also adjusts the stack to expect either a + * closing bracket or another element. + */ + @SuppressWarnings("fallthrough") + private void beforeValue() throws IOException { + switch (peek()) { + case NONEMPTY_DOCUMENT: + if (!lenient) { + throw new IllegalStateException( + "JSON must have only one top-level value."); + } + // fall-through + case EMPTY_DOCUMENT: // first in document + replaceTop(NONEMPTY_DOCUMENT); + break; + + case EMPTY_ARRAY: // first in array + replaceTop(NONEMPTY_ARRAY); + newline(); + break; + + case NONEMPTY_ARRAY: // another in array + out.append(", "); + newline(); + break; + + case DANGLING_NAME: // value for name + out.append(separator); + replaceTop(NONEMPTY_OBJECT); + break; + + default: + throw new IllegalStateException("Nesting problem."); + } + } +} diff --git a/src/main/java/com/actiontech/dble/plan/common/item/function/strfunc/ItemFuncJsonExtract.java b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/ItemFuncJsonExtract.java similarity index 90% rename from src/main/java/com/actiontech/dble/plan/common/item/function/strfunc/ItemFuncJsonExtract.java rename to src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/ItemFuncJsonExtract.java index 8b631b490..3a1311947 100644 --- a/src/main/java/com/actiontech/dble/plan/common/item/function/strfunc/ItemFuncJsonExtract.java +++ b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/ItemFuncJsonExtract.java @@ -1,15 +1,29 @@ -package com.actiontech.dble.plan.common.item.function.strfunc; +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.plan.common.item.function.jsonfunc; import com.actiontech.dble.plan.common.item.Item; import com.actiontech.dble.plan.common.item.function.ItemFunc; +import com.actiontech.dble.plan.common.item.function.strfunc.ItemStrFunc; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; import com.google.gson.JsonParser; +import com.google.gson.internal.Streams; +import com.google.gson.stream.JsonWriter; +import java.io.IOException; +import java.io.StringWriter; import java.util.*; /** + * the json extract function. + * this implementation learn from mysql's。 + * * @author dcy * Create Date: 2022-01-24 */ @@ -90,15 +104,42 @@ public class ItemFuncJsonExtract extends ItemStrFunc { if (unquote && result.isJsonPrimitive()) { outputResult = (result.getAsString()); } else { - outputResult = (result.toString()); + outputResult = jsonToString(result); } } else { - outputResult = (results.toString()); + outputResult = jsonToString(results); } return outputResult; } + /** + * migrate from JsonElement#toString() + * + * @param node + * @return + */ + private static String jsonToString(JsonElement node) { + try { + StringWriter stringWriter = new StringWriter(); + final CustomJsonWriter originWriter = new CustomJsonWriter(stringWriter); + originWriter.setLenient(true); + JsonWriter jsonWriter = new JsonWriterAdaptor(stringWriter, originWriter); + Streams.write(node, jsonWriter); + return stringWriter.toString(); + } catch (IOException e) { + throw new AssertionError(e); + } + } + + + private static String jsonToString(Collection results) { + final JsonArray jsonArray = new JsonArray(); + for (JsonElement result : results) { + jsonArray.add(result); + } + return jsonToString(jsonArray); + } private static class JsonSeeker { boolean couldReturnMultipleMatches = false; @@ -310,6 +351,7 @@ public class ItemFuncJsonExtract extends ItemStrFunc { /** * for ** + * * @return */ private PathLeg parseEllipsisLeg() { @@ -343,6 +385,7 @@ public class ItemFuncJsonExtract extends ItemStrFunc { } else { findEndOfMemberName(); int endIndex = index; + //decode escape string boolean wasQuoted = (pattern[beginIndex] == DOUBLE_QUOTE); String tmpS; if (wasQuoted) { diff --git a/src/main/java/com/actiontech/dble/plan/common/item/function/strfunc/ItemFuncJsonUnQuote.java b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/ItemFuncJsonUnQuote.java similarity index 85% rename from src/main/java/com/actiontech/dble/plan/common/item/function/strfunc/ItemFuncJsonUnQuote.java rename to src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/ItemFuncJsonUnQuote.java index 5fa52333d..b75473b13 100644 --- a/src/main/java/com/actiontech/dble/plan/common/item/function/strfunc/ItemFuncJsonUnQuote.java +++ b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/ItemFuncJsonUnQuote.java @@ -1,7 +1,14 @@ -package com.actiontech.dble.plan.common.item.function.strfunc; +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.plan.common.item.function.jsonfunc; import com.actiontech.dble.plan.common.item.Item; import com.actiontech.dble.plan.common.item.function.ItemFunc; +import com.actiontech.dble.plan.common.item.function.strfunc.ItemStrFunc; import com.google.gson.JsonElement; import com.google.gson.JsonParser; diff --git a/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/JsonScope.java b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/JsonScope.java new file mode 100644 index 000000000..105a89a2c --- /dev/null +++ b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/JsonScope.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.plan.common.item.function.jsonfunc; + +/** + * migrate from gson 2.8.9 + * for issue inner-1940 + */ +final class JsonScope { + + /** + * An array with no elements requires no separators or newlines before + * it is closed. + */ + static final int EMPTY_ARRAY = 1; + + /** + * A array with at least one value requires a comma and newline before + * the next element. + */ + static final int NONEMPTY_ARRAY = 2; + + /** + * An object with no name/value pairs requires no separators or newlines + * before it is closed. + */ + static final int EMPTY_OBJECT = 3; + + /** + * An object whose most recent element is a key. The next element must + * be a value. + */ + static final int DANGLING_NAME = 4; + + /** + * An object with at least one name/value pair requires a comma and + * newline before the next element. + */ + static final int NONEMPTY_OBJECT = 5; + + /** + * No object or array has been started. + */ + static final int EMPTY_DOCUMENT = 6; + + /** + * A document with at an array or object. + */ + static final int NONEMPTY_DOCUMENT = 7; + + /** + * A document that's been closed and cannot be accessed. + */ + static final int CLOSED = 8; + + private JsonScope() { + + } +} diff --git a/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/JsonWriterAdaptor.java b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/JsonWriterAdaptor.java new file mode 100644 index 000000000..efc8f6b50 --- /dev/null +++ b/src/main/java/com/actiontech/dble/plan/common/item/function/jsonfunc/JsonWriterAdaptor.java @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.plan.common.item.function.jsonfunc; + +import com.google.gson.stream.JsonWriter; + +import java.io.IOException; +import java.io.Writer; + +/** + * @author dcy + * Create Date: 2022-09-28 + */ +final class JsonWriterAdaptor extends JsonWriter { + CustomJsonWriter jsonWriter; + + JsonWriterAdaptor(Writer out, CustomJsonWriter jsonWriter) { + super(out); + this.jsonWriter = jsonWriter; + } + + @Override + public boolean isLenient() { + return jsonWriter.isLenient(); + } + + @Override + public JsonWriter beginArray() throws IOException { + jsonWriter.beginArray(); + return this; + } + + @Override + public JsonWriter endArray() throws IOException { + jsonWriter.endArray(); + return this; + } + + @Override + public JsonWriter beginObject() throws IOException { + jsonWriter.beginObject(); + return this; + } + + @Override + public JsonWriter endObject() throws IOException { + jsonWriter.endObject(); + return this; + } + + @Override + public JsonWriter name(String name) throws IOException { + jsonWriter.name(name); + return this; + } + + + + @Override + public JsonWriter jsonValue(String value) throws IOException { + jsonWriter.jsonValue(value); + return this; + } + + @Override + public JsonWriter nullValue() throws IOException { + jsonWriter.nullValue(); + return this; + } + + @Override + public JsonWriter value(String value) throws IOException { + jsonWriter.value(value); + return this; + } + + @Override + public JsonWriter value(boolean value) throws IOException { + jsonWriter.value(value); + return this; + } + + @Override + public JsonWriter value(Boolean value) throws IOException { + jsonWriter.value(value); + return this; + } + + @Override + public JsonWriter value(double value) throws IOException { + jsonWriter.value(value); + return this; + } + + @Override + public JsonWriter value(long value) throws IOException { + jsonWriter.value(value); + return this; + } + + @Override + public JsonWriter value(Number value) throws IOException { + jsonWriter.value(value); + return this; + } + + @Override + public void flush() throws IOException { + jsonWriter.flush(); + } + + @Override + public void close() throws IOException { + jsonWriter.close(); + } +} diff --git a/src/main/java/com/actiontech/dble/plan/visitor/MySQLItemVisitor.java b/src/main/java/com/actiontech/dble/plan/visitor/MySQLItemVisitor.java index 045fdf31e..310a84c52 100644 --- a/src/main/java/com/actiontech/dble/plan/visitor/MySQLItemVisitor.java +++ b/src/main/java/com/actiontech/dble/plan/visitor/MySQLItemVisitor.java @@ -21,6 +21,8 @@ import com.actiontech.dble.plan.common.item.function.castfunc.ItemFuncBinaryCast import com.actiontech.dble.plan.common.item.function.castfunc.ItemFuncConvCharset; import com.actiontech.dble.plan.common.item.function.castfunc.ItemNCharTypeCast; import com.actiontech.dble.plan.common.item.function.convertfunc.ItemCharTypeConvert; +import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonExtract; +import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonUnQuote; import com.actiontech.dble.plan.common.item.function.mathsfunc.ItemFuncConv; import com.actiontech.dble.plan.common.item.function.mathsfunc.operator.*; import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.*; diff --git a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java index 0066e2f3c..1f9be9908 100644 --- a/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java @@ -153,7 +153,11 @@ public class RWSplitNonBlockingSession extends Session { Boolean isMaster = canRunOnMaster(master); // first boolean firstValue = isMaster == null ? false : isMaster; long rwStickyTime = SystemConfig.getInstance().getRwStickyTime(); - if (rwGroup.getRwSplitMode() != PhysicalDbGroup.RW_SPLIT_OFF && (rwStickyTime > 0) && !firstValue) { + boolean rwSticky = rwStickyTime > 0; + if (rwGroup.isDelayDetectionStart()) { + rwSticky = false; + } + if (rwGroup.getRwSplitMode() != PhysicalDbGroup.RW_SPLIT_OFF && rwSticky && !firstValue) { if (this.getPreWriteResponseTime() > 0 && System.currentTimeMillis() - this.getPreWriteResponseTime() <= rwStickyTime) { isMaster = true; if (LOGGER.isDebugEnabled()) { diff --git a/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java b/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java index 8747ec727..f05fa0b16 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/ManagerSchemaInfo.java @@ -75,6 +75,7 @@ public final class ManagerSchemaInfo { registerTable(new DbleBackendConnectionsAssociateThread()); registerTable(new DbleClusterRenewThread()); registerTable(new RecyclingResource()); + registerTable(new DbleDelayDetection()); } private void initViews() { diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java index 78794a55d..bac1b960c 100644 --- a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDbGroup.java @@ -46,6 +46,8 @@ public class DbleDbGroup extends ManagerWritableTable { public static final String COLUMN_DELAY_THRESHOLD = "delay_threshold"; public static final String COLUMN_DISABLE_HA = "disable_ha"; public static final String COLUMN_ACTIVE = "active"; + public static final String DELAY_PERIOD_MILLIS = "delay_period_millis"; + public static final String DELAY_DATABASE = "delay_database"; private final List> tempRowList = Lists.newArrayList(); @@ -81,6 +83,12 @@ public class DbleDbGroup extends ManagerWritableTable { columns.put(COLUMN_DELAY_THRESHOLD, new ColumnMeta(COLUMN_DELAY_THRESHOLD, "int(11)", true, "-1")); columnsType.put(COLUMN_DELAY_THRESHOLD, Fields.FIELD_TYPE_LONG); + columns.put(DELAY_PERIOD_MILLIS, new ColumnMeta(DELAY_PERIOD_MILLIS, "int(11)", true, "-1")); + columnsType.put(DELAY_PERIOD_MILLIS, Fields.FIELD_TYPE_LONG); + + columns.put(DELAY_DATABASE, new ColumnMeta(DELAY_DATABASE, "varchar(255)", true, null)); + columnsType.put(DELAY_DATABASE, Fields.FIELD_TYPE_VAR_STRING); + columns.put(COLUMN_DISABLE_HA, new ColumnMeta(COLUMN_DISABLE_HA, "varchar(5)", true, "false")); columnsType.put(COLUMN_DISABLE_HA, Fields.FIELD_TYPE_VAR_STRING); @@ -215,6 +223,13 @@ public class DbleDbGroup extends ManagerWritableTable { case COLUMN_DISABLE_HA: dbGroup.setDisableHA(value); break; + case DELAY_PERIOD_MILLIS: + dbGroup.setDelayPeriodMillis(Integer.parseInt(value)); + break; + case DELAY_DATABASE: + dbGroup.setDelayDatabase(String.valueOf(value)); + break; + default: break; } @@ -283,6 +298,14 @@ public class DbleDbGroup extends ManagerWritableTable { if (!StringUtil.isBlank(heartbeatKeepAliveStr) && IntegerUtil.parseInt(heartbeatKeepAliveStr) < 0) { throw new ConfigException("Column '" + COLUMN_KEEP_ALIVE + "' should be an integer greater than or equal to 0!"); } + String delayPeriodMillis = row.get(DELAY_PERIOD_MILLIS); + delayDetectionCheck(delayPeriodMillis); + } + } + + private void delayDetectionCheck(String delayPeriodMillis) { + if (!StringUtil.isBlank(delayPeriodMillis) && IntegerUtil.parseInt(delayPeriodMillis) < -1) { + throw new ConfigException("Column '" + COLUMN_DELAY_THRESHOLD + "' should be an integer greater than -1!"); } } @@ -295,6 +318,8 @@ public class DbleDbGroup extends ManagerWritableTable { map.put(COLUMN_KEEP_ALIVE, String.valueOf(dbGroupConfig.getKeepAlive())); map.put(COLUMN_RW_SPLIT_MODE, String.valueOf(dbGroupConfig.getRwSplitMode())); map.put(COLUMN_DELAY_THRESHOLD, String.valueOf(dbGroupConfig.getDelayThreshold())); + map.put(DELAY_PERIOD_MILLIS, String.valueOf(dbGroupConfig.getDelayPeriodMillis())); + map.put(DELAY_DATABASE, String.valueOf(dbGroupConfig.getDelayDatabase())); map.put(COLUMN_DISABLE_HA, String.valueOf(dbGroupConfig.isDisableHA())); return map; } diff --git a/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDelayDetection.java b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDelayDetection.java new file mode 100644 index 000000000..e5153e06b --- /dev/null +++ b/src/main/java/com/actiontech/dble/services/manager/information/tables/DbleDelayDetection.java @@ -0,0 +1,160 @@ +/* + * Copyright (C) 2016-2022 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ + +package com.actiontech.dble.services.manager.information.tables; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.datasource.PhysicalDbInstance; +import com.actiontech.dble.backend.delyDetection.DelayDetection; +import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.config.Fields; +import com.actiontech.dble.config.ServerConfig; +import com.actiontech.dble.config.model.db.DbInstanceConfig; +import com.actiontech.dble.meta.ColumnMeta; +import com.actiontech.dble.services.manager.information.ManagerWritableTable; +import com.actiontech.dble.util.StringUtil; +import com.google.common.collect.Lists; + +import java.sql.SQLException; +import java.util.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + + +public class DbleDelayDetection extends ManagerWritableTable { + private static final String TABLE_NAME = "delay_detection"; + + private static final String COLUMN_DB_GROUP_NAME = "db_group_name"; + private static final String COLUMN_NAME = "name"; + private static final String COLUMN_HOST = "host"; + private static final String COLUMN_DELAY = "delay"; + private static final String COLUMN_STATUS = "status"; + private static final String COLUMN_MESSAGE = "message"; + private static final String COLUMN_LAST_ACTIVE_TIME = "last_active_time"; + private static final String COLUMN_BACKEND_CONN_ID = "backend_conn_id"; + private static final String COLUMN_LOGIC_UPDATE = "logic_update"; + + public DbleDelayDetection() { + super(TABLE_NAME, 9); + } + + @Override + protected void initColumnAndType() { + + columns.put(COLUMN_DB_GROUP_NAME, new ColumnMeta(COLUMN_DB_GROUP_NAME, "varchar(64)", false)); + columnsType.put(COLUMN_DB_GROUP_NAME, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_NAME, new ColumnMeta(COLUMN_NAME, "varchar(64)", false, true)); + columnsType.put(COLUMN_NAME, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_HOST, new ColumnMeta(COLUMN_HOST, "int(11)", false)); + columnsType.put(COLUMN_HOST, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_DELAY, new ColumnMeta(COLUMN_DELAY, "int(11)", false)); + columnsType.put(COLUMN_DELAY, Fields.FIELD_TYPE_LONG); + + columns.put(COLUMN_STATUS, new ColumnMeta(COLUMN_STATUS, "varchar(3)", false)); + columnsType.put(COLUMN_STATUS, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_MESSAGE, new ColumnMeta(COLUMN_MESSAGE, "varchar(1024)", false)); + columnsType.put(COLUMN_MESSAGE, Fields.FIELD_TYPE_VAR_STRING); + + columns.put(COLUMN_LAST_ACTIVE_TIME, new ColumnMeta(COLUMN_LAST_ACTIVE_TIME, "timestamp", false)); + columnsType.put(COLUMN_LAST_ACTIVE_TIME, Fields.FIELD_TYPE_TIMESTAMP); + + columns.put(COLUMN_BACKEND_CONN_ID, new ColumnMeta(COLUMN_BACKEND_CONN_ID, "int(11)", false)); + columnsType.put(COLUMN_BACKEND_CONN_ID, Fields.FIELD_TYPE_LONG); + + columns.put(COLUMN_LOGIC_UPDATE, new ColumnMeta(COLUMN_LOGIC_UPDATE, "int(11)", false)); + columnsType.put(COLUMN_LOGIC_UPDATE, Fields.FIELD_TYPE_LONG); + + + } + + protected List> getRows() { + List> results = new ArrayList<>(); + ServerConfig conf = DbleServer.getInstance().getConfig(); + Map dbGroups = conf.getDbGroups(); + for (PhysicalDbGroup dbGroup : dbGroups.values()) { + if (dbGroup.isDelayDetectionStart()) { + for (PhysicalDbInstance dbInstance : dbGroup.getDbInstances(true)) { + LinkedHashMap row = getRow(dbInstance); + if (!row.isEmpty()) { + results.add(row); + } + } + } + } + return results; + } + + private LinkedHashMap getRow(PhysicalDbInstance dbInstance) { + LinkedHashMap row = new LinkedHashMap<>(); + final DelayDetection delayDetection = dbInstance.getDelayDetection(); + if (Objects.isNull(delayDetection) || delayDetection.isStop()) { + return row; + } + DbInstanceConfig config = dbInstance.getConfig(); + row.put(COLUMN_DB_GROUP_NAME, dbInstance.getDbGroup().getGroupName()); + row.put(COLUMN_NAME, dbInstance.getName()); + row.put(COLUMN_HOST, config.getUrl()); + row.put(COLUMN_DELAY, String.valueOf(delayDetection.getDelayVal())); + row.put(COLUMN_STATUS, String.valueOf(dbInstance.getDelayDetectionStatus())); + row.put(COLUMN_MESSAGE, delayDetection.getErrorMessage()); + row.put(COLUMN_LAST_ACTIVE_TIME, String.valueOf(delayDetection.getLastReceivedQryTime())); + row.put(COLUMN_LOGIC_UPDATE, String.valueOf(delayDetection.getLogicUpdate())); + Optional.ofNullable(delayDetection.getConn()).ifPresent(connection -> row.put(COLUMN_BACKEND_CONN_ID, String.valueOf(connection.getId()))); + return row; + } + + @Override + public int insertRows(List> rows) throws SQLException { + throw new SQLException("Access denied for table '" + tableName + "'", "42000", ErrorCode.ER_ACCESS_DENIED_ERROR); + } + + @Override + public int updateRows(Set> affectPks, LinkedHashMap values) throws SQLException { + if (values.size() != 1 || !values.containsKey(COLUMN_LOGIC_UPDATE)) { + throw new SQLException("only column '" + COLUMN_LOGIC_UPDATE + "' is writable", "42S22", ErrorCode.ER_ERROR_ON_WRITE); + } + final ReentrantReadWriteLock lock = DbleServer.getInstance().getConfig().getLock(); + lock.writeLock().lock(); + try { + int val = Integer.parseInt(values.get(COLUMN_LOGIC_UPDATE)); + ServerConfig conf = DbleServer.getInstance().getConfig(); + Map dbGroups = conf.getDbGroups(); + List instanceList = Lists.newArrayList(); + for (LinkedHashMap affectPk : affectPks) { + String groupName = affectPk.get(COLUMN_DB_GROUP_NAME); + String name = affectPk.get(COLUMN_NAME); + for (PhysicalDbGroup physicalDbGroup : dbGroups.values()) { + if (StringUtil.equals(groupName, physicalDbGroup.getGroupName()) && physicalDbGroup.isDelayDetectionStart()) { + PhysicalDbInstance instance = physicalDbGroup.getDbInstances(true).stream().filter(dbInstance -> StringUtil.equals(name, dbInstance.getName()) && Objects.nonNull(dbInstance.getDelayDetection())).findFirst().get(); + DelayDetection delayDetection = instance.getDelayDetection(); + int logicUpdate = delayDetection.getLogicUpdate(); + if (val != logicUpdate + 1) { + throw new SQLException("parameter only increment is allowed to be 1", "42S22", ErrorCode.ER_TABLE_CANT_HANDLE_AUTO_INCREMENT); + } + instanceList.add(instance); + } + } + } + for (PhysicalDbInstance instance : instanceList) { + instance.stopDelayDetection("the management end is shut down manually"); + instance.startDelayDetection(); + instance.getDelayDetection().setLogicUpdate(val); + + } + } finally { + lock.writeLock().unlock(); + } + return affectPks.size(); + } + + @Override + public int deleteRows(Set> affectPks) throws SQLException { + throw new SQLException("Access denied for table '" + tableName + "'", "42000", ErrorCode.ER_ACCESS_DENIED_ERROR); + } +} diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ChangeItem.java b/src/main/java/com/actiontech/dble/services/manager/response/ChangeItem.java index 246669983..5cc94a1e0 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ChangeItem.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ChangeItem.java @@ -5,6 +5,7 @@ public class ChangeItem { private Object item; private ChangeItemType itemType; private boolean affectHeartbeat; + private boolean affectDelayDetection; private boolean affectConnectionPool; private boolean affectTestConn; private boolean affectEntryDbGroup; @@ -77,6 +78,14 @@ public class ChangeItem { this.item = item; } + public boolean isAffectDelayDetection() { + return affectDelayDetection; + } + + public void setAffectDelayDetection(boolean affectDelayDetection) { + this.affectDelayDetection = affectDelayDetection; + } + @Override public String toString() { return "ChangeItem{" + @@ -87,6 +96,7 @@ public class ChangeItem { ", affectTestConn=" + affectTestConn + ", affectEntryDbGroup=" + affectEntryDbGroup + ", affectPoolCapacity=" + affectPoolCapacity + + ", affectDelayDetection=" + affectDelayDetection + '}'; } } diff --git a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java index 64435e968..126fc2692 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/DryRun.java @@ -150,9 +150,27 @@ public final class DryRun { list.add(new ErrorInfo("Cluster", "NOTICE", "Dble is in single mod")); } + delayDetection(serverConfig, list); + + printResult(service, list); } + private static void delayDetection(ServerConfig serverConfig, List list) { + Map dbGroups = serverConfig.getDbGroups(); + dbGroups.forEach((k, v) -> { + DataBaseType dataBaseType = v.getDbGroupConfig().instanceDatabaseType(); + if (dataBaseType == DataBaseType.MYSQL && v.isDelayDetectionStart()) { + String delayDatabase = v.getDbGroupConfig().getDelayDatabase(); + ShowDatabaseHandler mysqlShowDatabaseHandler = new ShowDatabaseHandler(dbGroups, "Database"); + Set databases = mysqlShowDatabaseHandler.execute(v.getWriteDbInstance()); + if (!databases.contains(delayDatabase)) { + list.add(new ErrorInfo("Xml", "WARNING", "database " + delayDatabase + " doesn't exists in dbGroup[" + v.getGroupName() + "]")); + } + } + }); + } + private static void ucoreConnectionTest(List list) { try { String selfPath = ClusterPathUtil.getOnlinePath(SystemConfig.getInstance().getInstanceName()); diff --git a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java index f2ad968a1..80fa67d22 100644 --- a/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java +++ b/src/main/java/com/actiontech/dble/services/manager/response/ReloadConfig.java @@ -494,6 +494,9 @@ public final class ReloadConfig { if (!newDbGroup.equalsForHeartbeat(oldDbGroup)) { changeItem.setAffectHeartbeat(true); } + if (!newDbGroup.equalsForDelayDetection(oldDbGroup)) { + changeItem.setAffectDelayDetection(true); + } changeItemList.add(changeItem); } @@ -520,6 +523,9 @@ public final class ReloadConfig { if (!newDbInstance.equalsForHeartbeat(oldDbInstance)) { changeItem.setAffectHeartbeat(true); } + if (!newDbInstance.equalsForDelayDetection(oldDbInstance)) { + changeItem.setAffectDelayDetection(true); + } if (!newDbInstance.equalsForTestConn(oldDbInstance)) { changeItem.setAffectTestConn(true); } else { diff --git a/src/main/resources/db_detail.xsd b/src/main/resources/db_detail.xsd index 423a2fad0..df35aa9fe 100644 --- a/src/main/resources/db_detail.xsd +++ b/src/main/resources/db_detail.xsd @@ -53,6 +53,8 @@ + +