Merge branch 'master' into inner-1948

This commit is contained in:
wenyh
2022-10-27 11:17:01 +08:00
committed by GitHub
32 changed files with 1841 additions and 45 deletions

View File

@@ -40,4 +40,5 @@ public final class AlarmCode {
public static final String SHARDING_NODE_LACK = "DBLE_SHARDING_NODE_LACK"; //Resolve by trigger
public static final String DB_INSTANCE_LOWER_CASE_ERROR = "DBLE_DB_INSTANCE_LOWER_CASE_ERROR"; //Resolve by trigger
public static final String DB_SLAVE_INSTANCE_DELAY = "DBLE_DB_SLAVE_INSTANCE_DELAY"; //Resolve by trigger
public static final String DB_MASTER_INSTANCE_DELAY_FAIL = "DB_MASTER_INSTANCE_DELAY_FAIL";
}

View File

@@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.delyDetection.DelayDetection;
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat;
import com.actiontech.dble.backend.mysql.nio.MySQLInstance;
import com.actiontech.dble.cluster.JsonFactory;
@@ -20,6 +21,7 @@ import com.actiontech.dble.config.helper.GetAndSyncDbInstanceKeyVariables;
import com.actiontech.dble.config.helper.KeyVariables;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.config.model.db.type.DataBaseType;
import com.actiontech.dble.meta.ReloadLogHelper;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.Session;
@@ -27,6 +29,8 @@ import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.rwsplit.RWSplitNonBlockingSession;
import com.actiontech.dble.singleton.HaConfigManager;
import com.actiontech.dble.util.StringUtil;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
@@ -38,6 +42,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PhysicalDbGroup {
@@ -67,6 +72,9 @@ public class PhysicalDbGroup {
private Set<Session> rwSplitSessionSet = Sets.newConcurrentHashSet();
private volatile Integer state = Integer.valueOf(INITIAL);
//delayDetection
private AtomicLong logicTimestamp = new AtomicLong();
public static final int STATE_DELETING = 2;
public static final int STATE_ABANDONED = 1;
@@ -182,11 +190,12 @@ public class PhysicalDbGroup {
this.analysisUseless = analysisUseless;
}
private boolean checkSlaveSynStatus() {
return (dbGroupConfig.getDelayThreshold() != -1) &&
(dbGroupConfig.isShowSlaveSql());
private boolean checkSlaveSynStatus(PhysicalDbInstance ds) {
return (dbGroupConfig.getDelayThreshold() != -1 &&
dbGroupConfig.isShowSlaveSql()) || ds.getDbGroup().isDelayDetectionStart() ;
}
public PhysicalDbInstance getWriteDbInstance() {
return writeDbInstance;
}
@@ -196,14 +205,14 @@ public class PhysicalDbGroup {
ReloadLogHelper.debug("init new group :{},reason:{}", LOGGER, this.toString(), reason);
}
for (Map.Entry<String, PhysicalDbInstance> entry : allSourceMap.entrySet()) {
entry.getValue().init(reason, true);
entry.getValue().init(reason, true, true);
}
}
public void startOfFresh(List<String> sourceNames, String reason) {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
allSourceMap.get(sourceName).init(reason, false);
allSourceMap.get(sourceName).init(reason, false, false);
}
}
}
@@ -242,7 +251,7 @@ public class PhysicalDbGroup {
for (String sourceName : sourceNames) {
if (allSourceMap.containsKey(sourceName)) {
PhysicalDbInstance dbInstance = allSourceMap.get(sourceName);
dbInstance.stop(reason, closeFront, false, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || writeDbInstance == dbInstance);
dbInstance.stop(reason, closeFront, false, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || writeDbInstance == dbInstance, false);
}
}
@@ -310,6 +319,21 @@ public class PhysicalDbGroup {
}
}
public void startDelayDetection() {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
if (dbInstance.delayDetection.isStop()) {
dbInstance.delayDetection = new DelayDetection(dbInstance);
}
dbInstance.startDelayDetection();
}
}
public void stopDelayDetection(String reason) {
for (PhysicalDbInstance dbInstance : allSourceMap.values()) {
dbInstance.stopDelayDetection(reason);
}
}
public Collection<PhysicalDbInstance> getDbInstances(boolean isAll) {
if (!isAll && rwSplitMode == RW_SPLIT_OFF) {
@@ -455,7 +479,7 @@ public class PhysicalDbGroup {
continue;
}
if (ds.isAlive() && (!checkSlaveSynStatus() || ds.canSelectAsReadNode())) {
if (ds.isAlive() && (!checkSlaveSynStatus(ds) || ds.canSelectAsReadNode(ds))) {
if (ds.getLogCount() != 0) {
ds.setLogCount(0);
}
@@ -562,7 +586,7 @@ public class PhysicalDbGroup {
newWriteHost.setReadInstance(false);
String oldWriteInstance = writeDbInstance.getName();
writeDbInstance = newWriteHost;
newWriteHost.start("switch master from " + oldWriteInstance + " to the instance", false);
newWriteHost.start("switch master from " + oldWriteInstance + " to the instance", false, false);
return this.getClusterHaJson();
} catch (Exception e) {
LOGGER.warn("switchMaster Exception ", e);
@@ -734,6 +758,19 @@ public class PhysicalDbGroup {
this.allSourceMap.put(dbInstance.getName(), dbInstance);
}
public AtomicLong getLogicTimestamp() {
return logicTimestamp;
}
public void setLogicTimestamp(AtomicLong logicTimestamp) {
this.logicTimestamp = logicTimestamp;
}
public boolean isDelayDetectionStart() {
return !Strings.isNullOrEmpty(dbGroupConfig.getDelayDatabase()) &&
dbGroupConfig.getDelayThreshold() > 0 && dbGroupConfig.getDelayPeriodMillis() > 0 && getDbGroupConfig().getWriteInstanceConfig().getDataBaseType() == DataBaseType.MYSQL;
}
public boolean equalsBaseInfo(PhysicalDbGroup pool) {
return pool.dbGroupConfig.equalsBaseInfo(this.dbGroupConfig) &&
pool.rwSplitMode == this.rwSplitMode &&
@@ -754,6 +791,13 @@ public class PhysicalDbGroup {
pool.getDbGroupConfig().getKeepAlive() == this.getDbGroupConfig().getKeepAlive();
}
public boolean equalsForDelayDetection(PhysicalDbGroup pool) {
return pool.getDbGroupConfig().getDelayThreshold() == (this.dbGroupConfig.getDelayThreshold()) &&
pool.getDbGroupConfig().getDelayPeriodMillis() == this.dbGroupConfig.getDelayPeriodMillis() &&
StringUtil.equals(pool.getDbGroupConfig().getDelayDatabase(), this.dbGroupConfig.getDelayDatabase()) &&
pool.getDbGroupConfig().getKeepAlive() == this.getDbGroupConfig().getKeepAlive();
}
public void copyBaseInfo(PhysicalDbGroup physicalDbGroup) {
this.dbGroupConfig = physicalDbGroup.dbGroupConfig;
@@ -780,4 +824,6 @@ public class PhysicalDbGroup {
", rwSplitUseless=" + rwSplitUseless +
'}';
}
}

View File

@@ -6,6 +6,8 @@
package com.actiontech.dble.backend.datasource;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.delyDetection.DelayDetection;
import com.actiontech.dble.backend.delyDetection.DelayDetectionStatus;
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat;
import com.actiontech.dble.backend.mysql.nio.handler.ConnectionHeartBeatHandler;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
@@ -53,6 +55,9 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
private volatile boolean fakeNode = false;
private final LongAdder readCount = new LongAdder();
private final LongAdder writeCount = new LongAdder();
private volatile DelayDetectionStatus delayDetectionStatus = DelayDetectionStatus.STOP;
protected DelayDetection delayDetection;
private final AtomicBoolean isInitial = new AtomicBoolean(false);
@@ -63,6 +68,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
private volatile boolean needSkipHeartTest = false;
private volatile int logCount;
public PhysicalDbInstance(DbInstanceConfig config, DbGroupConfig dbGroupConfig, boolean isReadNode) {
this.config = config;
this.name = config.getInstanceName();
@@ -81,7 +87,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
this.disabled = new AtomicBoolean(org.disabled.get());
}
public void init(String reason, boolean isInitHeartbeat) {
public void init(String reason, boolean isInitHeartbeat, boolean delayDetectionStart) {
if (disabled.get() || fakeNode) {
LOGGER.info("init dbInstance[{}] because {}, but it is disabled or a fakeNode, skip initialization.", this.dbGroup.getGroupName() + "." + name, reason);
return;
@@ -95,7 +101,10 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
checkPoolSize();
LOGGER.info("init dbInstance[{}]", this.dbGroup.getGroupName() + "." + name);
start(reason, isInitHeartbeat);
if (delayDetectionStart) {
delayDetection = new DelayDetection(this);
}
start(reason, isInitHeartbeat, delayDetectionStart);
}
protected void checkPoolSize() {
@@ -367,7 +376,14 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
return config;
}
boolean canSelectAsReadNode() {
boolean canSelectAsReadNode(PhysicalDbInstance ds) {
if (dbGroup.isDelayDetectionStart()) {
DelayDetectionStatus status = ds.getDelayDetectionStatus();
if (status == DelayDetectionStatus.ERROR || status == DelayDetectionStatus.TIMEOUT) {
return false;
}
return true;
}
Integer slaveBehindMaster = heartbeat.getSlaveBehindMaster();
int dbSynStatus = heartbeat.getDbSynStatus();
if (slaveBehindMaster == null || dbSynStatus == MySQLHeartbeat.DB_SYN_ERROR) {
@@ -387,11 +403,41 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
heartbeat.start(config.getPoolConfig().getHeartbeatPeriodMillis());
}
void start(String reason, boolean isStartHeartbeat) {
public void startDelayDetection() {
if (this.isDisabled() || this.isFakeNode()) {
LOGGER.info("the instance[{}] is disabled or fake node, skip to start delayDetection.", this.dbGroup.getGroupName() + "." + name);
return;
}
if (!dbGroup.isDelayDetectionStart()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("this instance does not require delay detection to be enabled");
}
return;
}
long initialDelay = 0;
if (readInstance) {
initialDelay = dbGroupConfig.getDelayPeriodMillis();
}
delayDetection.start(initialDelay);
}
public void start() {
if (this.isDisabled() || this.isFakeNode()) {
LOGGER.info("the instance[{}] is disabled or fake node, skip to start heartbeat.", this.dbGroup.getGroupName() + "." + name);
return;
}
heartbeat.start(config.getPoolConfig().getHeartbeatPeriodMillis());
}
void start(String reason, boolean isStartHeartbeat, boolean delayDetectionStart) {
startPool(reason);
if (isStartHeartbeat) {
startHeartbeat();
}
if (delayDetectionStart && dbGroup.isDelayDetectionStart()) {
startDelayDetection();
}
}
public void startPool(String reason) {
@@ -430,7 +476,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
}
public void stopDirectly(String reason, boolean closeFront, boolean isStopPool) {
stop(reason, closeFront, true, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this || isStopPool);
stop(reason, closeFront, true, dbGroupConfig.getRwSplitMode() != RW_SPLIT_OFF || dbGroup.getWriteDbInstance() == this || isStopPool, true);
}
public void stop(String reason, boolean closeFront, boolean isStopPool) {
@@ -449,10 +495,13 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
stopDirectly(reason, closeFront, false);
}
protected void stop(String reason, boolean closeFront, boolean isStopHeartbeat, boolean isStopPool) {
protected void stop(String reason, boolean closeFront, boolean isStopHeartbeat, boolean isStopPool, boolean delayDetectionStop) {
if (isStopHeartbeat) {
stopHeartbeat(reason);
}
if (delayDetectionStop) {
stopDelayDetection(reason);
}
if (isStopPool) {
stopPool(reason, closeFront);
}
@@ -466,6 +515,15 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
heartbeat.stop(reason);
}
public void stopDelayDetection(String reason) {
if (LOGGER.isDebugEnabled()) {
ReloadLogHelper.debug("stop delayDetection :{},reason:{}", LOGGER, this.toString(), reason);
}
if (Objects.nonNull(delayDetection)) {
delayDetection.stop(reason);
}
}
public void stopPool(String reason, boolean closeFront) {
LOGGER.info("stop connection pool of physical db instance[{}], due to {}", this.dbGroup.getGroupName() + "." + name, reason);
connectionPool.stop(reason, closeFront);
@@ -512,7 +570,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
public boolean enable() {
if (disabled.compareAndSet(true, false)) {
start("execute manager cmd of enable", true);
start("execute manager cmd of enable", true, true);
return true;
}
return false;
@@ -554,6 +612,17 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
this.logCount = logCount;
}
public DelayDetectionStatus getDelayDetectionStatus() {
return delayDetectionStatus;
}
public void setDelayDetectionStatus(DelayDetectionStatus delayDetectionStatus) {
this.delayDetectionStatus = delayDetectionStatus;
}
public DelayDetection getDelayDetection() {
return delayDetection;
}
@Override
public boolean equals(Object o) {
@@ -569,6 +638,7 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
Objects.equals(disabled.get(), that.disabled.get());
}
@Override
public int hashCode() {
return Objects.hash(name, config, readInstance, dbGroupConfig, dbGroup, disabled, heartbeat);
@@ -599,6 +669,15 @@ public abstract class PhysicalDbInstance implements ReadTimeStatusInstance {
this.disabled.get() == dbInstance.isDisabled();
}
public boolean equalsForDelayDetection(PhysicalDbInstance dbInstance) {
return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) &&
this.config.getPort() == dbInstance.getConfig().getPort() &&
this.config.getUser().equals(dbInstance.getConfig().getUser()) &&
this.config.getPassword().equals(dbInstance.getConfig().getPassword()) &&
this.config.isUsingDecrypt() == dbInstance.getConfig().isUsingDecrypt() &&
this.disabled.get() == dbInstance.isDisabled();
}
public boolean equalsForTestConn(PhysicalDbInstance dbInstance) {
return this.config.getUrl().equals(dbInstance.getConfig().getUrl()) &&
this.config.getPort() == dbInstance.getConfig().getPort() &&

View File

@@ -0,0 +1,295 @@
package com.actiontech.dble.backend.delyDetection;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.config.model.db.DbGroupConfig;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.singleton.Scheduler;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class DelayDetection {
public static final Logger LOGGER = LoggerFactory.getLogger(DelayDetection.class);
private boolean tableExists;
private volatile boolean stop = true;
private volatile BackendConnection conn;
private volatile ScheduledFuture scheduledFuture;
private final PhysicalDbInstance source;
private final DbGroupConfig dbGroupConfig;
private DelayDetectionStatus delayDetectionStatus;
private DelayDetectionTask delayDetectionTask;
private int delayThreshold;
private int delayPeriodMillis;
private AtomicLong version = new AtomicLong();
private AtomicBoolean isChecking = new AtomicBoolean();
private volatile LocalDateTime lastSendQryTime = LocalDateTime.now();
private volatile LocalDateTime lastReceivedQryTime = LocalDateTime.now();
private volatile long delayVal = 0;
private volatile int logicUpdate = 0;
//table source field
private String sourceName;
private String sqlTableName;
private String updateSQL;
private String selectSQL;
private String createTableSQL;
private String errorMessage;
public DelayDetection(PhysicalDbInstance source) {
this.source = source;
this.dbGroupConfig = source.getDbGroupConfig();
delayThreshold = dbGroupConfig.getDelayThreshold();
delayPeriodMillis = dbGroupConfig.getDelayPeriodMillis();
delayDetectionStatus = DelayDetectionStatus.INIT;
delayDetectionTask = new DelayDetectionTask(this);
synSql();
}
private void synSql() {
String[] str = {"dble", SystemConfig.getInstance().getInstanceName(), dbGroupConfig.getName()};
sourceName = Joiner.on("_").join(str);
String schema = dbGroupConfig.getDelayDatabase();
String tableName = ".u_delay ";
sqlTableName = schema + tableName;
StringBuilder select = new StringBuilder("select logic_timestamp from ? where source = '?'");
selectSQL = convert(select, Lists.newArrayList(sqlTableName, sourceName));
StringBuilder create = new StringBuilder("create table if not exists ? (source VARCHAR(256) primary key,real_timestamp varchar(26) NOT NULL,logic_timestamp BIGINT default 0)");
createTableSQL = convert(create, Lists.newArrayList(sqlTableName));
}
private String convert(StringBuilder template, List<String> list) {
StringBuilder sb = new StringBuilder(template);
String replace = "?";
for (String str : list) {
int index = sb.indexOf(replace);
sb.replace(index, index + 1, str);
}
return sb.toString();
}
public void start(long initialDelay) {
LOGGER.info("start delayDetection of instance[{}]", source);
if (Objects.nonNull(scheduledFuture)) {
stop("the legacy thread is not closed");
}
stop = false;
this.scheduledFuture = Scheduler.getInstance().getScheduledExecutor().scheduleAtFixedRate(() -> execute(),
initialDelay, delayPeriodMillis, TimeUnit.MILLISECONDS);
}
public void execute() {
if (isChecking.compareAndSet(false, true)) {
if (delayDetectionTask.isQuit()) {
delayDetectionTask = new DelayDetectionTask(this);
}
if (!source.isReadInstance()) {
StringBuilder update = new StringBuilder("replace into ? (source,real_timestamp,logic_timestamp) values ('?','?',?)");
List<String> strings = Lists.newArrayList(sqlTableName, sourceName, String.valueOf(LocalDateTime.now()), String.valueOf(source.getDbGroup().getLogicTimestamp().incrementAndGet()));
updateSQL = convert(update, strings);
}
delayDetectionTask.execute();
} else {
LocalDateTime result = lastReceivedQryTime;
if (lastSendQryTime.getNano() > lastReceivedQryTime.getNano()) {
result = lastSendQryTime;
}
Duration duration = Duration.between(result, LocalDateTime.now());
if (duration.toMillis() > delayThreshold) {
if (source.isReadInstance()) {
delayVal = -1;
}
errorMessage = "connection did not respond after the delayThreshold time was exceeded";
setResult(DelayDetectionStatus.TIMEOUT);
}
}
}
public void stop(String reason) {
LOGGER.info("stop delayDetection of instance[{}], due to {}", source, reason);
stop = true;
if (Objects.nonNull(scheduledFuture)) {
scheduledFuture.cancel(true);
delayDetectionStatus = DelayDetectionStatus.STOP;
source.setDelayDetectionStatus(delayDetectionStatus);
cancel(reason);
scheduledFuture = null;
}
}
public void setResult(DelayDetectionStatus result) {
isChecking.set(false);
switch (result) {
case OK:
setOk();
break;
case TIMEOUT:
setTimeout();
break;
case ERROR:
setError();
break;
default:
break;
}
errorMessage = null;
}
public void cancel(String reason) {
LOGGER.warn("delayDetection need cancel ,reason is {}", reason);
delayDetectionTask.close();
updateLastReceivedQryTime();
version.set(0);
final BackendConnection connection = conn;
if (Objects.nonNull(connection) && !connection.isClosed()) {
connection.businessClose(reason);
}
conn = null;
errorMessage = reason;
setResult(DelayDetectionStatus.ERROR);
}
public void delayCal(long delay) {
PhysicalDbGroup dbGroup = source.getDbGroup();
long logic = dbGroup.getLogicTimestamp().get();
long result = logic - delay;
DelayDetectionStatus writeDbStatus = dbGroup.getWriteDbInstance().getDelayDetectionStatus();
delayVal = result * delayPeriodMillis;
//writeDbStatus is error,salve was considered normal
if (writeDbStatus == DelayDetectionStatus.ERROR || writeDbStatus == DelayDetectionStatus.TIMEOUT) {
setResult(DelayDetectionStatus.OK);
return;
}
if (delayThreshold > delayVal) {
setResult(DelayDetectionStatus.OK);
} else {
errorMessage = "found MySQL master/slave Replication delay" + source.getConfig() + ",sync time delay: " + delayVal + " ms";
setResult(DelayDetectionStatus.TIMEOUT);
}
}
public LocalDateTime getLastReceivedQryTime() {
return lastReceivedQryTime;
}
public void delayDetectionRetry() {
execute();
}
public void updateLastSendQryTime() {
lastSendQryTime = LocalDateTime.now();
}
public void updateLastReceivedQryTime() {
lastReceivedQryTime = LocalDateTime.now();
}
private void setTimeout() {
LOGGER.warn("delayDetection to [" + source.getConfig().getUrl() + "] setTimeout");
delayDetectionStatus = DelayDetectionStatus.TIMEOUT;
source.setDelayDetectionStatus(delayDetectionStatus);
alert(AlarmCode.DB_SLAVE_INSTANCE_DELAY, errorMessage, dbGroupConfig.instanceDatabaseType().name());
}
private void setError() {
LOGGER.warn("delayDetection to [" + source.getConfig().getUrl() + "] setError");
delayDetectionStatus = DelayDetectionStatus.ERROR;
source.setDelayDetectionStatus(delayDetectionStatus);
if (!source.isReadInstance()) {
alert(AlarmCode.DB_MASTER_INSTANCE_DELAY_FAIL, "reason is " + errorMessage + "delayDetection status:" + delayDetectionStatus, dbGroupConfig.instanceDatabaseType().name());
}
}
private void setOk() {
LOGGER.debug("delayDetection to [" + source.getConfig().getUrl() + "] setOK");
delayDetectionStatus = DelayDetectionStatus.OK;
source.setDelayDetectionStatus(delayDetectionStatus);
}
private void alert(String alarmCode, String errMsg, String databaseType) {
String alertKey = source.getDbGroupConfig().getName() + "-" + source.getConfig().getInstanceName();
Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", alertKey);
AlertUtil.alert(alarmCode, Alert.AlertLevel.WARN, errMsg, databaseType, source.getConfig().getId(), labels);
}
public PhysicalDbInstance getSource() {
return source;
}
public BackendConnection getConn() {
return conn;
}
public void setConn(BackendConnection conn) {
this.conn = conn;
}
public AtomicLong getVersion() {
return version;
}
public void setVersion(AtomicLong version) {
this.version = version;
}
public String getUpdateSQL() {
return updateSQL;
}
public String getSelectSQL() {
return selectSQL;
}
public boolean isStop() {
return stop;
}
public boolean isTableExists() {
return tableExists;
}
public void setTableExists(boolean tableExists) {
this.tableExists = tableExists;
}
public String getCreateTableSQL() {
return createTableSQL;
}
public long getDelayVal() {
return delayVal;
}
public String getErrorMessage() {
return errorMessage;
}
public int getLogicUpdate() {
return logicUpdate;
}
public void setLogicUpdate(int logicUpdate) {
this.logicUpdate = logicUpdate;
}
}

View File

@@ -0,0 +1,167 @@
package com.actiontech.dble.backend.delyDetection;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.SQLJobHandler;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
public class DelayDetectionSqlJob implements ResponseHandler {
public static final Logger LOGGER = LoggerFactory.getLogger(DelayDetectionSqlJob.class);
private final DelayDetection delayDetection;
private final SQLJobHandler jobHandler;
private String sql;
private long versionVal;
private AtomicBoolean finished = new AtomicBoolean(false);
private LocalDateTime responseTime;
private long keepAlive;
public DelayDetectionSqlJob(DelayDetection delayDetection, OneRawSQLQueryResultHandler jobHandler) {
this.delayDetection = delayDetection;
this.jobHandler = jobHandler;
int delayPeriodMillis = delayDetection.getSource().getDbGroupConfig().getDelayPeriodMillis();
this.keepAlive = delayDetection.getSource().getDbGroupConfig().getKeepAlive() + delayPeriodMillis;
sql = delayDetection.getSelectSQL();
updateResponseTime();
}
public void execute() {
updateResponseTime();
finished.set(false);
if (!delayDetection.isTableExists()) {
sql = delayDetection.getCreateTableSQL();
} else if (delayDetection.getSource().isReadInstance()) {
sql = delayDetection.getSelectSQL();
} else {
sql = delayDetection.getUpdateSQL();
}
BackendConnection conn = delayDetection.getConn();
if (Objects.isNull(conn)) {
LOGGER.warn("[delayDetection]connection establishment timeout,please pay attention to network latency or packet loss.");
delayDetection.cancel("connection establishment timeout");
doFinished(true);
return;
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[delayDetection]do delayDetection,conn is " + conn);
}
try {
LocalDateTime now = LocalDateTime.now();
Duration duration = Duration.between(responseTime, now);
if (duration.toMillis() > keepAlive) {
LOGGER.warn("[delayDetection]connection execution timeout {},please pay attention to network latency or packet loss.", duration.toMillis());
delayDetection.cancel("connection execution timeout");
doFinished(true);
}
conn.getBackendService().query(sql);
} catch (Exception e) {
LOGGER.warn("[delayDetection]send delayDetection error", e);
delayDetection.cancel("send delayDetection error, because of [" + e.getMessage() + "]");
doFinished(true);
}
}
public void setVersionVal(long versionVal) {
this.versionVal = versionVal;
}
private void doFinished(boolean failed) {
if (finished.compareAndSet(false, true)) {
jobHandler.finished(null, failed);
}
}
private void updateResponseTime() {
responseTime = LocalDateTime.now();
}
@Override
public void connectionError(Throwable e, Object attachment) {
LOGGER.warn("[delayDetection]can't get connection for sql :" + sql, e);
updateResponseTime();
delayDetection.cancel("delayDetection connection Error");
doFinished(true);
}
@Override
public void connectionAcquired(BackendConnection connection) {
if (delayDetection.getVersion().get() == versionVal) {
updateResponseTime();
connection.getBackendService().setResponseHandler(this);
connection.getBackendService().setComplexQuery(true);
delayDetection.setConn(connection);
execute();
}
}
@Override
public void errorResponse(byte[] err, @NotNull AbstractService service) {
ErrorPacket errPg = new ErrorPacket();
errPg.read(err);
MySQLResponseService responseService = (MySQLResponseService) service;
LOGGER.warn("[delayDetection]error response errNo: {}, {} from of sql: {} at con: {} db user = {}",
errPg.getErrNo(), new String(errPg.getMessage()), sql, service,
responseService.getConnection().getInstance().getConfig().getUser());
updateResponseTime();
delayDetection.cancel(new String(errPg.getMessage()));
if (!((MySQLResponseService) service).syncAndExecute()) {
service.getConnection().businessClose("[delayDetection]unfinished sync");
doFinished(true);
return;
}
doFinished(true);
}
@Override
public void okResponse(byte[] ok, @NotNull AbstractService service) {
updateResponseTime();
if (((MySQLResponseService) service).syncAndExecute()) {
doFinished(false);
}
}
@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof, boolean isLeft, @NotNull AbstractService service) {
updateResponseTime();
jobHandler.onHeader(fields);
}
@Override
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, @NotNull AbstractService service) {
updateResponseTime();
jobHandler.onRowData(row);
return false;
}
@Override
public void rowEofResponse(byte[] eof, boolean isLeft, @NotNull AbstractService service) {
updateResponseTime();
doFinished(false);
}
@Override
public void connectionClose(@NotNull AbstractService service, String reason) {
updateResponseTime();
LOGGER.warn("[delayDetection]conn for sql[" + sql + "] is closed, due to " + reason + ", we will try again immediately");
delayDetection.cancel("delayDetection conn for sql[" + sql + "] is closed, due to " + reason);
delayDetection.delayDetectionRetry();
doFinished(true);
}
}

View File

@@ -0,0 +1,11 @@
package com.actiontech.dble.backend.delyDetection;
public enum DelayDetectionStatus {
INIT(), OK(), TIMEOUT(), ERROR(), STOP();
@Override
public String toString() {
return super.toString().toLowerCase();
}
}

View File

@@ -0,0 +1,75 @@
package com.actiontech.dble.backend.delyDetection;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.sqlengine.OneRawSQLQueryResultHandler;
import com.actiontech.dble.sqlengine.SQLQueryResult;
import com.actiontech.dble.sqlengine.SQLQueryResultListener;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
public class DelayDetectionTask {
private DelayDetection delayDetection;
private DelayDetectionSqlJob sqlJob;
private static final String LOGIC_TIMESTAMP = "logic_timestamp";
private AtomicBoolean quit = new AtomicBoolean(false);
private static final String[] MYSQL_DELAY_DETECTION_COLS = new String[]{
"logic_timestamp",
};
public DelayDetectionTask(DelayDetection delayDetection) {
this.delayDetection = delayDetection;
}
public void execute() {
delayDetection.updateLastSendQryTime();
if (Objects.isNull(sqlJob)) {
String[] fetchCols = {};
if (delayDetection.getSource().isReadInstance()) {
fetchCols = MYSQL_DELAY_DETECTION_COLS;
}
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, new DelayDetectionListener());
sqlJob = new DelayDetectionSqlJob(delayDetection, resultHandler);
sqlJob.setVersionVal(delayDetection.getVersion().incrementAndGet());
delayDetection.getSource().createConnectionSkipPool(null, sqlJob);
} else {
sqlJob.execute();
}
}
public boolean isQuit() {
return quit.get();
}
public void close() {
if (quit.compareAndSet(false, true)) {
sqlJob = null;
}
}
private class DelayDetectionListener implements SQLQueryResultListener<SQLQueryResult<Map<String, String>>> {
@Override
public void onResult(SQLQueryResult<Map<String, String>> result) {
delayDetection.updateLastReceivedQryTime();
if (!result.isSuccess()) {
return;
}
PhysicalDbInstance source = delayDetection.getSource();
if (source.isReadInstance()) {
Map<String, String> resultResult = result.getResult();
String logicTimestamp = Optional.ofNullable(resultResult.get(LOGIC_TIMESTAMP)).orElse(String.valueOf(source.getDbGroup().getLogicTimestamp().get()));
long logic = Long.parseLong(logicTimestamp);
delayDetection.delayCal(logic);
} else {
delayDetection.setResult(DelayDetectionStatus.OK);
}
//after the CREATE statement is successfully executed, the update statement needs to be executed
if (!delayDetection.isTableExists()) {
delayDetection.setTableExists(true);
delayDetection.execute();
}
}
}
}

View File

@@ -95,7 +95,7 @@ public class MySQLDetector implements SQLQueryResultListener<SQLQueryResult<Map<
if (result.isSuccess()) {
PhysicalDbInstance source = heartbeat.getSource();
Map<String, String> resultResult = result.getResult();
if (source.getDbGroupConfig().isShowSlaveSql()) {
if (source.getDbGroupConfig().isShowSlaveSql() && !source.getDbGroup().isDelayDetectionStart()) {
setStatusBySlave(source, resultResult);
} else if (source.getDbGroupConfig().isSelectReadOnlySql()) {
setStatusByReadOnly(source, resultResult);
@@ -190,7 +190,7 @@ public class MySQLDetector implements SQLQueryResultListener<SQLQueryResult<Map<
String secondsBehindMaster = resultResult.get("Seconds_Behind_Master");
if (null != secondsBehindMaster && !"".equals(secondsBehindMaster) && !"NULL".equalsIgnoreCase(secondsBehindMaster)) {
int behindMaster = Integer.parseInt(secondsBehindMaster);
int delayThreshold = source.getDbGroupConfig().getDelayThreshold();
int delayThreshold = source.getDbGroupConfig().getDelayThreshold() / 1000;
if (delayThreshold > 0) {
String alertKey = source.getDbGroupConfig().getName() + "-" + source.getConfig().getInstanceName();
if (behindMaster > delayThreshold) {

View File

@@ -231,7 +231,8 @@ public class MySQLHeartbeat {
AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels);
}
//after the heartbeat changes from failure to success, it needs to be expanded immediately
if (source.getTotalConnections() == 0) {
if (source.getTotalConnections() == 0 && !status.equals(MySQLHeartbeatStatus.INIT) && !status.equals(MySQLHeartbeatStatus.OK)) {
LOGGER.debug("[updatePoolCapacity] heartbeat to [{}] setOk, previous status is {}", source, status);
source.updatePoolCapacity();
}
if (isStop) {

View File

@@ -80,9 +80,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
try {
ConnectionPoolProvider.getConnGetFrenshLocekAfter();
int waiting = waiters.get();
for (PooledConnection conn : allConnections) {
if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
newPooledEntry(schema, waiters.get());
if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) {
newPooledEntry(schema, waiting, true);
}
return conn;
}
}
@@ -100,11 +103,12 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
try {
final int waiting = waiterNum;
ConnectionPoolProvider.getConnGetFrenshLocekAfter();
ConnectionPoolProvider.borrowConnectionBefore();
for (PooledConnection conn : allConnections) {
if (conn.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
newPooledEntry(schema, waiting - 1);
if (waiting > 0 && conn.getCreateByWaiter().compareAndSet(true, false)) {
newPooledEntry(schema, waiting, true);
}
return conn;
}
@@ -112,14 +116,18 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
waiterNum = waiters.incrementAndGet();
try {
newPooledEntry(schema, waiterNum);
newPooledEntry(schema, waiterNum, true);
ConnectionPoolProvider.newConnectionAfter();
timeout = timeUnit.toNanos(timeout);
do {
final long start = System.nanoTime();
final PooledConnection bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
if (bagEntry != null) {
bagEntry.getCreateByWaiter().set(false);
}
return bagEntry;
}
@@ -135,7 +143,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
}
private void newPooledEntry(final String schema, final int waiting) {
private void newPooledEntry(final String schema, final int waiting, boolean createByWaiter) {
if (instance.isDisabled() || isClosed.get()) {
return;
}
@@ -144,7 +152,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
Map<String, String> labels = AlertUtil.genSingleLabel("dbInstance", alertKey);
if (waiting > 0) {
if (totalConnections.incrementAndGet() <= config.getMaxCon()) {
newConnection(schema, ConnectionPool.this);
newConnection(schema, ConnectionPool.this, createByWaiter);
if (ToResolveContainer.REACH_MAX_CON.contains(alertKey)) {
AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", config.getId(), labels,
ToResolveContainer.REACH_MAX_CON, alertKey);
@@ -194,7 +202,7 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
}
for (int i = 0; i < connectionsToAdd; i++) {
// newPooledEntry(schemas[i % schemas.length]);
newPooledEntry(null, 1);
newPooledEntry(null, 1, false);
}
}
@@ -208,6 +216,8 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
return;
}
LOGGER.debug("connection create success: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
conn.lazySet(STATE_NOT_IN_USE);
// spin until a thread takes it or none are waiting
while (waiters.get() > 0 && conn.getState() == STATE_NOT_IN_USE && !handoffQueue.offer(conn)) {
@@ -224,6 +234,9 @@ public class ConnectionPool extends PoolBase implements PooledConnectionListener
@Override
public void onCreateFail(PooledConnection conn, Throwable e) {
if (conn == null || conn.getIsCreateFail().compareAndSet(false, true)) {
if (conn != null) {
LOGGER.debug("connection create fail: createByWaiter:{},new connection:{}", conn.getCreateByWaiter().get(), conn);
}
LOGGER.warn("create connection fail " + e.getMessage());
totalConnections.decrementAndGet();
// conn can be null if newChannel crashed (eg SocketException("too many open files"))

View File

@@ -7,6 +7,7 @@ package com.actiontech.dble.backend.pool;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.net.connection.PooledConnection;
import com.actiontech.dble.net.factory.PooledConnectionFactory;
import java.io.IOException;
@@ -36,9 +37,10 @@ public class PoolBase {
}
}
void newConnection(String schema, PooledConnectionListener listener) {
void newConnection(String schema, PooledConnectionListener listener, boolean createByWaiter) {
try {
factory.make(instance, listener, schema);
PooledConnection pooledConnection = factory.make(instance, listener, schema);
pooledConnection.getCreateByWaiter().set(createByWaiter);
} catch (IOException ioe) {
listener.onCreateFail(null, ioe);
}

View File

@@ -22,5 +22,13 @@ public final class ConnectionPoolProvider {
}
public static void newConnectionAfter() {
}
public static void borrowConnectionBefore() {
}
}

View File

@@ -30,6 +30,12 @@ public class DBGroup implements Named {
@XmlAttribute
protected String disableHA;
@XmlAttribute
protected Integer delayPeriodMillis;
@XmlAttribute
protected String delayDatabase;
protected HeartBeat heartbeat;
protected List<DBInstance> dbInstance;
@@ -106,6 +112,21 @@ public class DBGroup implements Named {
this.disableHA = disableHA;
}
public Integer getDelayPeriodMillis() {
return delayPeriodMillis;
}
public void setDelayPeriodMillis(Integer delayPeriodMillis) {
this.delayPeriodMillis = delayPeriodMillis;
}
public String getDelayDatabase() {
return delayDatabase;
}
public void setDelayDatabase(String delayDatabase) {
this.delayDatabase = delayDatabase;
}
@Override
public String toString() {
@@ -115,6 +136,10 @@ public class DBGroup implements Named {
name +
", delayThreshold=" +
delayThreshold +
", delayPeriodMillis=" +
delayPeriodMillis +
", delayDatabase=" +
delayDatabase +
", disableHA=" +
disableHA +
", heartbeat=" +

View File

@@ -41,7 +41,7 @@ import com.actiontech.dble.services.manager.response.ChangeType;
import com.actiontech.dble.singleton.*;
import com.actiontech.dble.util.StringUtil;
import com.actiontech.dble.util.TimeUtil;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -489,7 +489,7 @@ public class ServerConfig {
private void initDbGroupByMap(List<ChangeItem> changeItemList, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes,
boolean isFullyConfigured, int loadAllMode) {
List<PhysicalDbGroup> updateDbGroupList = Lists.newArrayList();
Map<ChangeItem, PhysicalDbGroup> updateDbGroupMap = Maps.newHashMap();
for (ChangeItem changeItem : changeItemList) {
Object item = changeItem.getItem();
ChangeItemType itemType = changeItem.getItemType();
@@ -498,7 +498,7 @@ public class ServerConfig {
addItem(item, itemType, oldDbGroupMap, newShardingNodes, isFullyConfigured);
break;
case UPDATE:
updateItem(item, itemType, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupList, loadAllMode);
updateItem(item, itemType, oldDbGroupMap, newShardingNodes, changeItem, updateDbGroupMap, loadAllMode);
break;
case DELETE:
deleteItem(item, itemType, oldDbGroupMap, loadAllMode);
@@ -507,9 +507,14 @@ public class ServerConfig {
break;
}
}
for (PhysicalDbGroup physicalDbGroup : updateDbGroupList) {
physicalDbGroup.startHeartbeat();
}
updateDbGroupMap.forEach((changeItem, dbGroup) -> {
if (changeItem.isAffectHeartbeat()) {
dbGroup.startHeartbeat();
}
if (changeItem.isAffectDelayDetection() && dbGroup.isDelayDetectionStart()) {
dbGroup.startDelayDetection();
}
});
}
private void deleteItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, int loadAllMode) {
@@ -536,17 +541,28 @@ public class ServerConfig {
}
private void updateItem(Object item, ChangeItemType itemType, Map<String, PhysicalDbGroup> oldDbGroupMap, Map<String, ShardingNode> newShardingNodes, ChangeItem changeItem,
List<PhysicalDbGroup> updateDbGroupList, int loadAllMode) {
Map<ChangeItem, PhysicalDbGroup> updateDbGroupMap, int loadAllMode) {
if (itemType == ChangeItemType.PHYSICAL_DB_GROUP) {
//change dbGroup
PhysicalDbGroup physicalDbGroup = (PhysicalDbGroup) item;
PhysicalDbGroup oldDbGroup = oldDbGroupMap.get(physicalDbGroup.getGroupName());
boolean dbGroupCopy = false;
if (changeItem.isAffectHeartbeat()) {
oldDbGroup.stopHeartbeat("reload config, stop group heartbeat");
oldDbGroup.copyBaseInfo(physicalDbGroup);
dbGroupCopy = true;
//create a new heartbeat in the follow-up
updateDbGroupList.add(oldDbGroup);
} else {
updateDbGroupMap.put(changeItem, oldDbGroup);
}
if (changeItem.isAffectDelayDetection()) {
oldDbGroup.stopDelayDetection("reload config, stop group delayDetection");
if (!dbGroupCopy) {
oldDbGroup.copyBaseInfo(physicalDbGroup);
dbGroupCopy = true;
}
updateDbGroupMap.put(changeItem, oldDbGroup);
}
if (!dbGroupCopy) {
oldDbGroup.copyBaseInfo(physicalDbGroup);
}
reloadSchema(oldDbGroup, newShardingNodes);
@@ -564,15 +580,15 @@ public class ServerConfig {
}
oldDbGroupMap.put(physicalDbGroup.getGroupName(), oldDbGroup);
} else if (itemType == ChangeItemType.PHYSICAL_DB_INSTANCE) {
if (changeItem.isAffectHeartbeat() || changeItem.isAffectConnectionPool()) {
if (changeItem.isAffectHeartbeat() || changeItem.isAffectConnectionPool() || changeItem.isAffectDelayDetection()) {
PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item;
PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName());
PhysicalDbInstance oldDbInstance = physicalDbGroup.getAllDbInstanceMap().get(physicalDbInstance.getName());
oldDbInstance.stop("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), true);
oldDbInstance.stopDirectly("reload config, recycle old instance", ((loadAllMode & ManagerParseConfig.OPTF_MODE) != 0), true);
oldDbInstance = null;
removeDbInstance(physicalDbGroup, physicalDbInstance.getName());
physicalDbGroup.setDbInstance(physicalDbInstance);
physicalDbInstance.init("reload config", true);
physicalDbInstance.init("reload config", true, true);
} else {
PhysicalDbInstance physicalDbInstance = (PhysicalDbInstance) item;
PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(physicalDbInstance.getDbGroupConfig().getName());
@@ -620,7 +636,7 @@ public class ServerConfig {
PhysicalDbGroup physicalDbGroup = oldDbGroupMap.get(dbInstance.getDbGroupConfig().getName());
if (isFullyConfigured) {
physicalDbGroup.setDbInstance(dbInstance);
dbInstance.init("reload config", true);
dbInstance.init("reload config", true, true);
} else {
LOGGER.info("dbGroup[" + dbInstance.getDbGroupConfig().getName() + "] is not fullyConfigured, so doing nothing");
}

View File

@@ -148,9 +148,13 @@ public class DBConverter {
throw new ConfigException("dbGroup[" + dbGroupName + "]'s child database type must be consistent");
}
}
int delayPeriodMillis = Optional.ofNullable(dbGroup.getDelayPeriodMillis()).orElse(-1);
String delayDatabase = dbGroup.getDelayDatabase();
DbGroupConfig dbGroupConf = new DbGroupConfig(dbGroupName, writeDbConf, readInstanceConfigList, delayThreshold, disableHA);
dbGroupConf.setRwSplitMode(rwSplitMode);
dbGroupConf.setHeartbeatSQL(heartbeatSQL);
dbGroupConf.setDelayDatabase(delayDatabase);
dbGroupConf.setDelayPeriodMillis(delayPeriodMillis);
int heartbeatTimeout = Optional.ofNullable(heartbeat.getTimeout()).orElse(0);
dbGroupConf.setHeartbeatTimeout(heartbeatTimeout * 1000);
int heartbeatErrorRetryCount = Optional.ofNullable(heartbeat.getErrorRetryCount()).orElse(1);

View File

@@ -26,6 +26,9 @@ public class DbGroupConfig {
private boolean isShowSlaveSql = false;
private boolean isSelectReadOnlySql = false;
private int delayThreshold;
private int delayPeriodMillis;
private String delayDatabase;
private int heartbeatTimeout = 0;
private int errorRetryCount = 1;
@@ -156,6 +159,22 @@ public class DbGroupConfig {
return writeInstanceConfig.getDataBaseType();
}
public int getDelayPeriodMillis() {
return delayPeriodMillis;
}
public void setDelayPeriodMillis(int delayPeriodMillis) {
this.delayPeriodMillis = delayPeriodMillis;
}
public String getDelayDatabase() {
return delayDatabase;
}
public void setDelayDatabase(String delayDatabase) {
this.delayDatabase = delayDatabase;
}
public boolean equalsBaseInfo(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
@@ -169,6 +188,8 @@ public class DbGroupConfig {
errorRetryCount == that.errorRetryCount &&
keepAlive == that.keepAlive &&
disableHA == that.disableHA &&
delayPeriodMillis == that.delayPeriodMillis &&
delayDatabase == that.delayDatabase &&
Objects.equals(name, that.name) &&
Objects.equals(heartbeatSQL, that.heartbeatSQL);
}
@@ -184,6 +205,8 @@ public class DbGroupConfig {
", isShowSlaveSql=" + isShowSlaveSql +
", isSelectReadOnlySql=" + isSelectReadOnlySql +
", delayThreshold=" + delayThreshold +
", delayPeriodMillis=" + delayPeriodMillis +
", delayDatabase=" + delayDatabase +
", heartbeatTimeout=" + heartbeatTimeout +
", errorRetryCount=" + errorRetryCount +
", keepAlive=" + keepAlive +

View File

@@ -34,6 +34,8 @@ public abstract class PooledConnection extends AbstractConnection {
public static final Comparator<PooledConnection> LAST_ACCESS_COMPARABLE;
private AtomicBoolean createByWaiter = new AtomicBoolean(false);
static {
LAST_ACCESS_COMPARABLE = Comparator.comparingLong(entryOne -> entryOne.lastTime);
}
@@ -122,4 +124,8 @@ public abstract class PooledConnection extends AbstractConnection {
public AtomicBoolean getIsCreateFail() {
return isCreateFail;
}
public AtomicBoolean getCreateByWaiter() {
return createByWaiter;
}
}

View File

@@ -14,6 +14,8 @@ import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.plan.common.item.function.bitfunc.ItemFuncBitCount;
import com.actiontech.dble.plan.common.item.function.castfunc.*;
import com.actiontech.dble.plan.common.item.function.convertfunc.*;
import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonExtract;
import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonUnQuote;
import com.actiontech.dble.plan.common.item.function.mathsfunc.*;
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.*;
import com.actiontech.dble.plan.common.item.function.operator.controlfunc.ItemFuncIfnull;

View File

@@ -0,0 +1,561 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.plan.common.item.function.jsonfunc;
import java.io.Closeable;
import java.io.Flushable;
import java.io.IOException;
import java.io.Writer;
import java.util.Arrays;
import static com.actiontech.dble.plan.common.item.function.jsonfunc.JsonScope.*;
/**
* migrate from com.google.gson.stream.JsonWriter class in package gson 2.8.9
* for issue inner-1940
* <p>
* migrate reason: "keep the same format as mysql"
* improve: add space for ":"&","
*/
public class CustomJsonWriter implements Closeable, Flushable {
/*
* From RFC 7159, "All Unicode characters may be placed within the
* quotation marks except for the characters that must be escaped:
* quotation mark, reverse solidus, and the control characters
* (U+0000 through U+001F)."
*
* We also escape '\u2028' and '\u2029', which JavaScript interprets as
* newline characters. This prevents eval() from failing with a syntax
* error. http://code.google.com/p/google-gson/issues/detail?id=341
*/
private static final String[] REPLACEMENT_CHARS;
private static final String[] HTML_SAFE_REPLACEMENT_CHARS;
static {
REPLACEMENT_CHARS = new String[128];
for (int i = 0; i <= 0x1f; i++) {
REPLACEMENT_CHARS[i] = String.format("\\u%04x", (int) i);
}
REPLACEMENT_CHARS['"'] = "\\\"";
REPLACEMENT_CHARS['\\'] = "\\\\";
REPLACEMENT_CHARS['\t'] = "\\t";
REPLACEMENT_CHARS['\b'] = "\\b";
REPLACEMENT_CHARS['\n'] = "\\n";
REPLACEMENT_CHARS['\r'] = "\\r";
REPLACEMENT_CHARS['\f'] = "\\f";
HTML_SAFE_REPLACEMENT_CHARS = REPLACEMENT_CHARS.clone();
HTML_SAFE_REPLACEMENT_CHARS['<'] = "\\u003c";
HTML_SAFE_REPLACEMENT_CHARS['>'] = "\\u003e";
HTML_SAFE_REPLACEMENT_CHARS['&'] = "\\u0026";
HTML_SAFE_REPLACEMENT_CHARS['='] = "\\u003d";
HTML_SAFE_REPLACEMENT_CHARS['\''] = "\\u0027";
}
/**
* The output data, containing at most one top-level array or object.
*/
private final Writer out;
private int[] stack = new int[32];
private int stackSize = 0;
{
push(EMPTY_DOCUMENT);
}
/**
* A string containing a full set of spaces for a single level of
* indentation, or null for no pretty printing.
*/
private String indent;
/**
* The name/value separator; either ":" or ": ".
*/
private String separator = ": ";
private boolean lenient;
private boolean htmlSafe;
private String deferredName;
private boolean serializeNulls = true;
/**
* Creates a new instance that writes a JSON-encoded stream to {@code out}.
* For best performance, ensure {@link Writer} is buffered; wrapping in
* {@link java.io.BufferedWriter BufferedWriter} if necessary.
*/
public CustomJsonWriter(Writer out) {
if (out == null) {
throw new NullPointerException("out == null");
}
this.out = out;
}
/**
* Sets the indentation string to be repeated for each level of indentation
* in the encoded document. If {@code indent.isEmpty()} the encoded document
* will be compact. Otherwise the encoded document will be more
* human-readable.
*
* @param indent a string containing only whitespace.
*/
public final void setIndent(String indent) {
if (indent.length() == 0) {
this.indent = null;
this.separator = ":";
} else {
this.indent = indent;
this.separator = ": ";
}
}
/**
* Configure this writer to relax its syntax rules. By default, this writer
* only emits well-formed JSON as specified by <a
* href="http://www.ietf.org/rfc/rfc7159.txt">RFC 7159</a>. Setting the writer
* to lenient permits the following:
* <ul>
* <li>Top-level values of any type. With strict writing, the top-level
* value must be an object or an array.
* <li>Numbers may be {@link Double#isNaN() NaNs} or {@link
* Double#isInfinite() infinities}.
* </ul>
*/
public final void setLenient(boolean lenient) {
this.lenient = lenient;
}
/**
* Returns true if this writer has relaxed syntax rules.
*/
public boolean isLenient() {
return lenient;
}
/**
* Configure this writer to emit JSON that's safe for direct inclusion in HTML
* and XML documents. This escapes the HTML characters {@code <}, {@code >},
* {@code &} and {@code =} before writing them to the stream. Without this
* setting, your XML/HTML encoder should replace these characters with the
* corresponding escape sequences.
*/
public final void setHtmlSafe(boolean htmlSafe) {
this.htmlSafe = htmlSafe;
}
/**
* Returns true if this writer writes JSON that's safe for inclusion in HTML
* and XML documents.
*/
public final boolean isHtmlSafe() {
return htmlSafe;
}
/**
* Sets whether object members are serialized when their value is null.
* This has no impact on array elements. The default is true.
*/
public final void setSerializeNulls(boolean serializeNulls) {
this.serializeNulls = serializeNulls;
}
/**
* Returns true if object members are serialized when their value is null.
* This has no impact on array elements. The default is true.
*/
public final boolean getSerializeNulls() {
return serializeNulls;
}
/**
* Begins encoding a new array. Each call to this method must be paired with
* a call to {@link #endArray}.
*
* @return this writer.
*/
public CustomJsonWriter beginArray() throws IOException {
writeDeferredName();
return open(EMPTY_ARRAY, '[');
}
/**
* Ends encoding the current array.
*
* @return this writer.
*/
public CustomJsonWriter endArray() throws IOException {
return close(EMPTY_ARRAY, NONEMPTY_ARRAY, ']');
}
/**
* Begins encoding a new object. Each call to this method must be paired
* with a call to {@link #endObject}.
*
* @return this writer.
*/
public CustomJsonWriter beginObject() throws IOException {
writeDeferredName();
return open(EMPTY_OBJECT, '{');
}
/**
* Ends encoding the current object.
*
* @return this writer.
*/
public CustomJsonWriter endObject() throws IOException {
return close(EMPTY_OBJECT, NONEMPTY_OBJECT, '}');
}
/**
* Enters a new scope by appending any necessary whitespace and the given
* bracket.
*/
private CustomJsonWriter open(int empty, char openBracket) throws IOException {
beforeValue();
push(empty);
out.write(openBracket);
return this;
}
/**
* Flushes and closes this writer and the underlying {@link Writer}.
*
* @throws IOException if the JSON document is incomplete.
*/
public void close() throws IOException {
out.close();
int size = stackSize;
if (size > 1 || size == 1 && stack[size - 1] != NONEMPTY_DOCUMENT) {
throw new IOException("Incomplete document");
}
stackSize = 0;
}
/**
* Closes the current scope by appending any necessary whitespace and the
* given bracket.
*/
private CustomJsonWriter close(int empty, int nonempty, char closeBracket)
throws IOException {
int context = peek();
if (context != nonempty && context != empty) {
throw new IllegalStateException("Nesting problem.");
}
if (deferredName != null) {
throw new IllegalStateException("Dangling name: " + deferredName);
}
stackSize--;
if (context == nonempty) {
newline();
}
out.write(closeBracket);
return this;
}
private void push(int newTop) {
if (stackSize == stack.length) {
stack = Arrays.copyOf(stack, stackSize * 2);
}
stack[stackSize++] = newTop;
}
/**
* Returns the value on the top of the stack.
*/
private int peek() {
if (stackSize == 0) {
throw new IllegalStateException("JsonWriter is closed.");
}
return stack[stackSize - 1];
}
/**
* Replace the value on the top of the stack with the given value.
*/
private void replaceTop(int topOfStack) {
stack[stackSize - 1] = topOfStack;
}
/**
* Encodes the property name.
*
* @param name the name of the forthcoming value. May not be null.
* @return this writer.
*/
public CustomJsonWriter name(String name) throws IOException {
if (name == null) {
throw new NullPointerException("name == null");
}
if (deferredName != null) {
throw new IllegalStateException();
}
if (stackSize == 0) {
throw new IllegalStateException("JsonWriter is closed.");
}
deferredName = name;
return this;
}
private void writeDeferredName() throws IOException {
if (deferredName != null) {
beforeName();
string(deferredName);
deferredName = null;
}
}
/**
* Writes {@code value} directly to the writer without quoting or
* escaping.
*
* @param value the literal string value, or null to encode a null literal.
* @return this writer.
*/
public CustomJsonWriter jsonValue(String value) throws IOException {
if (value == null) {
return nullValue();
}
writeDeferredName();
beforeValue();
out.append(value);
return this;
}
/**
* Encodes {@code null}.
*
* @return this writer.
*/
public CustomJsonWriter nullValue() throws IOException {
if (deferredName != null) {
if (serializeNulls) {
writeDeferredName();
} else {
deferredName = null;
return this; // skip the name and the value
}
}
beforeValue();
out.write("null");
return this;
}
/**
* Encodes {@code value}.
*
* @param value the literal string value, or null to encode a null literal.
* @return this writer.
*/
public CustomJsonWriter value(String value) throws IOException {
if (value == null) {
return nullValue();
}
writeDeferredName();
beforeValue();
string(value);
return this;
}
/**
* Encodes {@code value}.
*
* @return this writer.
*/
public CustomJsonWriter value(boolean value) throws IOException {
writeDeferredName();
beforeValue();
out.write(value ? "true" : "false");
return this;
}
/**
* Encodes {@code value}.
*
* @return this writer.
*/
public CustomJsonWriter value(Boolean value) throws IOException {
if (value == null) {
return nullValue();
}
writeDeferredName();
beforeValue();
out.write(value ? "true" : "false");
return this;
}
/**
* Encodes {@code value}.
*
* @param value a finite value. May not be {@link Double#isNaN() NaNs} or
* {@link Double#isInfinite() infinities}.
* @return this writer.
*/
public CustomJsonWriter value(double value) throws IOException {
writeDeferredName();
if (!lenient && (Double.isNaN(value) || Double.isInfinite(value))) {
throw new IllegalArgumentException("Numeric values must be finite, but was " + value);
}
beforeValue();
out.append(Double.toString(value));
return this;
}
/**
* Encodes {@code value}.
*
* @return this writer.
*/
public CustomJsonWriter value(long value) throws IOException {
writeDeferredName();
beforeValue();
out.write(Long.toString(value));
return this;
}
/**
* Encodes {@code value}.
*
* @param value a finite value. May not be {@link Double#isNaN() NaNs} or
* {@link Double#isInfinite() infinities}.
* @return this writer.
*/
public CustomJsonWriter value(Number value) throws IOException {
if (value == null) {
return nullValue();
}
writeDeferredName();
String string = value.toString();
if (!lenient && (string.equals("-Infinity") || string.equals("Infinity") || string.equals("NaN"))) {
throw new IllegalArgumentException("Numeric values must be finite, but was " + value);
}
beforeValue();
out.append(string);
return this;
}
/**
* Ensures all buffered data is written to the underlying {@link Writer}
* and flushes that writer.
*/
public void flush() throws IOException {
if (stackSize == 0) {
throw new IllegalStateException("JsonWriter is closed.");
}
out.flush();
}
private void string(String value) throws IOException {
String[] replacements = htmlSafe ? HTML_SAFE_REPLACEMENT_CHARS : REPLACEMENT_CHARS;
out.write('\"');
int last = 0;
int length = value.length();
// CHECKSTYLE:OFF
for (int i = 0; i < length; i++) {
char c = value.charAt(i);
String replacement;
if (c < 128) {
replacement = replacements[c];
if (replacement == null) {
continue;
}
} else if (c == '\u2028') {
replacement = "\\u2028";
} else if (c == '\u2029') {
replacement = "\\u2029";
} else {
continue;
}
if (last < i) {
out.write(value, last, i - last);
}
out.write(replacement);
last = i + 1;
}
// CHECKSTYLE:ON
if (last < length) {
out.write(value, last, length - last);
}
out.write('\"');
}
private void newline() throws IOException {
if (indent == null) {
return;
}
out.write('\n');
for (int i = 1, size = stackSize; i < size; i++) {
out.write(indent);
}
}
/**
* Inserts any necessary separators and whitespace before a name. Also
* adjusts the stack to expect the name's value.
*/
private void beforeName() throws IOException {
int context = peek();
if (context == NONEMPTY_OBJECT) { // first in object
out.write(", ");
} else if (context != EMPTY_OBJECT) { // not in an object!
throw new IllegalStateException("Nesting problem.");
}
newline();
replaceTop(DANGLING_NAME);
}
/**
* Inserts any necessary separators and whitespace before a literal value,
* inline array, or inline object. Also adjusts the stack to expect either a
* closing bracket or another element.
*/
@SuppressWarnings("fallthrough")
private void beforeValue() throws IOException {
switch (peek()) {
case NONEMPTY_DOCUMENT:
if (!lenient) {
throw new IllegalStateException(
"JSON must have only one top-level value.");
}
// fall-through
case EMPTY_DOCUMENT: // first in document
replaceTop(NONEMPTY_DOCUMENT);
break;
case EMPTY_ARRAY: // first in array
replaceTop(NONEMPTY_ARRAY);
newline();
break;
case NONEMPTY_ARRAY: // another in array
out.append(", ");
newline();
break;
case DANGLING_NAME: // value for name
out.append(separator);
replaceTop(NONEMPTY_OBJECT);
break;
default:
throw new IllegalStateException("Nesting problem.");
}
}
}

View File

@@ -1,15 +1,29 @@
package com.actiontech.dble.plan.common.item.function.strfunc;
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.plan.common.item.function.jsonfunc;
import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.plan.common.item.function.ItemFunc;
import com.actiontech.dble.plan.common.item.function.strfunc.ItemStrFunc;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.internal.Streams;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.*;
/**
* the json extract function.
* this implementation learn from mysql's
*
* @author dcy
* Create Date: 2022-01-24
*/
@@ -90,15 +104,42 @@ public class ItemFuncJsonExtract extends ItemStrFunc {
if (unquote && result.isJsonPrimitive()) {
outputResult = (result.getAsString());
} else {
outputResult = (result.toString());
outputResult = jsonToString(result);
}
} else {
outputResult = (results.toString());
outputResult = jsonToString(results);
}
return outputResult;
}
/**
* migrate from JsonElement#toString()
*
* @param node
* @return
*/
private static String jsonToString(JsonElement node) {
try {
StringWriter stringWriter = new StringWriter();
final CustomJsonWriter originWriter = new CustomJsonWriter(stringWriter);
originWriter.setLenient(true);
JsonWriter jsonWriter = new JsonWriterAdaptor(stringWriter, originWriter);
Streams.write(node, jsonWriter);
return stringWriter.toString();
} catch (IOException e) {
throw new AssertionError(e);
}
}
private static String jsonToString(Collection<JsonElement> results) {
final JsonArray jsonArray = new JsonArray();
for (JsonElement result : results) {
jsonArray.add(result);
}
return jsonToString(jsonArray);
}
private static class JsonSeeker {
boolean couldReturnMultipleMatches = false;
@@ -310,6 +351,7 @@ public class ItemFuncJsonExtract extends ItemStrFunc {
/**
* for **
*
* @return
*/
private PathLeg parseEllipsisLeg() {
@@ -343,6 +385,7 @@ public class ItemFuncJsonExtract extends ItemStrFunc {
} else {
findEndOfMemberName();
int endIndex = index;
//decode escape string
boolean wasQuoted = (pattern[beginIndex] == DOUBLE_QUOTE);
String tmpS;
if (wasQuoted) {

View File

@@ -1,7 +1,14 @@
package com.actiontech.dble.plan.common.item.function.strfunc;
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.plan.common.item.function.jsonfunc;
import com.actiontech.dble.plan.common.item.Item;
import com.actiontech.dble.plan.common.item.function.ItemFunc;
import com.actiontech.dble.plan.common.item.function.strfunc.ItemStrFunc;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

View File

@@ -0,0 +1,63 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.plan.common.item.function.jsonfunc;
/**
* migrate from gson 2.8.9
* for issue inner-1940
*/
final class JsonScope {
/**
* An array with no elements requires no separators or newlines before
* it is closed.
*/
static final int EMPTY_ARRAY = 1;
/**
* A array with at least one value requires a comma and newline before
* the next element.
*/
static final int NONEMPTY_ARRAY = 2;
/**
* An object with no name/value pairs requires no separators or newlines
* before it is closed.
*/
static final int EMPTY_OBJECT = 3;
/**
* An object whose most recent element is a key. The next element must
* be a value.
*/
static final int DANGLING_NAME = 4;
/**
* An object with at least one name/value pair requires a comma and
* newline before the next element.
*/
static final int NONEMPTY_OBJECT = 5;
/**
* No object or array has been started.
*/
static final int EMPTY_DOCUMENT = 6;
/**
* A document with at an array or object.
*/
static final int NONEMPTY_DOCUMENT = 7;
/**
* A document that's been closed and cannot be accessed.
*/
static final int CLOSED = 8;
private JsonScope() {
}
}

View File

@@ -0,0 +1,120 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.plan.common.item.function.jsonfunc;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.Writer;
/**
* @author dcy
* Create Date: 2022-09-28
*/
final class JsonWriterAdaptor extends JsonWriter {
CustomJsonWriter jsonWriter;
JsonWriterAdaptor(Writer out, CustomJsonWriter jsonWriter) {
super(out);
this.jsonWriter = jsonWriter;
}
@Override
public boolean isLenient() {
return jsonWriter.isLenient();
}
@Override
public JsonWriter beginArray() throws IOException {
jsonWriter.beginArray();
return this;
}
@Override
public JsonWriter endArray() throws IOException {
jsonWriter.endArray();
return this;
}
@Override
public JsonWriter beginObject() throws IOException {
jsonWriter.beginObject();
return this;
}
@Override
public JsonWriter endObject() throws IOException {
jsonWriter.endObject();
return this;
}
@Override
public JsonWriter name(String name) throws IOException {
jsonWriter.name(name);
return this;
}
@Override
public JsonWriter jsonValue(String value) throws IOException {
jsonWriter.jsonValue(value);
return this;
}
@Override
public JsonWriter nullValue() throws IOException {
jsonWriter.nullValue();
return this;
}
@Override
public JsonWriter value(String value) throws IOException {
jsonWriter.value(value);
return this;
}
@Override
public JsonWriter value(boolean value) throws IOException {
jsonWriter.value(value);
return this;
}
@Override
public JsonWriter value(Boolean value) throws IOException {
jsonWriter.value(value);
return this;
}
@Override
public JsonWriter value(double value) throws IOException {
jsonWriter.value(value);
return this;
}
@Override
public JsonWriter value(long value) throws IOException {
jsonWriter.value(value);
return this;
}
@Override
public JsonWriter value(Number value) throws IOException {
jsonWriter.value(value);
return this;
}
@Override
public void flush() throws IOException {
jsonWriter.flush();
}
@Override
public void close() throws IOException {
jsonWriter.close();
}
}

View File

@@ -21,6 +21,8 @@ import com.actiontech.dble.plan.common.item.function.castfunc.ItemFuncBinaryCast
import com.actiontech.dble.plan.common.item.function.castfunc.ItemFuncConvCharset;
import com.actiontech.dble.plan.common.item.function.castfunc.ItemNCharTypeCast;
import com.actiontech.dble.plan.common.item.function.convertfunc.ItemCharTypeConvert;
import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonExtract;
import com.actiontech.dble.plan.common.item.function.jsonfunc.ItemFuncJsonUnQuote;
import com.actiontech.dble.plan.common.item.function.mathsfunc.ItemFuncConv;
import com.actiontech.dble.plan.common.item.function.mathsfunc.operator.*;
import com.actiontech.dble.plan.common.item.function.operator.cmpfunc.*;

View File

@@ -153,7 +153,11 @@ public class RWSplitNonBlockingSession extends Session {
Boolean isMaster = canRunOnMaster(master); // first
boolean firstValue = isMaster == null ? false : isMaster;
long rwStickyTime = SystemConfig.getInstance().getRwStickyTime();
if (rwGroup.getRwSplitMode() != PhysicalDbGroup.RW_SPLIT_OFF && (rwStickyTime > 0) && !firstValue) {
boolean rwSticky = rwStickyTime > 0;
if (rwGroup.isDelayDetectionStart()) {
rwSticky = false;
}
if (rwGroup.getRwSplitMode() != PhysicalDbGroup.RW_SPLIT_OFF && rwSticky && !firstValue) {
if (this.getPreWriteResponseTime() > 0 && System.currentTimeMillis() - this.getPreWriteResponseTime() <= rwStickyTime) {
isMaster = true;
if (LOGGER.isDebugEnabled()) {

View File

@@ -75,6 +75,7 @@ public final class ManagerSchemaInfo {
registerTable(new DbleBackendConnectionsAssociateThread());
registerTable(new DbleClusterRenewThread());
registerTable(new RecyclingResource());
registerTable(new DbleDelayDetection());
}
private void initViews() {

View File

@@ -46,6 +46,8 @@ public class DbleDbGroup extends ManagerWritableTable {
public static final String COLUMN_DELAY_THRESHOLD = "delay_threshold";
public static final String COLUMN_DISABLE_HA = "disable_ha";
public static final String COLUMN_ACTIVE = "active";
public static final String DELAY_PERIOD_MILLIS = "delay_period_millis";
public static final String DELAY_DATABASE = "delay_database";
private final List<LinkedHashMap<String, String>> tempRowList = Lists.newArrayList();
@@ -81,6 +83,12 @@ public class DbleDbGroup extends ManagerWritableTable {
columns.put(COLUMN_DELAY_THRESHOLD, new ColumnMeta(COLUMN_DELAY_THRESHOLD, "int(11)", true, "-1"));
columnsType.put(COLUMN_DELAY_THRESHOLD, Fields.FIELD_TYPE_LONG);
columns.put(DELAY_PERIOD_MILLIS, new ColumnMeta(DELAY_PERIOD_MILLIS, "int(11)", true, "-1"));
columnsType.put(DELAY_PERIOD_MILLIS, Fields.FIELD_TYPE_LONG);
columns.put(DELAY_DATABASE, new ColumnMeta(DELAY_DATABASE, "varchar(255)", true, null));
columnsType.put(DELAY_DATABASE, Fields.FIELD_TYPE_VAR_STRING);
columns.put(COLUMN_DISABLE_HA, new ColumnMeta(COLUMN_DISABLE_HA, "varchar(5)", true, "false"));
columnsType.put(COLUMN_DISABLE_HA, Fields.FIELD_TYPE_VAR_STRING);
@@ -215,6 +223,13 @@ public class DbleDbGroup extends ManagerWritableTable {
case COLUMN_DISABLE_HA:
dbGroup.setDisableHA(value);
break;
case DELAY_PERIOD_MILLIS:
dbGroup.setDelayPeriodMillis(Integer.parseInt(value));
break;
case DELAY_DATABASE:
dbGroup.setDelayDatabase(String.valueOf(value));
break;
default:
break;
}
@@ -283,6 +298,14 @@ public class DbleDbGroup extends ManagerWritableTable {
if (!StringUtil.isBlank(heartbeatKeepAliveStr) && IntegerUtil.parseInt(heartbeatKeepAliveStr) < 0) {
throw new ConfigException("Column '" + COLUMN_KEEP_ALIVE + "' should be an integer greater than or equal to 0!");
}
String delayPeriodMillis = row.get(DELAY_PERIOD_MILLIS);
delayDetectionCheck(delayPeriodMillis);
}
}
private void delayDetectionCheck(String delayPeriodMillis) {
if (!StringUtil.isBlank(delayPeriodMillis) && IntegerUtil.parseInt(delayPeriodMillis) < -1) {
throw new ConfigException("Column '" + COLUMN_DELAY_THRESHOLD + "' should be an integer greater than -1!");
}
}
@@ -295,6 +318,8 @@ public class DbleDbGroup extends ManagerWritableTable {
map.put(COLUMN_KEEP_ALIVE, String.valueOf(dbGroupConfig.getKeepAlive()));
map.put(COLUMN_RW_SPLIT_MODE, String.valueOf(dbGroupConfig.getRwSplitMode()));
map.put(COLUMN_DELAY_THRESHOLD, String.valueOf(dbGroupConfig.getDelayThreshold()));
map.put(DELAY_PERIOD_MILLIS, String.valueOf(dbGroupConfig.getDelayPeriodMillis()));
map.put(DELAY_DATABASE, String.valueOf(dbGroupConfig.getDelayDatabase()));
map.put(COLUMN_DISABLE_HA, String.valueOf(dbGroupConfig.isDisableHA()));
return map;
}

View File

@@ -0,0 +1,160 @@
/*
* Copyright (C) 2016-2022 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.services.manager.information.tables;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.delyDetection.DelayDetection;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.Fields;
import com.actiontech.dble.config.ServerConfig;
import com.actiontech.dble.config.model.db.DbInstanceConfig;
import com.actiontech.dble.meta.ColumnMeta;
import com.actiontech.dble.services.manager.information.ManagerWritableTable;
import com.actiontech.dble.util.StringUtil;
import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class DbleDelayDetection extends ManagerWritableTable {
private static final String TABLE_NAME = "delay_detection";
private static final String COLUMN_DB_GROUP_NAME = "db_group_name";
private static final String COLUMN_NAME = "name";
private static final String COLUMN_HOST = "host";
private static final String COLUMN_DELAY = "delay";
private static final String COLUMN_STATUS = "status";
private static final String COLUMN_MESSAGE = "message";
private static final String COLUMN_LAST_ACTIVE_TIME = "last_active_time";
private static final String COLUMN_BACKEND_CONN_ID = "backend_conn_id";
private static final String COLUMN_LOGIC_UPDATE = "logic_update";
public DbleDelayDetection() {
super(TABLE_NAME, 9);
}
@Override
protected void initColumnAndType() {
columns.put(COLUMN_DB_GROUP_NAME, new ColumnMeta(COLUMN_DB_GROUP_NAME, "varchar(64)", false));
columnsType.put(COLUMN_DB_GROUP_NAME, Fields.FIELD_TYPE_VAR_STRING);
columns.put(COLUMN_NAME, new ColumnMeta(COLUMN_NAME, "varchar(64)", false, true));
columnsType.put(COLUMN_NAME, Fields.FIELD_TYPE_VAR_STRING);
columns.put(COLUMN_HOST, new ColumnMeta(COLUMN_HOST, "int(11)", false));
columnsType.put(COLUMN_HOST, Fields.FIELD_TYPE_VAR_STRING);
columns.put(COLUMN_DELAY, new ColumnMeta(COLUMN_DELAY, "int(11)", false));
columnsType.put(COLUMN_DELAY, Fields.FIELD_TYPE_LONG);
columns.put(COLUMN_STATUS, new ColumnMeta(COLUMN_STATUS, "varchar(3)", false));
columnsType.put(COLUMN_STATUS, Fields.FIELD_TYPE_VAR_STRING);
columns.put(COLUMN_MESSAGE, new ColumnMeta(COLUMN_MESSAGE, "varchar(1024)", false));
columnsType.put(COLUMN_MESSAGE, Fields.FIELD_TYPE_VAR_STRING);
columns.put(COLUMN_LAST_ACTIVE_TIME, new ColumnMeta(COLUMN_LAST_ACTIVE_TIME, "timestamp", false));
columnsType.put(COLUMN_LAST_ACTIVE_TIME, Fields.FIELD_TYPE_TIMESTAMP);
columns.put(COLUMN_BACKEND_CONN_ID, new ColumnMeta(COLUMN_BACKEND_CONN_ID, "int(11)", false));
columnsType.put(COLUMN_BACKEND_CONN_ID, Fields.FIELD_TYPE_LONG);
columns.put(COLUMN_LOGIC_UPDATE, new ColumnMeta(COLUMN_LOGIC_UPDATE, "int(11)", false));
columnsType.put(COLUMN_LOGIC_UPDATE, Fields.FIELD_TYPE_LONG);
}
protected List<LinkedHashMap<String, String>> getRows() {
List<LinkedHashMap<String, String>> results = new ArrayList<>();
ServerConfig conf = DbleServer.getInstance().getConfig();
Map<String, PhysicalDbGroup> dbGroups = conf.getDbGroups();
for (PhysicalDbGroup dbGroup : dbGroups.values()) {
if (dbGroup.isDelayDetectionStart()) {
for (PhysicalDbInstance dbInstance : dbGroup.getDbInstances(true)) {
LinkedHashMap<String, String> row = getRow(dbInstance);
if (!row.isEmpty()) {
results.add(row);
}
}
}
}
return results;
}
private LinkedHashMap<String, String> getRow(PhysicalDbInstance dbInstance) {
LinkedHashMap<String, String> row = new LinkedHashMap<>();
final DelayDetection delayDetection = dbInstance.getDelayDetection();
if (Objects.isNull(delayDetection) || delayDetection.isStop()) {
return row;
}
DbInstanceConfig config = dbInstance.getConfig();
row.put(COLUMN_DB_GROUP_NAME, dbInstance.getDbGroup().getGroupName());
row.put(COLUMN_NAME, dbInstance.getName());
row.put(COLUMN_HOST, config.getUrl());
row.put(COLUMN_DELAY, String.valueOf(delayDetection.getDelayVal()));
row.put(COLUMN_STATUS, String.valueOf(dbInstance.getDelayDetectionStatus()));
row.put(COLUMN_MESSAGE, delayDetection.getErrorMessage());
row.put(COLUMN_LAST_ACTIVE_TIME, String.valueOf(delayDetection.getLastReceivedQryTime()));
row.put(COLUMN_LOGIC_UPDATE, String.valueOf(delayDetection.getLogicUpdate()));
Optional.ofNullable(delayDetection.getConn()).ifPresent(connection -> row.put(COLUMN_BACKEND_CONN_ID, String.valueOf(connection.getId())));
return row;
}
@Override
public int insertRows(List<LinkedHashMap<String, String>> rows) throws SQLException {
throw new SQLException("Access denied for table '" + tableName + "'", "42000", ErrorCode.ER_ACCESS_DENIED_ERROR);
}
@Override
public int updateRows(Set<LinkedHashMap<String, String>> affectPks, LinkedHashMap<String, String> values) throws SQLException {
if (values.size() != 1 || !values.containsKey(COLUMN_LOGIC_UPDATE)) {
throw new SQLException("only column '" + COLUMN_LOGIC_UPDATE + "' is writable", "42S22", ErrorCode.ER_ERROR_ON_WRITE);
}
final ReentrantReadWriteLock lock = DbleServer.getInstance().getConfig().getLock();
lock.writeLock().lock();
try {
int val = Integer.parseInt(values.get(COLUMN_LOGIC_UPDATE));
ServerConfig conf = DbleServer.getInstance().getConfig();
Map<String, PhysicalDbGroup> dbGroups = conf.getDbGroups();
List<PhysicalDbInstance> instanceList = Lists.newArrayList();
for (LinkedHashMap<String, String> affectPk : affectPks) {
String groupName = affectPk.get(COLUMN_DB_GROUP_NAME);
String name = affectPk.get(COLUMN_NAME);
for (PhysicalDbGroup physicalDbGroup : dbGroups.values()) {
if (StringUtil.equals(groupName, physicalDbGroup.getGroupName()) && physicalDbGroup.isDelayDetectionStart()) {
PhysicalDbInstance instance = physicalDbGroup.getDbInstances(true).stream().filter(dbInstance -> StringUtil.equals(name, dbInstance.getName()) && Objects.nonNull(dbInstance.getDelayDetection())).findFirst().get();
DelayDetection delayDetection = instance.getDelayDetection();
int logicUpdate = delayDetection.getLogicUpdate();
if (val != logicUpdate + 1) {
throw new SQLException("parameter only increment is allowed to be 1", "42S22", ErrorCode.ER_TABLE_CANT_HANDLE_AUTO_INCREMENT);
}
instanceList.add(instance);
}
}
}
for (PhysicalDbInstance instance : instanceList) {
instance.stopDelayDetection("the management end is shut down manually");
instance.startDelayDetection();
instance.getDelayDetection().setLogicUpdate(val);
}
} finally {
lock.writeLock().unlock();
}
return affectPks.size();
}
@Override
public int deleteRows(Set<LinkedHashMap<String, String>> affectPks) throws SQLException {
throw new SQLException("Access denied for table '" + tableName + "'", "42000", ErrorCode.ER_ACCESS_DENIED_ERROR);
}
}

View File

@@ -5,6 +5,7 @@ public class ChangeItem {
private Object item;
private ChangeItemType itemType;
private boolean affectHeartbeat;
private boolean affectDelayDetection;
private boolean affectConnectionPool;
private boolean affectTestConn;
private boolean affectEntryDbGroup;
@@ -77,6 +78,14 @@ public class ChangeItem {
this.item = item;
}
public boolean isAffectDelayDetection() {
return affectDelayDetection;
}
public void setAffectDelayDetection(boolean affectDelayDetection) {
this.affectDelayDetection = affectDelayDetection;
}
@Override
public String toString() {
return "ChangeItem{" +
@@ -87,6 +96,7 @@ public class ChangeItem {
", affectTestConn=" + affectTestConn +
", affectEntryDbGroup=" + affectEntryDbGroup +
", affectPoolCapacity=" + affectPoolCapacity +
", affectDelayDetection=" + affectDelayDetection +
'}';
}
}

View File

@@ -150,9 +150,27 @@ public final class DryRun {
list.add(new ErrorInfo("Cluster", "NOTICE", "Dble is in single mod"));
}
delayDetection(serverConfig, list);
printResult(service, list);
}
private static void delayDetection(ServerConfig serverConfig, List<ErrorInfo> list) {
Map<String, PhysicalDbGroup> dbGroups = serverConfig.getDbGroups();
dbGroups.forEach((k, v) -> {
DataBaseType dataBaseType = v.getDbGroupConfig().instanceDatabaseType();
if (dataBaseType == DataBaseType.MYSQL && v.isDelayDetectionStart()) {
String delayDatabase = v.getDbGroupConfig().getDelayDatabase();
ShowDatabaseHandler mysqlShowDatabaseHandler = new ShowDatabaseHandler(dbGroups, "Database");
Set<String> databases = mysqlShowDatabaseHandler.execute(v.getWriteDbInstance());
if (!databases.contains(delayDatabase)) {
list.add(new ErrorInfo("Xml", "WARNING", "database " + delayDatabase + " doesn't exists in dbGroup[" + v.getGroupName() + "]"));
}
}
});
}
private static void ucoreConnectionTest(List<ErrorInfo> list) {
try {
String selfPath = ClusterPathUtil.getOnlinePath(SystemConfig.getInstance().getInstanceName());

View File

@@ -494,6 +494,9 @@ public final class ReloadConfig {
if (!newDbGroup.equalsForHeartbeat(oldDbGroup)) {
changeItem.setAffectHeartbeat(true);
}
if (!newDbGroup.equalsForDelayDetection(oldDbGroup)) {
changeItem.setAffectDelayDetection(true);
}
changeItemList.add(changeItem);
}
@@ -520,6 +523,9 @@ public final class ReloadConfig {
if (!newDbInstance.equalsForHeartbeat(oldDbInstance)) {
changeItem.setAffectHeartbeat(true);
}
if (!newDbInstance.equalsForDelayDetection(oldDbInstance)) {
changeItem.setAffectDelayDetection(true);
}
if (!newDbInstance.equalsForTestConn(oldDbInstance)) {
changeItem.setAffectTestConn(true);
} else {

View File

@@ -53,6 +53,8 @@
<xs:attribute name="name" type="xs:string" use="required"/>
<xs:attribute name="rwSplitMode" type="xs:nonNegativeInteger" use="required"/>
<xs:attribute name="delayThreshold" type="xs:integer"/>
<xs:attribute name="delayPeriodMillis" type="xs:integer"/>
<xs:attribute name="delayDatabase" type="xs:string"/>
<xs:attribute name="disableHA" type="xs:boolean"/>
</xs:complexType>
</xs:element>