mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-11 02:59:34 -06:00
refactored extracted interface as 'sequenceSplitter'
This commit is contained in:
@@ -179,7 +179,7 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64)
|
||||
|
||||
if ch == nil {
|
||||
var err error
|
||||
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), newMapChunker, mapHashValueBytes)
|
||||
ch, err = newMapLeafChunkerFromCursor(ctx, cur, vrw)
|
||||
|
||||
if ae.SetIfError(err) {
|
||||
continue
|
||||
@@ -420,3 +420,9 @@ func appendToWRes(ctx context.Context, wRes *mapWorkResult, cur *sequenceCursor,
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newMapLeafChunkerFromCursor(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) {
|
||||
makeChunk := makeMapLeafChunkFn(vrw)
|
||||
makeParentChunk := newOrderedMetaSequenceChunkFn(MapKind, vrw)
|
||||
return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, makeParentChunk, newMapChunker, mapHashValueBytes)
|
||||
}
|
||||
|
||||
@@ -267,17 +267,19 @@ func (b Blob) Concat(ctx context.Context, other Blob) (Blob, error) {
|
||||
}
|
||||
|
||||
func (b Blob) newChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) {
|
||||
return newSequenceChunker(ctx, cur, 0, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), newBlobChunker, hashByte)
|
||||
makeChunk := makeBlobLeafChunkFn(vrw)
|
||||
parentMakeChunk := newIndexedMetaSequenceChunkFn(BlobKind, vrw)
|
||||
return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, parentMakeChunk, newBlobChunker, hashByte)
|
||||
}
|
||||
|
||||
func hashByte(item sequenceItem, c chunker) error {
|
||||
return c.Write(func(bw *binaryNomsWriter) error {
|
||||
func hashByte(item sequenceItem, sp sequenceSplitter) error {
|
||||
return sp.Append(func(bw *binaryNomsWriter) error {
|
||||
bw.writeUint8(item.(byte))
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func newBlobChunker(nbf *NomsBinFormat, salt byte) chunker {
|
||||
func newBlobChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter {
|
||||
return newRollingByteHasher(nbf, salt)
|
||||
}
|
||||
|
||||
@@ -343,6 +345,12 @@ func (cbr *BlobReader) Seek(offset int64, whence int) (int64, error) {
|
||||
return abs, nil
|
||||
}
|
||||
|
||||
func newEmptyBlobChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) {
|
||||
makeChunk := makeBlobLeafChunkFn(vrw)
|
||||
makeParentChunk := newIndexedMetaSequenceChunkFn(BlobKind, vrw)
|
||||
return newEmptySequenceChunker(ctx, vrw, makeChunk, makeParentChunk, newBlobChunker, hashByte)
|
||||
}
|
||||
|
||||
func makeBlobLeafChunkFn(vrw ValueReadWriter) makeChunkFn {
|
||||
return func(level uint64, items []sequenceItem) (Collection, orderedKey, uint64, error) {
|
||||
d.PanicIfFalse(level == 0)
|
||||
@@ -430,13 +438,12 @@ func readBlobsP(ctx context.Context, vrw ValueReadWriter, rs ...io.Reader) (Blob
|
||||
}
|
||||
|
||||
func readBlob(ctx context.Context, r io.Reader, vrw ValueReadWriter) (Blob, error) {
|
||||
sc, err := newEmptySequenceChunker(ctx, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), newBlobChunker, hashByte)
|
||||
|
||||
sc, err := newEmptyBlobChunker(ctx, vrw)
|
||||
if err != nil {
|
||||
return Blob{}, err
|
||||
}
|
||||
|
||||
// TODO: The code below is temporary. It's basically a custom leaf-level chunker for blobs. There are substational
|
||||
// TODO: The code below is temporary. It's basically a custom leaf-level sequenceSplitter for blobs. There are substational
|
||||
// perf gains by doing it this way as it avoids the cost of boxing every single byte which is chunked.
|
||||
chunkBuff := [8192]byte{}
|
||||
chunkBytes := chunkBuff[:]
|
||||
|
||||
@@ -485,10 +485,12 @@ func (l List) DiffWithLimit(ctx context.Context, last List, changes chan<- Splic
|
||||
}
|
||||
|
||||
func (l List) newChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) {
|
||||
return newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes)
|
||||
makeChunk := makeListLeafChunkFn(vrw)
|
||||
makeParentChunk := newIndexedMetaSequenceChunkFn(ListKind, vrw)
|
||||
return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, makeParentChunk, newListChunker, hashValueBytes)
|
||||
}
|
||||
|
||||
func newListChunker(nbf *NomsBinFormat, salt byte) chunker {
|
||||
func newListChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter {
|
||||
return newRollingValueHasher(nbf, salt)
|
||||
}
|
||||
|
||||
|
||||
@@ -139,7 +139,7 @@ func (le *ListEditor) List(ctx context.Context) (List, error) {
|
||||
|
||||
var err error
|
||||
if ch == nil {
|
||||
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes)
|
||||
ch, err = newListLeafChunker(ctx, cur, vrw)
|
||||
} else {
|
||||
err = ch.advanceTo(ctx, cur)
|
||||
}
|
||||
@@ -185,6 +185,12 @@ func (le *ListEditor) List(ctx context.Context) (List, error) {
|
||||
return newList(seq), nil
|
||||
}
|
||||
|
||||
func newListLeafChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) {
|
||||
makeChunk := makeListLeafChunkFn(vrw)
|
||||
makeParentChunk := newIndexedMetaSequenceChunkFn(ListKind, vrw)
|
||||
return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, makeParentChunk, newListChunker, hashValueBytes)
|
||||
}
|
||||
|
||||
func collapseListEdit(newEdit, edit *listEdit) bool {
|
||||
if newEdit.idx+newEdit.removed < edit.idx ||
|
||||
edit.idx+uint64(len(edit.inserted)) < newEdit.idx {
|
||||
|
||||
@@ -46,15 +46,15 @@ func newMap(seq orderedSequence) Map {
|
||||
return Map{seq}
|
||||
}
|
||||
|
||||
func mapHashValueBytes(item sequenceItem, c chunker) error {
|
||||
func mapHashValueBytes(item sequenceItem, sp sequenceSplitter) error {
|
||||
entry := item.(mapEntry)
|
||||
err := hashValueBytes(entry.key, c)
|
||||
err := hashValueBytes(entry.key, sp)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = hashValueBytes(entry.value, c)
|
||||
err = hashValueBytes(entry.value, sp)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -63,7 +63,7 @@ func mapHashValueBytes(item sequenceItem, c chunker) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func newMapChunker(nbf *NomsBinFormat, salt byte) chunker {
|
||||
func newMapChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter {
|
||||
return newRollingValueHasher(nbf, salt)
|
||||
}
|
||||
|
||||
@@ -612,7 +612,9 @@ func makeMapLeafChunkFn(vrw ValueReadWriter) makeChunkFn {
|
||||
}
|
||||
|
||||
func newEmptyMapSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) {
|
||||
return newEmptySequenceChunker(ctx, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), newMapChunker, mapHashValueBytes)
|
||||
makeChunk := makeMapLeafChunkFn(vrw)
|
||||
makeParentChunk := newOrderedMetaSequenceChunkFn(MapKind, vrw)
|
||||
return newEmptySequenceChunker(ctx, vrw, makeChunk, makeParentChunk, newMapChunker, mapHashValueBytes)
|
||||
}
|
||||
|
||||
func (m Map) readFrom(nbf *NomsBinFormat, b *binaryNomsReader) (Value, error) {
|
||||
|
||||
@@ -559,8 +559,8 @@ func (ms metaSequence) getChildren(ctx context.Context, start, end uint64) ([]se
|
||||
return seqs, err
|
||||
}
|
||||
|
||||
func metaHashValueBytes(item sequenceItem, c chunker) error {
|
||||
return c.Write(func(bw *binaryNomsWriter) error {
|
||||
func metaHashValueBytes(item sequenceItem, sp sequenceSplitter) error {
|
||||
return sp.Append(func(bw *binaryNomsWriter) error {
|
||||
bw.writeRaw(item.(metaTuple).buff)
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -96,9 +96,9 @@ func newRollingValueHasher(nbf *NomsBinFormat, salt byte) *rollingValueHasher {
|
||||
return rv
|
||||
}
|
||||
|
||||
var _ chunker = &rollingValueHasher{}
|
||||
var _ sequenceSplitter = &rollingValueHasher{}
|
||||
|
||||
func (rv *rollingValueHasher) Write(cb func(w *binaryNomsWriter) error) (err error) {
|
||||
func (rv *rollingValueHasher) Append(cb func(w *binaryNomsWriter) error) (err error) {
|
||||
err = cb(&rv.bw)
|
||||
if err == nil {
|
||||
rv.sl.Update(rv.bw.data())
|
||||
@@ -136,7 +136,8 @@ func (rv *rollingValueHasher) Reset() {
|
||||
rv.sl.Reset()
|
||||
}
|
||||
|
||||
// rollingByteHasher is a chunker for Blobs
|
||||
// rollingByteHasher is a sequenceSplitter for Blobs. It directly hashes
|
||||
// bytes streams without using Sloppy for pseudo-compression.
|
||||
type rollingByteHasher struct {
|
||||
bw binaryNomsWriter
|
||||
idx uint32
|
||||
@@ -163,9 +164,9 @@ func newRollingByteHasher(nbf *NomsBinFormat, salt byte) *rollingByteHasher {
|
||||
return rb
|
||||
}
|
||||
|
||||
var _ chunker = &rollingByteHasher{}
|
||||
var _ sequenceSplitter = &rollingByteHasher{}
|
||||
|
||||
func (bh *rollingByteHasher) Write(cb func(w *binaryNomsWriter) error) (err error) {
|
||||
func (bh *rollingByteHasher) Append(cb func(w *binaryNomsWriter) error) (err error) {
|
||||
err = cb(&bh.bw)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -27,26 +27,37 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
)
|
||||
|
||||
// chunker decides where byte streams should be split.
|
||||
type chunker interface {
|
||||
Write(func(bw *binaryNomsWriter) error) error
|
||||
Nbf() *NomsBinFormat
|
||||
// sequenceSplitter decides where sequences should be split into chunks.
|
||||
type sequenceSplitter interface {
|
||||
// Append provides more sequenceItems to the splitter. Callers pass a callback
|
||||
// function that uses |bw| to serialize sequenceItems. Splitter's make chunk
|
||||
// boundary decisions based on the contents of the byte buffer |bw.buff|. Upon
|
||||
// return, callers can use |CrossedBoundary| to see if a chunk boundary has crossed.
|
||||
Append(func(bw *binaryNomsWriter) error) error
|
||||
|
||||
// CrossedBoundary returns true if the provided sequenceItems have caused a chunk
|
||||
// boundary to be crossed.
|
||||
CrossedBoundary() bool
|
||||
|
||||
// Reset clears the current byte buffer and resets the state of the splitter.
|
||||
Reset()
|
||||
|
||||
// Nbf returns the splitter's NomsBinFormat.
|
||||
Nbf() *NomsBinFormat
|
||||
}
|
||||
|
||||
func hashValueBytes(item sequenceItem, c chunker) error {
|
||||
return c.Write(func(bw *binaryNomsWriter) error {
|
||||
func hashValueBytes(item sequenceItem, sp sequenceSplitter) error {
|
||||
return sp.Append(func(bw *binaryNomsWriter) error {
|
||||
v := item.(Value)
|
||||
return v.writeTo(bw, c.Nbf())
|
||||
return v.writeTo(bw, sp.Nbf())
|
||||
})
|
||||
}
|
||||
|
||||
// newChunkerFn makes a chunker.
|
||||
type newChunkerFn func(fmt *NomsBinFormat, salt byte) chunker
|
||||
// newSplitterFn makes a sequenceSplitter.
|
||||
type newSplitterFn func(fmt *NomsBinFormat, salt byte) sequenceSplitter
|
||||
|
||||
// hashValueBytesFn translates |item| into a byte stream to provide to |ch|.
|
||||
type hashValueBytesFn func(item sequenceItem, ch chunker) error
|
||||
// hashValueBytesFn translates |item| into a byte stream to provide to |sp|.
|
||||
type hashValueBytesFn func(item sequenceItem, sp sequenceSplitter) error
|
||||
|
||||
// makeChunkFn takes a sequence of items to chunk, and returns the result of chunking those items,
|
||||
// a tuple of a reference to that chunk which can itself be chunked + its underlying value.
|
||||
@@ -61,17 +72,17 @@ type sequenceChunker struct {
|
||||
makeChunk, parentMakeChunk makeChunkFn
|
||||
isLeaf bool
|
||||
hashValueBytes hashValueBytesFn
|
||||
newCh newChunkerFn
|
||||
ch chunker
|
||||
newCh newSplitterFn
|
||||
sp sequenceSplitter
|
||||
done bool
|
||||
unwrittenCol Collection
|
||||
}
|
||||
|
||||
func newEmptySequenceChunker(ctx context.Context, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newChunkerFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
|
||||
func newEmptySequenceChunker(ctx context.Context, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newSplitterFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
|
||||
return newSequenceChunker(ctx, nil, uint64(0), vrw, makeChunk, parentMakeChunk, newCh, hashValueBytes)
|
||||
}
|
||||
|
||||
func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newChunkerFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
|
||||
func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newSplitterFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
|
||||
d.PanicIfFalse(makeChunk != nil)
|
||||
d.PanicIfFalse(parentMakeChunk != nil)
|
||||
d.PanicIfFalse(hashValueBytes != nil)
|
||||
@@ -90,7 +101,7 @@ func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64,
|
||||
isLeaf: true,
|
||||
hashValueBytes: hashValueBytes,
|
||||
newCh: newCh,
|
||||
ch: newCh(vrw.Format(), byte(level%256)),
|
||||
sp: newCh(vrw.Format(), byte(level%256)),
|
||||
done: false,
|
||||
unwrittenCol: nil,
|
||||
}
|
||||
@@ -255,13 +266,13 @@ func (sc *sequenceChunker) advanceTo(ctx context.Context, next *sequenceCursor)
|
||||
func (sc *sequenceChunker) Append(ctx context.Context, item sequenceItem) (bool, error) {
|
||||
d.PanicIfTrue(item == nil)
|
||||
sc.current = append(sc.current, item)
|
||||
err := sc.hashValueBytes(item, sc.ch)
|
||||
err := sc.hashValueBytes(item, sc.sp)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if sc.ch.CrossedBoundary() {
|
||||
if sc.sp.CrossedBoundary() {
|
||||
// When a metaTuple contains a key that is so large that it causes a chunk boundary to be crossed simply by encoding
|
||||
// the metaTuple then we will create a metaTuple to encode the metaTuple containing the same key again, and again
|
||||
// crossing a chunk boundary causes infinite recursion. The solution is not to allow a metaTuple with a single
|
||||
@@ -312,7 +323,7 @@ func (sc *sequenceChunker) createParent(ctx context.Context) error {
|
||||
sc.parent.isLeaf = false
|
||||
|
||||
if sc.unwrittenCol != nil {
|
||||
// There is an unwritten collection, but this chunker now has a parent, so
|
||||
// There is an unwritten collection, but this sequenceSplitter now has a parent, so
|
||||
// write it. See createSequence().
|
||||
_, err := sc.vrw.WriteValue(ctx, sc.unwrittenCol)
|
||||
|
||||
@@ -372,7 +383,7 @@ func (sc *sequenceChunker) createSequence(ctx context.Context, write bool) (sequ
|
||||
|
||||
func (sc *sequenceChunker) handleChunkBoundary(ctx context.Context) error {
|
||||
d.PanicIfFalse(len(sc.current) > 0)
|
||||
sc.ch.Reset()
|
||||
sc.sp.Reset()
|
||||
if sc.parent == nil {
|
||||
err := sc.createParent(ctx)
|
||||
|
||||
@@ -395,7 +406,7 @@ func (sc *sequenceChunker) handleChunkBoundary(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns true if this chunker or any of its parents have any pending items in their |current| slice.
|
||||
// Returns true if this sequenceSplitter or any of its parents have any pending items in their |current| slice.
|
||||
func (sc *sequenceChunker) anyPending() bool {
|
||||
if len(sc.current) > 0 {
|
||||
return true
|
||||
@@ -435,11 +446,11 @@ func (sc *sequenceChunker) Done(ctx context.Context) (sequence, error) {
|
||||
return sc.parent.Done(ctx)
|
||||
}
|
||||
|
||||
// At this point, we know this chunker contains, in |current| every item at this level of the resulting tree. To see this, consider that there are two ways a chunker can enter items into its |current|: (1) as the result of resume() with the cursor on anything other than the first item in the sequence, and (2) as a result of a child chunker hitting an explicit chunk boundary during either Append() or finalize(). The only way there can be no items in some parent chunker's |current| is if this chunker began with cursor within its first existing chunk (and thus all parents resume()'d with a cursor on their first item) and continued through all sebsequent items without creating any explicit chunk boundaries (and thus never sent any items up to a parent as a result of chunking). Therefore, this chunker's current must contain all items within the current sequence.
|
||||
// At this point, we know this sequenceSplitter contains, in |current| every item at this level of the resulting tree. To see this, consider that there are two ways a sequenceSplitter can enter items into its |current|: (1) as the result of resume() with the cursor on anything other than the first item in the sequence, and (2) as a result of a child sequenceSplitter hitting an explicit chunk boundary during either Append() or finalize(). The only way there can be no items in some parent sequenceSplitter's |current| is if this sequenceSplitter began with cursor within its first existing chunk (and thus all parents resume()'d with a cursor on their first item) and continued through all sebsequent items without creating any explicit chunk boundaries (and thus never sent any items up to a parent as a result of chunking). Therefore, this sequenceSplitter's current must contain all items within the current sequence.
|
||||
|
||||
// This level must represent *a* root of the tree, but it is possibly non-canonical. There are three cases to consider:
|
||||
|
||||
// (1) This is "leaf" chunker and thus produced tree of depth 1 which contains exactly one chunk (never hit a boundary), or (2) This in an internal node of the tree which contains multiple references to child nodes. In either case, this is the canonical root of the tree.
|
||||
// (1) This is "leaf" sequenceSplitter and thus produced tree of depth 1 which contains exactly one chunk (never hit a boundary), or (2) This in an internal node of the tree which contains multiple references to child nodes. In either case, this is the canonical root of the tree.
|
||||
if sc.isLeaf || len(sc.current) > 1 {
|
||||
seq, _, err := sc.createSequence(ctx, false)
|
||||
|
||||
@@ -450,7 +461,7 @@ func (sc *sequenceChunker) Done(ctx context.Context) (sequence, error) {
|
||||
return seq, nil
|
||||
}
|
||||
|
||||
// (3) This is an internal node of the tree which contains a single reference to a child node. This can occur if a non-leaf chunker happens to chunk on the first item (metaTuple) appended. In this case, this is the root of the tree, but it is *not* canonical and we must walk down until we find cases (1) or (2), above.
|
||||
// (3) This is an internal node of the tree which contains a single reference to a child node. This can occur if a non-leaf sequenceSplitter happens to chunk on the first item (metaTuple) appended. In this case, this is the root of the tree, but it is *not* canonical and we must walk down until we find cases (1) or (2), above.
|
||||
d.PanicIfFalse(!sc.isLeaf && len(sc.current) == 1)
|
||||
mt := sc.current[0].(metaTuple)
|
||||
|
||||
|
||||
@@ -73,7 +73,7 @@ func concat(ctx context.Context, fst, snd sequence, newSequenceChunker newSequen
|
||||
cur = cur.parent
|
||||
if cur != nil && ch.parent == nil {
|
||||
// If fst is shallower than snd, its cur will have a parent whereas the
|
||||
// chunker to snd won't. In that case, create a parent for fst.
|
||||
// sequenceSplitter to snd won't. In that case, create a parent for fst.
|
||||
err := ch.createParent(ctx)
|
||||
|
||||
if err != nil {
|
||||
|
||||
@@ -345,7 +345,7 @@ func newEmptySetSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequ
|
||||
return newEmptySequenceChunker(ctx, vrw, makeSetLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(SetKind, vrw), newSetChunker, hashValueBytes)
|
||||
}
|
||||
|
||||
func newSetChunker(nbf *NomsBinFormat, salt byte) chunker {
|
||||
func newSetChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter {
|
||||
return newRollingValueHasher(nbf, salt)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user