diff --git a/src/main/java/com/actiontech/dble/net/service/AbstractService.java b/src/main/java/com/actiontech/dble/net/service/AbstractService.java index 51dc15e34..733a8051d 100644 --- a/src/main/java/com/actiontech/dble/net/service/AbstractService.java +++ b/src/main/java/com/actiontech/dble/net/service/AbstractService.java @@ -8,6 +8,7 @@ import com.actiontech.dble.net.connection.WriteAbleService; import com.actiontech.dble.net.mysql.MySQLPacket; import com.actiontech.dble.services.VariablesService; import com.actiontech.dble.services.mysqlsharding.ShardingService; +import com.actiontech.dble.services.rwsplit.RWSplitService; import com.actiontech.dble.singleton.TraceManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -72,27 +73,31 @@ public abstract class AbstractService extends VariablesService implements Servic ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE); byte packetId = data[3]; singlePacket[3] = packetId; - buffer = writeToBuffer(singlePacket, buffer); + buffer = writeToBuffer0(singlePacket, buffer); while (length >= MySQLPacket.MAX_PACKET_SIZE) { singlePacket = new byte[MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE]; ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE); singlePacket[3] = ++packetId; if (this instanceof ShardingService) { singlePacket[3] = (byte) ((ShardingService) this).nextPacketId(); + } else if (this instanceof RWSplitService) { + singlePacket[3] = (byte) ((RWSplitService) this).nextPacketId(); } System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, MySQLPacket.MAX_PACKET_SIZE); srcPos += MySQLPacket.MAX_PACKET_SIZE; length -= MySQLPacket.MAX_PACKET_SIZE; - buffer = writeToBuffer(singlePacket, buffer); + buffer = writeToBuffer0(singlePacket, buffer); } singlePacket = new byte[length + MySQLPacket.PACKET_HEADER_SIZE]; ByteUtil.writeUB3(singlePacket, length); singlePacket[3] = ++packetId; if (this instanceof ShardingService) { singlePacket[3] = (byte) ((ShardingService) this).nextPacketId(); + } else if (this instanceof RWSplitService) { + singlePacket[3] = (byte) ((RWSplitService) this).nextPacketId(); } System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, length); - buffer = writeToBuffer(singlePacket, buffer); + buffer = writeToBuffer0(singlePacket, buffer); return buffer; } @@ -103,9 +108,13 @@ public abstract class AbstractService extends VariablesService implements Servic @Override public ByteBuffer writeToBuffer(byte[] src, ByteBuffer buffer) { - if (src.length > MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE) { + if (src.length >= MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE) { return this.writeBigPackageToBuffer(src, buffer); } + return writeToBuffer0(src, buffer); + } + + private ByteBuffer writeToBuffer0(byte[] src, ByteBuffer buffer) { int offset = 0; int length = src.length; int remaining = buffer.remaining();