diff --git a/findbugs-exclude.xml b/findbugs-exclude.xml index 44d491f8b..70ab6312d 100644 --- a/findbugs-exclude.xml +++ b/findbugs-exclude.xml @@ -24,14 +24,6 @@ - - - - - - - - diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java index d0fabcd36..739025803 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeDdlHandler.java @@ -44,7 +44,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler { private ErrorPacket err; private volatile boolean errConn = false; private Set closedConnSet; - public MultiNodeDdlHandler(int sqlType, RouteResultset rrs, NonBlockingSession session) { + public MultiNodeDdlHandler(RouteResultset rrs, NonBlockingSession session) { super(session); if (rrs.getNodes() == null) { throw new IllegalArgumentException("routeNode is null!"); @@ -59,7 +59,8 @@ public class MultiNodeDdlHandler extends MultiNodeHandler { this.session = session; this.oriRrs = rrs; - this.handler = new MultiNodeQueryHandler(sqlType, rrs, session); + this.handler = new MultiNodeQueryHandler(rrs, session); + this.errConn = false; } @@ -250,6 +251,7 @@ public class MultiNodeDdlHandler extends MultiNodeHandler { handler.execute(); } catch (Exception e) { LOGGER.warn(String.valueOf(source) + oriRrs, e); + session.handleSpecial(oriRrs, source.getSchema(), false); source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); } if (session.isPrepared()) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java index 3fbc4e990..c98c55de9 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeHandler.java @@ -121,8 +121,7 @@ public abstract class MultiNodeHandler implements ResponseHandler { session.closeAndClearResources(error); } else { session.getSource().setTxInterrupt(this.error); - // clear resources - clearResources(); + this.clearResources(); } } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java index 1decff1a8..2324b33f4 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeQueryHandler.java @@ -18,17 +18,12 @@ import com.actiontech.dble.cache.LayerCachePool; import com.actiontech.dble.config.ErrorCode; import com.actiontech.dble.config.ServerConfig; import com.actiontech.dble.log.transaction.TxnLogHelper; -import com.actiontech.dble.memory.unsafe.row.UnsafeRow; import com.actiontech.dble.net.mysql.*; import com.actiontech.dble.route.RouteResultset; import com.actiontech.dble.route.RouteResultsetNode; import com.actiontech.dble.server.NonBlockingSession; import com.actiontech.dble.server.ServerConnection; import com.actiontech.dble.server.parser.ServerParse; -import com.actiontech.dble.sqlengine.mpp.AbstractDataNodeMerge; -import com.actiontech.dble.sqlengine.mpp.ColMeta; -import com.actiontech.dble.sqlengine.mpp.DataMergeService; -import com.actiontech.dble.sqlengine.mpp.DataNodeMergeManager; import com.actiontech.dble.statistic.stat.QueryResult; import com.actiontech.dble.statistic.stat.QueryResultDispatcher; import com.actiontech.dble.util.FormatUtil; @@ -36,39 +31,38 @@ import com.actiontech.dble.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.*; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * @author mycat */ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler { - private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeQueryHandler.class); - - private final RouteResultset rrs; - private final NonBlockingSession session; - private final AbstractDataNodeMerge dataMergeSvr; + protected final RouteResultset rrs; + protected final NonBlockingSession session; private final boolean sessionAutocommit; + protected long affectedRows; + protected long selectRows; + protected long startTime; + protected long netInBytes; + protected List errConnection; + protected long netOutBytes; + protected boolean prepared; + protected ErrorPacket err; + protected int fieldCount = 0; + protected volatile boolean fieldsReturned; + private long insertId; private String primaryKeyTable = null; private int primaryKeyIndex = -1; - private int fieldCount = 0; - private long affectedRows; - private long selectRows; - private long insertId; - private volatile boolean fieldsReturned; - private long startTime; - private long netInBytes; - private long netOutBytes; - private boolean prepared; private List fieldPackets = new ArrayList<>(); - private ErrorPacket err; - private List errConnection; private volatile ByteBuffer byteBuffer; private Set closedConnSet; - public MultiNodeQueryHandler(int sqlType, RouteResultset rrs, NonBlockingSession session) { + public MultiNodeQueryHandler(RouteResultset rrs, NonBlockingSession session) { super(session); if (rrs.getNodes() == null) { throw new IllegalArgumentException("routeNode is null!"); @@ -77,18 +71,8 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR LOGGER.debug("execute multi node query " + rrs.getStatement()); } this.rrs = rrs; - int isOffHeapUseOffHeapForMerge = DbleServer.getInstance().getConfig().getSystem().getUseOffHeapForMerge(); - if (ServerParse.SELECT == sqlType && rrs.needMerge()) { - if (isOffHeapUseOffHeapForMerge == 1) { - dataMergeSvr = new DataNodeMergeManager(this, rrs); - } else { - dataMergeSvr = new DataMergeService(this, rrs); - } - } else { - dataMergeSvr = null; - if (ServerParse.SELECT == sqlType) { - byteBuffer = session.getSource().allocate(); - } + if (ServerParse.SELECT == rrs.getSqlType()) { + byteBuffer = session.getSource().allocate(); } this.sessionAutocommit = session.getSource().isAutocommit(); this.session = session; @@ -282,11 +266,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR return; } fieldsReturned = true; - if (dataMergeSvr != null) { - mergeFieldEof(fields, eof); - } else { - executeFieldEof(header, fields, eof); - } + executeFieldEof(header, fields, eof); } catch (Exception e) { handleDataProcessException(e); } finally { @@ -348,44 +328,35 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR this.selectRows++; RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment(); String dataNode = rNode.getName(); - if (dataMergeSvr != null) { - // even through discarding the all rest data, we can't - //close the connection for tx control such as rollback or commit. - // So the "isClosedByDiscard" variable is unnecessary. - // @author Uncle-pan - // @since 2016-03-25 - dataMergeSvr.onNewRecord(dataNode, row); - } else { - if (rrs.getLimitSize() >= 0) { - if (selectRows <= rrs.getLimitStart()) { - return false; - } else if (selectRows > (rrs.getLimitStart() < 0 ? 0 : rrs.getLimitStart()) + rrs.getLimitSize()) { - return false; - } + + if (rrs.getLimitSize() >= 0) { + if (selectRows <= rrs.getLimitStart() || + (selectRows > (rrs.getLimitStart() < 0 ? 0 : rrs.getLimitStart()) + rrs.getLimitSize())) { + return false; } - RowDataPacket rowDataPkg = null; - // cache primaryKey-> dataNode - if (primaryKeyIndex != -1) { + } + RowDataPacket rowDataPkg = null; + // cache primaryKey-> dataNode + if (primaryKeyIndex != -1) { + rowDataPkg = new RowDataPacket(fieldCount); + rowDataPkg.read(row); + String primaryKey = new String(rowDataPkg.fieldValues.get(primaryKeyIndex)); + LayerCachePool pool = DbleServer.getInstance().getRouterService().getTableId2DataNodeCache(); + if (pool != null) { + pool.putIfAbsent(primaryKeyTable, primaryKey, dataNode); + } + } + row[3] = ++packetId; + if (prepared) { + if (rowDataPkg == null) { rowDataPkg = new RowDataPacket(fieldCount); rowDataPkg.read(row); - String primaryKey = new String(rowDataPkg.fieldValues.get(primaryKeyIndex)); - LayerCachePool pool = DbleServer.getInstance().getRouterService().getTableId2DataNodeCache(); - if (pool != null) { - pool.putIfAbsent(primaryKeyTable, primaryKey, dataNode); - } - } - row[3] = ++packetId; - if (prepared) { - if (rowDataPkg == null) { - rowDataPkg = new RowDataPacket(fieldCount); - rowDataPkg.read(row); - } - BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket(); - binRowDataPk.read(fieldPackets, rowDataPkg); - binRowDataPk.write(byteBuffer, session.getSource(), true); - } else { - byteBuffer = session.getSource().writeToBuffer(row, byteBuffer); } + BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket(); + binRowDataPk.read(fieldPackets, rowDataPkg); + binRowDataPk.write(byteBuffer, session.getSource(), true); + } else { + byteBuffer = session.getSource().writeToBuffer(row, byteBuffer); } } catch (Exception e) { handleDataProcessException(e); @@ -400,14 +371,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR if (closedConnSet != null) { closedConnSet.clear(); } - lock.lock(); - try { - if (dataMergeSvr != null) { - dataMergeSvr.clear(); - } - } finally { - lock.unlock(); - } } @Override @@ -460,29 +423,20 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR } private void writeEofResult(byte[] eof, ServerConnection source) { - if (dataMergeSvr != null) { - try { - dataMergeSvr.outputMergeResult(session, eof); - } catch (Exception e) { - handleDataProcessException(e); - } - - } else { - lock.lock(); - try { - eof[3] = ++packetId; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("last packet id:" + packetId); - } - byteBuffer = source.writeToBuffer(eof, byteBuffer); - source.write(byteBuffer); - } finally { - lock.unlock(); + lock.lock(); + try { + eof[3] = ++packetId; + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("last packet id:" + packetId); } + byteBuffer = source.writeToBuffer(eof, byteBuffer); + source.write(byteBuffer); + } finally { + lock.unlock(); } } - private void doSqlStat(ServerConnection source) { + protected void doSqlStat(ServerConnection source) { if (DbleServer.getInstance().getConfig().getSystem().getUseSqlStat() == 1) { int resultSize = source.getWriteQueue().size() * DbleServer.getInstance().getConfig().getSystem().getBufferPoolPageSize(); if (rrs != null && rrs.getStatement() != null) { @@ -495,132 +449,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR } } - /** - * send the final result to the client - * - * @param source ServerConnection - * @param eof eof bytes - * @param iterator Iterator - */ - public void outputMergeResult(final ServerConnection source, final byte[] eof, Iterator iterator) { - lock.lock(); - try { - ByteBuffer buffer = session.getSource().allocate(); - final RouteResultset routeResultset = this.dataMergeSvr.getRrs(); - - /* cut the result for the limit statement */ - int start = routeResultset.getLimitStart(); - int end = start + routeResultset.getLimitSize(); - int index = 0; - - if (start < 0) - start = 0; - - if (routeResultset.getLimitSize() < 0) - end = Integer.MAX_VALUE; - - if (prepared) { - while (iterator.hasNext()) { - UnsafeRow row = iterator.next(); - if (index >= start) { - row.setPacketId(++packetId); - BinaryRowDataPacket binRowPacket = new BinaryRowDataPacket(); - binRowPacket.read(fieldPackets, row); - buffer = binRowPacket.write(buffer, source, true); - } - index++; - if (index == end) { - break; - } - } - } else { - while (iterator.hasNext()) { - UnsafeRow row = iterator.next(); - if (index >= start) { - row.setPacketId(++packetId); - buffer = row.write(buffer, source, true); - } - index++; - if (index == end) { - break; - } - } - } - - eof[3] = ++packetId; - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("last packet id:" + packetId); - } - source.write(source.writeToBuffer(eof, buffer)); - } catch (Exception e) { - handleDataProcessException(e); - } finally { - try { - dataMergeSvr.clear(); - } finally { - lock.unlock(); - } - } - } - - public void outputMergeResult(final ServerConnection source, - final byte[] eof, List results) { - lock.lock(); - try { - ByteBuffer buffer = session.getSource().allocate(); - final RouteResultset routeResultset = this.dataMergeSvr.getRrs(); - - int start = routeResultset.getLimitStart(); - int end = start + routeResultset.getLimitSize(); - - if (start < 0) { - start = 0; - } - - if (routeResultset.getLimitSize() < 0) { - end = results.size(); - } - - if (end > results.size()) { - end = results.size(); - } - - if (prepared) { - for (int i = start; i < end; i++) { - RowDataPacket row = results.get(i); - BinaryRowDataPacket binRowDataPk = new BinaryRowDataPacket(); - binRowDataPk.read(fieldPackets, row); - binRowDataPk.setPacketId(++packetId); - //binRowDataPk.write(source); - buffer = binRowDataPk.write(buffer, session.getSource(), true); - } - } else { - for (int i = start; i < end; i++) { - RowDataPacket row = results.get(i); - row.setPacketId(++packetId); - buffer = row.write(buffer, source, true); - } - } - - eof[3] = ++packetId; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("last packet id:" + packetId); - } - source.write(source.writeToBuffer(eof, buffer)); - - } catch (Exception e) { - handleDataProcessException(e); - } finally { - try { - dataMergeSvr.clear(); - } finally { - lock.unlock(); - } - } - } - - private void executeFieldEof(byte[] header, List fields, byte[] eof) { ServerConnection source = session.getSource(); fieldCount = fields.size(); @@ -662,45 +490,6 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR byteBuffer = source.writeToBuffer(eof, byteBuffer); } - private void mergeFieldEof(List fields, byte[] eof) throws IOException { - fieldCount = fields.size(); - ResultSetHeaderPacket packet = new ResultSetHeaderPacket(); - packet.setPacketId(++packetId); - packet.setFieldCount(fieldCount); - ServerConnection source = session.getSource(); - ByteBuffer buffer = source.allocate(); - buffer = packet.write(buffer, source, true); - - Map columnToIndex = new HashMap<>(fieldCount); - for (int i = 0, len = fieldCount; i < len; ++i) { - byte[] field = fields.get(i); - FieldPacket fieldPkg = new FieldPacket(); - fieldPkg.read(field); - if (rrs.getSchema() != null) { - fieldPkg.setDb(rrs.getSchema().getBytes()); - } - if (rrs.getTableAlias() != null) { - fieldPkg.setTable(rrs.getTableAlias().getBytes()); - } - if (rrs.getTable() != null) { - fieldPkg.setOrgTable(rrs.getTable().getBytes()); - } - fieldPackets.add(fieldPkg); - String fieldName = new String(fieldPkg.getName()).toUpperCase(); - if (!columnToIndex.containsKey(fieldName)) { - ColMeta colMeta = new ColMeta(i, fieldPkg.getType()); - colMeta.setDecimals(fieldPkg.getDecimals()); - columnToIndex.put(fieldName, colMeta); - } - fieldPkg.setPacketId(++packetId); - buffer = fieldPkg.write(buffer, source, false); - - } - eof[3] = ++packetId; - buffer = source.writeToBuffer(eof, buffer); - source.write(buffer); - dataMergeSvr.onRowMetaData(columnToIndex, fieldCount); - } public void handleDataProcessException(Exception e) { if (!errorResponse.get()) { @@ -733,7 +522,7 @@ public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataR this.prepared = prepared; } - private void handleEndPacket(byte[] data, AutoTxOperation txOperation, BackendConnection conn) { + protected void handleEndPacket(byte[] data, AutoTxOperation txOperation, BackendConnection conn) { ServerConnection source = session.getSource(); if (source.isAutocommit() && !source.isTxStart() && conn.isModifiedSQLExecuted()) { if (nodeCount < 0) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java new file mode 100644 index 000000000..0a1ddc4b3 --- /dev/null +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/MultiNodeSelectHandler.java @@ -0,0 +1,225 @@ +/* + * Copyright (C) 2016-2017 ActionTech. + * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. + */ +package com.actiontech.dble.backend.mysql.nio.handler; + +import com.actiontech.dble.DbleServer; +import com.actiontech.dble.backend.BackendConnection; +import com.actiontech.dble.backend.mysql.nio.MySQLConnection; +import com.actiontech.dble.backend.mysql.nio.handler.builder.BaseHandlerBuilder; +import com.actiontech.dble.backend.mysql.nio.handler.query.impl.OutputHandler; +import com.actiontech.dble.backend.mysql.nio.handler.transaction.AutoTxOperation; +import com.actiontech.dble.backend.mysql.nio.handler.util.ArrayMinHeap; +import com.actiontech.dble.backend.mysql.nio.handler.util.HeapItem; +import com.actiontech.dble.backend.mysql.nio.handler.util.RowDataComparator; +import com.actiontech.dble.net.mysql.FieldPacket; +import com.actiontech.dble.net.mysql.RowDataPacket; +import com.actiontech.dble.plan.Order; +import com.actiontech.dble.plan.common.item.ItemField; +import com.actiontech.dble.route.RouteResultset; +import com.actiontech.dble.server.NonBlockingSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +public class MultiNodeSelectHandler extends MultiNodeQueryHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeSelectHandler.class); + private final int queueSize; + private Map> queues; + private RowDataComparator rowComparator; + private OutputHandler outputHandler; + private volatile boolean noNeedRows = false; + + public MultiNodeSelectHandler(RouteResultset rrs, NonBlockingSession session) { + super(rrs, session); + this.queueSize = DbleServer.getInstance().getConfig().getSystem().getMergeQueueSize(); + this.queues = new ConcurrentHashMap<>(); + outputHandler = new OutputHandler(BaseHandlerBuilder.getSequenceId(), session, false); + } + + @Override + public void okResponse(byte[] data, BackendConnection conn) { + this.netOutBytes += data.length; + boolean executeResponse = conn.syncAndExecute(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("received ok response ,executeResponse:" + executeResponse + " from " + conn); + } + if (executeResponse) { + String reason = "unexpected okResponse"; + LOGGER.warn(reason); + } + } + + @Override + public void fieldEofResponse(byte[] header, List fields, List fieldPacketsNull, byte[] eof, + boolean isLeft, BackendConnection conn) { + this.netOutBytes += header.length; + this.netOutBytes += eof.length; + queues.put(conn, new LinkedBlockingQueue(queueSize)); + lock.lock(); + try { + if (isFail()) { + handleEndPacket(err.toBytes(), AutoTxOperation.ROLLBACK, conn); + } else { + if (!fieldsReturned) { + fieldsReturned = true; + mergeFieldEof(fields, conn); + } + if (--nodeCount > 0) { + return; + } + startOwnThread(); + } + } catch (Exception e) { + handleDataProcessException(e); + } finally { + lock.unlock(); + } + } + + @Override + public void rowEofResponse(final byte[] eof, boolean isLeft, BackendConnection conn) { + BlockingQueue queue = queues.get(conn); + if (queue == null) + return; + try { + queue.put(HeapItem.nullItem()); + } catch (InterruptedException e) { + LOGGER.warn("rowEofResponse error", e); + } + } + + @Override + public boolean rowResponse(final byte[] row, RowDataPacket rowPacketNull, boolean isLeft, BackendConnection conn) { + if (errorResponse.get() || noNeedRows) { + return true; + } + BlockingQueue queue = queues.get(conn); + if (queue == null) + return true; + RowDataPacket rp = new RowDataPacket(fieldCount); + rp.read(row); + HeapItem item = new HeapItem(row, rp, (MySQLConnection) conn); + try { + queue.put(item); + } catch (InterruptedException e) { + LOGGER.warn("rowResponse error", e); + } + return false; + } + + @Override + public void writeQueueAvailable() { + } + + private void mergeFieldEof(List fields, BackendConnection conn) throws IOException { + fieldCount = fields.size(); + List fieldPackets = new ArrayList<>(); + for (byte[] field : fields) { + this.netOutBytes += field.length; + FieldPacket fieldPacket = new FieldPacket(); + fieldPacket.read(field); + if (rrs.getSchema() != null) { + fieldPacket.setDb(rrs.getSchema().getBytes()); + } + if (rrs.getTableAlias() != null) { + fieldPacket.setTable(rrs.getTableAlias().getBytes()); + } + if (rrs.getTable() != null) { + fieldPacket.setOrgTable(rrs.getTable().getBytes()); + } + fieldPackets.add(fieldPacket); + } + List orderBys = new ArrayList<>(); + for (String groupBy : rrs.getGroupByCols()) { + ItemField itemField = new ItemField(rrs.getSchema(), rrs.getTableAlias(), groupBy); + orderBys.add(new Order(itemField)); + } + rowComparator = new RowDataComparator(fieldPackets, orderBys); + outputHandler.fieldEofResponse(null, null, fieldPackets, null, false, conn); + } + + private void startOwnThread() { + DbleServer.getInstance().getComplexQueryExecutor().execute(new Runnable() { + @Override + public void run() { + ownThreadJob(); + } + }); + } + + private void ownThreadJob() { + try { + ArrayMinHeap heap = new ArrayMinHeap<>(new Comparator() { + @Override + public int compare(HeapItem o1, HeapItem o2) { + RowDataPacket row1 = o1.getRowPacket(); + RowDataPacket row2 = o2.getRowPacket(); + if (row1 == null || row2 == null) { + if (row1 == row2) + return 0; + if (row1 == null) + return -1; + return 1; + } + return rowComparator.compare(row1, row2); + } + }); + // init heap + for (Map.Entry> entry : queues.entrySet()) { + HeapItem firstItem = entry.getValue().take(); + heap.add(firstItem); + } + while (!heap.isEmpty()) { + if (isFail()) + return; + HeapItem top = heap.peak(); + if (top.isNullItem()) { + heap.poll(); + } else { + BlockingQueue topItemQueue = queues.get(top.getIndex()); + HeapItem item = topItemQueue.take(); + heap.replaceTop(item); + //limit + this.selectRows++; + if (rrs.getLimitSize() >= 0) { + if (selectRows <= rrs.getLimitStart()) { + continue; + } else if (selectRows > (rrs.getLimitStart() < 0 ? 0 : rrs.getLimitStart()) + rrs.getLimitSize()) { + noNeedRows = true; + while (!heap.isEmpty()) { + HeapItem itemToDiscard = heap.poll(); + if (!itemToDiscard.isNullItem()) { + BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex()); + boolean isClear = false; + while (!isClear) { + if (discardQueue.take().isNullItem() || isFail()) { + isClear = true; + } + } + } + } + continue; + } + } + outputHandler.rowResponse(top.getRowData(), top.getRowPacket(), false, top.getIndex()); + } + } + outputHandler.rowEofResponse(null, false, queues.keySet().iterator().next()); + doSqlStat(session.getSource()); + } catch (Exception e) { + String msg = "Merge thread error, " + e.getLocalizedMessage(); + LOGGER.warn(msg, e); + session.onQueryError(msg.getBytes()); + } + } +} diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java index ae55c9ae8..4b5616e78 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/SingleNodeHandler.java @@ -134,7 +134,7 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl int errPort = source.getLocalPort(); String errMsg = " errNo:" + errPkg.getErrNo() + " " + new String(errPkg.getMessage()); - LOGGER.warn("execute sql err :" + errMsg + " con:" + conn + + LOGGER.warn("execute sql err :" + errMsg + " con:" + conn + " frontend host:" + errHost + "/" + errPort + "/" + errUser); session.releaseConnectionIfSafe(conn, false); @@ -358,10 +358,6 @@ public class SingleNodeHandler implements ResponseHandler, LoadDataResponseHandl session.getSource().close(reason); } - public void clearResources() { - - } - @Override public void requestDataResponse(byte[] data, BackendConnection conn) { LoadDataUtil.requestFileDataResponse(data, conn); diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java index 1d9d8ef7a..355be286a 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/builder/BaseHandlerBuilder.java @@ -394,7 +394,7 @@ public abstract class BaseHandlerBuilder { return (arg.getSumFuncs().size() > 0 || arg.getGroupBys().size() > 0); } - protected static long getSequenceId() { + public static long getSequenceId() { return sequenceId.incrementAndGet(); } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java index e84c20623..0c1ac57d8 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/query/impl/MultiNodeMergeHandler.java @@ -47,6 +47,7 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler { private RouteResultsetNode[] route; private int reachedConCount; private boolean isEasyMerge; + private volatile boolean noNeedRows = false; public MultiNodeMergeHandler(long id, RouteResultsetNode[] route, boolean autocommit, NonBlockingSession session, List orderBys) { @@ -119,7 +120,7 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler { @Override public boolean rowResponse(byte[] row, RowDataPacket rowPacket, boolean isLeft, BackendConnection conn) { - if (terminate.get()) + if (terminate.get() || noNeedRows) return true; if (isEasyMerge) { @@ -190,7 +191,6 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler { HeapItem firstItem = entry.getValue().take(); heap.add(firstItem); } - boolean filterFinished = false; while (!heap.isEmpty()) { if (terminate.get()) return; @@ -201,11 +201,20 @@ public class MultiNodeMergeHandler extends OwnThreadDMLHandler { BlockingQueue topItemQueue = queues.get(top.getIndex()); HeapItem item = topItemQueue.take(); heap.replaceTop(item); - if (filterFinished) { - continue; - } if (nextHandler.rowResponse(top.getRowData(), top.getRowPacket(), this.isLeft, top.getIndex())) { - filterFinished = true; + noNeedRows = true; + while (!heap.isEmpty()) { + HeapItem itemToDiscard = heap.poll(); + if (!itemToDiscard.isNullItem()) { + BlockingQueue discardQueue = queues.get(itemToDiscard.getIndex()); + boolean isClear = false; + while (!isClear) { + if (discardQueue.take().isNullItem() || terminate.get()) { + isClear = true; + } + } + } + } } } } diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/RowDataComparator.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/RowDataComparator.java index 43805b9bc..419ba3969 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/RowDataComparator.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/util/RowDataComparator.java @@ -45,6 +45,23 @@ public class RowDataComparator implements Comparator { } } + public RowDataComparator(List fps, List orders) { + sourceFields = HandlerTool.createFields(fps); + if (orders != null && orders.size() > 0) { + ascList = new ArrayList<>(); + cmpFields = new ArrayList<>(); + cmpItems = new ArrayList<>(); + for (Order order : orders) { + Item cmpItem = HandlerTool.createFieldItem(order.getItem(), sourceFields, 0); + cmpItems.add(cmpItem); + FieldPacket tmpFp = new FieldPacket(); + cmpItem.makeField(tmpFp); + Field cmpField = HandlerTool.createField(tmpFp); + cmpFields.add(cmpField); + ascList.add(order.getSortOrder() == SQLOrderingSpecification.ASC); + } + } + } public void sort(List rows) { Comparator c = new Comparator() { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/GroupByLocalResult.java b/src/main/java/com/actiontech/dble/backend/mysql/store/GroupByLocalResult.java index 92fc05d4d..81bcf3465 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/store/GroupByLocalResult.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/store/GroupByLocalResult.java @@ -102,7 +102,7 @@ public class GroupByLocalResult extends LocalResult { if (!bufferMC.addSize(incrementSize)) { needFlush = true; } - } else if (!needFlush && currentMemory > maxMemory) { + } else if (currentMemory > maxMemory) { needFlush = true; } if (needFlush) { diff --git a/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java b/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java index 0ee68d683..88db1eb75 100644 --- a/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java +++ b/src/main/java/com/actiontech/dble/backend/mysql/store/LocalResult.java @@ -65,7 +65,7 @@ public abstract class LocalResult implements ResultStore { if (!bufferMC.addSize(incrementSize)) { needFlush = true; } - } else if (!needFlush && currentMemory > maxMemory) { + } else if (currentMemory > maxMemory) { needFlush = true; } if (needFlush) { diff --git a/src/main/java/com/actiontech/dble/route/RouteResultset.java b/src/main/java/com/actiontech/dble/route/RouteResultset.java index 92b956c0b..e7422951f 100644 --- a/src/main/java/com/actiontech/dble/route/RouteResultset.java +++ b/src/main/java/com/actiontech/dble/route/RouteResultset.java @@ -28,12 +28,11 @@ public final class RouteResultset implements Serializable { private boolean needOptimizer; private int limitStart; private boolean cacheAble; - // used to store table's ID->datanodes cache + // used to store table's ID->data nodes cache // format is table.primaryKey private String primaryKey; // limit output total private int limitSize; - private SQLMerge sqlMerge; private boolean callStatement = false; // is Call Statement @@ -55,7 +54,15 @@ public final class RouteResultset implements Serializable { // if force master,set canRunInReadDB=false // if force slave set runOnSlave,default null means not effect private Boolean runOnSlave = null; + private String[] groupByCols; + public String[] getGroupByCols() { + return groupByCols; + } + + public void setGroupByCols(String[] groupByCols) { + this.groupByCols = groupByCols; + } public boolean isNeedOptimizer() { return needOptimizer; } @@ -135,10 +142,6 @@ public final class RouteResultset implements Serializable { } - public SQLMerge getSqlMerge() { - return sqlMerge; - } - public boolean isCacheAble() { return cacheAble; } @@ -147,10 +150,6 @@ public final class RouteResultset implements Serializable { this.cacheAble = cacheAble; } - public boolean needMerge() { - return sqlMerge != null; - } - public int getSqlType() { return sqlType; } @@ -159,17 +158,6 @@ public final class RouteResultset implements Serializable { return limitStart; } - public String[] getGroupByCols() { - return (sqlMerge != null) ? sqlMerge.getGroupByCols() : null; - } - - private SQLMerge createSQLMergeIfNull() { - if (sqlMerge == null) { - sqlMerge = new SQLMerge(); - } - return sqlMerge; - } - public void setLimitStart(int limitStart) { this.limitStart = limitStart; } @@ -185,7 +173,7 @@ public final class RouteResultset implements Serializable { public void setPrimaryKey(String primaryKey) { if (!primaryKey.contains(".")) { throw new java.lang.IllegalArgumentException( - "must be table.primarykey fomat :" + primaryKey); + "must be table.primaryKey format :" + primaryKey); } this.primaryKey = primaryKey; } @@ -193,18 +181,11 @@ public final class RouteResultset implements Serializable { /** * return primary key items ,first is table name ,seconds is primary key * - * @return */ public String[] getPrimaryKeyItems() { return primaryKey.split("\\."); } - public void setGroupByCols(String[] groupByCols) { - if (groupByCols != null && groupByCols.length > 0) { - createSQLMergeIfNull().setGroupByCols(groupByCols); - } - } - public void setSrcStatement(String srcStatement) { this.srcStatement = srcStatement; } diff --git a/src/main/java/com/actiontech/dble/route/SQLMerge.java b/src/main/java/com/actiontech/dble/route/SQLMerge.java deleted file mode 100644 index 929afbc1b..000000000 --- a/src/main/java/com/actiontech/dble/route/SQLMerge.java +++ /dev/null @@ -1,20 +0,0 @@ -/* -* Copyright (C) 2016-2017 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ -package com.actiontech.dble.route; - -import java.io.Serializable; - -public class SQLMerge implements Serializable { - private String[] groupByCols; - - public String[] getGroupByCols() { - return groupByCols; - } - - public void setGroupByCols(String[] groupByCols) { - this.groupByCols = groupByCols; - } -} diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java index cee935395..0876a5439 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidInsertParser.java @@ -174,7 +174,7 @@ public class DruidInsertParser extends DruidInsertReplaceParser { LOGGER.debug("found partition node for child table to insert " + dn + " sql :" + sql); } RouterUtil.routeToSingleNode(rrs, dn); - sc.getSession2().execute(rrs, ServerParse.INSERT); + sc.getSession2().execute(rrs); } }); } diff --git a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java index e9cc63ae6..30a22bdc0 100644 --- a/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java +++ b/src/main/java/com/actiontech/dble/route/parser/druid/impl/DruidReplaceParser.java @@ -328,7 +328,7 @@ public class DruidReplaceParser extends DruidInsertReplaceParser { LOGGER.debug("found partition node for child table to insert " + dn + " sql :" + sql); } RouterUtil.routeToSingleNode(rrs, dn); - sc.getSession2().execute(rrs, ServerParse.REPLACE); + sc.getSession2().execute(rrs); } }); } diff --git a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java index 07f6a1e8d..05619f7d0 100644 --- a/src/main/java/com/actiontech/dble/server/NonBlockingSession.java +++ b/src/main/java/com/actiontech/dble/server/NonBlockingSession.java @@ -56,15 +56,11 @@ public class NonBlockingSession implements Session { public static final Logger LOGGER = LoggerFactory.getLogger(NonBlockingSession.class); public static final int CANCEL_STATUS_INIT = 0; public static final int CANCEL_STATUS_COMMITTING = 1; - public static final int CANCEL_STATUS_CANCELING = 2; + static final int CANCEL_STATUS_CANCELING = 2; private final ServerConnection source; private final ConcurrentMap target; - // life-cycle: each sql execution - private volatile SingleNodeHandler singleNodeHandler; - private volatile MultiNodeQueryHandler multiNodeHandler; - private volatile MultiNodeDdlHandler multiNodeDdlHandler; private RollbackNodesHandler rollbackHandler; private CommitNodesHandler commitHandler; private volatile String xaTxId; @@ -141,7 +137,7 @@ public class NonBlockingSession implements Session { } @Override - public void execute(RouteResultset rrs, int type) { + public void execute(RouteResultset rrs) { // clear prev execute resources clearHandlesResources(); if (LOGGER.isDebugEnabled()) { @@ -166,7 +162,7 @@ public class NonBlockingSession implements Session { this.xaState = TxState.TX_STARTED_STATE; } if (nodes.length == 1) { - singleNodeHandler = new SingleNodeHandler(rrs, this); + SingleNodeHandler singleNodeHandler = new SingleNodeHandler(rrs, this); if (this.isPrepared()) { singleNodeHandler.setPrepared(true); } @@ -181,39 +177,56 @@ public class NonBlockingSession implements Session { this.setPrepared(false); } } else { - if (rrs.getSqlType() != ServerParse.DDL) { - /** - * here, just a try! The sync is the superfluous, because there are hearbeats at every backend node. - * We don't do 2pc or 3pc. Beause mysql(that is, resource manager) don't support that for ddl statements. - */ - multiNodeHandler = new MultiNodeQueryHandler(type, rrs, this); - if (this.isPrepared()) { - multiNodeHandler.setPrepared(true); - } - try { - multiNodeHandler.execute(); - } catch (Exception e) { - handleSpecial(rrs, source.getSchema(), false); - LOGGER.warn(String.valueOf(source) + rrs, e); - source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); - } - if (this.isPrepared()) { - this.setPrepared(false); - } - } else { - checkBackupStatus(); - multiNodeDdlHandler = new MultiNodeDdlHandler(type, rrs, this); - try { - multiNodeDdlHandler.execute(); - } catch (Exception e) { - LOGGER.warn(String.valueOf(source) + rrs, e); - source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); - } + executeMultiResultSet(rrs); + } + } + + private void executeMultiResultSet(RouteResultset rrs) { + if (rrs.getSqlType() == ServerParse.DDL) { + /* + * here, just a try! The sync is the superfluous, because there are heartbeats at every backend node. + * We don't do 2pc or 3pc. Because mysql(that is, resource manager) don't support that for ddl statements. + */ + checkBackupStatus(); + MultiNodeDdlHandler multiNodeDdlHandler = new MultiNodeDdlHandler(rrs, this); + try { + multiNodeDdlHandler.execute(); + } catch (Exception e) { + LOGGER.warn(String.valueOf(source) + rrs, e); + source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); + } + } else if (ServerParse.SELECT == rrs.getSqlType() && rrs.getGroupByCols() != null) { + MultiNodeSelectHandler multiNodeSelectHandler = new MultiNodeSelectHandler(rrs, this); + if (this.isPrepared()) { + multiNodeSelectHandler.setPrepared(true); + } + try { + multiNodeSelectHandler.execute(); + } catch (Exception e) { + LOGGER.warn(String.valueOf(source) + rrs, e); + source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); + } + if (this.isPrepared()) { + this.setPrepared(false); + } + } else { + MultiNodeQueryHandler multiNodeHandler = new MultiNodeQueryHandler(rrs, this); + if (this.isPrepared()) { + multiNodeHandler.setPrepared(true); + } + try { + multiNodeHandler.execute(); + } catch (Exception e) { + LOGGER.warn(String.valueOf(source) + rrs, e); + source.writeErrMessage(ErrorCode.ERR_HANDLE_DATA, e.toString()); + } + if (this.isPrepared()) { + this.setPrepared(false); } } } - public void execute(PlanNode node) { + private void executeMultiResultSet(PlanNode node) { init(); HandlerBuilder builder = new HandlerBuilder(node, this); try { @@ -256,11 +269,11 @@ public class NonBlockingSession implements Session { //sub Query build will be blocked, so use ComplexQueryExecutor @Override public void run() { - execute(finalNode); + executeMultiResultSet(finalNode); } }); } else { - execute(node); + executeMultiResultSet(node); } } @@ -288,7 +301,7 @@ public class NonBlockingSession implements Session { } } - private CommitNodesHandler createCommitNodesHandler() { + private void resetCommitNodesHandler() { if (commitHandler == null) { if (this.getSessionXaID() == null) { commitHandler = new NormalCommitNodesHandler(this); @@ -303,7 +316,6 @@ public class NonBlockingSession implements Session { commitHandler = new XACommitNodesHandler(this); } } - return commitHandler; } public void commit() { @@ -316,7 +328,7 @@ public class NonBlockingSession implements Session { return; } checkBackupStatus(); - createCommitNodesHandler(); + resetCommitNodesHandler(); commitHandler.commit(); } @@ -327,7 +339,7 @@ public class NonBlockingSession implements Session { needWaitFinished = true; } - private RollbackNodesHandler createRollbackNodesHandler() { + private void resetRollbackNodesHandler() { if (rollbackHandler == null) { if (this.getSessionXaID() == null) { rollbackHandler = new NormalRollbackNodesHandler(this); @@ -342,7 +354,6 @@ public class NonBlockingSession implements Session { rollbackHandler = new XARollbackNodesHandler(this); } } - return rollbackHandler; } public void rollback() { @@ -357,7 +368,7 @@ public class NonBlockingSession implements Session { source.write(buffer); return; } - createRollbackNodesHandler(); + resetRollbackNodesHandler(); rollbackHandler.rollback(); } @@ -409,7 +420,7 @@ public class NonBlockingSession implements Session { /** * Only used when kill @@connection is Issued */ - public void initiativeTerminate() { + void initiativeTerminate() { for (BackendConnection node : target.values()) { node.terminate("client closed "); @@ -489,10 +500,7 @@ public class NonBlockingSession implements Session { /** * @return previous bound connection */ - public BackendConnection bindConnection(RouteResultsetNode key, - BackendConnection conn) { - // System.out.println("bind connection "+conn+ - // " to key "+key.getName()+" on sesion "+this); + public BackendConnection bindConnection(RouteResultsetNode key, BackendConnection conn) { return target.put(key, conn); } @@ -562,23 +570,6 @@ public class NonBlockingSession implements Session { } private void clearHandlesResources() { - SingleNodeHandler singleHandler = singleNodeHandler; - if (singleHandler != null) { - singleHandler.clearResources(); - singleNodeHandler = null; - } - - MultiNodeDdlHandler multiDdlHandler = multiNodeDdlHandler; - if (multiDdlHandler != null) { - multiDdlHandler.clearResources(); - multiNodeDdlHandler = null; - } - - MultiNodeQueryHandler multiHandler = multiNodeHandler; - if (multiHandler != null) { - multiHandler.clearResources(); - multiNodeHandler = null; - } if (rollbackHandler != null) { rollbackHandler.clearResources(); } diff --git a/src/main/java/com/actiontech/dble/server/ServerConnection.java b/src/main/java/com/actiontech/dble/server/ServerConnection.java index 8394baee8..199f112ba 100644 --- a/src/main/java/com/actiontech/dble/server/ServerConnection.java +++ b/src/main/java/com/actiontech/dble/server/ServerConnection.java @@ -294,7 +294,7 @@ public class ServerConnection extends FrontendConnection { } RouterUtil.routeToRandomNode(rrs, schemaInfo.getSchemaConfig(), schemaInfo.getTable()); } - session.execute(rrs, sqlType); + session.execute(rrs); } catch (Exception e) { executeException(e, stmt); } @@ -320,7 +320,7 @@ public class ServerConnection extends FrontendConnection { executeException(e, sql); return; } - session.execute(rrs, type); + session.execute(rrs); } private void addTableMetaLock(RouteResultset rrs) throws SQLNonTransientException { diff --git a/src/main/java/com/actiontech/dble/server/Session.java b/src/main/java/com/actiontech/dble/server/Session.java index b35239780..ceba2720e 100644 --- a/src/main/java/com/actiontech/dble/server/Session.java +++ b/src/main/java/com/actiontech/dble/server/Session.java @@ -26,7 +26,7 @@ public interface Session { /** * execute session */ - void execute(RouteResultset rrs, int type); + void execute(RouteResultset rrs); /** * commit session diff --git a/src/main/java/com/actiontech/dble/server/handler/ServerLoadDataInfileHandler.java b/src/main/java/com/actiontech/dble/server/handler/ServerLoadDataInfileHandler.java index b6479ede5..afe92dcab 100644 --- a/src/main/java/com/actiontech/dble/server/handler/ServerLoadDataInfileHandler.java +++ b/src/main/java/com/actiontech/dble/server/handler/ServerLoadDataInfileHandler.java @@ -185,7 +185,7 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler if (rrs != null) { flushDataToFile(); isStartLoadData = false; - serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); + serverConnection.getSession2().execute(rrs); } } } @@ -581,7 +581,7 @@ public final class ServerLoadDataInfileHandler implements LoadDataInfileHandler RouteResultset rrs = buildResultSet(routeResultMap); if (rrs != null) { flushDataToFile(); - serverConnection.getSession2().execute(rrs, ServerParse.LOAD_DATA_INFILE_SQL); + serverConnection.getSession2().execute(rrs); } // sendOk(++packID); diff --git a/src/main/java/com/actiontech/dble/sqlengine/mpp/AbstractDataNodeMerge.java b/src/main/java/com/actiontech/dble/sqlengine/mpp/AbstractDataNodeMerge.java deleted file mode 100644 index b3023068e..000000000 --- a/src/main/java/com/actiontech/dble/sqlengine/mpp/AbstractDataNodeMerge.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright (C) 2016-2017 ActionTech. - * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. - */ - -package com.actiontech.dble.sqlengine.mpp; - -import com.actiontech.dble.DbleServer; -import com.actiontech.dble.backend.mysql.nio.handler.MultiNodeQueryHandler; -import com.actiontech.dble.net.mysql.RowDataPacket; -import com.actiontech.dble.route.RouteResultset; -import com.actiontech.dble.server.NonBlockingSession; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * Created by zagnix on 2016/7/6. - */ -public abstract class AbstractDataNodeMerge implements Runnable { - - private static final Logger LOGGER = Logger.getLogger(AbstractDataNodeMerge.class); - /** - * col size - */ - protected int fieldCount; - - /** - * cache the router - */ - protected final RouteResultset rrs; - protected MultiNodeQueryHandler multiQueryHandler = null; - /** - * the end packet - */ - protected PackWraper endFlagPack = new PackWraper(); - - - /** - * rowData queue - */ - protected BlockingQueue packs = new LinkedBlockingQueue<>(); - - /** - * the merge thread is running - */ - protected final AtomicBoolean running = new AtomicBoolean(false); - - public AbstractDataNodeMerge(MultiNodeQueryHandler handler, RouteResultset rrs) { - this.rrs = rrs; - this.multiQueryHandler = handler; - } - - - /** - * Add a row pack, and may be wake up a business thread to work if not running. - * - * @param pack row pack - * @return true wake up a business thread, otherwise false - * @author Uncle-pan - * @since 2016-03-23 - */ - protected final boolean addPack(final PackWraper pack) { - packs.add(pack); - if (running.get()) { - return false; - } - final DbleServer server = DbleServer.getInstance(); - server.getBusinessExecutor().execute(this); - return true; - } - - /** - * PackWraper the new row data - * process new record (mysql binary data),if data can output to client - * ,return true - * - * @param dataNode DN's name (data from this dataNode) - * @param rowData raw data - */ - public boolean onNewRecord(String dataNode, byte[] rowData) { - final PackWraper data = new PackWraper(); - data.setDataNode(dataNode); - data.setRowData(rowData); - addPack(data); - - return false; - } - - - /** - * get the index array of row according map - * - * @param columns - * @param toIndexMap - * @return - */ - protected static int[] toColumnIndex(String[] columns, Map toIndexMap) { - int[] result = new int[columns.length]; - ColMeta curColMeta; - for (int i = 0; i < columns.length; i++) { - curColMeta = toIndexMap.get(columns[i].toUpperCase()); - if (curColMeta == null) { - throw new IllegalArgumentException( - "all columns in group by clause should be in the selected column list.!" + columns[i]); - } - result[i] = curColMeta.getColIndex(); - } - return result; - } - - @Override - public abstract void run(); - - public abstract void onRowMetaData(Map columnToIndex, int fieldSize) throws IOException; - - public void outputMergeResult(NonBlockingSession session, byte[] eof) { - addPack(endFlagPack); - } - - public RouteResultset getRrs() { - return this.rrs; - } - - /** - * @return (i*(offset+size) row) - */ - public abstract List getResults(byte[] eof); - - public abstract void clear(); - -} diff --git a/src/main/java/com/actiontech/dble/sqlengine/mpp/DataMergeService.java b/src/main/java/com/actiontech/dble/sqlengine/mpp/DataMergeService.java deleted file mode 100644 index 55cbea29b..000000000 --- a/src/main/java/com/actiontech/dble/sqlengine/mpp/DataMergeService.java +++ /dev/null @@ -1,178 +0,0 @@ -/* - * Copyright (C) 2016-2017 ActionTech. - * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. - */ - -package com.actiontech.dble.sqlengine.mpp; - -import com.actiontech.dble.backend.mysql.BufferUtil; -import com.actiontech.dble.backend.mysql.nio.handler.MultiNodeQueryHandler; -import com.actiontech.dble.net.mysql.EOFPacket; -import com.actiontech.dble.net.mysql.RowDataPacket; -import com.actiontech.dble.route.RouteResultset; -import com.actiontech.dble.route.RouteResultsetNode; -import com.actiontech.dble.server.ServerConnection; -import org.apache.log4j.Logger; - -import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/* -* Copyright (C) 2016-2017 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ - -/** - * Data merge service handle data Min,Max,AVG group . order by . limit - * - * @author wuzhih /modify by coder_czp/2015/11/2 - *

- * Fixbug: sql timeout and hang problem. - * @author Uncle-pan - * @since 2016-03-23 - */ -public class DataMergeService extends AbstractDataNodeMerge { - - private RowDataPacketGrouper grouper; - private Map> result = new HashMap<>(); - private static final Logger LOGGER = Logger.getLogger(DataMergeService.class); - - public DataMergeService(MultiNodeQueryHandler handler, RouteResultset rrs) { - super(handler, rrs); - - for (RouteResultsetNode node : rrs.getNodes()) { - result.put(node.getName(), new LinkedList()); - } - } - - - /** - * @param columnToIndex columnToIndex - * @param fieldSize fieldSize - */ - public void onRowMetaData(Map columnToIndex, int fieldSize) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("field metadata keys:" + columnToIndex.keySet()); - LOGGER.debug("field metadata values:" + columnToIndex.values()); - } - - int[] groupColumnIndexes = null; - this.fieldCount = fieldSize; - - if (rrs.getGroupByCols() != null) { - - groupColumnIndexes = toColumnIndex(rrs.getGroupByCols(), columnToIndex); - } - grouper = new RowDataPacketGrouper(groupColumnIndexes); - } - - - /** - * release resources - */ - public void clear() { - result.clear(); - grouper = null; - } - - @Override - public void run() { - // sort-or-group: no need for us to using multi-threads, because - //both sorter and group are synchronized!! - // @author Uncle-pan - // @since 2016-03-23 - if (!running.compareAndSet(false, true)) { - return; - } - // eof handler has been placed to "if (pack == END_FLAG_PACK){}" in for-statement - // @author Uncle-pan - // @since 2016-03-23 - boolean nulPack = false; - try { - // loop-on-packs - for (; ; ) { - final PackWraper pack = packs.poll(); - // async: handling row pack queue, this business thread should exit when no pack - // @author Uncle-pan - // @since 2016-03-23 - if (pack == null) { - nulPack = true; - break; - } - // eof: handling eof pack and exit - if (pack == endFlagPack) { - - - final int warningCount = 0; - final EOFPacket eofPacket = new EOFPacket(); - final ByteBuffer eof = ByteBuffer.allocate(9); - BufferUtil.writeUB3(eof, eofPacket.calcPacketSize()); - eof.put(eofPacket.getPacketId()); - eof.put(eofPacket.getFieldCount()); - BufferUtil.writeUB2(eof, warningCount); - BufferUtil.writeUB2(eof, eofPacket.getStatus()); - final ServerConnection source = multiQueryHandler.getSession().getSource(); - final byte[] array = eof.array(); - multiQueryHandler.outputMergeResult(source, array, getResults(array)); - break; - } - - - // merge: sort-or-group, or simple add - final RowDataPacket row = new RowDataPacket(fieldCount); - row.read(pack.getRowData()); - - if (grouper != null) { - grouper.addRow(row); - } else { - result.get(pack.getDataNode()).add(row); - } - } // rof - } catch (final Exception e) { - multiQueryHandler.handleDataProcessException(e); - } finally { - running.set(false); - } - // try to check packs, it's possible that adding a pack after polling a null pack - //and before this time pointer!! - // @author Uncle-pan - // @since 2016-03-23 - if (nulPack && !packs.isEmpty()) { - this.run(); - } - } - - - /** - * return merged data (i * (offset + size) rows at most) - * - * @return list - */ - public List getResults(byte[] eof) { - - List tmpResult = null; - - if (this.grouper != null) { - tmpResult = grouper.getResult(); - grouper = null; - } - - //no grouper and sorter - if (tmpResult == null) { - tmpResult = new LinkedList<>(); - for (RouteResultsetNode node : rrs.getNodes()) { - tmpResult.addAll(result.get(node.getName())); - } - } - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("prepare mpp merge result for " + rrs.getStatement()); - } - return tmpResult; - } -} - diff --git a/src/main/java/com/actiontech/dble/sqlengine/mpp/DataNodeMergeManager.java b/src/main/java/com/actiontech/dble/sqlengine/mpp/DataNodeMergeManager.java deleted file mode 100644 index 8e5abb36a..000000000 --- a/src/main/java/com/actiontech/dble/sqlengine/mpp/DataNodeMergeManager.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright (C) 2016-2017 ActionTech. - * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. - */ - -package com.actiontech.dble.sqlengine.mpp; - -import com.actiontech.dble.DbleServer; -import com.actiontech.dble.backend.mysql.BufferUtil; -import com.actiontech.dble.backend.mysql.MySQLMessage; -import com.actiontech.dble.backend.mysql.nio.handler.MultiNodeQueryHandler; -import com.actiontech.dble.memory.SeverMemory; -import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager; -import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager; -import com.actiontech.dble.memory.unsafe.row.BufferHolder; -import com.actiontech.dble.memory.unsafe.row.StructType; -import com.actiontech.dble.memory.unsafe.row.UnsafeRow; -import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter; -import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf; -import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparator; -import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparators; -import com.actiontech.dble.memory.unsafe.utils.sort.RowPrefixComputer; -import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter; -import com.actiontech.dble.net.mysql.EOFPacket; -import com.actiontech.dble.net.mysql.RowDataPacket; -import com.actiontech.dble.route.RouteResultset; -import com.actiontech.dble.server.ServerConnection; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - - -/** - * Created by zagnix on 2016/6/21. - */ -public class DataNodeMergeManager extends AbstractDataNodeMerge { - - private static final Logger LOGGER = Logger.getLogger(DataNodeMergeManager.class); - - /** - * UnsafeRowGrouper - */ - private UnsafeRowGrouper unsafeRowGrouper = null; - - /** - * global merge sorter - */ - private UnsafeExternalRowSorter globalMergeResult = null; - - /** - * the context of sorter - */ - private final SeverMemory serverMemory; - private final MemoryManager memoryManager; - private final ServerPropertyConf conf; - - - public DataNodeMergeManager(MultiNodeQueryHandler handler, RouteResultset rrs) { - super(handler, rrs); - this.serverMemory = DbleServer.getInstance().getServerMemory(); - this.memoryManager = serverMemory.getResultMergeMemoryManager(); - this.conf = serverMemory.getConf(); - } - - - public void onRowMetaData(Map columnToIndex, int fieldSize) throws IOException { - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("field metadata keys:" + columnToIndex.keySet()); - LOGGER.debug("field metadata values:" + columnToIndex.values()); - } - this.fieldCount = fieldSize; - String[] groupByCols = rrs.getGroupByCols(); - unsafeRowGrouper = new UnsafeRowGrouper(columnToIndex, groupByCols); - // 1.schema - StructType schema = new StructType(columnToIndex, fieldSize); - if (groupByCols != null) { - OrderCol[] orderCols = new OrderCol[groupByCols.length]; - for (int i = 0; i < groupByCols.length; i++) { - orderCols[i] = new OrderCol(columnToIndex.get(groupByCols[i].toUpperCase())); - } - schema.setOrderCols(orderCols); - } - //2 .PrefixComputer - UnsafeExternalRowSorter.PrefixComputer prefixComputer = new RowPrefixComputer(schema); - - //3 .PrefixComparator ,ASC/DESC and the default is ASC - - PrefixComparator prefixComparator = PrefixComparators.LONG; - - - DataNodeMemoryManager dataNodeMemoryManager = new DataNodeMemoryManager(memoryManager, - Thread.currentThread().getId()); - - globalMergeResult = new UnsafeExternalRowSorter( - dataNodeMemoryManager, - serverMemory, - schema, - prefixComparator, - prefixComputer, - conf.getSizeAsBytes("server.buffer.pageSize", "1m"), - false, - true); - } - - @Override - public List getResults(byte[] eof) { - return null; - } - - @Override - public void run() { - - if (!running.compareAndSet(false, true)) { - return; - } - - boolean nulPack = false; - - try { - for (; ; ) { - final PackWraper pack = packs.poll(); - - if (pack == null) { - nulPack = true; - break; - } - if (pack == endFlagPack) { - /* - * if last date node send row eof packet - * means all the data have received - */ - final int warningCount = 0; - final EOFPacket eofPacket = new EOFPacket(); - final ByteBuffer eof = ByteBuffer.allocate(9); - BufferUtil.writeUB3(eof, eofPacket.calcPacketSize()); - eof.put(eofPacket.getPacketId()); - eof.put(eofPacket.getFieldCount()); - BufferUtil.writeUB2(eof, warningCount); - BufferUtil.writeUB2(eof, eofPacket.getStatus()); - final ServerConnection source = multiQueryHandler.getSession().getSource(); - final byte[] array = eof.array(); - - - Iterator iterator; - if (unsafeRowGrouper != null) { - iterator = unsafeRowGrouper.getResult(globalMergeResult); - } else { - iterator = globalMergeResult.sort(); - } - - if (iterator != null) { - multiQueryHandler.outputMergeResult(source, array, iterator); - } - - if (unsafeRowGrouper != null) { - unsafeRowGrouper.free(); - unsafeRowGrouper = null; - } - - if (globalMergeResult != null) { - globalMergeResult.cleanupResources(); - globalMergeResult = null; - } - - break; - } - - UnsafeRow unsafeRow = new UnsafeRow(fieldCount); - BufferHolder bufferHolder = new BufferHolder(unsafeRow, 0); - UnsafeRowWriter unsafeRowWriter = new UnsafeRowWriter(bufferHolder, fieldCount); - bufferHolder.reset(); - - // make a row to filled col - MySQLMessage mm = new MySQLMessage(pack.getRowData()); - mm.readUB3(); - mm.read(); - - for (int i = 0; i < fieldCount; i++) { - byte[] colValue = mm.readBytesWithLength(); - if (colValue != null) - unsafeRowWriter.write(i, colValue); - else - unsafeRow.setNullAt(i); - } - - unsafeRow.setTotalSize(bufferHolder.totalSize()); - - if (unsafeRowGrouper != null) { - unsafeRowGrouper.addRow(unsafeRow); - } else { - globalMergeResult.insertRow(unsafeRow); - } - } - - } catch (final Exception e) { - multiQueryHandler.handleDataProcessException(e); - } finally { - running.set(false); - if (nulPack && !packs.isEmpty()) { - this.run(); - } - } - } - - /** - * release the resource of DataNodeMergeManager - */ - public void clear() { - - if (unsafeRowGrouper != null) { - unsafeRowGrouper.free(); - unsafeRowGrouper = null; - } - - if (globalMergeResult != null) { - globalMergeResult.cleanupResources(); - globalMergeResult = null; - } - } -} diff --git a/src/main/java/com/actiontech/dble/sqlengine/mpp/RangRowDataPacketSorter.java b/src/main/java/com/actiontech/dble/sqlengine/mpp/RangRowDataPacketSorter.java deleted file mode 100644 index 7b4722105..000000000 --- a/src/main/java/com/actiontech/dble/sqlengine/mpp/RangRowDataPacketSorter.java +++ /dev/null @@ -1,27 +0,0 @@ -/* -* Copyright (C) 2016-2017 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ -package com.actiontech.dble.sqlengine.mpp; - -import com.actiontech.dble.net.mysql.RowDataPacket; -import com.actiontech.dble.sqlengine.mpp.tmp.RowDataSorter; - - -public class RangRowDataPacketSorter extends RowDataSorter { - public RangRowDataPacketSorter(OrderCol[] orderCols) { - super(orderCols); - } - - public boolean ascDesc(int byColumnIndex) { - return this.orderCols[byColumnIndex].orderType == OrderCol.COL_ORDER_TYPE_ASC; - } - - public int compareRowData(RowDataPacket l, RowDataPacket r, int byColumnIndex) { - byte[] left = l.fieldValues.get(this.orderCols[byColumnIndex].colMeta.getColIndex()); - byte[] right = r.fieldValues.get(this.orderCols[byColumnIndex].colMeta.getColIndex()); - - return RowDataPacketSorter.compareObject(left, right, this.orderCols[byColumnIndex]); - } -} diff --git a/src/main/java/com/actiontech/dble/sqlengine/mpp/RowDataPacketGrouper.java b/src/main/java/com/actiontech/dble/sqlengine/mpp/RowDataPacketGrouper.java deleted file mode 100644 index 58f0797d8..000000000 --- a/src/main/java/com/actiontech/dble/sqlengine/mpp/RowDataPacketGrouper.java +++ /dev/null @@ -1,64 +0,0 @@ -/* -* Copyright (C) 2016-2017 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ -package com.actiontech.dble.sqlengine.mpp; - -import com.actiontech.dble.net.mysql.RowDataPacket; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; - -/** - * implement group function select a,count(*),sum(*) from A group by a - * - * @author wuzhih - */ -public class RowDataPacketGrouper { - - private List result = Collections.synchronizedList(new ArrayList()); - private final int[] groupColumnIndexes; - - public RowDataPacketGrouper(int[] groupColumnIndexes) { - super(); - this.groupColumnIndexes = groupColumnIndexes; - } - - public List getResult() { - return result; - } - - public void addRow(RowDataPacket rowDataPkg) { - for (RowDataPacket row : result) { - if (sameGroupColumns(rowDataPkg, row)) { - return; - } - } - - // not aggreated ,insert new - result.add(rowDataPkg); - - } - - - // private static final - - private boolean sameGroupColumns(RowDataPacket newRow, RowDataPacket existRow) { - if (groupColumnIndexes == null) { // select count(*) from aaa , or group - // column - return true; - } - for (int groupColumnIndex : groupColumnIndexes) { - if (!Arrays.equals(newRow.fieldValues.get(groupColumnIndex), - existRow.fieldValues.get(groupColumnIndex))) { - return false; - } - - } - return true; - - } -} diff --git a/src/main/java/com/actiontech/dble/sqlengine/mpp/UnsafeRowGrouper.java b/src/main/java/com/actiontech/dble/sqlengine/mpp/UnsafeRowGrouper.java deleted file mode 100644 index ca84e61e1..000000000 --- a/src/main/java/com/actiontech/dble/sqlengine/mpp/UnsafeRowGrouper.java +++ /dev/null @@ -1,486 +0,0 @@ -/* -* Copyright (C) 2016-2017 ActionTech. -* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT. -* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. -*/ -package com.actiontech.dble.sqlengine.mpp; - -import com.actiontech.dble.DbleServer; -import com.actiontech.dble.memory.SeverMemory; -import com.actiontech.dble.memory.unsafe.KVIterator; -import com.actiontech.dble.memory.unsafe.map.UnsafeFixedWidthAggregationMap; -import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager; -import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager; -import com.actiontech.dble.memory.unsafe.row.BufferHolder; -import com.actiontech.dble.memory.unsafe.row.StructType; -import com.actiontech.dble.memory.unsafe.row.UnsafeRow; -import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter; -import com.actiontech.dble.memory.unsafe.utils.BytesTools; -import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf; -import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter; -import com.actiontech.dble.util.ByteUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.Nonnull; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; -import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Created by zagnix on 2016/6/26. - *

- * implement group function select a,count(*),sum(*) from A group by a - */ -public class UnsafeRowGrouper { - private static final Logger LOGGER = LoggerFactory.getLogger(UnsafeRowGrouper.class); - - private UnsafeFixedWidthAggregationMap aggregationMap = null; - private final Map columnToIndexes; - private String[] sortColumnsByIndex = null; - private UnsafeRow valueKey = null; - private BufferHolder bufferHolder = null; - private UnsafeRowWriter unsafeRowWriter = null; - private final int groupKeyFieldCount; - private final int valueFieldCount; - private StructType groupKeySchema; - private StructType aggBufferSchema; - private UnsafeRow emptyAggregationBuffer; - - public UnsafeRowGrouper(Map columnToIndexes, String[] columns) { - super(); - assert columns != null; - assert columnToIndexes != null; - this.columnToIndexes = columnToIndexes; - this.sortColumnsByIndex = columns != null ? toSortColumnsByIndex(columns, columnToIndexes) : null; - this.groupKeyFieldCount = columns != null ? columns.length : 0; - this.valueFieldCount = columnToIndexes != null ? columnToIndexes.size() : 0; - - LOGGER.debug("columnToIndex :" + (columnToIndexes != null ? columnToIndexes.toString() : "null")); - - SeverMemory serverMemory = DbleServer.getInstance().getServerMemory(); - MemoryManager memoryManager = serverMemory.getResultMergeMemoryManager(); - ServerPropertyConf conf = serverMemory.getConf(); - - initGroupKey(); - initEmptyValueKey(); - - DataNodeMemoryManager dataNodeMemoryManager = - new DataNodeMemoryManager(memoryManager, Thread.currentThread().getId()); - - aggregationMap = new UnsafeFixedWidthAggregationMap( - emptyAggregationBuffer, - aggBufferSchema, - groupKeySchema, - dataNodeMemoryManager, - 2 * 1024, - conf.getSizeAsBytes("server.buffer.pageSize", "1m"), - false); - } - - private String[] toSortColumnsByIndex(String[] columns, Map colMetaMap) { - - Map map = new HashMap<>(); - - ColMeta curColMeta; - for (String column : columns) { - curColMeta = colMetaMap.get(column.toUpperCase()); - if (curColMeta == null) { - throw new IllegalArgumentException( - "all columns in group by clause should be in the selected column list.!" + column); - } - map.put(column, curColMeta.getColIndex()); - } - - - String[] sortColumnsIndexes = new String[map.size()]; - - List> entryList = new ArrayList<>( - map.entrySet()); - - Collections.sort(entryList, new Comparator>() { - @Override - public int compare(Map.Entry o1, Map.Entry o2) { - return o1.getValue().compareTo(o2.getValue()); - } - }); - - Iterator> iterator = entryList.iterator(); - Map.Entry tmpEntry = null; - - int index = 0; - - while (iterator.hasNext()) { - tmpEntry = iterator.next(); - sortColumnsIndexes[index++] = tmpEntry.getKey(); - } - - return sortColumnsIndexes; - } - - private void initGroupKey() { - Map groupColMetaMap = new HashMap<>(this.groupKeyFieldCount); - - UnsafeRow groupKey = new UnsafeRow(this.groupKeyFieldCount); - bufferHolder = new BufferHolder(groupKey, 0); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.groupKeyFieldCount); - bufferHolder.reset(); - - ColMeta curColMeta = null; - - for (int i = 0; i < this.groupKeyFieldCount; i++) { - curColMeta = this.columnToIndexes.get(sortColumnsByIndex[i].toUpperCase()); - groupColMetaMap.put(sortColumnsByIndex[i], curColMeta); - - - switch (curColMeta.getColType()) { - case ColMeta.COL_TYPE_BIT: - groupKey.setByte(i, (byte) 0); - break; - case ColMeta.COL_TYPE_INT: - case ColMeta.COL_TYPE_INT24: - case ColMeta.COL_TYPE_LONG: - groupKey.setInt(i, 0); - break; - case ColMeta.COL_TYPE_SHORT: - groupKey.setShort(i, (short) 0); - break; - case ColMeta.COL_TYPE_FLOAT: - groupKey.setFloat(i, 0); - break; - case ColMeta.COL_TYPE_DOUBLE: - groupKey.setDouble(i, 0); - break; - case ColMeta.COL_TYPE_NEWDECIMAL: - //groupKey.setDouble(i, 0); - unsafeRowWriter.write(i, new BigDecimal(0L)); - break; - case ColMeta.COL_TYPE_LONGLONG: - groupKey.setLong(i, 0); - break; - default: - unsafeRowWriter.write(i, "init".getBytes()); - break; - } - - } - groupKey.setTotalSize(bufferHolder.totalSize()); - - groupKeySchema = new StructType(groupColMetaMap, this.groupKeyFieldCount); - } - - private void initEmptyValueKey() { - emptyAggregationBuffer = new UnsafeRow(this.valueFieldCount); - bufferHolder = new BufferHolder(emptyAggregationBuffer, 0); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.valueFieldCount); - bufferHolder.reset(); - - ColMeta curColMeta = null; - for (Map.Entry fieldEntry : columnToIndexes.entrySet()) { - curColMeta = fieldEntry.getValue(); - - switch (curColMeta.getColType()) { - case ColMeta.COL_TYPE_BIT: - emptyAggregationBuffer.setByte(curColMeta.getColIndex(), (byte) 0); - break; - case ColMeta.COL_TYPE_INT: - case ColMeta.COL_TYPE_INT24: - case ColMeta.COL_TYPE_LONG: - emptyAggregationBuffer.setInt(curColMeta.getColIndex(), 0); - break; - case ColMeta.COL_TYPE_SHORT: - emptyAggregationBuffer.setShort(curColMeta.getColIndex(), (short) 0); - break; - case ColMeta.COL_TYPE_LONGLONG: - emptyAggregationBuffer.setLong(curColMeta.getColIndex(), 0); - break; - case ColMeta.COL_TYPE_FLOAT: - emptyAggregationBuffer.setFloat(curColMeta.getColIndex(), 0); - break; - case ColMeta.COL_TYPE_DOUBLE: - emptyAggregationBuffer.setDouble(curColMeta.getColIndex(), 0); - break; - case ColMeta.COL_TYPE_NEWDECIMAL: - //emptyAggregationBuffer.setDouble(curColMeta.colIndex, 0); - unsafeRowWriter.write(curColMeta.getColIndex(), new BigDecimal(0L)); - break; - default: - unsafeRowWriter.write(curColMeta.getColIndex(), "init".getBytes()); - break; - } - - } - - emptyAggregationBuffer.setTotalSize(bufferHolder.totalSize()); - aggBufferSchema = new StructType(columnToIndexes, this.valueFieldCount); - } - - - public Iterator getResult(@Nonnull UnsafeExternalRowSorter sorter) throws IOException { - KVIterator iterator = aggregationMap.iterator(); - /** - * group having - */ - insertValue(sorter); - return sorter.sort(); - } - - - /** - * is Avg Field - * - * @param columnName - * @return - */ - private boolean isAvgField(String columnName) { - Pattern pattern = Pattern.compile("AVG([1-9]\\d*|0)SUM"); - Matcher matcher = pattern.matcher(columnName); - return matcher.find(); - } - - - public UnsafeRow getAllBinaryRow(UnsafeRow row) throws UnsupportedEncodingException { - - UnsafeRow value = new UnsafeRow(this.valueFieldCount); - bufferHolder = new BufferHolder(value, 0); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.valueFieldCount); - bufferHolder.reset(); - ColMeta curColMeta = null; - - for (Map.Entry fieldEntry : columnToIndexes.entrySet()) { - curColMeta = fieldEntry.getValue(); - - if (!row.isNullAt(curColMeta.getColIndex())) { - switch (curColMeta.getColType()) { - case ColMeta.COL_TYPE_BIT: - unsafeRowWriter.write(curColMeta.getColIndex(), row.getByte(curColMeta.getColIndex())); - break; - case ColMeta.COL_TYPE_INT: - case ColMeta.COL_TYPE_LONG: - case ColMeta.COL_TYPE_INT24: - unsafeRowWriter.write(curColMeta.getColIndex(), - BytesTools.int2Bytes(row.getInt(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_SHORT: - unsafeRowWriter.write(curColMeta.getColIndex(), - BytesTools.short2Bytes(row.getShort(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_LONGLONG: - unsafeRowWriter.write(curColMeta.getColIndex(), - BytesTools.long2Bytes(row.getLong(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_FLOAT: - unsafeRowWriter.write(curColMeta.getColIndex(), - BytesTools.float2Bytes(row.getFloat(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_DOUBLE: - unsafeRowWriter.write(curColMeta.getColIndex(), - BytesTools.double2Bytes(row.getDouble(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_NEWDECIMAL: - int scale = curColMeta.getDecimals(); - BigDecimal decimalVal = row.getDecimal(curColMeta.getColIndex(), scale); - unsafeRowWriter.write(curColMeta.getColIndex(), decimalVal.toString().getBytes()); - break; - default: - unsafeRowWriter.write(curColMeta.getColIndex(), - row.getBinary(curColMeta.getColIndex())); - break; - } - } else { - unsafeRowWriter.setNullAt(curColMeta.getColIndex()); - } - } - - value.setTotalSize(bufferHolder.totalSize()); - return value; - } - - private void insertValue(@Nonnull UnsafeExternalRowSorter sorter) { - KVIterator it = aggregationMap.iterator(); - try { - while (it.next()) { - UnsafeRow row = getAllBinaryRow(it.getValue()); - sorter.insertRow(row); - } - } catch (IOException e) { - LOGGER.error(e.getMessage()); - } - } - - - private boolean lt(byte[] l, byte[] r) { - return -1 != ByteUtil.compareNumberByte(l, r); - } - - private boolean gt(byte[] l, byte[] r) { - return 1 != ByteUtil.compareNumberByte(l, r); - } - - private boolean eq(byte[] l, byte[] r) { - return 0 != ByteUtil.compareNumberByte(l, r); - } - - private boolean neq(byte[] l, byte[] r) { - return 0 == ByteUtil.compareNumberByte(l, r); - } - - private UnsafeRow getGroupKey(UnsafeRow row) throws UnsupportedEncodingException { - - UnsafeRow key = null; - if (this.sortColumnsByIndex == null) { - /** - * no group by key word - * select count(*) from table; - */ - key = new UnsafeRow(this.groupKeyFieldCount + 1); - bufferHolder = new BufferHolder(key, 0); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.groupKeyFieldCount + 1); - bufferHolder.reset(); - unsafeRowWriter.write(0, "same".getBytes()); - key.setTotalSize(bufferHolder.totalSize()); - return key; - } - - - key = new UnsafeRow(this.groupKeyFieldCount); - bufferHolder = new BufferHolder(key, 0); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.groupKeyFieldCount); - bufferHolder.reset(); - - - ColMeta curColMeta = null; - for (int i = 0; i < this.groupKeyFieldCount; i++) { - curColMeta = this.columnToIndexes.get(sortColumnsByIndex[i].toUpperCase()); - if (!row.isNullAt(curColMeta.getColIndex())) { - switch (curColMeta.getColType()) { - case ColMeta.COL_TYPE_BIT: - key.setByte(i, row.getByte(curColMeta.getColIndex())); - // fallthrough - case ColMeta.COL_TYPE_INT: - case ColMeta.COL_TYPE_LONG: - case ColMeta.COL_TYPE_INT24: - key.setInt(i, - BytesTools.getInt(row.getBinary(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_SHORT: - key.setShort(i, - BytesTools.getShort(row.getBinary(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_FLOAT: - key.setFloat(i, - BytesTools.getFloat(row.getBinary(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_DOUBLE: - key.setDouble(i, - BytesTools.getDouble(row.getBinary(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_NEWDECIMAL: - //key.setDouble(i, BytesTools.getDouble(row.getBinary(curColMeta.colIndex))); - unsafeRowWriter.write(i, - new BigDecimal(new String(row.getBinary(curColMeta.getColIndex())))); - break; - case ColMeta.COL_TYPE_LONGLONG: - key.setLong(i, - BytesTools.getLong(row.getBinary(curColMeta.getColIndex()))); - break; - default: - unsafeRowWriter.write(i, - row.getBinary(curColMeta.getColIndex())); - break; - } - } else { - key.setNullAt(i); - } - } - - key.setTotalSize(bufferHolder.totalSize()); - - return key; - } - - private UnsafeRow getValue(UnsafeRow row) throws UnsupportedEncodingException { - - UnsafeRow value = new UnsafeRow(this.valueFieldCount); - bufferHolder = new BufferHolder(value, 0); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, this.valueFieldCount); - bufferHolder.reset(); - ColMeta curColMeta = null; - for (Map.Entry fieldEntry : columnToIndexes.entrySet()) { - curColMeta = fieldEntry.getValue(); - if (!row.isNullAt(curColMeta.getColIndex())) { - switch (curColMeta.getColType()) { - case ColMeta.COL_TYPE_BIT: - value.setByte(curColMeta.getColIndex(), row.getByte(curColMeta.getColIndex())); - break; - case ColMeta.COL_TYPE_INT: - case ColMeta.COL_TYPE_LONG: - case ColMeta.COL_TYPE_INT24: - value.setInt(curColMeta.getColIndex(), - BytesTools.getInt(row.getBinary(curColMeta.getColIndex()))); - - break; - case ColMeta.COL_TYPE_SHORT: - value.setShort(curColMeta.getColIndex(), - BytesTools.getShort(row.getBinary(curColMeta.getColIndex()))); - break; - case ColMeta.COL_TYPE_LONGLONG: - value.setLong(curColMeta.getColIndex(), - BytesTools.getLong(row.getBinary(curColMeta.getColIndex()))); - - - break; - case ColMeta.COL_TYPE_FLOAT: - value.setFloat(curColMeta.getColIndex(), - BytesTools.getFloat(row.getBinary(curColMeta.getColIndex()))); - - break; - case ColMeta.COL_TYPE_DOUBLE: - value.setDouble(curColMeta.getColIndex(), BytesTools.getDouble(row.getBinary(curColMeta.getColIndex()))); - - break; - case ColMeta.COL_TYPE_NEWDECIMAL: - //value.setDouble(curColMeta.colIndex, BytesTools.getDouble(row.getBinary(curColMeta.colIndex))); - unsafeRowWriter.write(curColMeta.getColIndex(), - new BigDecimal(new String(row.getBinary(curColMeta.getColIndex())))); - break; - default: - unsafeRowWriter.write(curColMeta.getColIndex(), - row.getBinary(curColMeta.getColIndex())); - break; - } - } else { - switch (curColMeta.getColType()) { - case ColMeta.COL_TYPE_NEWDECIMAL: - BigDecimal nullDecimal = null; - unsafeRowWriter.write(curColMeta.getColIndex(), nullDecimal); - break; - default: - value.setNullAt(curColMeta.getColIndex()); - break; - } - } - } - - - value.setTotalSize(bufferHolder.totalSize()); - return value; - } - - public void addRow(UnsafeRow rowDataPkg) throws UnsupportedEncodingException { - UnsafeRow key = getGroupKey(rowDataPkg); - UnsafeRow value = getValue(rowDataPkg); - - if (!aggregationMap.find(key)) { - aggregationMap.put(key, value); - } - } - - public void free() { - if (aggregationMap != null) - aggregationMap.free(); - } -} diff --git a/src/test/java/com/actiontech/dble/memory/unsafe/sort/TestSorter.java b/src/test/java/com/actiontech/dble/memory/unsafe/sort/TestSorter.java deleted file mode 100644 index 60b73d7a0..000000000 --- a/src/test/java/com/actiontech/dble/memory/unsafe/sort/TestSorter.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Copyright (C) 2016-2017 ActionTech. - * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. - */ - -package com.actiontech.dble.memory.unsafe.sort; - -import com.actiontech.dble.memory.SeverMemory; -import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager; -import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager; -import com.actiontech.dble.memory.unsafe.row.BufferHolder; -import com.actiontech.dble.memory.unsafe.row.StructType; -import com.actiontech.dble.memory.unsafe.row.UnsafeRow; -import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter; -import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf; -import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparator; -import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparators; -import com.actiontech.dble.memory.unsafe.utils.sort.RowPrefixComputer; -import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter; -import com.actiontech.dble.sqlengine.mpp.ColMeta; -import com.actiontech.dble.sqlengine.mpp.OrderCol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Random; -import java.util.concurrent.CountDownLatch; - -/** - * Created by zagnix on 16-7-9. - */ -public class TestSorter implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(TestSorter.class); - - private static final int TEST_SIZE = 1000000; - private static int TASK_SIZE = 100; - private static CountDownLatch countDownLatch = new CountDownLatch(100); - - public void runSorter(SeverMemory severMemory, - MemoryManager memoryManager, - ServerPropertyConf conf) throws NoSuchFieldException, IllegalAccessException, IOException { - DataNodeMemoryManager dataNodeMemoryManager = new DataNodeMemoryManager(memoryManager, - Thread.currentThread().getId()); - /** - * 1.schema ,mock a field - * - */ - int fieldCount = 3; - ColMeta colMeta = null; - Map colMetaMap = new HashMap(fieldCount); - colMeta = new ColMeta(0, ColMeta.COL_TYPE_STRING); - colMetaMap.put("id", colMeta); - colMeta = new ColMeta(1, ColMeta.COL_TYPE_STRING); - colMetaMap.put("name", colMeta); - colMeta = new ColMeta(2, ColMeta.COL_TYPE_STRING); - colMetaMap.put("age", colMeta); - - - OrderCol[] orderCols = new OrderCol[1]; - OrderCol orderCol = new OrderCol(colMetaMap.get("id"), - OrderCol.COL_ORDER_TYPE_ASC); - orderCols[0] = orderCol; - /** - * 2 .PrefixComputer - */ - StructType schema = new StructType(colMetaMap, fieldCount); - schema.setOrderCols(orderCols); - - UnsafeExternalRowSorter.PrefixComputer prefixComputer = - new RowPrefixComputer(schema); - - /** - * 3 .PrefixComparator defalut is ASC, or set DESC - */ - final PrefixComparator prefixComparator = PrefixComparators.LONG; - - UnsafeExternalRowSorter sorter = - new UnsafeExternalRowSorter(dataNodeMemoryManager, - severMemory, - schema, - prefixComparator, - prefixComputer, - conf.getSizeAsBytes("server.buffer.pageSize", "1m"), - true, - true); - UnsafeRow unsafeRow; - BufferHolder bufferHolder; - UnsafeRowWriter unsafeRowWriter; - String line = "testUnsafeRow"; - final Random rand = new Random(42); - for (int i = 0; i < TEST_SIZE; i++) { - unsafeRow = new UnsafeRow(3); - bufferHolder = new BufferHolder(unsafeRow); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 3); - bufferHolder.reset(); - - String key = getRandomString(rand.nextInt(300) + 100); - - unsafeRowWriter.write(0, key.getBytes()); - unsafeRowWriter.write(1, line.getBytes()); - unsafeRowWriter.write(2, ("35" + 1).getBytes()); - - unsafeRow.setTotalSize(bufferHolder.totalSize()); - sorter.insertRow(unsafeRow); - } - Iterator iter = sorter.sort(); - UnsafeRow row = null; - int indexprint = 0; - while (iter.hasNext()) { - row = iter.next(); - indexprint++; - } - - sorter.cleanupResources(); - countDownLatch.countDown(); - - System.out.println("Thread ID :" + Thread.currentThread().getId() + "Index : " + indexprint); - } - - - public static String getRandomString(int length) { //length of string - String base = "abcdefghijklmnopqrstuvwxyz0123456789"; - Random random = new Random(); - StringBuffer sb = new StringBuffer(); - for (int i = 0; i < length; i++) { - int number = random.nextInt(base.length()); - sb.append(base.charAt(number)); - } - return sb.toString(); - } - - final SeverMemory severMemory; - final MemoryManager memoryManager; - final ServerPropertyConf conf; - - - public TestSorter(SeverMemory severMemory, MemoryManager memoryManager, ServerPropertyConf conf) throws NoSuchFieldException, IllegalAccessException { - this.severMemory = severMemory; - this.memoryManager = memoryManager; - this.conf = conf; - } - - @Override - public void run() { - try { - runSorter(severMemory, memoryManager, conf); - } catch (NoSuchFieldException e) { - logger.error(e.getMessage()); - } catch (IllegalAccessException e) { - logger.error(e.getMessage()); - } catch (IOException e) { - logger.error(e.getMessage()); - } - } - - public static void main(String[] args) throws Exception { - - SeverMemory severMemory; - MemoryManager memoryManager; - ServerPropertyConf conf; - - severMemory = new SeverMemory(); - memoryManager = severMemory.getResultMergeMemoryManager(); - conf = severMemory.getConf(); - - for (int i = 0; i < TASK_SIZE; i++) { - Thread thread = new Thread(new TestSorter(severMemory, memoryManager, conf)); - thread.start(); - } - - while (countDownLatch.getCount() != 0) { - System.err.println("count ========================>" + countDownLatch.getCount()); - Thread.sleep(1000); - } - - System.err.println(TASK_SIZE + " tasks sorter finished ok !!!!!!!!!"); - - System.exit(1); - } - -} diff --git a/src/test/java/com/actiontech/dble/memory/unsafe/sort/UnsafeExternalRowSorterTest.java b/src/test/java/com/actiontech/dble/memory/unsafe/sort/UnsafeExternalRowSorterTest.java deleted file mode 100644 index ac0d2210f..000000000 --- a/src/test/java/com/actiontech/dble/memory/unsafe/sort/UnsafeExternalRowSorterTest.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (C) 2016-2017 ActionTech. - * License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher. - */ - -package com.actiontech.dble.memory.unsafe.sort; - -import com.actiontech.dble.memory.SeverMemory; -import com.actiontech.dble.memory.unsafe.memory.mm.DataNodeMemoryManager; -import com.actiontech.dble.memory.unsafe.memory.mm.MemoryManager; -import com.actiontech.dble.memory.unsafe.row.BufferHolder; -import com.actiontech.dble.memory.unsafe.row.StructType; -import com.actiontech.dble.memory.unsafe.row.UnsafeRow; -import com.actiontech.dble.memory.unsafe.row.UnsafeRowWriter; -import com.actiontech.dble.memory.unsafe.utils.ServerPropertyConf; -import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparator; -import com.actiontech.dble.memory.unsafe.utils.sort.PrefixComparators; -import com.actiontech.dble.memory.unsafe.utils.sort.RowPrefixComputer; -import com.actiontech.dble.memory.unsafe.utils.sort.UnsafeExternalRowSorter; -import com.actiontech.dble.sqlengine.mpp.ColMeta; -import com.actiontech.dble.sqlengine.mpp.OrderCol; -import org.junit.Assert; -import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.*; - -/** - * Created by zagnix on 2016/6/19. - */ -public class UnsafeExternalRowSorterTest { - - private static final int TEST_SIZE = 100000; - public static final Logger LOGGER = LoggerFactory.getLogger(UnsafeExternalRowSorterTest.class); - - /** - * test type LONG,INT,SHORT,Float,Double,String,Binary - */ - @Test - public void testUnsafeExternalRowSorter() throws NoSuchFieldException, IllegalAccessException, IOException { - SeverMemory severMemory = new SeverMemory(); - MemoryManager memoryManager = severMemory.getResultMergeMemoryManager(); - ServerPropertyConf conf = severMemory.getConf(); - DataNodeMemoryManager dataNodeMemoryManager = new DataNodeMemoryManager(memoryManager, - Thread.currentThread().getId()); - - int fieldCount = 3; - ColMeta colMeta = null; - Map colMetaMap = new HashMap(fieldCount); - colMeta = new ColMeta(0, ColMeta.COL_TYPE_STRING); - colMetaMap.put("id", colMeta); - colMeta = new ColMeta(1, ColMeta.COL_TYPE_STRING); - colMetaMap.put("name", colMeta); - colMeta = new ColMeta(2, ColMeta.COL_TYPE_STRING); - colMetaMap.put("age", colMeta); - - - OrderCol[] orderCols = new OrderCol[1]; - OrderCol orderCol = new OrderCol(colMetaMap.get("id"), - OrderCol.COL_ORDER_TYPE_ASC); - orderCols[0] = orderCol; - /** - * 2 .PrefixComputer - */ - StructType schema = new StructType(colMetaMap, fieldCount); - schema.setOrderCols(orderCols); - - UnsafeExternalRowSorter.PrefixComputer prefixComputer = - new RowPrefixComputer(schema); - - - final PrefixComparator prefixComparator = PrefixComparators.LONG; - - UnsafeExternalRowSorter sorter = - new UnsafeExternalRowSorter(dataNodeMemoryManager, - severMemory, - schema, - prefixComparator, - prefixComputer, - conf.getSizeAsBytes("server.buffer.pageSize", "1m"), - true, - true); - - UnsafeRow unsafeRow; - BufferHolder bufferHolder; - UnsafeRowWriter unsafeRowWriter; - String line = "testUnsafeRow"; - // List floats = new ArrayList(); - List longs = new ArrayList(); - final Random rand = new Random(42); - for (int i = 0; i < TEST_SIZE; i++) { - unsafeRow = new UnsafeRow(3); - bufferHolder = new BufferHolder(unsafeRow); - unsafeRowWriter = new UnsafeRowWriter(bufferHolder, 3); - bufferHolder.reset(); - - String key = getRandomString(rand.nextInt(300) + 100); - - //long v = rand.nextLong(); - // longs.add(v); - unsafeRowWriter.write(0, key.getBytes()); - // unsafeRowWriter.write(0, BytesTools.toBytes(v)); - unsafeRowWriter.write(1, line.getBytes()); - unsafeRowWriter.write(2, ("35" + 1).getBytes()); - - unsafeRow.setTotalSize(bufferHolder.totalSize()); - sorter.insertRow(unsafeRow); - } - - Iterator iter = sorter.sort(); -/* - float [] com = new float[floats.size()]; - for (int i = 0; i