add missing file

This commit is contained in:
yanhuqing666
2016-12-15 10:03:27 +08:00
parent 255079dbc7
commit 7e341be5e2
7 changed files with 540 additions and 1 deletions

2
.gitignore vendored
View File

@@ -3,7 +3,7 @@
.metadata
.gradle
bin/
tmp/
/tmp/
*.tmp
*.bak
*.swp

View File

@@ -0,0 +1,57 @@
package io.mycat.sqlengine.mpp.tmp;
import java.util.List;
import io.mycat.net.mysql.RowDataPacket;
/**
* @author coderczp-2014-12-17
*/
public interface HeapItf {
/**
* 构建堆
*/
void buildHeap();
/**
* 获取堆根节点
*
* @return
*/
RowDataPacket getRoot();
/**
* 向堆添加元素
*
* @param row
*/
void add(RowDataPacket row);
/**
* 获取堆数据
*
* @return
*/
List<RowDataPacket> getData();
/**
* 设置根节点元素
*
* @param root
*/
void setRoot(RowDataPacket root);
/**
* 向已满的堆添加元素
*
* @param row
*/
boolean addIfRequired(RowDataPacket row);
/**
* 堆排序
*/
void heapSort(int size);
}

View File

@@ -0,0 +1,109 @@
package io.mycat.sqlengine.mpp.tmp;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
/**
*
* @author coderczp-2014-12-8
*/
public class IntMinHeap {
private int i;
private int[] data;
public IntMinHeap(int[] data) {
this.data = data;
}
public void buildMinHeap() {
int len = data.length;
for (int i = len / 2 - 1; i >= 0; i--) {
heapify(i);
}
}
private void heapify(int i) {
int l = left(i);
int r = right(i);
int max = i;
int len = data.length;
if (l < len && data[l] < data[i]) {
max = l;
}
if (r < len && data[r] < data[max]) {
max = r;
}
if (i == max) {
return;
}
swap(i, max);
heapify(max);
}
private int right(int i) {
return (i + 1) << 1;
}
private int left(int i) {
return ((i + 1) << 1) - 1;
}
private void swap(int i, int j) {
int tmp = data[i];
data[i] = data[j];
data[j] = tmp;
}
public int getRoot() {
return data[0];
}
public void setRoot(int root) {
data[0] = root;
heapify(0);
}
public int[] getData() {
return data;
}
public synchronized void add(int row) {
data[i++] = row;
}
// 淘汰堆里最大的数据
public void addIfRequired(int row) {
int root = getRoot();
if (row > root) {
setRoot(row);
}
}
public static void main(String[] args) {
Set<Integer> set = new HashSet<Integer>();
int dataCount = 30;
Random rd = new Random();
int bound = dataCount * 3;
while (set.size() < dataCount) {
set.add(rd.nextInt(bound));
}
int i = 0;
int topN = 5;
int[] data = new int[topN];
Iterator<Integer> it = set.iterator();
while (i < topN) {
data[i++] = it.next();
}
System.out.println(set);
IntMinHeap heap = new IntMinHeap(data);
heap.buildMinHeap();
while (it.hasNext()) {
heap.addIfRequired(it.next());
}
System.out.println(Arrays.toString(data));
}
}

View File

