inner-1892:fix login operation consume thread pool (#3370)

Signed-off-by: dcy <dcy10000@gmail.com>

(cherry picked from commit 993df28f94)
Signed-off-by: dcy <dcy10000@gmail.com>
This commit is contained in:
Rico
2022-08-23 17:23:16 +08:00
committed by dcy
parent 7b1a7e7b93
commit 7e9c308a83
3 changed files with 47 additions and 11 deletions
@@ -16,4 +16,8 @@ public final class GeneralProvider {
public static void showGeneralLog() {
}
public static void beforeAuthSuccess() {
}
}
@@ -2,6 +2,7 @@ package com.actiontech.dble.services.mysqlauthenticate;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.CharsetUtil;
import com.actiontech.dble.btrace.provider.GeneralProvider;
import com.actiontech.dble.config.Capabilities;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.Versions;
@@ -23,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Future;
import static com.actiontech.dble.services.mysqlauthenticate.PluginName.caching_sha2_password;
import static com.actiontech.dble.services.mysqlauthenticate.PluginName.mysql_native_password;
@@ -41,6 +43,7 @@ public class MySQLFrontAuthService extends FrontendService implements AuthServic
private volatile AuthPacket authPacket;
private volatile boolean needAuthSwitched;
private volatile PluginName pluginName;
private volatile Future<?> asyncLogin;
public MySQLFrontAuthService(AbstractConnection connection) {
super(connection);
@@ -66,20 +69,34 @@ public class MySQLFrontAuthService extends FrontendService implements AuthServic
pingResponse();
return;
}
if (needAuthSwitched) {
handleSwitchResponse(data);
} else {
handleAuthPacket(data);
}
} catch (Exception e) {
LOGGER.error("illegal auth packet {}", data, e);
writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "illegal auth packet, the detail error message is " + e.getMessage());
connection.close("illegal auth packet");
} finally {
TraceManager.finishSpan(this, traceObject);
return;
}
asyncLogin = DbleServer.getInstance().getComplexQueryExecutor().submit(() -> {
try {
GeneralProvider.beforeAuthSuccess();
if (needAuthSwitched) {
handleSwitchResponse(data);
} else {
handleAuthPacket(data);
}
} catch (Exception e) {
if (e.getCause() != null && e.getCause() instanceof InterruptedException) {
// print nothing
} else {
LOGGER.error("illegal auth {}", data, e);
writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "illegal auth , the detail error message is " + e.getMessage());
connection.close("illegal auth");
}
} finally {
TraceManager.finishSpan(this, traceObject);
}
});
}
private void pingResponse() {
@@ -269,4 +286,14 @@ public class MySQLFrontAuthService extends FrontendService implements AuthServic
public boolean haveNotReceivedMessage() {
return !receivedMessage;
}
@Override
public void cleanup() {
final Future<?> loginFuture = this.asyncLogin;
if (loginFuture != null && !loginFuture.isDone()) {
loginFuture.cancel(true);
this.asyncLogin = null;
}
super.cleanup();
}
}
@@ -24,6 +24,7 @@ public class MysqlDatabaseHandler {
private Set<String> databases = new HashSet<>();
private final Condition finishCond = lock.newCondition();
private boolean isFinish = false;
private SQLJob sqlJob;
public MysqlDatabaseHandler(Map<String, PhysicalDbGroup> dbGroups) {
this.dbGroups = dbGroups;
@@ -40,7 +41,7 @@ public class MysqlDatabaseHandler {
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{mysqlShowDataBasesCols}, new MySQLShowDatabasesListener(mysqlShowDataBasesCols));
PhysicalDbInstance ds = getPhysicalDbInstance(dbGroupName);
if (ds != null) {
SQLJob sqlJob = new SQLJob(MYSQL_SHOW_DATABASES, null, resultHandler, ds);
sqlJob = new SQLJob(MYSQL_SHOW_DATABASES, null, resultHandler, ds);
sqlJob.run();
waitDone();
} else {
@@ -55,7 +56,7 @@ public class MysqlDatabaseHandler {
String mysqlShowDataBasesCols = "Database";
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{mysqlShowDataBasesCols}, new MySQLShowDatabasesListener(mysqlShowDataBasesCols));
if (ds != null) {
OneTimeConnJob sqlJob = new OneTimeConnJob(MYSQL_SHOW_DATABASES, null, resultHandler, ds);
sqlJob = new OneTimeConnJob(MYSQL_SHOW_DATABASES, null, resultHandler, ds);
sqlJob.run();
waitDone();
} else {
@@ -97,6 +98,10 @@ public class MysqlDatabaseHandler {
}
} catch (InterruptedException e) {
LOGGER.info("[MysqlDatabaseHandler] conn Interrupted: " + e);
if (sqlJob != null) {
sqlJob.terminate("thread interrupted");
}
throw new IllegalStateException(e);
} finally {
lock.unlock();
}