BlobEditor (#3599)

This commit is contained in:
Rafael Weinstein
2017-07-25 13:40:49 -07:00
committed by GitHub
parent cd0c27a565
commit f290a711c2
7 changed files with 414 additions and 37 deletions
+4 -20
View File
@@ -29,6 +29,10 @@ func NewEmptyBlob() Blob {
return Blob{newBlobLeafSequence(nil, []byte{}), &hash.Hash{}}
}
func (b Blob) Edit() *BlobEditor {
return NewBlobEditor(b)
}
// ReaderAt interface. Eagerly loads requested byte-range from the blob p-tree.
func (b Blob) ReadAt(p []byte, off int64) (n int, err error) {
// TODO: Support negative off?
@@ -128,26 +132,6 @@ func (b Blob) CopyReadAhead(w io.Writer, chunkSize uint64, concurrency int) (n i
return
}
func (b Blob) Splice(idx uint64, deleteCount uint64, data []byte) Blob {
if deleteCount == 0 && len(data) == 0 {
return b
}
d.PanicIfFalse(idx <= b.Len())
d.PanicIfFalse(idx+deleteCount <= b.Len())
ch := b.newChunker(newCursorAtIndex(b.seq, idx, false), b.seq.valueReader())
for deleteCount > 0 {
ch.Skip()
deleteCount--
}
for _, v := range data {
ch.Append(v)
}
return newBlob(ch.Done())
}
// Concat returns a new Blob comprised of this joined with other. It only needs
// to visit the rightmost prolly tree chunks of this Blob, and the leftmost
// prolly tree chunks of other, so it's efficient.
+310
View File
@@ -0,0 +1,310 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"errors"
"io"
"sync"
"github.com/attic-labs/noms/go/d"
)
type BlobEditor struct {
b Blob
edits *blobEdit
pos int64
}
func NewBlobEditor(b Blob) *BlobEditor {
return &BlobEditor{b, nil, 0}
}
func (be *BlobEditor) Kind() NomsKind {
return BlobKind
}
func (be *BlobEditor) Value(vrw ValueReadWriter) Value {
return be.Blob(vrw)
}
func (be *BlobEditor) Blob(vrw ValueReadWriter) Blob {
if be.edits == nil {
return be.b // no edits
}
seq := be.b.sequence()
vr := seq.valueReader()
if vrw != nil {
vr = vrw
}
curs := make([]chan *sequenceCursor, 0)
for edit := be.edits; edit != nil; edit = edit.next {
edit := edit
// TODO: Use ReadMany
cc := make(chan *sequenceCursor, 1)
curs = append(curs, cc)
go func() {
cc <- newCursorAtIndex(seq, edit.idx, false)
}()
}
var ch *sequenceChunker
idx := 0
for edit := be.edits; edit != nil; edit = edit.next {
cur := <-curs[idx]
idx++
if ch == nil {
ch = newSequenceChunker(cur, 0, vr, vrw, makeBlobLeafChunkFn(vr), newIndexedMetaSequenceChunkFn(BlobKind, vr), hashValueByte)
} else {
ch.advanceTo(cur)
}
dc := edit.removed
for dc > 0 {
ch.Skip()
dc--
}
for _, v := range edit.inserted {
ch.Append(v)
}
}
return newBlob(ch.Done())
}
func collapseBlobEdit(newEdit, edit *blobEdit) bool {
if newEdit.idx+newEdit.removed < edit.idx ||
edit.idx+uint64(len(edit.inserted)) < newEdit.idx {
return false
}
collapsed := &blobEdit{}
if newEdit.idx <= edit.idx {
collapsed.idx = newEdit.idx
overlap := newEdit.removed - (edit.idx - newEdit.idx) // number of leading N values removed from edit.inserted
if overlap < uint64(len(edit.inserted)) {
// newEdit doesn't remove all of edit.inserted
collapsed.inserted = append(newEdit.inserted, edit.inserted[overlap:]...)
collapsed.removed = newEdit.removed + edit.removed - overlap
} else {
// newEdit removes all of edit.inserted
collapsed.inserted = newEdit.inserted
collapsed.removed = newEdit.removed + edit.removed - uint64(len(edit.inserted))
}
} else {
// edit.idx < newEdit.idx
collapsed.idx = edit.idx
editInsertedLen := uint64(len(edit.inserted))
beginEditRemovePoint := newEdit.idx - edit.idx
if beginEditRemovePoint == editInsertedLen {
// newEdit took place at the position immediately after the last element of edit.inserted
collapsed.inserted = append(edit.inserted, newEdit.inserted...)
collapsed.removed = edit.removed + newEdit.removed
} else {
// newEdit takes place within edit.inserted
collapsed.inserted = append(collapsed.inserted, edit.inserted[:beginEditRemovePoint]...)
collapsed.inserted = append(collapsed.inserted, newEdit.inserted...)
endEditRemovePoint := beginEditRemovePoint + newEdit.removed
if endEditRemovePoint < editInsertedLen {
// elements of edit.inserted remain beyond newEdit.removed
collapsed.removed = edit.removed
collapsed.inserted = append(collapsed.inserted, edit.inserted[endEditRemovePoint:]...)
} else {
collapsed.removed = edit.removed + endEditRemovePoint - editInsertedLen
}
}
}
*newEdit = *collapsed
return true
}
func (be *BlobEditor) Len() uint64 {
delta := int64(0)
for edit := be.edits; edit != nil; edit = edit.next {
delta += -int64(edit.removed) + int64(len(edit.inserted))
}
return uint64(int64(be.b.Len()) + delta)
}
func (be *BlobEditor) Splice(idx uint64, deleteCount uint64, insert []byte) *BlobEditor {
ne := &blobEdit{idx, deleteCount, insert, nil}
var last *blobEdit
edit := be.edits
for edit != nil {
if collapseBlobEdit(ne, edit) {
if last == nil {
be.edits = edit.next
} else {
last.next = edit.next
}
edit = edit.next
continue
}
if edit.idx > ne.idx {
break
}
ne.idx = adjustBlobIdx(ne.idx, edit)
last = edit
edit = edit.next
}
if ne.removed == 0 && len(ne.inserted) == 0 {
return be // effectively removed 1 or more existing slices
}
if ne.idx > be.b.Len() {
d.Panic("Index Out Of Bounds")
}
if ne.idx == be.b.Len() && ne.removed > 0 {
d.Panic("Index Out Of Bounds")
}
if last == nil {
// Insert |ne| in first position
ne.next = be.edits
be.edits = ne
} else {
ne.next = last.next
last.next = ne
}
return be
}
func (be *BlobEditor) Seek(offset int64, whence int) (int64, error) {
abs := int64(be.pos)
switch whence {
case 0:
abs = offset
case 1:
abs += offset
case 2:
abs = int64(be.Len()) + offset
default:
return 0, errors.New("BlobEditor.Seek: invalid whence")
}
if abs < 0 {
return 0, errors.New("BlobEditor.Seek: negative position")
}
if uint64(abs) > be.Len() {
return 0, errors.New("BlobEditor.Seek: sparse blobs not supported")
}
be.pos = int64(abs)
return abs, nil
}
func (be *BlobEditor) Read(p []byte) (n int, err error) {
startIdx := uint64(be.pos)
endIdx := startIdx + uint64(len(p))
if endIdx > be.Len() {
endIdx = be.Len()
}
n = int(endIdx - startIdx)
if endIdx == be.Len() {
err = io.EOF
}
wg := &sync.WaitGroup{}
asyncReadAt := func(length uint64) {
idx := int64(startIdx)
to := p[:length]
wg.Add(1)
go func() {
be.b.ReadAt(to, idx)
wg.Done()
}()
startIdx += length
p = p[length:]
}
edit := be.edits
for edit != nil && startIdx < endIdx {
if edit.idx > startIdx {
// ReadAt the bytes before the current edit
end := endIdx
if endIdx > edit.idx {
end = edit.idx
}
asyncReadAt(end - startIdx)
continue
}
insertedLength := uint64(len(edit.inserted))
if edit.idx <= startIdx && startIdx < (edit.idx+insertedLength) {
// Copy bytes within the current edit
start := startIdx - edit.idx
end := endIdx - edit.idx
if end > insertedLength {
end = insertedLength
}
copy(p, edit.inserted[start:end])
p = p[end-start:]
startIdx += end - start
continue
}
startIdx = adjustBlobIdx(startIdx, edit)
endIdx = adjustBlobIdx(endIdx, edit)
edit = edit.next
}
if endIdx > startIdx {
// ReadAt any bytes beyond the final edit
asyncReadAt(endIdx - startIdx)
}
wg.Wait()
return
}
func (be *BlobEditor) Write(p []byte) (n int, err error) {
removeCount := uint64(len(p))
remaining := be.Len() - uint64(be.pos)
if remaining < removeCount {
removeCount = remaining
}
be.Splice(uint64(be.pos), removeCount, p)
return len(p), nil
}
func adjustBlobIdx(idx uint64, e *blobEdit) uint64 {
return idx + e.removed - uint64(len(e.inserted))
}
type blobEdit struct {
idx uint64
removed uint64
inserted []byte
next *blobEdit
}
+91
View File
@@ -0,0 +1,91 @@
// Copyright 2017 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"math/rand"
"testing"
"io/ioutil"
"bytes"
"io"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/testify/assert"
)
func TestBlobReadWriteFuzzer(t *testing.T) {
rounds := 1024
operations := 512
flushEvery := 16
maxInsertCount := uint64(64)
ts := &chunks.TestStorage{}
cs := ts.NewView()
vs := newValueStoreWithCacheAndPending(cs, 0, 0)
r := rand.New(rand.NewSource(0))
nextRandInt := func(from, to uint64) uint64 {
return from + uint64(float64(to-from)*r.Float64())
}
cr := newCountingReader()
for i := 0; i < rounds; i++ {
b := NewBlob()
f, _ := ioutil.TempFile("", "buff")
be := b.Edit()
for j := 0; j < operations; j++ {
if j%2 == 1 {
// random read
idx := nextRandInt(0, be.Len())
l := nextRandInt(0, be.Len()-idx)
f.Seek(int64(idx), 0)
be.Seek(int64(idx), 0)
ex := make([]byte, l)
ac := make([]byte, l)
f.Read(ex)
be.Read(ac)
assert.True(t, bytes.Compare(ex, ac) == 0)
} else {
// randon write
idx := nextRandInt(0, be.Len())
f.Seek(int64(idx), 0)
be.Seek(int64(idx), 0)
l := nextRandInt(0, maxInsertCount)
data, err := ioutil.ReadAll(&io.LimitedReader{cr, int64(l)})
assert.NoError(t, err)
f.Write(data)
be.Write(data)
}
if j%flushEvery == 0 {
// Flush
b = be.Blob(vs)
be = b.Edit()
}
}
f.Sync()
b = be.Blob(vs)
f.Seek(0, 0)
info, err := f.Stat()
assert.NoError(t, err)
assert.True(t, uint64(info.Size()) == b.Len())
expect, err := ioutil.ReadAll(f)
assert.NoError(t, err)
actual := make([]byte, b.Len())
b.ReadAt(actual, 0)
assert.True(t, bytes.Compare(expect, actual) == 0)
}
}
+4 -4
View File
@@ -210,22 +210,22 @@ func TestBlobSplice(t *testing.T) {
blob := NewEmptyBlob()
buf := new(bytes.Buffer)
blob = blob.Splice(0, 0, []byte("I'll do anything"))
blob = blob.Edit().Splice(0, 0, []byte("I'll do anything")).Blob(nil)
buf.Reset()
buf.ReadFrom(blob.Reader())
assert.Equal(buf.String(), "I'll do anything")
blob = blob.Splice(16, 0, []byte(" for arv"))
blob = blob.Edit().Splice(16, 0, []byte(" for arv")).Blob(nil)
buf.Reset()
buf.ReadFrom(blob.Reader())
assert.Equal(buf.String(), "I'll do anything for arv")
blob = blob.Splice(0, 0, []byte("Yes, "))
blob = blob.Edit().Splice(0, 0, []byte("Yes, ")).Blob(nil)
buf.Reset()
buf.ReadFrom(blob.Reader())
assert.Equal(buf.String(), "Yes, I'll do anything for arv")
blob = blob.Splice(5, 20, []byte("it's hard to satisfy"))
blob = blob.Edit().Splice(5, 20, []byte("it's hard to satisfy")).Blob(nil)
buf.Reset()
buf.ReadFrom(blob.Reader())
assert.Equal(buf.String(), "Yes, it's hard to satisfy arv")
+2 -10
View File
@@ -5,7 +5,6 @@
package types
import (
"fmt"
"sync"
"github.com/attic-labs/noms/go/d"
@@ -117,7 +116,7 @@ func (le *ListEditor) List(vrw ValueReadWriter) List {
return newList(ch.Done())
}
func collapse(newEdit, edit *listEdit) bool {
func collapseListEdit(newEdit, edit *listEdit) bool {
if newEdit.idx+newEdit.removed < edit.idx ||
edit.idx+uint64(len(edit.inserted)) < newEdit.idx {
return false
@@ -179,13 +178,6 @@ func (le *ListEditor) Len() uint64 {
return uint64(int64(le.l.Len()) + delta)
}
func (le *ListEditor) dump() {
fmt.Println("ListEditor", le.Len())
for edit := le.edits; edit != nil; edit = edit.next {
fmt.Println("Edit", edit.idx, edit.removed, edit.inserted)
}
}
func (le *ListEditor) Splice(idx uint64, deleteCount uint64, vs ...Valuable) *ListEditor {
for _, sv := range vs {
d.PanicIfTrue(sv == nil)
@@ -197,7 +189,7 @@ func (le *ListEditor) Splice(idx uint64, deleteCount uint64, vs ...Valuable) *Li
edit := le.edits
for edit != nil {
if collapse(ne, edit) {
if collapseListEdit(ne, edit) {
if last == nil {
le.edits = edit.next
} else {
+1 -1
View File
@@ -175,7 +175,7 @@ func AsValuables(vs []Value) []Valuable {
return res
}
func TestSpliceFuzzer(t *testing.T) {
func TestListSpliceFuzzer(t *testing.T) {
startCount := 1000
rounds := 1000
splices := 100
+2 -2
View File
@@ -294,7 +294,7 @@ func (fs *nomsFS) Truncate(path string, size uint64, context *fuse.Context) fuse
ref := file.Get("data").(types.Ref)
blob := ref.TargetValue(fs.db).(types.Blob)
blob = blob.Splice(size, blob.Len()-size, nil)
blob = blob.Edit().Splice(size, blob.Len()-size, nil).Blob(nil)
ref = fs.db.WriteValue(blob)
file = file.Set("data", ref)
@@ -481,7 +481,7 @@ func (nfile nomsFile) Write(data []byte, off int64) (uint32, fuse.Status) {
del = ll - oo
}
blob = blob.Splice(uint64(off), del, data)
blob = blob.Edit().Splice(uint64(off), del, data).Blob(nil)
ref = nfile.fs.db.WriteValue(blob)
file = file.Set("data", ref)