Improve sequence read performance using read-ahead (#2711)

* Implement read-ahead in sequence_cursor
For each meta-sequence that contains leaf sequences, start reading ahead in
parallel and deliver in order to a buffered channel. Each advance of the cursor gets
the next sequence in the read-ahead channel.

toward: #2079
-

* Address code review comments:
- Use // for all comments
- Fix label format
- Increase channel read timeout

* Rework read-ahead to use map[int]channel sequence instead of a channel of sequences

* Rework sequence cursor read-ahead for better throughput

- Guts of read-ahead now encapsulted in sequenceReadAhead
- New implemention uses a cursor to iterate across the leaves ahead
  of the current cursor
  - It reads ahead using short-lived go routines that place each read-ahead
    sequence in a channel that is then stored by hash in a map
  - When the sequence is needed, the cursor first looks in the map. If found,
    it reads the sequence from the channel stored in the map. If not, it reads
    it normally.
  - This approach allows for reading ahead in parallel without requiring a long
    running pool of goroutines
- Introduce sequenceIterator to encapulate read-ahead behind an abstraction that
  always reads forward. This is currently used narrowly but could be used more
  widely as the the core implementation for all sequence iterators

* Address review comments
This commit is contained in:
Eric Halpern
2016-11-12 11:51:26 -08:00
committed by GitHub
parent f6c3f91cc3
commit 242b782748
9 changed files with 344 additions and 28 deletions
+12 -11
View File
@@ -31,8 +31,9 @@ func NewEmptyBlob() Blob {
// BUG 155 - Should provide Write... Maybe even have Blob implement ReadWriteSeeker
func (b Blob) Reader() io.ReadSeeker {
cursor := newCursorAtIndex(b.seq, 0)
return &BlobReader{b.seq, cursor, nil, 0}
iter := newSequenceIterator(b.seq, 0)
return &BlobReader{b.seq, iter, nil, 0}
}
func (b Blob) Splice(idx uint64, deleteCount uint64, data []byte) Blob {
@@ -116,7 +117,7 @@ func (b Blob) Type() *Type {
type BlobReader struct {
seq sequence
cursor *sequenceCursor
iter *sequenceIterator
currentReader io.ReadSeeker
pos uint64
}
@@ -127,11 +128,9 @@ func (cbr *BlobReader) Read(p []byte) (n int, err error) {
}
n, err = cbr.currentReader.Read(p)
for i := 0; i < n; i++ {
cbr.pos++
cbr.cursor.advance()
}
if err == io.EOF && cbr.cursor.idx < cbr.cursor.seq.seqLen() {
cbr.pos += uint64(n)
hasMore := cbr.iter.advance(n)
if err == io.EOF && hasMore {
cbr.currentReader = nil
err = nil
}
@@ -158,14 +157,16 @@ func (cbr *BlobReader) Seek(offset int64, whence int) (int64, error) {
}
cbr.pos = uint64(abs)
cbr.cursor = newCursorAtIndex(cbr.seq, cbr.pos)
cbr.iter = newSequenceIterator(cbr.seq, cbr.pos)
cbr.currentReader = nil
return abs, nil
}
func (cbr *BlobReader) updateReader() {
cbr.currentReader = bytes.NewReader(cbr.cursor.seq.(blobLeafSequence).data)
cbr.currentReader.Seek(int64(cbr.cursor.idx), 0)
chunk, idx := cbr.iter.chunkAndIndex()
data := chunk.(blobLeafSequence).data
cbr.currentReader = bytes.NewReader(data)
cbr.currentReader.Seek(int64(idx), 0)
}
func makeBlobLeafChunkFn(vr ValueReader) makeChunkFn {
+10 -2
View File
@@ -20,6 +20,14 @@ func newBlobMetaSequence(tuples []metaTuple, vr ValueReader) metaSequence {
return newMetaSequence(tuples, BlobType, vr)
}
// advanceCursorToOffset advances the cursor as close as possible to idx
//
// If the cursor references a leaf sequence,
// advance to idx,
// and return the number of values preceding the idx
// If it references a meta-sequence,
// advance to the tuple containing idx,
// and return the number of leaf values preceding this tuple
func advanceCursorToOffset(cur *sequenceCursor, idx uint64) uint64 {
seq := cur.seq
@@ -28,6 +36,7 @@ func advanceCursorToOffset(cur *sequenceCursor, idx uint64) uint64 {
cur.idx = 0
cum := uint64(0)
// Advance the cursor to the meta-sequence tuple containing idx
for cur.idx < ms.seqLen()-1 && uint64(idx) >= cum+ms.tuples[cur.idx].numLeaves {
cum += ms.tuples[cur.idx].numLeaves
cur.idx++
@@ -40,8 +49,7 @@ func advanceCursorToOffset(cur *sequenceCursor, idx uint64) uint64 {
if cur.idx > seq.seqLen() {
cur.idx = seq.seqLen()
}
return uint64(cur.idx) + 1
return uint64(cur.idx)
}
// If |sink| is not nil, chunks will be eagerly written as they're created. Otherwise they are
-1
View File
@@ -35,7 +35,6 @@ func (mt metaTuple) getChildSequence(vr ValueReader) sequence {
if mt.child != nil {
return mt.child.sequence()
}
return mt.ref.TargetValue(vr).(Collection).sequence()
}
-1
View File
@@ -67,7 +67,6 @@ func newCursorAt(seq orderedSequence, key orderedKey, forInsertion bool, last bo
}
seq = cs.(orderedSequence)
}
d.PanicIfFalse(cur != nil)
return cur
}
+53 -13
View File
@@ -4,24 +4,34 @@
package types
import "github.com/attic-labs/noms/go/d"
import (
"runtime"
"github.com/attic-labs/noms/go/d"
)
// The number of go routines to devote to read-ahead.
// The current setting provides good throughput on on a
// 2015 MacBook Pro/2.7 GHz i5 /8 GB.
var readAheadParallelism = runtime.NumCPU() * 64
// sequenceCursor explores a tree of sequence items.
type sequenceCursor struct {
parent *sequenceCursor
seq sequence
idx int
parent *sequenceCursor
seq sequence
idx int
readAhead *sequenceReadAhead
}
// newSequenceCursor creates a cursor on seq positioned at idx.
// If idx < 0, count backward from the end of seq.
func newSequenceCursor(parent *sequenceCursor, seq sequence, idx int) *sequenceCursor {
d.PanicIfTrue(seq == nil)
if idx < 0 {
idx += seq.seqLen()
d.PanicIfFalse(idx >= 0)
}
cur := &sequenceCursor{parent, seq, idx}
return cur
return &sequenceCursor{parent, seq, idx, nil}
}
func (cur *sequenceCursor) length() int {
@@ -32,16 +42,27 @@ func (cur *sequenceCursor) getItem(idx int) sequenceItem {
return cur.seq.getItem(idx)
}
// sync loads the sequence that the cursor index points to.
// It's called whenever the cursor advances/retreats to a different chunk.
func (cur *sequenceCursor) sync() {
d.PanicIfFalse(cur.parent != nil)
if cur.readAhead != nil {
v := cur.parent.current()
hash := v.(metaTuple).ref.TargetHash()
if cs, ok := cur.readAhead.get(cur.parent.idx, hash); ok {
cur.seq = cs
return
}
}
cur.seq = cur.parent.getChildSequence()
}
// getChildSequence retrieves the child at the current cursor position.
func (cur *sequenceCursor) getChildSequence() sequence {
return cur.seq.getChildSequence(cur.idx)
}
// Returns the value the cursor refers to. Fails an assertion if the cursor doesn't point to a value.
// current returns the value at the current cursor position
func (cur *sequenceCursor) current() sequenceItem {
d.PanicIfFalse(cur.valid())
return cur.getItem(cur.idx)
@@ -75,6 +96,7 @@ func (cur *sequenceCursor) advanceMaybeAllowPastEnd(allowPastEnd bool) bool {
return false
}
if cur.parent != nil && cur.parent.advanceMaybeAllowPastEnd(false) {
// at end of current leaf chunk and there are more
cur.sync()
cur.idx = 0
return true
@@ -109,22 +131,28 @@ func (cur *sequenceCursor) retreatMaybeAllowBeforeStart(allowBeforeStart bool) b
return false
}
// clone creates a copy of the cursor
func (cur *sequenceCursor) clone() *sequenceCursor {
var parent *sequenceCursor
if cur.parent != nil {
parent = cur.parent.clone()
}
return &sequenceCursor{parent, cur.seq, cur.idx}
return newSequenceCursor(parent, cur.seq, cur.idx)
}
type cursorIterCallback func(item interface{}) bool
// iter iterates forward from the current position
// TODO: replace calls to this with direct calls to IterSequence()
func (cur *sequenceCursor) iter(cb cursorIterCallback) {
for cur.valid() && !cb(cur.getItem(cur.idx)) {
cur.advance()
}
iterSequence(cur, cb)
}
// newCursorAtIndex creates a new cursor over seq positioned at idx.
//
// Implemented by searching down the tree to the leaf sequence containing idx. Each
// sequence cursor includes a back pointer to its parent so that it can follow the path
// to the next leaf chunk when the cursor exhausts the entries in the current chunk.
func newCursorAtIndex(seq sequence, idx uint64) *sequenceCursor {
var cur *sequenceCursor
for {
@@ -136,7 +164,19 @@ func newCursorAtIndex(seq sequence, idx uint64) *sequenceCursor {
}
seq = cs
}
d.PanicIfTrue(cur == nil)
return cur
}
// enableReadAhead turns on chunk read-ahead for a leaf sequence cursor.
// It is only intended to be called by sequenceIterator.
//
// Read-ahead should only be used in cases where the caller is iterating sequentially
// forward through all chunks starting at the current position.
func (cur *sequenceCursor) enableReadAhead() {
_, meta := cur.seq.(metaSequence)
d.PanicIfTrue(meta)
if cur.parent != nil { // only operative if sequence is chunked
cur.readAhead = newSequenceReadAhead(cur.parent, readAheadParallelism)
}
}
+65
View File
@@ -0,0 +1,65 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
// iterSequence efficiently iterates through a sequence calling cb for
// each item.
//
// This is preferred to sequenceCursor.iter()
func iterSequence(sc *sequenceCursor, cb cursorIterCallback) {
sc.enableReadAhead()
defer func() {
sc.readAhead = nil
}()
it := &sequenceIterator{sc}
for it.hasMore() && !cb(it.item()) {
it.advance(1)
}
}
// sequenceIterator iterates forward through a sequence.
//
// Since it can assume the sequence is being read forward, it reads
// upcoming chunks ahead of their access to optimize throughput
type sequenceIterator struct {
cursor *sequenceCursor
}
// newSequenceIterator creates an iterator for reading a sequence
func newSequenceIterator(seq sequence, idx uint64) *sequenceIterator {
sc := newCursorAtIndex(seq, idx)
sc.enableReadAhead()
return &sequenceIterator{sc}
}
// hasMore return true if there's more to iterate
func (si sequenceIterator) hasMore() bool {
return si.cursor.valid()
}
// advance advances the iterator by n items
func (si sequenceIterator) advance(n int) bool {
for i := 0; i < n && si.cursor.advance(); i++ {
}
return si.cursor.valid()
}
// item returns the value at the current position
func (si sequenceIterator) item() sequenceItem {
return si.cursor.current()
}
// chunkAndIndex returns the current leaf chunk and the index in that chunk referring to Item()
func (si sequenceIterator) chunkAndIndex() (sequence, int) {
return si.cursor.seq, si.cursor.idx
}
// hitRate returns the read-ahead cache hit rate
func (si sequenceIterator) readAheadHitRate() float32 {
if ra := si.cursor.readAhead; ra != nil {
return ra.hitRate()
}
return 0.0
}
+89
View File
@@ -0,0 +1,89 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"bytes"
"fmt"
"testing"
"github.com/attic-labs/testify/assert"
)
var seedData = []string{
"zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten",
}
const testCollectionSize = 100000
func genTestBlob() (Blob, []byte) {
var buffer bytes.Buffer
for i := 0; i < testCollectionSize; i += 1 {
for _, v := range seedData {
buffer.WriteString(fmt.Sprintf("%d%s", i, v))
}
}
blob := NewBlob(&buffer)
return blob, buffer.Bytes()
}
func TestIterBlob(t *testing.T) {
testIter := func(t *testing.T, blob Blob, expected []byte, start int) {
assert := assert.New(t)
expected = expected[start:]
iter := newSequenceIterator(blob.seq, uint64(start))
var actual []byte
for iter.hasMore() {
actual = append(actual, iter.item().(byte))
iter.advance(1)
}
assert.Equal(len(expected), len(actual))
assert.Equal(expected, actual)
// delta normally 0 but may be more in rare case where the same
// (chunkIdx, hash) pair is repeated in different chunks. A lower
// hit rate likely indicates a bug.
assert.InDelta(1.0, iter.readAheadHitRate(), 0.01)
}
blob, expected := genTestBlob()
testIter(t, blob, expected, 0)
testIter(t, blob, expected, len(expected)/2)
}
func TestIterList(t *testing.T) {
genList := func() (List, []string) {
var buffer []string
var lbuffer []Value
list := NewList()
for i := 0; i < testCollectionSize; i += 1 {
for _, v := range seedData {
s := fmt.Sprintf("%d%s", i, v)
buffer = append(buffer, s)
lbuffer = append(lbuffer, String(s))
}
}
list = list.Append(lbuffer...)
return list, buffer
}
testIter := func(t *testing.T, list List, expected []string, start int) {
assert := assert.New(t)
expected = expected[start:]
iter := newSequenceIterator(list.seq, uint64(start))
actual := []string{}
for iter.hasMore() {
actual = append(actual, string(iter.item().(String)))
iter.advance(1)
}
assert.Equal(len(expected), len(actual))
assert.Equal(expected, actual)
}
list, expected := genList()
testIter(t, list, expected, 0)
testIter(t, list, expected, len(expected)/2)
}
+94
View File
@@ -0,0 +1,94 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"github.com/attic-labs/noms/go/hash"
)
// sequenceReadAhead implements read-ahead by mapping a hash to a channel returning
// the corresponding sequence.
//
// It reads ahead by firing off a set of short-lived go routines. Each go
// routine (1) reads a sequence, (2) inserts it into a channel, (3) adds the
// channel to the map keyed by sequence hash, and (4) exits.
//
// The caller retrieves the sequence by looking up the channel by hash and
// reading from it.
//
// It maintains parallelism |p| by initially firing off |p| go routines
// to read the next |p| sequences. When a sequence is retreived from the cache,
// It fires off a new go-routine to read the next sequence. This ensures that
// there are always |p| outstanding channels to read from the cache.
//
// This approach has one major advantage over a channel based approach:
// there are no go-routines to shutdown when finished with the cursor. This
// avoids requiring caller to call a Close() method.
type sequenceReadAhead struct {
cursor *sequenceCursor
cache map[raKey]chan sequence
parallelism int
getCount float32
hitCount float32
}
// raKey is the future key. Rather than simply use the hash, we combines it
// with the local chunk offset. This increases the likelihood that repeat values
// in the sequence will get unique entries in the map.
type raKey struct {
idx int
hash hash.Hash
}
func newSequenceReadAhead(cursor *sequenceCursor, parallelism int) *sequenceReadAhead {
m := map[raKey]chan sequence{}
return &sequenceReadAhead{cursor.clone(), m, parallelism, 0, 0}
}
func (ra *sequenceReadAhead) get(idx int, h hash.Hash) (sequence, bool) {
ra.readAhead()
key := raKey{idx, h}
ra.getCount += 1
if future, ok := ra.cache[key]; ok {
result := <-future
ra.hitCount += 1
delete(ra.cache, key)
return result, true
}
return nil, false
}
// readAhead (called when read-ahead is enabled) primes the next entries in the
// read-ahead cache. It ensures that go routines have been allocated for reading
// the next n entries in the current sequence. N is either readAheadParallelism
// or the number of entries left in the sequence if smaller.
func (ra *sequenceReadAhead) readAhead() {
// the next position to be primed
count := ra.parallelism - len(ra.cache)
for i := 0; i < count; i += 1 {
if !ra.cursor.advance() {
break
}
future := make(chan sequence, 1)
key := raKey{
ra.cursor.idx,
ra.cursor.current().(metaTuple).ref.target,
}
ra.cache[key] = future
seq := ra.cursor.seq
idx := ra.cursor.idx
go func() {
// close not required here but ensures fast fail if channel is misused
defer close(future)
val := seq.getChildSequence(idx)
future <- val
}()
}
}
func (rc *sequenceReadAhead) hitRate() float32 {
return rc.hitCount / rc.getCount
}
+21
View File
@@ -0,0 +1,21 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package math
// MaxInt returns the larger of x or y.
func MaxInt(x, y int) int {
if x > y {
return x
}
return y
}
// MinInt returns the smaller of x or y.
func MinInt(x, y int) int {
if x < y {
return x
}
return y
}