@@ -0,0 +1,139 @@
package io.mycat.sqlengine.mpp.tmp;
import java.util.ArrayList;
import java.util.List;
import io.mycat.net.mysql.RowDataPacket;
/**
* 最大堆排序,适用于顺序排序
*
* @author coderczp-2014-12-8
*/
public class MaxHeap implements HeapItf {
private RowDataCmp cmp;
private List<RowDataPacket> data;
public MaxHeap(RowDataCmp cmp, int size) {
this.cmp = cmp;
this.data = new ArrayList<>();
}
@Override
public void buildHeap() {
int len = data.size();
for (int i = len / 2 - 1; i >= 0; i--) {
heapifyRecursive(i, len);
}
}
private void heapify(int i, int size) {
int max = 0;
int mid = size >> 1;// ==size/2
while (i <= mid) {
max = i;
int left = i << 1;
int right = left + 1;
if (left < size && cmp.compare(data.get(left), data.get(i)) > 0) {
max = left;
}
if (right < size && cmp.compare(data.get(right), data.get(max)) > 0) {
max = right;
}
if (i == max) {
break;
}
if (i != max) {
RowDataPacket tmp = data.get(i);
data.set(i, data.get(max));
data.set(max, tmp);
i = max;
}
}
}
// 递归版本
protected void heapifyRecursive(int i, int size) {
int l = left(i);
int r = right(i);
int max = i;
if (l < size && cmp.compare(data.get(l), data.get(i)) > 0) {
max = l;
}
if (r < size && cmp.compare(data.get(r), data.get(max)) > 0) {
max = r;
}
if (i == max) {
return;
}
swap(i, max);
heapifyRecursive(max, size);
}
private int right(int i) {
return (i + 1) << 1;
}
private int left(int i) {
return ((i + 1) << 1) - 1;
}
private void swap(int i, int j) {
RowDataPacket tmp = data.get(i);
RowDataPacket elementAt = data.get(j);
data.set(i, elementAt);
data.set(j, tmp);
}
@Override
public RowDataPacket getRoot() {
return data.get(0);
}
@Override
public void setRoot(RowDataPacket root) {
data.set(0, root);
heapifyRecursive(0, data.size());
}
@Override
public List<RowDataPacket> getData() {
return data;
}
@Override
public void add(RowDataPacket row) {
data.add(row);
}
@Override
public boolean addIfRequired(RowDataPacket row) {
// 淘汰堆里最小的数据
RowDataPacket root = getRoot();
if (cmp.compare(row, root) < 0) {
setRoot(row);
return true;
}
return false;
}
@Override
public void heapSort(int size) {
final int total = data.size();
// 容错处理
if (size <= 0 || size > total) {
size = total;
}
final int min = size == total ? 0 : (total - size - 1);
// 末尾与头交换,交换后调整最大堆
for (int i = total - 1; i > min; i--) {
swap(0, i);
heapifyRecursive(0, i);
}
}
}

View File

@@ -0,0 +1,107 @@
package io.mycat.sqlengine.mpp.tmp;
import java.util.ArrayList;
import java.util.List;
import io.mycat.net.mysql.RowDataPacket;
/**
* 最小堆排序,适用于倒序排序
*
* @author coderczp-2014-12-8
*/
public class MinHeap implements HeapItf {
private RowDataCmp cmp;
private List<RowDataPacket> data;
public MinHeap(RowDataCmp cmp, int size) {
this.cmp = cmp;
this.data = new ArrayList<>();
}
@Override
public void buildHeap() {
int len = data.size();
for (int i = len / 2 - 1; i >= 0; i--) {
heapify(i, len);
}
}
private void heapify(int i, int size) {
int l = left(i);
int r = right(i);
int smallest = i;
if (l < size && cmp.compare(data.get(l), data.get(i)) < 0) {
smallest = l;
}
if (r < size && cmp.compare(data.get(r), data.get(smallest)) < 0) {
smallest = r;
}
if (i == smallest) {
return;
}
swap(i, smallest);
heapify(smallest, size);
}
private int right(int i) {
return (i + 1) << 1;
}
private int left(int i) {
return ((i + 1) << 1) - 1;
}
private void swap(int i, int j) {
RowDataPacket tmp = data.get(i);
RowDataPacket elementAt = data.get(j);
data.set(i, elementAt);
data.set(j, tmp);
}
public RowDataPacket getRoot() {
return data.get(0);
}
public void setRoot(RowDataPacket root) {
data.set(0, root);
heapify(0, data.size());
}
public List<RowDataPacket> getData() {
return data;
}
public void add(RowDataPacket row) {
data.add(row);
}
@Override
public boolean addIfRequired(RowDataPacket row) {
// 淘汰堆里最小的数据
RowDataPacket root = getRoot();
if (cmp.compare(row, root) > 0) {
setRoot(row);
return true;
}
return false;
}
@Override
public void heapSort(int size) {
final int total = data.size();
//容错处理
if (size <= 0 || size > total) {
size = total;
}
final int min = size == total ? 0 : (total - size - 1);
//末尾与头交换,交换后调整最大堆
for (int i = total - 1; i > min; i--) {
swap(0, i);
heapify(0, i);
}
}
}

