inner 1927 - fix initial status to quit status (#3455)

This commit is contained in:
Collapsar
2022-11-02 12:59:40 +08:00
committed by GitHub
parent d2512c96ae
commit 644971772d

View File

@@ -29,6 +29,11 @@ public class HeartbeatSQLJob implements ResponseHandler {
private final String sql;
private final SQLJobHandler jobHandler;
/*
* (null, 0) -> initial
* (conn, 1) -> heartbeat
* (null, 2) -> quit
*/
private final AtomicStampedReference<BackendConnection> connectionRef = new AtomicStampedReference<>(null, 0);
private final AtomicBoolean finished = new AtomicBoolean(false);
private final MySQLHeartbeat heartbeat;
@@ -45,13 +50,17 @@ public class HeartbeatSQLJob implements ResponseHandler {
}
public void terminate() {
if (connectionRef.compareAndSet(null, null, 0, 2)) {
LOGGER.info("[heartbeat]terminate timeout heartbeat job.");
return;
}
final BackendConnection con = this.connectionRef.getReference();
if (connectionRef.compareAndSet(con, null, 1, 2)) {
if (con != null && !con.isClosed()) {
String errMsg = heartbeat.getMessage() == null ? "heart beat quit" : heartbeat.getMessage();
LOGGER.info("[heartbeat]terminate this job reason:" + errMsg + " con:" + con + " sql " + this.sql);
con.businessClose("[heartbeat] quit");
}
connectionRef.set(null, 2);
if (con != null && !con.isClosed()) {
String errMsg = heartbeat.getMessage() == null ? "heart beat quit" : heartbeat.getMessage();
LOGGER.info("[heartbeat]terminate this job reason:" + errMsg + " con:" + con + " sql " + this.sql);
con.businessClose("[heartbeat] quit");
}
}
@@ -86,7 +95,7 @@ public class HeartbeatSQLJob implements ResponseHandler {
if (conn != null) {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[heartbeat]do heartbeat,conn is " + conn);
LOGGER.debug("[heartbeat]do heartbeat,conn is {}", conn);
}
if (System.nanoTime() > responseTime + keepAlive) {
String reason = "[heartbeat]connect timeoutthe connection may be unreachable for a long time due to TCP retransmission";
@@ -122,7 +131,7 @@ public class HeartbeatSQLJob implements ResponseHandler {
@Override
public void connectionError(Throwable e, Object attachment) {
LOGGER.warn("[heartbeat]can't get connection for sql :" + sql, e);
LOGGER.warn("[heartbeat]can't get connection for sql : {}", sql, e);
updateResponseTime();
heartbeat.setErrorResult("heartbeat connection Error");
doFinished(true);
@@ -177,7 +186,7 @@ public class HeartbeatSQLJob implements ResponseHandler {
@Override
public void connectionClose(@NotNull AbstractService service, String reason) {
LOGGER.warn("[heartbeat]conn for sql[" + sql + "] is closed, due to " + reason + ", we will try again immediately");
LOGGER.warn("[heartbeat]conn for sql[{}] is closed, due to {}, we will try again immediately", sql, reason);
updateResponseTime();
if (!heartbeat.doHeartbeatRetry()) {
heartbeat.setErrorResult("heartbeat conn for sql[" + sql + "] is closed, due to " + reason);