diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java index 4229916aa..ad791603a 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java @@ -34,6 +34,7 @@ public abstract class MultiNodeHandler implements ResponseHandler { protected Set unResponseRrns = Sets.newConcurrentHashSet(); protected int errorConnsCnt = 0; protected boolean firstResponsed = false; + protected boolean complexQuery = false; public MultiNodeHandler(NonBlockingSession session) { if (session == null) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java index 861b463a5..ea34729b5 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java @@ -171,6 +171,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR } conn.getBackendService().setResponseHandler(this); conn.getBackendService().setSession(session); + conn.getBackendService().setComplexQuery(complexQuery); conn.getBackendService().executeMultiNode(node, session.getShardingService(), sessionAutocommit && !session.getShardingService().isTxStart() && !node.isModifySQL()); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java index f09e1d53e..debcc9bf7 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java @@ -51,6 +51,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) { super(rrs, session, false); + this.complexQuery = true; this.queueSize = SystemConfig.getInstance().getMergeQueueSize(); this.queues = new ConcurrentHashMap<>(); if (CollectionUtil.isEmpty(rrs.getSelectCols())) { @@ -213,7 +214,7 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { } while (!heap.isEmpty()) { if (isFail()) - return; + break; HeapItem top = heap.peak(); if (top.isNullItem()) { heap.poll(); @@ -256,7 +257,9 @@ public class MultiNodeSelectHandler extends MultiNodeQueryHandler { } QueryResultDispatcher.doSqlStat(rrs, session, selectRows, netOutBytes, resultSize); assert service != null; - nextHandler.rowEofResponse(null, false, service); + if (!isFail()) { + nextHandler.rowEofResponse(null, false, service); + } } catch (MySQLOutPutException e) { String msg = e.getLocalizedMessage(); LOGGER.info(msg, e); diff --git a/src/main/java/com/actiontech/dble/services/mysqlauthenticate/MySQLBackAuthService.java b/src/main/java/com/actiontech/dble/services/mysqlauthenticate/MySQLBackAuthService.java index 7e138e3f6..a2c636f8e 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlauthenticate/MySQLBackAuthService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlauthenticate/MySQLBackAuthService.java @@ -6,6 +6,7 @@ package com.actiontech.dble.services.mysqlauthenticate; import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.heartbeat.HeartbeatSQLJob; import com.actiontech.dble.backend.mysql.CharsetUtil; import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler; import com.actiontech.dble.backend.pool.PooledConnectionListener; @@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.security.NoSuchAlgorithmException; +import java.util.concurrent.Executor; import static com.actiontech.dble.config.ErrorCode.ER_ACCESS_DENIED_ERROR; @@ -46,9 +48,13 @@ public class MySQLBackAuthService extends BackendService implements AuthService private volatile boolean authSwitchMore; private volatile PluginName pluginName; private volatile long serverCapabilities; + private volatile boolean highPriority = false; public MySQLBackAuthService(BackendConnection connection, String user, String schema, String passwd, PooledConnectionListener listener, ResponseHandler handler) { super(connection); + if (handler instanceof HeartbeatSQLJob) { + highPriority = true; + } this.user = user; this.schema = schema; this.passwd = passwd; @@ -59,6 +65,9 @@ public class MySQLBackAuthService extends BackendService implements AuthService // only for com_change_user public MySQLBackAuthService(BackendConnection connection, String user, String passwd, ResponseHandler handler) { super(connection); + if (handler instanceof HeartbeatSQLJob) { + highPriority = true; + } this.user = user; this.passwd = passwd; this.handler = handler; @@ -294,8 +303,16 @@ public class MySQLBackAuthService extends BackendService implements AuthService } - protected boolean isSupportFlowControl() { return false; } + + @Override + protected Executor getExecutor() { + if (highPriority) { + return DbleServer.getInstance().getComplexQueryExecutor(); + } else { + return DbleServer.getInstance().getBackendExecutor(); + } + } }