From de13077fe881da516a7ad30cd831c1914de2e1ea Mon Sep 17 00:00:00 2001 From: wenyh1 <2365151147@qq.com> Date: Tue, 23 Mar 2021 13:16:39 +0800 Subject: [PATCH] inner-983&2184: fixed load data lost connection (#2545) (cherry picked from commit 2b7c5cc536ee9b092eb51492eacc711ca0e7d20e) --- .../dble/net/service/AbstractService.java | 44 ++++++++++--------- .../dble/net/service/ServiceTask.java | 21 ++++----- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/actiontech/dble/net/service/AbstractService.java b/src/main/java/com/actiontech/dble/net/service/AbstractService.java index 16e405e06..52c768704 100644 --- a/src/main/java/com/actiontech/dble/net/service/AbstractService.java +++ b/src/main/java/com/actiontech/dble/net/service/AbstractService.java @@ -41,7 +41,7 @@ public abstract class AbstractService implements Service { private volatile boolean isSupportCompress = false; protected volatile ProtoHandler proto; protected final BlockingQueue taskQueue; - private int sequenceId = 0; + private int extraPartOfBigPacketCount = 0; public AbstractService(AbstractConnection connection) { this.connection = connection; @@ -64,7 +64,7 @@ public abstract class AbstractService implements Service { switch (result.getCode()) { case PART_OF_BIG_PACKET: - sequenceId++; + extraPartOfBigPacketCount++; if (!result.isHasMorePacket()) { connection.readReachEnd(); dataBuffer.clear(); @@ -99,8 +99,26 @@ public abstract class AbstractService implements Service { if (packetData == null) { return; } + int tmpCount = extraPartOfBigPacketCount; + if (!isSupportCompress) { + extraPartOfBigPacketCount = 0; + handleTask(new ServiceTask(packetData, this, tmpCount)); + } else { + final ConcurrentLinkedQueue decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>(); + List packs = CompressUtil.decompressMysqlPacket(packetData, decompressUnfinishedDataQueue); + if (decompressUnfinishedDataQueue.isEmpty()) { + extraPartOfBigPacketCount = 0; + } + for (byte[] pack : packs) { + if (pack.length != 0) { + handleTask(new ServiceTask(pack, this, tmpCount)); + } + } + } + } + + public void handleTask(ServiceTask task) { if (beforeHandlingTask()) { - ServiceTask task = new ServiceTask(packetData, this, sequenceId); try { taskQueue.put(task); } catch (InterruptedException e) { @@ -322,28 +340,12 @@ public abstract class AbstractService implements Service { try { byte[] data = executeTask.getOrgData(); if (data != null && !executeTask.isReuse()) { - this.setPacketId(executeTask.getSequenceId()); + this.setPacketId(executeTask.getLastSequenceId()); } if (data != null && data.length - MySQLPacket.PACKET_HEADER_SIZE >= SystemConfig.getInstance().getMaxPacketSize()) { throw new IllegalArgumentException("Packet for query is too large (" + data.length + " > " + SystemConfig.getInstance().getMaxPacketSize() + ").You can change maxPacketSize value in bootstrap.cnf."); } - - if (isSupportCompress()) { - final ConcurrentLinkedQueue decompressUnfinishedDataQueue = new ConcurrentLinkedQueue<>(); - List packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue); - if (decompressUnfinishedDataQueue.isEmpty()) { - sequenceId = 0; - } - for (byte[] pack : packs) { - if (pack.length != 0) { - handleInnerData(pack); - } - } - } else { - sequenceId = 0; - this.handleInnerData(data); - - } + this.handleInnerData(data); } catch (Throwable e) { String msg = e.getMessage(); if (StringUtil.isEmpty(msg)) { diff --git a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java index cd5608621..d63b184c2 100644 --- a/src/main/java/com/actiontech/dble/net/service/ServiceTask.java +++ b/src/main/java/com/actiontech/dble/net/service/ServiceTask.java @@ -1,6 +1,6 @@ package com.actiontech.dble.net.service; -import javax.annotation.Nullable; +import java.util.Arrays; /** * Created by szf on 2020/6/18. @@ -11,13 +11,13 @@ public class ServiceTask { private volatile boolean highPriority = false; private final boolean reuse; private final Service service; - private Integer sequenceId = null; + private int extraPartOfBigPacketCount = 0; - public ServiceTask(byte[] orgData, Service service, @Nullable Integer sequenceId) { + public ServiceTask(byte[] orgData, Service service, int extraPartsOfBigPacketCount) { this.orgData = orgData; this.service = service; this.reuse = false; - this.sequenceId = sequenceId; + this.extraPartOfBigPacketCount = extraPartsOfBigPacketCount; } public ServiceTask(byte[] orgData, Service service, boolean reuse) { @@ -47,15 +47,10 @@ public class ServiceTask { return reuse; } - public int getSequenceId() { - if (sequenceId != null) { - return sequenceId; - } else { - if (orgData == null) { - throw new IllegalStateException("can't get Sequence Id from null"); - } - return (orgData[3]); + public int getLastSequenceId() { + if (orgData == null || orgData.length < 4) { + throw new IllegalStateException("can't get Sequence Id from " + Arrays.toString(orgData)); } - + return (orgData[3]) + extraPartOfBigPacketCount; } }