refactor multi_groupby

yanhuqing
2017-11-22 10:55:58 +08:00
parent d54effbf3c
commit d83edd05cc
27 changed files with 397 additions and 1872 deletions

View File

@@ -24,14 +24,6 @@
</Match>
<!-- protocol:ignore -->
<!-- need refactor -->
<Match>
<Class name="com.actiontech.dble.sqlengine.mpp.UnsafeRowGrouper"/>
</Match>
<Match>
<Class name="com.actiontech.dble.sqlengine.mpp.RowDataPacketGrouper"/>
</Match>
<Match>
<Package name="com.actiontech.dble.util.dataMigrator.dataIOImpl"/>
</Match>

View File

@@ -44,7 +44,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
private ErrorPacket err;
private volatile boolean errConn = false;
private Set<BackendConnection> closedConnSet;
public MultiNodeDdlHandler(int sqlType, RouteResultset rrs, NonBlockingSession session) {
public MultiNodeDdlHandler(RouteResultset rrs, NonBlockingSession session) {
super(session);
if (rrs.getNodes() == null) {
throw new IllegalArgumentException("routeNode is null!");
@@ -59,7 +59,8 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
this.session = session;
this.oriRrs = rrs;
this.handler = new MultiNodeQueryHandler(sqlType, rrs, session);
this.handler = new MultiNodeQueryHandler(rrs, session);
this.errConn = false;
}
@@ -250,6 +251,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler {
handler.execute();
} catch (Exception e) {
LOGGER.warn(String.valueOf(source) + oriRrs, e);
session.handleSpecial(oriRrs, source.getSchema(), false);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
if (session.isPrepared()) {

View File

@@ -121,8 +121,7 @@ public abstract class MultiNodeHandler implements ResponseHandler {
session.closeAndClearResources(error);
} else {
session.getSource().setTxInterrupt(this.error);
// clear resources
clearResources();
this.clearResources();
}
}
}

View File

@@ -18,17 +18,12 @@ import com.actiontech.dble.cache.LayerCachePool;
import com.actiontech.dble.config.ErrorCode;
import com.actiontech.dble.config.ServerConfig;
import com.actiontech.dble.log.transaction.TxnLogHelper;
import com.actiontech.dble.memory.unsafe.row.UnsafeRow;
import com.actiontech.dble.net.mysql.*;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.NonBlockingSession;
import com.actiontech.dble.server.ServerConnection;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.sqlengine.mpp.AbstractDataNodeMerge;
import com.actiontech.dble.sqlengine.mpp.ColMeta;
import com.actiontech.dble.sqlengine.mpp.DataMergeService;
import com.actiontech.dble.sqlengine.mpp.DataNodeMergeManager;
import com.actiontech.dble.statistic.stat.QueryResult;
import com.actiontech.dble.statistic.stat.QueryResultDispatcher;
import com.actiontech.dble.util.FormatUtil;
@@ -36,39 +31,38 @@ import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @author mycat
*/
public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeQueryHandler.class);
private final RouteResultset rrs;
private final NonBlockingSession session;
private final AbstractDataNodeMerge dataMergeSvr;
protected final RouteResultset rrs;
protected final NonBlockingSession session;
private final boolean sessionAutocommit;
protected long affectedRows;
protected long selectRows;
protected long startTime;
protected long netInBytes;
protected List<BackendConnection> errConnection;
protected long netOutBytes;
protected boolean prepared;
protected ErrorPacket err;
protected int fieldCount = 0;
protected volatile boolean fieldsReturned;
private long insertId;
private String primaryKeyTable = null;
private int primaryKeyIndex = -1;
private int fieldCount = 0;
private long affectedRows;
private long selectRows;
private long insertId;
private volatile boolean fieldsReturned;
private long startTime;
private long netInBytes;
private long netOutBytes;
private boolean prepared;
private List<FieldPacket> fieldPackets = new ArrayList<>();
private ErrorPacket err;
private List<BackendConnection> errConnection;
private volatile ByteBuffer byteBuffer;
private Set<BackendConnection> closedConnSet;
public MultiNodeQueryHandler(int sqlType, RouteResultset rrs, NonBlockingSession session) {
public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) {
super(session);
if (rrs.getNodes() == null) {
throw new IllegalArgumentException("routeNode is null!");
@@ -77,18 +71,8 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
LOGGER.debug("execute multi node query " + rrs.getStatement());
}
this.rrs = rrs;
int isOffHeapUseOffHeapForMerge = DbleServer.getInstance().getConfig().getSystem().getUseOffHeapForMerge();
if (ServerParse.SELECT == sqlType && rrs.needMerge()) {
if (isOffHeapUseOffHeapForMerge == 1) {
dataMergeSvr = new DataNodeMergeManager(this, rrs);
} else {
dataMergeSvr = new DataMergeService(this, rrs);
}
} else {
dataMergeSvr = null;
if (ServerParse.SELECT == sqlType) {
byteBuffer = session.getSource().allocate();
}
if (ServerParse.SELECT == rrs.getSqlType()) {
byteBuffer = session.getSource().allocate();
}
this.sessionAutocommit = session.getSource().isAutocommit();
this.session = session;
@@ -282,11 +266,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
return;
}
fieldsReturned = true;
if (dataMergeSvr != null) {
mergeFieldEof(fields, eof);
} else {
executeFieldEof(header, fields, eof);
}
executeFieldEof(header, fields, eof);
} catch (Exception e) {
handleDataProcessException(e);
} finally {
@@ -348,44 +328,35 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
this.selectRows++;
RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();
String dataNode = rNode.getName();
if (dataMergeSvr != null) {
// even through discarding the all rest data, we can't
//close the connection for tx control such as rollback or commit.
// So the "isClosedByDiscard" variable is unnecessary.
// @author Uncle-pan
// @since 2016-03-25
dataMergeSvr.onNewRecord(dataNode, row);
} else {
if (rrs.getLimitSize() >= 0) {
if (selectRows <= rrs.getLimitStart()) {
return false;
} else if (selectRows > (rrs.getLimitStart() < 0 ? 0 : rrs.getLimitStart()) + rrs.getLimitSize()) {
return false;
}
if (rrs.getLimitSize() >= 0) {
if (selectRows <= rrs.getLimitStart() ||
(selectRows > (rrs.getLimitStart() < 0 ? 0 : rrs.getLimitStart()) + rrs.getLimitSize())) {
return false;
}
RowDataPacket rowDataPkg = null;
// cache primaryKey-> dataNode
if (primaryKeyIndex != -1) {
}
RowDataPacket rowDataPkg = null;
// cache primaryKey-> dataNode
if (primaryKeyIndex != -1) {
rowDataPkg = new RowDataPacket(fieldCount);
rowDataPkg.read(row);
String primaryKey = new String(rowDataPkg.fieldValues.get(primaryKeyIndex));
LayerCachePool pool = DbleServer.getInstance().getRouterService().getTableId2DataNodeCache();
if (pool != null) {
pool.putIfAbsent(primaryKeyTable, primaryKey, dataNode);
}
}
row[3] = ++packetId;
if (prepared) {
if (rowDataPkg == null) {
rowDataPkg = new RowDataPacket(fieldCount);
rowDataPkg.read(row);
String primaryKey = new String(rowDataPkg.fieldValues.get(primaryKeyIndex));
LayerCachePool pool = DbleServer.getInstance().getRouterService().getTableId2DataNodeCache();
if (pool != null) {
pool.putIfAbsent(primaryKeyTable, primaryKey, dataNode);
}
}
row[3] = ++packetId;
if (prepared) {
if (rowDataPkg == null) {
rowDataPkg = new RowDataPacket(fieldCount);
rowDataPkg.read(row);
}
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, rowDataPkg);
binRowDataPk.write(byteBuffer, session.getSource(), true);
} else {
byteBuffer = session.getSource().writeToBuffer(row, byteBuffer);
}
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, rowDataPkg);
binRowDataPk.write(byteBuffer, session.getSource(), true);
} else {
byteBuffer = session.getSource().writeToBuffer(row, byteBuffer);
}
} catch (Exception e) {
handleDataProcessException(e);
@@ -400,14 +371,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
if (closedConnSet != null) {
closedConnSet.clear();
}
lock.lock();
try {
if (dataMergeSvr != null) {
dataMergeSvr.clear();
}
} finally {
lock.unlock();
}
}
@Override
@@ -460,29 +423,20 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
private void writeEofResult(byte[] eof, ServerConnection source) {
if (dataMergeSvr != null) {
try {
dataMergeSvr.outputMergeResult(session, eof);
} catch (Exception e) {
handleDataProcessException(e);
}
} else {
lock.lock();
try {
eof[3] = ++packetId;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("last packet id:" + packetId);
}
byteBuffer = source.writeToBuffer(eof, byteBuffer);
source.write(byteBuffer);
} finally {
lock.unlock();
lock.lock();
try {
eof[3] = ++packetId;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("last packet id:" + packetId);
}
byteBuffer = source.writeToBuffer(eof, byteBuffer);
source.write(byteBuffer);
} finally {
lock.unlock();
}
}
private void doSqlStat(ServerConnection source) {
protected void doSqlStat(ServerConnection source) {
if (DbleServer.getInstance().getConfig().getSystem().getUseSqlStat() == 1) {
int resultSize = source.getWriteQueue().size() * DbleServer.getInstance().getConfig().getSystem().getBufferPoolPageSize();
if (rrs != null && rrs.getStatement() != null) {
@@ -495,132 +449,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
}
}
/**
* send the final result to the client
*
* @param source ServerConnection
* @param eof eof bytes
* @param iterator Iterator
*/
public void outputMergeResult(final ServerConnection source, final byte[] eof, Iterator<UnsafeRow> iterator) {
lock.lock();
try {
ByteBuffer buffer = session.getSource().allocate();
final RouteResultset routeResultset = this.dataMergeSvr.getRrs();
/* cut the result for the limit statement */
int start = routeResultset.getLimitStart();
int end = start + routeResultset.getLimitSize();
int index = 0;
if (start < 0)
start = 0;
if (routeResultset.getLimitSize() < 0)
end = Integer.MAX_VALUE;
if (prepared) {
while (iterator.hasNext()) {
UnsafeRow row = iterator.next();
if (index >= start) {
row.setPacketId(++packetId);
BinaryRowDataPacket binRowPacket = new BinaryRowDataPacket();
binRowPacket.read(fieldPackets, row);
buffer = binRowPacket.write(buffer, source, true);
}
index++;
if (index == end) {
break;
}
}
} else {
while (iterator.hasNext()) {
UnsafeRow row = iterator.next();
if (index >= start) {
row.setPacketId(++packetId);
buffer = row.write(buffer, source, true);
}
index++;
if (index == end) {
break;
}
}
}
eof[3] = ++packetId;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("last packet id:" + packetId);
}
source.write(source.writeToBuffer(eof, buffer));
} catch (Exception e) {
handleDataProcessException(e);
} finally {
try {
dataMergeSvr.clear();
} finally {
lock.unlock();
}
}
}
public void outputMergeResult(final ServerConnection source,
final byte[] eof, List<RowDataPacket> results) {
lock.lock();
try {
ByteBuffer buffer = session.getSource().allocate();
final RouteResultset routeResultset = this.dataMergeSvr.getRrs();
int start = routeResultset.getLimitStart();
int end = start + routeResultset.getLimitSize();
if (start < 0) {
start = 0;
}
if (routeResultset.getLimitSize() < 0) {
end = results.size();
}
if (end > results.size()) {
end = results.size();
}
if (prepared) {
for (int i = start; i < end; i++) {
RowDataPacket row = results.get(i);
BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket();
binRowDataPk.read(fieldPackets, row);
binRowDataPk.setPacketId(++packetId);
//binRowDataPk.write(source);
buffer = binRowDataPk.write(buffer, session.getSource(), true);
}
} else {
for (int i = start; i < end; i++) {
RowDataPacket row = results.get(i);
row.setPacketId(++packetId);
buffer = row.write(buffer, source, true);
}
}
eof[3] = ++packetId;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("last packet id:" + packetId);
}
source.write(source.writeToBuffer(eof, buffer));
} catch (Exception e) {
handleDataProcessException(e);
} finally {
try {
dataMergeSvr.clear();
} finally {
lock.unlock();
}
}
}
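
Both removed outputMergeResult() variants clip the merged rows to the same LIMIT window before writing them out. A standalone sketch of just that arithmetic, with illustrative names (not dble API):

public final class LimitWindowSketch {
    // Rows with start <= i < end are written; a negative limitStart means "from the first
    // row", a negative limitSize means "no upper bound" (Integer.MAX_VALUE in the iterator
    // variant, results.size() in the list variant).
    static int[] window(int limitStart, int limitSize, int totalRows) {
        int start = limitStart;
        int end = start + limitSize;
        if (start < 0) {
            start = 0;
        }
        if (limitSize < 0) {
            end = totalRows;
        }
        if (end > totalRows) {
            end = totalRows;
        }
        return new int[]{start, end};
    }

    public static void main(String[] args) {
        int[] w = window(2, 3, 10);                   // LIMIT 2, 3 over 10 merged rows
        System.out.println(w[0] + ".." + (w[1] - 1)); // prints 2..4
    }
}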
private void executeFieldEof(byte[] header, List<byte[]> fields, byte[] eof) {
ServerConnection source = session.getSource();
fieldCount = fields.size();
@@ -662,45 +490,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
byteBuffer = source.writeToBuffer(eof, byteBuffer);
}
private void mergeFieldEof(List<byte[]> fields, byte[] eof) throws IOException {
fieldCount = fields.size();
ResultSetHeaderPacket packet = new ResultSetHeaderPacket();
packet.setPacketId(++packetId);
packet.setFieldCount(fieldCount);
ServerConnection source = session.getSource();
ByteBuffer buffer = source.allocate();
buffer = packet.write(buffer, source, true);
Map<String, ColMeta> columnToIndex = new HashMap<>(fieldCount);
for (int i = 0, len = fieldCount; i < len; ++i) {
byte[] field = fields.get(i);
FieldPacket fieldPkg = new FieldPacket();
fieldPkg.read(field);
if (rrs.getSchema() != null) {
fieldPkg.setDb(rrs.getSchema().getBytes());
}
if (rrs.getTableAlias() != null) {
fieldPkg.setTable(rrs.getTableAlias().getBytes());
}
if (rrs.getTable() != null) {
fieldPkg.setOrgTable(rrs.getTable().getBytes());
}
fieldPackets.add(fieldPkg);
String fieldName = new String(fieldPkg.getName()).toUpperCase();
if (!columnToIndex.containsKey(fieldName)) {
ColMeta colMeta = new ColMeta(i, fieldPkg.getType());
colMeta.setDecimals(fieldPkg.getDecimals());
columnToIndex.put(fieldName, colMeta);
}
fieldPkg.setPacketId(++packetId);
buffer = fieldPkg.write(buffer, source, false);
}
eof[3] = ++packetId;
buffer = source.writeToBuffer(eof, buffer);
source.write(buffer);
dataMergeSvr.onRowMetaData(columnToIndex, fieldCount);
}
public void handleDataProcessException(Exception e) {
if (!errorResponse.get()) {
@@ -733,7 +522,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR
this.prepared = prepared;
}
private void handleEndPacket(byte[] data, AutoTxOperation txOperation, BackendConnection conn) {
protected void handleEndPacket(byte[] data, AutoTxOperation txOperation, BackendConnection conn) {
ServerConnection source = session.getSource();
if (source.isAutocommit() && !source.isTxStart() && conn.isModifiedSQLExecuted()) {
if (nodeCount < 0) {

View File

@@ -0,0 +1,225 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.backend.mysql.nio.handler;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.BackendConnection;
import com.actiontech.dble.backend.mysql.nio.MySQLConnection;
import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder;
import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler;
import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation;
import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap;
import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem;
import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator;
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.plan.Order;
import com.actiontech.dble.plan.common.item.ItemField;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.server.NonBlockingSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
public class MultiNodeSelectHandler extends MultiNodeQueryHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeSelectHandler.class);
private final int queueSize;
private Map<BackendConnection, BlockingQueue<HeapItem>> queues;
private RowDataComparator rowComparator;
private OutputHandler outputHandler;
private volatile boolean noNeedRows = false;
public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) {
super(rrs, session);
this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize();
this.queues = new ConcurrentHashMap<>();
outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session, false);
}
@Override
public void okResponse(byte[] data, BackendConnection conn) {
this.netOutBytes += data.length;
boolean executeResponse = conn.syncAndExecute();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + conn);
}
if (executeResponse) {
String reason = "unexpected okResponse";
LOGGER.warn(reason);
}
}
@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPacketsNull, byte[] eof,
boolean isLeft, BackendConnection conn) {
this.netOutBytes += header.length;
this.netOutBytes += eof.length;
queues.put(conn, new LinkedBlockingQueue<HeapItem>(queueSize));
lock.lock();
try {
if (isFail()) {
handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn);
} else {
if (!fieldsReturned) {
fieldsReturned = true;
mergeFieldEof(fields, conn);
}
if (--nodeCount > 0) {
return;
}
startOwnThread();
}
} catch (Exception e) {
handleDataProcessException(e);
} finally {
lock.unlock();
}
}
@Override
public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection conn) {
BlockingQueue<HeapItem> queue = queues.get(conn);
if (queue == null)
return;
try {
queue.put(HeapItem.nullItem());
} catch (InterruptedException e) {
LOGGER.warn("rowEofResponse error", e);
}
}
@Override
public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, BackendConnection conn) {
if (errorResponse.get() || noNeedRows) {
return true;
}
BlockingQueue<HeapItem> queue = queues.get(conn);
if (queue == null)
return true;
RowDataPacket rp = new RowDataPacket(fieldCount);
rp.read(row);
HeapItem item = new HeapItem(row, rp, (MySQLConnection) conn);
try {
queue.put(item);
} catch (InterruptedException e) {
LOGGER.warn("rowResponse error", e);
}
return false;
}
@Override
public void writeQueueAvailable() {
}
private void mergeFieldEof(List<byte[]> fields, BackendConnection conn) throws IOException {
fieldCount = fields.size();
List<FieldPacket> fieldPackets = new ArrayList<>();
for (byte[] field : fields) {
this.netOutBytes += field.length;
FieldPacket fieldPacket = new FieldPacket();
fieldPacket.read(field);
if (rrs.getSchema() != null) {
fieldPacket.setDb(rrs.getSchema().getBytes());
}
if (rrs.getTableAlias() != null) {
fieldPacket.setTable(rrs.getTableAlias().getBytes());
}
if (rrs.getTable() != null) {
fieldPacket.setOrgTable(rrs.getTable().getBytes());
}
fieldPackets.add(fieldPacket);
}
List<Order> orderBys = new ArrayList<>();
for (String groupBy : rrs.getGroupByCols()) {
ItemField itemField = new ItemField(rrs.getSchema(), rrs.getTableAlias(), groupBy);
orderBys.add(new Order(itemField));
}
rowComparator = new RowDataComparator(fieldPackets, orderBys);
outputHandler.fieldEofResponse(null, null, fieldPackets, null, false, conn);
}
private void startOwnThread() {
DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() {
@Override
public void run() {
ownThreadJob();
}
});
}
private void ownThreadJob() {
try {
ArrayMinHeap<HeapItem> heap = new ArrayMinHeap<>(new Comparator<HeapItem>() {
@Override
public int compare(HeapItem o1, HeapItem o2) {
RowDataPacket row1 = o1.getRowPacket();
RowDataPacket row2 = o2.getRowPacket();
if (row1 == null || row2 == null) {
if (row1 == row2)
return 0;
if (row1 == null)
return -1;
return 1;
}
return rowComparator.compare(row1, row2);
}
});
// init heap
for (Map.Entry<BackendConnection, BlockingQueue<HeapItem>> entry : queues.entrySet()) {
HeapItem firstItem = entry.getValue().take();
heap.add(firstItem);
}
while (!heap.isEmpty()) {
if (isFail())
return;
HeapItem top = heap.peak();
if (top.isNullItem()) {
heap.poll();
} else {
BlockingQueue<HeapItem> topItemQueue = queues.get(top.getIndex());
HeapItem item = topItemQueue.take();
heap.replaceTop(item);
//limit
this.selectRows++;
if (rrs.getLimitSize() >= 0) {
if (selectRows <= rrs.getLimitStart()) {
continue;
} else if (selectRows > (rrs.getLimitStart() < 0 ? 0 : rrs.getLimitStart()) + rrs.getLimitSize()) {
noNeedRows = true;
while (!heap.isEmpty()) {
HeapItem itemToDiscard = heap.poll();
if (!itemToDiscard.isNullItem()) {
BlockingQueue<HeapItem> discardQueue = queues.get(itemToDiscard.getIndex());
boolean isClear = false;
while (!isClear) {
if (discardQueue.take().isNullItem() || isFail()) {
isClear = true;
}
}
}
}
continue;
}
}
outputHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex());
}
}
outputHandler.rowEofResponse(null, false, queues.keySet().iterator().next());
doSqlStat(session.getSource());
} catch (Exception e) {
String msg = "Merge thread error, " + e.getLocalizedMessage();
LOGGER.warn(msg, e);
session.onQueryError(msg.getBytes());
}
}
}
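
ownThreadJob() above is a k-way merge: one blocking queue per backend connection, a null sentinel marking end-of-stream, and a min-heap ordered by the group-by comparator. A minimal JDK-only sketch of the same pattern, using PriorityQueue instead of dble's ArrayMinHeap and Integer values instead of RowDataPacket:

import java.util.*;
import java.util.concurrent.*;

public class KWayMergeSketch {
    // value == null plays the role of HeapItem.nullItem(): end-of-stream for that source
    record Item(Integer value, BlockingQueue<Item> source) {}

    static void merge(List<BlockingQueue<Item>> queues, List<Integer> out) throws InterruptedException {
        PriorityQueue<Item> heap = new PriorityQueue<>((a, b) -> {
            if (a.value() == null || b.value() == null) {
                if (a.value() == b.value()) return 0;   // both exhausted
                return a.value() == null ? -1 : 1;      // sentinels sort first, as above
            }
            return Integer.compare(a.value(), b.value());
        });
        for (BlockingQueue<Item> q : queues) {          // init heap: one item per source
            heap.add(q.take());
        }
        while (!heap.isEmpty()) {
            Item top = heap.poll();
            if (top.value() == null) continue;          // this source is done
            out.add(top.value());                       // emit the smallest row
            heap.add(top.source().take());              // refill from the same source
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Item> q1 = new LinkedBlockingQueue<>();
        BlockingQueue<Item> q2 = new LinkedBlockingQueue<>();
        for (int v : new int[]{1, 4, 7}) q1.put(new Item(v, q1));
        q1.put(new Item(null, q1));
        for (int v : new int[]{2, 3, 9}) q2.put(new Item(v, q2));
        q2.put(new Item(null, q2));
        List<Integer> out = new ArrayList<>();
        merge(List.of(q1, q2), out);
        System.out.println(out);                        // [1, 2, 3, 4, 7, 9]
    }
}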

View File

@@ -134,7 +134,7 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
int errPort = source.getLocalPort();
String errMsg = " errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage());
LOGGER.warn("execute sql err :" + errMsg + " con:" + conn +
LOGGER.warn("execute sql err :" + errMsg + " con:" + conn +
" frontend host:" + errHost + "/" + errPort + "/" + errUser);
session.releaseConnectionIfSafe(conn, false);
@@ -358,10 +358,6 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl
session.getSource().close(reason);
}
public void clearResources() {
}
@Override
public void requestDataResponse(byte[] data, BackendConnection conn) {
LoadDataUtil.requestFileDataResponse(data, conn);

View File

@@ -394,7 +394,7 @@ public abstract class BaseHandlerBuilder {
return (arg.getSumFuncs().size() > 0 || arg.getGroupBys().size() > 0);
}
protected static long getSequenceId() {
public static long getSequenceId() {
return sequenceId.incrementAndGet();
}

View File

@@ -47,6 +47,7 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler {
private RouteResultsetNode[] route;
private int reachedConCount;
private boolean isEasyMerge;
private volatile boolean noNeedRows = false;
public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session,
List<Order> orderBys) {
@@ -119,7 +120,7 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler {
@Override
public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) {
if (terminate.get())
if (terminate.get() || noNeedRows)
return true;
if (isEasyMerge) {
@@ -190,7 +191,6 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler {
HeapItem firstItem = entry.getValue().take();
heap.add(firstItem);
}
boolean filterFinished = false;
while (!heap.isEmpty()) {
if (terminate.get())
return;
@@ -201,11 +201,20 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler {
BlockingQueue<HeapItem> topItemQueue = queues.get(top.getIndex());
HeapItem item = topItemQueue.take();
heap.replaceTop(item);
if (filterFinished) {
continue;
}
if (nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), this.isLeft, top.getIndex())) {
filterFinished = true;
noNeedRows = true;
while (!heap.isEmpty()) {
HeapItem itemToDiscard = heap.poll();
if (!itemToDiscard.isNullItem()) {
BlockingQueue<HeapItem> discardQueue = queues.get(itemToDiscard.getIndex());
boolean isClear = false;
while (!isClear) {
if (discardQueue.take().isNullItem() || terminate.get()) {
isClear = true;
}
}
}
}
}
}
}
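
The discard loop added here (and its twin in MultiNodeSelectHandler above) has to keep consuming from every remaining queue until the end sentinel arrives, otherwise a producer blocked on a full queue would never finish; the terminate/isFail check gives a second way out. A reduced JDK-only sketch of that drain pattern, with illustrative names:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

final class DrainSketch {
    static final byte[] EOF = new byte[0];          // stand-in for HeapItem.nullItem()

    // Consume and discard items until the EOF sentinel (or a terminate signal) shows up,
    // so that a producer blocked on queue.put() can run to completion.
    static void drain(BlockingQueue<byte[]> queue, AtomicBoolean terminate) throws InterruptedException {
        boolean clear = false;
        while (!clear) {
            if (queue.take() == EOF || terminate.get()) {
                clear = true;
            }
        }
    }
}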

View File

@@ -45,6 +45,23 @@ public class RowDataComparator implements Comparator<RowDataPacket> {
}
}
public RowDataComparator(List<FieldPacket> fps, List<Order> orders) {
sourceFields = HandlerTool.createFields(fps);
if (orders != null && orders.size() > 0) {
ascList = new ArrayList<>();
cmpFields = new ArrayList<>();
cmpItems = new ArrayList<>();
for (Order order : orders) {
Item cmpItem = HandlerTool.createFieldItem(order.getItem(), sourceFields, 0);
cmpItems.add(cmpItem);
FieldPacket tmpFp = new FieldPacket();
cmpItem.makeField(tmpFp);
Field cmpField = HandlerTool.createField(tmpFp);
cmpFields.add(cmpField);
ascList.add(order.getSortOrder() == SQLOrderingSpecification.ASC);
}
}
}
public void sort(List<RowDataPacket> rows) {
Comparator<RowDataPacket> c = new Comparator<RowDataPacket>() {

View File

@@ -102,7 +102,7 @@ public class GroupByLocalResult extends LocalResult {
if (!bufferMC.addSize(incrementSize)) {
needFlush = true;
}
} else if (!needFlush && currentMemory > maxMemory) {
} else if (currentMemory > maxMemory) {
needFlush = true;
}
if (needFlush) {

View File

@@ -65,7 +65,7 @@ public abstract class LocalResult implements ResultStore {
if (!bufferMC.addSize(incrementSize)) {
needFlush = true;
}
} else if (!needFlush && currentMemory > maxMemory) {
} else if (currentMemory > maxMemory) {
needFlush = true;
}
if (needFlush) {

View File

@@ -28,12 +28,11 @@ public final class RouteResultset implements Serializable {
private boolean needOptimizer;
private int limitStart;
private boolean cacheAble;
// used to store table's ID->datanodes cache
// used to store table's ID->data nodes cache
// format is table.primaryKey
private String primaryKey;
// limit output total
private int limitSize;
private SQLMerge sqlMerge;
private boolean callStatement = false; // is Call Statement
@@ -55,7 +54,15 @@ public final class RouteResultset implements Serializable {
// if force master,set canRunInReadDB=false
// if force slave set runOnSlave,default null means not effect
private Boolean runOnSlave = null;
private String[] groupByCols;
public String[] getGroupByCols() {
return groupByCols;
}
public void setGroupByCols(String[] groupByCols) {
this.groupByCols = groupByCols;
}
public boolean isNeedOptimizer() {
return needOptimizer;
}
@@ -135,10 +142,6 @@ public final class RouteResultset implements Serializable {
}
public SQLMerge getSqlMerge() {
return sqlMerge;
}
public boolean isCacheAble() {
return cacheAble;
}
@@ -147,10 +150,6 @@ public final class RouteResultset implements Serializable {
this.cacheAble = cacheAble;
}
public boolean needMerge() {
return sqlMerge != null;
}
public int getSqlType() {
return sqlType;
}
@@ -159,17 +158,6 @@ public final class RouteResultset implements Serializable {
return limitStart;
}
public String[] getGroupByCols() {
return (sqlMerge != null) ? sqlMerge.getGroupByCols() : null;
}
private SQLMerge createSQLMergeIfNull() {
if (sqlMerge == null) {
sqlMerge = new SQLMerge();
}
return sqlMerge;
}
public void setLimitStart(int limitStart) {
this.limitStart = limitStart;
}
@@ -185,7 +173,7 @@ public final class RouteResultset implements Serializable {
public void setPrimaryKey(String primaryKey) {
if (!primaryKey.contains(".")) {
throw new java.lang.IllegalArgumentException(
"must be table.primarykey fomat :" + primaryKey);
"must be table.primaryKey format :" + primaryKey);
}
this.primaryKey = primaryKey;
}
@@ -193,18 +181,11 @@ public final class RouteResultset implements Serializable {
/**
* return primary key items ,first is table name ,seconds is primary key
*
* @return
*/
public String[] getPrimaryKeyItems() {
return primaryKey.split("\\.");
}
public void setGroupByCols(String[] groupByCols) {
if (groupByCols != null && groupByCols.length > 0) {
createSQLMergeIfNull().setGroupByCols(groupByCols);
}
}
public void setSrcStatement(String srcStatement) {
this.srcStatement = srcStatement;
}

View File

@@ -1,20 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.route;
import java.io.Serializable;
public class SQLMerge implements Serializable {
private String[] groupByCols;
public String[] getGroupByCols() {
return groupByCols;
}
public void setGroupByCols(String[] groupByCols) {
this.groupByCols = groupByCols;
}
}

View File

@@ -174,7 +174,7 @@ public class DruidInsertParser extends DruidInsertReplaceParser {
LOGGER.debug("found partition node for child table to insert " + dn + " sql :" + sql);
}
RouterUtil.routeToSingleNode(rrs, dn);
sc.getSession2().execute(rrs, ServerParse.INSERT);
sc.getSession2().execute(rrs);
}
});
}

View File

@@ -328,7 +328,7 @@ public class DruidReplaceParser extends DruidInsertReplaceParser {
LOGGER.debug("found partition node for child table to insert " + dn + " sql :" + sql);
}
RouterUtil.routeToSingleNode(rrs, dn);
sc.getSession2().execute(rrs, ServerParse.REPLACE);
sc.getSession2().execute(rrs);
}
});
}

View File

@@ -56,15 +56,11 @@ public class NonBlockingSession implements Session {
public static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSession.class);
public static final int CANCEL_STATUS_INIT = 0;
public static final int CANCEL_STATUS_COMMITTING = 1;
public static final int CANCEL_STATUS_CANCELING = 2;
static final int CANCEL_STATUS_CANCELING = 2;
private final ServerConnection source;
private final ConcurrentMap<RouteResultsetNode, BackendConnection> target;
// life-cycle: each sql execution
private volatile SingleNodeHandler singleNodeHandler;
private volatile MultiNodeQueryHandler multiNodeHandler;
private volatile MultiNodeDdlHandler multiNodeDdlHandler;
private RollbackNodesHandler rollbackHandler;
private CommitNodesHandler commitHandler;
private volatile String xaTxId;
@@ -141,7 +137,7 @@ public class NonBlockingSession implements Session {
}
@Override
public void execute(RouteResultset rrs, int type) {
public void execute(RouteResultset rrs) {
// clear prev execute resources
clearHandlesResources();
if (LOGGER.isDebugEnabled()) {
@@ -166,7 +162,7 @@ public class NonBlockingSession implements Session {
this.xaState = TxState.TX_STARTED_STATE;
}
if (nodes.length == 1) {
singleNodeHandler = new SingleNodeHandler(rrs, this);
SingleNodeHandler singleNodeHandler = new SingleNodeHandler(rrs, this);
if (this.isPrepared()) {
singleNodeHandler.setPrepared(true);
}
@@ -181,39 +177,56 @@ public class NonBlockingSession implements Session {
this.setPrepared(false);
}
} else {
if (rrs.getSqlType() != ServerParse.DDL) {
/**
* here, just a try! The sync is the superfluous, because there are hearbeats at every backend node.
* We don't do 2pc or 3pc. Beause mysql(that is, resource manager) don't support that for ddl statements.
*/
multiNodeHandler = new MultiNodeQueryHandler(type, rrs, this);
if (this.isPrepared()) {
multiNodeHandler.setPrepared(true);
}
try {
multiNodeHandler.execute();
} catch (Exception e) {
handleSpecial(rrs, source.getSchema(), false);
LOGGER.warn(String.valueOf(source) + rrs, e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
if (this.isPrepared()) {
this.setPrepared(false);
}
} else {
checkBackupStatus();
multiNodeDdlHandler = new MultiNodeDdlHandler(type, rrs, this);
try {
multiNodeDdlHandler.execute();
} catch (Exception e) {
LOGGER.warn(String.valueOf(source) + rrs, e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
executeMultiResultSet(rrs);
}
}
private void executeMultiResultSet(RouteResultset rrs) {
if (rrs.getSqlType() == ServerParse.DDL) {
/*
* here, just a try! The sync is the superfluous, because there are heartbeats at every backend node.
* We don't do 2pc or 3pc. Because mysql(that is, resource manager) don't support that for ddl statements.
*/
checkBackupStatus();
MultiNodeDdlHandler multiNodeDdlHandler = new MultiNodeDdlHandler(rrs, this);
try {
multiNodeDdlHandler.execute();
} catch (Exception e) {
LOGGER.warn(String.valueOf(source) + rrs, e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
} else if (ServerParse.SELECT == rrs.getSqlType() && rrs.getGroupByCols() != null) {
MultiNodeSelectHandler multiNodeSelectHandler = new MultiNodeSelectHandler(rrs, this);
if (this.isPrepared()) {
multiNodeSelectHandler.setPrepared(true);
}
try {
multiNodeSelectHandler.execute();
} catch (Exception e) {
LOGGER.warn(String.valueOf(source) + rrs, e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
if (this.isPrepared()) {
this.setPrepared(false);
}
} else {
MultiNodeQueryHandler multiNodeHandler = new MultiNodeQueryHandler(rrs, this);
if (this.isPrepared()) {
multiNodeHandler.setPrepared(true);
}
try {
multiNodeHandler.execute();
} catch (Exception e) {
LOGGER.warn(String.valueOf(source) + rrs, e);
source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString());
}
if (this.isPrepared()) {
this.setPrepared(false);
}
}
}
public void execute(PlanNode node) {
private void executeMultiResultSet(PlanNode node) {
init();
HandlerBuilder builder = new HandlerBuilder(node, this);
try {
@@ -256,11 +269,11 @@ public class NonBlockingSession implements Session {
//sub Query build will be blocked, so use ComplexQueryExecutor
@Override
public void run() {
execute(finalNode);
executeMultiResultSet(finalNode);
}
});
} else {
execute(node);
executeMultiResultSet(node);
}
}
@@ -288,7 +301,7 @@ public class NonBlockingSession implements Session {
}
}
private CommitNodesHandler createCommitNodesHandler() {
private void resetCommitNodesHandler() {
if (commitHandler == null) {
if (this.getSessionXaID() == null) {
commitHandler = new NormalCommitNodesHandler(this);
@@ -303,7 +316,6 @@ public class NonBlockingSession implements Session {
commitHandler = new XACommitNodesHandler(this);
}
}
return commitHandler;
}
public void commit() {
@@ -316,7 +328,7 @@ public class NonBlockingSession implements Session {
return;
}
checkBackupStatus();
createCommitNodesHandler();
resetCommitNodesHandler();
commitHandler.commit();
}
@@ -327,7 +339,7 @@ public class NonBlockingSession implements Session {
needWaitFinished = true;
}
private RollbackNodesHandler createRollbackNodesHandler() {
private void resetRollbackNodesHandler() {
if (rollbackHandler == null) {
if (this.getSessionXaID() == null) {
rollbackHandler = new NormalRollbackNodesHandler(this);
@@ -342,7 +354,6 @@ public class NonBlockingSession implements Session {
rollbackHandler = new XARollbackNodesHandler(this);
}
}
return rollbackHandler;
}
public void rollback() {
@@ -357,7 +368,7 @@ public class NonBlockingSession implements Session {
source.write(buffer);
return;
}
createRollbackNodesHandler();
resetRollbackNodesHandler();
rollbackHandler.rollback();
}
@@ -409,7 +420,7 @@ public class NonBlockingSession implements Session {
/**
* Only used when kill @@connection is Issued
*/
public void initiativeTerminate() {
void initiativeTerminate() {
for (BackendConnection node : target.values()) {
node.terminate("client closed ");
@@ -489,10 +500,7 @@ public class NonBlockingSession implements Session {
/**
* @return previous bound connection
*/
public BackendConnection bindConnection(RouteResultsetNode key,
BackendConnection conn) {
// System.out.println("bind connection "+conn+
// " to key "+key.getName()+" on sesion "+this);
public BackendConnection bindConnection(RouteResultsetNode key, BackendConnection conn) {
return target.put(key, conn);
}
@@ -562,23 +570,6 @@ public class NonBlockingSession implements Session {
}
private void clearHandlesResources() {
SingleNodeHandler singleHandler = singleNodeHandler;
if (singleHandler != null) {
singleHandler.clearResources();
singleNodeHandler = null;
}
MultiNodeDdlHandler multiDdlHandler = multiNodeDdlHandler;
if (multiDdlHandler != null) {
multiDdlHandler.clearResources();
multiNodeDdlHandler = null;
}
MultiNodeQueryHandler multiHandler = multiNodeHandler;
if (multiHandler != null) {
multiHandler.clearResources();
multiNodeHandler = null;
}
if (rollbackHandler != null) {
rollbackHandler.clearResources();
}
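
With the sqlType now read from the RouteResultset itself, executeMultiResultSet() reduces to a three-way dispatch. A standalone model of just that rule (illustrative types; dble's real handlers take (rrs, session) and are executed with the error handling shown above):

public class DispatchSketch {
    enum SqlType { DDL, SELECT, OTHER }

    record Route(SqlType sqlType, String[] groupByCols) {}

    // DDL -> MultiNodeDdlHandler, SELECT with GROUP BY columns -> MultiNodeSelectHandler,
    // everything else -> MultiNodeQueryHandler.
    static String handlerFor(Route rrs) {
        if (rrs.sqlType() == SqlType.DDL) return "MultiNodeDdlHandler";
        if (rrs.sqlType() == SqlType.SELECT && rrs.groupByCols() != null) return "MultiNodeSelectHandler";
        return "MultiNodeQueryHandler";
    }

    public static void main(String[] args) {
        System.out.println(handlerFor(new Route(SqlType.SELECT, new String[]{"a"}))); // MultiNodeSelectHandler
        System.out.println(handlerFor(new Route(SqlType.SELECT, null)));              // MultiNodeQueryHandler
        System.out.println(handlerFor(new Route(SqlType.DDL, null)));                 // MultiNodeDdlHandler
    }
}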

View File

@@ -294,7 +294,7 @@ public class ServerConnection extends FrontendConnection {
}
RouterUtil.routeToRandomNode(rrs, schemaInfo.getSchemaConfig(), schemaInfo.getTable());
}
session.execute(rrs, sqlType);
session.execute(rrs);
} catch (Exception e) {
executeException(e, stmt);
}
@@ -320,7 +320,7 @@ public class ServerConnection extends FrontendConnection {
executeException(e, sql);
return;
}
session.execute(rrs, type);
session.execute(rrs);
}
private void addTableMetaLock(RouteResultset rrs) throws SQLNonTransientException {

View File

@@ -26,7 +26,7 @@ public interface Session {
/**
* execute session
*/
void execute(RouteResultset rrs, int type);
void execute(RouteResultset rrs);
/**
* commit session

View File

@@ -185,7 +185,7 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler
if (rrs != null) {
flushDataToFile();
isStartLoadData = false;
serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL);
serverConnection.getSession2().execute(rrs);
}
}
}
@@ -581,7 +581,7 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler
RouteResultset rrs = buildResultSet(routeResultMap);
if (rrs != null) {
flushDataToFile();
serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL);
serverConnection.getSession2().execute(rrs);
}
// sendOk(++packID);

View File

@@ -1,137 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.sqlengine.mpp;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.nio.handler.MultiNodeQueryHandler;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.server.NonBlockingSession;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by zagnix on 2016/7/6.
*/
public abstract class AbstractDataNodeMerge implements Runnable {
private static final Logger LOGGER = Logger.getLogger(AbstractDataNodeMerge.class);
/**
* col size
*/
protected int fieldCount;
/**
* cache the router
*/
protected final RouteResultset rrs;
protected MultiNodeQueryHandler multiQueryHandler = null;
/**
* the end packet
*/
protected PackWraper endFlagPack = new PackWraper();
/**
* rowData queue
*/
protected BlockingQueue<PackWraper> packs = new LinkedBlockingQueue<>();
/**
* the merge thread is running
*/
protected final AtomicBoolean running = new AtomicBoolean(false);
public AbstractDataNodeMerge(MultiNodeQueryHandler handler, RouteResultset rrs) {
this.rrs = rrs;
this.multiQueryHandler = handler;
}
/**
* Add a row pack, and may be wake up a business thread to work if not running.
*
* @param pack row pack
* @return true wake up a business thread, otherwise false
* @author Uncle-pan
* @since 2016-03-23
*/
protected final boolean addPack(final PackWraper pack) {
packs.add(pack);
if (running.get()) {
return false;
}
final DbleServer server = DbleServer.getInstance();
server.getBusinessExecutor().execute(this);
return true;
}
/**
* PackWraper the new row data
* process new record (mysql binary data),if data can output to client
* ,return true
*
* @param dataNode DN's name (data from this dataNode)
* @param rowData raw data
*/
public boolean onNewRecord(String dataNode, byte[] rowData) {
final PackWraper data = new PackWraper();
data.setDataNode(dataNode);
data.setRowData(rowData);
addPack(data);
return false;
}
/**
* get the index array of row according map
*
* @param columns
* @param toIndexMap
* @return
*/
protected static int[] toColumnIndex(String[] columns, Map<String, ColMeta> toIndexMap) {
int[] result = new int[columns.length];
ColMeta curColMeta;
for (int i = 0; i < columns.length; i++) {
curColMeta = toIndexMap.get(columns[i].toUpperCase());
if (curColMeta == null) {
throw new IllegalArgumentException(
"all columns in group by clause should be in the selected column list.!" + columns[i]);
}
result[i] = curColMeta.getColIndex();
}
return result;
}
@Override
public abstract void run();
public abstract void onRowMetaData(Map<String, ColMeta> columnToIndex, int fieldSize) throws IOException;
public void outputMergeResult(NonBlockingSession session, byte[] eof) {
addPack(endFlagPack);
}
public RouteResultset getRrs() {
return this.rrs;
}
/**
* @return (i*(offset+size) row)
*/
public abstract List<RowDataPacket> getResults(byte[] eof);
public abstract void clear();
}
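
The addPack()/run() pair above implements a wake-up scheme: producers enqueue and start a merge worker only when none is running, and the worker re-runs itself if a pack slipped in between its last empty poll and the reset of the running flag (the race the subclasses' comments mention). A self-contained JDK sketch of that scheme:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class MergeWorkerSketch implements Runnable {
    private final BlockingQueue<String> packs = new LinkedBlockingQueue<>();
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void addPack(String pack) {
        packs.add(pack);
        if (running.get()) {
            return;                     // a worker is already draining the queue
        }
        executor.execute(this);         // otherwise wake one up
    }

    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            return;                     // at most one worker at a time
        }
        boolean nullPack = false;
        try {
            while (true) {
                String pack = packs.poll();
                if (pack == null) {     // queue looked empty: remember and exit
                    nullPack = true;
                    break;
                }
                process(pack);
            }
        } finally {
            running.set(false);
        }
        // a pack may arrive after the null poll but before running was cleared: re-check
        if (nullPack && !packs.isEmpty()) {
            run();
        }
    }

    private void process(String pack) {
        System.out.println("merged " + pack);
    }
}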

View File

@@ -1,178 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.sqlengine.mpp;
import com.actiontech.dble.backend.mysql.BufferUtil;
import com.actiontech.dble.backend.mysql.nio.handler.MultiNodeQueryHandler;
import com.actiontech.dble.net.mysql.EOFPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.route.RouteResultsetNode;
import com.actiontech.dble.server.ServerConnection;
import org.apache.log4j.Logger;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/*
* Copyright (C) 2016-2017 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
/**
* Data merge service handle data Min,Max,AVG group . order by . limit
*
* @author wuzhih /modify by coder_czp/2015/11/2
* <p>
* Fixbug: sql timeout and hang problem.
* @author Uncle-pan
* @since 2016-03-23
*/
public class DataMergeService extends AbstractDataNodeMerge {
private RowDataPacketGrouper grouper;
private Map<String, LinkedList<RowDataPacket>> result = new HashMap<>();
private static final Logger LOGGER = Logger.getLogger(DataMergeService.class);
public DataMergeService(MultiNodeQueryHandler handler, RouteResultset rrs) {
super(handler, rrs);
for (RouteResultsetNode node : rrs.getNodes()) {
result.put(node.getName(), new LinkedList<RowDataPacket>());
}
}
/**
* @param columnToIndex columnToIndex
* @param fieldSize fieldSize
*/
public void onRowMetaData(Map<String, ColMeta> columnToIndex, int fieldSize) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("field metadata keys:" + columnToIndex.keySet());
LOGGER.debug("field metadata values:" + columnToIndex.values());
}
int[] groupColumnIndexes = null;
this.fieldCount = fieldSize;
if (rrs.getGroupByCols() != null) {
groupColumnIndexes = toColumnIndex(rrs.getGroupByCols(), columnToIndex);
}
grouper = new RowDataPacketGrouper(groupColumnIndexes);
}
/**
* release resources
*/
public void clear() {
result.clear();
grouper = null;
}
@Override
public void run() {
// sort-or-group: no need for us to using multi-threads, because
//both sorter and group are synchronized!!
// @author Uncle-pan
// @since 2016-03-23
if (!running.compareAndSet(false, true)) {
return;
}
// eof handler has been placed to "if (pack == END_FLAG_PACK){}" in for-statement
// @author Uncle-pan
// @since 2016-03-23
boolean nulPack = false;
try {
// loop-on-packs
for (; ; ) {
final PackWraper pack = packs.poll();
// async: handling row pack queue, this business thread should exit when no pack
// @author Uncle-pan
// @since 2016-03-23
if (pack == null) {
nulPack = true;
break;
}
// eof: handling eof pack and exit
if (pack == endFlagPack) {
final int warningCount = 0;
final EOFPacket eofPacket = new EOFPacket();
final ByteBuffer eof = ByteBuffer.allocate(9);
BufferUtil.writeUB3(eof, eofPacket.calcPacketSize());
eof.put(eofPacket.getPacketId());
eof.put(eofPacket.getFieldCount());
BufferUtil.writeUB2(eof, warningCount);
BufferUtil.writeUB2(eof, eofPacket.getStatus());
final ServerConnection source = multiQueryHandler.getSession().getSource();
final byte[] array = eof.array();
multiQueryHandler.outputMergeResult(source, array, getResults(array));
break;
}
// merge: sort-or-group, or simple add
final RowDataPacket row = new RowDataPacket(fieldCount);
row.read(pack.getRowData());
if (grouper != null) {
grouper.addRow(row);
} else {
result.get(pack.getDataNode()).add(row);
}
} // rof
} catch (final Exception e) {
multiQueryHandler.handleDataProcessException(e);
} finally {
running.set(false);
}
// try to check packs, it's possible that adding a pack after polling a null pack
//and before this time pointer!!
// @author Uncle-pan
// @since 2016-03-23
if (nulPack && !packs.isEmpty()) {
this.run();
}
}
/**
* return merged data (i * (offset + size) rows at most)
*
* @return list
*/
public List<RowDataPacket> getResults(byte[] eof) {
List<RowDataPacket> tmpResult = null;
if (this.grouper != null) {
tmpResult = grouper.getResult();
grouper = null;
}
//no grouper and sorter
if (tmpResult == null) {
tmpResult = new LinkedList<>();
for (RouteResultsetNode node : rrs.getNodes()) {
tmpResult.addAll(result.get(node.getName()));
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("prepare mpp merge result for " + rrs.getStatement());
}
return tmpResult;
}
}

View File

@@ -1,225 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.sqlengine.mpp;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.BufferUtil;
import com.actiontech.dble.backend.mysql.MySQLMessage;
import com.actiontech.dble.backend.mysql.nio.handler.MultiNodeQueryHandler;
import com.actiontech.dble.memory.SeverMemory;
import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager;
import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager;
import com.actiontech.dble.memory.unsafe.row.BufferHolder;
import com.actiontech.dble.memory.unsafe.row.StructType;
import com.actiontech.dble.memory.unsafe.row.UnsafeRow;
import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter;
import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf;
import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparator;
import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparators;
import com.actiontech.dble.memory.unsafe.utils.sort.RowPrefixComputer;
import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter;
import com.actiontech.dble.net.mysql.EOFPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.route.RouteResultset;
import com.actiontech.dble.server.ServerConnection;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
/**
* Created by zagnix on 2016/6/21.
*/
public class DataNodeMergeManager extends AbstractDataNodeMerge {
private static final Logger LOGGER = Logger.getLogger(DataNodeMergeManager.class);
/**
* UnsafeRowGrouper
*/
private UnsafeRowGrouper unsafeRowGrouper = null;
/**
* global merge sorter
*/
private UnsafeExternalRowSorter globalMergeResult = null;
/**
* the context of sorter
*/
private final SeverMemory serverMemory;
private final MemoryManager memoryManager;
private final ServerPropertyConf conf;
public DataNodeMergeManager(MultiNodeQueryHandler handler, RouteResultset rrs) {
super(handler, rrs);
this.serverMemory = DbleServer.getInstance().getServerMemory();
this.memoryManager = serverMemory.getResultMergeMemoryManager();
this.conf = serverMemory.getConf();
}
public void onRowMetaData(Map<String, ColMeta> columnToIndex, int fieldSize) throws IOException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("field metadata keys:" + columnToIndex.keySet());
LOGGER.debug("field metadata values:" + columnToIndex.values());
}
this.fieldCount = fieldSize;
String[] groupByCols = rrs.getGroupByCols();
unsafeRowGrouper = new UnsafeRowGrouper(columnToIndex, groupByCols);
// 1.schema
StructType schema = new StructType(columnToIndex, fieldSize);
if (groupByCols != null) {
OrderCol[] orderCols = new OrderCol[groupByCols.length];
for (int i = 0; i < groupByCols.length; i++) {
orderCols[i] = new OrderCol(columnToIndex.get(groupByCols[i].toUpperCase()));
}
schema.setOrderCols(orderCols);
}
//2 .PrefixComputer
UnsafeExternalRowSorter.PrefixComputer prefixComputer = new RowPrefixComputer(schema);
//3 .PrefixComparator ,ASC/DESC and the default is ASC
PrefixComparator prefixComparator = PrefixComparators.LONG;
DataNodeMemoryManager dataNodeMemoryManager = new DataNodeMemoryManager(memoryManager,
Thread.currentThread().getId());
globalMergeResult = new UnsafeExternalRowSorter(
dataNodeMemoryManager,
serverMemory,
schema,
prefixComparator,
prefixComputer,
conf.getSizeAsBytes("server.buffer.pageSize", "1m"),
false,
true);
}
@Override
public List<RowDataPacket> getResults(byte[] eof) {
return null;
}
@Override
public void run() {
if (!running.compareAndSet(false, true)) {
return;
}
boolean nulPack = false;
try {
for (; ; ) {
final PackWraper pack = packs.poll();
if (pack == null) {
nulPack = true;
break;
}
if (pack == endFlagPack) {
/*
* if last date node send row eof packet
* means all the data have received
*/
final int warningCount = 0;
final EOFPacket eofPacket = new EOFPacket();
final ByteBuffer eof = ByteBuffer.allocate(9);
BufferUtil.writeUB3(eof, eofPacket.calcPacketSize());
eof.put(eofPacket.getPacketId());
eof.put(eofPacket.getFieldCount());
BufferUtil.writeUB2(eof, warningCount);
BufferUtil.writeUB2(eof, eofPacket.getStatus());
final ServerConnection source = multiQueryHandler.getSession().getSource();
final byte[] array = eof.array();
Iterator<UnsafeRow> iterator;
if (unsafeRowGrouper != null) {
iterator = unsafeRowGrouper.getResult(globalMergeResult);
} else {
iterator = globalMergeResult.sort();
}
if (iterator != null) {
multiQueryHandler.outputMergeResult(source, array, iterator);
}
if (unsafeRowGrouper != null) {
unsafeRowGrouper.free();
unsafeRowGrouper = null;
}
if (globalMergeResult != null) {
globalMergeResult.cleanupResources();
globalMergeResult = null;
}
break;
}
UnsafeRow unsafeRow = new UnsafeRow(fieldCount);
BufferHolder bufferHolder = new BufferHolder(unsafeRow, 0);
UnsafeRowWriter unsafeRowWriter = new UnsafeRowWriter(bufferHolder, fieldCount);
bufferHolder.reset();
// make a row to filled col
MySQLMessage mm = new MySQLMessage(pack.getRowData());
mm.readUB3();
mm.read();
for (int i = 0; i < fieldCount; i++) {
byte[] colValue = mm.readBytesWithLength();
if (colValue != null)
unsafeRowWriter.write(i, colValue);
else
unsafeRow.setNullAt(i);
}
unsafeRow.setTotalSize(bufferHolder.totalSize());
if (unsafeRowGrouper != null) {
unsafeRowGrouper.addRow(unsafeRow);
} else {
globalMergeResult.insertRow(unsafeRow);
}
}
} catch (final Exception e) {
multiQueryHandler.handleDataProcessException(e);
} finally {
running.set(false);
if (nulPack && !packs.isEmpty()) {
this.run();
}
}
}
/**
* release the resource of DataNodeMergeManager
*/
public void clear() {
if (unsafeRowGrouper != null) {
unsafeRowGrouper.free();
unsafeRowGrouper = null;
}
if (globalMergeResult != null) {
globalMergeResult.cleanupResources();
globalMergeResult = null;
}
}
}
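
The column loop above (readUB3(), read(), then readBytesWithLength() per field) walks a MySQL text-protocol row. For reference, a standalone sketch of that wire format, independent of dble's MySQLMessage; it assumes the 3-byte length and sequence-id header have already been stripped and omits the 8-byte length case:

import java.util.ArrayList;
import java.util.List;

public class TextRowDecoder {
    // One length-encoded string per column; a first byte of 0xFB marks a NULL column,
    // 0xFC/0xFD introduce 2- and 3-byte little-endian lengths, smaller values are the
    // length itself.
    static List<byte[]> decode(byte[] payload, int fieldCount) {
        List<byte[]> cols = new ArrayList<>(fieldCount);
        int pos = 0;
        for (int i = 0; i < fieldCount; i++) {
            int first = payload[pos++] & 0xff;
            if (first == 0xfb) {
                cols.add(null);                                    // NULL column
                continue;
            }
            int len = first;
            if (first == 0xfc) {
                len = (payload[pos] & 0xff) | ((payload[pos + 1] & 0xff) << 8);
                pos += 2;
            } else if (first == 0xfd) {
                len = (payload[pos] & 0xff) | ((payload[pos + 1] & 0xff) << 8) | ((payload[pos + 2] & 0xff) << 16);
                pos += 3;
            }
            byte[] value = new byte[len];
            System.arraycopy(payload, pos, value, 0, len);
            pos += len;
            cols.add(value);
        }
        return cols;
    }

    public static void main(String[] args) {
        byte[] payload = {1, 'a', (byte) 0xfb, 3, 'x', 'y', 'z'}; // "a", NULL, "xyz"
        List<byte[]> cols = decode(payload, 3);
        System.out.println(cols.get(1) == null);                  // true
        System.out.println(new String(cols.get(2)));              // xyz
    }
}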

View File

@@ -1,27 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.sqlengine.mpp;
import com.actiontech.dble.net.mysql.RowDataPacket;
import com.actiontech.dble.sqlengine.mpp.tmp.RowDataSorter;
public class RangRowDataPacketSorter extends RowDataSorter {
public RangRowDataPacketSorter(OrderCol[] orderCols) {
super(orderCols);
}
public boolean ascDesc(int byColumnIndex) {
return this.orderCols[byColumnIndex].orderType == OrderCol.COL_ORDER_TYPE_ASC;
}
public int compareRowData(RowDataPacket l, RowDataPacket r, int byColumnIndex) {
byte[] left = l.fieldValues.get(this.orderCols[byColumnIndex].colMeta.getColIndex());
byte[] right = r.fieldValues.get(this.orderCols[byColumnIndex].colMeta.getColIndex());
return RowDataPacketSorter.compareObject(left, right, this.orderCols[byColumnIndex]);
}
}

View File

@@ -1,64 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.sqlengine.mpp;
import com.actiontech.dble.net.mysql.RowDataPacket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
/**
* implement group function select a,count(*),sum(*) from A group by a
*
* @author wuzhih
*/
public class RowDataPacketGrouper {
private List<RowDataPacket> result = Collections.synchronizedList(new ArrayList<RowDataPacket>());
private final int[] groupColumnIndexes;
public RowDataPacketGrouper(int[] groupColumnIndexes) {
super();
this.groupColumnIndexes = groupColumnIndexes;
}
public List<RowDataPacket> getResult() {
return result;
}
public void addRow(RowDataPacket rowDataPkg) {
for (RowDataPacket row : result) {
if (sameGroupColumns(rowDataPkg, row)) {
return;
}
}
// not aggreated ,insert new
result.add(rowDataPkg);
}
// private static final
private boolean sameGroupColumns(RowDataPacket newRow, RowDataPacket existRow) {
if (groupColumnIndexes == null) { // select count(*) from aaa , or group
// column
return true;
}
for (int groupColumnIndex : groupColumnIndexes) {
if (!Arrays.equals(newRow.fieldValues.get(groupColumnIndex),
existRow.fieldValues.get(groupColumnIndex))) {
return false;
}
}
return true;
}
}
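
addRow()/sameGroupColumns() above match each incoming row to its group with a linear scan and Arrays.equals over the group columns. The same grouping can be keyed in a hash map; a sketch of that variant (not dble code), using ByteBuffer wrappers so equals/hashCode follow byte content:

import java.nio.ByteBuffer;
import java.util.*;

public class GroupKeySketch {
    // Rows are modeled as byte[][] (one byte[] per column), like RowDataPacket.fieldValues.
    // Keeps the first row seen per group, mirroring addRow() above; the no-GROUP-BY case
    // (null indexes meaning a single group) is left out for brevity.
    static Collection<byte[][]> firstRowPerGroup(List<byte[][]> rows, int[] groupCols) {
        Map<List<ByteBuffer>, byte[][]> groups = new LinkedHashMap<>();
        for (byte[][] row : rows) {
            List<ByteBuffer> key = new ArrayList<>(groupCols.length);
            for (int idx : groupCols) {
                key.add(ByteBuffer.wrap(row[idx]));   // ByteBuffer equals/hashCode use content
            }
            groups.putIfAbsent(key, row);
        }
        return groups.values();
    }

    public static void main(String[] args) {
        List<byte[][]> rows = List.of(
                new byte[][]{"a".getBytes(), "1".getBytes()},
                new byte[][]{"a".getBytes(), "2".getBytes()},
                new byte[][]{"b".getBytes(), "3".getBytes()});
        System.out.println(firstRowPerGroup(rows, new int[]{0}).size()); // 2 groups: a and b
    }
}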

View File

@@ -1,486 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.sqlengine.mpp;
import com.actiontech.dble.DbleServer;
import com.actiontech.dble.memory.SeverMemory;
import com.actiontech.dble.memory.unsafe.KVIterator;
import com.actiontech.dble.memory.unsafe.map.UnsafeFixedWidthAggregationMap;
import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager;
import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager;
import com.actiontech.dble.memory.unsafe.row.BufferHolder;
import com.actiontech.dble.memory.unsafe.row.StructType;
import com.actiontech.dble.memory.unsafe.row.UnsafeRow;
import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter;
import com.actiontech.dble.memory.unsafe.utils.BytesTools;
import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf;
import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter;
import com.actiontech.dble.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Created by zagnix on 2016/6/26.
* <p>
* implement group function select a,count(*),sum(*) from A group by a
*/
public class UnsafeRowGrouper {
private static final Logger LOGGER = LoggerFactory.getLogger(UnsafeRowGrouper.class);
private UnsafeFixedWidthAggregationMap aggregationMap = null;
private final Map<String, ColMeta> columnToIndexes;
private String[] sortColumnsByIndex = null;
private UnsafeRow valueKey = null;
private BufferHolder bufferHolder = null;
private UnsafeRowWriter unsafeRowWriter = null;
private final int groupKeyFieldCount;
private final int valueFieldCount;
private StructType groupKeySchema;
private StructType aggBufferSchema;
private UnsafeRow emptyAggregationBuffer;
public UnsafeRowGrouper(Map<String, ColMeta> columnToIndexes, String[] columns) {
super();
assert columns != null;
assert columnToIndexes != null;
this.columnToIndexes = columnToIndexes;
this.sortColumnsByIndex = columns != null ? toSortColumnsByIndex(columns, columnToIndexes) : null;
this.groupKeyFieldCount = columns != null ? columns.length : 0;
this.valueFieldCount = columnToIndexes != null ? columnToIndexes.size() : 0;
LOGGER.debug("columnToIndex :" + (columnToIndexes != null ? columnToIndexes.toString() : "null"));
SeverMemory serverMemory = DbleServer.getInstance().getServerMemory();
MemoryManager memoryManager = serverMemory.getResultMergeMemoryManager();
ServerPropertyConf conf = serverMemory.getConf();
initGroupKey();
initEmptyValueKey();
DataNodeMemoryManager dataNodeMemoryManager =
new DataNodeMemoryManager(memoryManager, Thread.currentThread().getId());
aggregationMap = new UnsafeFixedWidthAggregationMap(
emptyAggregationBuffer,
aggBufferSchema,
groupKeySchema,
dataNodeMemoryManager,
2 * 1024,
conf.getSizeAsBytes("server.buffer.pageSize", "1m"),
false);
}
private String[] toSortColumnsByIndex(String[] columns, Map<String, ColMeta> colMetaMap) {
Map<String, Integer> map = new HashMap<>();
ColMeta curColMeta;
for (String column : columns) {
curColMeta = colMetaMap.get(column.toUpperCase());
if (curColMeta == null) {
throw new IllegalArgumentException(
"all columns in group by clause should be in the selected column list: " + column);
}
map.put(column, curColMeta.getColIndex());
}
String[] sortColumnsIndexes = new String[map.size()];
List<Map.Entry<String, Integer>> entryList = new ArrayList<>(
map.entrySet());
entryList.sort(Map.Entry.comparingByValue());
int index = 0;
for (Map.Entry<String, Integer> entry : entryList) {
sortColumnsIndexes[index++] = entry.getKey();
}
return sortColumnsIndexes;
}
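// write placeholder values of each column type into a template row so the
// map learns the fixed-width layout of the group key up front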
private void initGroupKey() {
Map<String, ColMeta> groupColMetaMap = new HashMap<>(this.groupKeyFieldCount);
UnsafeRow groupKey = new UnsafeRow(this.groupKeyFieldCount);
bufferHolder = new BufferHolder(groupKey, 0);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.groupKeyFieldCount);
bufferHolder.reset();
ColMeta curColMeta = null;
for (int i = 0; i < this.groupKeyFieldCount; i++) {
curColMeta = this.columnToIndexes.get(sortColumnsByIndex[i].toUpperCase());
groupColMetaMap.put(sortColumnsByIndex[i], curColMeta);
switch (curColMeta.getColType()) {
case ColMeta.COL_TYPE_BIT:
groupKey.setByte(i, (byte) 0);
break;
case ColMeta.COL_TYPE_INT:
case ColMeta.COL_TYPE_INT24:
case ColMeta.COL_TYPE_LONG:
groupKey.setInt(i, 0);
break;
case ColMeta.COL_TYPE_SHORT:
groupKey.setShort(i, (short) 0);
break;
case ColMeta.COL_TYPE_FLOAT:
groupKey.setFloat(i, 0);
break;
case ColMeta.COL_TYPE_DOUBLE:
groupKey.setDouble(i, 0);
break;
case ColMeta.COL_TYPE_NEWDECIMAL:
//groupKey.setDouble(i, 0);
unsafeRowWriter.write(i, new BigDecimal(0L));
break;
case ColMeta.COL_TYPE_LONGLONG:
groupKey.setLong(i, 0);
break;
default:
unsafeRowWriter.write(i, "init".getBytes());
break;
}
}
groupKey.setTotalSize(bufferHolder.totalSize());
groupKeySchema = new StructType(groupColMetaMap, this.groupKeyFieldCount);
}
private void initEmptyValueKey() {
emptyAggregationBuffer = new UnsafeRow(this.valueFieldCount);
bufferHolder = new BufferHolder(emptyAggregationBuffer, 0);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.valueFieldCount);
bufferHolder.reset();
ColMeta curColMeta = null;
for (Map.Entry<String, ColMeta> fieldEntry : columnToIndexes.entrySet()) {
curColMeta = fieldEntry.getValue();
switch (curColMeta.getColType()) {
case ColMeta.COL_TYPE_BIT:
emptyAggregationBuffer.setByte(curColMeta.getColIndex(), (byte) 0);
break;
case ColMeta.COL_TYPE_INT:
case ColMeta.COL_TYPE_INT24:
case ColMeta.COL_TYPE_LONG:
emptyAggregationBuffer.setInt(curColMeta.getColIndex(), 0);
break;
case ColMeta.COL_TYPE_SHORT:
emptyAggregationBuffer.setShort(curColMeta.getColIndex(), (short) 0);
break;
case ColMeta.COL_TYPE_LONGLONG:
emptyAggregationBuffer.setLong(curColMeta.getColIndex(), 0);
break;
case ColMeta.COL_TYPE_FLOAT:
emptyAggregationBuffer.setFloat(curColMeta.getColIndex(), 0);
break;
case ColMeta.COL_TYPE_DOUBLE:
emptyAggregationBuffer.setDouble(curColMeta.getColIndex(), 0);
break;
case ColMeta.COL_TYPE_NEWDECIMAL:
//emptyAggregationBuffer.setDouble(curColMeta.colIndex, 0);
unsafeRowWriter.write(curColMeta.getColIndex(), new BigDecimal(0L));
break;
default:
unsafeRowWriter.write(curColMeta.getColIndex(), "init".getBytes());
break;
}
}
emptyAggregationBuffer.setTotalSize(bufferHolder.totalSize());
aggBufferSchema = new StructType(columnToIndexes, this.valueFieldCount);
}
public Iterator<UnsafeRow> getResult(@Nonnull UnsafeExternalRowSorter sorter) throws IOException {
// feed every aggregated row into the sorter, then hand back the sorted iterator
insertValue(sorter);
return sorter.sort();
}
/**
* check whether a column is the synthetic SUM part of a rewritten AVG(),
* e.g. a merged column named like AVG0SUM
*/
private boolean isAvgField(String columnName) {
Pattern pattern = Pattern.compile("AVG([1-9]\\d*|0)SUM");
Matcher matcher = pattern.matcher(columnName);
return matcher.find();
}
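// convert the typed slots of an aggregated row back into plain bytes;
// insertValue() runs every row through this before it reaches the sorter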
public UnsafeRow getAllBinaryRow(UnsafeRow row) throws UnsupportedEncodingException {
UnsafeRow value = new UnsafeRow(this.valueFieldCount);
bufferHolder = new BufferHolder(value, 0);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.valueFieldCount);
bufferHolder.reset();
ColMeta curColMeta = null;
for (Map.Entry<String, ColMeta> fieldEntry : columnToIndexes.entrySet()) {
curColMeta = fieldEntry.getValue();
if (!row.isNullAt(curColMeta.getColIndex())) {
switch (curColMeta.getColType()) {
case ColMeta.COL_TYPE_BIT:
unsafeRowWriter.write(curColMeta.getColIndex(), row.getByte(curColMeta.getColIndex()));
break;
case ColMeta.COL_TYPE_INT:
case ColMeta.COL_TYPE_LONG:
case ColMeta.COL_TYPE_INT24:
unsafeRowWriter.write(curColMeta.getColIndex(),
BytesTools.int2Bytes(row.getInt(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_SHORT:
unsafeRowWriter.write(curColMeta.getColIndex(),
BytesTools.short2Bytes(row.getShort(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_LONGLONG:
unsafeRowWriter.write(curColMeta.getColIndex(),
BytesTools.long2Bytes(row.getLong(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_FLOAT:
unsafeRowWriter.write(curColMeta.getColIndex(),
BytesTools.float2Bytes(row.getFloat(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_DOUBLE:
unsafeRowWriter.write(curColMeta.getColIndex(),
BytesTools.double2Bytes(row.getDouble(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_NEWDECIMAL:
int scale = curColMeta.getDecimals();
BigDecimal decimalVal = row.getDecimal(curColMeta.getColIndex(), scale);
unsafeRowWriter.write(curColMeta.getColIndex(), decimalVal.toString().getBytes());
break;
default:
unsafeRowWriter.write(curColMeta.getColIndex(),
row.getBinary(curColMeta.getColIndex()));
break;
}
} else {
unsafeRowWriter.setNullAt(curColMeta.getColIndex());
}
}
value.setTotalSize(bufferHolder.totalSize());
return value;
}
private void insertValue(@Nonnull UnsafeExternalRowSorter sorter) {
KVIterator<UnsafeRow, UnsafeRow> it = aggregationMap.iterator();
try {
while (it.next()) {
UnsafeRow row = getAllBinaryRow(it.getValue());
sorter.insertRow(row);
}
} catch (IOException e) {
LOGGER.error("insert row into sorter failed", e);
}
}
private boolean lt(byte[] l, byte[] r) {
return -1 == ByteUtil.compareNumberByte(l, r);
}
private boolean gt(byte[] l, byte[] r) {
return 1 == ByteUtil.compareNumberByte(l, r);
}
private boolean eq(byte[] l, byte[] r) {
return 0 == ByteUtil.compareNumberByte(l, r);
}
private boolean neq(byte[] l, byte[] r) {
return 0 != ByteUtil.compareNumberByte(l, r);
}
private UnsafeRow getGroupKey(UnsafeRow row) throws UnsupportedEncodingException {
UnsafeRow key = null;
if (this.sortColumnsByIndex == null) {
// no group by clause, e.g. select count(*) from t: use one shared key
key = new UnsafeRow(this.groupKeyFieldCount + 1);
bufferHolder = new BufferHolder(key, 0);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.groupKeyFieldCount + 1);
bufferHolder.reset();
unsafeRowWriter.write(0, "same".getBytes());
key.setTotalSize(bufferHolder.totalSize());
return key;
}
key = new UnsafeRow(this.groupKeyFieldCount);
bufferHolder = new BufferHolder(key, 0);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.groupKeyFieldCount);
bufferHolder.reset();
ColMeta curColMeta = null;
for (int i = 0; i < this.groupKeyFieldCount; i++) {
curColMeta = this.columnToIndexes.get(sortColumnsByIndex[i].toUpperCase());
if (!row.isNullAt(curColMeta.getColIndex())) {
switch (curColMeta.getColType()) {
case ColMeta.COL_TYPE_BIT:
key.setByte(i, row.getByte(curColMeta.getColIndex()));
break;
case ColMeta.COL_TYPE_INT:
case ColMeta.COL_TYPE_LONG:
case ColMeta.COL_TYPE_INT24:
key.setInt(i,
BytesTools.getInt(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_SHORT:
key.setShort(i,
BytesTools.getShort(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_FLOAT:
key.setFloat(i,
BytesTools.getFloat(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_DOUBLE:
key.setDouble(i,
BytesTools.getDouble(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_NEWDECIMAL:
//key.setDouble(i, BytesTools.getDouble(row.getBinary(curColMeta.colIndex)));
unsafeRowWriter.write(i,
new BigDecimal(new String(row.getBinary(curColMeta.getColIndex()))));
break;
case ColMeta.COL_TYPE_LONGLONG:
key.setLong(i,
BytesTools.getLong(row.getBinary(curColMeta.getColIndex())));
break;
default:
unsafeRowWriter.write(i,
row.getBinary(curColMeta.getColIndex()));
break;
}
} else {
key.setNullAt(i);
}
}
key.setTotalSize(bufferHolder.totalSize());
return key;
}
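// parse one merged row's binary column values into a typed value buffer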
private UnsafeRow getValue(UnsafeRow row) throws UnsupportedEncodingException {
UnsafeRow value = new UnsafeRow(this.valueFieldCount);
bufferHolder = new BufferHolder(value, 0);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.valueFieldCount);
bufferHolder.reset();
ColMeta curColMeta = null;
for (Map.Entry<String, ColMeta> fieldEntry : columnToIndexes.entrySet()) {
curColMeta = fieldEntry.getValue();
if (!row.isNullAt(curColMeta.getColIndex())) {
switch (curColMeta.getColType()) {
case ColMeta.COL_TYPE_BIT:
value.setByte(curColMeta.getColIndex(), row.getByte(curColMeta.getColIndex()));
break;
case ColMeta.COL_TYPE_INT:
case ColMeta.COL_TYPE_LONG:
case ColMeta.COL_TYPE_INT24:
value.setInt(curColMeta.getColIndex(),
BytesTools.getInt(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_SHORT:
value.setShort(curColMeta.getColIndex(),
BytesTools.getShort(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_LONGLONG:
value.setLong(curColMeta.getColIndex(),
BytesTools.getLong(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_FLOAT:
value.setFloat(curColMeta.getColIndex(),
BytesTools.getFloat(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_DOUBLE:
value.setDouble(curColMeta.getColIndex(), BytesTools.getDouble(row.getBinary(curColMeta.getColIndex())));
break;
case ColMeta.COL_TYPE_NEWDECIMAL:
//value.setDouble(curColMeta.colIndex, BytesTools.getDouble(row.getBinary(curColMeta.colIndex)));
unsafeRowWriter.write(curColMeta.getColIndex(),
new BigDecimal(new String(row.getBinary(curColMeta.getColIndex()))));
break;
default:
unsafeRowWriter.write(curColMeta.getColIndex(),
row.getBinary(curColMeta.getColIndex()));
break;
}
} else {
switch (curColMeta.getColType()) {
case ColMeta.COL_TYPE_NEWDECIMAL:
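// decimals go through the writer even when null, so the slot keeps its writer-managed layout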
BigDecimal nullDecimal = null;
unsafeRowWriter.write(curColMeta.getColIndex(), nullDecimal);
break;
default:
value.setNullAt(curColMeta.getColIndex());
break;
}
}
}
value.setTotalSize(bufferHolder.totalSize());
return value;
}
public void addRow(UnsafeRow rowDataPkg) throws UnsupportedEncodingException {
UnsafeRow key = getGroupKey(rowDataPkg);
UnsafeRow value = getValue(rowDataPkg);
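// keep the first row seen per distinct group key; rows for an existing key
// are dropped here rather than merged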
if (!aggregationMap.find(key)) {
aggregationMap.put(key, value);
}
}
public void free() {
if (aggregationMap != null) {
aggregationMap.free();
}
}
}
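For reference, a minimal sketch of how the removed grouper was driven (the helper name groupAndSort and the caller shape are assumptions; the actual call site is not part of this diff):

private Iterator<UnsafeRow> groupAndSort(Map<String, ColMeta> columnToIndexes,
String[] groupByColumns,
List<UnsafeRow> mergedRows,
UnsafeExternalRowSorter sorter) throws IOException {
// build the off-heap aggregation map keyed by the group by columns
UnsafeRowGrouper grouper = new UnsafeRowGrouper(columnToIndexes, groupByColumns);
try {
for (UnsafeRow row : mergedRows) {
grouper.addRow(row); // first row per group key is kept
}
return grouper.getResult(sorter); // copies rows into the sorter and sorts
} finally {
grouper.free(); // release the map's off-heap pages
}
}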

View File

@@ -1,184 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.memory.unsafe.sort;
import com.actiontech.dble.memory.SeverMemory;
import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager;
import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager;
import com.actiontech.dble.memory.unsafe.row.BufferHolder;
import com.actiontech.dble.memory.unsafe.row.StructType;
import com.actiontech.dble.memory.unsafe.row.UnsafeRow;
import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter;
import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf;
import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparator;
import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparators;
import com.actiontech.dble.memory.unsafe.utils.sort.RowPrefixComputer;
import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter;
import com.actiontech.dble.sqlengine.mpp.ColMeta;
import com.actiontech.dble.sqlengine.mpp.OrderCol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
/**
* Created by zagnix on 16-7-9.
*/
public class TestSorter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(TestSorter.class);
private static final int TEST_SIZE = 1000000;
private static final int TASK_SIZE = 100;
private static final CountDownLatch countDownLatch = new CountDownLatch(TASK_SIZE);
public void runSorter(SeverMemory severMemory,
MemoryManager memoryManager,
ServerPropertyConf conf) throws NoSuchFieldException, IllegalAccessException, IOException {
DataNodeMemoryManager dataNodeMemoryManager = new DataNodeMemoryManager(memoryManager,
Thread.currentThread().getId());
/* 1. schema: mock a three-column row (id, name, age) */
int fieldCount = 3;
ColMeta colMeta = null;
Map<String, ColMeta> colMetaMap = new HashMap<String, ColMeta>(fieldCount);
colMeta = new ColMeta(0, ColMeta.COL_TYPE_STRING);
colMetaMap.put("id", colMeta);
colMeta = new ColMeta(1, ColMeta.COL_TYPE_STRING);
colMetaMap.put("name", colMeta);
colMeta = new ColMeta(2, ColMeta.COL_TYPE_STRING);
colMetaMap.put("age", colMeta);
OrderCol[] orderCols = new OrderCol[1];
OrderCol orderCol = new OrderCol(colMetaMap.get("id"),
OrderCol.COL_ORDER_TYPE_ASC);
orderCols[0] = orderCol;
/* 2. prefix computer derived from the first order-by column */
StructType schema = new StructType(colMetaMap, fieldCount);
schema.setOrderCols(orderCols);
UnsafeExternalRowSorter.PrefixComputer prefixComputer =
new RowPrefixComputer(schema);
/* 3. prefix comparator: ASC by default; swap in a DESC comparator to reverse */
final PrefixComparator prefixComparator = PrefixComparators.LONG;
UnsafeExternalRowSorter sorter =
new UnsafeExternalRowSorter(dataNodeMemoryManager,
severMemory,
schema,
prefixComparator,
prefixComputer,
conf.getSizeAsBytes("server.buffer.pageSize", "1m"),
true,
true);
UnsafeRow unsafeRow;
BufferHolder bufferHolder;
UnsafeRowWriter unsafeRowWriter;
String line = "testUnsafeRow";
final Random rand = new Random(42);
for (int i = 0; i < TEST_SIZE; i++) {
unsafeRow = new UnsafeRow(3);
bufferHolder = new BufferHolder(unsafeRow);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 3);
bufferHolder.reset();
String key = getRandomString(rand.nextInt(300) + 100);
unsafeRowWriter.write(0, key.getBytes());
unsafeRowWriter.write(1, line.getBytes());
unsafeRowWriter.write(2, ("35" + 1).getBytes());
unsafeRow.setTotalSize(bufferHolder.totalSize());
sorter.insertRow(unsafeRow);
}
Iterator<UnsafeRow> iter = sorter.sort();
int sortedRowCount = 0;
while (iter.hasNext()) {
iter.next();
sortedRowCount++;
}
sorter.cleanupResources();
countDownLatch.countDown();
System.out.println("Thread ID :" + Thread.currentThread().getId() + "Index : " + indexprint);
}
public static String getRandomString(int length) {
String base = "abcdefghijklmnopqrstuvwxyz0123456789";
Random random = new Random();
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
sb.append(base.charAt(random.nextInt(base.length())));
}
return sb.toString();
}
final SeverMemory severMemory;
final MemoryManager memoryManager;
final ServerPropertyConf conf;
public TestSorter(SeverMemory severMemory, MemoryManager memoryManager, ServerPropertyConf conf) throws NoSuchFieldException, IllegalAccessException {
this.severMemory = severMemory;
this.memoryManager = memoryManager;
this.conf = conf;
}
@Override
public void run() {
try {
runSorter(severMemory, memoryManager, conf);
} catch (NoSuchFieldException | IllegalAccessException | IOException e) {
logger.error("sorter task failed", e);
}
}
public static void main(String[] args) throws Exception {
SeverMemory severMemory = new SeverMemory();
MemoryManager memoryManager = severMemory.getResultMergeMemoryManager();
ServerPropertyConf conf = severMemory.getConf();
for (int i = 0; i < TASK_SIZE; i++) {
Thread thread = new Thread(new TestSorter(severMemory, memoryManager, conf));
thread.start();
}
while (countDownLatch.getCount() != 0) {
System.err.println("tasks still running: " + countDownLatch.getCount());
Thread.sleep(1000);
}
System.err.println(TASK_SIZE + " sorter tasks finished ok");
System.exit(0);
}
}

View File

@@ -1,155 +0,0 @@
/*
* Copyright (C) 2016-2017 ActionTech.
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
*/
package com.actiontech.dble.memory.unsafe.sort;
import com.actiontech.dble.memory.SeverMemory;
import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager;
import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager;
import com.actiontech.dble.memory.unsafe.row.BufferHolder;
import com.actiontech.dble.memory.unsafe.row.StructType;
import com.actiontech.dble.memory.unsafe.row.UnsafeRow;
import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter;
import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf;
import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparator;
import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparators;
import com.actiontech.dble.memory.unsafe.utils.sort.RowPrefixComputer;
import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter;
import com.actiontech.dble.sqlengine.mpp.ColMeta;
import com.actiontech.dble.sqlengine.mpp.OrderCol;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
/**
* Created by zagnix on 2016/6/19.
*/
public class UnsafeExternalRowSorterTest {
private static final int TEST_SIZE = 100000;
public static final Logger LOGGER = LoggerFactory.getLogger(UnsafeExternalRowSorterTest.class);
/**
* sorts TEST_SIZE string-keyed rows and checks the sorted row count; the same
* harness can be adapted to LONG, INT, SHORT, FLOAT, DOUBLE and binary columns
*/
@Test
public void testUnsafeExternalRowSorter() throws NoSuchFieldException, IllegalAccessException, IOException {
SeverMemory severMemory = new SeverMemory();
MemoryManager memoryManager = severMemory.getResultMergeMemoryManager();
ServerPropertyConf conf = severMemory.getConf();
DataNodeMemoryManager dataNodeMemoryManager = new DataNodeMemoryManager(memoryManager,
Thread.currentThread().getId());
int fieldCount = 3;
ColMeta colMeta = null;
Map<String, ColMeta> colMetaMap = new HashMap<String, ColMeta>(fieldCount);
colMeta = new ColMeta(0, ColMeta.COL_TYPE_STRING);
colMetaMap.put("id", colMeta);
colMeta = new ColMeta(1, ColMeta.COL_TYPE_STRING);
colMetaMap.put("name", colMeta);
colMeta = new ColMeta(2, ColMeta.COL_TYPE_STRING);
colMetaMap.put("age", colMeta);
OrderCol[] orderCols = new OrderCol[1];
OrderCol orderCol = new OrderCol(colMetaMap.get("id"),
OrderCol.COL_ORDER_TYPE_ASC);
orderCols[0] = orderCol;
/* prefix computer derived from the order-by column */
StructType schema = new StructType(colMetaMap, fieldCount);
schema.setOrderCols(orderCols);
UnsafeExternalRowSorter.PrefixComputer prefixComputer =
new RowPrefixComputer(schema);
final PrefixComparator prefixComparator = PrefixComparators.LONG;
UnsafeExternalRowSorter sorter =
new UnsafeExternalRowSorter(dataNodeMemoryManager,
severMemory,
schema,
prefixComparator,
prefixComputer,
conf.getSizeAsBytes("server.buffer.pageSize", "1m"),
true,
true);
UnsafeRow unsafeRow;
BufferHolder bufferHolder;
UnsafeRowWriter unsafeRowWriter;
String line = "testUnsafeRow";
final Random rand = new Random(42);
for (int i = 0; i < TEST_SIZE; i++) {
unsafeRow = new UnsafeRow(3);
bufferHolder = new BufferHolder(unsafeRow);
unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 3);
bufferHolder.reset();
String key = getRandomString(rand.nextInt(300) + 100);
unsafeRowWriter.write(0, key.getBytes());
unsafeRowWriter.write(1, line.getBytes());
unsafeRowWriter.write(2, ("35" + 1).getBytes());
unsafeRow.setTotalSize(bufferHolder.totalSize());
sorter.insertRow(unsafeRow);
}
Iterator<UnsafeRow> iter = sorter.sort();
int sortedRowCount = 0;
while (iter.hasNext()) {
iter.next();
sortedRowCount++;
}
Assert.assertEquals(TEST_SIZE, sortedRowCount);
}
public static String getRandomString(int length) {
String base = "abcdefghijklmnopqrstuvwxyz0123456789";
Random random = new Random();
StringBuilder sb = new StringBuilder(length);
for (int i = 0; i < length; i++) {
sb.append(base.charAt(random.nextInt(base.length())));
}
return sb.toString();
}
}