plumbed new chunker interface

This commit is contained in:
Andy Arthur
2021-10-11 11:49:27 -07:00
parent e9c5db3e6b
commit 87e1a32bf4
9 changed files with 129 additions and 36 deletions

View File

@@ -179,7 +179,7 @@ func ApplyNEdits(ctx context.Context, edits EditProvider, m Map, numEdits int64)
if ch == nil {
var err error
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), mapHashValueBytes)
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), newMapChunker, mapHashValueBytes)
if ae.SetIfError(err) {
continue

View File

@@ -267,7 +267,7 @@ func (b Blob) Concat(ctx context.Context, other Blob) (Blob, error) {
}
func (b Blob) newChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) {
return newSequenceChunker(ctx, cur, 0, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), hashByte)
return newSequenceChunker(ctx, cur, 0, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), newBlobChunker, hashByte)
}
func hashByte(item sequenceItem, c chunker) error {
@@ -277,6 +277,10 @@ func hashByte(item sequenceItem, c chunker) error {
})
}
// newBlobChunker is the newChunkerFn used when chunking Blobs. It builds a
// rollingByteHasher from |nbf| and |salt| and returns it as a chunker.
func newBlobChunker(nbf *NomsBinFormat, salt byte) chunker {
	var ch chunker = newRollingByteHasher(nbf, salt)
	return ch
}
// asSequence exposes the sequence held by this Blob.
func (b Blob) asSequence() sequence {
	seq := b.sequence
	return seq
}
@@ -426,7 +430,7 @@ func readBlobsP(ctx context.Context, vrw ValueReadWriter, rs ...io.Reader) (Blob
}
func readBlob(ctx context.Context, r io.Reader, vrw ValueReadWriter) (Blob, error) {
sc, err := newEmptySequenceChunker(ctx, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), hashByte)
sc, err := newEmptySequenceChunker(ctx, vrw, makeBlobLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(BlobKind, vrw), newBlobChunker, hashByte)
if err != nil {
return Blob{}, err

View File

@@ -485,7 +485,11 @@ func (l List) DiffWithLimit(ctx context.Context, last List, changes chan<- Splic
}
func (l List) newChunker(ctx context.Context, cur *sequenceCursor, vrw ValueReadWriter) (*sequenceChunker, error) {
return newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), hashValueBytes)
return newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes)
}
// newListChunker is the newChunkerFn used when chunking Lists. It builds a
// rolling value hasher from |nbf| and |salt| and returns it as a chunker.
func newListChunker(nbf *NomsBinFormat, salt byte) chunker {
	var ch chunker = newRollingValueHasher(nbf, salt)
	return ch
}
func makeListLeafChunkFn(vrw ValueReadWriter) makeChunkFn {
@@ -514,7 +518,7 @@ func makeListLeafChunkFn(vrw ValueReadWriter) makeChunkFn {
}
func newEmptyListSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) {
return newEmptySequenceChunker(ctx, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), hashValueBytes)
return newEmptySequenceChunker(ctx, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes)
}
func (l List) readFrom(nbf *NomsBinFormat, b *binaryNomsReader) (Value, error) {

View File

@@ -139,7 +139,7 @@ func (le *ListEditor) List(ctx context.Context) (List, error) {
var err error
if ch == nil {
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), hashValueBytes)
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), newListChunker, hashValueBytes)
} else {
err = ch.advanceTo(ctx, cur)
}

View File

