inner-983: fixed load data lost connection (#2545)

Signed-off-by: dcy <dcy10000@gmail.com>
This commit is contained in:
Rico
2021-03-23 13:16:39 +08:00
committed by GitHub
parent aba3be36c6
commit 2b7c5cc536
3 changed files with 22 additions and 22 deletions

View File

@@ -58,7 +58,7 @@ public abstract class AbstractConnection implements Connection {
protected long netInBytes;
protected long netOutBytes;
protected long lastLargeMessageTime;
private int sequenceId = 0;
private int extraPartOfBigPacketCount = 0;
protected final ConcurrentLinkedQueue<WriteOutTask> writeQueue = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<byte[]> decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>();
@@ -99,7 +99,7 @@ public abstract class AbstractConnection implements Connection {
switch (result.getCode()) {
case PART_OF_BIG_PACKET:
sequenceId++;
extraPartOfBigPacketCount++;
if (!result.isHasMorePacket()) {
readReachEnd();
dataBuffer.clear();
@@ -133,18 +133,18 @@ public abstract class AbstractConnection implements Connection {
private void processPacketData(ProtoHandlerResult result) {
byte[] packetData = result.getPacketData();
if (packetData != null) {
int tmpSequenceId = sequenceId;
int tmpCount = extraPartOfBigPacketCount;
if (!isSupportCompress) {
sequenceId = 0;
service.handle(new ServiceTask(packetData, service, tmpSequenceId));
extraPartOfBigPacketCount = 0;
service.handle(new ServiceTask(packetData, service, tmpCount));
} else {
List<byte[]> packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue);
if (decompressUnfinishedDataQueue.isEmpty()) {
sequenceId = 0;
extraPartOfBigPacketCount = 0;
}
for (byte[] pack : packs) {
if (pack.length != 0) {
service.handle(new ServiceTask(pack, service, tmpSequenceId++));
service.handle(new ServiceTask(pack, service, tmpCount));
}
}
}

View File

@@ -1,6 +1,6 @@
package com.actiontech.dble.net.service;
import javax.annotation.Nullable;
import java.util.Arrays;
/**
* Created by szf on 2020/6/18.
@@ -10,13 +10,18 @@ public class ServiceTask {
private final byte[] orgData;
private final boolean reuse;
private final Service service;
private Integer sequenceId = null;
private int extraPartOfBigPacketCount = 0;
public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) {
/**
* @param orgData
* @param service
* @param extraPartsOfBigPacketCount if orgData are big packet, it contains some *extra* parts of big packet,you should pass the count.If orgData isn't big packet ,just set 0.
*/
public ServiceTask(byte[] orgData, Service service, int extraPartsOfBigPacketCount) {
this.orgData = orgData;
this.service = service;
this.reuse = false;
this.sequenceId = sequenceId;
this.extraPartOfBigPacketCount = extraPartsOfBigPacketCount;
}
public ServiceTask(byte[] orgData, Service service, boolean reuse) {
@@ -37,15 +42,10 @@ public class ServiceTask {
return reuse;
}
public int getSequenceId() {
if (sequenceId != null) {
return sequenceId;
} else {
if (orgData == null) {
throw new IllegalStateException("can't get Sequence Id from null");
}
return (orgData[3]);
public int getLastSequenceId() {
if (orgData == null || orgData.length < 4) {
throw new IllegalStateException("can't get Sequence Id from " + Arrays.toString(orgData));
}
return (orgData[3]) + extraPartOfBigPacketCount;
}
}

View File

@@ -107,7 +107,7 @@ public abstract class FrontendService<T extends UserConfig> extends AbstractServ
if (executeTask != null) {
byte[] data = executeTask.getOrgData();
if (data != null && !executeTask.isReuse()) {
this.setPacketId(executeTask.getSequenceId());
this.setPacketId(executeTask.getLastSequenceId());
}
this.handleInnerData(data);
@@ -134,7 +134,7 @@ public abstract class FrontendService<T extends UserConfig> extends AbstractServ
private void taskToPriorityQueue(ServiceTask task) {
DbleServer.getInstance().getFrontPriorityQueue().offer(task);
DbleServer.getInstance().getFrontHandlerQueue().offer(new ServiceTask(null, null, null));
DbleServer.getInstance().getFrontHandlerQueue().offer(new ServiceTask(null, null, 0));
}
@Override