Add (temporary) custom leaf-chunker for blobs (#1911)
121	go/types/blob.go
@@ -11,6 +11,9 @@ import (
 
 	"github.com/attic-labs/noms/go/d"
 	"github.com/attic-labs/noms/go/hash"
+	"github.com/attic-labs/noms/go/util/orderedparallel"
+
+	"github.com/kch42/buzhash"
 )
 
 const (
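The buzhash import is what makes the leaf boundaries content-defined: the chunker hashes a rolling window of recent bytes and ends a chunk whenever the hash's low bits match blobPattern. Below is a minimal, self-contained sketch of the technique, using the same kch42/buzhash calls the diff uses; windowSize and pattern here are illustrative stand-ins, since the actual values of blobWindowSize and blobPattern are not shown in this diff.

package main

import (
	"fmt"
	"math/rand"

	"github.com/kch42/buzhash"
)

const (
	windowSize = 64        // bytes in the rolling window (stand-in for blobWindowSize)
	pattern    = 1<<12 - 1 // boundary mask (stand-in for blobPattern): avg chunk ~4KB
)

// chunkOffsets returns the end offset of each content-defined chunk. A chunk
// ends wherever the low bits of the rolling hash are all ones, so boundaries
// depend on the bytes themselves, not on absolute stream offsets.
func chunkOffsets(data []byte) []int {
	bh := buzhash.NewBuzHash(windowSize)
	var ends []int
	for i, b := range data {
		bh.HashByte(b)
		if bh.Sum32()&pattern == pattern {
			ends = append(ends, i+1)
		}
	}
	return ends
}

func main() {
	data := make([]byte, 1<<20)
	rand.Read(data)
	fmt.Printf("found %d chunk boundaries in 1 MiB of random data\n", len(chunkOffsets(data)))
}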
@@ -156,41 +159,99 @@ func newBlobLeafChunkFn(vr ValueReader, sink ValueWriter) makeChunkFn {
 			buff[i] = v.(byte)
 		}
 
-		seq := newBlobLeafSequence(vr, buff)
-		blob := newBlob(seq)
-
-		var ref Ref
-		var child Collection
-		if sink != nil {
-			// Eagerly write chunks
-			ref = sink.WriteValue(blob)
-			child = nil
-		} else {
-			ref = NewRef(blob)
-			child = blob
-		}
-
-		return newMetaTuple(ref, orderedKeyFromInt(len(buff)), uint64(len(buff)), child), seq
+		return chunkBlobLeaf(vr, sink, buff)
 	}
 }
 
+func chunkBlobLeaf(vr ValueReader, sink ValueWriter, buff []byte) (metaTuple, sequence) {
+	seq := newBlobLeafSequence(vr, buff)
+	blob := newBlob(seq)
+
+	var ref Ref
+	var child Collection
+	if sink != nil {
+		// Eagerly write chunks
+		ref = sink.WriteValue(blob)
+		child = nil
+	} else {
+		ref = NewRef(blob)
+		child = blob
+	}
+
+	return newMetaTuple(ref, orderedKeyFromInt(len(buff)), uint64(len(buff)), child), seq
+}
+
 func NewBlob(r io.Reader) Blob {
 	return NewStreamingBlob(r, nil)
 }
 
-func NewStreamingBlob(r io.Reader, vrw ValueReadWriter) Blob {
-	seq := newEmptySequenceChunker(newBlobLeafChunkFn(nil, vrw), newIndexedMetaSequenceChunkFn(BlobKind, nil, vrw), newBlobLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
-	buf := [8192]byte{}
-	for {
-		n, err := r.Read(buf[:])
-		for i := 0; i < n; i++ {
-			seq.Append(buf[i])
-		}
-		if err != nil {
-			d.Chk.True(io.EOF == err)
-			break
-		}
-	}
-	return newBlob(seq.Done().(indexedSequence))
-}
+type tempBlob struct {
+	seq sequence
+	mt  metaTuple
+}
+
+func NewStreamingBlob(r io.Reader, vrw ValueReadWriter) Blob {
+	sc := newEmptySequenceChunker(newBlobLeafChunkFn(nil, vrw), newIndexedMetaSequenceChunkFn(BlobKind, nil, vrw), newBlobLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
+
+	// TODO: The code below is temporary. It's basically a custom leaf-level chunker for blobs. There are substantial perf gains by doing it this way, as it avoids the cost of boxing every single byte which is chunked.
+	chunkBuff := [8192]byte{}
+	chunkBytes := chunkBuff[:]
+	bh := buzhash.NewBuzHash(blobWindowSize)
+	offset := 0
+	addByte := func(b byte) bool {
+		if offset >= len(chunkBytes) {
+			tmp := make([]byte, len(chunkBytes)*2)
+			copy(tmp, chunkBytes)
+			chunkBytes = tmp
+		}
+		chunkBytes[offset] = b
+		offset++
+		bh.HashByte(b)
+		return bh.Sum32()&blobPattern == blobPattern
+	}
+
+	input := make(chan interface{}, 16)
+	output := orderedparallel.New(input, func(item interface{}) interface{} {
+		cp := item.([]byte)
+		mt, seq := chunkBlobLeaf(vrw, vrw, cp)
+		return tempBlob{seq, mt}
+	}, 16)
+
+	makeChunk := func() {
+		cp := make([]byte, offset)
+		copy(cp, chunkBytes[0:offset])
+		input <- cp
+		offset = 0
+	}
+
+	go func() {
+		readBuff := [8192]byte{}
+		for {
+			n, err := r.Read(readBuff[:])
+			for i := 0; i < n; i++ {
+				if addByte(readBuff[i]) {
+					makeChunk()
+				}
+			}
+			if err != nil {
+				d.Chk.True(io.EOF == err)
+				if offset > 0 {
+					makeChunk()
+				}
+				close(input)
+				break
+			}
+		}
+	}()
+
+	for b := range output {
+		tb := b.(tempBlob)
+		sc.lastSeq = tb.seq
+		if sc.parent == nil {
+			sc.createParent()
+		}
+		sc.parent.Append(tb.mt)
+	}
+
+	return newBlob(sc.Done().(indexedSequence))
+}
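NewStreamingBlob keeps chunks in input order even though they are hashed and written on 16 parallel workers; that ordering guarantee is what orderedparallel.New provides. Below is a minimal sketch of the ordered fan-out/fan-in pattern using only the standard library. It illustrates the idea, not the noms implementation, and is typed with int where the real package uses interface{}.

package main

import (
	"fmt"
	"sync"
)

// orderedParallel processes items from input on several goroutines but emits
// results in input order, buffering whatever finishes early.
func orderedParallel(input <-chan int, work func(int) int, workers int) <-chan int {
	type job struct {
		idx, val int
	}

	// Fan out: tag each input item with its sequence number.
	jobs := make(chan job)
	go func() {
		idx := 0
		for v := range input {
			jobs <- job{idx, v}
			idx++
		}
		close(jobs)
	}()

	results := make(chan job)
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- job{j.idx, work(j.val)}
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results)
	}()

	// Fan in: hold out-of-order results until the next index arrives.
	output := make(chan int)
	go func() {
		pending := map[int]int{}
		next := 0
		for r := range results {
			pending[r.idx] = r.val
			for {
				v, ok := pending[next]
				if !ok {
					break
				}
				delete(pending, next)
				output <- v
				next++
			}
		}
		close(output)
	}()
	return output
}

func main() {
	in := make(chan int)
	go func() {
		for i := 1; i <= 8; i++ {
			in <- i
		}
		close(in)
	}()
	for sq := range orderedParallel(in, func(v int) int { return v * v }, 4) {
		fmt.Print(sq, " ") // 1 4 9 16 25 36 49 64: input order preserved
	}
	fmt.Println()
}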
@@ -81,7 +81,7 @@ func main() {
 		pr = progressreader.New(f, getStatusPrinter(s.Size()))
 	}
 
-	b := types.NewBlob(pr)
+	b := types.NewStreamingBlob(pr, ds.Database())
 	ds, err = ds.Commit(b)
 	if err != nil {
 		d.Chk.True(datas.ErrMergeNeeded == err)
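The practical difference at this call site is the sink: types.NewBlob(pr) is equivalent to types.NewStreamingBlob(pr, nil), so chunkBlobLeaf keeps every leaf in memory as a child Collection until commit, whereas passing ds.Database() supplies a ValueWriter and each leaf chunk is written eagerly the moment its boundary is found. As for the "cost of boxing every single byte" cited in the TODO comment, here is a rough, self-contained illustration of that per-byte interface overhead (not the noms code; the size of the gap varies by machine and Go version):

package main

import (
	"fmt"
	"time"
)

type boxedSink struct{ buf []byte }

// Append mimics a chunker API that accepts interface{}-typed values:
// every call converts the byte to an interface and back.
func (s *boxedSink) Append(v interface{}) { s.buf = append(s.buf, v.(byte)) }

func main() {
	const n = 1 << 24 // 16 MiB

	s := &boxedSink{buf: make([]byte, 0, n)}
	start := time.Now()
	for i := 0; i < n; i++ {
		s.Append(byte(i)) // byte -> interface{} -> byte per element
	}
	boxed := time.Since(start)

	raw := make([]byte, 0, n)
	start = time.Now()
	for i := 0; i < n; i++ {
		raw = append(raw, byte(i)) // plain append, no boxing
	}
	fmt.Println("boxed:", boxed, " raw:", time.Since(start))
}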