[inner-2368] fix: when some nodes have been closed during the ddl execution phase, the front side needs to respond with an error.

(cherry picked from commit 2cfcac2850)
This commit is contained in:
wenyh1
2023-10-08 17:20:08 +08:00
parent b87c49893e
commit 28941f413e
2 changed files with 18 additions and 11 deletions

View File

@@ -33,9 +33,7 @@ import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -67,6 +65,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl
protected final ReentrantLock lock = new ReentrantLock();
protected HashMap<RouteResultsetNode, Integer> nodeResponseStatus = Maps.newHashMap();
protected Set<MySQLResponseService> closedConnSet = new HashSet<>(1);
protected AtomicBoolean writeToClientFlag = new AtomicBoolean(false);
protected AtomicBoolean specialHandleFlag = new AtomicBoolean(false); // execute special handling only once
protected volatile String errMsg;
@@ -294,7 +293,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl
MySQLResponseService responseService = (MySQLResponseService) service;
final RouteResultsetNode node = (RouteResultsetNode) responseService.getAttachment();
if (checkIsAlreadyClosed(node)) return;
if (checkIsAlreadyClosed(node, responseService)) return;
LOGGER.warn("backend connect {}, conn info:{}", closeReason0, service);
DDLTraceHelper.log(session.getShardingService(), d -> d.infoByNode(node.getName(), stage, DDLTraceHelper.Status.fail, closeReason0));
@@ -333,11 +332,15 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl
}
}
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) {
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) {
lock.lock();
try {
if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true;
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
if (closedConnSet.contains(mysqlResponseService)) {
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
return true;
} else {
closedConnSet.add(mysqlResponseService);
}
session.getTargetMap().remove(node);
return false;
} finally {
@@ -375,6 +378,7 @@ public abstract class BaseDDLHandler implements ResponseHandler, ExecutableHandl
protected void clearResources() {
nodeResponseStatus.clear();
closedConnSet.clear();
}
protected void handleEndPacket(MySQLPacket packet) {

View File

@@ -110,13 +110,16 @@ public class MultiNodeDdlPrepareHandler extends BaseDDLHandler {
}
@Override
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node) {
protected boolean checkIsAlreadyClosed(final RouteResultsetNode node, final MySQLResponseService mysqlResponseService) {
lock.lock();
try {
if (finishedTest) return true;
if (nodeResponseStatus.get(node) == null || nodeResponseStatus.get(node) == STATUS_CONN_CLOSE) return true;
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
session.getTargetMap().remove(node);
if (closedConnSet.contains(mysqlResponseService)) {
nodeResponseStatus.put(node, STATUS_CONN_CLOSE);
return true;
} else {
closedConnSet.add(mysqlResponseService);
}
return false;
} finally {
lock.unlock();