From 077004b7a78335f2b1b021375ad54a54f76ccece Mon Sep 17 00:00:00 2001 From: dcy Date: Wed, 11 Dec 2024 10:47:27 +0800 Subject: [PATCH] inner-2413: recreate heartbeat connection when timeout --- .../dble/backend/heartbeat/HeartbeatSQLJob.java | 5 +++++ .../dble/backend/heartbeat/MySQLHeartbeat.java | 7 +++++++ .../actiontech/dble/config/model/SystemConfig.java | 10 ++++++++++ .../java/com/actiontech/dble/net/IOProcessor.java | 12 ++++++++++++ .../com/actiontech/dble/singleton/SystemParams.java | 1 + 5 files changed, 35 insertions(+) diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java index ffe02775d..a0994720e 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java @@ -206,4 +206,9 @@ public class HeartbeatSQLJob implements ResponseHandler { public boolean isQuit() { return connectionRef.getStamp() == 2; } + + + public MySQLHeartbeat getHeartbeat() { + return heartbeat; + } } diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java index d6e5f2a99..6b53354a4 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java @@ -47,6 +47,7 @@ public class MySQLHeartbeat { private final DbInstanceSyncRecorder asyncRecorder = new DbInstanceSyncRecorder(); private final PhysicalDbInstance source; protected volatile MySQLHeartbeatStatus status; + private volatile long beginTimeoutTime = 0; private String heartbeatSQL; private long heartbeatTimeout; // during the time, heart failed will ignore private final AtomicInteger errorCount = new AtomicInteger(0); @@ -258,10 +259,16 @@ public class MySQLHeartbeat { } if (status != MySQLHeartbeatStatus.TIMEOUT) { LOGGER.warn("heartbeat to [{}] setTimeout, previous status is {}", source.getConfig().getUrl(), status); + beginTimeoutTime = System.currentTimeMillis(); status = MySQLHeartbeatStatus.TIMEOUT; } } + + public long getBeginTimeoutTime() { + return beginTimeoutTime; + } + public boolean isHeartBeatOK() { if (status == MySQLHeartbeatStatus.OK || status == MySQLHeartbeatStatus.INIT) { return true; diff --git a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java index a41380c6c..3f4338019 100644 --- a/src/main/java/com/actiontech/dble/config/model/SystemConfig.java +++ b/src/main/java/com/actiontech/dble/config/model/SystemConfig.java @@ -92,6 +92,7 @@ public final class SystemConfig { private long idleTimeout = DEFAULT_IDLE_TIMEOUT; // sql execute timeout (second) private long sqlExecuteTimeout = 300; + private long heartbeatSqlExecuteTimeout = 10; // connection will force close if received close packet but haven't been closed after closeTimeout milliseconds. // set the value too big is not a good idea. private long closeTimeout = 100; @@ -972,6 +973,14 @@ public final class SystemConfig { } + public long getHeartbeatSqlExecuteTimeout() { + return heartbeatSqlExecuteTimeout; + } + + public void setHeartbeatSqlExecuteTimeout(long heartbeatSqlExecuteTimeout) { + this.heartbeatSqlExecuteTimeout = heartbeatSqlExecuteTimeout; + } + public int getTxIsolation() { return txIsolation; } @@ -2015,6 +2024,7 @@ public final class SystemConfig { ", sqlDumpLogTimeBasedRotate=" + sqlDumpLogTimeBasedRotate + ", sqlDumpLogDeleteFileAge='" + sqlDumpLogDeleteFileAge + '\'' + ", queryForUpdateMaxRowsSize=" + queryForUpdateMaxRowsSize + + ", heartbeatSqlExecuteTimeout=" + heartbeatSqlExecuteTimeout + "]"; } diff --git a/src/main/java/com/actiontech/dble/net/IOProcessor.java b/src/main/java/com/actiontech/dble/net/IOProcessor.java index 5841278fa..0a1199171 100644 --- a/src/main/java/com/actiontech/dble/net/IOProcessor.java +++ b/src/main/java/com/actiontech/dble/net/IOProcessor.java @@ -7,6 +7,9 @@ package com.actiontech.dble.net; import com.actiontech.dble.backend.datasource.PhysicalDbGroup; +import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob; +import com.actiontech.dble.backend.heartbeat.MySQLHeartbeat; +import com.actiontech.dble.backend.heartbeat.MySQLHeartbeatStatus; import com.actiontech.dble.backend.datasource.PhysicalDbInstance; import com.actiontech.dble.backend.mysql.nio.handler.transaction.xa.stage.XAStage; import com.actiontech.dble.backend.mysql.xa.TxState; @@ -189,6 +192,7 @@ public final class IOProcessor { private void backendCheck() { long sqlTimeout = SystemConfig.getInstance().getSqlExecuteTimeout() * 1000L; + final long heartbeatSqlExecuteTimeout = SystemConfig.getInstance().getHeartbeatSqlExecuteTimeout() * 1000L; Iterator> it = backends.entrySet().iterator(); while (it.hasNext()) { BackendConnection c = it.next().getValue(); @@ -228,6 +232,14 @@ public final class IOProcessor { if (!c.getBackendService().isDDL() && c.getState() == PooledConnection.STATE_IN_USE && c.getBackendService().isExecuting() && c.getLastTime() < TimeUtil.currentTimeMillis() - sqlTimeout) { LOGGER.info("found backend connection SQL timeout ,close it " + c); c.close("sql timeout"); + } else if ((c.getBackendService().getResponseHandler() instanceof HeartbeatSQLJob)) { + if (heartbeatSqlExecuteTimeout > 0) { + final MySQLHeartbeat heartbeat = ((HeartbeatSQLJob) c.getBackendService().getResponseHandler()).getHeartbeat(); + if (c.getBackendService().isExecuting() && heartbeat.getStatus() == MySQLHeartbeatStatus.TIMEOUT && heartbeat.getBeginTimeoutTime() < System.currentTimeMillis() - heartbeatSqlExecuteTimeout) { + LOGGER.info("found backend heartbeat connection SQL timeout ,close it " + c); + c.close("heart sql timeout"); + } + } } // clean closed conn or check time out diff --git a/src/main/java/com/actiontech/dble/singleton/SystemParams.java b/src/main/java/com/actiontech/dble/singleton/SystemParams.java index f9c1c82fa..a68f5a9ef 100644 --- a/src/main/java/com/actiontech/dble/singleton/SystemParams.java +++ b/src/main/java/com/actiontech/dble/singleton/SystemParams.java @@ -95,6 +95,7 @@ public final class SystemParams { readOnlyParams.add(new ParamInfo("checkTableConsistencyPeriod", sysConfig.getCheckTableConsistencyPeriod() + "ms", "The period of consistency tableStructure check. The default value is 1800000ms(means 30minutes=30*60*1000)")); readOnlyParams.add(new ParamInfo("processorCheckPeriod", sysConfig.getProcessorCheckPeriod() + "ms", "The period between the jobs for cleaning the closed or overtime connections. The default is 1000ms")); readOnlyParams.add(new ParamInfo("sqlExecuteTimeout", sysConfig.getSqlExecuteTimeout() + "s", "The max query executing time.If time out,the connection will be closed. The default is 300 seconds")); + readOnlyParams.add(new ParamInfo("heartbeatSqlExecuteTimeout", sysConfig.getHeartbeatSqlExecuteTimeout() + "s", "The max heartbeat query executing time.If time out,the connection will be closed. The default is 10 seconds.set 0 to disable it.")); readOnlyParams.add(new ParamInfo("recordTxn", sysConfig.getRecordTxn() + "", "Whether the transaction be recorded as a file, the default value is 0")); readOnlyParams.add(new ParamInfo("transactionLogBaseDir", sysConfig.getTransactionLogBaseDir(), "The directory of the transaction record file, the default value is ./txlogs/")); readOnlyParams.add(new ParamInfo("transactionLogBaseName", sysConfig.getTransactionLogBaseName(), "The name of the transaction record file. The default value is server-tx"));