inner-1892:fix login operation consume thread pool (#3370)

* inner-1892:fix login operation consume thread pool

Signed-off-by: dcy <dcy10000@gmail.com>

* inner-1892:fix login operation consume thread pool

Signed-off-by: dcy <dcy10000@gmail.com>

Signed-off-by: dcy <dcy10000@gmail.com>
This commit is contained in:
Rico
2022-08-23 17:23:16 +08:00
committed by GitHub
parent 98df96ad8d
commit 993df28f94
2 changed files with 42 additions and 13 deletions

View File

@@ -25,6 +25,7 @@ public class ShowDatabaseHandler {
private boolean isFinish = false;
private String showDatabases = "show databases";
private String showDataBasesCols;
private SQLJob sqlJob;
public ShowDatabaseHandler(Map<String, PhysicalDbGroup> dbGroups, String showDataBasesCols) {
@@ -42,7 +43,7 @@ public class ShowDatabaseHandler {
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{showDataBasesCols}, new ShowDatabasesListener(showDataBasesCols));
PhysicalDbInstance ds = getPhysicalDbInstance(dbGroupName);
if (ds != null) {
SQLJob sqlJob = new SQLJob(showDatabases, null, resultHandler, ds);
sqlJob = new SQLJob(showDatabases, null, resultHandler, ds);
sqlJob.run();
waitDone();
} else {
@@ -57,7 +58,7 @@ public class ShowDatabaseHandler {
reset();
MultiRowSQLQueryResultHandler resultHandler = new MultiRowSQLQueryResultHandler(new String[]{showDataBasesCols}, new ShowDatabasesListener(showDataBasesCols));
if (ds != null) {
OneTimeConnJob sqlJob = new OneTimeConnJob(showDatabases, null, resultHandler, ds);
sqlJob = new OneTimeConnJob(showDatabases, null, resultHandler, ds);
sqlJob.run();
waitDone();
} else {
@@ -98,7 +99,11 @@ public class ShowDatabaseHandler {
finishCond.await();
}
} catch (InterruptedException e) {
LOGGER.info("[ClickHouseDatabaseHandler] conn Interrupted: " + e);
LOGGER.info("[MysqlDatabaseHandler] conn Interrupted: " + e);
if (sqlJob != null) {
sqlJob.terminate("thread interrupted");
}
throw new IllegalStateException(e);
} finally {
lock.unlock();
}

View File

@@ -28,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.Future;
import static com.actiontech.dble.services.mysqlauthenticate.PluginName.caching_sha2_password;
import static com.actiontech.dble.services.mysqlauthenticate.PluginName.mysql_native_password;
@@ -46,6 +47,7 @@ public class MySQLFrontAuthService extends FrontendService implements AuthServic
private volatile AuthPacket authPacket;
private volatile boolean needAuthSwitched;
private volatile PluginName pluginName;
private volatile Future<?> asyncLogin;
public MySQLFrontAuthService(AbstractConnection connection) {
super(connection);
@@ -87,21 +89,34 @@ public class MySQLFrontAuthService extends FrontendService implements AuthServic
pingResponse();
return;
}
GeneralProvider.beforeAuthSuccess();
if (needAuthSwitched) {
handleSwitchResponse(data);
} else {
handleAuthPacket(data);
}
} catch (Exception e) {
LOGGER.error("illegal auth packet {}", data, e);
writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "illegal auth packet, the detail error message is " + e.getMessage());
connection.close("illegal auth packet");
} finally {
TraceManager.finishSpan(this, traceObject);
return;
}
asyncLogin = DbleServer.getInstance().getComplexQueryExecutor().submit(() -> {
try {
GeneralProvider.beforeAuthSuccess();
if (needAuthSwitched) {
handleSwitchResponse(data);
} else {
handleAuthPacket(data);
}
} catch (Exception e) {
if (e.getCause() != null && e.getCause() instanceof InterruptedException) {
// print nothing
} else {
LOGGER.error("illegal auth {}", data, e);
writeErrMessage(ErrorCode.ER_ACCESS_DENIED_ERROR, "illegal auth , the detail error message is " + e.getMessage());
connection.close("illegal auth");
}
} finally {
TraceManager.finishSpan(this, traceObject);
}
});
}
private void handleSSLProtoData(byte[] data) {
@@ -302,4 +317,13 @@ public class MySQLFrontAuthService extends FrontendService implements AuthServic
return !receivedMessage;
}
@Override
public void cleanup() {
final Future<?> loginFuture = this.asyncLogin;
if (loginFuture != null && !loginFuture.isDone()) {
loginFuture.cancel(true);
this.asyncLogin = null;
}
super.cleanup();
}
}