From 3f6e7133cb63a4a2fedbe185ca71b24ad21df835 Mon Sep 17 00:00:00 2001 From: Andy Arthur Date: Mon, 11 Oct 2021 12:26:51 -0700 Subject: [PATCH] refactored extracted interface as 'sequenceSplitter' --- go/store/types/apply_map_edits.go | 8 +++- go/store/types/blob.go | 21 ++++++--- go/store/types/list.go | 6 ++- go/store/types/list_editor.go | 8 +++- go/store/types/map.go | 12 +++--- go/store/types/meta_sequence.go | 4 +- go/store/types/rolling_value_hasher.go | 11 ++--- go/store/types/sequence_chunker.go | 59 +++++++++++++++----------- go/store/types/sequence_concat.go | 2 +- go/store/types/set.go | 2 +- 10 files changed, 84 insertions(+), 49 deletions(-) diff --git a/go/store/types/apply_map_edits.go b/go/store/types/apply_map_edits.go index 247bb6c5ab..ed275f408b 100644 --- a/go/store/types/apply_map_edits.go +++ b/go/store/types/apply_map_edits.go @@ -179,7 +179,7 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64) if ch == nil { var err error - ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), newMapChunker, mapHashValueBytes) + ch, err = newMapLeafChunkerFromCursor(ctx, cur, vrw) if ae.SetIfError(err) { continue @@ -420,3 +420,9 @@ func appendToWRes(ctx context.Context, wRes *mapWorkResult, cur *sequenceCursor, return nil } + +func newMapLeafChunkerFromCursor(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) { + makeChunk := makeMapLeafChunkFn(vrw) + makeParentChunk := newOrderedMetaSequenceChunkFn(MapKind, vrw) + return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, makeParentChunk, newMapChunker, mapHashValueBytes) +} diff --git a/go/store/types/blob.go b/go/store/types/blob.go index 9709677e8c..88303d0259 100644 --- a/go/store/types/blob.go +++ b/go/store/types/blob.go @@ -267,17 +267,19 @@ func (b Blob) Concat(ctx context.Context, other Blob) (Blob, error) { } func (b Blob) newChunker(ctx context.Context, cur 
*sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) { - return newSequenceChunker(ctx, cur, 0, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), newBlobChunker, hashByte) + makeChunk := makeBlobLeafChunkFn(vrw) + parentMakeChunk := newIndexedMetaSequenceChunkFn(BlobKind, vrw) + return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, parentMakeChunk, newBlobChunker, hashByte) } -func hashByte(item sequenceItem, c chunker) error { - return c.Write(func(bw *binaryNomsWriter) error { +func hashByte(item sequenceItem, sp sequenceSplitter) error { + return sp.Append(func(bw *binaryNomsWriter) error { bw.writeUint8(item.(byte)) return nil }) } -func newBlobChunker(nbf *NomsBinFormat, salt byte) chunker { +func newBlobChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter { return newRollingByteHasher(nbf, salt) } @@ -343,6 +345,12 @@ func (cbr *BlobReader) Seek(offset int64, whence int) (int64, error) { return abs, nil } +func newEmptyBlobChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) { + makeChunk := makeBlobLeafChunkFn(vrw) + makeParentChunk := newIndexedMetaSequenceChunkFn(BlobKind, vrw) + return newEmptySequenceChunker(ctx, vrw, makeChunk, makeParentChunk, newBlobChunker, hashByte) +} + func makeBlobLeafChunkFn(vrw ValueReadWriter) makeChunkFn { return func(level uint64, items []sequenceItem) (Collection, orderedKey, uint64, error) { d.PanicIfFalse(level == 0) @@ -430,13 +438,12 @@ func readBlobsP(ctx context.Context, vrw ValueReadWriter, rs ...io.Reader) (Blob } func readBlob(ctx context.Context, r io.Reader, vrw ValueReadWriter) (Blob, error) { - sc, err := newEmptySequenceChunker(ctx, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), newBlobChunker, hashByte) - + sc, err := newEmptyBlobChunker(ctx, vrw) if err != nil { return Blob{}, err } - // TODO: The code below is temporary. It's basically a custom leaf-level chunker for blobs. 
There are substational + // TODO: The code below is temporary. It's basically a custom leaf-level sequenceSplitter for blobs. There are substantial // perf gains by doing it this way as it avoids the cost of boxing every single byte which is chunked. chunkBuff := [8192]byte{} chunkBytes := chunkBuff[:] diff --git a/go/store/types/list.go b/go/store/types/list.go index 1aa9f8a2ea..3491050ff1 100644 --- a/go/store/types/list.go +++ b/go/store/types/list.go @@ -485,10 +485,12 @@ func (l List) DiffWithLimit(ctx context.Context, last List, changes chan<- Splic } func (l List) newChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) { - return newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes) + makeChunk := makeListLeafChunkFn(vrw) + makeParentChunk := newIndexedMetaSequenceChunkFn(ListKind, vrw) + return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, makeParentChunk, newListChunker, hashValueBytes) } -func newListChunker(nbf *NomsBinFormat, salt byte) chunker { +func newListChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter { return newRollingValueHasher(nbf, salt) } diff --git a/go/store/types/list_editor.go b/go/store/types/list_editor.go index cc9321700b..7e46c48085 100644 --- a/go/store/types/list_editor.go +++ b/go/store/types/list_editor.go @@ -139,7 +139,7 @@ func (le *ListEditor) List(ctx context.Context) (List, error) { var err error if ch == nil { - ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes) + ch, err = newListLeafChunker(ctx, cur, vrw) } else { err = ch.advanceTo(ctx, cur) } @@ -185,6 +185,12 @@ func (le *ListEditor) List(ctx context.Context) (List, error) { return newList(seq), nil } +func newListLeafChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) { + makeChunk := 
makeListLeafChunkFn(vrw) + makeParentChunk := newIndexedMetaSequenceChunkFn(ListKind, vrw) + return newSequenceChunker(ctx, cur, 0, vrw, makeChunk, makeParentChunk, newListChunker, hashValueBytes) +} + func collapseListEdit(newEdit, edit *listEdit) bool { if newEdit.idx+newEdit.removed < edit.idx || edit.idx+uint64(len(edit.inserted)) < newEdit.idx { diff --git a/go/store/types/map.go b/go/store/types/map.go index b045cbf259..f6b623a6f3 100644 --- a/go/store/types/map.go +++ b/go/store/types/map.go @@ -46,15 +46,15 @@ func newMap(seq orderedSequence) Map { return Map{seq} } -func mapHashValueBytes(item sequenceItem, c chunker) error { +func mapHashValueBytes(item sequenceItem, sp sequenceSplitter) error { entry := item.(mapEntry) - err := hashValueBytes(entry.key, c) + err := hashValueBytes(entry.key, sp) if err != nil { return err } - err = hashValueBytes(entry.value, c) + err = hashValueBytes(entry.value, sp) if err != nil { return err @@ -63,7 +63,7 @@ func mapHashValueBytes(item sequenceItem, c chunker) error { return nil } -func newMapChunker(nbf *NomsBinFormat, salt byte) chunker { +func newMapChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter { return newRollingValueHasher(nbf, salt) } @@ -612,7 +612,9 @@ func makeMapLeafChunkFn(vrw ValueReadWriter) makeChunkFn { } func newEmptyMapSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) { - return newEmptySequenceChunker(ctx, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), newMapChunker, mapHashValueBytes) + makeChunk := makeMapLeafChunkFn(vrw) + makeParentChunk := newOrderedMetaSequenceChunkFn(MapKind, vrw) + return newEmptySequenceChunker(ctx, vrw, makeChunk, makeParentChunk, newMapChunker, mapHashValueBytes) } func (m Map) readFrom(nbf *NomsBinFormat, b *binaryNomsReader) (Value, error) { diff --git a/go/store/types/meta_sequence.go b/go/store/types/meta_sequence.go index 5dcba40d40..2576682321 100644 --- a/go/store/types/meta_sequence.go +++ 
b/go/store/types/meta_sequence.go @@ -559,8 +559,8 @@ func (ms metaSequence) getChildren(ctx context.Context, start, end uint64) ([]se return seqs, err } -func metaHashValueBytes(item sequenceItem, c chunker) error { - return c.Write(func(bw *binaryNomsWriter) error { +func metaHashValueBytes(item sequenceItem, sp sequenceSplitter) error { + return sp.Append(func(bw *binaryNomsWriter) error { bw.writeRaw(item.(metaTuple).buff) return nil }) diff --git a/go/store/types/rolling_value_hasher.go b/go/store/types/rolling_value_hasher.go index 3102674814..87de605770 100644 --- a/go/store/types/rolling_value_hasher.go +++ b/go/store/types/rolling_value_hasher.go @@ -96,9 +96,9 @@ func newRollingValueHasher(nbf *NomsBinFormat, salt byte) *rollingValueHasher { return rv } -var _ chunker = &rollingValueHasher{} +var _ sequenceSplitter = &rollingValueHasher{} -func (rv *rollingValueHasher) Write(cb func(w *binaryNomsWriter) error) (err error) { +func (rv *rollingValueHasher) Append(cb func(w *binaryNomsWriter) error) (err error) { err = cb(&rv.bw) if err == nil { rv.sl.Update(rv.bw.data()) @@ -136,7 +136,8 @@ func (rv *rollingValueHasher) Reset() { rv.sl.Reset() } -// rollingByteHasher is a chunker for Blobs +// rollingByteHasher is a sequenceSplitter for Blobs. It directly hashes +// bytes streams without using Sloppy for pseudo-compression. 
type rollingByteHasher struct { bw binaryNomsWriter idx uint32 @@ -163,9 +164,9 @@ func newRollingByteHasher(nbf *NomsBinFormat, salt byte) *rollingByteHasher { return rb } -var _ chunker = &rollingByteHasher{} +var _ sequenceSplitter = &rollingByteHasher{} -func (bh *rollingByteHasher) Write(cb func(w *binaryNomsWriter) error) (err error) { +func (bh *rollingByteHasher) Append(cb func(w *binaryNomsWriter) error) (err error) { err = cb(&bh.bw) if err != nil { return err diff --git a/go/store/types/sequence_chunker.go b/go/store/types/sequence_chunker.go index b68d6a45a9..b06b2f7273 100644 --- a/go/store/types/sequence_chunker.go +++ b/go/store/types/sequence_chunker.go @@ -27,26 +27,37 @@ import ( "github.com/dolthub/dolt/go/store/d" ) -// chunker decides where byte streams should be split. -type chunker interface { - Write(func(bw *binaryNomsWriter) error) error - Nbf() *NomsBinFormat +// sequenceSplitter decides where sequences should be split into chunks. +type sequenceSplitter interface { + // Append provides more sequenceItems to the splitter. Callers pass a callback + // function that uses |bw| to serialize sequenceItems. Splitters make chunk + // boundary decisions based on the contents of the byte buffer |bw.buff|. Upon + // return, callers can use |CrossedBoundary| to see if a chunk boundary has been crossed. + Append(func(bw *binaryNomsWriter) error) error + + // CrossedBoundary returns true if the provided sequenceItems have caused a chunk + // boundary to be crossed. CrossedBoundary() bool + + // Reset clears the current byte buffer and resets the state of the splitter. Reset() + + // Nbf returns the splitter's NomsBinFormat. 
+ Nbf() *NomsBinFormat } -func hashValueBytes(item sequenceItem, c chunker) error { - return c.Write(func(bw *binaryNomsWriter) error { +func hashValueBytes(item sequenceItem, sp sequenceSplitter) error { + return sp.Append(func(bw *binaryNomsWriter) error { v := item.(Value) - return v.writeTo(bw, c.Nbf()) + return v.writeTo(bw, sp.Nbf()) }) } -// newChunkerFn makes a chunker. -type newChunkerFn func(fmt *NomsBinFormat, salt byte) chunker +// newSplitterFn makes a sequenceSplitter. +type newSplitterFn func(fmt *NomsBinFormat, salt byte) sequenceSplitter -// hashValueBytesFn translates |item| into a byte stream to provide to |ch|. -type hashValueBytesFn func(item sequenceItem, ch chunker) error +// hashValueBytesFn translates |item| into a byte stream to provide to |sp|. +type hashValueBytesFn func(item sequenceItem, sp sequenceSplitter) error // makeChunkFn takes a sequence of items to chunk, and returns the result of chunking those items, // a tuple of a reference to that chunk which can itself be chunked + its underlying value. 
@@ -61,17 +72,17 @@ type sequenceChunker struct { makeChunk, parentMakeChunk makeChunkFn isLeaf bool hashValueBytes hashValueBytesFn - newCh newChunkerFn - ch chunker + newCh newSplitterFn + sp sequenceSplitter done bool unwrittenCol Collection } -func newEmptySequenceChunker(ctx context.Context, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newChunkerFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) { +func newEmptySequenceChunker(ctx context.Context, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newSplitterFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) { return newSequenceChunker(ctx, nil, uint64(0), vrw, makeChunk, parentMakeChunk, newCh, hashValueBytes) } -func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newChunkerFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) { +func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newSplitterFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) { d.PanicIfFalse(makeChunk != nil) d.PanicIfFalse(parentMakeChunk != nil) d.PanicIfFalse(hashValueBytes != nil) @@ -90,7 +101,7 @@ func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, isLeaf: true, hashValueBytes: hashValueBytes, newCh: newCh, - ch: newCh(vrw.Format(), byte(level%256)), + sp: newCh(vrw.Format(), byte(level%256)), done: false, unwrittenCol: nil, } @@ -255,13 +266,13 @@ func (sc *sequenceChunker) advanceTo(ctx context.Context, next *sequenceCursor) func (sc *sequenceChunker) Append(ctx context.Context, item sequenceItem) (bool, error) { d.PanicIfTrue(item == nil) sc.current = append(sc.current, item) - err := sc.hashValueBytes(item, sc.ch) + err := sc.hashValueBytes(item, sc.sp) if err != nil { return false, err } - if sc.ch.CrossedBoundary() { + if 
sc.sp.CrossedBoundary() { // When a metaTuple contains a key that is so large that it causes a chunk boundary to be crossed simply by encoding // the metaTuple then we will create a metaTuple to encode the metaTuple containing the same key again, and again // crossing a chunk boundary causes infinite recursion. The solution is not to allow a metaTuple with a single @@ -312,7 +323,7 @@ func (sc *sequenceChunker) createParent(ctx context.Context) error { sc.parent.isLeaf = false if sc.unwrittenCol != nil { - // There is an unwritten collection, but this chunker now has a parent, so + // There is an unwritten collection, but this sequenceSplitter now has a parent, so // write it. See createSequence(). _, err := sc.vrw.WriteValue(ctx, sc.unwrittenCol) @@ -372,7 +383,7 @@ func (sc *sequenceChunker) createSequence(ctx context.Context, write bool) (sequ func (sc *sequenceChunker) handleChunkBoundary(ctx context.Context) error { d.PanicIfFalse(len(sc.current) > 0) - sc.ch.Reset() + sc.sp.Reset() if sc.parent == nil { err := sc.createParent(ctx) @@ -395,7 +406,7 @@ func (sc *sequenceChunker) handleChunkBoundary(ctx context.Context) error { return nil } -// Returns true if this chunker or any of its parents have any pending items in their |current| slice. +// Returns true if this sequenceSplitter or any of its parents have any pending items in their |current| slice. func (sc *sequenceChunker) anyPending() bool { if len(sc.current) > 0 { return true @@ -435,11 +446,11 @@ func (sc *sequenceChunker) Done(ctx context.Context) (sequence, error) { return sc.parent.Done(ctx) } - // At this point, we know this chunker contains, in |current| every item at this level of the resulting tree. 
To see this, consider that there are two ways a chunker can enter items into its |current|: (1) as the result of resume() with the cursor on anything other than the first item in the sequence, and (2) as a result of a child chunker hitting an explicit chunk boundary during either Append() or finalize(). The only way there can be no items in some parent chunker's |current| is if this chunker began with cursor within its first existing chunk (and thus all parents resume()'d with a cursor on their first item) and continued through all sebsequent items without creating any explicit chunk boundaries (and thus never sent any items up to a parent as a result of chunking). Therefore, this chunker's current must contain all items within the current sequence. + // At this point, we know this sequenceSplitter contains, in |current| every item at this level of the resulting tree. To see this, consider that there are two ways a sequenceSplitter can enter items into its |current|: (1) as the result of resume() with the cursor on anything other than the first item in the sequence, and (2) as a result of a child sequenceSplitter hitting an explicit chunk boundary during either Append() or finalize(). The only way there can be no items in some parent sequenceSplitter's |current| is if this sequenceSplitter began with cursor within its first existing chunk (and thus all parents resume()'d with a cursor on their first item) and continued through all subsequent items without creating any explicit chunk boundaries (and thus never sent any items up to a parent as a result of chunking). Therefore, this sequenceSplitter's current must contain all items within the current sequence. // This level must represent *a* root of the tree, but it is possibly non-canonical. 
There are three cases to consider: - // (1) This is "leaf" chunker and thus produced tree of depth 1 which contains exactly one chunk (never hit a boundary), or (2) This in an internal node of the tree which contains multiple references to child nodes. In either case, this is the canonical root of the tree. + // (1) This is a "leaf" sequenceSplitter and thus produced tree of depth 1 which contains exactly one chunk (never hit a boundary), or (2) This is an internal node of the tree which contains multiple references to child nodes. In either case, this is the canonical root of the tree. if sc.isLeaf || len(sc.current) > 1 { seq, _, err := sc.createSequence(ctx, false) @@ -450,7 +461,7 @@ func (sc *sequenceChunker) Done(ctx context.Context) (sequence, error) { return seq, nil } - // (3) This is an internal node of the tree which contains a single reference to a child node. This can occur if a non-leaf chunker happens to chunk on the first item (metaTuple) appended. In this case, this is the root of the tree, but it is *not* canonical and we must walk down until we find cases (1) or (2), above. + // (3) This is an internal node of the tree which contains a single reference to a child node. This can occur if a non-leaf sequenceSplitter happens to chunk on the first item (metaTuple) appended. In this case, this is the root of the tree, but it is *not* canonical and we must walk down until we find cases (1) or (2), above. d.PanicIfFalse(!sc.isLeaf && len(sc.current) == 1) mt := sc.current[0].(metaTuple) diff --git a/go/store/types/sequence_concat.go b/go/store/types/sequence_concat.go index 43221b92e3..a91dce51c8 100644 --- a/go/store/types/sequence_concat.go +++ 
In that case, create a parent for fst. + // sequenceSplitter to snd won't. In that case, create a parent for fst. err := ch.createParent(ctx) if err != nil { diff --git a/go/store/types/set.go b/go/store/types/set.go index 783de01b90..d61637ac0c 100644 --- a/go/store/types/set.go +++ b/go/store/types/set.go @@ -345,7 +345,7 @@ func newEmptySetSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequ return newEmptySequenceChunker(ctx, vrw, makeSetLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(SetKind, vrw), newSetChunker, hashValueBytes) } -func newSetChunker(nbf *NomsBinFormat, salt byte) chunker { +func newSetChunker(nbf *NomsBinFormat, salt byte) sequenceSplitter { return newRollingValueHasher(nbf, salt) }