diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java index 90724427f..8124f949e 100644 --- a/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java +++ b/src/main/java/com/actiontech/dble/backend/heartbeat/HeartbeatSQLJob.java @@ -29,6 +29,11 @@ public class HeartbeatSQLJob implements ResponseHandler { private final String sql; private final SQLJobHandler jobHandler; + /* + * (null, 0) -> initial + * (conn, 1) -> heartbeat + * (null, 2) -> quit + */ private final AtomicStampedReference connectionRef = new AtomicStampedReference<>(null, 0); private final AtomicBoolean finished = new AtomicBoolean(false); private final MySQLHeartbeat heartbeat; @@ -45,13 +50,17 @@ public class HeartbeatSQLJob implements ResponseHandler { } public void terminate() { + if (connectionRef.compareAndSet(null, null, 0, 2)) { + LOGGER.info("[heartbeat]terminate timeout heartbeat job."); + return; + } + final BackendConnection con = this.connectionRef.getReference(); - if (connectionRef.compareAndSet(con, null, 1, 2)) { - if (con != null && !con.isClosed()) { - String errMsg = heartbeat.getMessage() == null ? "heart beat quit" : heartbeat.getMessage(); - LOGGER.info("[heartbeat]terminate this job reason:" + errMsg + " con:" + con + " sql " + this.sql); - con.businessClose("[heartbeat] quit"); - } + connectionRef.set(null, 2); + if (con != null && !con.isClosed()) { + String errMsg = heartbeat.getMessage() == null ? "heart beat quit" : heartbeat.getMessage(); + LOGGER.info("[heartbeat]terminate this job reason:" + errMsg + " con:" + con + " sql " + this.sql); + con.businessClose("[heartbeat] quit"); } } @@ -86,7 +95,7 @@ public class HeartbeatSQLJob implements ResponseHandler { if (conn != null) { try { if (LOGGER.isDebugEnabled()) { - LOGGER.debug("[heartbeat]do heartbeat,conn is " + conn); + LOGGER.debug("[heartbeat]do heartbeat,conn is {}", conn); } if (System.nanoTime() > responseTime + keepAlive) { String reason = "[heartbeat]connect timeout,the connection may be unreachable for a long time due to TCP retransmission"; @@ -122,7 +131,7 @@ public class HeartbeatSQLJob implements ResponseHandler { @Override public void connectionError(Throwable e, Object attachment) { - LOGGER.warn("[heartbeat]can't get connection for sql :" + sql, e); + LOGGER.warn("[heartbeat]can't get connection for sql : {}", sql, e); updateResponseTime(); heartbeat.setErrorResult("heartbeat connection Error"); doFinished(true); @@ -177,7 +186,7 @@ public class HeartbeatSQLJob implements ResponseHandler { @Override public void connectionClose(@NotNull AbstractService service, String reason) { - LOGGER.warn("[heartbeat]conn for sql[" + sql + "] is closed, due to " + reason + ", we will try again immediately"); + LOGGER.warn("[heartbeat]conn for sql[{}] is closed, due to {}, we will try again immediately", sql, reason); updateResponseTime(); if (!heartbeat.doHeartbeatRetry()) { heartbeat.setErrorResult("heartbeat conn for sql[" + sql + "] is closed, due to " + reason);