diff --git a/go/types/blob.go b/go/types/blob.go index b1f1c1d68b..28ead3179c 100644 --- a/go/types/blob.go +++ b/go/types/blob.go @@ -31,8 +31,9 @@ func NewEmptyBlob() Blob { // BUG 155 - Should provide Write... Maybe even have Blob implement ReadWriteSeeker func (b Blob) Reader() io.ReadSeeker { - cursor := newCursorAtIndex(b.seq, 0) - return &BlobReader{b.seq, cursor, nil, 0} + iter := newSequenceIterator(b.seq, 0) + return &BlobReader{b.seq, iter, nil, 0} + } func (b Blob) Splice(idx uint64, deleteCount uint64, data []byte) Blob { @@ -116,7 +117,7 @@ func (b Blob) Type() *Type { type BlobReader struct { seq sequence - cursor *sequenceCursor + iter *sequenceIterator currentReader io.ReadSeeker pos uint64 } @@ -127,11 +128,9 @@ func (cbr *BlobReader) Read(p []byte) (n int, err error) { } n, err = cbr.currentReader.Read(p) - for i := 0; i < n; i++ { - cbr.pos++ - cbr.cursor.advance() - } - if err == io.EOF && cbr.cursor.idx < cbr.cursor.seq.seqLen() { + cbr.pos += uint64(n) + hasMore := cbr.iter.advance(n) + if err == io.EOF && hasMore { cbr.currentReader = nil err = nil } @@ -158,14 +157,16 @@ func (cbr *BlobReader) Seek(offset int64, whence int) (int64, error) { } cbr.pos = uint64(abs) - cbr.cursor = newCursorAtIndex(cbr.seq, cbr.pos) + cbr.iter = newSequenceIterator(cbr.seq, cbr.pos) cbr.currentReader = nil return abs, nil } func (cbr *BlobReader) updateReader() { - cbr.currentReader = bytes.NewReader(cbr.cursor.seq.(blobLeafSequence).data) - cbr.currentReader.Seek(int64(cbr.cursor.idx), 0) + chunk, idx := cbr.iter.chunkAndIndex() + data := chunk.(blobLeafSequence).data + cbr.currentReader = bytes.NewReader(data) + cbr.currentReader.Seek(int64(idx), 0) } func makeBlobLeafChunkFn(vr ValueReader) makeChunkFn { diff --git a/go/types/indexed_sequences.go b/go/types/indexed_sequences.go index 6dfecde618..2860ba4a5b 100644 --- a/go/types/indexed_sequences.go +++ b/go/types/indexed_sequences.go @@ -20,6 +20,14 @@ func newBlobMetaSequence(tuples []metaTuple, vr ValueReader) metaSequence { return newMetaSequence(tuples, BlobType, vr) } +// advanceCursorToOffset advances the cursor as close as possible to idx +// +// If the cursor references a leaf sequence, +// advance to idx, +// and return the number of values preceding the idx +// If it references a meta-sequence, +// advance to the tuple containing idx, +// and return the number of leaf values preceding this tuple func advanceCursorToOffset(cur *sequenceCursor, idx uint64) uint64 { seq := cur.seq @@ -28,6 +36,7 @@ func advanceCursorToOffset(cur *sequenceCursor, idx uint64) uint64 { cur.idx = 0 cum := uint64(0) + // Advance the cursor to the meta-sequence tuple containing idx for cur.idx < ms.seqLen()-1 && uint64(idx) >= cum+ms.tuples[cur.idx].numLeaves { cum += ms.tuples[cur.idx].numLeaves cur.idx++ @@ -40,8 +49,7 @@ func advanceCursorToOffset(cur *sequenceCursor, idx uint64) uint64 { if cur.idx > seq.seqLen() { cur.idx = seq.seqLen() } - - return uint64(cur.idx) + 1 + return uint64(cur.idx) } // If |sink| is not nil, chunks will be eagerly written as they're created. Otherwise they are diff --git a/go/types/meta_sequence.go b/go/types/meta_sequence.go index 1bd4127b38..6eaf1178a4 100644 --- a/go/types/meta_sequence.go +++ b/go/types/meta_sequence.go @@ -35,7 +35,6 @@ func (mt metaTuple) getChildSequence(vr ValueReader) sequence { if mt.child != nil { return mt.child.sequence() } - return mt.ref.TargetValue(vr).(Collection).sequence() } diff --git a/go/types/ordered_sequences.go b/go/types/ordered_sequences.go index 399c814874..05a1b34a57 100644 --- a/go/types/ordered_sequences.go +++ b/go/types/ordered_sequences.go @@ -67,7 +67,6 @@ func newCursorAt(seq orderedSequence, key orderedKey, forInsertion bool, last bo } seq = cs.(orderedSequence) } - d.PanicIfFalse(cur != nil) return cur } diff --git a/go/types/sequence_cursor.go b/go/types/sequence_cursor.go index 0f585da628..771cf5b1d1 100644 --- a/go/types/sequence_cursor.go +++ b/go/types/sequence_cursor.go @@ -4,24 +4,34 @@ package types -import "github.com/attic-labs/noms/go/d" +import ( + "runtime" + + "github.com/attic-labs/noms/go/d" +) + +// The number of go routines to devote to read-ahead. +// The current setting provides good throughput on on a +// 2015 MacBook Pro/2.7 GHz i5 /8 GB. +var readAheadParallelism = runtime.NumCPU() * 64 // sequenceCursor explores a tree of sequence items. type sequenceCursor struct { - parent *sequenceCursor - seq sequence - idx int + parent *sequenceCursor + seq sequence + idx int + readAhead *sequenceReadAhead } +// newSequenceCursor creates a cursor on seq positioned at idx. +// If idx < 0, count backward from the end of seq. func newSequenceCursor(parent *sequenceCursor, seq sequence, idx int) *sequenceCursor { d.PanicIfTrue(seq == nil) if idx < 0 { idx += seq.seqLen() d.PanicIfFalse(idx >= 0) } - - cur := &sequenceCursor{parent, seq, idx} - return cur + return &sequenceCursor{parent, seq, idx, nil} } func (cur *sequenceCursor) length() int { @@ -32,16 +42,27 @@ func (cur *sequenceCursor) getItem(idx int) sequenceItem { return cur.seq.getItem(idx) } +// sync loads the sequence that the cursor index points to. +// It's called whenever the cursor advances/retreats to a different chunk. func (cur *sequenceCursor) sync() { d.PanicIfFalse(cur.parent != nil) + if cur.readAhead != nil { + v := cur.parent.current() + hash := v.(metaTuple).ref.TargetHash() + if cs, ok := cur.readAhead.get(cur.parent.idx, hash); ok { + cur.seq = cs + return + } + } cur.seq = cur.parent.getChildSequence() } +// getChildSequence retrieves the child at the current cursor position. func (cur *sequenceCursor) getChildSequence() sequence { return cur.seq.getChildSequence(cur.idx) } -// Returns the value the cursor refers to. Fails an assertion if the cursor doesn't point to a value. +// current returns the value at the current cursor position func (cur *sequenceCursor) current() sequenceItem { d.PanicIfFalse(cur.valid()) return cur.getItem(cur.idx) @@ -75,6 +96,7 @@ func (cur *sequenceCursor) advanceMaybeAllowPastEnd(allowPastEnd bool) bool { return false } if cur.parent != nil && cur.parent.advanceMaybeAllowPastEnd(false) { + // at end of current leaf chunk and there are more cur.sync() cur.idx = 0 return true @@ -109,22 +131,28 @@ func (cur *sequenceCursor) retreatMaybeAllowBeforeStart(allowBeforeStart bool) b return false } +// clone creates a copy of the cursor func (cur *sequenceCursor) clone() *sequenceCursor { var parent *sequenceCursor if cur.parent != nil { parent = cur.parent.clone() } - return &sequenceCursor{parent, cur.seq, cur.idx} + return newSequenceCursor(parent, cur.seq, cur.idx) } type cursorIterCallback func(item interface{}) bool +// iter iterates forward from the current position +// TODO: replace calls to this with direct calls to IterSequence() func (cur *sequenceCursor) iter(cb cursorIterCallback) { - for cur.valid() && !cb(cur.getItem(cur.idx)) { - cur.advance() - } + iterSequence(cur, cb) } +// newCursorAtIndex creates a new cursor over seq positioned at idx. +// +// Implemented by searching down the tree to the leaf sequence containing idx. Each +// sequence cursor includes a back pointer to its parent so that it can follow the path +// to the next leaf chunk when the cursor exhausts the entries in the current chunk. func newCursorAtIndex(seq sequence, idx uint64) *sequenceCursor { var cur *sequenceCursor for { @@ -136,7 +164,19 @@ func newCursorAtIndex(seq sequence, idx uint64) *sequenceCursor { } seq = cs } - d.PanicIfTrue(cur == nil) return cur } + +// enableReadAhead turns on chunk read-ahead for a leaf sequence cursor. +// It is only intended to be called by sequenceIterator. +// +// Read-ahead should only be used in cases where the caller is iterating sequentially +// forward through all chunks starting at the current position. +func (cur *sequenceCursor) enableReadAhead() { + _, meta := cur.seq.(metaSequence) + d.PanicIfTrue(meta) + if cur.parent != nil { // only operative if sequence is chunked + cur.readAhead = newSequenceReadAhead(cur.parent, readAheadParallelism) + } +} diff --git a/go/types/sequence_iterator.go b/go/types/sequence_iterator.go new file mode 100644 index 0000000000..1e34067f8c --- /dev/null +++ b/go/types/sequence_iterator.go @@ -0,0 +1,65 @@ +// Copyright 2016 Attic Labs, Inc. All rights reserved. +// Licensed under the Apache License, version 2.0: +// http://www.apache.org/licenses/LICENSE-2.0 + +package types + +// iterSequence efficiently iterates through a sequence calling cb for +// each item. +// +// This is preferred to sequenceCursor.iter() +func iterSequence(sc *sequenceCursor, cb cursorIterCallback) { + sc.enableReadAhead() + defer func() { + sc.readAhead = nil + }() + it := &sequenceIterator{sc} + for it.hasMore() && !cb(it.item()) { + it.advance(1) + } +} + +// sequenceIterator iterates forward through a sequence. +// +// Since it can assume the sequence is being read forward, it reads +// upcoming chunks ahead of their access to optimize throughput +type sequenceIterator struct { + cursor *sequenceCursor +} + +// newSequenceIterator creates an iterator for reading a sequence +func newSequenceIterator(seq sequence, idx uint64) *sequenceIterator { + sc := newCursorAtIndex(seq, idx) + sc.enableReadAhead() + return &sequenceIterator{sc} +} + +// hasMore return true if there's more to iterate +func (si sequenceIterator) hasMore() bool { + return si.cursor.valid() +} + +// advance advances the iterator by n items +func (si sequenceIterator) advance(n int) bool { + for i := 0; i < n && si.cursor.advance(); i++ { + } + return si.cursor.valid() +} + +// item returns the value at the current position +func (si sequenceIterator) item() sequenceItem { + return si.cursor.current() +} + +// chunkAndIndex returns the current leaf chunk and the index in that chunk referring to Item() +func (si sequenceIterator) chunkAndIndex() (sequence, int) { + return si.cursor.seq, si.cursor.idx +} + +// hitRate returns the read-ahead cache hit rate +func (si sequenceIterator) readAheadHitRate() float32 { + if ra := si.cursor.readAhead; ra != nil { + return ra.hitRate() + } + return 0.0 +} diff --git a/go/types/sequence_iterator_test.go b/go/types/sequence_iterator_test.go new file mode 100644 index 0000000000..19900b745e --- /dev/null +++ b/go/types/sequence_iterator_test.go @@ -0,0 +1,89 @@ +// Copyright 2016 Attic Labs, Inc. All rights reserved. +// Licensed under the Apache License, version 2.0: +// http://www.apache.org/licenses/LICENSE-2.0 + +package types + +import ( + "bytes" + "fmt" + "testing" + + "github.com/attic-labs/testify/assert" +) + +var seedData = []string{ + "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", +} + +const testCollectionSize = 100000 + +func genTestBlob() (Blob, []byte) { + var buffer bytes.Buffer + for i := 0; i < testCollectionSize; i += 1 { + for _, v := range seedData { + buffer.WriteString(fmt.Sprintf("%d%s", i, v)) + } + } + blob := NewBlob(&buffer) + return blob, buffer.Bytes() +} + +func TestIterBlob(t *testing.T) { + testIter := func(t *testing.T, blob Blob, expected []byte, start int) { + assert := assert.New(t) + expected = expected[start:] + iter := newSequenceIterator(blob.seq, uint64(start)) + var actual []byte + for iter.hasMore() { + actual = append(actual, iter.item().(byte)) + iter.advance(1) + } + assert.Equal(len(expected), len(actual)) + assert.Equal(expected, actual) + // delta normally 0 but may be more in rare case where the same + // (chunkIdx, hash) pair is repeated in different chunks. A lower + // hit rate likely indicates a bug. + assert.InDelta(1.0, iter.readAheadHitRate(), 0.01) + } + + blob, expected := genTestBlob() + + testIter(t, blob, expected, 0) + testIter(t, blob, expected, len(expected)/2) +} + +func TestIterList(t *testing.T) { + genList := func() (List, []string) { + var buffer []string + var lbuffer []Value + + list := NewList() + + for i := 0; i < testCollectionSize; i += 1 { + for _, v := range seedData { + s := fmt.Sprintf("%d%s", i, v) + buffer = append(buffer, s) + lbuffer = append(lbuffer, String(s)) + } + } + list = list.Append(lbuffer...) + return list, buffer + + } + testIter := func(t *testing.T, list List, expected []string, start int) { + assert := assert.New(t) + expected = expected[start:] + iter := newSequenceIterator(list.seq, uint64(start)) + actual := []string{} + for iter.hasMore() { + actual = append(actual, string(iter.item().(String))) + iter.advance(1) + } + assert.Equal(len(expected), len(actual)) + assert.Equal(expected, actual) + } + list, expected := genList() + testIter(t, list, expected, 0) + testIter(t, list, expected, len(expected)/2) +} diff --git a/go/types/sequence_readahead.go b/go/types/sequence_readahead.go new file mode 100644 index 0000000000..72b7d0b83c --- /dev/null +++ b/go/types/sequence_readahead.go @@ -0,0 +1,94 @@ +// Copyright 2016 Attic Labs, Inc. All rights reserved. +// Licensed under the Apache License, version 2.0: +// http://www.apache.org/licenses/LICENSE-2.0 + +package types + +import ( + "github.com/attic-labs/noms/go/hash" +) + +// sequenceReadAhead implements read-ahead by mapping a hash to a channel returning +// the corresponding sequence. +// +// It reads ahead by firing off a set of short-lived go routines. Each go +// routine (1) reads a sequence, (2) inserts it into a channel, (3) adds the +// channel to the map keyed by sequence hash, and (4) exits. +// +// The caller retrieves the sequence by looking up the channel by hash and +// reading from it. +// +// It maintains parallelism |p| by initially firing off |p| go routines +// to read the next |p| sequences. When a sequence is retreived from the cache, +// It fires off a new go-routine to read the next sequence. This ensures that +// there are always |p| outstanding channels to read from the cache. +// +// This approach has one major advantage over a channel based approach: +// there are no go-routines to shutdown when finished with the cursor. This +// avoids requiring caller to call a Close() method. +type sequenceReadAhead struct { + cursor *sequenceCursor + cache map[raKey]chan sequence + parallelism int + getCount float32 + hitCount float32 +} + +// raKey is the future key. Rather than simply use the hash, we combines it +// with the local chunk offset. This increases the likelihood that repeat values +// in the sequence will get unique entries in the map. +type raKey struct { + idx int + hash hash.Hash +} + +func newSequenceReadAhead(cursor *sequenceCursor, parallelism int) *sequenceReadAhead { + m := map[raKey]chan sequence{} + return &sequenceReadAhead{cursor.clone(), m, parallelism, 0, 0} +} + +func (ra *sequenceReadAhead) get(idx int, h hash.Hash) (sequence, bool) { + ra.readAhead() + key := raKey{idx, h} + ra.getCount += 1 + if future, ok := ra.cache[key]; ok { + result := <-future + ra.hitCount += 1 + delete(ra.cache, key) + return result, true + } + return nil, false +} + +// readAhead (called when read-ahead is enabled) primes the next entries in the +// read-ahead cache. It ensures that go routines have been allocated for reading +// the next n entries in the current sequence. N is either readAheadParallelism +// or the number of entries left in the sequence if smaller. +func (ra *sequenceReadAhead) readAhead() { + // the next position to be primed + count := ra.parallelism - len(ra.cache) + for i := 0; i < count; i += 1 { + if !ra.cursor.advance() { + break + } + future := make(chan sequence, 1) + key := raKey{ + ra.cursor.idx, + ra.cursor.current().(metaTuple).ref.target, + } + ra.cache[key] = future + seq := ra.cursor.seq + idx := ra.cursor.idx + go func() { + // close not required here but ensures fast fail if channel is misused + defer close(future) + val := seq.getChildSequence(idx) + future <- val + + }() + } +} + +func (rc *sequenceReadAhead) hitRate() float32 { + return rc.hitCount / rc.getCount +} diff --git a/go/util/math/minmax.go b/go/util/math/minmax.go new file mode 100644 index 0000000000..0079153d64 --- /dev/null +++ b/go/util/math/minmax.go @@ -0,0 +1,21 @@ +// Copyright 2016 Attic Labs, Inc. All rights reserved. +// Licensed under the Apache License, version 2.0: +// http://www.apache.org/licenses/LICENSE-2.0 + +package math + +// MaxInt returns the larger of x or y. +func MaxInt(x, y int) int { + if x > y { + return x + } + return y +} + +// MinInt returns the smaller of x or y. +func MinInt(x, y int) int { + if x < y { + return x + } + return y +}