fix loaddata Memory overflow inner 1936 (#3436)

This commit is contained in:
ylinzhu
2022-10-18 17:13:47 +08:00
committed by lin
parent 7557aa183b
commit 91f5a16ec0
@@ -85,9 +85,6 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
LOGGER.debug("execute multi node query " + rrs.getStatement());
}
this.rrs = rrs;
if (ServerParse.LOAD_DATA_INFILE_SQL == rrs.getSqlType()) {
byteBuffer = session.getSource().allocate();
}
this.sessionAutocommit = session.getShardingService().isAutocommit();
this.modifiedSQL = rrs.getNodes()[0].isModifySQL();
requestScope = session.getShardingService().getRequestScope();
@@ -209,6 +206,15 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
@Override
public void writeRemainBuffer() {
lock.lock();
try {
if (byteBuffer != null) {
session.getShardingService().writeDirectly(byteBuffer);
byteBuffer = null;
}
} finally {
lock.unlock();
}
}
void cleanBuffer() {
@@ -316,6 +322,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
ErrorPacket errorPacket = createErrPkg(this.error, err.getErrNo());
handleEndPacket(errorPacket, AutoTxOperation.ROLLBACK, false);
}
} catch (Exception e) {
cleanBuffer();
handleDataProcessException(e);
} finally {
lock.unlock();
}
@@ -347,6 +356,10 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
session.setRowCount(affectedRows);
shardingService.setLastInsertId(packet.getInsertId());
handleEndPacket(packet, AutoTxOperation.ROLLBACK, true);
cleanBuffer();
} catch (Exception e) {
cleanBuffer();
handleDataProcessException(e);
} finally {
lock.unlock();
}
@@ -438,8 +451,12 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
doSqlStat();
deleteErrorFile();
handleEndPacket(ok, AutoTxOperation.COMMIT, true);
cleanBuffer();
}
}
} catch (Exception e) {
cleanBuffer();
handleDataProcessException(e);
} finally {
lock.unlock();
}
@@ -465,6 +482,10 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
if (fieldsReturned) {
return;
}
if (byteBuffer == null && ServerParse.LOAD_DATA_INFILE_SQL == rrs.getSqlType()) {
byteBuffer = session.getSource().allocate();
}
this.resultSize += header.length;
for (byte[] field : fields) {
this.resultSize += field.length;
@@ -579,6 +600,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, AbstractService service) {
lock.lock();
try {
if (session.closed()) {
cleanBuffer();
}
RouteResultsetNode rrn = (RouteResultsetNode) ((MySQLResponseService) service).getAttachment();
RowDataPacket rowDataPkg = new RowDataPacket(3);
rowDataPkg.read(row);
@@ -588,6 +612,9 @@ public class MultiNodeLoadDataHandler extends MultiNodeHandler implements LoadDa
warnings.put(name, Lists.newArrayList());
}
warnings.get(name).add(new String(rowDataPkg.fieldValues.get(2)));
} catch (Exception e) {
cleanBuffer();
handleDataProcessException(e);
} finally {
lock.unlock();
}