View File

@@ -0,0 +1,44 @@
package io.mycat.sqlengine.mpp.tmp;
import java.util.Comparator;
import io.mycat.net.mysql.RowDataPacket;
import io.mycat.sqlengine.mpp.OrderCol;
import io.mycat.sqlengine.mpp.RowDataPacketSorter;
/**
*
* @author coderczp-2014-12-8
*/
public class RowDataCmp implements Comparator<RowDataPacket> {
private OrderCol[] orderCols;
public RowDataCmp(OrderCol[] orderCols) {
this.orderCols = orderCols;
}
@Override
public int compare(RowDataPacket o1, RowDataPacket o2) {
OrderCol[] tmp = this.orderCols;
int cmp = 0;
int len = tmp.length;
//依次比较order by语句上的多个排序字段的值
int type = OrderCol.COL_ORDER_TYPE_ASC;
for (int i = 0; i < len; i++) {
int colIndex = tmp[i].colMeta.colIndex;
byte[] left = o1.fieldValues.get(colIndex);
byte[] right = o2.fieldValues.get(colIndex);
if (tmp[i].orderType == type) {
cmp = RowDataPacketSorter.compareObject(left, right, tmp[i]);
} else {
cmp = RowDataPacketSorter.compareObject(right, left, tmp[i]);
}
if (cmp != 0) {
return cmp;
}
}
return cmp;
}
}

View File

@@ -0,0 +1,83 @@
package io.mycat.sqlengine.mpp.tmp;
import java.util.*;
import io.mycat.net.mysql.RowDataPacket;
import io.mycat.sqlengine.mpp.OrderCol;
import io.mycat.sqlengine.mpp.RowDataPacketSorter;
/**
*
* @author coderczp-2014-12-8
*/
public class RowDataSorter extends RowDataPacketSorter {
// 记录总数(=offset+limit)
private volatile int total;
// 查询的记录数(=limit)
private volatile int size;
// 堆
private volatile HeapItf heap;
// 多列比较器
private volatile RowDataCmp cmp;
// 是否执行过buildHeap
private volatile boolean hasBuild;
public RowDataSorter(OrderCol[] orderCols) {
super(orderCols);
this.cmp = new RowDataCmp(orderCols);
}
public synchronized void setLimit(int start, int size) {
// 容错处理
if (start < 0) {
start = 0;
}
if (size <= 0) {
this.total = this.size = Integer.MAX_VALUE;
} else {
this.total = start + size;
this.size = size;
}
// 统一采用顺序order by 条件交给比较器去处理
this.heap = new MaxHeap(cmp, total);
}
@Override
public synchronized boolean addRow(RowDataPacket row) {
if (heap.getData().size() < total) {
heap.add(row);
return true;
}
// 堆已满,构建最大堆,并执行淘汰元素逻辑
if (heap.getData().size() == total && hasBuild == false) {
heap.buildHeap();
hasBuild = true;
}
return heap.addIfRequired(row);
}
@Override
public List<RowDataPacket> getSortedResult() {
final List<RowDataPacket> data = heap.getData();
if (data.size() < 2) {
return data;
}
if (total - size > data.size()) {
return Collections.emptyList();
}
// 构建最大堆并排序
if (!hasBuild) {
heap.buildHeap();
}
heap.heapSort(this.size);
return heap.getData();
}
public RowDataCmp getCmp() {
return cmp;
}
}