Introduce NewStreamingTypedList()

NewStreamingTypedList() reads Values from a channel and appends them
to a List, chunking as it goes and writing these chunks to a given
ChunkSink. It returns a `chan List` that the caller can get the
finished List from once he's done writing values to the `chan Value`
he provided at call-time.
This commit is contained in:
Chris Masone
2016-02-05 16:01:46 -08:00
parent ed9e8b9aed
commit 937dd624d0
10 changed files with 81 additions and 34 deletions

View File

@@ -50,7 +50,7 @@ func newBlobLeafChunkFn() makeChunkFn {
}
func NewBlob(r io.Reader) Blob {
seq := newEmptySequenceChunker(newBlobLeafChunkFn(), newIndexedMetaSequenceChunkFn(typeForBlob), newBlobLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
seq := newEmptySequenceChunker(newBlobLeafChunkFn(), newIndexedMetaSequenceChunkFn(typeForBlob, nil), newBlobLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
buf := []byte{0}
for {
n, err := r.Read(buf)

View File

@@ -190,11 +190,11 @@ func (cl compoundList) sequenceCursorAtIndex(idx uint64) *sequenceCursor {
func (cl compoundList) sequenceChunkerAtIndex(idx uint64) *sequenceChunker {
cur := cl.sequenceCursorAtIndex(idx)
return newSequenceChunker(cur, makeListLeafChunkFn(cl.t), newIndexedMetaSequenceChunkFn(cl.t), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
return newSequenceChunker(cur, makeListLeafChunkFn(cl.t, nil), newIndexedMetaSequenceChunkFn(cl.t, nil), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
}
func (cl compoundList) Filter(cb listFilterCallback) List {
seq := newEmptySequenceChunker(makeListLeafChunkFn(cl.t), newIndexedMetaSequenceChunkFn(cl.t), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
seq := newEmptySequenceChunker(makeListLeafChunkFn(cl.t, nil), newIndexedMetaSequenceChunkFn(cl.t, nil), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
cl.IterAll(func(v Value, idx uint64) {
if cb(v, idx) {
seq.Append(v)
@@ -254,7 +254,7 @@ func newListLeafBoundaryChecker() boundaryChecker {
})
}
func makeListLeafChunkFn(t Type) makeChunkFn {
func makeListLeafChunkFn(t Type, cs chunks.ChunkSink) makeChunkFn {
return func(items []sequenceItem) (sequenceItem, Value) {
values := make([]Value, len(items))
@@ -263,6 +263,9 @@ func makeListLeafChunkFn(t Type) makeChunkFn {
}
list := valueFromType(newListLeaf(nil, t, values...), t)
if cs != nil {
return metaTuple{nil, WriteValue(list, cs), Uint64(len(values))}, list
}
return metaTuple{list, ref.Ref{}, Uint64(len(values))}, list
}
}

View File

@@ -62,7 +62,7 @@ func getTestSimpleListUnique() testSimpleList {
uniques[s.Int63()] = true
}
values := make([]Value, 0, length)
for k, _ := range uniques {
for k := range uniques {
values = append(values, Int64(k))
}
return values
@@ -76,6 +76,39 @@ func testSimpleListFromNomsList(list List) testSimpleList {
return simple
}
func TestStreamingCompoundListCreation(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
simpleList := getTestSimpleList()
tr := MakeCompoundType(ListKind, MakePrimitiveType(Int64Kind))
cl := NewTypedList(tr, simpleList...)
valueChan := make(chan Value)
listChan := NewStreamingTypedList(tr, cs, valueChan)
go func() {
for _, v := range simpleList {
valueChan <- v
}
close(valueChan)
}()
assertChunksEqual(assert, cl, <-listChan, cs)
}
func assertChunksEqual(assert *assert.Assertions, v1, v2 Value, cs chunks.ChunkSource) {
assert.EqualValues(v1.Ref(), v2.Ref())
v1Chunks, v2Chunks := v1.Chunks(), v2.Chunks()
if assert.NotEmpty(v2Chunks) {
assert.Equal(len(v1Chunks), len(v2Chunks))
assert.True(cs.Has(v1.Ref()))
for _, r := range v2Chunks {
assert.Contains(v1Chunks, r)
assert.True(cs.Has(r))
}
}
}
func TestCompoundListGet(t *testing.T) {
assert := assert.New(t)
@@ -124,7 +157,7 @@ func TestCompoundListIterAll(t *testing.T) {
expectIdx := uint64(0)
cl.IterAll(func(v Value, idx uint64) {
assert.Equal(expectIdx, idx)
expectIdx += 1
expectIdx++
assert.Equal(simpleList[idx], v)
})
@@ -218,7 +251,6 @@ func TestCompoundListCursorAt(t *testing.T) {
return
}
}
panic("not reachable")
}
assert.Equal(getTestSimpleListLen(), listLen(0, func(cur *sequenceCursor) bool {

View File

@@ -95,9 +95,8 @@ func (cm compoundMap) Remove(k Value) Map {
if seq, found := cm.sequenceChunkerAtKey(k); found {
seq.Skip()
return seq.Done().(Map)
} else {
return cm
}
return cm
}
func (cm compoundMap) sequenceChunkerAtKey(k Value) (*sequenceChunker, bool) {

View File

@@ -186,16 +186,14 @@ func TestCompoundMapIter(t *testing.T) {
idx := uint64(0)
endAt := uint64(mapPattern)
m.Iter(func(k, v Value) bool {
m.Iter(func(k, v Value) (done bool) {
assert.True(tm.entries[idx].key.Equals(k))
assert.True(tm.entries[idx].value.Equals(v))
if idx == endAt {
idx += 1
return true
done = true
}
idx += 1
return false
idx++
return
})
assert.Equal(endAt, idx-1)

View File

@@ -138,15 +138,13 @@ func TestCompoundSetIter(t *testing.T) {
idx := uint64(0)
endAt := uint64(setPattern)
set.Iter(func(v Value) bool {
set.Iter(func(v Value) (done bool) {
assert.True(ts.values[idx].Equals(v))
if idx == endAt {
idx += 1
return true
done = true
}
idx += 1
return false
idx++
return
})
assert.Equal(endAt, idx-1)

View File

@@ -2,6 +2,7 @@ package types
import (
"encoding/base64"
"fmt"
"io/ioutil"
"strconv"
"strings"
@@ -153,18 +154,17 @@ func (r *jsonArrayReader) readMap(t Type, pkg *Package) Value {
func indexTypeForMetaSequence(t Type) Type {
switch t.Kind() {
default:
panic(fmt.Sprintf("Unknown type used for metaSequence: %s", t.Describe()))
case BlobKind, ListKind:
return MakePrimitiveType(Uint64Kind)
case MapKind, SetKind:
elemType := t.Desc.(CompoundDesc).ElemTypes[0]
if elemType.IsOrdered() {
return elemType
} else {
return MakeCompoundType(RefKind, MakePrimitiveType(ValueKind))
}
case BlobKind, ListKind:
return MakePrimitiveType(Uint64Kind)
return MakeCompoundType(RefKind, MakePrimitiveType(ValueKind))
}
panic("unreached")
}
func (r *jsonArrayReader) maybeReadMetaSequence(t Type, pkg *Package) (Value, bool) {

View File

@@ -3,6 +3,7 @@ package types
import (
"crypto/sha1"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/ref"
)
@@ -13,7 +14,7 @@ func newIndexedMetaSequenceBoundaryChecker() boundaryChecker {
})
}
func newIndexedMetaSequenceChunkFn(t Type) makeChunkFn {
func newIndexedMetaSequenceChunkFn(t Type, cs chunks.ChunkSink) makeChunkFn {
return func(items []sequenceItem) (sequenceItem, Value) {
tuples := make(metaSequenceData, len(items))
@@ -22,6 +23,9 @@ func newIndexedMetaSequenceChunkFn(t Type) makeChunkFn {
}
meta := newMetaSequenceFromData(tuples, t, nil)
if cs != nil {
return metaTuple{nil, WriteValue(meta, cs), Uint64(tuples.uint64ValuesSum())}, meta
}
return metaTuple{meta, ref.Ref{}, Uint64(tuples.uint64ValuesSum())}, meta
}
}

View File

@@ -1,5 +1,7 @@
package types
import "github.com/attic-labs/noms/chunks"
type List interface {
Value
Len() uint64
@@ -26,15 +28,30 @@ type MapFunc func(v Value, index uint64) interface{}
var listType = MakeCompoundType(ListKind, MakePrimitiveType(ValueKind))
// NewList creates a new untyped List, populated with values, chunking if and when needed.
func NewList(v ...Value) List {
return NewTypedList(listType, v...)
}
// NewTypedList creates a new List with type t, populated with values, chunking if and when needed.
func NewTypedList(t Type, values ...Value) List {
seq := newEmptySequenceChunker(makeListLeafChunkFn(t), newIndexedMetaSequenceChunkFn(t), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
seq := newEmptySequenceChunker(makeListLeafChunkFn(t, nil), newIndexedMetaSequenceChunkFn(t, nil), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
for _, v := range values {
seq.Append(v)
}
return seq.Done().(List)
}
// NewStreamingTypedList creates a new List with type t, populated with values, chunking if and when needed. As chunks are created, they're written to cs -- including the root chunk of the list. Once the caller has closed values, she can read the completed List from the returned channel.
func NewStreamingTypedList(t Type, cs chunks.ChunkSink, values <-chan Value) <-chan List {
out := make(chan List)
go func() {
seq := newEmptySequenceChunker(makeListLeafChunkFn(t, cs), newIndexedMetaSequenceChunkFn(t, cs), newListLeafBoundaryChecker(), newIndexedMetaSequenceBoundaryChecker)
for v := range values {
seq.Append(v)
}
out <- seq.Done().(List)
close(out)
}()
return out
}

View File

@@ -137,8 +137,6 @@ func newMetaSequenceCursor(root metaSequence, cs chunks.ChunkSource) (*sequenceC
return cursor, val
}
}
panic("not reachable")
}
func readMetaTupleValue(item sequenceItem, cs chunks.ChunkSource) Value {
@@ -160,6 +158,4 @@ func iterateMetaSequenceLeaf(ms metaSequence, cs chunks.ChunkSource, cb func(Val
v = readMetaTupleValue(cursor.current(), cs)
}
panic("not reachable")
}