inner-1956:fix MultiNodeSelectHandler hang (#3461)

Signed-off-by: dcy10000 <dcy10000@gmail.com>
This commit is contained in:
Rico
2022-11-15 10:03:04 +08:00
committed by dcy10000
parent 6b3b266e8a
commit 3a05c22799
4 changed files with 26 additions and 2 deletions

View File

@@ -34,6 +34,7 @@ public abstract class MultiNodeHandler implements ResponseHandler {
protected Set<RouteResultsetNode> unResponseRrns = Sets.newConcurrentHashSet();
protected int errorConnsCnt = 0;
protected boolean firstResponsed = false;
protected boolean complexQuery = false;
public MultiNodeHandler(NonBlockingSession session) {
if (session == null) {

View File

@@ -177,6 +177,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
conn.getBackendService().setResponseHandler(this);
conn.getBackendService().setSession(session);
conn.getBackendService().setComplexQuery(complexQuery);
conn.getBackendService().executeMultiNode(node, session.getShardingService(), sessionAutocommit && !session.getShardingService().isTxStart() && !node.isModifySQL());
}

View File

@@ -50,6 +50,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
super(rrs, session, false);
this.complexQuery = true;
this.queueSize = SystemConfig.getInstance().getMergeQueueSize();
this.queues = new ConcurrentHashMap<>();
if (CollectionUtil.isEmpty(rrs.getSelectCols())) {
@@ -212,7 +213,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
}
while (!heap.isEmpty()) {
if (isFail())
return;
break;
HeapItem top = heap.peak();
if (top.isNullItem()) {
heap.poll();
@@ -255,7 +256,9 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
}
doSqlStat();
assert service != null;
nextHandler.rowEofResponse(null, false, service);
if (!isFail()) {
nextHandler.rowEofResponse(null, false, service);
}
} catch (MySQLOutPutException e) {
String msg = e.getLocalizedMessage();
LOGGER.info(msg, e);

View File

@@ -1,6 +1,7 @@
package com.actiontech.dble.services.mysqlauthenticate;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.backend.pool.PooledConnectionListener;
@@ -22,6 +23,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executor;
import static com.actiontech.dble.config.ErrorCode.ER_ACCESS_DENIED_ERROR;
@@ -41,9 +43,13 @@ public class MySQLBackAuthService extends BackendService implements AuthService
private volatile boolean authSwitchMore;
private volatile PluginName pluginName;
private volatile long serverCapabilities;
private volatile boolean highPriority = false;
public MySQLBackAuthService(BackendConnection connection, String user, String schema, String passwd, PooledConnectionListener listener, ResponseHandler handler) {
super(connection);
if (handler instanceof HeartbeatSQLJob) {
highPriority = true;
}
this.user = user;
this.schema = schema;
this.passwd = passwd;
@@ -54,6 +60,9 @@ public class MySQLBackAuthService extends BackendService implements AuthService
// only for com_change_user
public MySQLBackAuthService(BackendConnection connection, String user, String passwd, ResponseHandler handler) {
super(connection);
if (handler instanceof HeartbeatSQLJob) {
highPriority = true;
}
this.user = user;
this.passwd = passwd;
this.handler = handler;
@@ -287,4 +296,14 @@ public class MySQLBackAuthService extends BackendService implements AuthService
public boolean haveNotReceivedMessage() {
throw new UnsupportedOperationException();
}
@Override
protected Executor getExecutor() {
if (highPriority) {
return DbleServer.getInstance().getComplexQueryExecutor();
} else {
return DbleServer.getInstance().getBackendExecutor();
}
}
}