fix bug about kill query (#2272)

This commit is contained in:
Collapsar
2020-11-12 15:43:27 +08:00
committed by GitHub
parent 9f28db3b8d
commit ddbff59643
8 changed files with 32 additions and 45 deletions

View File

@@ -147,6 +147,7 @@ public abstract class AbstractService implements Service {
}
public void writeDirectly(ByteBuffer buffer) {
markFinished();
this.connection.write(buffer);
}
@@ -161,16 +162,17 @@ public abstract class AbstractService implements Service {
}
}
public void write(MySQLPacket packet) {
if (packet.isEndOfSession() || packet.isEndOfQuery()) {
TraceManager.sessionFinish(this);
}
markFinished();
packet.bufferWrite(connection);
}
public void writeWithBuffer(MySQLPacket packet, ByteBuffer buffer) {
buffer = packet.write(buffer, this, true);
markFinished();
connection.write(buffer);
if (packet.isEndOfSession() || packet.isEndOfQuery()) {
TraceManager.sessionFinish(this);
@@ -290,6 +292,7 @@ public abstract class AbstractService implements Service {
protected abstract void handleInnerData(byte[] data);
public void writeOkPacket() {
markFinished();
OkPacket ok = new OkPacket();
byte packet = (byte) this.getPacketId().incrementAndGet();
ok.read(OkPacket.OK);
@@ -310,6 +313,7 @@ public abstract class AbstractService implements Service {
}
protected void writeErrMessage(byte id, int vendorCode, String sqlState, String msg) {
markFinished();
ErrorPacket err = new ErrorPacket();
err.setPacketId(id);
err.setErrNo(vendorCode);
@@ -317,4 +321,7 @@ public abstract class AbstractService implements Service {
err.setMessage(StringUtil.encode(msg, connection.getCharsetName().getResults()));
err.write(connection);
}
protected void markFinished() {
}
}

View File

@@ -8,7 +8,6 @@ package com.actiontech.dble.route.parser.druid.impl.ddl;
import com.actiontech.dble.cluster.values.DDLInfo;
import com.actiontech.dble.config.model.sharding.SchemaConfig;
import com.actiontech.dble.config.model.sharding.table.BaseTableConfig;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor;
import com.actiontech.dble.route.parser.druid.impl.DefaultDruidParser;
@@ -45,7 +44,7 @@ public class DruidDropTableParser extends DefaultDruidParser {
Map<String, BaseTableConfig> tables = schemaInfo.getSchemaConfig().getTables();
BaseTableConfig tc = tables.get(schemaInfo.getTable());
if (tc == null) {
service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate()));
service.writeOkPacket();
rrs.setFinishedExecute(true);
} else {
RouterUtil.routeToDDLNode(schemaInfo, rrs);

View File

@@ -10,10 +10,8 @@ import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.IOProcessor;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.connection.FrontendConnection;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.SessionStage;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.util.StringUtil;
@@ -78,13 +76,13 @@ public final class KillHandler {
NonBlockingSession killSession = ((ShardingService) killConn.getService()).getSession2();
if (killSession.getTransactionManager().getXAStage() != null ||
killSession.getSessionStage() == SessionStage.Init || killSession.getSessionStage() == SessionStage.Finished) {
getOkPacket(service).write(service.getConnection());
service.writeOkPacket();
return;
}
killSession.setKilled(true);
// return ok to front connection that sends kill query
getOkPacket(service).write(service.getConnection());
service.writeOkPacket();
while (true) {
if (!killSession.isKilled()) {
@@ -117,9 +115,7 @@ public final class KillHandler {
private static void killConnection(long id, ShardingService service) {
// kill myself
if (id == service.getConnection().getId()) {
OkPacket packet = getOkPacket(service);
packet.setPacketId(0);
packet.write(service.getConnection());
service.writeOkPacket();
return;
}
@@ -134,7 +130,7 @@ public final class KillHandler {
}
fc.killAndClose("killed");*/
getOkPacket(service).write(service.getConnection());
service.writeOkPacket();
}
private static FrontendConnection findFrontConn(long connId) {
@@ -148,14 +144,4 @@ public final class KillHandler {
return fc;
}
private static OkPacket getOkPacket(ShardingService service) {
byte packetId = (byte) service.getSession2().getPacketId().get();
OkPacket packet = new OkPacket();
packet.setPacketId(++packetId);
packet.setAffectedRows(0);
packet.setServerStatus(2);
service.getSession2().multiStatementPacket(packet, packetId);
return packet;
}
}

View File

@@ -37,7 +37,7 @@ public final class MysqlSystemSchemaHandler {
}
if (mySqlSelectQueryBlock == null) {
service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate()));
service.writeOkPacket();
return;
}

View File

@@ -1,12 +1,11 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.server.response;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
public final class SptDrop {
@@ -16,7 +15,7 @@ public final class SptDrop {
public static void response(ShardingService service) {
String name = service.getSptPrepare().getName();
if (service.getSptPrepare().delPrepare(name)) {
service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate()));
service.writeOkPacket();
} else {
service.writeErrMessage(ErrorCode.ER_UNKNOWN_STMT_HANDLER, "Unknown prepared statement handler" + name + " given to DEALLOCATE PREPARE");
}

View File

@@ -1,12 +1,11 @@
/*
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
* Copyright (C) 2016-2020 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.server.response;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.server.parser.ScriptPrepareParse;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
@@ -33,7 +32,7 @@ public final class SptPrepare {
List<String> args = new LinkedList();
ScriptPrepareParse.parseStmt(stmt, args);
service.getSptPrepare().setPrepare(name, args);
service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate()));
service.writeOkPacket();
} else {
service.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "SQL syntax error");
}

View File

@@ -396,21 +396,16 @@ public class ShardingService extends BusinessService {
public void writeErrMessage(String sqlState, String msg, int vendorCode) {
byte packetId = (byte) this.getSession2().getPacketId().get();
writeErrMessage(++packetId, vendorCode, sqlState, msg);
if (session.isDiscard() || session.isKilled()) {
session.setKilled(false);
session.setDiscard(false);
}
}
@Override
protected void writeErrMessage(byte id, int vendorCode, String sqlState, String msg) {
markFinished();
super.writeErrMessage(id, vendorCode, sqlState, msg);
}
public void markFinished() {
if (session != null) {
session.setStageFinished();
if (session.isDiscard() || session.isKilled()) {
session.setKilled(false);
session.setDiscard(false);
}
}
}
@@ -539,6 +534,7 @@ public class ShardingService extends BusinessService {
@Override
public void write(MySQLPacket packet) {
boolean multiQueryFlag = session.multiStatementPacket(packet);
markFinished();
if (packet.isEndOfSession()) {
//error finished do resource clean up
session.resetMultiStatementStatus();
@@ -569,6 +565,7 @@ public class ShardingService extends BusinessService {
@Override
public void writeWithBuffer(MySQLPacket packet, ByteBuffer buffer) {
boolean multiQueryFlag = session.multiStatementPacket(packet);
markFinished();
if (packet.isEndOfSession()) {
//error finished do resource clean up
session.resetMultiStatementStatus();

View File

@@ -147,7 +147,7 @@ public class SetTestJob implements ResponseHandler, Runnable {
ResetConnHandler handler = new ResetConnHandler();
responseService.setResponseHandler(handler);
responseService.setComplexQuery(true);
responseService.writeDirectly(responseService.writeToBuffer(ResetConnectionPacket.RESET, responseService.allocate()));
responseService.writeDirectly(ResetConnectionPacket.RESET);
}
}
@@ -192,7 +192,7 @@ public class SetTestJob implements ResponseHandler, Runnable {
ResetConnHandler handler = new ResetConnHandler();
responseService.setResponseHandler(handler);
responseService.setComplexQuery(true);
responseService.writeDirectly(responseService.writeToBuffer(ResetConnectionPacket.RESET, responseService.allocate()));
responseService.writeDirectly(ResetConnectionPacket.RESET);
}
}