#276 Kill @@connection send kill command to backend connection

This commit is contained in:
sunzhengfang
2017-09-25 10:14:47 +08:00
parent 38b46eb440
commit 2d68c1d297
5 changed files with 50 additions and 29 deletions
@@ -8,7 +8,6 @@ package com.actiontech.dble.manager.response;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.manager.ManagerConnection;
import com.actiontech.dble.net.FrontendConnection;
import com.actiontech.dble.net.NIOConnection;
import com.actiontech.dble.net.NIOProcessor;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.util.SplitUtil;
@@ -31,10 +30,10 @@ public final class KillConnection {
int count = 0;
List<FrontendConnection> list = getList(stmt, offset, mc);
if (list != null) {
for (NIOConnection c : list) {
for (FrontendConnection c : list) {
StringBuilder s = new StringBuilder();
LOGGER.warn(s.append(c).append("killed by manager").toString());
c.close("kill by manager");
c.killAndClose("kill by manager");
count++;
}
}
@@ -508,4 +508,8 @@ public abstract class FrontendConnection extends AbstractConnection {
public void close(String reason) {
super.close(isAuthenticated ? reason : "");
}
public void killAndClose(String reaseon) {
}
}
@@ -497,37 +497,35 @@ public class NonBlockingSession implements Session {
}
protected void kill() {
boolean hooked = false;
AtomicInteger count = null;
Map<RouteResultsetNode, BackendConnection> killees = null;
AtomicInteger count = new AtomicInteger(0);
Map<RouteResultsetNode, BackendConnection> killees = new HashMap<>();
for (Map.Entry<RouteResultsetNode, BackendConnection> entry : target.entrySet()) {
BackendConnection c = entry.getValue();
if (c != null) {
if (!hooked) {
hooked = true;
killees = new HashMap<>();
count = new AtomicInteger(0);
}
if (c != null && !c.isDDL()) {
killees.put(entry.getKey(), c);
count.incrementAndGet();
} else if (c != null && c.isDDL()) {
//if the sql executing is a ddl,do not kill the query,just close the connection
this.terminate();
return;
}
}
if (hooked) {
for (Entry<RouteResultsetNode, BackendConnection> en : killees.entrySet()) {
KillConnectionHandler kill = new KillConnectionHandler(
en.getValue(), this);
ServerConfig conf = DbleServer.getInstance().getConfig();
PhysicalDBNode dn = conf.getDataNodes().get(
en.getKey().getName());
try {
dn.getConnectionFromSameSource(null, true, en.getValue(),
kill, en.getKey());
} catch (Exception e) {
LOGGER.error(
"get killer connection failed for " + en.getKey(),
e);
kill.connectionError(e, null);
}
for (Entry<RouteResultsetNode, BackendConnection> en : killees.entrySet()) {
KillConnectionHandler kill = new KillConnectionHandler(
en.getValue(), this);
ServerConfig conf = DbleServer.getInstance().getConfig();
PhysicalDBNode dn = conf.getDataNodes().get(
en.getKey().getName());
try {
dn.getConnectionFromSameSource(en.getValue().getSchema(), true, en.getValue(),
kill, en.getKey());
} catch (Exception e) {
LOGGER.error(
"get killer connection failed for " + en.getKey(),
e);
kill.connectionError(e, null);
}
}
}
@@ -377,6 +377,22 @@ public class ServerConnection extends FrontendConnection {
}
}
@Override
public void killAndClose(String reason) {
if (session.getSource().isTxstart() && !session.cancelableStatusSet(NonBlockingSession.CANCEL_STATUS_CANCELING) &&
session.getXaState() != null && session.getXaState() != TxState.TX_INITIALIZE_STATE) {
//XA transaction in this phase(commit/rollback) close the front end and wait for the backend finished
super.close(reason);
} else {
//not a xa transaction ,close it
super.close(reason);
session.kill();
}
}
@Override
public String toString() {
return "ServerConnection [id=" + id + ", schema=" + schema + ", host=" + host +
@@ -50,7 +50,11 @@ public final class KillHandler {
}
}
if (fc != null) {
fc.close("killed");
if (!fc.getUser().equals(c.getUser())) {
c.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "can't kill other user's connection" + id);
return;
}
fc.killAndClose("killed");
getOkPacket().write(c);
} else {
c.writeErrMessage(ErrorCode.ER_NO_SUCH_THREAD, "Unknown connection id:" + id);