mirror of
https://github.com/actiontech/dble.git
synced 2026-01-06 04:40:17 -06:00
[inner-2096] fix: the problem of hung in the 16M's big package (#3606)
(cherry picked from commit 8839c82cdd)
This commit is contained in:
@@ -8,6 +8,7 @@ import com.actiontech.dble.net.connection.WriteAbleService;
|
||||
import com.actiontech.dble.net.mysql.MySQLPacket;
|
||||
import com.actiontech.dble.services.VariablesService;
|
||||
import com.actiontech.dble.services.mysqlsharding.ShardingService;
|
||||
import com.actiontech.dble.services.rwsplit.RWSplitService;
|
||||
import com.actiontech.dble.singleton.TraceManager;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@@ -72,27 +73,31 @@ public abstract class AbstractService extends VariablesService implements Servic
|
||||
ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE);
|
||||
byte packetId = data[3];
|
||||
singlePacket[3] = packetId;
|
||||
buffer = writeToBuffer(singlePacket, buffer);
|
||||
buffer = writeToBuffer0(singlePacket, buffer);
|
||||
while (length >= MySQLPacket.MAX_PACKET_SIZE) {
|
||||
singlePacket = new byte[MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE];
|
||||
ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE);
|
||||
singlePacket[3] = ++packetId;
|
||||
if (this instanceof ShardingService) {
|
||||
singlePacket[3] = (byte) ((ShardingService) this).nextPacketId();
|
||||
} else if (this instanceof RWSplitService) {
|
||||
singlePacket[3] = (byte) ((RWSplitService) this).nextPacketId();
|
||||
}
|
||||
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, MySQLPacket.MAX_PACKET_SIZE);
|
||||
srcPos += MySQLPacket.MAX_PACKET_SIZE;
|
||||
length -= MySQLPacket.MAX_PACKET_SIZE;
|
||||
buffer = writeToBuffer(singlePacket, buffer);
|
||||
buffer = writeToBuffer0(singlePacket, buffer);
|
||||
}
|
||||
singlePacket = new byte[length + MySQLPacket.PACKET_HEADER_SIZE];
|
||||
ByteUtil.writeUB3(singlePacket, length);
|
||||
singlePacket[3] = ++packetId;
|
||||
if (this instanceof ShardingService) {
|
||||
singlePacket[3] = (byte) ((ShardingService) this).nextPacketId();
|
||||
} else if (this instanceof RWSplitService) {
|
||||
singlePacket[3] = (byte) ((RWSplitService) this).nextPacketId();
|
||||
}
|
||||
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, length);
|
||||
buffer = writeToBuffer(singlePacket, buffer);
|
||||
buffer = writeToBuffer0(singlePacket, buffer);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@@ -103,9 +108,13 @@ public abstract class AbstractService extends VariablesService implements Servic
|
||||
|
||||
@Override
|
||||
public ByteBuffer writeToBuffer(byte[] src, ByteBuffer buffer) {
|
||||
if (src.length > MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE) {
|
||||
if (src.length >= MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE) {
|
||||
return this.writeBigPackageToBuffer(src, buffer);
|
||||
}
|
||||
return writeToBuffer0(src, buffer);
|
||||
}
|
||||
|
||||
private ByteBuffer writeToBuffer0(byte[] src, ByteBuffer buffer) {
|
||||
int offset = 0;
|
||||
int length = src.length;
|
||||
int remaining = buffer.remaining();
|
||||
|
||||
Reference in New Issue
Block a user