diff --git a/src/main/java/com/actiontech/dble/net/service/AbstractService.java b/src/main/java/com/actiontech/dble/net/service/AbstractService.java index 4873efac4..0921c67b7 100644 --- a/src/main/java/com/actiontech/dble/net/service/AbstractService.java +++ b/src/main/java/com/actiontech/dble/net/service/AbstractService.java @@ -147,6 +147,7 @@ public abstract class AbstractService implements Service { } public void writeDirectly(ByteBuffer buffer) { + markFinished(); this.connection.write(buffer); } @@ -161,16 +162,17 @@ public abstract class AbstractService implements Service { } } - public void write(MySQLPacket packet) { if (packet.isEndOfSession() || packet.isEndOfQuery()) { TraceManager.sessionFinish(this); } + markFinished(); packet.bufferWrite(connection); } public void writeWithBuffer(MySQLPacket packet, ByteBuffer buffer) { buffer = packet.write(buffer, this, true); + markFinished(); connection.write(buffer); if (packet.isEndOfSession() || packet.isEndOfQuery()) { TraceManager.sessionFinish(this); @@ -290,6 +292,7 @@ public abstract class AbstractService implements Service { protected abstract void handleInnerData(byte[] data); public void writeOkPacket() { + markFinished(); OkPacket ok = new OkPacket(); byte packet = (byte) this.getPacketId().incrementAndGet(); ok.read(OkPacket.OK); @@ -310,6 +313,7 @@ public abstract class AbstractService implements Service { } protected void writeErrMessage(byte id, int vendorCode, String sqlState, String msg) { + markFinished(); ErrorPacket err = new ErrorPacket(); err.setPacketId(id); err.setErrNo(vendorCode); @@ -317,4 +321,7 @@ public abstract class AbstractService implements Service { err.setMessage(StringUtil.encode(msg, connection.getCharsetName().getResults())); err.write(connection); } + + protected void markFinished() { + } } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/ddl/DruidDropTableParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/ddl/DruidDropTableParser.java index 7de22647c..e9f0f277b 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/ddl/DruidDropTableParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/ddl/DruidDropTableParser.java @@ -8,7 +8,6 @@ package com.actiontech.dble.route.parser.druid.impl.ddl; import com.actiontech.dble.cluster.values.DDLInfo; import com.actiontech.dble.config.model.sharding.SchemaConfig; import com.actiontech.dble.config.model.sharding.table.BaseTableConfig; -import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.route.RouteResultset; import com.actiontech.dble.route.parser.druid.ServerSchemaStatVisitor; import com.actiontech.dble.route.parser.druid.impl.DefaultDruidParser; @@ -45,7 +44,7 @@ public class DruidDropTableParser extends DefaultDruidParser { Map tables = schemaInfo.getSchemaConfig().getTables(); BaseTableConfig tc = tables.get(schemaInfo.getTable()); if (tc == null) { - service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate())); + service.writeOkPacket(); rrs.setFinishedExecute(true); } else { RouterUtil.routeToDDLNode(schemaInfo, rrs); diff --git a/src/main/java/com/actiontech/dble/server/handler/KillHandler.java b/src/main/java/com/actiontech/dble/server/handler/KillHandler.java index c829cc700..7e6832b98 100644 --- a/src/main/java/com/actiontech/dble/server/handler/KillHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/KillHandler.java @@ -10,10 +10,8 @@ import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.net.IOProcessor; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.connection.FrontendConnection; -import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.server.NonBlockingSession; - import com.actiontech.dble.server.SessionStage; import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.util.StringUtil; @@ -78,13 +76,13 @@ public final class KillHandler { NonBlockingSession killSession = ((ShardingService) killConn.getService()).getSession2(); if (killSession.getTransactionManager().getXAStage() != null || killSession.getSessionStage() == SessionStage.Init || killSession.getSessionStage() == SessionStage.Finished) { - getOkPacket(service).write(service.getConnection()); + service.writeOkPacket(); return; } killSession.setKilled(true); // return ok to front connection that sends kill query - getOkPacket(service).write(service.getConnection()); + service.writeOkPacket(); while (true) { if (!killSession.isKilled()) { @@ -117,9 +115,7 @@ public final class KillHandler { private static void killConnection(long id, ShardingService service) { // kill myself if (id == service.getConnection().getId()) { - OkPacket packet = getOkPacket(service); - packet.setPacketId(0); - packet.write(service.getConnection()); + service.writeOkPacket(); return; } @@ -134,7 +130,7 @@ public final class KillHandler { } fc.killAndClose("killed");*/ - getOkPacket(service).write(service.getConnection()); + service.writeOkPacket(); } private static FrontendConnection findFrontConn(long connId) { @@ -148,14 +144,4 @@ public final class KillHandler { return fc; } - private static OkPacket getOkPacket(ShardingService service) { - byte packetId = (byte) service.getSession2().getPacketId().get(); - OkPacket packet = new OkPacket(); - packet.setPacketId(++packetId); - packet.setAffectedRows(0); - packet.setServerStatus(2); - service.getSession2().multiStatementPacket(packet, packetId); - return packet; - } - } diff --git a/src/main/java/com/actiontech/dble/server/handler/MysqlSystemSchemaHandler.java b/src/main/java/com/actiontech/dble/server/handler/MysqlSystemSchemaHandler.java index cdf983fa4..7bef02d0a 100644 --- a/src/main/java/com/actiontech/dble/server/handler/MysqlSystemSchemaHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/MysqlSystemSchemaHandler.java @@ -37,7 +37,7 @@ public final class MysqlSystemSchemaHandler { } if (mySqlSelectQueryBlock == null) { - service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate())); + service.writeOkPacket(); return; } diff --git a/src/main/java/com/actiontech/dble/server/response/SptDrop.java b/src/main/java/com/actiontech/dble/server/response/SptDrop.java index e585e7fe9..a1ea72e3c 100644 --- a/src/main/java/com/actiontech/dble/server/response/SptDrop.java +++ b/src/main/java/com/actiontech/dble/server/response/SptDrop.java @@ -1,12 +1,11 @@ /* -* Copyright (C) 2016-2020 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ + * Copyright (C) 2016-2020 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ package com.actiontech.dble.server.response; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.services.mysqlsharding.ShardingService; public final class SptDrop { @@ -16,7 +15,7 @@ public final class SptDrop { public static void response(ShardingService service) { String name = service.getSptPrepare().getName(); if (service.getSptPrepare().delPrepare(name)) { - service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate())); + service.writeOkPacket(); } else { service.writeErrMessage(ErrorCode.ER_UNKNOWN_STMT_HANDLER, "Unknown prepared statement handler" + name + " given to DEALLOCATE PREPARE"); } diff --git a/src/main/java/com/actiontech/dble/server/response/SptPrepare.java b/src/main/java/com/actiontech/dble/server/response/SptPrepare.java index ba2ef3a74..be798793c 100644 --- a/src/main/java/com/actiontech/dble/server/response/SptPrepare.java +++ b/src/main/java/com/actiontech/dble/server/response/SptPrepare.java @@ -1,12 +1,11 @@ /* -* Copyright (C) 2016-2020 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ + * Copyright (C) 2016-2020 ActionTech. + * based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ package com.actiontech.dble.server.response; import com.actiontech.dble.config.ErrorCode; -import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.server.parser.ScriptPrepareParse; import com.actiontech.dble.services.mysqlsharding.ShardingService; @@ -33,7 +32,7 @@ public final class SptPrepare { List args = new LinkedList(); ScriptPrepareParse.parseStmt(stmt, args); service.getSptPrepare().setPrepare(name, args); - service.writeDirectly(service.writeToBuffer(OkPacket.OK, service.allocate())); + service.writeOkPacket(); } else { service.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "SQL syntax error"); } diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java index 5934f281b..c79bcada2 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/ShardingService.java @@ -396,21 +396,16 @@ public class ShardingService extends BusinessService { public void writeErrMessage(String sqlState, String msg, int vendorCode) { byte packetId = (byte) this.getSession2().getPacketId().get(); writeErrMessage(++packetId, vendorCode, sqlState, msg); - if (session.isDiscard() || session.isKilled()) { - session.setKilled(false); - session.setDiscard(false); - } } @Override - protected void writeErrMessage(byte id, int vendorCode, String sqlState, String msg) { - markFinished(); - super.writeErrMessage(id, vendorCode, sqlState, msg); - } - public void markFinished() { if (session != null) { session.setStageFinished(); + if (session.isDiscard() || session.isKilled()) { + session.setKilled(false); + session.setDiscard(false); + } } } @@ -539,6 +534,7 @@ public class ShardingService extends BusinessService { @Override public void write(MySQLPacket packet) { boolean multiQueryFlag = session.multiStatementPacket(packet); + markFinished(); if (packet.isEndOfSession()) { //error finished do resource clean up session.resetMultiStatementStatus(); @@ -569,6 +565,7 @@ public class ShardingService extends BusinessService { @Override public void writeWithBuffer(MySQLPacket packet, ByteBuffer buffer) { boolean multiQueryFlag = session.multiStatementPacket(packet); + markFinished(); if (packet.isEndOfSession()) { //error finished do resource clean up session.resetMultiStatementStatus(); diff --git a/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java b/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java index 438156124..fcab04e6c 100644 --- a/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java +++ b/src/main/java/com/actiontech/dble/sqlengine/SetTestJob.java @@ -147,7 +147,7 @@ public class SetTestJob implements ResponseHandler, Runnable { ResetConnHandler handler = new ResetConnHandler(); responseService.setResponseHandler(handler); responseService.setComplexQuery(true); - responseService.writeDirectly(responseService.writeToBuffer(ResetConnectionPacket.RESET, responseService.allocate())); + responseService.writeDirectly(ResetConnectionPacket.RESET); } } @@ -192,7 +192,7 @@ public class SetTestJob implements ResponseHandler, Runnable { ResetConnHandler handler = new ResetConnHandler(); responseService.setResponseHandler(handler); responseService.setComplexQuery(true); - responseService.writeDirectly(responseService.writeToBuffer(ResetConnectionPacket.RESET, responseService.allocate())); + responseService.writeDirectly(ResetConnectionPacket.RESET); } }