inner-1239:avoid thread remaining for group by (#2761)

This commit is contained in:
Collapsar
2021-07-02 14:56:46 +08:00
committed by GitHub
parent ddba6b12d4
commit 18e58d9806
6 changed files with 27 additions and 22 deletions
@@ -7,6 +7,7 @@ package com.actiontech.dble.backend.mysql.nio.handler.query;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.net.Session;
import com.actiontech.dble.net.mysql.RowDataPacket;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -17,6 +18,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
* @CreateTime 2014/11/27
*/
public abstract class OwnThreadDMLHandler extends BaseDMLHandler {
public static final RowDataPacket TERMINATED_ROW = new RowDataPacket(0);
/* if the own thread need to terminated, true if own thread running */
private AtomicBoolean ownJobFlag;
private Object ownThreadLock = new Object();
@@ -87,7 +87,7 @@ public class OrderByHandler extends OwnThreadDMLHandler {
if (terminate.get())
return;
try {
queue.put(new RowDataPacket(0));
queue.put(TERMINATED_ROW);
} catch (InterruptedException e) {
//ignore error
}
@@ -148,9 +148,9 @@ public class OrderByHandler extends OwnThreadDMLHandler {
}
@Override
protected void terminateThread() throws Exception {
protected void terminateThread() {
this.queue.clear();
this.queue.add(new RowDataPacket(0));
this.queue.add(TERMINATED_ROW);
}
@Override
@@ -214,7 +214,7 @@ public class DirectGroupByHandler extends OwnThreadDMLHandler {
try {
// @bug1042
for (int i = 0; i < bucketSize; i++)
queue.put(new RowDataPacket(0));
queue.put(TERMINATED_ROW);
} catch (InterruptedException e) {
//ignore error
}
@@ -314,7 +314,7 @@ public class DirectGroupByHandler extends OwnThreadDMLHandler {
protected void terminateThread() throws Exception {
this.queue.clear();
for (int i = 0; i < bucketSize; i++)
queue.put(new RowDataPacket(0));
queue.put(TERMINATED_ROW);
}
@Override
@@ -5,6 +5,7 @@
package com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.directgroupby;
import com.actiontech.dble.backend.mysql.nio.handler.query.OwnThreadDMLHandler;
import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
import com.actiontech.dble.backend.mysql.store.GroupByLocalResult;
import com.actiontech.dble.buffer.BufferPool;
@@ -50,7 +51,7 @@ public class GroupByBucket extends GroupByLocalResult {
RowDataPacket groupedRow = null;
while ((groupedRow = next()) != null)
outData.put(groupedRow);
outData.put(new RowDataPacket((0)));
outData.put(OwnThreadDMLHandler.TERMINATED_ROW);
} catch (Exception e) {
e.printStackTrace();
}
@@ -59,4 +60,10 @@ public class GroupByBucket extends GroupByLocalResult {
thread.start();
}
@Override
public void close() {
inData.add(OwnThreadDMLHandler.TERMINATED_ROW);
super.close();
}
}
@@ -160,7 +160,7 @@ public class JoinHandler extends OwnThreadDMLHandler {
if (terminate.get()) {
return;
}
RowDataPacket eofRow = new RowDataPacket(0);
RowDataPacket eofRow = TERMINATED_ROW;
try {
if (isLeft) {
LOGGER.debug("row eof left");
@@ -368,16 +368,15 @@ public class JoinHandler extends OwnThreadDMLHandler {
/**
* only for terminate.
*
* @param row
* @param columnCount
* @param deque
* @throws InterruptedException
*/
private void addEndRowToDeque(RowDataPacket row, int columnCount, FairLinkedBlockingDeque<LocalResult> deque)
private void addEndRowToDeque(int columnCount, FairLinkedBlockingDeque<LocalResult> deque)
throws InterruptedException {
LocalResult newLocalResult = new UnSortedLocalResult(columnCount, pool, this.charset).
setMemSizeController(session.getJoinBufferMC());
newLocalResult.add(row);
newLocalResult.add(TERMINATED_ROW);
newLocalResult.done();
LocalResult localResult = deque.addOrReplaceLast(newLocalResult);
if (localResult != null)
@@ -386,10 +385,8 @@ public class JoinHandler extends OwnThreadDMLHandler {
@Override
protected void terminateThread() throws Exception {
RowDataPacket eofRow = new RowDataPacket(0);
addEndRowToDeque(eofRow, leftFieldPackets.size(), leftQueue);
RowDataPacket eofRow2 = new RowDataPacket(0);
addEndRowToDeque(eofRow2, rightFieldPackets.size(), rightQueue);
addEndRowToDeque(leftFieldPackets.size(), leftQueue);
addEndRowToDeque(rightFieldPackets.size(), rightQueue);
}
@Override
@@ -113,7 +113,7 @@ public class NotInHandler extends OwnThreadDMLHandler {
if (terminate.get()) {
return;
}
RowDataPacket eofRow = new RowDataPacket(0);
RowDataPacket eofRow = TERMINATED_ROW;
try {
if (isLeft) {
// logger.debug("row eof left");
@@ -229,16 +229,15 @@ public class NotInHandler extends OwnThreadDMLHandler {
/**
* only for terminate.
*
* @param row
* @param columnCount
* @param deque
* @throws InterruptedException
*/
private void addEndRowToDeque(RowDataPacket row, int columnCount, FairLinkedBlockingDeque<LocalResult> deque)
private void addEndRowToDeque(int columnCount, FairLinkedBlockingDeque<LocalResult> deque)
throws InterruptedException {
LocalResult newLocalResult = new UnSortedLocalResult(columnCount, pool, this.charset).
setMemSizeController(session.getJoinBufferMC());
newLocalResult.add(row);
newLocalResult.add(TERMINATED_ROW);
newLocalResult.done();
LocalResult localResult = deque.addOrReplaceLast(newLocalResult);
if (localResult != null)
@@ -247,10 +246,8 @@ public class NotInHandler extends OwnThreadDMLHandler {
@Override
protected void terminateThread() throws Exception {
RowDataPacket eofRow = new RowDataPacket(0);
addEndRowToDeque(eofRow, leftFieldPackets.size(), leftQueue);
RowDataPacket eofRow2 = new RowDataPacket(0);
addEndRowToDeque(eofRow2, rightFieldPackets.size(), rightQueue);
addEndRowToDeque(leftFieldPackets.size(), leftQueue);
addEndRowToDeque(rightFieldPackets.size(), rightQueue);
}
@Override