mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-11 10:33:08 -06:00
Merge pull request #1861 from dolthub/db/visit-level-order
Db/visit level order
This commit is contained in:
@@ -25,11 +25,11 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
|
||||
type ValueInRange func(Value) (bool, error)
|
||||
@@ -622,7 +622,7 @@ func (m Map) HumanReadableString() string {
|
||||
// VisitMapLevelOrder writes hashes of internal node chunks to a writer
|
||||
// delimited with a newline character and returns the number or chunks written and the total number of
|
||||
// bytes written or an error if encountered
|
||||
func VisitMapLevelOrder(w io.Writer, m Map) (int64, int64, error) {
|
||||
func VisitMapLevelOrder(m Map, cb func(h hash.Hash) (int64, error)) (int64, int64, error) {
|
||||
chunkCount := int64(0)
|
||||
byteCount := int64(0)
|
||||
|
||||
@@ -641,15 +641,13 @@ func VisitMapLevelOrder(w io.Writer, m Map) (int64, int64, error) {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
p := []byte(r.TargetHash().String() + "\n")
|
||||
|
||||
n, err := w.Write(p)
|
||||
n, err := cb(r.TargetHash())
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
chunkCount++
|
||||
byteCount += int64(n)
|
||||
byteCount += n
|
||||
|
||||
v, err := r.TargetValue(context.Background(), m.valueReadWriter())
|
||||
if err != nil {
|
||||
@@ -667,3 +665,75 @@ func VisitMapLevelOrder(w io.Writer, m Map) (int64, int64, error) {
|
||||
|
||||
return chunkCount, byteCount, nil
|
||||
}
|
||||
|
||||
// VisitMapLevelOrderSized passes hashes of internal node chunks to a callback in level order,
|
||||
// batching and flushing chunks to prevent large levels from consuming excessive memory. It returns
|
||||
// the total number of chunks and bytes read, or an error.
|
||||
func VisitMapLevelOrderSized(ms []Map, batchSize int, cb func(h hash.Hash) (int64, error)) (int64, int64, error) {
|
||||
if len(ms) == 0 {
|
||||
return 0, 0, nil
|
||||
}
|
||||
if batchSize < 0 {
|
||||
return 0, 0, errors.New("invalid batch size")
|
||||
}
|
||||
|
||||
chunkCount := int64(0)
|
||||
byteCount := int64(0)
|
||||
|
||||
chunkHashes := []hash.Hash{}
|
||||
chunkMaps := []Map{}
|
||||
|
||||
flush := func() error {
|
||||
for _, h := range chunkHashes {
|
||||
n, err := cb(h)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
byteCount += n
|
||||
}
|
||||
chunkCount += int64(len(chunkHashes))
|
||||
cc, bc, err := VisitMapLevelOrderSized(chunkMaps, batchSize, cb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkCount += cc
|
||||
byteCount += bc
|
||||
chunkHashes = []hash.Hash{}
|
||||
chunkMaps = []Map{}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, m := range ms {
|
||||
if metaSeq, ok := m.orderedSequence.(metaSequence); ok {
|
||||
ts, err := metaSeq.tuples()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
for _, t := range ts {
|
||||
r, err := t.ref()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
chunkHashes = append(chunkHashes, r.TargetHash())
|
||||
v, err := r.TargetValue(context.Background(), m.valueReadWriter())
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
chunkMaps = append(chunkMaps, v.(Map))
|
||||
}
|
||||
} else if _, ok := m.orderedSequence.(mapLeafSequence); ok {
|
||||
}
|
||||
if len(chunkHashes) >= batchSize {
|
||||
if err := flush(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := flush(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
return chunkCount, byteCount, nil
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@@ -38,6 +39,7 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
)
|
||||
|
||||
const testMapSize = 8000
|
||||
@@ -2095,7 +2097,6 @@ func TestMapWithStructShouldHaveOptionalFields(t *testing.T) {
|
||||
|
||||
func TestMapWithNil(t *testing.T) {
|
||||
vrw := newTestValueStore()
|
||||
|
||||
assert.Panics(t, func() {
|
||||
NewMap(context.Background(), nil, Float(42))
|
||||
})
|
||||
@@ -2109,3 +2110,66 @@ func TestMapWithNil(t *testing.T) {
|
||||
NewSet(context.Background(), vrw, String("a"), String("b"), Float(42), nil)
|
||||
})
|
||||
}
|
||||
|
||||
func TestVisitMapLevelOrderSized(t *testing.T) {
|
||||
smallTestChunks()
|
||||
defer normalProductionChunks()
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
tests := []struct {
|
||||
description string
|
||||
mapSize int
|
||||
batchSize int
|
||||
}{
|
||||
{
|
||||
description: "large batch",
|
||||
mapSize: testMapSize * 4,
|
||||
batchSize: 200,
|
||||
},
|
||||
{
|
||||
description: "medium batch",
|
||||
mapSize: testMapSize * 2,
|
||||
batchSize: 20,
|
||||
},
|
||||
{
|
||||
description: "small batch",
|
||||
mapSize: testMapSize,
|
||||
batchSize: 2,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.description, func(t *testing.T) {
|
||||
vrw := newTestValueStore()
|
||||
kvs := []Value{}
|
||||
for i := 0; i < test.mapSize; i++ {
|
||||
kvs = append(kvs, Float(i), Float(i+1))
|
||||
}
|
||||
|
||||
m, err := NewMap(context.Background(), vrw, kvs...)
|
||||
d.PanicIfError(err)
|
||||
|
||||
expectedChunkHashes := make([]hash.Hash, 0)
|
||||
_, _, err = VisitMapLevelOrder(m, func(h hash.Hash) (int64, error) {
|
||||
expectedChunkHashes = append(expectedChunkHashes, h)
|
||||
return 0, nil
|
||||
})
|
||||
d.PanicIfError(err)
|
||||
|
||||
actualChunkHashes := make([]hash.Hash, 0)
|
||||
_, _, err = VisitMapLevelOrderSized([]Map{m}, test.batchSize, func(h hash.Hash) (int64, error) {
|
||||
actualChunkHashes = append(actualChunkHashes, h)
|
||||
return 0, nil
|
||||
})
|
||||
d.PanicIfError(err)
|
||||
sort.Slice(expectedChunkHashes, func(i, j int) bool {
|
||||
return expectedChunkHashes[i].Less(expectedChunkHashes[j])
|
||||
})
|
||||
sort.Slice(actualChunkHashes, func(i, j int) bool {
|
||||
return actualChunkHashes[i].Less(actualChunkHashes[j])
|
||||
})
|
||||
assert.Equal(expectedChunkHashes, actualChunkHashes)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user