inner-2050: big packet support (#3554)

Signed-off-by: dcy10000 <dcy10000@gmail.com>

Signed-off-by: dcy10000 <dcy10000@gmail.com>
This commit is contained in:
Rico
2022-12-16 16:15:10 +08:00
committed by GitHub
parent dd521ddf3b
commit 7db6aa27c5
14 changed files with 200 additions and 82 deletions
@@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.cluster.values.DDLTraceInfo;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.transaction.TxnLogHelper;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.ErrorPacket;
@@ -61,15 +62,19 @@ public class MultiNodeDDLExecuteHandler extends MultiNodeQueryHandler {
lock.unlock();
}
LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
for (RouteResultsetNode node : rrs.getNodes()) {
unResponseRrns.add(node);
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
}
if (sb.length() > 0) {
TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString());
if (SystemConfig.getInstance().getRecordTxn() == 1) {
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
}
if (sb.length() > 0) {
TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString());
}
}
DDLTraceManager.getInstance().updateDDLStatus(DDLTraceInfo.DDLStage.EXECUTE_START, session.getShardingService());
@@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.datasource.ShardingNode;
import com.actiontech.dble.cluster.values.DDLTraceInfo;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.model.SystemConfig;
import com.actiontech.dble.log.transaction.TxnLogHelper;
import com.actiontech.dble.net.connection.BackendConnection;
import com.actiontech.dble.net.mysql.ErrorPacket;
@@ -89,15 +90,19 @@ public class MultiNodeDdlPrepareHandler extends MultiNodeHandler implements Exec
}
LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
for (RouteResultsetNode node : rrs.getNodes()) {
unResponseRrns.add(node);
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
}
if (sb.length() > 0) {
TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString());
if (SystemConfig.getInstance().getRecordTxn() == 1) {
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
}
if (sb.length() > 0) {
TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString());
}
}
DDLTraceManager.getInstance().updateDDLStatus(DDLTraceInfo.DDLStage.CONN_TEST_START, session.getShardingService());
@@ -126,15 +126,19 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
lock.unlock();
}
LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
for (RouteResultsetNode node : rrs.getNodes()) {
unResponseRrns.add(node);
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
}
if (sb.length() > 0) {
TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString());
if (SystemConfig.getInstance().getRecordTxn() == 1) {
StringBuilder sb = new StringBuilder();
for (final RouteResultsetNode node : rrs.getNodes()) {
if (node.isModifySQL()) {
sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n");
}
}
if (sb.length() > 0) {
TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString());
}
}
for (final RouteResultsetNode node : rrs.getNodes()) {
@@ -6,6 +6,7 @@ import com.actiontech.dble.net.mysql.MySQLPacket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.nio.ByteBuffer;
import static com.actiontech.dble.backend.mysql.proto.handler.ProtoHandlerResultCode.*;
@@ -23,17 +24,19 @@ public class MySQLProtoHandlerImpl implements ProtoHandler {
}
@Override
@Nonnull
public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSupportCompress) {
int position = dataBuffer.position();
int length = getPacketLength(dataBuffer, offset, isSupportCompress);
final ProtoHandlerResult.ProtoHandlerResultBuilder builder = ProtoHandlerResult.builder();
if (length == -1) {
if (offset != 0) {
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build();
} else if (!dataBuffer.hasRemaining()) {
throw new RuntimeException("invalid dataBuffer capacity ,too little buffer size " +
dataBuffer.capacity());
}
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build();
}
if (position >= offset + length) {
// handle this package
@@ -42,7 +45,9 @@ public class MySQLProtoHandlerImpl implements ProtoHandler {
dataBuffer.get(data, 0, length);
data = checkData(data, length);
if (data == null) {
return new ProtoHandlerResult(REACH_END_BUFFER, offset);
builder.setCode(PART_OF_BIG_PACKET);
} else {
builder.setCode(COMPLETE_PACKET);
}
// offset to next position
@@ -51,17 +56,21 @@ public class MySQLProtoHandlerImpl implements ProtoHandler {
if (position != offset) {
// try next package parse
//dataBufferOffset = offset;
//should reset position after read.
dataBuffer.position(position);
return new ProtoHandlerResult(STLL_DATA_REMING, offset, data);
builder.setHasMorePacket(true);
} else {
builder.setHasMorePacket(false);
}
return new ProtoHandlerResult(REACH_END_BUFFER, offset, data);
builder.setOffset(offset).setPacketData(data);
return builder.build();
} else {
// not read whole message package ,so check if buffer enough and
// compact dataBuffer
if (!dataBuffer.hasRemaining()) {
return new ProtoHandlerResult(BUFFER_NOT_BIG_ENOUGH, offset, length);
return builder.setCode(BUFFER_NOT_BIG_ENOUGH).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build();
} else {
return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset, length);
return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build();
}
}
}
@@ -86,10 +95,12 @@ public class MySQLProtoHandlerImpl implements ProtoHandler {
private byte[] checkData(byte[] data, int length) {
//session packet should be set to the latest one
//todo: this method doesn't apply for compress
if (length >= com.actiontech.dble.net.mysql.MySQLPacket.MAX_PACKET_SIZE + com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE) {
if (incompleteData == null) {
incompleteData = data;
} else {
//skip header in package
byte[] nextData = new byte[data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE];
System.arraycopy(data, com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE, nextData, 0, data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE);
incompleteData = dataMerge(nextData);
@@ -108,6 +119,7 @@ public class MySQLProtoHandlerImpl implements ProtoHandler {
}
private byte[] dataMerge(byte[] data) {
//todo: could optimize here. for example ,use linked-buffer
byte[] newData = new byte[incompleteData.length + data.length];
System.arraycopy(incompleteData, 0, newData, 0, incompleteData.length);
System.arraycopy(data, 0, newData, incompleteData.length, data.length);
@@ -1,30 +1,20 @@
package com.actiontech.dble.backend.mysql.proto.handler;
public class ProtoHandlerResult {
import org.jetbrains.annotations.Nullable;
public final class ProtoHandlerResult {
final ProtoHandlerResultCode code;
final int offset;
final int packetLength;
final byte[] packetData;
final boolean hasMorePacket;
public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, byte[] packetData) {
private ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength, byte[] packetData, boolean hasMorePacket) {
this.code = code;
this.offset = offset;
this.packetData = packetData;
this.packetLength = 0;
}
public ProtoHandlerResult(ProtoHandlerResultCode code, int offset) {
this.code = code;
this.offset = offset;
this.packetData = null;
this.packetLength = 0;
}
public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength) {
this.code = code;
this.offset = offset;
this.packetData = null;
this.packetLength = packetLength;
this.packetData = packetData;
this.hasMorePacket = hasMorePacket;
}
public ProtoHandlerResultCode getCode() {
@@ -35,6 +25,7 @@ public class ProtoHandlerResult {
return offset;
}
@Nullable
public byte[] getPacketData() {
return packetData;
}
@@ -42,6 +33,56 @@ public class ProtoHandlerResult {
public int getPacketLength() {
return packetLength;
}
public boolean isHasMorePacket() {
return hasMorePacket;
}
public static ProtoHandlerResultBuilder builder() {
return new ProtoHandlerResultBuilder();
}
public static final class ProtoHandlerResultBuilder {
ProtoHandlerResultCode code;
int offset;
int packetLength;
byte[] packetData;
boolean hasMorePacket;
private ProtoHandlerResultBuilder() {
}
public ProtoHandlerResultBuilder setCode(ProtoHandlerResultCode val) {
this.code = val;
return this;
}
public ProtoHandlerResultBuilder setOffset(int val) {
this.offset = val;
return this;
}
public ProtoHandlerResultBuilder setPacketLength(int val) {
this.packetLength = val;
return this;
}
public ProtoHandlerResultBuilder setPacketData(byte[] val) {
this.packetData = val;
return this;
}
public ProtoHandlerResultBuilder setHasMorePacket(boolean val) {
this.hasMorePacket = val;
return this;
}
public ProtoHandlerResult build() {
return new ProtoHandlerResult(code, offset, packetLength, packetData, hasMorePacket);
}
}
}
@@ -5,8 +5,25 @@ package com.actiontech.dble.backend.mysql.proto.handler;
*/
public enum ProtoHandlerResultCode {
REACH_END_BUFFER,
/**
* receive a complete packet and has no more data exists in buffer.
*/
COMPLETE_PACKET,
/**
* receive a part of big packet.
*/
PART_OF_BIG_PACKET,
BUFFER_PACKET_UNCOMPLETE,
BUFFER_NOT_BIG_ENOUGH,
@Deprecated
REACH_END_BUFFER,
/**
* receive a complete packet and has rest data exists in buffer.
*/
@Deprecated
STLL_DATA_REMING
}
@@ -346,7 +346,7 @@ public abstract class AbstractConnection implements Connection {
// so we check again
try {
this.socketWR.doNextWriteCheck();
} catch (Exception e) {
} catch (Throwable e) {
LOGGER.info("writeDirectly err:", e);
this.close("writeDirectly err:" + e);
}
@@ -130,7 +130,6 @@ public class CommandPacket extends MySQLPacket {
BufferUtil.writeUB3(buffer, MySQLPacket.MAX_PACKET_SIZE);
buffer.put(packetId++);
remain = writeBody(buffer, isFirst, remain);
service.getSession().getShardingService().nextPacketId();
service.writeDirectly(buffer);
isFirst = false;
}
@@ -171,7 +171,7 @@ public abstract class MySQLPacket {
//HEADER_SIZE
public static final int PACKET_HEADER_SIZE = 4;
//2^24-1
public static final int MAX_PACKET_SIZE = 16777215;
public static final int MAX_EOF_SIZE = 9;
@@ -12,7 +12,6 @@ import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
import com.actiontech.dble.buffer.BufferPool;
import com.actiontech.dble.net.connection.AbstractConnection;
import com.actiontech.dble.net.service.AbstractService;
import com.actiontech.dble.singleton.BufferPoolManager;
import java.nio.ByteBuffer;
@@ -98,7 +97,7 @@ public class RowDataPacket extends MySQLPacket {
service.writeDirectly(bb);
ByteBuffer tmpBuffer = service.allocate(totalSize);
BufferUtil.writeUB3(tmpBuffer, calcPacketSize());
tmpBuffer.put(packetId--);
tmpBuffer.put(packetId);
writeBody(tmpBuffer);
byte[] array = tmpBuffer.array();
service.recycleBuffer(tmpBuffer);
@@ -12,6 +12,7 @@ import com.actiontech.dble.net.mysql.ErrorPacket;
import com.actiontech.dble.net.mysql.MySQLPacket;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
import com.actiontech.dble.services.mysqlsharding.ShardingService;
import com.actiontech.dble.singleton.TraceManager;
import com.actiontech.dble.util.CompressUtil;
import com.actiontech.dble.util.StringUtil;
@@ -38,6 +39,7 @@ public abstract class AbstractService implements Service {
private volatile boolean isSupportCompress = false;
protected volatile ProtoHandler proto;
protected final BlockingQueue<ServiceTask> taskQueue;
private int sequenceId = 0;
public AbstractService(AbstractConnection connection) {
this.connection = connection;
@@ -53,44 +55,50 @@ public abstract class AbstractService implements Service {
@Override
public void handle(ByteBuffer dataBuffer) {
this.sessionStart();
boolean hasReming = true;
boolean hasRemaining = true;
int offset = 0;
while (hasReming) {
while (hasRemaining) {
ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress);
switch (result.getCode()) {
case REACH_END_BUFFER:
connection.readReachEnd();
byte[] packetData = result.getPacketData();
if (packetData != null) {
taskCreate(packetData);
case PART_OF_BIG_PACKET:
sequenceId++;
if (!result.isHasMorePacket()) {
connection.readReachEnd();
dataBuffer.clear();
}
break;
case COMPLETE_PACKET:
taskCreate(result.getPacketData());
if (!result.isHasMorePacket()) {
connection.readReachEnd();
dataBuffer.clear();
}
dataBuffer.clear();
hasReming = false;
break;
case BUFFER_PACKET_UNCOMPLETE:
connection.compactReadBuffer(dataBuffer, result.getOffset());
hasReming = false;
break;
case BUFFER_NOT_BIG_ENOUGH:
connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength());
hasReming = false;
break;
case STLL_DATA_REMING:
byte[] partData = result.getPacketData();
if (partData != null) {
taskCreate(partData);
}
offset = result.getOffset();
continue;
default:
throw new RuntimeException("unknown error when read data");
}
hasRemaining = result.isHasMorePacket();
if (hasRemaining) {
offset = result.getOffset();
}
}
}
protected void taskCreate(byte[] packetData) {
if (packetData == null) {
return;
}
if (beforeHandlingTask()) {
ServiceTask task = new ServiceTask(packetData, this);
ServiceTask task = new ServiceTask(packetData, this, sequenceId);
try {
taskQueue.put(task);
} catch (InterruptedException e) {
@@ -209,11 +217,15 @@ public abstract class AbstractService implements Service {
length -= (MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE);
ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE);
singlePacket[3] = data[3];
byte currentPacketId = data[3];
buffer = writeToBuffer(singlePacket, buffer);
while (length >= MySQLPacket.MAX_PACKET_SIZE) {
singlePacket = new byte[MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE];
ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE);
singlePacket[3] = (byte) nextPacketId();
singlePacket[3] = ++currentPacketId;
if (this instanceof ShardingService) {
singlePacket[3] = (byte) this.nextPacketId();
}
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, MySQLPacket.MAX_PACKET_SIZE);
srcPos += MySQLPacket.MAX_PACKET_SIZE;
length -= MySQLPacket.MAX_PACKET_SIZE;
@@ -221,7 +233,10 @@ public abstract class AbstractService implements Service {
}
singlePacket = new byte[length + MySQLPacket.PACKET_HEADER_SIZE];
ByteUtil.writeUB3(singlePacket, length);
singlePacket[3] = (byte) nextPacketId();
singlePacket[3] = ++currentPacketId;
if (this instanceof ShardingService) {
singlePacket[3] = (byte) this.nextPacketId();
}
System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, length);
buffer = writeToBuffer(singlePacket, buffer);
return buffer;
@@ -297,16 +312,21 @@ public abstract class AbstractService implements Service {
try {
byte[] data = executeTask.getOrgData();
if (data != null && !executeTask.isReuse()) {
this.setPacketId(data[3]);
this.setPacketId(executeTask.getSequenceId());
}
if (isSupportCompress()) {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, new ConcurrentLinkedQueue<>());
final ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>();
List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue);
if (decompressUnfinishedDataQueue.isEmpty()) {
sequenceId = 0;
}
for (byte[] pack : packs) {
if (pack.length != 0) {
handleInnerData(pack);
}
}
} else {
sequenceId = 0;
this.handleInnerData(data);
}
@@ -1,5 +1,7 @@
package com.actiontech.dble.net.service;
import javax.annotation.Nullable;
/**
* Created by szf on 2020/6/18.
*/
@@ -9,11 +11,13 @@ public class ServiceTask {
private volatile boolean highPriority = false;
private final boolean reuse;
private final Service service;
private Integer sequenceId = null;
public ServiceTask(byte[] orgData, Service service) {
public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) {
this.orgData = orgData;
this.service = service;
this.reuse = false;
this.sequenceId = sequenceId;
}
public ServiceTask(byte[] orgData, Service service, boolean reuse) {
@@ -43,4 +47,15 @@ public class ServiceTask {
return reuse;
}
public int getSequenceId() {
if (sequenceId != null) {
return sequenceId;
} else {
if (orgData == null) {
throw new IllegalStateException("can't get Sequence Id from null");
}
return (orgData[3]);
}
}
}
@@ -22,8 +22,7 @@ public class LoadDataProtoHandlerImpl extends MySQLProtoHandlerImpl {
public ProtoHandlerResult handle(ByteBuffer dataBuffer, int dataBufferOffset, boolean isSupportCompress) {
ProtoHandlerResult result = super.handle(dataBuffer, dataBufferOffset, isSupportCompress);
switch (result.getCode()) {
case REACH_END_BUFFER:
case STLL_DATA_REMING:
case COMPLETE_PACKET:
byte[] packetData = result.getPacketData();
if (packetData != null) {
if (isEndOfDataFile(packetData)) {
@@ -31,7 +30,7 @@ public class LoadDataProtoHandlerImpl extends MySQLProtoHandlerImpl {
} else {
loadDataHandler.handle(packetData);
}
return new ProtoHandlerResult(result.getCode(), result.getOffset());
return ProtoHandlerResult.builder().setCode(result.getCode()).setOffset(result.getOffset()).setPacketLength(result.getPacketLength()).setHasMorePacket(result.isHasMorePacket()).build();
}
return result;
default:
@@ -113,7 +113,7 @@ public final class CompressUtil {
byteBuf.put(packet); //body
} else {
//todo: process for big package
byte[] compress = compress(packet); //compress
BufferUtil.writeUB3(byteBuf, compress.length);
@@ -150,13 +150,15 @@ public final class CompressUtil {
List<byte[]> lst = new ArrayList(1);
lst.add(data);
return lst;
//compressed failed
} else if (oldLen == 0) {
// Uncompressed Payload.
byte[] readBytes = msg.readBytes();
return splitPack(readBytes, decompressUnfinishedDataQueue);
//decompress
} else {
// Compressed Payload.
byte[] de = decompress(data, 7, data.length - 7);
return splitPack(de, decompressUnfinishedDataQueue);
}
@@ -170,7 +172,7 @@ public final class CompressUtil {
* @return
*/
private static List<byte[]> splitPack(byte[] in, ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue) {
//todo: process for big package
//merge
in = mergeBytes(in, decompressUnfinishedDataQueue);