diff --git a/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java b/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java index 991b28acf..df390c1bf 100644 --- a/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java +++ b/src/main/java/com/actiontech/dble/net/connection/AbstractConnection.java @@ -58,7 +58,7 @@ public abstract class AbstractConnection implements Connection { protected long netInBytes; protected long netOutBytes; protected long lastLargeMessageTime; - private int sequenceId = 0; + private int extraPartOfBigPacketCount = 0; protected final ConcurrentLinkedQueue writeQueue = new ConcurrentLinkedQueue<>(); private final ConcurrentLinkedQueue decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>(); @@ -99,7 +99,7 @@ public abstract class AbstractConnection implements Connection { switch (result.getCode()) { case PART_OF_BIG_PACKET: - sequenceId++; + extraPartOfBigPacketCount++; if (!result.isHasMorePacket()) { readReachEnd(); dataBuffer.clear(); @@ -133,18 +133,18 @@ public abstract class AbstractConnection implements Connection { private void processPacketData(ProtoHandlerResult result) { byte[] packetData = result.getPacketData(); if (packetData != null) { - int tmpSequenceId = sequenceId; + int tmpCount = extraPartOfBigPacketCount; if (!isSupportCompress) { - sequenceId = 0; - service.handle(new ServiceTask(packetData, service, tmpSequenceId)); + extraPartOfBigPacketCount = 0; + service.handle(new ServiceTask(packetData, service, tmpCount)); } else { List packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue); if (decompressUnfinishedDataQueue.isEmpty()) { - sequenceId = 0; + extraPartOfBigPacketCount = 0; } for (byte[] pack : packs) { if (pack.length != 0) { - service.handle(new ServiceTask(pack, service, tmpSequenceId++)); + service.handle(new ServiceTask(pack, service, tmpCount)); } } } diff --git a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java index 5b9e6ee48..f8eaef2a6 100644 --- a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java +++ b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java @@ -1,6 +1,6 @@ package com.actiontech.dble.net.service; -import javax.annotation.Nullable; +import java.util.Arrays; /** * Created by szf on 2020/6/18. @@ -10,13 +10,18 @@ public class ServiceTask { private final byte[] orgData; private final boolean reuse; private final Service service; - private Integer sequenceId = null; + private int extraPartOfBigPacketCount = 0; - public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) { + /** + * @param orgData + * @param service + * @param extraPartsOfBigPacketCount if orgData are big packet, it contains some *extra* parts of big packet,you should pass the count.If orgData isn't big packet ,just set 0. + */ + public ServiceTask(byte[] orgData, Service service, int extraPartsOfBigPacketCount) { this.orgData = orgData; this.service = service; this.reuse = false; - this.sequenceId = sequenceId; + this.extraPartOfBigPacketCount = extraPartsOfBigPacketCount; } public ServiceTask(byte[] orgData, Service service, boolean reuse) { @@ -37,15 +42,10 @@ public class ServiceTask { return reuse; } - public int getSequenceId() { - if (sequenceId != null) { - return sequenceId; - } else { - if (orgData == null) { - throw new IllegalStateException("can't get Sequence Id from null"); - } - return (orgData[3]); + public int getLastSequenceId() { + if (orgData == null || orgData.length < 4) { + throw new IllegalStateException("can't get Sequence Id from " + Arrays.toString(orgData)); } - + return (orgData[3]) + extraPartOfBigPacketCount; } } diff --git a/src/main/java/com/actiontech/dble/services/FrontendService.java b/src/main/java/com/actiontech/dble/services/FrontendService.java index 734ba6c44..21fa12868 100644 --- a/src/main/java/com/actiontech/dble/services/FrontendService.java +++ b/src/main/java/com/actiontech/dble/services/FrontendService.java @@ -107,7 +107,7 @@ public abstract class FrontendService extends AbstractServ if (executeTask != null) { byte[] data = executeTask.getOrgData(); if (data != null && !executeTask.isReuse()) { - this.setPacketId(executeTask.getSequenceId()); + this.setPacketId(executeTask.getLastSequenceId()); } this.handleInnerData(data); @@ -134,7 +134,7 @@ public abstract class FrontendService extends AbstractServ private void taskToPriorityQueue(ServiceTask task) { DbleServer.getInstance().getFrontPriorityQueue().offer(task); - DbleServer.getInstance().getFrontHandlerQueue().offer(new ServiceTask(null, null, null)); + DbleServer.getInstance().getFrontHandlerQueue().offer(new ServiceTask(null, null, 0)); } @Override