memory improvements

This commit is contained in:
Brian Hendriks
2019-04-13 14:33:53 -07:00
parent 368c927e62
commit 074bf08fad
9 changed files with 423 additions and 144 deletions

View File

@@ -12,15 +12,21 @@ func sorter(in, out chan KVPSlice) {
}
}
func merger(in chan [2]KVPSlice, out chan KVPSlice) {
func merger(in chan [2]*KVPCollection, out chan *KVPCollection) {
for {
slices, ok := <-in
colls, ok := <-in
if !ok {
return
}
res := slices[0].Merge(slices[1])
var res *KVPCollection
if colls[1] == nil {
res = colls[0]
} else {
res = colls[0].DestructiveMerge(colls[1])
}
out <- res
}
}
@@ -35,7 +41,7 @@ type AsyncSortedEdits struct {
doneChan chan bool
accumulating []KVP
sortedSlices [][]KVP
sortedColls []*KVPCollection
}
func NewAsyncSortedEdits(sliceSize, asyncConCurrency, sortConcurrency int) *AsyncSortedEdits {
@@ -61,7 +67,7 @@ func NewAsyncSortedEdits(sliceSize, asyncConCurrency, sortConcurrency int) *Asyn
resultChan: resChan,
doneChan: doneChan,
accumulating: make([]KVP, 0, sliceSize),
sortedSlices: nil}
sortedColls: nil}
}
func (ase *AsyncSortedEdits) Set(k, v types.Value) {
@@ -82,7 +88,9 @@ func (ase *AsyncSortedEdits) pollSortedSlices() {
for {
select {
case val := <-ase.resultChan:
ase.sortedSlices = append(ase.sortedSlices, val)
coll := NewKVPCollection(val)
ase.sortedColls = append(ase.sortedColls, coll)
default:
return
}
@@ -108,7 +116,8 @@ func (ase *AsyncSortedEdits) wait() {
for {
select {
case val := <-ase.resultChan:
ase.sortedSlices = append(ase.sortedSlices, val)
coll := NewKVPCollection(val)
ase.sortedColls = append(ase.sortedColls, coll)
case <-ase.doneChan:
running--
@@ -122,9 +131,10 @@ func (ase *AsyncSortedEdits) wait() {
}
func (ase *AsyncSortedEdits) Sort() {
sortDir := true
for len(ase.sortedSlices) > 2 {
pairs := pairSlices(ase.sortedSlices)
for len(ase.sortedColls) > 2 {
pairs := pairCollections(ase.sortedColls)
ase.sortedColls = nil
numPairs := len(pairs)
numGoRs := ase.sortConcurrency
@@ -132,8 +142,8 @@ func (ase *AsyncSortedEdits) Sort() {
numGoRs = numPairs
}
sortChan := make(chan [2]KVPSlice, numPairs)
resChan := make(chan KVPSlice, numPairs)
sortChan := make(chan [2]*KVPCollection, numPairs)
resChan := make(chan *KVPCollection, numPairs)
for i := 0; i < numGoRs; i++ {
go func() {
defer func() {
@@ -148,14 +158,13 @@ func (ase *AsyncSortedEdits) Sort() {
sortChan <- pair
}
ase.sortedSlices = nil
close(sortChan)
done := false
for !done {
select {
case val := <-resChan:
ase.sortedSlices = append(ase.sortedSlices, val)
ase.sortedColls = append(ase.sortedColls, val)
case <-ase.doneChan:
numGoRs--
@@ -166,69 +175,37 @@ func (ase *AsyncSortedEdits) Sort() {
}
}
}
sortDir = !sortDir
}
}
func pairSlices(slices [][]KVP) [][2]KVPSlice {
numSlices := len(slices)
pairs := make([][2]KVPSlice, 0, numSlices/2+1)
sort.Slice(slices, func(i, j int) bool {
return len(slices[i]) < len(slices[j])
func pairCollections(colls []*KVPCollection) [][2]*KVPCollection {
numColls := len(colls)
pairs := make([][2]*KVPCollection, 0, numColls/2+1)
sort.Slice(colls, func(i, j int) bool {
return colls[i].Size() < colls[j].Size()
})
if numSlices%2 == 1 {
x := slices[numSlices-1]
y := slices[numSlices-2]
z := slices[numSlices-3]
if numColls%2 == 1 {
pairs = append(pairs, [2]*KVPCollection{colls[numColls-1], nil})
middle := len(y) / 2
pairs = append(pairs, [2]KVPSlice{x, y[:middle]})
pairs = append(pairs, [2]KVPSlice{y[middle:], z})
slices = slices[:numSlices-3]
numSlices -= 3
colls = colls[:numColls-1]
numColls -= 1
}
for i, j := 0, numSlices-1; i < numSlices/2; i, j = i+1, j-1 {
pairs = append(pairs, [2]KVPSlice{slices[i], slices[j]})
for i, j := 0, numColls-1; i < numColls/2; i, j = i+1, j-1 {
pairs = append(pairs, [2]*KVPCollection{colls[i], colls[j]})
}
return pairs
}
func (ase *AsyncSortedEdits) Iterator() *SortedEditItr {
var left KVPSlice
var right KVPSlice
if len(ase.sortedSlices) > 0 {
left = ase.sortedSlices[0]
func (ase *AsyncSortedEdits) Iterator() KVPIterator {
switch len(ase.sortedColls) {
case 1:
return NewItr(ase.sortedColls[0])
case 2:
return NewSortedEditItr(ase.sortedColls[0], ase.sortedColls[1])
}
if len(ase.sortedSlices) > 1 {
right = ase.sortedSlices[1]
}
if len(ase.sortedSlices) > 2 {
panic("wtf")
}
return NewSortedEditItr(left, right)
}
func (ase *AsyncSortedEdits) PanicIfNotInOrder() {
itr := ase.Iterator()
prev := itr.Next()
for {
curr := itr.Next()
if !prev.Key.Less(curr.Key) {
panic("Not in order")
}
prev = curr
}
panic("Sort needs to be called prior to getting an Iterator.")
}

View File

@@ -1,59 +1,34 @@
package ase
import "github.com/attic-labs/noms/go/types"
type SortedEditItr struct {
left KVPSlice
right KVPSlice
lIdx int
rIdx int
lKey types.Value
rKey types.Value
leftItr *KVPCollItr
rightItr *KVPCollItr
done bool
}
func NewSortedEditItr(left, right KVPSlice) *SortedEditItr {
var lKey types.Value
var rKey types.Value
func NewSortedEditItr(left, right *KVPCollection) *SortedEditItr {
leftItr := NewItr(left)
rightItr := NewItr(right)
if left != nil {
lKey = left[0].Key
}
if right != nil {
rKey = right[0].Key
}
return &SortedEditItr{left, right, 0, 0, lKey, rKey}
return &SortedEditItr{leftItr, rightItr, false}
}
func (itr *SortedEditItr) Next() *KVP {
if itr.lKey == nil && itr.rKey == nil {
if itr.done {
return nil
}
if itr.rKey == nil || itr.lKey != nil && itr.lKey.Less(itr.rKey) {
idx := itr.lIdx
itr.lIdx++
if itr.lIdx < len(itr.left) {
itr.lKey = itr.left[itr.lIdx].Key
} else {
itr.lKey = nil
}
return &itr.left[idx]
} else {
idx := itr.rIdx
itr.rIdx++
if itr.rIdx < len(itr.right) {
itr.rKey = itr.right[itr.rIdx].Key
} else {
itr.rKey = nil
}
return &itr.right[idx]
lesser := itr.rightItr
if itr.leftItr.Less(itr.rightItr) {
lesser = itr.leftItr
}
kvp := lesser.Next()
itr.done = kvp == nil
return kvp
}
// Size returns the combined number of KVPs across both underlying collections.
func (itr *SortedEditItr) Size() int {
	return itr.leftItr.coll.totalSize + itr.rightItr.coll.totalSize
}

View File

@@ -1,6 +1,13 @@
package ase
import "github.com/attic-labs/noms/go/types"
import (
"fmt"
"github.com/attic-labs/noms/go/types"
)
type KVPIterator interface {
Next() *KVP
}
type KVP struct {
Key types.Value
@@ -21,40 +28,24 @@ func (kvps KVPSlice) Swap(i, j int) {
kvps[i], kvps[j] = kvps[j], kvps[i]
}
func (kvps KVPSlice) Merge(other KVPSlice) KVPSlice {
i := 0
j := 0
k := 0
x := kvps[i]
y := other[j]
dest := make(KVPSlice, len(kvps)+len(other))
func IsInOrder(itr KVPIterator) (bool, int) {
count := 1
prev := itr.Next()
for {
if x.Key.Less(y.Key) {
dest[k] = x
k++
i++
var curr *KVP
curr = itr.Next()
if i < len(kvps) {
x = kvps[i]
} else {
copy(dest[k:], other[j:])
break
}
} else {
dest[k] = y
k++
j++
if j < len(other) {
y = other[j]
} else {
copy(dest[k:], kvps[i:])
break
}
if curr == nil {
break
} else if !prev.Key.Less(curr.Key) && !prev.Key.Equals(curr.Key) {
fmt.Println(types.EncodedValue(prev.Key), ">=", types.EncodedValue(curr.Key))
return false, count
}
count++
prev = curr
}
return dest
return true, count
}

View File

@@ -0,0 +1,87 @@
package ase
import "sort"
// KVPCollBuilder incrementally assembles a KVPCollection from fixed-capacity
// KVP buffers, reusing exhausted buffers handed back via AddBuffer.
type KVPCollBuilder struct {
	filled     []KVPSlice // completed slices, in the order they were filled
	toFill     []KVPSlice // spare buffers available to be written next
	currSl     KVPSlice   // buffer currently being written
	currSlSize int        // usable length of currSl
	currIdx    int        // next write position within currSl
	numItems   int        // total KVPs recorded in filled so far
	buffSize   int        // capacity every buffer must share
}
// NewKVPCollBuilder creates a builder whose buffers all have capacity
// buffSize: one buffer is ready to be written immediately and a spare is
// queued in toFill.
func NewKVPCollBuilder(buffSize int) *KVPCollBuilder {
	spare := make(KVPSlice, buffSize)
	active := make(KVPSlice, buffSize)

	return &KVPCollBuilder{
		filled:     nil,
		toFill:     []KVPSlice{spare},
		currSl:     active,
		currSlSize: buffSize,
		currIdx:    0,
		numItems:   0,
		buffSize:   buffSize,
	}
}
// AddBuffer returns an exhausted buffer to the builder so its memory can be
// reused.  The waiting buffers are kept sorted by length, smallest first.
func (cb *KVPCollBuilder) AddBuffer(buff KVPSlice) {
	if cap(buff) != cb.buffSize {
		panic("All buffers should be created with the same capacity.")
	}

	// re-extend the buffer to its full capacity before queueing it.
	cb.toFill = append(cb.toFill, buff[:cap(buff)])

	byLen := func(i, j int) bool {
		return len(cb.toFill[i]) < len(cb.toFill[j])
	}
	sort.Slice(cb.toFill, byLen)
}
// AddKVP appends a single key-value pair to the collection being built,
// rolling over to the next buffer when the current one fills up.
func (cb *KVPCollBuilder) AddKVP(kvp KVP) {
	cb.currSl[cb.currIdx] = kvp
	cb.currIdx++

	if cb.currIdx != cb.currSlSize {
		return
	}

	cb.doneWithCurrBuff()
}
// doneWithCurrBuff retires the current buffer: the filled portion is recorded
// in filled, and the smallest waiting buffer from toFill becomes current.
// numItems must be accumulated before currIdx is reset.
func (cb *KVPCollBuilder) doneWithCurrBuff() {
	cb.numItems += cb.currIdx
	cb.filled = append(cb.filled, cb.currSl[:cb.currIdx])
	cb.currIdx = 0
	// NOTE(review): assumes toFill is never empty here — callers appear to
	// return exhausted buffers via AddBuffer before this can run dry; confirm.
	cb.currSl = cb.toFill[0]
	cb.currSlSize = len(cb.currSl)
	cb.toFill = cb.toFill[1:]
}
// MoveRemaining copies whatever is left in the iterator's current slice into
// the builder, then adopts the iterator's untouched remaining slices wholesale
// (without copying) as filled buffers.
func (cb *KVPCollBuilder) MoveRemaining(itr *KVPCollItr) {
	remInCurr := itr.currSlSize - itr.idx
	remInDest := cb.currSlSize - cb.currIdx
	// if the leftovers won't fit in the current destination buffer, retire it
	// and start on a fresh one.
	if remInDest < remInCurr {
		cb.doneWithCurrBuff()
	}
	copy(cb.currSl[cb.currIdx:], itr.currSl[itr.idx:])
	cb.currIdx += remInCurr
	cb.doneWithCurrBuff()
	// take ownership of the iterator's not-yet-visited slices directly; the
	// source collection is being consumed destructively, so no copy is needed.
	for itr.slIdx++; itr.slIdx < itr.coll.numSlices; itr.slIdx++ {
		currSl := itr.coll.slices[itr.slIdx]
		cb.filled = append(cb.filled, currSl)
		cb.numItems += len(currSl)
	}
}
func (cb *KVPCollBuilder) Build() *KVPCollection {
if cb.currIdx != 0 {
cb.doneWithCurrBuff()
}
maxSize := len(cb.filled[0])
for i := 1; i < len(cb.filled); i++ {
currSize := len(cb.filled[i])
if currSize > maxSize {
maxSize = currSize
}
}
return &KVPCollection{maxSize, len(cb.filled), cb.numItems, cb.filled}
}

View File

@@ -0,0 +1,69 @@
package ase
import "github.com/attic-labs/noms/go/types"
// KVPCollItr iterates a KVPCollection's slices in order, caching the key at
// the current position so two iterators can be compared cheaply via Less.
type KVPCollItr struct {
	coll       *KVPCollection
	done       bool        // true once every slice has been consumed
	slIdx      int         // index of the slice currently being read
	idx        int         // position within the current slice
	currSl     KVPSlice    // coll.slices[slIdx]
	currSlSize int         // len(currSl)
	currKey    types.Value // key at the current position; nil when done
}
// NewItr returns an iterator positioned at the first KVP of coll.
func NewItr(coll *KVPCollection) *KVPCollItr {
	sl := coll.slices[0]

	return &KVPCollItr{
		coll:       coll,
		done:       false,
		slIdx:      0,
		idx:        0,
		currSl:     sl,
		currSlSize: len(sl),
		currKey:    sl[0].Key,
	}
}
// Less reports whether itr's current key sorts before other's.  An exhausted
// iterator (nil currKey) never sorts first; anything sorts before an
// exhausted iterator.
func (itr *KVPCollItr) Less(other *KVPCollItr) bool {
	if other.currKey == nil {
		return true
	}

	if itr.currKey == nil {
		return false
	}

	return itr.currKey.Less(other.currKey)
}
// nextForDestructiveMerge advances the iterator one KVP and returns:
//   - a pointer to the KVP just consumed (nil if the iterator was already done),
//   - the slice that was fully exhausted by this step, if any, so its buffer
//     can be recycled by a KVPCollBuilder, and
//   - whether the iterator is now exhausted.
func (itr *KVPCollItr) nextForDestructiveMerge() (*KVP, KVPSlice, bool) {
	if itr.done {
		return nil, nil, true
	}
	kvp := &itr.currSl[itr.idx]
	itr.idx++
	if itr.idx == itr.currSlSize {
		// current slice used up: hand it back and step to the next slice,
		// or mark the iterator done if there is none.
		exhausted := itr.currSl
		itr.idx = 0
		itr.slIdx++
		if itr.slIdx < itr.coll.numSlices {
			itr.currSl = itr.coll.slices[itr.slIdx]
			itr.currSlSize = len(itr.currSl)
			itr.currKey = itr.currSl[itr.idx].Key
		} else {
			itr.done = true
			itr.currKey = nil
		}
		return kvp, exhausted, itr.done
	}
	itr.currKey = itr.currSl[itr.idx].Key
	return kvp, nil, false
}
// Next returns the next KVP in the collection, or nil once exhausted.
func (itr *KVPCollItr) Next() *KVP {
	kvp, _, _ := itr.nextForDestructiveMerge()
	return kvp
}
// Reset rewinds the iterator to the first KVP of its collection.
func (itr *KVPCollItr) Reset() {
	first := itr.coll.slices[0]

	itr.done = false
	itr.slIdx = 0
	itr.idx = 0
	itr.currSl = first
	itr.currSlSize = len(first)
	itr.currKey = first[0].Key
}

View File

@@ -0,0 +1,79 @@
package ase
import (
"github.com/attic-labs/noms/go/types"
)
// KVPCollection is a sorted set of key-value pairs stored as a sequence of
// slices, letting merges reuse existing buffers instead of reallocating.
type KVPCollection struct {
	buffSize  int // capacity shared by the backing buffers
	numSlices int // len(slices)
	totalSize int // total number of KVPs across all slices
	slices    []KVPSlice
}
// NewKVPCollection wraps a single sorted KVPSlice in a collection.  The
// slice's capacity becomes the collection's buffer size.
func NewKVPCollection(sl KVPSlice) *KVPCollection {
	slices := []KVPSlice{sl}
	return newKVPColl(cap(sl), 1, len(sl), slices)
}
// newKVPColl constructs a KVPCollection directly from its parts.  slices must
// be non-nil; maxSize is the shared buffer capacity, numSlices should equal
// len(slices), and totalSize the number of KVPs across all slices.
func newKVPColl(maxSize, numSlices, totalSize int, slices []KVPSlice) *KVPCollection {
	if slices == nil {
		// name the actual problem instead of the original opaque "invalid params".
		panic("newKVPColl called with nil slices")
	}

	return &KVPCollection{maxSize, numSlices, totalSize, slices}
}
// String renders every key in iteration order as an encoded noms tuple.
// Intended for debugging.
func (coll *KVPCollection) String() string {
	keys := make([]types.Value, coll.totalSize)

	itr := coll.Iterator()
	i := 0
	for kvp := itr.Next(); kvp != nil; kvp = itr.Next() {
		keys[i] = kvp.Key
		i++
	}

	return types.EncodedValue(types.NewTuple(keys...))
}
// Size returns the total number of key-value pairs in the collection.
func (coll *KVPCollection) Size() int {
	return coll.totalSize
}
// Iterator returns a new iterator positioned at the collection's first KVP.
func (coll *KVPCollection) Iterator() *KVPCollItr {
	return NewItr(coll)
}
// DestructiveMerge merges two sorted collections into one, recycling the
// operands' buffers as they are exhausted.  Both operands are consumed and
// must not be used after the call.  Panics if their buffer sizes differ.
func (left *KVPCollection) DestructiveMerge(right *KVPCollection) *KVPCollection {
	if left.buffSize != right.buffSize {
		panic("Cannot merge collections with varying buffer sizes.")
	}
	lItr := left.Iterator()
	rItr := right.Iterator()
	resBuilder := NewKVPCollBuilder(left.buffSize)
	var done bool
	var kvp *KVP
	var exhaustedBuff KVPSlice
	var currItr *KVPCollItr
	var otherItr *KVPCollItr
	for !done {
		// pick whichever iterator currently points at the smaller key.
		currItr, otherItr = rItr, lItr
		if lItr.Less(rItr) {
			currItr, otherItr = lItr, rItr
		}
		kvp, exhaustedBuff, done = currItr.nextForDestructiveMerge()
		// the KVP is copied by value before its source buffer can be recycled.
		resBuilder.AddKVP(*kvp)
		if exhaustedBuff != nil {
			resBuilder.AddBuffer(exhaustedBuff)
		}
	}
	// the loop ends when one side is exhausted; bulk-move the survivor.
	resBuilder.MoveRemaining(otherItr)
	return resBuilder.Build()
}

View File

@@ -0,0 +1,92 @@
package ase
import (
"github.com/attic-labs/noms/go/types"
"math/rand"
"sort"
"testing"
"time"
)
// TestKVPCollection exercises collection merging with a fixed seed plus 64
// time-derived seeds, logging each seed so failures can be reproduced.
func TestKVPCollection(t *testing.T) {
	testKVPCollection(t, rand.New(rand.NewSource(0)))

	for i := 0; i < 64; i++ {
		seed := time.Now().UnixNano()
		t.Log(seed)
		testKVPCollection(t, rand.New(rand.NewSource(seed)))
	}
}
// testKVPCollection builds a random number of single-slice collections, then
// repeatedly pair-merges them down to one, verifying sort order and sizes at
// every round.
func testKVPCollection(t *testing.T, rng *rand.Rand) {
	const (
		maxSize  = 1024
		minSize  = 4
		maxColls = 128
		minColls = 3
	)
	numColls := int(minColls + rng.Int31n(maxColls-minColls))
	colls := make([]*KVPCollection, numColls)
	size := int(minSize + rng.Int31n(maxSize-minSize))
	t.Log("num collections:", numColls, "- buffer size", size)
	for i := 0; i < numColls; i++ {
		colls[i] = createKVPColl(rng, size)
	}
	for len(colls) > 1 {
		// every surviving collection must still be in sorted order.
		for i, coll := range colls {
			inOrder, _ := IsInOrder(NewItr(coll))
			if !inOrder {
				t.Fatal(i, "not in order")
			}
		}
		// pair collections from opposite ends of the list; an odd count
		// leaves the middle one unmerged for the next round.
		var newColls []*KVPCollection
		for i, j := 0, len(colls)-1; i <= j; i, j = i+1, j-1 {
			if i == j {
				newColls = append(newColls, colls[i])
			} else {
				s1 := colls[i].Size()
				s2 := colls[j].Size()
				//fmt.Print(colls[i].String(), "+", colls[j].String())
				// DestructiveMerge consumes both operands.
				mergedColl := colls[i].DestructiveMerge(colls[j])
				ms := mergedColl.Size()
				if s1+s2 != ms {
					t.Fatal("wrong size")
				}
				//fmt.Println("=", mergedColl.String())
				newColls = append(newColls, mergedColl)
			}
		}
		colls = newColls
	}
	// the final collection must hold every item exactly once, in order.
	inOrder, numItems := IsInOrder(NewItr(colls[0]))
	if !inOrder {
		t.Fatal("collection not in order")
	} else if numItems != numColls*size {
		t.Fatal("Unexpected size")
	}
}
// createKVPColl builds a sorted single-slice collection of `size` random keys
// (duplicates possible; every value is NullValue).
func createKVPColl(rng *rand.Rand, size int) *KVPCollection {
	kvps := make(KVPSlice, size)
	for i := range kvps {
		kvps[i] = KVP{types.Uint(rng.Uint64() % 10000), types.NullValue}
	}

	// stable sort preserves the relative order of equal keys.
	sort.Stable(kvps)

	return NewKVPCollection(kvps)
}

View File

@@ -36,6 +36,10 @@ func (msb *ASEBench) AddEdits(nextEdit NextEdit) {
// SortEdits sorts the accumulated edits and sanity-checks the result.
func (msb *ASEBench) SortEdits() {
	msb.ase.Sort()
	itr := msb.ase.Iterator()

	// ase.IsInOrder returns (inOrder bool, count int); the original
	// assignment had the two results swapped, so the log printed the item
	// count under "in order:" and the bool as the item count.
	inOrder, numItems := ase.IsInOrder(itr)
	log.Println("in order:", inOrder, "- num items:", numItems)
}
func (msb *ASEBench) Map() {

View File

@@ -22,6 +22,8 @@ func main() {
profPath := flag.String("profpath", "./", "")
cpuProf := flag.Bool("cpuprof", false, "")
memProf := flag.Bool("memprof", false, "")
meBench := flag.Bool("me-bench", false, "")
aseBench := flag.Bool("ase-bench", false, "")
count := flag.Int("n", 1000000, "")
flag.Parse()
@@ -37,10 +39,13 @@ func main() {
defer profile.Start(profile.MemProfile).Stop()
}
toBench := []MEBenchmark{
NewNomsMEBench(),
NewASEBench(50000, 2, 8),
//NewASEBench2(50000, 2, 8),
var toBench []MEBenchmark
if *meBench {
toBench = append(toBench, NewNomsMEBench())
}
if *aseBench {
toBench = append(toBench, NewASEBench(10000, 2, 8))
}
log.Printf("Running each benchmark for %d items\n", *count)