diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeLoadDataHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeLoadDataHandler.java index c85f4b62a..7072f4fe9 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeLoadDataHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeLoadDataHandler.java @@ -89,9 +89,6 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa LOGGER.debug("execute multi node query " + rrs.getStatement()); } this.rrs = rrs; - if (ServerParse.LOAD_DATA_INFILE_SQL == rrs.getSqlType()) { - byteBuffer = session.getSource().allocate(); - } this.sessionAutocommit = session.getShardingService().isAutocommit(); this.modifiedSQL = rrs.getNodes()[0].isModifySQL(); requestScope = session.getShardingService().getRequestScope(); @@ -207,6 +204,15 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa @Override public void writeRemainBuffer() { + lock.lock(); + try { + if (byteBuffer != null) { + session.getShardingService().writeDirectly(byteBuffer, WriteFlags.PART); + byteBuffer = null; + } + } finally { + lock.unlock(); + } } void cleanBuffer() { @@ -313,6 +319,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa ErrorPacket errorPacket = createErrPkg(this.error, err.getErrNo()); handleEndPacket(errorPacket, AutoTxOperation.ROLLBACK, false); } + } catch (Exception e) { + cleanBuffer(); + handleDataProcessException(e); } finally { lock.unlock(); } @@ -344,6 +353,10 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa session.setRowCount(affectedRows); shardingService.setLastInsertId(packet.getInsertId()); handleEndPacket(packet, AutoTxOperation.ROLLBACK, true); + cleanBuffer(); + } catch (Exception e) { + cleanBuffer(); + handleDataProcessException(e); } finally { lock.unlock(); } @@ -436,8 +449,12 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa doSqlStat(); deleteErrorFile(); handleEndPacket(ok, AutoTxOperation.COMMIT, true); + cleanBuffer(); } } + } catch (Exception e) { + cleanBuffer(); + handleDataProcessException(e); } finally { lock.unlock(); } @@ -463,6 +480,10 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa if (fieldsReturned) { return; } + + if (byteBuffer == null && ServerParse.LOAD_DATA_INFILE_SQL == rrs.getSqlType()) { + byteBuffer = session.getSource().allocate(); + } this.resultSize += header.length; for (byte[] field : fields) { this.resultSize += field.length; @@ -577,6 +598,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, @NotNull AbstractService service) { lock.lock(); try { + if (session.closed()) { + cleanBuffer(); + } RouteResultsetNode rrn = (RouteResultsetNode) ((MySQLResponseService) service).getAttachment(); RowDataPacket rowDataPkg = new RowDataPacket(3); rowDataPkg.read(row); @@ -586,6 +610,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa warnings.put(name, Lists.newArrayList()); } warnings.get(name).add(new String(rowDataPkg.fieldValues.get(2))); + } catch (Exception e) { + cleanBuffer(); + handleDataProcessException(e); } finally { lock.unlock(); }