inner-1956:fix MultiNodeSelectHandler hang (#3461)

Signed-off-by: dcy10000 <dcy10000@gmail.com>
This commit is contained in:
Rico
2022-11-15 10:03:04 +08:00
committed by dcy10000
parent 78c3f64bf0
commit 5addfa888b
4 changed files with 25 additions and 3 deletions

View File

@@ -34,6 +34,7 @@ public abstract class MultiNodeHandler implements ResponseHandler {
protected Set<RouteResultsetNode> unResponseRrns = Sets.newConcurrentHashSet();
protected int errorConnsCnt = 0;
protected boolean firstResponsed = false;
protected boolean complexQuery = false;
public MultiNodeHandler(NonBlockingSession session) {
if (session == null) {

View File

@@ -171,6 +171,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
conn.getBackendService().setResponseHandler(this);
conn.getBackendService().setSession(session);
conn.getBackendService().setComplexQuery(complexQuery);
conn.getBackendService().executeMultiNode(node, session.getShardingService(), sessionAutocommit && !session.getShardingService().isTxStart() && !node.isModifySQL());
}

View File

@@ -51,6 +51,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
super(rrs, session, false);
this.complexQuery = true;
this.queueSize = SystemConfig.getInstance().getMergeQueueSize();
this.queues = new ConcurrentHashMap<>();
if (CollectionUtil.isEmpty(rrs.getSelectCols())) {
@@ -213,7 +214,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
}
while (!heap.isEmpty()) {
if (isFail())
return;
break;
HeapItem top = heap.peak();
if (top.isNullItem()) {
heap.poll();
@@ -256,7 +257,9 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
}
QueryResultDispatcher.doSqlStat(rrs, session, selectRows, netOutBytes, resultSize);
assert service != null;
nextHandler.rowEofResponse(null, false, service);
if (!isFail()) {
nextHandler.rowEofResponse(null, false, service);
}
} catch (MySQLOutPutException e) {
String msg = e.getLocalizedMessage();
LOGGER.info(msg, e);

View File

@@ -6,6 +6,7 @@
package com.actiontech.dble.services.mysqlauthenticate;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
import com.actiontech.dble.backend.pool.PooledConnectionListener;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.Executor;
import static com.actiontech.dble.config.ErrorCode.ER_ACCESS_DENIED_ERROR;
@@ -46,9 +48,13 @@ public class MySQLBackAuthService extends BackendService implements AuthService
private volatile boolean authSwitchMore;
private volatile PluginName pluginName;
private volatile long serverCapabilities;
private volatile boolean highPriority = false;
public MySQLBackAuthService(BackendConnection connection, String user, String schema, String passwd, PooledConnectionListener listener, ResponseHandler handler) {
super(connection);
if (handler instanceof HeartbeatSQLJob) {
highPriority = true;
}
this.user = user;
this.schema = schema;
this.passwd = passwd;
@@ -59,6 +65,9 @@ public class MySQLBackAuthService extends BackendService implements AuthService
// only for com_change_user
public MySQLBackAuthService(BackendConnection connection, String user, String passwd, ResponseHandler handler) {
super(connection);
if (handler instanceof HeartbeatSQLJob) {
highPriority = true;
}
this.user = user;
this.passwd = passwd;
this.handler = handler;
@@ -294,8 +303,16 @@ public class MySQLBackAuthService extends BackendService implements AuthService
}
protected boolean isSupportFlowControl() {
return false;
}
@Override
protected Executor getExecutor() {
if (highPriority) {
return DbleServer.getInstance().getComplexQueryExecutor();
} else {
return DbleServer.getInstance().getBackendExecutor();
}
}
}