Merge pull request #3822 from actiontech/inner-2358

[inner-2358] manager side uses managerFrontHandlerQueue when authenticating
This commit is contained in:
wenyh
2023-09-14 17:29:53 +08:00
committed by GitHub
3 changed files with 56 additions and 7 deletions

View File

@@ -9,7 +9,7 @@ package com.actiontech.dble.services.factorys;
import com.actiontech.dble.net.SocketWR;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.factory.FrontendConnectionFactory;
import com.actiontech.dble.services.mysqlauthenticate.MySQLFrontAuthService;
import com.actiontech.dble.services.mysqlauthenticate.MySQLManagerFrontAuthService;
import java.io.IOException;
import java.nio.channels.NetworkChannel;
@@ -19,7 +19,7 @@ public class ManagerConnectionFactory extends FrontendConnectionFactory {
@Override
protected FrontendConnection getConnection(NetworkChannel channel, SocketWR socketWR) throws IOException {
FrontendConnection c = new FrontendConnection(channel, socketWR, true);
c.setService(new MySQLFrontAuthService(c));
c.setService(new MySQLManagerFrontAuthService(c));
return c;
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright (C) 2016-2023 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.services.mysqlauthenticate;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.NotificationServiceTask;
import com.actiontech.dble.net.service.ServiceTask;
/**
* Created by szf on 2020/6/18.
*/
public class MySQLManagerFrontAuthService extends MySQLFrontAuthService {
public MySQLManagerFrontAuthService(AbstractConnection connection) {
super(connection);
}
@Override
public void handle(ServiceTask task) {
beforeInsertServiceTask(task);
task.setTaskId(taskId.getAndIncrement());
DbleServer.getInstance().getManagerFrontHandlerQueue().offer(task);
}
public void notifyTaskThread() {
DbleServer.getInstance().getManagerFrontHandlerQueue().offerFirst(new NotificationServiceTask(this));
}
}

View File

@@ -118,7 +118,7 @@ public final class ThreadManager {
throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported");
}
} else {
throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported");
throw new Exception("The recover operation of thread[" + threadName + "] is not supported");
}
}
@@ -157,13 +157,21 @@ public final class ThreadManager {
XaCheckHandler.stopXaIdCheckPeriod();
LOGGER.info("manual shutdown threadPool[{}] ... end ...", TIMER_SCHEDULER_WORKER_NAME);
break;
case FRONT_WORKER_NAME:
case FRONT_MANAGER_WORKER_NAME:
case BACKEND_WORKER_NAME:
case WRITE_TO_BACKEND_WORKER_NAME:
case COMPLEX_QUERY_EXECUTOR_NAME:
case NIO_FRONT_RW:
case NIO_BACKEND_RW:
throw new Exception("The recover operation of threadPool[" + threadPoolName + "] is not supported");
default:
throw new Exception("The shutdown operation of thread[" + TIMER_SCHEDULER_WORKER_NAME + "] is not supported");
throw new Exception("The threadPool[" + threadPoolName + "] does not exist");
}
}
public static void recoverThreadPool(String threadName) throws Exception {
switch (threadName) {
public static void recoverThreadPool(String threadPoolName) throws Exception {
switch (threadPoolName) {
case TIMER_WORKER_NAME:
if (!DbleServer.getInstance().getTimerExecutor().isShutdown()) {
throw new Exception("threadPool[" + TIMER_WORKER_NAME + "] is not shutdown, no need to recover");
@@ -199,8 +207,16 @@ public final class ThreadManager {
XaCheckHandler.startXaIdCheckPeriod();
LOGGER.info("manual recover threadPool[{}] ... end ...", TIMER_SCHEDULER_WORKER_NAME);
break;
case FRONT_WORKER_NAME:
case FRONT_MANAGER_WORKER_NAME:
case BACKEND_WORKER_NAME:
case WRITE_TO_BACKEND_WORKER_NAME:
case COMPLEX_QUERY_EXECUTOR_NAME:
case NIO_FRONT_RW:
case NIO_BACKEND_RW:
throw new Exception("The recover operation of threadPool[" + threadPoolName + "] is not supported");
default:
throw new Exception("The recover operation of threadPool[" + threadName + "] is not supported");
throw new Exception("The threadPool[" + threadPoolName + "] does not exist");
}
}