diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/OwnThreadDMLHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/OwnThreadDMLHandler.java index 3c671c051..a6b08e234 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/OwnThreadDMLHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/OwnThreadDMLHandler.java @@ -7,6 +7,7 @@ package com.actiontech.dble.backend.mysql.nio.handler.query; import com.actiontech.dble.DbleServer; import com.actiontech.dble.net.Session; +import com.actiontech.dble.net.mysql.RowDataPacket; import java.util.concurrent.atomic.AtomicBoolean; @@ -17,6 +18,9 @@ import java.util.concurrent.atomic.AtomicBoolean; * @CreateTime 2014/11/27 */ public abstract class OwnThreadDMLHandler extends BaseDMLHandler { + + public static final RowDataPacket TERMINATED_ROW = new RowDataPacket(0); + /* if the own thread need to terminated, true if own thread running */ private AtomicBoolean ownJobFlag; private Object ownThreadLock = new Object(); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java index 8c51d73a1..877e86751 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/OrderByHandler.java @@ -85,7 +85,7 @@ public class OrderByHandler extends OwnThreadDMLHandler { if (terminate.get()) return; try { - queue.put(new RowDataPacket(0)); + queue.put(TERMINATED_ROW); } catch (InterruptedException e) { //ignore error } @@ -142,9 +142,9 @@ public class OrderByHandler extends OwnThreadDMLHandler { } @Override - protected void terminateThread() throws Exception { + protected void terminateThread() { this.queue.clear(); - this.queue.add(new RowDataPacket(0)); + this.queue.add(TERMINATED_ROW); } @Override diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java index 9c8c5cf87..2f1ae623e 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/DirectGroupByHandler.java @@ -208,7 +208,7 @@ public class DirectGroupByHandler extends OwnThreadDMLHandler { try { // @bug1042 for (int i = 0; i < bucketSize; i++) - queue.put(new RowDataPacket(0)); + queue.put(TERMINATED_ROW); } catch (InterruptedException e) { //ignore error } @@ -308,7 +308,7 @@ public class DirectGroupByHandler extends OwnThreadDMLHandler { protected void terminateThread() throws Exception { this.queue.clear(); for (int i = 0; i < bucketSize; i++) - queue.put(new RowDataPacket(0)); + queue.put(TERMINATED_ROW); } @Override diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/directgroupby/GroupByBucket.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/directgroupby/GroupByBucket.java index bc2bf8951..4aeec2372 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/directgroupby/GroupByBucket.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/groupby/directgroupby/GroupByBucket.java @@ -5,6 +5,7 @@ package com.actiontech.dble.backend.mysql.nio.handler.query.impl.groupby.directgroupby; +import com.actiontech.dble.backend.mysql.nio.handler.query.OwnThreadDMLHandler; import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator; import com.actiontech.dble.backend.mysql.store.GroupByLocalResult; import com.actiontech.dble.buffer.BufferPool; @@ -50,7 +51,7 @@ public class GroupByBucket extends GroupByLocalResult { RowDataPacket groupedRow = null; while ((groupedRow = next()) != null) outData.put(groupedRow); - outData.put(new RowDataPacket((0))); + outData.put(OwnThreadDMLHandler.TERMINATED_ROW); } catch (Exception e) { e.printStackTrace(); } @@ -59,4 +60,10 @@ public class GroupByBucket extends GroupByLocalResult { thread.start(); } + @Override + public void close() { + inData.add(OwnThreadDMLHandler.TERMINATED_ROW); + super.close(); + } + } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java index 950818656..c275e0c71 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/JoinHandler.java @@ -155,7 +155,7 @@ public class JoinHandler extends OwnThreadDMLHandler { if (terminate.get()) { return; } - RowDataPacket eofRow = new RowDataPacket(0); + RowDataPacket eofRow = TERMINATED_ROW; try { if (isLeft) { LOGGER.debug("row eof left"); @@ -359,16 +359,15 @@ public class JoinHandler extends OwnThreadDMLHandler { /** * only for terminate. * - * @param row * @param columnCount * @param deque * @throws InterruptedException */ - private void addEndRowToDeque(RowDataPacket row, int columnCount, FairLinkedBlockingDeque deque) + private void addEndRowToDeque(int columnCount, FairLinkedBlockingDeque deque) throws InterruptedException { LocalResult newLocalResult = new UnSortedLocalResult(columnCount, pool, this.charset). setMemSizeController(session.getJoinBufferMC()); - newLocalResult.add(row); + newLocalResult.add(TERMINATED_ROW); newLocalResult.done(); LocalResult localResult = deque.addOrReplaceLast(newLocalResult); if (localResult != null) @@ -377,10 +376,8 @@ public class JoinHandler extends OwnThreadDMLHandler { @Override protected void terminateThread() throws Exception { - RowDataPacket eofRow = new RowDataPacket(0); - addEndRowToDeque(eofRow, leftFieldPackets.size(), leftQueue); - RowDataPacket eofRow2 = new RowDataPacket(0); - addEndRowToDeque(eofRow2, rightFieldPackets.size(), rightQueue); + addEndRowToDeque(leftFieldPackets.size(), leftQueue); + addEndRowToDeque(rightFieldPackets.size(), rightQueue); } @Override diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java index 2a74b4216..946d18c14 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/join/NotInHandler.java @@ -112,7 +112,7 @@ public class NotInHandler extends OwnThreadDMLHandler { if (terminate.get()) { return; } - RowDataPacket eofRow = new RowDataPacket(0); + RowDataPacket eofRow = TERMINATED_ROW; try { if (isLeft) { // logger.debug("row eof left"); @@ -224,16 +224,15 @@ public class NotInHandler extends OwnThreadDMLHandler { /** * only for terminate. * - * @param row * @param columnCount * @param deque * @throws InterruptedException */ - private void addEndRowToDeque(RowDataPacket row, int columnCount, FairLinkedBlockingDeque deque) + private void addEndRowToDeque(int columnCount, FairLinkedBlockingDeque deque) throws InterruptedException { LocalResult newLocalResult = new UnSortedLocalResult(columnCount, pool, this.charset). setMemSizeController(session.getJoinBufferMC()); - newLocalResult.add(row); + newLocalResult.add(TERMINATED_ROW); newLocalResult.done(); LocalResult localResult = deque.addOrReplaceLast(newLocalResult); if (localResult != null) @@ -242,10 +241,8 @@ public class NotInHandler extends OwnThreadDMLHandler { @Override protected void terminateThread() throws Exception { - RowDataPacket eofRow = new RowDataPacket(0); - addEndRowToDeque(eofRow, leftFieldPackets.size(), leftQueue); - RowDataPacket eofRow2 = new RowDataPacket(0); - addEndRowToDeque(eofRow2, rightFieldPackets.size(), rightQueue); + addEndRowToDeque(leftFieldPackets.size(), leftQueue); + addEndRowToDeque(rightFieldPackets.size(), rightQueue); } @Override