inner-2413: recreate heartbeat connection when timeout

This commit is contained in:
dcy
2024-12-11 10:47:27 +08:00
parent fd42d138fb
commit 077004b7a7
5 changed files with 35 additions and 0 deletions

View File

@@ -206,4 +206,9 @@ public class HeartbeatSQLJob implements ResponseHandler {
/**
 * Reports whether the stamp currently held by {@code connectionRef} equals 2.
 * NOTE(review): presumably stamp 2 marks this job as quit; confirm where the stamp is set.
 *
 * @return {@code true} when the stamp is 2, {@code false} otherwise
 */
public boolean isQuit() {
    final boolean quit = this.connectionRef.getStamp() == 2;
    return quit;
}
/**
 * Exposes the heartbeat object referenced by this job.
 *
 * @return the {@link MySQLHeartbeat} held in the {@code heartbeat} field
 */
public MySQLHeartbeat getHeartbeat() {
    return this.heartbeat;
}
}

View File

@@ -47,6 +47,7 @@ public class MySQLHeartbeat {
private final DbInstanceSyncRecorder asyncRecorder = new DbInstanceSyncRecorder();
private final PhysicalDbInstance source;
protected volatile MySQLHeartbeatStatus status;
// System.currentTimeMillis() captured when status is switched to TIMEOUT from another status;
// stays 0 until that happens for the first time.
private volatile long beginTimeoutTime = 0;
private String heartbeatSQL;
private long heartbeatTimeout; // heartbeat failures during this period are ignored
private final AtomicInteger errorCount = new AtomicInteger(0);
@@ -258,10 +259,16 @@ public class MySQLHeartbeat {
}
if (status != MySQLHeartbeatStatus.TIMEOUT) {
LOGGER.warn("heartbeat to [{}] setTimeout, previous status is {}", source.getConfig().getUrl(), status);
beginTimeoutTime = System.currentTimeMillis();
status = MySQLHeartbeatStatus.TIMEOUT;
}
}
/**
 * Returns the value of {@code beginTimeoutTime}.
 *
 * @return the stored begin-of-timeout timestamp in milliseconds, or 0 if it has never been set
 */
public long getBeginTimeoutTime() {
    return this.beginTimeoutTime;
}
public boolean isHeartBeatOK() {
if (status == MySQLHeartbeatStatus.OK || status == MySQLHeartbeatStatus.INIT) {
return true;

View File

@@ -92,6 +92,7 @@ public final class SystemConfig {
private long idleTimeout = DEFAULT_IDLE_TIMEOUT;
// SQL execute timeout (seconds)
private long sqlExecuteTimeout = 300;
// heartbeat SQL execute timeout (seconds); a value that is not greater than 0 disables the check
private long heartbeatSqlExecuteTimeout = 10;
// A connection is force-closed if a close packet was received but the connection has still
// not been closed after closeTimeout milliseconds.
// Setting this value too high is not a good idea.
private long closeTimeout = 100;
@@ -972,6 +973,14 @@ public final class SystemConfig {
}
/**
 * Returns the configured heartbeat SQL execute timeout.
 *
 * @return the value of {@code heartbeatSqlExecuteTimeout}, expressed in seconds
 */
public long getHeartbeatSqlExecuteTimeout() {
    return this.heartbeatSqlExecuteTimeout;
}
/**
 * Sets the heartbeat SQL execute timeout.
 *
 * @param heartbeatSqlExecuteTimeout the new value; it is stored as-is, without validation
 */
public void setHeartbeatSqlExecuteTimeout(long heartbeatSqlExecuteTimeout) {
this.heartbeatSqlExecuteTimeout = heartbeatSqlExecuteTimeout;
}
/**
 * Returns the configured transaction isolation setting.
 *
 * @return the value of {@code txIsolation}
 */
public int getTxIsolation() {
    return this.txIsolation;
}
@@ -2015,6 +2024,7 @@ public final class SystemConfig {
", sqlDumpLogTimeBasedRotate=" + sqlDumpLogTimeBasedRotate +
", sqlDumpLogDeleteFileAge='" + sqlDumpLogDeleteFileAge + '\'' +
", queryForUpdateMaxRowsSize=" + queryForUpdateMaxRowsSize +
", heartbeatSqlExecuteTimeout=" + heartbeatSqlExecuteTimeout +
"]";
}

View File

@@ -7,6 +7,9 @@ package com.actiontech.dble.net;
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob;
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat;
import com.actiontech.dble.backend.heartbeat.MySQLHeartbeatStatus;
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage;
import com.actiontech.dble.backend.mysql.xa.TxState;
@@ -189,6 +192,7 @@ public final class IOProcessor {
private void backendCheck() {
long sqlTimeout = SystemConfig.getInstance().getSqlExecuteTimeout() * 1000L;
final long heartbeatSqlExecuteTimeout = SystemConfig.getInstance().getHeartbeatSqlExecuteTimeout() * 1000L;
Iterator<Entry<Long, BackendConnection>> it = backends.entrySet().iterator();
while (it.hasNext()) {
BackendConnection c = it.next().getValue();
@@ -228,6 +232,14 @@ public final class IOProcessor {
if (!c.getBackendService().isDDL() && c.getState() == PooledConnection.STATE_IN_USE && c.getBackendService().isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) {
LOGGER.info("found backend connection SQL timeout ,close it " + c);
c.close("sql timeout");
} else if ((c.getBackendService().getResponseHandler() instanceof HeartbeatSQLJob)) {
if (heartbeatSqlExecuteTimeout > 0) {
final MySQLHeartbeat heartbeat = ((HeartbeatSQLJob) c.getBackendService().getResponseHandler()).getHeartbeat();
if (c.getBackendService().isExecuting() && heartbeat.getStatus() == MySQLHeartbeatStatus.TIMEOUT && heartbeat.getBeginTimeoutTime() < System.currentTimeMillis() - heartbeatSqlExecuteTimeout) {
LOGGER.info("found backend heartbeat connection SQL timeout ,close it " + c);
c.close("heart sql timeout");
}
}
}
// clean closed conn or check time out

View File

@@ -95,6 +95,7 @@ public final class SystemParams {
readOnlyParams.add(new ParamInfo("checkTableConsistencyPeriod", sysConfig.getCheckTableConsistencyPeriod() + "ms", "The period of consistency tableStructure check. The default value is 1800000ms(means 30minutes=30*60*1000)"));
readOnlyParams.add(new ParamInfo("processorCheckPeriod", sysConfig.getProcessorCheckPeriod() + "ms", "The period between the jobs for cleaning the closed or overtime connections. The default is 1000ms"));
readOnlyParams.add(new ParamInfo("sqlExecuteTimeout", sysConfig.getSqlExecuteTimeout() + "s", "The max query executing time.If time out,the connection will be closed. The default is 300 seconds"));
readOnlyParams.add(new ParamInfo("heartbeatSqlExecuteTimeout", sysConfig.getHeartbeatSqlExecuteTimeout() + "s", "The max heartbeat query executing time.If time out,the connection will be closed. The default is 10 seconds.set 0 to disable it."));
readOnlyParams.add(new ParamInfo("recordTxn", sysConfig.getRecordTxn() + "", "Whether the transaction be recorded as a file, the default value is 0"));
readOnlyParams.add(new ParamInfo("transactionLogBaseDir", sysConfig.getTransactionLogBaseDir(), "The directory of the transaction record file, the default value is ./txlogs/"));
readOnlyParams.add(new ParamInfo("transactionLogBaseName", sysConfig.getTransactionLogBaseName(), "The name of the transaction record file. The default value is server-tx"));