From 7db6aa27c56dce18b0960273fbd67fa050e23d52 Mon Sep 17 00:00:00 2001 From: Rico Date: Fri, 16 Dec 2022 16:15:10 +0800 Subject: [PATCH] inner-2050: big packet support (#3554) Signed-off-by: dcy10000 Signed-off-by: dcy10000 --- .../handler/MultiNodeDDLExecuteHandler.java | 19 +++-- .../handler/MultiNodeDdlPrepareHandler.java | 19 +++-- .../nio/handler/MultiNodeQueryHandler.java | 18 +++-- .../handler/Impl/MySQLProtoHandlerImpl.java | 26 +++++-- .../proto/handler/ProtoHandlerResult.java | 75 ++++++++++++++----- .../proto/handler/ProtoHandlerResultCode.java | 19 ++++- .../net/connection/AbstractConnection.java | 2 +- .../dble/net/mysql/CommandPacket.java | 1 - .../dble/net/mysql/MySQLPacket.java | 2 +- .../dble/net/mysql/RowDataPacket.java | 3 +- .../dble/net/service/AbstractService.java | 66 ++++++++++------ .../dble/net/service/ServiceTask.java | 17 ++++- .../handler/LoadDataProtoHandlerImpl.java | 5 +- .../actiontech/dble/util/CompressUtil.java | 10 ++- 14 files changed, 200 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDDLExecuteHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDDLExecuteHandler.java index 2eaa5b2ad..65b13e6be 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDDLExecuteHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDDLExecuteHandler.java @@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.cluster.values.DDLTraceInfo; import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.transaction.TxnLogHelper; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.mysql.ErrorPacket; @@ -61,15 +62,19 @@ public class MultiNodeDDLExecuteHandler extends MultiNodeQueryHandler { lock.unlock(); } LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave()); - StringBuilder sb = new StringBuilder(); - for (final RouteResultsetNode node : rrs.getNodes()) { + for (RouteResultsetNode node : rrs.getNodes()) { unResponseRrns.add(node); - if (node.isModifySQL()) { - sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n"); - } } - if (sb.length() > 0) { - TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString()); + if (SystemConfig.getInstance().getRecordTxn() == 1) { + StringBuilder sb = new StringBuilder(); + for (final RouteResultsetNode node : rrs.getNodes()) { + if (node.isModifySQL()) { + sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n"); + } + } + if (sb.length() > 0) { + TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString()); + } } DDLTraceManager.getInstance().updateDDLStatus(DDLTraceInfo.DDLStage.EXECUTE_START, session.getShardingService()); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlPrepareHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlPrepareHandler.java index 767437bb5..c574465e5 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlPrepareHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlPrepareHandler.java @@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer; import com.actiontech.dble.backend.datasource.ShardingNode; import com.actiontech.dble.cluster.values.DDLTraceInfo; import com.actiontech.dble.config.ErrorCode; +import com.actiontech.dble.config.model.SystemConfig; import com.actiontech.dble.log.transaction.TxnLogHelper; import com.actiontech.dble.net.connection.BackendConnection; import com.actiontech.dble.net.mysql.ErrorPacket; @@ -89,15 +90,19 @@ public class MultiNodeDdlPrepareHandler extends MultiNodeHandler implements Exec } LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave()); - StringBuilder sb = new StringBuilder(); - for (final RouteResultsetNode node : rrs.getNodes()) { + for (RouteResultsetNode node : rrs.getNodes()) { unResponseRrns.add(node); - if (node.isModifySQL()) { - sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n"); - } } - if (sb.length() > 0) { - TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString()); + if (SystemConfig.getInstance().getRecordTxn() == 1) { + StringBuilder sb = new StringBuilder(); + for (final RouteResultsetNode node : rrs.getNodes()) { + if (node.isModifySQL()) { + sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n"); + } + } + if (sb.length() > 0) { + TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString()); + } } DDLTraceManager.getInstance().updateDDLStatus(DDLTraceInfo.DDLStage.CONN_TEST_START, session.getShardingService()); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java index ed63c90e7..aaa336e2f 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java @@ -126,15 +126,19 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR lock.unlock(); } LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave()); - StringBuilder sb = new StringBuilder(); - for (final RouteResultsetNode node : rrs.getNodes()) { + for (RouteResultsetNode node : rrs.getNodes()) { unResponseRrns.add(node); - if (node.isModifySQL()) { - sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n"); - } } - if (sb.length() > 0) { - TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString()); + if (SystemConfig.getInstance().getRecordTxn() == 1) { + StringBuilder sb = new StringBuilder(); + for (final RouteResultsetNode node : rrs.getNodes()) { + if (node.isModifySQL()) { + sb.append("[").append(node.getName()).append("]").append(node.getStatement()).append(";\n"); + } + } + if (sb.length() > 0) { + TxnLogHelper.putTxnLog(session.getShardingService(), sb.toString()); + } } for (final RouteResultsetNode node : rrs.getNodes()) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/Impl/MySQLProtoHandlerImpl.java b/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/Impl/MySQLProtoHandlerImpl.java index 9eed32a93..ec6094d28 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/Impl/MySQLProtoHandlerImpl.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/Impl/MySQLProtoHandlerImpl.java @@ -6,6 +6,7 @@ import com.actiontech.dble.net.mysql.MySQLPacket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; import java.nio.ByteBuffer; import static com.actiontech.dble.backend.mysql.proto.handler.ProtoHandlerResultCode.*; @@ -23,17 +24,19 @@ public class MySQLProtoHandlerImpl implements ProtoHandler { } @Override + @Nonnull public ProtoHandlerResult handle(ByteBuffer dataBuffer, int offset, boolean isSupportCompress) { int position = dataBuffer.position(); int length = getPacketLength(dataBuffer, offset, isSupportCompress); + final ProtoHandlerResult.ProtoHandlerResultBuilder builder = ProtoHandlerResult.builder(); if (length == -1) { if (offset != 0) { - return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset); + return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build(); } else if (!dataBuffer.hasRemaining()) { throw new RuntimeException("invalid dataBuffer capacity ,too little buffer size " + dataBuffer.capacity()); } - return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset); + return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).build(); } if (position >= offset + length) { // handle this package @@ -42,7 +45,9 @@ public class MySQLProtoHandlerImpl implements ProtoHandler { dataBuffer.get(data, 0, length); data = checkData(data, length); if (data == null) { - return new ProtoHandlerResult(REACH_END_BUFFER, offset); + builder.setCode(PART_OF_BIG_PACKET); + } else { + builder.setCode(COMPLETE_PACKET); } // offset to next position @@ -51,17 +56,21 @@ public class MySQLProtoHandlerImpl implements ProtoHandler { if (position != offset) { // try next package parse //dataBufferOffset = offset; + //should reset position after read. dataBuffer.position(position); - return new ProtoHandlerResult(STLL_DATA_REMING, offset, data); + builder.setHasMorePacket(true); + } else { + builder.setHasMorePacket(false); } - return new ProtoHandlerResult(REACH_END_BUFFER, offset, data); + builder.setOffset(offset).setPacketData(data); + return builder.build(); } else { // not read whole message package ,so check if buffer enough and // compact dataBuffer if (!dataBuffer.hasRemaining()) { - return new ProtoHandlerResult(BUFFER_NOT_BIG_ENOUGH, offset, length); + return builder.setCode(BUFFER_NOT_BIG_ENOUGH).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build(); } else { - return new ProtoHandlerResult(BUFFER_PACKET_UNCOMPLETE, offset, length); + return builder.setCode(BUFFER_PACKET_UNCOMPLETE).setHasMorePacket(false).setOffset(offset).setPacketLength(length).build(); } } } @@ -86,10 +95,12 @@ public class MySQLProtoHandlerImpl implements ProtoHandler { private byte[] checkData(byte[] data, int length) { //session packet should be set to the latest one + //todo: this method doesn't apply for compress if (length >= com.actiontech.dble.net.mysql.MySQLPacket.MAX_PACKET_SIZE + com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE) { if (incompleteData == null) { incompleteData = data; } else { + //skip header in package byte[] nextData = new byte[data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE]; System.arraycopy(data, com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE, nextData, 0, data.length - com.actiontech.dble.net.mysql.MySQLPacket.PACKET_HEADER_SIZE); incompleteData = dataMerge(nextData); @@ -108,6 +119,7 @@ public class MySQLProtoHandlerImpl implements ProtoHandler { } private byte[] dataMerge(byte[] data) { + //todo: could optimize here. for example ,use linked-buffer byte[] newData = new byte[incompleteData.length + data.length]; System.arraycopy(incompleteData, 0, newData, 0, incompleteData.length); System.arraycopy(data, 0, newData, incompleteData.length, data.length); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResult.java b/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResult.java index 22f559b32..52bc3b6ca 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResult.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResult.java @@ -1,30 +1,20 @@ package com.actiontech.dble.backend.mysql.proto.handler; -public class ProtoHandlerResult { +import org.jetbrains.annotations.Nullable; + +public final class ProtoHandlerResult { final ProtoHandlerResultCode code; final int offset; final int packetLength; final byte[] packetData; + final boolean hasMorePacket; - public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, byte[] packetData) { + private ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength, byte[] packetData, boolean hasMorePacket) { this.code = code; this.offset = offset; - this.packetData = packetData; - this.packetLength = 0; - } - - public ProtoHandlerResult(ProtoHandlerResultCode code, int offset) { - this.code = code; - this.offset = offset; - this.packetData = null; - this.packetLength = 0; - } - - public ProtoHandlerResult(ProtoHandlerResultCode code, int offset, int packetLength) { - this.code = code; - this.offset = offset; - this.packetData = null; this.packetLength = packetLength; + this.packetData = packetData; + this.hasMorePacket = hasMorePacket; } public ProtoHandlerResultCode getCode() { @@ -35,6 +25,7 @@ public class ProtoHandlerResult { return offset; } + @Nullable public byte[] getPacketData() { return packetData; } @@ -42,6 +33,56 @@ public class ProtoHandlerResult { public int getPacketLength() { return packetLength; } + + public boolean isHasMorePacket() { + return hasMorePacket; + } + + public static ProtoHandlerResultBuilder builder() { + return new ProtoHandlerResultBuilder(); + } + + + public static final class ProtoHandlerResultBuilder { + ProtoHandlerResultCode code; + int offset; + int packetLength; + byte[] packetData; + boolean hasMorePacket; + + private ProtoHandlerResultBuilder() { + } + + + public ProtoHandlerResultBuilder setCode(ProtoHandlerResultCode val) { + this.code = val; + return this; + } + + public ProtoHandlerResultBuilder setOffset(int val) { + this.offset = val; + return this; + } + + public ProtoHandlerResultBuilder setPacketLength(int val) { + this.packetLength = val; + return this; + } + + public ProtoHandlerResultBuilder setPacketData(byte[] val) { + this.packetData = val; + return this; + } + + public ProtoHandlerResultBuilder setHasMorePacket(boolean val) { + this.hasMorePacket = val; + return this; + } + + public ProtoHandlerResult build() { + return new ProtoHandlerResult(code, offset, packetLength, packetData, hasMorePacket); + } + } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResultCode.java b/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResultCode.java index 76f8df814..2af20176f 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResultCode.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/proto/handler/ProtoHandlerResultCode.java @@ -5,8 +5,25 @@ package com.actiontech.dble.backend.mysql.proto.handler; */ public enum ProtoHandlerResultCode { - REACH_END_BUFFER, + /** + * receive a complete packet and has no more data exists in buffer. + */ + + COMPLETE_PACKET, + + /** + * receive a part of big packet. + */ + PART_OF_BIG_PACKET, BUFFER_PACKET_UNCOMPLETE, BUFFER_NOT_BIG_ENOUGH, + + + @Deprecated + REACH_END_BUFFER, + /** + * receive a complete packet and has rest data exists in buffer. + */ + @Deprecated STLL_DATA_REMING } diff --git a/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java b/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java index aa5c2f0af..1ab4619ed 100644 --- a/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java @@ -346,7 +346,7 @@ public abstract class AbstractConnection implements Connection { // so we check again try { this.socketWR.doNextWriteCheck(); - } catch (Exception e) { + } catch (Throwable e) { LOGGER.info("writeDirectly err:", e); this.close("writeDirectly err:" + e); } diff --git a/src/main/java/com/actiontech/dble/net/mysql/CommandPacket.java b/src/main/java/com/actiontech/dble/net/mysql/CommandPacket.java index da33b80ab..7bdc2e844 100644 --- a/src/main/java/com/actiontech/dble/net/mysql/CommandPacket.java +++ b/src/main/java/com/actiontech/dble/net/mysql/CommandPacket.java @@ -130,7 +130,6 @@ public class CommandPacket extends MySQLPacket { BufferUtil.writeUB3(buffer, MySQLPacket.MAX_PACKET_SIZE); buffer.put(packetId++); remain = writeBody(buffer, isFirst, remain); - service.getSession().getShardingService().nextPacketId(); service.writeDirectly(buffer); isFirst = false; } diff --git a/src/main/java/com/actiontech/dble/net/mysql/MySQLPacket.java b/src/main/java/com/actiontech/dble/net/mysql/MySQLPacket.java index 19e547962..ee2b77a26 100644 --- a/src/main/java/com/actiontech/dble/net/mysql/MySQLPacket.java +++ b/src/main/java/com/actiontech/dble/net/mysql/MySQLPacket.java @@ -171,7 +171,7 @@ public abstract class MySQLPacket { //HEADER_SIZE public static final int PACKET_HEADER_SIZE = 4; - + //2^24-1 public static final int MAX_PACKET_SIZE = 16777215; public static final int MAX_EOF_SIZE = 9; diff --git a/src/main/java/com/actiontech/dble/net/mysql/RowDataPacket.java b/src/main/java/com/actiontech/dble/net/mysql/RowDataPacket.java index 5f60b587a..9f7f3930b 100644 --- a/src/main/java/com/actiontech/dble/net/mysql/RowDataPacket.java +++ b/src/main/java/com/actiontech/dble/net/mysql/RowDataPacket.java @@ -12,7 +12,6 @@ import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator; import com.actiontech.dble.buffer.BufferPool; import com.actiontech.dble.net.connection.AbstractConnection; import com.actiontech.dble.net.service.AbstractService; - import com.actiontech.dble.singleton.BufferPoolManager; import java.nio.ByteBuffer; @@ -98,7 +97,7 @@ public class RowDataPacket extends MySQLPacket { service.writeDirectly(bb); ByteBuffer tmpBuffer = service.allocate(totalSize); BufferUtil.writeUB3(tmpBuffer, calcPacketSize()); - tmpBuffer.put(packetId--); + tmpBuffer.put(packetId); writeBody(tmpBuffer); byte[] array = tmpBuffer.array(); service.recycleBuffer(tmpBuffer); diff --git a/src/main/java/com/actiontech/dble/net/service/AbstractService.java b/src/main/java/com/actiontech/dble/net/service/AbstractService.java index c96935a11..517e25274 100644 --- a/src/main/java/com/actiontech/dble/net/service/AbstractService.java +++ b/src/main/java/com/actiontech/dble/net/service/AbstractService.java @@ -12,6 +12,7 @@ import com.actiontech.dble.net.mysql.ErrorPacket; import com.actiontech.dble.net.mysql.MySQLPacket; import com.actiontech.dble.net.mysql.OkPacket; import com.actiontech.dble.services.mysqlsharding.MySQLResponseService; +import com.actiontech.dble.services.mysqlsharding.ShardingService; import com.actiontech.dble.singleton.TraceManager; import com.actiontech.dble.util.CompressUtil; import com.actiontech.dble.util.StringUtil; @@ -38,6 +39,7 @@ public abstract class AbstractService implements Service { private volatile boolean isSupportCompress = false; protected volatile ProtoHandler proto; protected final BlockingQueue taskQueue; + private int sequenceId = 0; public AbstractService(AbstractConnection connection) { this.connection = connection; @@ -53,44 +55,50 @@ public abstract class AbstractService implements Service { @Override public void handle(ByteBuffer dataBuffer) { this.sessionStart(); - boolean hasReming = true; + boolean hasRemaining = true; int offset = 0; - while (hasReming) { + while (hasRemaining) { ProtoHandlerResult result = proto.handle(dataBuffer, offset, isSupportCompress); switch (result.getCode()) { - case REACH_END_BUFFER: - connection.readReachEnd(); - byte[] packetData = result.getPacketData(); - if (packetData != null) { - taskCreate(packetData); + case PART_OF_BIG_PACKET: + + sequenceId++; + if (!result.isHasMorePacket()) { + connection.readReachEnd(); + dataBuffer.clear(); + } + + break; + case COMPLETE_PACKET: + taskCreate(result.getPacketData()); + if (!result.isHasMorePacket()) { + connection.readReachEnd(); + dataBuffer.clear(); } - dataBuffer.clear(); - hasReming = false; break; case BUFFER_PACKET_UNCOMPLETE: connection.compactReadBuffer(dataBuffer, result.getOffset()); - hasReming = false; break; case BUFFER_NOT_BIG_ENOUGH: connection.ensureFreeSpaceOfReadBuffer(dataBuffer, result.getOffset(), result.getPacketLength()); - hasReming = false; break; - case STLL_DATA_REMING: - byte[] partData = result.getPacketData(); - if (partData != null) { - taskCreate(partData); - } - offset = result.getOffset(); - continue; default: throw new RuntimeException("unknown error when read data"); } + + hasRemaining = result.isHasMorePacket(); + if (hasRemaining) { + offset = result.getOffset(); + } } } protected void taskCreate(byte[] packetData) { + if (packetData == null) { + return; + } if (beforeHandlingTask()) { - ServiceTask task = new ServiceTask(packetData, this); + ServiceTask task = new ServiceTask(packetData, this, sequenceId); try { taskQueue.put(task); } catch (InterruptedException e) { @@ -209,11 +217,15 @@ public abstract class AbstractService implements Service { length -= (MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE); ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE); singlePacket[3] = data[3]; + byte currentPacketId = data[3]; buffer = writeToBuffer(singlePacket, buffer); while (length >= MySQLPacket.MAX_PACKET_SIZE) { singlePacket = new byte[MySQLPacket.MAX_PACKET_SIZE + MySQLPacket.PACKET_HEADER_SIZE]; ByteUtil.writeUB3(singlePacket, MySQLPacket.MAX_PACKET_SIZE); - singlePacket[3] = (byte) nextPacketId(); + singlePacket[3] = ++currentPacketId; + if (this instanceof ShardingService) { + singlePacket[3] = (byte) this.nextPacketId(); + } System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, MySQLPacket.MAX_PACKET_SIZE); srcPos += MySQLPacket.MAX_PACKET_SIZE; length -= MySQLPacket.MAX_PACKET_SIZE; @@ -221,7 +233,10 @@ public abstract class AbstractService implements Service { } singlePacket = new byte[length + MySQLPacket.PACKET_HEADER_SIZE]; ByteUtil.writeUB3(singlePacket, length); - singlePacket[3] = (byte) nextPacketId(); + singlePacket[3] = ++currentPacketId; + if (this instanceof ShardingService) { + singlePacket[3] = (byte) this.nextPacketId(); + } System.arraycopy(data, srcPos, singlePacket, MySQLPacket.PACKET_HEADER_SIZE, length); buffer = writeToBuffer(singlePacket, buffer); return buffer; @@ -297,16 +312,21 @@ public abstract class AbstractService implements Service { try { byte[] data = executeTask.getOrgData(); if (data != null && !executeTask.isReuse()) { - this.setPacketId(data[3]); + this.setPacketId(executeTask.getSequenceId()); } if (isSupportCompress()) { - List packs = CompressUtil.decompressMysqlPacket(data, new ConcurrentLinkedQueue<>()); + final ConcurrentLinkedQueue decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>(); + List packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue); + if (decompressUnfinishedDataQueue.isEmpty()) { + sequenceId = 0; + } for (byte[] pack : packs) { if (pack.length != 0) { handleInnerData(pack); } } } else { + sequenceId = 0; this.handleInnerData(data); } diff --git a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java index bed7388fc..cd5608621 100644 --- a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java +++ b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java @@ -1,5 +1,7 @@ package com.actiontech.dble.net.service; +import javax.annotation.Nullable; + /** * Created by szf on 2020/6/18. */ @@ -9,11 +11,13 @@ public class ServiceTask { private volatile boolean highPriority = false; private final boolean reuse; private final Service service; + private Integer sequenceId = null; - public ServiceTask(byte[] orgData, Service service) { + public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) { this.orgData = orgData; this.service = service; this.reuse = false; + this.sequenceId = sequenceId; } public ServiceTask(byte[] orgData, Service service, boolean reuse) { @@ -43,4 +47,15 @@ public class ServiceTask { return reuse; } + public int getSequenceId() { + if (sequenceId != null) { + return sequenceId; + } else { + if (orgData == null) { + throw new IllegalStateException("can't get Sequence Id from null"); + } + return (orgData[3]); + } + + } } diff --git a/src/main/java/com/actiontech/dble/services/mysqlsharding/handler/LoadDataProtoHandlerImpl.java b/src/main/java/com/actiontech/dble/services/mysqlsharding/handler/LoadDataProtoHandlerImpl.java index f67188f06..138c69219 100644 --- a/src/main/java/com/actiontech/dble/services/mysqlsharding/handler/LoadDataProtoHandlerImpl.java +++ b/src/main/java/com/actiontech/dble/services/mysqlsharding/handler/LoadDataProtoHandlerImpl.java @@ -22,8 +22,7 @@ public class LoadDataProtoHandlerImpl extends MySQLProtoHandlerImpl { public ProtoHandlerResult handle(ByteBuffer dataBuffer, int dataBufferOffset, boolean isSupportCompress) { ProtoHandlerResult result = super.handle(dataBuffer, dataBufferOffset, isSupportCompress); switch (result.getCode()) { - case REACH_END_BUFFER: - case STLL_DATA_REMING: + case COMPLETE_PACKET: byte[] packetData = result.getPacketData(); if (packetData != null) { if (isEndOfDataFile(packetData)) { @@ -31,7 +30,7 @@ public class LoadDataProtoHandlerImpl extends MySQLProtoHandlerImpl { } else { loadDataHandler.handle(packetData); } - return new ProtoHandlerResult(result.getCode(), result.getOffset()); + return ProtoHandlerResult.builder().setCode(result.getCode()).setOffset(result.getOffset()).setPacketLength(result.getPacketLength()).setHasMorePacket(result.isHasMorePacket()).build(); } return result; default: diff --git a/src/main/java/com/actiontech/dble/util/CompressUtil.java b/src/main/java/com/actiontech/dble/util/CompressUtil.java index 0f6dd1cff..19c1f851e 100644 --- a/src/main/java/com/actiontech/dble/util/CompressUtil.java +++ b/src/main/java/com/actiontech/dble/util/CompressUtil.java @@ -113,7 +113,7 @@ public final class CompressUtil { byteBuf.put(packet); //body } else { - + //todo: process for big package byte[] compress = compress(packet); //compress BufferUtil.writeUB3(byteBuf, compress.length); @@ -150,13 +150,15 @@ public final class CompressUtil { List lst = new ArrayList(1); lst.add(data); return lst; - //compressed failed + } else if (oldLen == 0) { + // Uncompressed Payload. byte[] readBytes = msg.readBytes(); return splitPack(readBytes, decompressUnfinishedDataQueue); - //decompress + } else { + // Compressed Payload. byte[] de = decompress(data, 7, data.length - 7); return splitPack(de, decompressUnfinishedDataQueue); } @@ -170,7 +172,7 @@ public final class CompressUtil { * @return */ private static List splitPack(byte[] in, ConcurrentLinkedQueue decompressUnfinishedDataQueue) { - + //todo: process for big package //merge in = mergeBytes(in, decompressUnfinishedDataQueue);