mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-26 03:30:09 -05:00
/go/store/types/map.go: replace VisitMapLevelOrder with memory efficient sized version
This commit is contained in:
+56
-38
@@ -25,11 +25,9 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
type ValueInRange func(Value) (bool, error)
|
||||
@@ -619,50 +617,70 @@ func (m Map) HumanReadableString() string {
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// VisitMapLevelOrder writes hashes of internal node chunks to a writer
|
||||
// delimited with a newline character and returns the number or chunks written and the total number of
|
||||
// bytes written or an error if encountered
|
||||
func VisitMapLevelOrder(w io.Writer, m Map) (int64, int64, error) {
|
||||
// VisitMapLevelOrderSized passes hashes of internal node chunks to a callback in level order,
|
||||
// batching and flushing chunks to prevent large levels from consuming excessive memory. It returns
|
||||
// the total number of chunks and bytes read, or an error.
|
||||
func VisitMapLevelOrderSized(ms []Map, cb func(h hash.Hash) (int64, error)) (int64, int64, error) {
|
||||
if len(ms) == 0 {
|
||||
return 0, 0, nil
|
||||
}
|
||||
|
||||
chunkCount := int64(0)
|
||||
byteCount := int64(0)
|
||||
|
||||
curLevel := []Map{m}
|
||||
for len(curLevel) > 0 {
|
||||
nextLevel := []Map{}
|
||||
for _, m := range curLevel {
|
||||
if metaSeq, ok := m.orderedSequence.(metaSequence); ok {
|
||||
ts, err := metaSeq.tuples()
|
||||
chunkHashes := []hash.Hash{}
|
||||
chunkMaps := []Map{}
|
||||
|
||||
flush := func() error {
|
||||
for _, h := range chunkHashes {
|
||||
n, err := cb(h)
|
||||
byteCount += n
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
chunkCount += int64(len(chunkHashes))
|
||||
cc, bc, err := VisitMapLevelOrderSized(chunkMaps, cb)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
chunkCount += cc
|
||||
byteCount += bc
|
||||
chunkHashes = []hash.Hash{}
|
||||
chunkMaps = []Map{}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, m := range ms {
|
||||
if metaSeq, ok := m.orderedSequence.(metaSequence); ok {
|
||||
ts, err := metaSeq.tuples()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
for _, t := range ts {
|
||||
r, err := t.ref()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
for _, t := range ts {
|
||||
r, err := t.ref()
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
p := []byte(r.TargetHash().String() + "\n")
|
||||
|
||||
n, err := w.Write(p)
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
chunkCount++
|
||||
byteCount += int64(n)
|
||||
|
||||
v, err := r.TargetValue(context.Background(), m.valueReadWriter())
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
nextLevel = append(nextLevel, v.(Map))
|
||||
chunkHashes = append(chunkHashes, r.TargetHash())
|
||||
v, err := r.TargetValue(context.Background(), m.valueReadWriter())
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
} else if _, ok := m.orderedSequence.(mapLeafSequence); ok {
|
||||
|
||||
chunkMaps = append(chunkMaps, v.(Map))
|
||||
}
|
||||
} else if _, ok := m.orderedSequence.(mapLeafSequence); ok {
|
||||
}
|
||||
if len(chunkHashes) >= 32768 {
|
||||
if err := flush(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
}
|
||||
curLevel = nextLevel
|
||||
}
|
||||
|
||||
if err := flush(); err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
return chunkCount, byteCount, nil
|
||||
|
||||
Reference in New Issue
Block a user