mirror of
https://github.com/actiontech/dble.git
synced 2026-01-06 04:40:17 -06:00
fix loaddata Memory overflow inner 1936 (#3436)
This commit is contained in:
@@ -89,9 +89,6 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
LOGGER.debug("execute multi node query " + rrs.getStatement());
|
||||
}
|
||||
this.rrs = rrs;
|
||||
if (ServerParse.LOAD_DATA_INFILE_SQL == rrs.getSqlType()) {
|
||||
byteBuffer = session.getSource().allocate();
|
||||
}
|
||||
this.sessionAutocommit = session.getShardingService().isAutocommit();
|
||||
this.modifiedSQL = rrs.getNodes()[0].isModifySQL();
|
||||
requestScope = session.getShardingService().getRequestScope();
|
||||
@@ -207,6 +204,15 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
|
||||
@Override
|
||||
public void writeRemainBuffer() {
|
||||
lock.lock();
|
||||
try {
|
||||
if (byteBuffer != null) {
|
||||
session.getShardingService().writeDirectly(byteBuffer, WriteFlags.PART);
|
||||
byteBuffer = null;
|
||||
}
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void cleanBuffer() {
|
||||
@@ -313,6 +319,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
ErrorPacket errorPacket = createErrPkg(this.error, err.getErrNo());
|
||||
handleEndPacket(errorPacket, AutoTxOperation.ROLLBACK, false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
cleanBuffer();
|
||||
handleDataProcessException(e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
@@ -344,6 +353,10 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
session.setRowCount(affectedRows);
|
||||
shardingService.setLastInsertId(packet.getInsertId());
|
||||
handleEndPacket(packet, AutoTxOperation.ROLLBACK, true);
|
||||
cleanBuffer();
|
||||
} catch (Exception e) {
|
||||
cleanBuffer();
|
||||
handleDataProcessException(e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
@@ -436,8 +449,12 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
doSqlStat();
|
||||
deleteErrorFile();
|
||||
handleEndPacket(ok, AutoTxOperation.COMMIT, true);
|
||||
cleanBuffer();
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
cleanBuffer();
|
||||
handleDataProcessException(e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
@@ -463,6 +480,10 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
if (fieldsReturned) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (byteBuffer == null && ServerParse.LOAD_DATA_INFILE_SQL == rrs.getSqlType()) {
|
||||
byteBuffer = session.getSource().allocate();
|
||||
}
|
||||
this.resultSize += header.length;
|
||||
for (byte[] field : fields) {
|
||||
this.resultSize += field.length;
|
||||
@@ -577,6 +598,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, @NotNull AbstractService service) {
|
||||
lock.lock();
|
||||
try {
|
||||
if (session.closed()) {
|
||||
cleanBuffer();
|
||||
}
|
||||
RouteResultsetNode rrn = (RouteResultsetNode) ((MySQLResponseService) service).getAttachment();
|
||||
RowDataPacket rowDataPkg = new RowDataPacket(3);
|
||||
rowDataPkg.read(row);
|
||||
@@ -586,6 +610,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
|
||||
warnings.put(name, Lists.newArrayList());
|
||||
}
|
||||
warnings.get(name).add(new String(rowDataPkg.fieldValues.get(2)));
|
||||
} catch (Exception e) {
|
||||
cleanBuffer();
|
||||
handleDataProcessException(e);
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user