From 7e341be5e2344903aaeb2e9065e04a4ccfb7a2cb Mon Sep 17 00:00:00 2001 From: yanhuqing666 Date: Thu, 15 Dec 2016 10:03:27 +0800 Subject: [PATCH] add missing file --- .gitignore | 2 +- .../io/mycat/sqlengine/mpp/tmp/HeapItf.java | 57 +++++++ .../mycat/sqlengine/mpp/tmp/IntMinHeap.java | 109 ++++++++++++++ .../io/mycat/sqlengine/mpp/tmp/MaxHeap.java | 139 ++++++++++++++++++ .../io/mycat/sqlengine/mpp/tmp/MinHeap.java | 107 ++++++++++++++ .../mycat/sqlengine/mpp/tmp/RowDataCmp.java | 44 ++++++ .../sqlengine/mpp/tmp/RowDataSorter.java | 83 +++++++++++ 7 files changed, 540 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java create mode 100644 src/main/java/io/mycat/sqlengine/mpp/tmp/IntMinHeap.java create mode 100644 src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java create mode 100644 src/main/java/io/mycat/sqlengine/mpp/tmp/MinHeap.java create mode 100644 src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java create mode 100644 src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java diff --git a/.gitignore b/.gitignore index e3f4e7ecc..4e28a465b 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,7 @@ .metadata .gradle bin/ -tmp/ +/tmp/ *.tmp *.bak *.swp diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java new file mode 100644 index 000000000..01ff39d50 --- /dev/null +++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/HeapItf.java @@ -0,0 +1,57 @@ +package io.mycat.sqlengine.mpp.tmp; + +import java.util.List; + +import io.mycat.net.mysql.RowDataPacket; + +/** + * @author coderczp-2014-12-17 + */ +public interface HeapItf { + + /** + * 构建堆 + */ + void buildHeap(); + + /** + * 获取堆根节点 + * + * @return + */ + RowDataPacket getRoot(); + + /** + * 向堆添加元素 + * + * @param row + */ + void add(RowDataPacket row); + + /** + * 获取堆数据 + * + * @return + */ + List getData(); + + /** + * 设置根节点元素 + * + * @param root + */ + void setRoot(RowDataPacket root); + + /** + * 向已满的堆添加元素 + * + * @param row + */ + boolean addIfRequired(RowDataPacket row); + + /** + * 堆排序 + */ + void heapSort(int size); + +} diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/IntMinHeap.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/IntMinHeap.java new file mode 100644 index 000000000..11219cf99 --- /dev/null +++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/IntMinHeap.java @@ -0,0 +1,109 @@ +package io.mycat.sqlengine.mpp.tmp; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Random; +import java.util.Set; + +/** + * + * @author coderczp-2014-12-8 + */ +public class IntMinHeap { + + private int i; + private int[] data; + + public IntMinHeap(int[] data) { + this.data = data; + } + + public void buildMinHeap() { + int len = data.length; + for (int i = len / 2 - 1; i >= 0; i--) { + heapify(i); + } + } + + private void heapify(int i) { + int l = left(i); + int r = right(i); + int max = i; + int len = data.length; + if (l < len && data[l] < data[i]) { + max = l; + } + if (r < len && data[r] < data[max]) { + max = r; + } + if (i == max) { + return; + } + swap(i, max); + heapify(max); + } + + private int right(int i) { + return (i + 1) << 1; + } + + private int left(int i) { + return ((i + 1) << 1) - 1; + } + + private void swap(int i, int j) { + int tmp = data[i]; + data[i] = data[j]; + data[j] = tmp; + } + + public int getRoot() { + return data[0]; + } + + public void setRoot(int root) { + data[0] = root; + heapify(0); + } + + public int[] getData() { + return data; + } + + public synchronized void add(int row) { + data[i++] = row; + } + + // 淘汰堆里最大的数据 + public void addIfRequired(int row) { + int root = getRoot(); + if (row > root) { + setRoot(row); + } + } + + public static void main(String[] args) { + Set set = new HashSet(); + int dataCount = 30; + Random rd = new Random(); + int bound = dataCount * 3; + while (set.size() < dataCount) { + set.add(rd.nextInt(bound)); + } + int i = 0; + int topN = 5; + int[] data = new int[topN]; + Iterator it = set.iterator(); + while (i < topN) { + data[i++] = it.next(); + } + System.out.println(set); + IntMinHeap heap = new IntMinHeap(data); + heap.buildMinHeap(); + while (it.hasNext()) { + heap.addIfRequired(it.next()); + } + System.out.println(Arrays.toString(data)); + } +} diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java new file mode 100644 index 000000000..26220bc0c --- /dev/null +++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/MaxHeap.java @@ -0,0 +1,139 @@ +package io.mycat.sqlengine.mpp.tmp; + +import java.util.ArrayList; +import java.util.List; + +import io.mycat.net.mysql.RowDataPacket; + +/** + * 最大堆排序,适用于顺序排序 + * + * @author coderczp-2014-12-8 + */ +public class MaxHeap implements HeapItf { + + private RowDataCmp cmp; + private List data; + + public MaxHeap(RowDataCmp cmp, int size) { + this.cmp = cmp; + this.data = new ArrayList<>(); + } + + @Override + public void buildHeap() { + int len = data.size(); + for (int i = len / 2 - 1; i >= 0; i--) { + heapifyRecursive(i, len); + } + } + + private void heapify(int i, int size) { + int max = 0; + int mid = size >> 1;// ==size/2 + while (i <= mid) { + max = i; + int left = i << 1; + int right = left + 1; + if (left < size && cmp.compare(data.get(left), data.get(i)) > 0) { + max = left; + } + if (right < size && cmp.compare(data.get(right), data.get(max)) > 0) { + max = right; + } + if (i == max) { + break; + } + if (i != max) { + RowDataPacket tmp = data.get(i); + data.set(i, data.get(max)); + data.set(max, tmp); + i = max; + } + } + + } + + // 递归版本 + protected void heapifyRecursive(int i, int size) { + int l = left(i); + int r = right(i); + int max = i; + if (l < size && cmp.compare(data.get(l), data.get(i)) > 0) { + max = l; + } + if (r < size && cmp.compare(data.get(r), data.get(max)) > 0) { + max = r; + } + if (i == max) { + return; + } + swap(i, max); + heapifyRecursive(max, size); + } + + + private int right(int i) { + return (i + 1) << 1; + } + + private int left(int i) { + return ((i + 1) << 1) - 1; + } + + private void swap(int i, int j) { + RowDataPacket tmp = data.get(i); + RowDataPacket elementAt = data.get(j); + data.set(i, elementAt); + data.set(j, tmp); + } + + @Override + public RowDataPacket getRoot() { + return data.get(0); + } + + @Override + public void setRoot(RowDataPacket root) { + data.set(0, root); + heapifyRecursive(0, data.size()); + } + + @Override + public List getData() { + return data; + } + + @Override + public void add(RowDataPacket row) { + data.add(row); + } + + @Override + public boolean addIfRequired(RowDataPacket row) { + // 淘汰堆里最小的数据 + RowDataPacket root = getRoot(); + if (cmp.compare(row, root) < 0) { + setRoot(row); + return true; + } + return false; + } + + @Override + public void heapSort(int size) { + final int total = data.size(); + // 容错处理 + if (size <= 0 || size > total) { + size = total; + } + final int min = size == total ? 0 : (total - size - 1); + + // 末尾与头交换,交换后调整最大堆 + for (int i = total - 1; i > min; i--) { + swap(0, i); + heapifyRecursive(0, i); + } + } + +} diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/MinHeap.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/MinHeap.java new file mode 100644 index 000000000..dd4ad63ba --- /dev/null +++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/MinHeap.java @@ -0,0 +1,107 @@ +package io.mycat.sqlengine.mpp.tmp; + +import java.util.ArrayList; +import java.util.List; + +import io.mycat.net.mysql.RowDataPacket; + +/** + * 最小堆排序,适用于倒序排序 + * + * @author coderczp-2014-12-8 + */ +public class MinHeap implements HeapItf { + + private RowDataCmp cmp; + private List data; + + public MinHeap(RowDataCmp cmp, int size) { + this.cmp = cmp; + this.data = new ArrayList<>(); + } + + @Override + public void buildHeap() { + int len = data.size(); + for (int i = len / 2 - 1; i >= 0; i--) { + heapify(i, len); + } + } + + private void heapify(int i, int size) { + int l = left(i); + int r = right(i); + int smallest = i; + if (l < size && cmp.compare(data.get(l), data.get(i)) < 0) { + smallest = l; + } + if (r < size && cmp.compare(data.get(r), data.get(smallest)) < 0) { + smallest = r; + } + if (i == smallest) { + return; + } + swap(i, smallest); + heapify(smallest, size); + } + + private int right(int i) { + return (i + 1) << 1; + } + + private int left(int i) { + return ((i + 1) << 1) - 1; + } + + private void swap(int i, int j) { + RowDataPacket tmp = data.get(i); + RowDataPacket elementAt = data.get(j); + data.set(i, elementAt); + data.set(j, tmp); + } + + public RowDataPacket getRoot() { + return data.get(0); + } + + public void setRoot(RowDataPacket root) { + data.set(0, root); + heapify(0, data.size()); + } + + public List getData() { + return data; + } + + public void add(RowDataPacket row) { + data.add(row); + } + + @Override + public boolean addIfRequired(RowDataPacket row) { + // 淘汰堆里最小的数据 + RowDataPacket root = getRoot(); + if (cmp.compare(row, root) > 0) { + setRoot(row); + return true; + } + return false; + } + + @Override + public void heapSort(int size) { + final int total = data.size(); + //容错处理 + if (size <= 0 || size > total) { + size = total; + } + final int min = size == total ? 0 : (total - size - 1); + + //末尾与头交换,交换后调整最大堆 + for (int i = total - 1; i > min; i--) { + swap(0, i); + heapify(0, i); + } + } + +} diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java new file mode 100644 index 000000000..a22066cdc --- /dev/null +++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataCmp.java @@ -0,0 +1,44 @@ +package io.mycat.sqlengine.mpp.tmp; + +import java.util.Comparator; + +import io.mycat.net.mysql.RowDataPacket; +import io.mycat.sqlengine.mpp.OrderCol; +import io.mycat.sqlengine.mpp.RowDataPacketSorter; + +/** + * + * @author coderczp-2014-12-8 + */ +public class RowDataCmp implements Comparator { + + private OrderCol[] orderCols; + + public RowDataCmp(OrderCol[] orderCols) { + this.orderCols = orderCols; + } + + @Override + public int compare(RowDataPacket o1, RowDataPacket o2) { + OrderCol[] tmp = this.orderCols; + int cmp = 0; + int len = tmp.length; + //依次比较order by语句上的多个排序字段的值 + int type = OrderCol.COL_ORDER_TYPE_ASC; + for (int i = 0; i < len; i++) { + int colIndex = tmp[i].colMeta.colIndex; + byte[] left = o1.fieldValues.get(colIndex); + byte[] right = o2.fieldValues.get(colIndex); + if (tmp[i].orderType == type) { + cmp = RowDataPacketSorter.compareObject(left, right, tmp[i]); + } else { + cmp = RowDataPacketSorter.compareObject(right, left, tmp[i]); + } + if (cmp != 0) { + return cmp; + } + } + return cmp; + } + +} diff --git a/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java new file mode 100644 index 000000000..8a3869f01 --- /dev/null +++ b/src/main/java/io/mycat/sqlengine/mpp/tmp/RowDataSorter.java @@ -0,0 +1,83 @@ +package io.mycat.sqlengine.mpp.tmp; + +import java.util.*; + +import io.mycat.net.mysql.RowDataPacket; +import io.mycat.sqlengine.mpp.OrderCol; +import io.mycat.sqlengine.mpp.RowDataPacketSorter; + +/** + * + * @author coderczp-2014-12-8 + */ +public class RowDataSorter extends RowDataPacketSorter { + + // 记录总数(=offset+limit) + private volatile int total; + // 查询的记录数(=limit) + private volatile int size; + // 堆 + private volatile HeapItf heap; + // 多列比较器 + private volatile RowDataCmp cmp; + // 是否执行过buildHeap + private volatile boolean hasBuild; + + public RowDataSorter(OrderCol[] orderCols) { + super(orderCols); + this.cmp = new RowDataCmp(orderCols); + } + + public synchronized void setLimit(int start, int size) { + // 容错处理 + if (start < 0) { + start = 0; + } + if (size <= 0) { + this.total = this.size = Integer.MAX_VALUE; + } else { + this.total = start + size; + this.size = size; + } + // 统一采用顺序,order by 条件交给比较器去处理 + this.heap = new MaxHeap(cmp, total); + } + + @Override + public synchronized boolean addRow(RowDataPacket row) { + if (heap.getData().size() < total) { + heap.add(row); + return true; + } + // 堆已满,构建最大堆,并执行淘汰元素逻辑 + if (heap.getData().size() == total && hasBuild == false) { + heap.buildHeap(); + hasBuild = true; + } + return heap.addIfRequired(row); + } + + @Override + public List getSortedResult() { + final List data = heap.getData(); + if (data.size() < 2) { + return data; + } + + if (total - size > data.size()) { + return Collections.emptyList(); + } + + // 构建最大堆并排序 + if (!hasBuild) { + heap.buildHeap(); + } + heap.heapSort(this.size); + return heap.getData(); + } + + public RowDataCmp getCmp() { + return cmp; + } + +}