go read ahead (#3046)

This commit is contained in:
Rafael Weinstein
2017-01-10 15:28:26 -08:00
committed by GitHub
parent c3427a4364
commit 01303a828d
22 changed files with 338 additions and 418 deletions
+7 -1
View File
@@ -6,6 +6,7 @@ package migration
import (
"fmt"
"io"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/types"
@@ -22,7 +23,12 @@ func MigrateFromVersion7(source v7types.Value, sourceStore v7types.ValueReadWrit
case v7types.String:
return types.String(string(source)), nil
case v7types.Blob:
return types.NewStreamingBlob(sourceStore, source.Reader()), nil
preader, pwriter := io.Pipe()
go func() {
source.Reader().Copy(pwriter)
pwriter.Close()
}()
return types.NewStreamingBlob(sourceStore, preader), nil
case v7types.List:
vc := make(chan types.Value, 1024)
lc := types.NewStreamingList(sinkStore, vc)
+3 -2
View File
@@ -8,7 +8,6 @@ import (
"bytes"
"encoding/binary"
"fmt"
"io/ioutil"
"time"
"github.com/attic-labs/noms/go/chunks"
@@ -104,7 +103,9 @@ func main() {
ds = db.GetDataset("test")
t1 = time.Now()
blob = ds.HeadValue().(types.Blob)
outBytes, _ := ioutil.ReadAll(blob.Reader())
buff := &bytes.Buffer{}
blob.Reader().Copy(buff)
outBytes := buff.Bytes()
readDuration := time.Since(t1)
d.PanicIfFalse(bytes.Compare(blobBytes, outBytes) == 0)
fmt.Printf("\t\t\t%s\t\t%s\n\n", rate(buildDuration, *blobSize), rate(readDuration, *blobSize))
+48 -14
View File
@@ -30,10 +30,9 @@ func NewEmptyBlob() Blob {
}
// BUG 155 - Should provide Write... Maybe even have Blob implement ReadWriteSeeker
func (b Blob) Reader() io.ReadSeeker {
iter := newSequenceIterator(b.seq, 0)
return &BlobReader{b.seq, iter, nil, 0}
func (b Blob) Reader() *BlobReader {
cursor := newCursorAtIndex(b.seq, 0, true)
return &BlobReader{b.seq, cursor, nil, 0}
}
func (b Blob) Splice(idx uint64, deleteCount uint64, data []byte) Blob {
@@ -44,7 +43,7 @@ func (b Blob) Splice(idx uint64, deleteCount uint64, data []byte) Blob {
d.PanicIfFalse(idx <= b.Len())
d.PanicIfFalse(idx+deleteCount <= b.Len())
ch := b.newChunker(newCursorAtIndex(b.seq, idx), b.seq.valueReader())
ch := b.newChunker(newCursorAtIndex(b.seq, idx, false), b.seq.valueReader())
for deleteCount > 0 {
ch.Skip()
deleteCount--
@@ -117,7 +116,7 @@ func (b Blob) Type() *Type {
type BlobReader struct {
seq sequence
iter *sequenceIterator
cursor *sequenceCursor
currentReader io.ReadSeeker
pos uint64
}
@@ -128,9 +127,11 @@ func (cbr *BlobReader) Read(p []byte) (n int, err error) {
}
n, err = cbr.currentReader.Read(p)
cbr.pos += uint64(n)
hasMore := cbr.iter.advance(n)
if err == io.EOF && hasMore {
for i := 0; i < n; i++ {
cbr.pos++
cbr.cursor.advance()
}
if err == io.EOF && cbr.cursor.idx < cbr.cursor.seq.seqLen() {
cbr.currentReader = nil
err = nil
}
@@ -138,6 +139,41 @@ func (cbr *BlobReader) Read(p []byte) (n int, err error) {
return
}
// Copy streams the entire contents of the blob to |w| and returns the number
// of bytes written. For chunked blobs it reads leaf chunks ahead of the write
// via readAheadLeafCursors, which is much faster than Read()ing through the
// cursor one item at a time.
func (cbr BlobReader) Copy(w io.Writer) (n int64) {
	if cbr.cursor.parent == nil {
		// Single-chunk blob: the cursor's sequence is the one and only leaf.
		data := cbr.cursor.seq.(blobLeafSequence).data
		written, err := io.Copy(w, bytes.NewReader(data))
		d.Chk.NoError(err)
		d.Chk.True(written == int64(len(data)))
		return written
	}

	curChan := make(chan chan *sequenceCursor, 30)
	stopChan := make(chan struct{})
	go func() {
		readAheadLeafCursors(cbr.cursor, curChan, stopChan)
		close(curChan)
	}()

	// Copy the bytes of each leaf
	for ch := range curChan {
		leafCur := <-ch
		parentCur := leafCur.parent
		d.Chk.NotNil(parentCur.childSeqs)
		// This is hacky, but iterating over each of these preloaded cursors
		// actually dramatically degrades perf. Just reach in and grab the leaf
		// sequences.
		// NOTE(review): this walks every preloaded child of the parent chunk,
		// which assumes the reader is positioned at the start of the blob —
		// confirm before using Copy on a seeked BlobReader.
		for _, leaf := range parentCur.childSeqs {
			data := leaf.(blobLeafSequence).data
			written, err := io.Copy(w, bytes.NewReader(data))
			d.Chk.NoError(err)
			d.Chk.True(written == int64(len(data)))
			// BUGFIX: the io.Copy result previously shadowed the named return,
			// so Copy always reported 0 for chunked blobs; accumulate instead.
			n += written
		}
	}
	return
}
func (cbr *BlobReader) Seek(offset int64, whence int) (int64, error) {
abs := int64(cbr.pos)
@@ -157,16 +193,14 @@ func (cbr *BlobReader) Seek(offset int64, whence int) (int64, error) {
}
cbr.pos = uint64(abs)
cbr.iter = newSequenceIterator(cbr.seq, cbr.pos)
cbr.cursor = newCursorAtIndex(cbr.seq, cbr.pos, true)
cbr.currentReader = nil
return abs, nil
}
func (cbr *BlobReader) updateReader() {
chunk, idx := cbr.iter.chunkAndIndex()
data := chunk.(blobLeafSequence).data
cbr.currentReader = bytes.NewReader(data)
cbr.currentReader.Seek(int64(idx), 0)
cbr.currentReader = bytes.NewReader(cbr.cursor.seq.(blobLeafSequence).data)
cbr.currentReader.Seek(int64(cbr.cursor.idx), 0)
}
func makeBlobLeafChunkFn(vr ValueReader) makeChunkFn {
+22 -3
View File
@@ -77,9 +77,9 @@ func newBlobTestSuite(size uint, expectRefStr string, expectChunkCount int, expe
expectAppendChunkDiff: expectAppendChunkDiff,
validate: func(v2 Collection) bool {
b2 := v2.(Blob)
out := make([]byte, length)
io.ReadFull(b2.Reader(), out)
return bytes.Compare(out, buff) == 0
outBuff := &bytes.Buffer{}
b2.Reader().Copy(outBuff)
return bytes.Compare(outBuff.Bytes(), buff) == 0
},
prependOne: func() Collection {
dup := make([]byte, length+1)
@@ -308,3 +308,22 @@ func TestBlobNewParallel(t *testing.T) {
b = NewBlob(readers...)
assert.Equal(data, readAll(b))
}
// TestStreamingParallelBlob builds a 64MB blob from 4 parallel readers and
// verifies that reading it back yields exactly the original bytes.
func TestStreamingParallelBlob(t *testing.T) {
	assert := assert.New(t)

	buff := randomBuff(1 << 26 /* 64MB */)
	chunks := 4
	chunkSize := len(buff) / chunks

	readers := make([]io.Reader, chunks)
	for i := range readers {
		readers[i] = bytes.NewReader(buff[i*chunkSize : (i+1)*chunkSize])
	}

	vs := NewTestValueStore()
	blob := NewStreamingBlob(vs, readers...)

	outBuff := &bytes.Buffer{}
	blob.Reader().Copy(outBuff)
	assert.True(bytes.Equal(buff, outBuff.Bytes()))
}
+8 -8
View File
@@ -109,7 +109,7 @@ func (l List) Type() *Type {
// descend into the prolly-tree which leads to Get being O(depth).
func (l List) Get(idx uint64) Value {
d.PanicIfFalse(idx < l.Len())
cur := newCursorAtIndex(l.seq, idx)
cur := newCursorAtIndex(l.seq, idx, false)
return cur.current().(Value)
}
@@ -120,7 +120,7 @@ func (l List) Map(mf MapFunc) []interface{} {
// TODO: This is bad API. It should have returned another List.
// https://github.com/attic-labs/noms/issues/2557
idx := uint64(0)
cur := newCursorAtIndex(l.seq, idx)
cur := newCursorAtIndex(l.seq, idx, true)
results := make([]interface{}, 0, l.Len())
cur.iter(func(v interface{}) bool {
@@ -159,7 +159,7 @@ func (l List) Splice(idx uint64, deleteCount uint64, vs ...Value) List {
d.PanicIfFalse(idx <= l.Len())
d.PanicIfFalse(idx+deleteCount <= l.Len())
cur := newCursorAtIndex(l.seq, idx)
cur := newCursorAtIndex(l.seq, idx, false)
ch := l.newChunker(cur, l.seq.valueReader())
for deleteCount > 0 {
ch.Skip()
@@ -205,7 +205,7 @@ type listIterFunc func(v Value, index uint64) (stop bool)
// iteration stops.
func (l List) Iter(f listIterFunc) {
idx := uint64(0)
cur := newCursorAtIndex(l.seq, idx)
cur := newCursorAtIndex(l.seq, idx, true)
cur.iter(func(v interface{}) bool {
if f(v.(Value), uint64(idx)) {
return true
@@ -223,7 +223,7 @@ func (l List) IterAll(f listIterAllFunc) {
// TODO: Consider removing this and have Iter behave like IterAll.
// https://github.com/attic-labs/noms/issues/2558
idx := uint64(0)
cur := newCursorAtIndex(l.seq, idx)
cur := newCursorAtIndex(l.seq, idx, true)
cur.iter(func(v interface{}) bool {
f(v.(Value), uint64(idx))
idx++
@@ -239,7 +239,7 @@ func (l List) Iterator() ListIterator {
// IteratorAt returns a ListIterator starting at index. If index is out of bound the iterator will
// have reached its end on creation.
func (l List) IteratorAt(index uint64) ListIterator {
return ListIterator{newCursorAtIndex(l.seq, index)}
return ListIterator{newCursorAtIndex(l.seq, index, true)}
}
// Diff streams the diff from last to the current list to the changes channel. Caller can close
@@ -266,8 +266,8 @@ func (l List) DiffWithLimit(last List, changes chan<- Splice, closeChan <-chan s
return
}
lastCur := newCursorAtIndex(last.seq, 0)
lCur := newCursorAtIndex(l.seq, 0)
lastCur := newCursorAtIndex(last.seq, 0, false)
lCur := newCursorAtIndex(l.seq, 0, false)
indexedSequenceDiff(last.seq, lastCur.depth(), 0, l.seq, lCur.depth(), 0, changes, closeChan, maxSpliceMatrixSize)
}
+2 -2
View File
@@ -983,9 +983,9 @@ func TestListDiffLargeWithSameMiddle(t *testing.T) {
// should only read/write a "small & reasonably sized portion of the total"
assert.Equal(3, cs1.Writes)
assert.Equal(3, cs1.Reads)
assert.Equal(2, cs1.Reads)
assert.Equal(3, cs2.Writes)
assert.Equal(3, cs2.Reads)
assert.Equal(2, cs2.Reads)
}
func TestListDiffAllValuesInSequenceRemoved(t *testing.T) {
+11 -11
View File
@@ -125,7 +125,7 @@ func (m Map) Type() *Type {
}
func (m Map) firstOrLast(last bool) (Value, Value) {
cur := newCursorAt(m.seq, emptyKey, false, last)
cur := newCursorAt(m.seq, emptyKey, false, last, false)
if !cur.valid() {
return nil, nil
}
@@ -146,13 +146,13 @@ func (m Map) At(idx uint64) (key, value Value) {
panic(fmt.Errorf("Out of bounds: %s >= %s", idx, m.Len()))
}
cur := newCursorAtIndex(m.seq, idx)
cur := newCursorAtIndex(m.seq, idx, false)
entry := cur.current().(mapEntry)
return entry.key, entry.value
}
func (m Map) MaybeGet(key Value) (v Value, ok bool) {
cur := newCursorAtValue(m.seq, key, false, false)
cur := newCursorAtValue(m.seq, key, false, false, false)
if !cur.valid() {
return nil, false
}
@@ -176,7 +176,7 @@ func (m Map) SetM(kv ...Value) Map {
k, v, tail := kv[0], kv[1], kv[2:]
cur, found := m.getCursorAtValue(k)
cur, found := m.getCursorAtValue(k, false)
deleteCount := uint64(0)
if found {
deleteCount = 1
@@ -185,7 +185,7 @@ func (m Map) SetM(kv ...Value) Map {
}
func (m Map) Remove(k Value) Map {
if cur, found := m.getCursorAtValue(k); found {
if cur, found := m.getCursorAtValue(k, false); found {
return m.splice(cur, 1)
}
return m
@@ -204,14 +204,14 @@ func (m Map) splice(cur *sequenceCursor, deleteCount uint64, vs ...mapEntry) Map
return newMap(ch.Done().(orderedSequence))
}
func (m Map) getCursorAtValue(v Value) (cur *sequenceCursor, found bool) {
cur = newCursorAtValue(m.seq, v, true, false)
func (m Map) getCursorAtValue(v Value, readAhead bool) (cur *sequenceCursor, found bool) {
cur = newCursorAtValue(m.seq, v, true, false, readAhead)
found = cur.idx < cur.seq.seqLen() && cur.current().(mapEntry).key.Equals(v)
return
}
func (m Map) Has(key Value) bool {
cur := newCursorAtValue(m.seq, key, false, false)
cur := newCursorAtValue(m.seq, key, false, false, false)
if !cur.valid() {
return false
}
@@ -227,7 +227,7 @@ func (m Map) Get(key Value) Value {
type mapIterCallback func(key, value Value) (stop bool)
func (m Map) Iter(cb mapIterCallback) {
cur := newCursorAt(m.seq, emptyKey, false, false)
cur := newCursorAt(m.seq, emptyKey, false, false, true)
cur.iter(func(v interface{}) bool {
entry := v.(mapEntry)
return cb(entry.key, entry.value)
@@ -237,7 +237,7 @@ func (m Map) Iter(cb mapIterCallback) {
type mapIterAllCallback func(key, value Value)
func (m Map) IterAll(cb mapIterAllCallback) {
cur := newCursorAt(m.seq, emptyKey, false, false)
cur := newCursorAt(m.seq, emptyKey, false, false, true)
cur.iter(func(v interface{}) bool {
entry := v.(mapEntry)
cb(entry.key, entry.value)
@@ -246,7 +246,7 @@ func (m Map) IterAll(cb mapIterAllCallback) {
}
func (m Map) IterFrom(start Value, cb mapIterCallback) {
cur := newCursorAtValue(m.seq, start, false, false)
cur := newCursorAtValue(m.seq, start, false, false, true)
cur.iter(func(v interface{}) bool {
entry := v.(mapEntry)
return cb(entry.key, entry.value)
+49 -21
View File
@@ -7,7 +7,6 @@ package types
import (
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/util/orderedparallel"
)
const (
@@ -145,23 +144,6 @@ func (ms metaSequence) getChildSequence(idx int) sequence {
return mt.getChildSequence(ms.vr)
}
func (ms metaSequence) beginFetchingChildSequences(start, length uint64) chan interface{} {
input := make(chan interface{})
output := orderedparallel.New(input, func(item interface{}) interface{} {
i := item.(int)
return ms.getChildSequence(i)
}, int(length))
go func() {
for i := start; i < start+length; i++ {
input <- int(i)
}
close(input)
}()
return output
}
// Returns the sequences pointed to by all items[i], s.t. start <= i < end, and returns the
// concatentation as one long composite sequence
func (ms metaSequence) getCompositeChildSequence(start uint64, length uint64) sequence {
@@ -179,9 +161,8 @@ func (ms metaSequence) getCompositeChildSequence(start uint64, length uint64) se
isIndexedSequence = true
}
output := ms.beginFetchingChildSequences(start, length)
for item := range output {
seq := item.(sequence)
output := ms.getChildren(start, start+length)
for _, seq := range output {
switch t := seq.(type) {
case metaSequence:
@@ -213,6 +194,53 @@ func (ms metaSequence) getCompositeChildSequence(start uint64, length uint64) se
return newSetLeafSequence(ms.vr, valueItems...)
}
// getChildren fetches the child sequences of the tuples in [start, end) —
// start inclusive, end exclusive — and respects uncommitted child sequences:
// a tuple whose in-memory |child| is set is used directly, and only the
// remaining (committed) children are fetched, in one batched read.
func (ms metaSequence) getChildren(start, end uint64) (seqs []sequence) {
	d.Chk.True(end <= uint64(len(ms.tuples)))
	d.Chk.True(start <= end)

	seqs = make([]sequence, end-start)
	hs := make(hash.HashSet, len(seqs))

	// First pass: take uncommitted children directly and collect the hashes
	// of committed children for the batched fetch below.
	for i := start; i < end; i++ {
		mt := ms.tuples[i]
		if mt.child != nil {
			seqs[i-start] = mt.child.sequence()
		} else {
			hs[mt.ref.TargetHash()] = struct{}{}
		}
	}

	if len(hs) == 0 {
		return // can occur with ptree that is fully uncommitted
	}

	// Fetch committed child sequences in a single batch
	valueChan := make(chan Value, len(hs))
	go func() {
		ms.vr.ReadManyValues(hs, valueChan)
		close(valueChan)
	}()
	children := make(map[hash.Hash]sequence, len(hs))
	for value := range valueChan {
		children[value.Hash()] = value.(Collection).sequence()
	}

	// Second pass: fill in the slots for committed children, now that they
	// have all been read.
	for i := start; i < end; i++ {
		mt := ms.tuples[i]
		if mt.child != nil {
			continue
		}
		childSeq := children[mt.ref.TargetHash()]
		d.Chk.NotNil(childSeq)
		seqs[i-start] = childSeq
	}
	return
}
func isMetaSequence(seq sequence) bool {
_, seqIsMeta := seq.(metaSequence)
return seqIsMeta
+4 -4
View File
@@ -38,15 +38,15 @@ func newMapMetaSequence(tuples []metaTuple, vr ValueReader) metaSequence {
return newMetaSequence(tuples, t, vr)
}
func newCursorAtValue(seq orderedSequence, val Value, forInsertion bool, last bool) *sequenceCursor {
func newCursorAtValue(seq orderedSequence, val Value, forInsertion bool, last bool, readAhead bool) *sequenceCursor {
var key orderedKey
if val != nil {
key = newOrderedKey(val)
}
return newCursorAt(seq, key, forInsertion, last)
return newCursorAt(seq, key, forInsertion, last, readAhead)
}
func newCursorAt(seq orderedSequence, key orderedKey, forInsertion bool, last bool) *sequenceCursor {
func newCursorAt(seq orderedSequence, key orderedKey, forInsertion bool, last bool, readAhead bool) *sequenceCursor {
var cur *sequenceCursor
for {
idx := 0
@@ -54,7 +54,7 @@ func newCursorAt(seq orderedSequence, key orderedKey, forInsertion bool, last bo
idx = -1
}
seqIsMeta := isMetaSequence(seq)
cur = newSequenceCursor(cur, seq, idx)
cur = newSequenceCursor(cur, seq, idx, readAhead)
if key != emptyKey {
if !seekTo(cur, key, forInsertion && seqIsMeta) {
return cur
+4 -4
View File
@@ -112,8 +112,8 @@ func orderedSequenceDiffBest(last orderedSequence, current orderedSequence, chan
func orderedSequenceDiffTopDown(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, stopChan <-chan struct{}) bool {
var lastHeight, currentHeight int
functions.All(
func() { lastHeight = newCursorAt(last, emptyKey, false, false).depth() },
func() { currentHeight = newCursorAt(current, emptyKey, false, false).depth() },
func() { lastHeight = newCursorAt(last, emptyKey, false, false, false).depth() },
func() { currentHeight = newCursorAt(current, emptyKey, false, false, false).depth() },
)
return orderedSequenceDiffInternalNodes(last, current, changes, stopChan, lastHeight, currentHeight)
}
@@ -159,8 +159,8 @@ func orderedSequenceDiffInternalNodes(last orderedSequence, current orderedSeque
// Streams the diff from |last| to |current| into |changes|, using a left-right approach.
// Left-right immediately descends to the first change and starts streaming changes, but compared to top-down it's serial and much slower to calculate the full diff.
func orderedSequenceDiffLeftRight(last orderedSequence, current orderedSequence, changes chan<- ValueChanged, stopChan <-chan struct{}) bool {
lastCur := newCursorAt(last, emptyKey, false, false)
currentCur := newCursorAt(current, emptyKey, false, false)
lastCur := newCursorAt(last, emptyKey, false, false, true)
currentCur := newCursorAt(current, emptyKey, false, false, true)
for lastCur.valid() && currentCur.valid() {
fastForward(lastCur, currentCur)
+1 -1
View File
@@ -321,7 +321,7 @@ func (hip HashIndexPath) Resolve(v Value) (res Value) {
return nil
}
cur := newCursorAt(seq, orderedKeyFromHash(hip.Hash), false, false)
cur := newCursorAt(seq, orderedKeyFromHash(hip.Hash), false, false, false)
if !cur.valid() {
return nil
}
+2 -2
View File
@@ -23,9 +23,9 @@ func concat(fst, snd sequence, newSequenceChunker newSequenceChunkerFn) sequence
if vr != snd.valueReader() {
d.Panic("cannot concat sequences from different databases")
}
chunker := newSequenceChunker(newCursorAtIndex(fst, fst.numLeaves()), vr)
chunker := newSequenceChunker(newCursorAtIndex(fst, fst.numLeaves(), false), vr)
for cur, ch := newCursorAtIndex(snd, 0), chunker; cur != nil; cur = cur.parent {
for cur, ch := newCursorAtIndex(snd, 0, false), chunker; cur != nil; cur = cur.parent {
// If fst is shallower than snd, its cur will have a parent whereas the
// chunker to snd won't. In that case, create a parent for fst.
// Note that if the inverse is true - snd is shallower than fst - this just
+113 -39
View File
@@ -4,34 +4,83 @@
package types
import (
"runtime"
"github.com/attic-labs/noms/go/d"
)
// The number of go routines to devote to read-ahead.
// The current setting provides good throughput on on a
// 2015 MacBook Pro/2.7 GHz i5 /8 GB.
var readAheadParallelism = runtime.NumCPU() * 64
import "github.com/attic-labs/noms/go/d"
// sequenceCursor explores a tree of sequence items.
type sequenceCursor struct {
parent *sequenceCursor
seq sequence
idx int
readAhead *sequenceReadAhead
readAhead bool
childSeqs []sequence
}
// readAheadLeafCursors advances |sc| forward and streams back a clone of |sc|
// for each distinct sequence its *parent* visits. The caller should read each
// leaf cursor off of |curChan| and advance() it until invalid.
// |readAheadLeafCursors| will |preloadChildren()| on each distinct parent
// cursor prior to sending it to |curChan|. The effect of this is that the
// client will be iterating over a sequence of leaf + 1 prolly tree sequences,
// each of which will have preloaded its children.
//
//            /---\             /---\
//  _______________________  <- each Cx's grandparent will be nil so that it only advances within a single sequence
//  / \  / \    / \     / \  <- first meta-level
// /\ /\ /\ /\ /\ /\ /\ /\   <- leaf level
// ^       ^      ^     ^
// |       |      |     |
// c1      c2     c3    c4   <- |curChan|
//
func readAheadLeafCursors(sc *sequenceCursor, curChan chan chan *sequenceCursor, stopChan chan struct{}) {
	// Read-ahead must start from a leaf cursor, never a meta-level cursor.
	d.Chk.False(isMetaSequence(sc.seq))

	parentCursor := sc.parent
	if parentCursor == nil {
		ch := make(chan *sequenceCursor, 1)
		curChan <- ch
		ch <- sc // No meta level on which to read ahead
		return
	}

	d.Chk.True(parentCursor.readAhead)
	leafIdx := sc.idx // Ensure the first cursor delivered is at the correct start position
	for {
		// Bail out promptly if the consumer has asked us to stop.
		select {
		case <-stopChan:
			return
		default:
		}
		if !parentCursor.valid() {
			break // end of meta level has been reached
		}
		ch := make(chan *sequenceCursor)
		curChan <- ch
		// Clone the parent position with a nil grandparent so that the cursor
		// handed to the goroutine only advances within this single sequence.
		pc := newSequenceCursor(nil, parentCursor.seq, parentCursor.idx, true)
		go func(ch chan *sequenceCursor, pc *sequenceCursor, leafIdx int) {
			// The (preloaded) child fetch happens off the caller's goroutine;
			// the consumer blocks on |ch| until the leaf cursor is ready.
			ch <- newSequenceCursor(pc, pc.getChildSequence(), leafIdx, true)
		}(ch, pc, leafIdx)
		// Advance parentCursor to the start of the next parent sequence
		// (idx returns to 0 once the cursor crosses a chunk boundary).
		for parentCursor.advance() && parentCursor.idx > 0 {
		}
		leafIdx = 0
	}
}
// newSequenceCursor creates a cursor on seq positioned at idx.
// If idx < 0, count backward from the end of seq.
func newSequenceCursor(parent *sequenceCursor, seq sequence, idx int) *sequenceCursor {
func newSequenceCursor(parent *sequenceCursor, seq sequence, idx int, readAhead bool) *sequenceCursor {
d.PanicIfTrue(seq == nil)
if idx < 0 {
idx += seq.seqLen()
d.PanicIfFalse(idx >= 0)
}
return &sequenceCursor{parent, seq, idx, nil}
readAhead = readAhead && isMetaSequence(seq) && seq.valueReader() != nil
return &sequenceCursor{parent, seq, idx, readAhead, nil}
}
func (cur *sequenceCursor) length() int {
@@ -46,19 +95,29 @@ func (cur *sequenceCursor) getItem(idx int) sequenceItem {
// It's called whenever the cursor advances/retreats to a different chunk.
func (cur *sequenceCursor) sync() {
d.PanicIfFalse(cur.parent != nil)
if cur.readAhead != nil {
v := cur.parent.current()
hash := v.(metaTuple).ref.TargetHash()
if cs, ok := cur.readAhead.get(cur.parent.idx, hash); ok {
cur.seq = cs
return
}
}
cur.childSeqs = nil
cur.seq = cur.parent.getChildSequence()
}
// preloadChildren materializes every child sequence from the cursor's current
// index through the end of the chunk, caching them in cur.childSeqs. It is a
// no-op when the children have already been loaded.
func (cur *sequenceCursor) preloadChildren() {
	if cur.childSeqs != nil {
		return // already loaded
	}
	seqLen := cur.seq.seqLen()
	loaded := cur.seq.(metaSequence).getChildren(uint64(cur.idx), uint64(seqLen))
	cur.childSeqs = make([]sequence, seqLen)
	copy(cur.childSeqs[cur.idx:], loaded)
}
// getChildSequence retrieves the child at the current cursor position.
// When read-ahead is enabled the child is served from the preloaded cache;
// slots that were not preloaded (those before the index at preload time) fall
// back to a direct fetch.
func (cur *sequenceCursor) getChildSequence() sequence {
	if cur.readAhead {
		cur.preloadChildren()
		if child := cur.childSeqs[cur.idx]; child != nil {
			return child
		}
	}
	return cur.seq.getChildSequence(cur.idx)
}
@@ -137,15 +196,43 @@ func (cur *sequenceCursor) clone() *sequenceCursor {
if cur.parent != nil {
parent = cur.parent.clone()
}
return newSequenceCursor(parent, cur.seq, cur.idx)
cl := newSequenceCursor(parent, cur.seq, cur.idx, cur.readAhead)
cl.childSeqs = cur.childSeqs
return cl
}
type cursorIterCallback func(item interface{}) bool
// iter iterates forward from the current position
// TODO: replace calls to this with direct calls to IterSequence()
func (cur *sequenceCursor) iter(cb cursorIterCallback) {
iterSequence(cur, cb)
if !cur.readAhead {
for cur.valid() && !cb(cur.getItem(cur.idx)) {
cur.advance()
}
return
}
curChan := make(chan chan *sequenceCursor, 16) // read ahead ~ 10MB of leaf sequence
stopChan := make(chan struct{}, 1)
go func() {
readAheadLeafCursors(cur, curChan, stopChan)
close(curChan)
}()
for ch := range curChan {
leafCursor := <-ch
for leafCursor.valid() {
if cb(leafCursor.getItem(cur.idx)) {
stopChan <- struct{}{}
for _ = range curChan {
} // ensure async loading goroutine exits before we do
return
}
leafCursor.advance()
}
}
}
// newCursorAtIndex creates a new cursor over seq positioned at idx.
@@ -153,10 +240,10 @@ func (cur *sequenceCursor) iter(cb cursorIterCallback) {
// Implemented by searching down the tree to the leaf sequence containing idx. Each
// sequence cursor includes a back pointer to its parent so that it can follow the path
// to the next leaf chunk when the cursor exhausts the entries in the current chunk.
func newCursorAtIndex(seq sequence, idx uint64) *sequenceCursor {
func newCursorAtIndex(seq sequence, idx uint64, readAhead bool) *sequenceCursor {
var cur *sequenceCursor
for {
cur = newSequenceCursor(cur, seq, 0)
cur = newSequenceCursor(cur, seq, 0, readAhead)
idx = idx - advanceCursorToOffset(cur, idx)
cs := cur.getChildSequence()
if cs == nil {
@@ -167,16 +254,3 @@ func newCursorAtIndex(seq sequence, idx uint64) *sequenceCursor {
d.PanicIfTrue(cur == nil)
return cur
}
// enableReadAhead turns on chunk read-ahead for a leaf sequence cursor.
// It is only intended to be called by sequenceIterator.
//
// Read-ahead should only be used in cases where the caller is iterating sequentially
// forward through all chunks starting at the current position.
func (cur *sequenceCursor) enableReadAhead() {
_, meta := cur.seq.(metaSequence)
d.PanicIfTrue(meta)
if cur.parent != nil { // only operative if sequence is chunked
cur.readAhead = newSequenceReadAhead(cur.parent, readAheadParallelism)
}
}
+5 -5
View File
@@ -70,18 +70,18 @@ func (ts testSequence) Type() *Type {
}
func newTestSequenceCursor(items []interface{}) *sequenceCursor {
parent := newSequenceCursor(nil, testSequence{items}, 0)
parent := newSequenceCursor(nil, testSequence{items}, 0, false)
items = items[0].([]interface{})
return newSequenceCursor(parent, testSequence{items}, 0)
return newSequenceCursor(parent, testSequence{items}, 0, false)
}
// TODO: Convert all tests to use newTestSequenceCursor3.
func newTestSequenceCursor3(items []interface{}) *sequenceCursor {
top := newSequenceCursor(nil, testSequence{items}, 0)
top := newSequenceCursor(nil, testSequence{items}, 0, false)
items = items[0].([]interface{})
middle := newSequenceCursor(top, testSequence{items}, 0)
middle := newSequenceCursor(top, testSequence{items}, 0, false)
items = items[0].([]interface{})
return newSequenceCursor(middle, testSequence{items}, 0)
return newSequenceCursor(middle, testSequence{items}, 0, false)
}
func TestTestCursor(t *testing.T) {
-65
View File
@@ -1,65 +0,0 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
// iterSequence efficiently iterates forward through a sequence, calling cb
// for each item until cb returns true or the sequence is exhausted.
//
// This is preferred to sequenceCursor.iter().
func iterSequence(sc *sequenceCursor, cb cursorIterCallback) {
	sc.enableReadAhead()
	defer func() {
		// Detach the read-ahead cache when iteration finishes so the cursor
		// returns to normal (non-read-ahead) behavior.
		sc.readAhead = nil
	}()
	it := &sequenceIterator{sc}
	for it.hasMore() && !cb(it.item()) {
		it.advance(1)
	}
}
// sequenceIterator iterates forward through a sequence.
//
// Since it can assume the sequence is being read forward, it reads
// upcoming chunks ahead of their access to optimize throughput
type sequenceIterator struct {
cursor *sequenceCursor
}
// newSequenceIterator creates an iterator for reading a sequence
func newSequenceIterator(seq sequence, idx uint64) *sequenceIterator {
sc := newCursorAtIndex(seq, idx)
sc.enableReadAhead()
return &sequenceIterator{sc}
}
// hasMore return true if there's more to iterate
func (si sequenceIterator) hasMore() bool {
return si.cursor.valid()
}
// advance advances the iterator by n items
func (si sequenceIterator) advance(n int) bool {
for i := 0; i < n && si.cursor.advance(); i++ {
}
return si.cursor.valid()
}
// item returns the value at the current position
func (si sequenceIterator) item() sequenceItem {
return si.cursor.current()
}
// chunkAndIndex returns the current leaf chunk and the index in that chunk referring to Item()
func (si sequenceIterator) chunkAndIndex() (sequence, int) {
return si.cursor.seq, si.cursor.idx
}
// hitRate returns the read-ahead cache hit rate
func (si sequenceIterator) readAheadHitRate() float32 {
if ra := si.cursor.readAhead; ra != nil {
return ra.hitRate()
}
return 0.0
}
-98
View File
@@ -1,98 +0,0 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"bytes"
"fmt"
"testing"
"github.com/attic-labs/testify/assert"
)
var seedData = []string{
"zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine",
}
const testCollectionSize = 100000
func genTestBlob() (Blob, []byte) {
var buffer bytes.Buffer
smallTestChunks()
defer normalProductionChunks()
for i := 0; buffer.Len() < testCollectionSize; i += 1 {
v := seedData[i%len(seedData)]
buffer.WriteString(fmt.Sprintf("%d%s", i, v))
}
raw := buffer.Bytes()
blob := NewBlob(&buffer)
return blob, raw
}
func assertMinDepth(assert *assert.Assertions, iter *sequenceIterator, min int) {
depth := iter.cursor.depth()
assert.Condition(func() bool { return depth >= min }, "depth less the min depth: %d >= %d", depth, min)
}
func TestIterBlob(t *testing.T) {
testIter := func(t *testing.T, blob Blob, expected []byte, start int) {
assert := assert.New(t)
expected = expected[start:]
iter := newSequenceIterator(blob.seq, uint64(start))
// need 4 levels to exercise parent traversal during read-ahead
assertMinDepth(assert, iter, 4)
var actual []byte
for iter.hasMore() {
actual = append(actual, iter.item().(byte))
iter.advance(1)
}
assert.Equal(len(expected), len(actual))
assert.Equal(expected, actual)
// delta normally 0 but may be more in rare case where the same
// (chunkIdx, hash) pair is repeated in different chunks. A lower
// hit rate likely indicates a bug.
assert.InDelta(1.0, iter.readAheadHitRate(), 0.05)
}
blob, expected := genTestBlob()
testIter(t, blob, expected, 0)
testIter(t, blob, expected, len(expected)/2)
}
func TestIterList(t *testing.T) {
genList := func() (List, []string) {
var buffer []string
var lbuffer []Value
smallTestChunks()
defer normalProductionChunks()
list := NewList()
for i := 0; len(buffer) < testCollectionSize; i += 1 {
v := seedData[i%len(seedData)]
s := fmt.Sprintf("%d%s", i, v)
buffer = append(buffer, s)
lbuffer = append(lbuffer, String(s))
}
list = list.Append(lbuffer...)
return list, buffer
}
testIter := func(t *testing.T, list List, expected []string, start int) {
assert := assert.New(t)
expected = expected[start:]
iter := newSequenceIterator(list.seq, uint64(start))
assertMinDepth(assert, iter, 4)
actual := []string{}
for iter.hasMore() {
actual = append(actual, string(iter.item().(String)))
iter.advance(1)
}
assert.Equal(len(expected), len(actual))
assert.Equal(expected, actual)
}
list, expected := genList()
testIter(t, list, expected, 0)
testIter(t, list, expected, len(expected)/2)
}
-94
View File
@@ -1,94 +0,0 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"github.com/attic-labs/noms/go/hash"
)
// sequenceReadAhead implements read-ahead by mapping a hash to a channel
// returning the corresponding sequence.
//
// It reads ahead by firing off a set of short-lived go routines. Each go
// routine (1) reads a sequence, (2) inserts it into a channel, (3) adds the
// channel to the map keyed by sequence hash, and (4) exits.
//
// The caller retrieves the sequence by looking up the channel by hash and
// reading from it.
//
// It maintains parallelism |p| by initially firing off |p| go routines
// to read the next |p| sequences. When a sequence is retrieved from the cache,
// it fires off a new go-routine to read the next sequence. This ensures that
// there are always |p| outstanding channels to read from the cache.
//
// This approach has one major advantage over a channel based approach:
// there are no go-routines to shut down when finished with the cursor. This
// avoids requiring the caller to call a Close() method.
type sequenceReadAhead struct {
	cursor      *sequenceCursor         // clone of the caller's cursor, advanced to prime reads
	cache       map[raKey]chan sequence // in-flight reads, keyed by (chunk index, hash)
	parallelism int                     // maximum number of outstanding reads
	getCount    float32                 // number of get() calls (for hit-rate stats)
	hitCount    float32                 // number of get() calls satisfied by the cache
}
// raKey is the future key. Rather than simply use the hash, we combine it
// with the local chunk offset. This increases the likelihood that repeat
// values in the sequence will get unique entries in the map.
type raKey struct {
	idx  int       // index of the tuple within its chunk
	hash hash.Hash // target hash of the child sequence
}
// newSequenceReadAhead creates a read-ahead cache that walks a clone of
// |cursor|, keeping up to |parallelism| child reads in flight.
func newSequenceReadAhead(cursor *sequenceCursor, parallelism int) *sequenceReadAhead {
	return &sequenceReadAhead{
		cursor:      cursor.clone(),
		cache:       make(map[raKey]chan sequence),
		parallelism: parallelism,
	}
}
// get returns the child sequence for (idx, h) if an in-flight read for it is
// present in the cache, blocking until that read completes. It also tops up
// the cache so the next |parallelism| reads stay in flight.
func (ra *sequenceReadAhead) get(idx int, h hash.Hash) (sequence, bool) {
	ra.readAhead()
	ra.getCount += 1
	key := raKey{idx, h}
	future, ok := ra.cache[key]
	if !ok {
		return nil, false
	}
	ra.hitCount += 1
	seq := <-future
	delete(ra.cache, key)
	return seq, true
}
// readAhead (called when read-ahead is enabled) primes the next entries in
// the read-ahead cache. It ensures that go routines have been allocated for
// reading the next n entries in the current sequence, where n is the
// configured parallelism or the number of entries left in the sequence if
// smaller.
func (ra *sequenceReadAhead) readAhead() {
	// Number of cache slots to (re)fill so |parallelism| reads stay in flight.
	count := ra.parallelism - len(ra.cache)
	for i := 0; i < count; i += 1 {
		if !ra.cursor.advance() {
			break
		}
		// Buffered so the reader goroutine can deliver its result and exit
		// even if this future is never consumed.
		future := make(chan sequence, 1)
		key := raKey{
			ra.cursor.idx,
			ra.cursor.current().(metaTuple).ref.target,
		}
		ra.cache[key] = future
		// Snapshot the position; ra.cursor keeps advancing in later iterations.
		seq := ra.cursor.seq
		idx := ra.cursor.idx
		go func() {
			// close not required here but ensures fast fail if channel is misused
			defer close(future)
			val := seq.getChildSequence(idx)
			future <- val
		}()
	}
}
// hitRate reports the fraction of get() calls that were satisfied from the
// read-ahead cache, in [0, 1]. Before any call to get() it returns 0 rather
// than the NaN that 0/0 float division would produce.
//
// Receiver renamed from rc to ra for consistency with the type's other
// methods.
func (ra *sequenceReadAhead) hitRate() float32 {
	if ra.getCount == 0 {
		return 0
	}
	return ra.hitCount / ra.getCount
}
+9 -9
View File
@@ -108,7 +108,7 @@ func (s Set) Type() *Type {
}
func (s Set) First() Value {
cur := newCursorAt(s.seq, emptyKey, false, false)
cur := newCursorAt(s.seq, emptyKey, false, false, false)
if !cur.valid() {
return nil
}
@@ -120,7 +120,7 @@ func (s Set) At(idx uint64) Value {
panic(fmt.Errorf("Out of bounds: %s >= %s", idx, s.Len()))
}
cur := newCursorAtIndex(s.seq, idx)
cur := newCursorAtIndex(s.seq, idx, false)
return cur.current().(Value)
}
@@ -132,7 +132,7 @@ func (s Set) Insert(values ...Value) Set {
head, tail := values[0], values[1:]
var res Set
if cur, found := s.getCursorAtValue(head); !found {
if cur, found := s.getCursorAtValue(head, false); !found {
res = s.splice(cur, 0, head)
} else {
res = s
@@ -149,7 +149,7 @@ func (s Set) Remove(values ...Value) Set {
head, tail := values[0], values[1:]
var res Set
if cur, found := s.getCursorAtValue(head); found {
if cur, found := s.getCursorAtValue(head, false); found {
res = s.splice(cur, 1)
} else {
res = s
@@ -173,21 +173,21 @@ func (s Set) splice(cur *sequenceCursor, deleteCount uint64, vs ...Value) Set {
return ns
}
func (s Set) getCursorAtValue(v Value) (cur *sequenceCursor, found bool) {
cur = newCursorAtValue(s.seq, v, true, false)
func (s Set) getCursorAtValue(v Value, readAhead bool) (cur *sequenceCursor, found bool) {
cur = newCursorAtValue(s.seq, v, true, false, readAhead)
found = cur.idx < cur.seq.seqLen() && cur.current().(Value).Equals(v)
return
}
func (s Set) Has(v Value) bool {
cur := newCursorAtValue(s.seq, v, false, false)
cur := newCursorAtValue(s.seq, v, false, false, false)
return cur.valid() && cur.current().(Value).Equals(v)
}
type setIterCallback func(v Value) bool
func (s Set) Iter(cb setIterCallback) {
cur := newCursorAt(s.seq, emptyKey, false, false)
cur := newCursorAt(s.seq, emptyKey, false, false, true)
cur.iter(func(v interface{}) bool {
return cb(v.(Value))
})
@@ -196,7 +196,7 @@ func (s Set) Iter(cb setIterCallback) {
type setIterAllCallback func(v Value)
func (s Set) IterAll(cb setIterAllCallback) {
cur := newCursorAt(s.seq, emptyKey, false, false)
cur := newCursorAt(s.seq, emptyKey, false, false, true)
cur.iter(func(v interface{}) bool {
cb(v.(Value))
return false
+3 -3
View File
@@ -38,7 +38,7 @@ type setIterator struct {
func (si *setIterator) Next() Value {
if si.cursor == nil {
si.cursor = newCursorAt(si.s.seq, emptyKey, false, false)
si.cursor = newCursorAt(si.s.seq, emptyKey, false, false, false)
} else {
si.cursor.advance()
}
@@ -53,7 +53,7 @@ func (si *setIterator) SkipTo(v Value) Value {
first := false
if si.cursor == nil {
first = true
si.cursor, _ = si.s.getCursorAtValue(v)
si.cursor, _ = si.s.getCursorAtValue(v, false)
}
if !si.cursor.valid() {
@@ -69,7 +69,7 @@ func (si *setIterator) SkipTo(v Value) Value {
return si.Next()
}
si.cursor, _ = si.s.getCursorAtValue(v)
si.cursor, _ = si.s.getCursorAtValue(v, false)
if si.cursor.valid() {
return si.cursor.current().(Value)
}
+11 -10
View File
@@ -99,7 +99,7 @@ export class SequenceCursor<T, S: Sequence<any>> {
if (this.idx < 0) {
this.idx = Math.max(0, this.sequence.length + this.idx);
}
this.readAhead = readAhead;
this.readAhead = readAhead && this.sequence.isMeta;
this.childSeqs = null;
}
@@ -124,17 +124,18 @@ export class SequenceCursor<T, S: Sequence<any>> {
}
getChildSequence(): Promise<?S> {
if (this.readAhead && this.sequence.isMeta && !this.childSeqs) {
// Only readAhead when enabled, for meta sequences.
const childSeqs = [];
for (let i = this.idx; i < this.sequence.length; i++) {
childSeqs[i] = this.sequence.getChildSequence(i);
if (this.readAhead) {
if (!this.childSeqs) {
const childSeqs = [];
for (let i = this.idx; i < this.sequence.length; i++) {
childSeqs[i] = this.sequence.getChildSequence(i);
}
this.childSeqs = childSeqs;
}
this.childSeqs = childSeqs;
}
if (this.childSeqs && this.childSeqs[this.idx]) {
return this.childSeqs[this.idx]; // read ahead cache
if (this.childSeqs[this.idx]) {
return this.childSeqs[this.idx]; // read ahead cache
}
}
return this.sequence.getChildSequence(this.idx);
+30 -21
View File
@@ -15,6 +15,7 @@ import (
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/spec"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/profile"
"github.com/attic-labs/noms/go/util/progressreader"
"github.com/attic-labs/noms/go/util/status"
"github.com/attic-labs/noms/go/util/verbose"
@@ -30,6 +31,8 @@ func main() {
spec.RegisterDatabaseFlags(flag.CommandLine)
verbose.RegisterVerboseFlags(flag.CommandLine)
profile.RegisterProfileFlags(flag.CommandLine)
flag.Parse(true)
if flag.NArg() != 1 && flag.NArg() != 2 {
@@ -50,30 +53,36 @@ func main() {
blob = b
}
file := os.Stdout
blobReader := blob.Reader().(io.Reader)
showProgress := false
defer profile.MaybeStartProfile().Stop()
filePath := flag.Arg(1)
if filePath != "" {
// Note: overwrites any existing file.
var err error
file, err = os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0644)
d.CheckErrorNoUsage(err)
defer file.Close()
showProgress = true
start := time.Now()
expected := humanize.Bytes(blob.Len())
blobReader = progressreader.New(blob.Reader(), func(seen uint64) {
elapsed := time.Since(start).Seconds()
rate := uint64(float64(seen) / elapsed)
status.Printf("%s of %s written in %ds (%s/s)...", humanize.Bytes(seen), expected, int(elapsed), humanize.Bytes(rate))
})
if filePath == "" {
blob.Reader().Copy(os.Stdout)
return
}
// Note: overwrites any existing file.
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0644)
d.CheckErrorNoUsage(err)
defer file.Close()
start := time.Now()
expected := humanize.Bytes(blob.Len())
// Create a pipe so that we can connect a progress reader
preader, pwriter := io.Pipe()
go func() {
blob.Reader().Copy(pwriter)
pwriter.Close()
}()
blobReader := progressreader.New(preader, func(seen uint64) {
elapsed := time.Since(start).Seconds()
rate := uint64(float64(seen) / elapsed)
status.Printf("%s of %s written in %ds (%s/s)...", humanize.Bytes(seen), expected, int(elapsed), humanize.Bytes(rate))
})
io.Copy(file, blobReader)
if showProgress {
status.Done()
}
status.Done()
}
+6 -1
View File
@@ -89,7 +89,12 @@ func main() {
d.CheckError(fmt.Errorf("Path %s not a Blob: %s\n", *path, types.EncodedValue(val.Type())))
}
defer db.Close()
r = blob.Reader()
preader, pwriter := io.Pipe()
go func() {
blob.Reader().Copy(pwriter)
pwriter.Close()
}()
r = preader
size = blob.Len()
dataSetArgN = 0
} else {