diff --git a/src/main/java/com/actiontech/dble/manager/response/KillConnection.java b/src/main/java/com/actiontech/dble/manager/response/KillConnection.java index c8042d2b6..65186d01b 100644 --- a/src/main/java/com/actiontech/dble/manager/response/KillConnection.java +++ b/src/main/java/com/actiontech/dble/manager/response/KillConnection.java @@ -8,7 +8,6 @@ package com.actiontech.dble.manager.response; import com.actiontech.dble.DbleServer; import com.actiontech.dble.manager.ManagerConnection; import com.actiontech.dble.net.FrontendConnection; -import com.actiontech.dble.net.NIOConnection; import com.actiontech.dble.net.NIOProcessor; import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.util.SplitUtil; @@ -31,10 +30,10 @@ public final class KillConnection { int count = 0; List list = getList(stmt, offset, mc); if (list != null) { - for (NIOConnection c : list) { + for (FrontendConnection c : list) { StringBuilder s = new StringBuilder(); LOGGER.warn(s.append(c).append("killed by manager").toString()); - c.close("kill by manager"); + c.killAndClose("kill by manager"); count++; } } diff --git a/src/main/java/com/actiontech/dble/net/FrontendConnection.java b/src/main/java/com/actiontech/dble/net/FrontendConnection.java index 5fc4b978f..e0032c45d 100644 --- a/src/main/java/com/actiontech/dble/net/FrontendConnection.java +++ b/src/main/java/com/actiontech/dble/net/FrontendConnection.java @@ -508,4 +508,8 @@ public abstract class FrontendConnection extends AbstractConnection { public void close(String reason) { super.close(isAuthenticated ? reason : ""); } + + public void killAndClose(String reaseon) { + + } } diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index e2cb992e5..0cd879741 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -497,37 +497,35 @@ public class NonBlockingSession implements Session { } protected void kill() { - boolean hooked = false; - AtomicInteger count = null; - Map killees = null; + AtomicInteger count = new AtomicInteger(0); + Map killees = new HashMap<>(); + for (Map.Entry entry : target.entrySet()) { BackendConnection c = entry.getValue(); - if (c != null) { - if (!hooked) { - hooked = true; - killees = new HashMap<>(); - count = new AtomicInteger(0); - } + if (c != null && !c.isDDL()) { killees.put(entry.getKey(), c); count.incrementAndGet(); + } else if (c != null && c.isDDL()) { + //if the sql executing is a ddl,do not kill the query,just close the connection + this.terminate(); + return; } } - if (hooked) { - for (Entry en : killees.entrySet()) { - KillConnectionHandler kill = new KillConnectionHandler( - en.getValue(), this); - ServerConfig conf = DbleServer.getInstance().getConfig(); - PhysicalDBNode dn = conf.getDataNodes().get( - en.getKey().getName()); - try { - dn.getConnectionFromSameSource(null, true, en.getValue(), - kill, en.getKey()); - } catch (Exception e) { - LOGGER.error( - "get killer connection failed for " + en.getKey(), - e); - kill.connectionError(e, null); - } + + for (Entry en : killees.entrySet()) { + KillConnectionHandler kill = new KillConnectionHandler( + en.getValue(), this); + ServerConfig conf = DbleServer.getInstance().getConfig(); + PhysicalDBNode dn = conf.getDataNodes().get( + en.getKey().getName()); + try { + dn.getConnectionFromSameSource(en.getValue().getSchema(), true, en.getValue(), + kill, en.getKey()); + } catch (Exception e) { + LOGGER.error( + "get killer connection failed for " + en.getKey(), + e); + kill.connectionError(e, null); } } } diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java index b8ac42e78..0b02b2ed4 100644 --- a/src/main/java/com/actiontech/dble/server/ServerConnection.java +++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java @@ -377,6 +377,22 @@ public class ServerConnection extends FrontendConnection { } } + + + + @Override + public void killAndClose(String reason) { + if (session.getSource().isTxstart() && !session.cancelableStatusSet(NonBlockingSession.CANCEL_STATUS_CANCELING) && + session.getXaState() != null && session.getXaState() != TxState.TX_INITIALIZE_STATE) { + //XA transaction in this phase(commit/rollback) close the front end and wait for the backend finished + super.close(reason); + } else { + //not a xa transaction ,close it + super.close(reason); + session.kill(); + } + } + @Override public String toString() { return "ServerConnection [id=" + id + ", schema=" + schema + ", host=" + host + diff --git a/src/main/java/com/actiontech/dble/server/handler/KillHandler.java b/src/main/java/com/actiontech/dble/server/handler/KillHandler.java index a319dbbb3..2dda16260 100644 --- a/src/main/java/com/actiontech/dble/server/handler/KillHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/KillHandler.java @@ -50,7 +50,11 @@ public final class KillHandler { } } if (fc != null) { - fc.close("killed"); + if (!fc.getUser().equals(c.getUser())) { + c.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "can't kill other user's connection" + id); + return; + } + fc.killAndClose("killed"); getOkPacket().write(c); } else { c.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "Unknown connection id:" + id);