@@ -63,6 +63,10 @@ func mapHashValueBytes(item sequenceItem, c chunker) error {
return nil
}
// newMapChunker is the newChunkerFn used when chunking Maps. It builds a
// rolling value hasher from |nbf| and |salt| and returns it as a chunker.
func newMapChunker(nbf *NomsBinFormat, salt byte) chunker {
	var ch chunker = newRollingValueHasher(nbf, salt)
	return ch
}
func NewMap(ctx context.Context, vrw ValueReadWriter, kv ...Value) (Map, error) {
entries, err := buildMapData(vrw.Format(), kv)
@@ -608,7 +612,7 @@ func makeMapLeafChunkFn(vrw ValueReadWriter) makeChunkFn {
}
func newEmptyMapSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) {
return newEmptySequenceChunker(ctx, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), mapHashValueBytes)
return newEmptySequenceChunker(ctx, vrw, makeMapLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(MapKind, vrw), newMapChunker, mapHashValueBytes)
}
func (m Map) readFrom(nbf *NomsBinFormat, b *binaryNomsReader) (Value, error) {

View File

@@ -106,6 +106,21 @@ func (rv *rollingValueHasher) Write(cb func(w *binaryNomsWriter) error) (err err
return
}
// HashByte hashes |b| using the writer's current offset as the chunk-size
// measure, and reports whether a chunk boundary has been crossed.
func (rv *rollingValueHasher) HashByte(b byte) bool {
	offset := rv.bw.offset
	return rv.hashByte(b, offset)
}
// hashByte feeds |b|, XORed with the hasher's salt, into the rolling hash and
// reports whether a chunk boundary has been crossed. A boundary is crossed
// when the hash matches the configured pattern or when |offset| exceeds
// maxChunkSize. Once a boundary has been crossed, further bytes are ignored
// and the method keeps returning true.
func (rv *rollingValueHasher) hashByte(b byte, offset uint32) bool {
	if rv.crossedBoundary {
		// Boundary already latched; nothing more to hash.
		return true
	}

	rv.bz.HashByte(b ^ rv.salt)
	patternHit := rv.bz.Sum32()&rv.pattern == rv.pattern
	rv.crossedBoundary = patternHit || offset > maxChunkSize
	return rv.crossedBoundary
}
// Nbf returns the NomsBinFormat this hasher holds.
func (rv *rollingValueHasher) Nbf() *NomsBinFormat {
	format := rv.nbf
	return format
}
@@ -121,17 +136,71 @@ func (rv *rollingValueHasher) Reset() {
rv.sl.Reset()
}
func (rv *rollingValueHasher) HashByte(b byte) bool {
return rv.hashByte(b, rv.bw.offset)
// rollingByteHasher is a chunker for Blobs.
type rollingByteHasher struct {
	// bw is the writer handed to Write callbacks; the bytes it accumulates
	// are the ones that get hashed.
	bw binaryNomsWriter
	// idx is the position in bw's buffer of the next byte still to be hashed.
	idx uint32
	// bz is the rolling hash the salted bytes are fed into.
	bz *buzhash.BuzHash
	// crossedBoundary latches to true once a chunk boundary is detected and
	// stays set until Reset.
	crossedBoundary bool
	// pattern is the bit mask the hash sum must match to signal a boundary.
	pattern uint32
	// window is the size used to construct bz.
	window uint32
	// salt is XORed into every byte before it is hashed.
	salt byte
	// nbf is the format reported by Nbf.
	nbf *NomsBinFormat
}
func (rv *rollingValueHasher) hashByte(b byte, offset uint32) bool {
if !rv.crossedBoundary {
rv.bz.HashByte(b ^ rv.salt)
rv.crossedBoundary = (rv.bz.Sum32()&rv.pattern == rv.pattern)
// newRollingByteHasher builds a rollingByteHasher for |nbf| that mixes |salt|
// into every hashed byte. The boundary pattern and hash window come from
// chunkingConfig.
func newRollingByteHasher(nbf *NomsBinFormat, salt byte) *rollingByteHasher {
	pattern, window := chunkingConfig()
	return &rollingByteHasher{
		bw:      newBinaryNomsWriter(),
		bz:      buzhash.NewBuzHash(window),
		pattern: pattern,
		window:  window,
		salt:    salt,
		nbf:     nbf,
	}
}
// Compile-time check that *rollingByteHasher satisfies chunker.
var _ chunker = (*rollingByteHasher)(nil)
// Write invokes |cb| with the hasher's internal writer, then hashes every
// byte that has been written but not yet hashed. If |cb| fails, its error is
// returned and no bytes are hashed.
func (bh *rollingByteHasher) Write(cb func(w *binaryNomsWriter) error) (err error) {
	if err = cb(&bh.bw); err != nil {
		return err
	}

	// Catch the hash up to the writer's current offset, one byte at a time.
	for bh.idx < bh.bw.offset {
		bh.hashByte(bh.bw.buff[bh.idx], bh.bw.offset)
		bh.idx++
	}
	return nil
}
func (bh *rollingByteHasher) hashByte(b byte, offset uint32) bool {
if !bh.crossedBoundary {
bh.bz.HashByte(b ^ bh.salt)
bh.crossedBoundary = (bh.bz.Sum32()&bh.pattern == bh.pattern)
if offset > maxChunkSize {
rv.crossedBoundary = true
bh.crossedBoundary = true
}
}
return rv.crossedBoundary
return bh.crossedBoundary
}
// Nbf returns the NomsBinFormat this hasher was constructed with.
func (bh *rollingByteHasher) Nbf() *NomsBinFormat {
	format := bh.nbf
	return format
}
// CrossedBoundary reports whether a chunk boundary has been detected since
// the hasher was created or last Reset.
func (bh *rollingByteHasher) CrossedBoundary() bool {
	crossed := bh.crossedBoundary
	return crossed
}
// Reset returns the hasher to its initial state: the boundary flag is
// cleared, a fresh rolling hash is created with the same window, the writer
// is reset, and hashing restarts from the beginning of the buffer.
func (bh *rollingByteHasher) Reset() {
	bh.crossedBoundary = false

	fresh := buzhash.NewBuzHash(bh.window)
	bh.bz = fresh

	bh.bw.reset()
	bh.idx = 0
}

View File

@@ -42,7 +42,15 @@ func hashValueBytes(item sequenceItem, c chunker) error {
})
}
type hashValueBytesFn func(item sequenceItem, sl chunker) error
// newChunkerFn makes a chunker for the format |nbf|, passing |salt| through
// to the chunker it builds. sequenceChunker calls it with vrw.Format() and
// byte(level%256).
type newChunkerFn func(nbf *NomsBinFormat, salt byte) chunker
// hashValueBytesFn translates |item| into a byte stream to provide to |ch|.
// It returns any error encountered while doing so.
type hashValueBytesFn func(item sequenceItem, ch chunker) error
// makeChunkFn takes the sequence of items to chunk at tree height |level| and
// returns the result of chunking those items: a tuple of a reference to that
// chunk, which can itself be chunked, plus its underlying value.
// NOTE(review): the meaning of the uint64 result is not visible here —
// confirm against the implementations.
type makeChunkFn func(level uint64, values []sequenceItem) (Collection, orderedKey, uint64, error)
type sequenceChunker struct {
cur *sequenceCursor
@@ -53,19 +61,17 @@ type sequenceChunker struct {
makeChunk, parentMakeChunk makeChunkFn
isLeaf bool
hashValueBytes hashValueBytesFn
newCh newChunkerFn
ch chunker
done bool
unwrittenCol Collection
}
// makeChunkFn takes a sequence of items to chunk, and returns the result of chunking those items, a tuple of a reference to that chunk which can itself be chunked + its underlying value.
type makeChunkFn func(level uint64, values []sequenceItem) (Collection, orderedKey, uint64, error)
func newEmptySequenceChunker(ctx context.Context, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
return newSequenceChunker(ctx, nil, uint64(0), vrw, makeChunk, parentMakeChunk, hashValueBytes)
func newEmptySequenceChunker(ctx context.Context, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newChunkerFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
return newSequenceChunker(ctx, nil, uint64(0), vrw, makeChunk, parentMakeChunk, newCh, hashValueBytes)
}
func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64, vrw ValueReadWriter, makeChunk, parentMakeChunk makeChunkFn, newCh newChunkerFn, hashValueBytes hashValueBytesFn) (*sequenceChunker, error) {
d.PanicIfFalse(makeChunk != nil)
d.PanicIfFalse(parentMakeChunk != nil)
d.PanicIfFalse(hashValueBytes != nil)
@@ -74,17 +80,19 @@ func newSequenceChunker(ctx context.Context, cur *sequenceCursor, level uint64,
// |cur| will be nil if this is a new sequence, implying this is a new tree, or the tree has grown in height relative to its original chunked form.
sc := &sequenceChunker{
cur,
level,
vrw,
nil,
make([]sequenceItem, 0, 1<<10),
makeChunk, parentMakeChunk,
true,
hashValueBytes,
newRollingValueHasher(vrw.Format(), byte(level%256)),
false,
nil,
cur: cur,
level: level,
vrw: vrw,
parent: nil,
current: make([]sequenceItem, 0, 1<<10),
makeChunk: makeChunk,
parentMakeChunk: parentMakeChunk,
isLeaf: true,
hashValueBytes: hashValueBytes,
newCh: newCh,
ch: newCh(vrw.Format(), byte(level%256)),
done: false,
unwrittenCol: nil,
}
if cur != nil {
@@ -295,7 +303,7 @@ func (sc *sequenceChunker) createParent(ctx context.Context) error {
}
var err error
sc.parent, err = newSequenceChunker(ctx, parent, sc.level+1, sc.vrw, sc.parentMakeChunk, sc.parentMakeChunk, metaHashValueBytes)
sc.parent, err = newSequenceChunker(ctx, parent, sc.level+1, sc.vrw, sc.parentMakeChunk, sc.parentMakeChunk, sc.newCh, metaHashValueBytes)
if err != nil {
return err

View File

@@ -342,7 +342,11 @@ func makeSetLeafChunkFn(vrw ValueReadWriter) makeChunkFn {
}
func newEmptySetSequenceChunker(ctx context.Context, vrw ValueReadWriter) (*sequenceChunker, error) {
return newEmptySequenceChunker(ctx, vrw, makeSetLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(SetKind, vrw), hashValueBytes)
return newEmptySequenceChunker(ctx, vrw, makeSetLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(SetKind, vrw), newSetChunker, hashValueBytes)
}
// newSetChunker is the newChunkerFn used when chunking Sets. It builds a
// rolling value hasher from |nbf| and |salt| and returns it as a chunker.
func newSetChunker(nbf *NomsBinFormat, salt byte) chunker {
	var ch chunker = newRollingValueHasher(nbf, salt)
	return ch
}
func (s Set) readFrom(nbf *NomsBinFormat, b *binaryNomsReader) (Value, error) {

View File

@@ -138,7 +138,7 @@ func (se *SetEditor) Set(ctx context.Context) (Set, error) {
var err error
if ch == nil {
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeSetLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(SetKind, vrw), hashValueBytes)
ch, err = newSequenceChunker(ctx, cur, 0, vrw, makeSetLeafChunkFn(vrw), newOrderedMetaSequenceChunkFn(SetKind, vrw), newMapChunker, hashValueBytes)
} else {
err = ch.advanceTo(ctx, cur)
}