inner 1927 - avoid timeout heartbeat connection remaining

This commit is contained in:
baofengqi
2022-11-14 13:35:52 +08:00
parent bbc94d27bf
commit b33070768b
2 changed files with 69 additions and 52 deletions
@@ -18,6 +18,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicStampedReference;
public class HeartbeatSQLJob implements ResponseHandler {
@@ -25,9 +26,14 @@ public class HeartbeatSQLJob implements ResponseHandler {
private final String sql;
private final SQLJobHandler jobHandler;
private BackendConnection connection;
private AtomicBoolean finished = new AtomicBoolean(false);
private MySQLHeartbeat heartbeat;
/*
* (null, 0) -> initial
* (conn, 1) -> heartbeat
* (null, 2) -> quit
*/
private final AtomicStampedReference<BackendConnection> connectionRef = new AtomicStampedReference<>(null, 0);
private final AtomicBoolean finished = new AtomicBoolean(false);
private final MySQLHeartbeat heartbeat;
public HeartbeatSQLJob(MySQLHeartbeat heartbeat, SQLJobHandler jobHandler) {
super();
@@ -37,16 +43,29 @@ public class HeartbeatSQLJob implements ResponseHandler {
}
public void terminate() {
if (connection != null && !connection.isClosed()) {
if (connectionRef.compareAndSet(null, null, 0, 2)) {
LOGGER.info("[heartbeat]terminate timeout heartbeat job.");
return;
}
final BackendConnection con = this.connectionRef.getReference();
connectionRef.set(null, 2);
if (con != null && !con.isClosed()) {
String errMsg = heartbeat.getMessage() == null ? "heart beat quit" : heartbeat.getMessage();
LOGGER.info("[heartbeat]terminate this job reason:" + errMsg + " con:" + connection + " sql " + this.sql);
connection.businessClose("[heartbeat] quit");
LOGGER.info("[heartbeat]terminate this job reason:" + errMsg + " con:" + con + " sql " + this.sql);
con.businessClose("[heartbeat] quit");
}
}
@Override
public void connectionAcquired(final BackendConnection conn) {
this.connection = conn;
if (!connectionRef.compareAndSet(null, conn, 0, 1)) {
String errMsg = "[heartbeat]timeout connection[id=" + conn.getId() + "] is acquired, but the conn is useless.";
LOGGER.info(errMsg);
conn.businessClose(errMsg);
return;
}
conn.getBackendService().setResponseHandler(this);
conn.getBackendService().setComplexQuery(true);
try {
@@ -64,21 +83,28 @@ public class HeartbeatSQLJob implements ResponseHandler {
public void execute() {
// reset
finished.set(false);
if (connection == null) {
LOGGER.warn("[heartbeat]connect timeout,please pay attention to network latency or packet loss.");
heartbeat.setErrorResult("connect timeout");
doFinished(true);
} else {
final BackendConnection conn = connectionRef.getReference();
if (conn != null) {
try {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[heartbeat]do heartbeat,conn is " + connection);
LOGGER.debug("[heartbeat]do heartbeat,conn is {}", conn);
}
connection.getBackendService().query(sql);
conn.getBackendService().query(sql);
} catch (Exception e) { // (UnsupportedEncodingException e) {
LOGGER.warn("[heartbeat]send heartbeat error", e);
heartbeat.setErrorResult("send heartbeat error, because of [" + e.getMessage() + "]");
doFinished(true);
}
return;
}
// heartbeat connection had been closed
if (connectionRef.getStamp() == 2) {
LOGGER.info("[heartbeat]connection had been closed.");
} else {
LOGGER.warn("[heartbeat]connect timeout,please pay attention to network latency or packet loss.");
heartbeat.setErrorResult("connect timeout");
doFinished(true);
}
}
@@ -90,7 +116,7 @@ public class HeartbeatSQLJob implements ResponseHandler {
@Override
public void connectionError(Throwable e, Object attachment) {
LOGGER.warn("[heartbeat]can't get connection for sql :" + sql, e);
LOGGER.warn("[heartbeat]can't get connection for sql : {}", sql, e);
heartbeat.setErrorResult("heartbeat connection Error");
doFinished(true);
}
@@ -103,7 +129,6 @@ public class HeartbeatSQLJob implements ResponseHandler {
LOGGER.warn("[heartbeat]error response errNo: {}, {} from of sql: {} at con: {} db user = {}",
errPg.getErrNo(), new String(errPg.getMessage()), sql, service,
responseService.getConnection().getInstance().getConfig().getUser());
heartbeat.setErrorResult(new String(errPg.getMessage()));
if (!((MySQLResponseService) service).syncAndExecute()) {
service.getConnection().businessClose("[heartbeat]unfinished sync");
@@ -140,7 +165,7 @@ public class HeartbeatSQLJob implements ResponseHandler {
@Override
public void connectionClose(AbstractService service, String reason) {
LOGGER.warn("[heartbeat]conn for sql[" + sql + "] is closed, due to " + reason + ", we will try again immediately");
LOGGER.warn("[heartbeat]conn for sql[{}] is closed, due to {}, we will try again immediately", sql, reason);
if (!heartbeat.doHeartbeatRetry()) {
heartbeat.setErrorResult("heartbeat conn for sql[" + sql + "] is closed, due to " + reason);
doFinished(true);
@@ -149,7 +174,10 @@ public class HeartbeatSQLJob implements ResponseHandler {
@Override
public String toString() {
return "HeartbeatSQLJob [sql=" + sql + ", jobHandler=" + jobHandler + ", backend conn" + connection + "]";
return "HeartbeatSQLJob [sql=" + sql + ", isQuit=" + isQuit() + ", jobHandler=" + jobHandler + ", backend conn" + connectionRef.getReference() + "]";
}
public boolean isQuit() {
return connectionRef.getStamp() == 2;
}
}
@@ -21,7 +21,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* @author mycat
@@ -40,15 +39,32 @@ public class MySQLDetector implements SQLQueryResultListener<SQLQueryResult<Map<
"Last_IO_Error"};
private static final String[] MYSQL_READ_ONLY_COLS = new String[]{"@@read_only"};
private final AtomicBoolean isQuit;
private MySQLHeartbeat heartbeat;
private final MySQLHeartbeat heartbeat;
private volatile long lastSendQryTime;
private volatile long lastReceivedQryTime;
private volatile HeartbeatSQLJob sqlJob;
private final HeartbeatSQLJob sqlJob;
public MySQLDetector(MySQLHeartbeat heartbeat) {
this.heartbeat = heartbeat;
this.isQuit = new AtomicBoolean(false);
String[] fetchCols = {};
if (heartbeat.getSource().getDbGroupConfig().isShowSlaveSql()) {
fetchCols = MYSQL_SLAVE_STATUS_COLS;
} else if (heartbeat.getSource().getDbGroupConfig().isSelectReadOnlySql()) {
fetchCols = MYSQL_READ_ONLY_COLS;
}
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, this);
this.sqlJob = new HeartbeatSQLJob(heartbeat, resultHandler);
}
public void heartbeat() {
if (lastSendQryTime <= 0) {
lastSendQryTime = System.currentTimeMillis();
heartbeat.getSource().createConnectionSkipPool(null, sqlJob);
} else {
lastSendQryTime = System.currentTimeMillis();
sqlJob.execute();
}
}
boolean isHeartbeatTimeout() {
@@ -59,32 +75,12 @@ public class MySQLDetector implements SQLQueryResultListener<SQLQueryResult<Map<
return lastReceivedQryTime;
}
public void heartbeat() {
lastSendQryTime = System.currentTimeMillis();
if (sqlJob == null) {
String[] fetchCols = {};
if (heartbeat.getSource().getDbGroupConfig().isShowSlaveSql()) {
fetchCols = MYSQL_SLAVE_STATUS_COLS;
} else if (heartbeat.getSource().getDbGroupConfig().isSelectReadOnlySql()) {
fetchCols = MYSQL_READ_ONLY_COLS;
}
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(fetchCols, this);
sqlJob = new HeartbeatSQLJob(heartbeat, resultHandler);
heartbeat.getSource().createConnectionSkipPool(null, sqlJob);
} else {
sqlJob.execute();
}
}
public void quit() {
if (isQuit.compareAndSet(false, true)) {
close();
}
sqlJob.terminate();
}
public boolean isQuit() {
return isQuit.get();
return sqlJob.isQuit();
}
@Override
@@ -219,11 +215,4 @@ public class MySQLDetector implements SQLQueryResultListener<SQLQueryResult<Map<
heartbeat.setResult(MySQLHeartbeatStatus.OK);
}
public void close() {
HeartbeatSQLJob curJob = sqlJob;
if (curJob != null) {
curJob.terminate();
sqlJob = null;
}
}
}