mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-14 11:29:06 -05:00
Implement GraphBuilder (#2548)
This commit is contained in:
+64
-64
@@ -6,15 +6,15 @@ package types
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/testify/assert"
|
||||
)
|
||||
|
||||
var prefix = []byte{0x01, 0x02, 0x03, 0x04}
|
||||
|
||||
func TestTotalOrdering(t *testing.T) {
|
||||
func TestCompareTotalOrdering(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
// values in increasing order. Some of these are compared by ref so changing the serialization might change the ordering.
|
||||
@@ -57,88 +57,88 @@ func TestCompareEmpties(t *testing.T) {
|
||||
|
||||
func TestCompareDifferentPrimitiveTypes(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
comp := opCacheComparer{}
|
||||
vrw := NewTestValueStore()
|
||||
defer vrw.Close()
|
||||
|
||||
b := append(prefix, byte(BoolKind), 0x00)
|
||||
n := append(prefix, byte(NumberKind), 0x00)
|
||||
s := append(prefix, byte(StringKind), 'a')
|
||||
nums := ValueSlice{Number(1), Number(2), Number(3)}
|
||||
words := ValueSlice{String("k1"), String("v1")}
|
||||
|
||||
assert.Equal(-1, comp.Compare(b, n))
|
||||
assert.Equal(-1, comp.Compare(b, s))
|
||||
assert.Equal(-1, comp.Compare(n, s))
|
||||
blob := NewBlob(bytes.NewBuffer([]byte{1, 2, 3}))
|
||||
nList := NewList(nums...)
|
||||
nMap := NewMap(words...)
|
||||
nRef := NewRef(blob)
|
||||
nSet := NewSet(nums...)
|
||||
nStruct := NewStruct("teststruct", map[string]Value{"f1": Number(1)})
|
||||
|
||||
assert.Equal(1, comp.Compare(s, n))
|
||||
assert.Equal(1, comp.Compare(s, b))
|
||||
assert.Equal(1, comp.Compare(n, b))
|
||||
vals := ValueSlice{Bool(true), Number(19), String("hellow"), blob, nList, nMap, nRef, nSet, nStruct}
|
||||
sort.Sort(vals)
|
||||
|
||||
for i, v1 := range vals {
|
||||
for j, v2 := range vals {
|
||||
iBytes := [1024]byte{}
|
||||
jBytes := [1024]byte{}
|
||||
res := compareEncodedKey(encodeGraphKey(iBytes[:0], v1, vrw), encodeGraphKey(jBytes[:0], v2, vrw))
|
||||
assert.Equal(compareInts(i, j), res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestComparePrimitives(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
comp := opCacheComparer{}
|
||||
|
||||
tru := encode(Bool(true))
|
||||
fls := encode(Bool(false))
|
||||
one := encode(Number(1))
|
||||
fortytwo := encode(Number(42))
|
||||
hey := encode(String("hey"))
|
||||
ya := encode(String("ya"))
|
||||
bools := []Bool{false, true}
|
||||
for i, v1 := range bools {
|
||||
for j, v2 := range bools {
|
||||
res := compareEncodedNomsValues(encode(v1), encode(v2))
|
||||
assert.Equal(compareInts(i, j), res)
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(-1, comp.Compare(fls, tru))
|
||||
assert.Equal(-1, comp.Compare(one, fortytwo))
|
||||
assert.Equal(-1, comp.Compare(hey, ya))
|
||||
nums := []Number{-1111.29, -23, 0, 4.2345, 298}
|
||||
for i, v1 := range nums {
|
||||
for j, v2 := range nums {
|
||||
res := compareEncodedNomsValues(encode(v1), encode(v2))
|
||||
assert.Equal(compareInts(i, j), res)
|
||||
}
|
||||
}
|
||||
|
||||
assert.Equal(0, comp.Compare(tru, tru))
|
||||
assert.Equal(0, comp.Compare(one, one))
|
||||
assert.Equal(0, comp.Compare(hey, hey))
|
||||
|
||||
assert.Equal(1, comp.Compare(tru, fls))
|
||||
assert.Equal(1, comp.Compare(fortytwo, one))
|
||||
assert.Equal(1, comp.Compare(ya, hey))
|
||||
words := []String{"", "aaa", "another", "another1"}
|
||||
for i, v1 := range words {
|
||||
for j, v2 := range words {
|
||||
res := compareEncodedNomsValues(encode(v1), encode(v2))
|
||||
assert.Equal(compareInts(i, j), res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCompareHashes(t *testing.T) {
|
||||
func TestCompareEncodedKeys(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
comp := opCacheComparer{}
|
||||
vrw := NewTestValueStore()
|
||||
defer vrw.Close()
|
||||
|
||||
tru := encode(Bool(true))
|
||||
one := encode(Number(1))
|
||||
hey := encode(String("hey"))
|
||||
k1 := ValueSlice{String("one"), Number(3)}
|
||||
k2 := ValueSlice{String("one"), Number(5)}
|
||||
|
||||
minHash := append(prefix, append([]byte{byte(BlobKind)}, bytes.Repeat([]byte{0}, hash.ByteLen)...)...)
|
||||
maxHash := append(prefix, append([]byte{byte(BlobKind)}, bytes.Repeat([]byte{0xff}, hash.ByteLen)...)...)
|
||||
almostMaxHash := append(prefix, append([]byte{byte(BlobKind)}, append(bytes.Repeat([]byte{0xff}, hash.ByteLen-1), 0xfe)...)...)
|
||||
bs1 := [initialBufferSize]byte{}
|
||||
bs2 := [initialBufferSize]byte{}
|
||||
|
||||
assert.Equal(-1, comp.Compare(tru, minHash))
|
||||
assert.Equal(-1, comp.Compare(one, minHash))
|
||||
assert.Equal(-1, comp.Compare(hey, minHash))
|
||||
assert.Equal(-1, comp.Compare(minHash, almostMaxHash))
|
||||
assert.Equal(-1, comp.Compare(almostMaxHash, maxHash))
|
||||
|
||||
assert.Equal(0, comp.Compare(minHash, minHash))
|
||||
assert.Equal(0, comp.Compare(almostMaxHash, almostMaxHash))
|
||||
assert.Equal(0, comp.Compare(maxHash, maxHash))
|
||||
|
||||
assert.Equal(1, comp.Compare(minHash, tru))
|
||||
assert.Equal(1, comp.Compare(minHash, one))
|
||||
assert.Equal(1, comp.Compare(minHash, hey))
|
||||
assert.Equal(1, comp.Compare(almostMaxHash, tru))
|
||||
assert.Equal(1, comp.Compare(almostMaxHash, one))
|
||||
assert.Equal(1, comp.Compare(almostMaxHash, hey))
|
||||
assert.Equal(1, comp.Compare(maxHash, tru))
|
||||
assert.Equal(1, comp.Compare(maxHash, one))
|
||||
assert.Equal(1, comp.Compare(maxHash, hey))
|
||||
assert.Equal(1, comp.Compare(maxHash, almostMaxHash))
|
||||
assert.Equal(1, comp.Compare(almostMaxHash, minHash))
|
||||
|
||||
almostMaxHash[5]++
|
||||
assert.Equal(1, comp.Compare(maxHash, almostMaxHash))
|
||||
|
||||
almostMaxHash[0]++
|
||||
assert.Equal(-1, comp.Compare(maxHash, almostMaxHash))
|
||||
e1, _ := encodeKeys(bs1[:0], 0x01020304, MapKind, k1, vrw)
|
||||
e2, _ := encodeKeys(bs2[:0], 0x01020304, MapKind, k2, vrw)
|
||||
assert.Equal(-1, comp.Compare(e1, e2))
|
||||
}
|
||||
|
||||
func encode(v Value) []byte {
|
||||
w := &binaryNomsWriter{make([]byte, 128, 128), 0}
|
||||
newValueEncoder(w, nil).writeValue(v)
|
||||
return append(prefix, w.data()...)
|
||||
return w.data()
|
||||
}
|
||||
|
||||
func compareInts(i, j int) (res int) {
|
||||
if i < j {
|
||||
res = -1
|
||||
} else if i > j {
|
||||
res = 1
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -0,0 +1,309 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
// GraphBuilder allows non-RAM-bound construction of a graph of nested Maps whose
|
||||
// leaf collections can be Lists, Sets, or Maps that contain any type of Noms
|
||||
// Values.
|
||||
//
|
||||
// Graphs are built by calling one of the GraphBuilder functions:
|
||||
// MapSet(graphKeys, key, value)
|
||||
// SetInsert(graphKeys, value)
|
||||
// ListAppend(graphKeys, value)
|
||||
//
|
||||
// GraphBuilder uses an opCache to store graph operations in leveldb and to be
|
||||
// able to read them out later in a way which ensures a total ordering of all
|
||||
// the nodes at each level of the graph. (See opcache.go for more info on how
|
||||
// that is done)
|
||||
//
|
||||
// GraphBuilder.Build() does the work of assembling the graph. Build() gets an
|
||||
// iterator for this graph from the opCache and uses it to iterate over all the
|
||||
// operations that have been stored for this graph. opCache ensures that the
|
||||
// operations are returned in optimal sorted order so that sequenceChunker can
|
||||
// most efficiently assemble the graph. Build() will ensure that there is a Map
|
||||
// object for each key in |graphKeys|. Any node that falls in the middle of the
|
||||
// graph must be a Map, although, intermediate nodes may have any element as keys
|
||||
// as long as the path formed by the graphKeys doesn't conflict.
|
||||
//
|
||||
// MapSet(), SetInsert(), and ListAppend() are threadsafe meaning they can safely
|
||||
// be called from different go routines. However, the semantics of ListAppend()
|
||||
// are such that the order of the list will be determined by which thread() calls
|
||||
// ListAppend first (this function call may be modified later to allow specification
|
||||
// of index or order).
|
||||
//
|
||||
// Build() should only be called once, after all the operations for the graph
|
||||
// have been stored. It is the caller's responsibility to make sure that all
|
||||
// calls to the mutation operations have completed before Build() is invoked.
|
||||
//
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/util/status"
|
||||
"github.com/dustin/go-humanize"
|
||||
)
|
||||
|
||||
type GraphBuilder struct {
|
||||
oc opCache
|
||||
vrw ValueReadWriter
|
||||
stack graphStack
|
||||
verbose bool
|
||||
mutex sync.Mutex
|
||||
}
|
||||
|
||||
// NewGraphBuilder() returns an new GraphBuilder object.
|
||||
func NewGraphBuilder(vrw ValueReadWriter, rootKind NomsKind, verbose bool) *GraphBuilder {
|
||||
return newGraphBuilder(vrw, vrw.opCache(), rootKind, verbose)
|
||||
}
|
||||
|
||||
func newGraphBuilder(vrw ValueReadWriter, opc opCache, rootKind NomsKind, verbose bool) *GraphBuilder {
|
||||
b := &GraphBuilder{oc: opc, vrw: vrw, verbose: verbose}
|
||||
b.pushNewKeyOnStack(String("ROOT"), rootKind)
|
||||
return b
|
||||
}
|
||||
|
||||
// MapSet() will add the key/value pair |k, v| to the map found by traversing
|
||||
// the graph using the |keys| slice. Intermediate maps referenced by |keys| are
|
||||
// created as necessary. This is threadsafe, may call from multiple go routines.
|
||||
func (b *GraphBuilder) MapSet(keys []Value, k Value, v Value) {
|
||||
d.Chk.True(b.oc != nil, "Can't call MapSet() again after Build()")
|
||||
b.oc.GraphMapSet(keys, k, v)
|
||||
}
|
||||
|
||||
// SetInsert() will insert the value |v| into the set at path |keys|. Intermediate
|
||||
// maps referenced by |keys| are created as necessary. This is threadsafe, may
|
||||
// call from multiple go routines.
|
||||
func (b *GraphBuilder) SetInsert(keys []Value, v Value) {
|
||||
d.Chk.True(b.oc != nil, "Can't call SetInsert() again after Build()")
|
||||
b.oc.GraphSetInsert(keys, v)
|
||||
}
|
||||
|
||||
// ListAppends() will append |v| to the list at path |p|. Intermediate
|
||||
// maps referenced by |keys| are created as necessary. This is threadsafe, may
|
||||
// call from multiple go routines, however append semantics are such that the
|
||||
// elements will be appended in order that functions are called, so order has
|
||||
// to be managed by caller.
|
||||
func (b *GraphBuilder) ListAppend(keys []Value, v Value) {
|
||||
d.Chk.True(b.oc != nil, "Can't call ListAppend() again after Build()")
|
||||
b.oc.GraphListAppend(keys, v)
|
||||
}
|
||||
|
||||
type graphOpContainer struct {
|
||||
keys []Value
|
||||
kind NomsKind
|
||||
item sequenceItem
|
||||
}
|
||||
|
||||
// Builds and returns the graph. This method should only be called after all
|
||||
// calls to the mutation operations (i.e. MapSet, SetInsert, and ListAppend)
|
||||
// have completed. It is the caller's responsibility to ensure that this is
|
||||
// the case. Build() will panic if called more than once on any GraphBuilder
|
||||
// object.
|
||||
func (b *GraphBuilder) Build() Value {
|
||||
var opc opCache
|
||||
|
||||
checkFirstCall := func() {
|
||||
b.mutex.Lock()
|
||||
defer b.mutex.Unlock()
|
||||
|
||||
d.PanicIfTrue(b.oc == nil, "Can only call Build() once")
|
||||
opc = b.oc
|
||||
b.oc = nil
|
||||
}
|
||||
|
||||
checkFirstCall()
|
||||
|
||||
iter := opc.NewIterator()
|
||||
defer iter.Release()
|
||||
keyCnt := int64(0)
|
||||
|
||||
// start up a go routine that will do the reading from graphBuilder's private
|
||||
// ldb opCache.
|
||||
graphOpChan := make(chan graphOpContainer, 512)
|
||||
go func() {
|
||||
for iter.Next() {
|
||||
keys, kind, item := iter.GraphOp()
|
||||
container := graphOpContainer{keys: keys, kind: kind, item: item}
|
||||
graphOpChan <- container
|
||||
}
|
||||
close(graphOpChan)
|
||||
}()
|
||||
|
||||
// iterator returns keys, in sort order by array
|
||||
for goc := range graphOpChan {
|
||||
keys, kind, item := goc.keys, goc.kind, goc.item
|
||||
|
||||
// Get index of first key that is different than what is on the stack
|
||||
idx := commonPrefixCount(b.stack, keys)
|
||||
if idx == -1 {
|
||||
// no keys have changed we're working on same coll as previous
|
||||
// iteration, just append to sequenceChunker at top of stack
|
||||
b.appendItemToCurrentTopOfStack(kind, item)
|
||||
continue
|
||||
}
|
||||
|
||||
// Some keys that were in the last graphOp are no longer present
|
||||
// which indicates that we are finished adding to those cols. Pop
|
||||
// those keys from the stack. This will cause any popped cols to be
|
||||
// closed and added to their parents.
|
||||
for idx < b.stack.lastIdx() {
|
||||
b.popKeyFromStack()
|
||||
}
|
||||
|
||||
// We may have popped some keys off of the stack and are left with
|
||||
// an item to append to the stack of a previously existing key.
|
||||
if b.stack.lastIdx() == len(keys) {
|
||||
b.appendItemToCurrentTopOfStack(kind, item)
|
||||
}
|
||||
|
||||
// Or we may have some new keys to add to the stack. Add those keys
|
||||
// and then append the item to the top element.
|
||||
for b.stack.lastIdx() < len(keys) {
|
||||
if b.stack.lastIdx() < len(keys)-1 {
|
||||
b.pushNewKeyOnStack(keys[b.stack.lastIdx()], MapKind)
|
||||
} else {
|
||||
b.pushNewKeyOnStack(keys[b.stack.lastIdx()], kind)
|
||||
b.appendItemToCurrentTopOfStack(kind, item)
|
||||
}
|
||||
if b.verbose {
|
||||
keyCnt++
|
||||
status.Printf("Added %s keys to graph", humanize.Comma(keyCnt))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We're done adding elements. Pop any intermediate keys off the stack and
|
||||
// fold their results into their parent map.
|
||||
for b.stack.len() > 1 {
|
||||
b.popKeyFromStack()
|
||||
}
|
||||
res := b.stack.pop().done()
|
||||
if b.verbose {
|
||||
status.Done()
|
||||
}
|
||||
return res
|
||||
|
||||
}
|
||||
|
||||
// pushNewKeyOnStack() creates a new graphStackElem node and pushes it on the
|
||||
// stack. The new element contains the |key| and a new sequenceChunker that will
|
||||
// be appended to to build this node in the graph.
|
||||
func (b *GraphBuilder) pushNewKeyOnStack(key Value, kind NomsKind) {
|
||||
var ch *sequenceChunker
|
||||
switch kind {
|
||||
case MapKind:
|
||||
ch = newEmptyMapSequenceChunker(b.vrw, b.vrw)
|
||||
case SetKind:
|
||||
ch = newEmptySetSequenceChunker(b.vrw, b.vrw)
|
||||
case ListKind:
|
||||
ch = newEmptyListSequenceChunker(b.vrw, b.vrw)
|
||||
default:
|
||||
panic("bad 'kind' value in GraphBuilder, newElem()")
|
||||
}
|
||||
b.stack.push(&graphStackElem{key: key, kind: kind, ch: ch})
|
||||
}
|
||||
|
||||
// popKeyFromStack() pops the last element off the stack, calls done() to
|
||||
// finish any sequenceChunking that is in progress, and then assigns the
|
||||
// finished collection it's parent map.
|
||||
func (b *GraphBuilder) popKeyFromStack() {
|
||||
elem := b.stack.pop()
|
||||
col := elem.done()
|
||||
top := b.stack.top()
|
||||
top.ch.Append(mapEntry{elem.key, col})
|
||||
}
|
||||
|
||||
// appendItemToCurrentTopOfStack() adds the current item to the sequenceChunker
|
||||
// that's on the top of the stack.
|
||||
func (b *GraphBuilder) appendItemToCurrentTopOfStack(kind NomsKind, item sequenceItem) {
|
||||
top := b.stack.top()
|
||||
d.PanicIfTrue(top.kind != kind)
|
||||
top.ch.Append(item)
|
||||
}
|
||||
|
||||
type graphStackElem struct {
|
||||
key Value
|
||||
kind NomsKind
|
||||
ch *sequenceChunker
|
||||
}
|
||||
|
||||
type graphStack struct {
|
||||
elems []*graphStackElem
|
||||
}
|
||||
|
||||
func (s *graphStack) push(e *graphStackElem) {
|
||||
s.elems = append(s.elems, e)
|
||||
}
|
||||
|
||||
func (s *graphStack) pop() *graphStackElem {
|
||||
l := len(s.elems) - 1
|
||||
elem := s.elems[l] // last element
|
||||
s.elems = s.elems[:l] // truncate last element
|
||||
return elem
|
||||
}
|
||||
|
||||
func (s *graphStack) top() *graphStackElem {
|
||||
l := len(s.elems) - 1
|
||||
return s.elems[l] // last element
|
||||
}
|
||||
|
||||
func (s *graphStack) len() int {
|
||||
return len(s.elems)
|
||||
}
|
||||
|
||||
func (s *graphStack) lastIdx() int {
|
||||
return len(s.elems) - 1
|
||||
}
|
||||
|
||||
func (s graphStack) String() string {
|
||||
buf := bytes.Buffer{}
|
||||
for i := len(s.elems) - 1; i >= 0; i-- {
|
||||
fmt.Fprintln(&buf, "#:", i, s.elems[i])
|
||||
}
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
// done() creates the appropriate collection for this element and returns it
|
||||
func (e *graphStackElem) done() Collection {
|
||||
switch e.kind {
|
||||
case MapKind:
|
||||
return newMap(e.ch.Done().(orderedSequence))
|
||||
case SetKind:
|
||||
return newSet(e.ch.Done().(orderedSequence))
|
||||
case ListKind:
|
||||
return newList(e.ch.Done())
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// Returns index of first element in keys that is different from stack. Note,
|
||||
// return value can be equal to len(keys) if there are more element in stack
|
||||
// than in keys
|
||||
func commonPrefixCount(stack graphStack, keys ValueSlice) int {
|
||||
minLen := len(keys)
|
||||
// don't consider the 'ROOT' stack element
|
||||
elems := stack.elems[1:]
|
||||
if len(elems) < minLen {
|
||||
minLen = len(elems)
|
||||
}
|
||||
|
||||
for i := 0; i < minLen; i++ {
|
||||
if !elems[i].key.Equals(keys[i]) {
|
||||
return i
|
||||
}
|
||||
}
|
||||
|
||||
if len(keys) == len(elems) {
|
||||
return -1
|
||||
}
|
||||
return minLen
|
||||
}
|
||||
|
||||
func (e *graphStackElem) String() string {
|
||||
return fmt.Sprintf("key: %s, kind: %s, seq: %p", EncodedValue(e.key), KindToString[e.kind], e.ch)
|
||||
}
|
||||
@@ -0,0 +1,320 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/testify/assert"
|
||||
)
|
||||
|
||||
func TestGraphBuilderFindIndex(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
elems := []*graphStackElem{
|
||||
&graphStackElem{key: String("ROOT")},
|
||||
&graphStackElem{key: String("one")},
|
||||
&graphStackElem{key: String("two")},
|
||||
&graphStackElem{key: String("three")},
|
||||
&graphStackElem{key: String("four")},
|
||||
}
|
||||
|
||||
s := graphStack{elems: elems}
|
||||
assert.Equal(0, commonPrefixCount(s, []Value{String("zero")}))
|
||||
assert.Equal(1, commonPrefixCount(s, []Value{String("one"), String("zero")}))
|
||||
assert.Equal(3, commonPrefixCount(s, []Value{String("one"), String("two"), String("three")}))
|
||||
assert.Equal(-1, commonPrefixCount(s, []Value{String("one"), String("two"), String("three"), String("four")}))
|
||||
assert.Equal(4, commonPrefixCount(s, []Value{String("one"), String("two"), String("three"), String("four"), String("five")}))
|
||||
|
||||
values := []Value{String("one"), String("two"), String("three"), String("four")}
|
||||
|
||||
assert.Equal(-1, commonPrefixCount(graphStack{elems: elems[:1]}, []Value{}))
|
||||
assert.Equal(0, commonPrefixCount(graphStack{elems: elems[:1]}, values))
|
||||
assert.Equal(1, commonPrefixCount(graphStack{elems: elems[:2]}, values))
|
||||
assert.Equal(3, commonPrefixCount(graphStack{elems: elems[:4]}, values))
|
||||
assert.Equal(-1, commonPrefixCount(graphStack{elems: elems}, values))
|
||||
assert.Equal(2, commonPrefixCount(graphStack{elems: elems[:5]}, values[:2]))
|
||||
}
|
||||
|
||||
type testGraphOp struct {
|
||||
keys ValueSlice
|
||||
kind NomsKind
|
||||
item sequenceItem
|
||||
}
|
||||
|
||||
func SafeEquals(v1, v2 Value) bool {
|
||||
if v1 == nil && v2 == nil {
|
||||
return true
|
||||
}
|
||||
if v1 == nil || v2 == nil {
|
||||
return false
|
||||
}
|
||||
return v1.Equals(v2)
|
||||
}
|
||||
|
||||
func TestGraphBuilderEncodeDecodeAsKey(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
vrw := NewTestValueStore()
|
||||
defer vrw.Close()
|
||||
|
||||
struct1 := NewStruct("teststruct", StructData{
|
||||
"f1": String("v1"),
|
||||
"f2": String("v2"),
|
||||
})
|
||||
|
||||
keys := ValueSlice{Bool(true), Number(19), String("think!"), struct1}
|
||||
byteBuf := [initialBufferSize]byte{}
|
||||
bs := byteBuf[:0]
|
||||
numKeys := len(keys)
|
||||
expectedRes := ValueSlice{}
|
||||
for _, k := range keys {
|
||||
if isKindOrderedByValue(k.Type().Kind()) {
|
||||
expectedRes = append(expectedRes, k)
|
||||
} else {
|
||||
expectedRes = append(expectedRes, nil)
|
||||
}
|
||||
bs = encodeGraphKey(bs, k, vrw)
|
||||
}
|
||||
res := ValueSlice{}
|
||||
for pos := 0; pos < numKeys; pos++ {
|
||||
var k Value
|
||||
bs, k = decodeValue(bs, false, vrw)
|
||||
res = append(res, k)
|
||||
}
|
||||
|
||||
assert.Equal(len(keys), len(res))
|
||||
for i, origKey := range expectedRes {
|
||||
assert.True(SafeEquals(origKey, res[i]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGraphBuilderEncodeDecodeAsValue(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
vrw := NewTestValueStore()
|
||||
defer vrw.Close()
|
||||
|
||||
struct1 := NewStruct("teststruct", StructData{
|
||||
"f1": String("v1"),
|
||||
"f2": String("v2"),
|
||||
})
|
||||
|
||||
keys := ValueSlice{Bool(true), Number(19), String("think!"), struct1}
|
||||
byteBuf := [initialBufferSize]byte{}
|
||||
bs := byteBuf[:0]
|
||||
numKeys := len(keys)
|
||||
for _, k := range keys {
|
||||
bs = encodeGraphValue(bs, k, vrw)
|
||||
}
|
||||
res := ValueSlice{}
|
||||
for pos := 0; pos < numKeys; pos++ {
|
||||
var k Value
|
||||
bs, k = decodeValue(bs, true, vrw)
|
||||
res = append(res, k)
|
||||
}
|
||||
|
||||
assert.Equal(len(keys), len(res))
|
||||
for i, origKey := range keys {
|
||||
assert.True(SafeEquals(origKey, res[i]))
|
||||
}
|
||||
}
|
||||
|
||||
func TestGraphBuilderMapSetGraphOp(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
|
||||
opc := vs.opCache()
|
||||
|
||||
struct1 := NewStruct("teststruct", StructData{
|
||||
"f1": String("v1"),
|
||||
"f2": String("v2"),
|
||||
})
|
||||
keys := ValueSlice{Bool(true), Number(19), String("think!"), struct1}
|
||||
opc.GraphMapSet(keys, String("yo"), Number(199))
|
||||
iter := opc.NewIterator()
|
||||
assert.True(iter.Next())
|
||||
|
||||
keys1, kind, item := iter.GraphOp()
|
||||
assert.Equal(len(keys), len(keys1))
|
||||
assert.True(keys.Equals(keys1))
|
||||
assert.Equal(MapKind, kind)
|
||||
assert.IsType(mapEntry{}, item)
|
||||
me := item.(mapEntry)
|
||||
assert.True(String("yo").Equals(me.key))
|
||||
assert.True(Number(199).Equals(me.value))
|
||||
|
||||
assert.False(iter.Next())
|
||||
}
|
||||
|
||||
// createTestMap() constructs a graph sized according to the |levels| and
|
||||
// |avgSize| parameters. The graph will contain nested maps with a
|
||||
// depth == |levels|, each map will contain |avgSize| elements of different
|
||||
// types.
|
||||
func createTestMap(levels, avgSize int, valGen func() Value) Map {
|
||||
sampleSize := func() int {
|
||||
size := (int(rand.Int31()) % avgSize) + (avgSize / 2)
|
||||
if size < 2 {
|
||||
return 2
|
||||
}
|
||||
return size
|
||||
}
|
||||
|
||||
genLeaf := func() Value {
|
||||
numElems := sampleSize()
|
||||
elems := ValueSlice{}
|
||||
for i := 0; i < numElems; i++ {
|
||||
elems = append(elems, valGen())
|
||||
}
|
||||
switch rand.Int31() % 3 {
|
||||
case 0:
|
||||
if numElems%2 != 0 {
|
||||
numElems -= 1
|
||||
}
|
||||
return NewMap(elems[:numElems]...)
|
||||
case 1:
|
||||
return NewSet(elems...)
|
||||
case 2:
|
||||
return NewList(elems...)
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
var genChildren func(lvl int) Map
|
||||
genChildren = func(lvl int) Map {
|
||||
numChildren := sampleSize()
|
||||
kvs := ValueSlice{}
|
||||
for i := 0; i < numChildren; i++ {
|
||||
if lvl == levels {
|
||||
kvs = append(kvs, valGen(), genLeaf())
|
||||
} else {
|
||||
// Once in a while, throw in a non-collection value into the
|
||||
// middle of the graph
|
||||
if rand.Int31()%10 == 0 {
|
||||
kvs = append(kvs, valGen(), valGen())
|
||||
} else {
|
||||
kvs = append(kvs, valGen(), genChildren(lvl+1))
|
||||
}
|
||||
}
|
||||
}
|
||||
return NewMap(kvs...)
|
||||
}
|
||||
|
||||
return genChildren(0)
|
||||
}
|
||||
|
||||
// valGen() creates a random String, Number, or Struct Value
|
||||
func valGen() Value {
|
||||
num := rand.Int31() % 1000000
|
||||
switch rand.Int31() % 4 {
|
||||
case 0:
|
||||
return String(fmt.Sprintf("%d", num))
|
||||
case 1:
|
||||
return Number(num)
|
||||
case 2:
|
||||
return NewStruct("teststruct", map[string]Value{"f1": Number(num)})
|
||||
case 3:
|
||||
return NewStruct("teststruct", map[string]Value{"f1": String(fmt.Sprintf("%d", num))})
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// dupSlice() duplicates a slice along with it's backing store.
|
||||
func dupSlice(s ValueSlice) ValueSlice {
|
||||
vs := make(ValueSlice, len(s))
|
||||
copy(vs, s)
|
||||
return vs
|
||||
}
|
||||
|
||||
func shuffle(a []testGraphOp) {
|
||||
for i := range a {
|
||||
j := rand.Intn(i + 1)
|
||||
if a[i].kind != ListKind && a[j].kind != ListKind {
|
||||
a[i], a[j] = a[j], a[i]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestGraphBuilderNestedMapSet(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
|
||||
expected := createTestMap(3, 4, valGen)
|
||||
b := NewGraphBuilder(vs, MapKind, false)
|
||||
|
||||
ops := []testGraphOp{}
|
||||
|
||||
isNomsCollectionKind := func(kind NomsKind) bool {
|
||||
return kind == MapKind || kind == SetKind || kind == ListKind
|
||||
}
|
||||
var generateOps func(keys []Value, col Value)
|
||||
generateOps = func(keys []Value, col Value) {
|
||||
switch c := col.(type) {
|
||||
case Map:
|
||||
c.Iter(func(k, v Value) bool {
|
||||
if isNomsCollectionKind(v.Type().Kind()) {
|
||||
newKeys := append(keys, k)
|
||||
generateOps(newKeys, v)
|
||||
} else {
|
||||
tgo := testGraphOp{keys: dupSlice(keys), kind: MapKind, item: mapEntry{k, v}}
|
||||
ops = append(ops, tgo)
|
||||
}
|
||||
return false
|
||||
})
|
||||
case List:
|
||||
c.Iter(func(v Value, idx uint64) bool {
|
||||
tgo := testGraphOp{keys: dupSlice(keys), kind: ListKind, item: v}
|
||||
ops = append(ops, tgo)
|
||||
return false
|
||||
})
|
||||
case Set:
|
||||
c.Iter(func(v Value) bool {
|
||||
tgo := testGraphOp{keys: dupSlice(keys), kind: SetKind, item: v}
|
||||
ops = append(ops, tgo)
|
||||
return false
|
||||
})
|
||||
}
|
||||
}
|
||||
generateOps(nil, expected)
|
||||
shuffle(ops)
|
||||
|
||||
for _, op := range ops {
|
||||
switch op.kind {
|
||||
case MapKind:
|
||||
b.MapSet(op.keys, op.item.(mapEntry).key, op.item.(mapEntry).value)
|
||||
case SetKind:
|
||||
b.SetInsert(op.keys, op.item.(Value))
|
||||
case ListKind:
|
||||
b.ListAppend(op.keys, op.item.(Value))
|
||||
}
|
||||
}
|
||||
|
||||
v := b.Build()
|
||||
assert.NotNil(v)
|
||||
assert.True(expected.Equals(v))
|
||||
}
|
||||
|
||||
func ExampleGraphBuilder_Build() {
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
|
||||
gb := NewGraphBuilder(vs, MapKind, false)
|
||||
gb.SetInsert([]Value{String("parent"), String("children")}, String("John"))
|
||||
gb.SetInsert([]Value{String("parent"), String("children")}, String("Mary"))
|
||||
gb.SetInsert([]Value{String("parent"), String("children")}, String("Frieda"))
|
||||
gb.MapSet([]Value{String("parent"), String("ages")}, String("Father"), Number(42))
|
||||
gb.MapSet([]Value{String("parent"), String("ages")}, String("Mother"), Number(44))
|
||||
gb.ListAppend([]Value{String("parent"), String("chores")}, String("Make dinner"))
|
||||
gb.ListAppend([]Value{String("parent"), String("chores")}, String("Wash dishes"))
|
||||
gb.ListAppend([]Value{String("parent"), String("chores")}, String("Make breakfast"))
|
||||
gb.ListAppend([]Value{String("parent"), String("chores")}, String("Wash dishes"))
|
||||
gb.MapSet([]Value{String("parent")}, String("combinedAge"), Number(86))
|
||||
m := gb.Build()
|
||||
fmt.Println("map:", EncodedValue(m))
|
||||
}
|
||||
+10
-6
@@ -30,11 +30,11 @@ func newList(seq sequence) List {
|
||||
// NewList creates a new List where the type is computed from the elements in the list, populated
|
||||
// with values, chunking if and when needed.
|
||||
func NewList(values ...Value) List {
|
||||
seq := newEmptySequenceChunker(nil, nil, makeListLeafChunkFn(nil), newIndexedMetaSequenceChunkFn(ListKind, nil), hashValueBytes)
|
||||
ch := newEmptyListSequenceChunker(nil, nil)
|
||||
for _, v := range values {
|
||||
seq.Append(v)
|
||||
ch.Append(v)
|
||||
}
|
||||
return newList(seq.Done())
|
||||
return newList(ch.Done())
|
||||
}
|
||||
|
||||
// NewStreamingList creates a new List, populated with values, chunking if and when needed. As
|
||||
@@ -43,11 +43,11 @@ func NewList(values ...Value) List {
|
||||
func NewStreamingList(vrw ValueReadWriter, values <-chan Value) <-chan List {
|
||||
out := make(chan List)
|
||||
go func() {
|
||||
seq := newEmptySequenceChunker(vrw, vrw, makeListLeafChunkFn(vrw), newIndexedMetaSequenceChunkFn(ListKind, vrw), hashValueBytes)
|
||||
ch := newEmptyListSequenceChunker(vrw, vrw)
|
||||
for v := range values {
|
||||
seq.Append(v)
|
||||
ch.Append(v)
|
||||
}
|
||||
out <- newList(seq.Done())
|
||||
out <- newList(ch.Done())
|
||||
close(out)
|
||||
}()
|
||||
return out
|
||||
@@ -291,3 +291,7 @@ func makeListLeafChunkFn(vr ValueReader) makeChunkFn {
|
||||
return list, orderedKeyFromInt(len(values)), uint64(len(values))
|
||||
}
|
||||
}
|
||||
|
||||
func newEmptyListSequenceChunker(vr ValueReader, vw ValueWriter) *sequenceChunker {
|
||||
return newEmptySequenceChunker(vr, vw, makeListLeafChunkFn(vr), newIndexedMetaSequenceChunkFn(ListKind, vr), hashValueBytes)
|
||||
}
|
||||
|
||||
+10
-8
@@ -28,13 +28,13 @@ func mapHashValueBytes(item sequenceItem, rv *rollingValueHasher) {
|
||||
|
||||
func NewMap(kv ...Value) Map {
|
||||
entries := buildMapData(kv)
|
||||
seq := newEmptySequenceChunker(nil, nil, makeMapLeafChunkFn(nil), newOrderedMetaSequenceChunkFn(MapKind, nil), mapHashValueBytes)
|
||||
ch := newEmptyMapSequenceChunker(nil, nil)
|
||||
|
||||
for _, entry := range entries {
|
||||
seq.Append(entry)
|
||||
ch.Append(entry)
|
||||
}
|
||||
|
||||
return newMap(seq.Done().(orderedSequence))
|
||||
return newMap(ch.Done().(orderedSequence))
|
||||
}
|
||||
|
||||
func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
|
||||
@@ -42,19 +42,17 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
|
||||
|
||||
outChan := make(chan Map)
|
||||
go func() {
|
||||
mx := newMapMutator(vrw)
|
||||
|
||||
gb := NewGraphBuilder(vrw, MapKind, false)
|
||||
for v := range kvs {
|
||||
if k == nil {
|
||||
k = v
|
||||
continue
|
||||
}
|
||||
mx.Set(k, v)
|
||||
gb.MapSet(nil, k, v)
|
||||
k = nil
|
||||
}
|
||||
|
||||
d.PanicIfFalse(k == nil)
|
||||
outChan <- mx.Finish()
|
||||
outChan <- gb.Build().(Map)
|
||||
}()
|
||||
return outChan
|
||||
}
|
||||
@@ -294,3 +292,7 @@ func makeMapLeafChunkFn(vr ValueReader) makeChunkFn {
|
||||
return m, key, uint64(len(items))
|
||||
}
|
||||
}
|
||||
|
||||
func newEmptyMapSequenceChunker(vr ValueReader, vw ValueWriter) *sequenceChunker {
|
||||
return newEmptySequenceChunker(vr, vw, makeMapLeafChunkFn(vr), newOrderedMetaSequenceChunkFn(MapKind, vr), mapHashValueBytes)
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ func newMapMutator(vrw ValueReadWriter) *mapMutator {
|
||||
|
||||
func (mx *mapMutator) Set(key Value, val Value) *mapMutator {
|
||||
d.PanicIfFalse(mx.oc != nil, "Can't call Set() again after Finish()")
|
||||
mx.oc.MapSet(key, val)
|
||||
mx.oc.GraphMapSet(nil, key, val)
|
||||
return mx
|
||||
}
|
||||
|
||||
@@ -33,7 +33,10 @@ func (mx *mapMutator) Finish() Map {
|
||||
iter := mx.oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
seq.Append(iter.MapOp())
|
||||
keys, kind, item := iter.GraphOp()
|
||||
d.PanicIfFalse(0 == len(keys))
|
||||
d.PanicIfFalse(MapKind == kind)
|
||||
seq.Append(item)
|
||||
}
|
||||
return newMap(seq.Done().(orderedSequence))
|
||||
}
|
||||
|
||||
@@ -226,7 +226,7 @@ func (suite *mapTestSuite) createStreamingMap(vs *ValueStore) {
|
||||
kvChan <- entry.value
|
||||
}
|
||||
close(kvChan)
|
||||
suite.True(suite.validate(<-mapChan))
|
||||
suite.True(suite.validate(<-mapChan), "map not valid")
|
||||
}
|
||||
|
||||
func (suite *mapTestSuite) TestStreamingMap() {
|
||||
|
||||
+255
-137
@@ -2,6 +2,63 @@
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
// opCache stores build operations on a graph of nested Maps whose leaves can
|
||||
// in turn be Set, Map, or List collections containing any Noms Value.
|
||||
// OpCacheIterator returns operations in sorted order.
|
||||
//
|
||||
// OpCache uses a special encoding of the information supplied by the MapSet(),
|
||||
// ListAppend(), or SetInsert() operation stored in the ldbKey combined with
|
||||
// custom ldb Comparer object implemented in opcache_compare.go to make this
|
||||
// happen.
|
||||
//
|
||||
// Ldb keys are encoded byte arrays that contain the following information:
|
||||
// 4-bytes -- uint32 in BigEndian order which identifies this key/value
|
||||
// as belonging to a particular graph
|
||||
// 1-byte -- a NomsKind value that represents the collection type that is
|
||||
// being acted on. This will either be MapKind, SetKind, or ListKind.
|
||||
// 1-byte -- uint8 representing the number of NomsValues encoded in this key
|
||||
//
|
||||
// After this 6-byte header, there is a section of bytes for each value encoded
|
||||
// into the key. Each value has a 1-byte prefix:
|
||||
// 1-byte -- a NomsKind value that represents the type of value that is
|
||||
// being encoded.
|
||||
// The 1-byte NomsKind value determines what follows, if this value is
|
||||
// BoolKind, NumberKind, or StringKind, the rest of the bytes are:
|
||||
// 4-bytes -- uint32 length of the Value serialization
|
||||
// n-bytes -- the serialized value
|
||||
// If the NomsKind byte has any other value, it is followed by:
|
||||
// 20-bytes -- digest of Value's hash
|
||||
//
|
||||
// Whenever the value is encoded as a hash digest in the ldbKey, it's actual value
|
||||
// needs to get stored in the ldbValue. (More about this later)
|
||||
//
|
||||
// There are 3 operation types on opCache: MapSet(), SetInsert(), and ListAppend().
|
||||
// Each one stores slightly different things in the ldbKey.
|
||||
// MapSet() -- stores each graphKey and the key to the final Map
|
||||
// ValueSet() -- stores each graphKey and the Value being inserted into the set
|
||||
// ListAppend() -- stores each graphKey and a Number() containing an uint64 value
|
||||
// that is shared across all collections and lists which is incremented each time
|
||||
// ListAppend() is called.
|
||||
//
|
||||
// The ldbValue also stores different information for each mutation operation. An
|
||||
// ldbValue has a 1-byte uint8 header that is the number of values that are encoded
|
||||
// into it.
|
||||
// 1-byte -- uint8 indicating number of values encoded into this byte array
|
||||
// Then for each encoded value it contains:
|
||||
// 4-byte -- uint32 indicating length of value serialization
|
||||
// n-bytes -- the serialized value
|
||||
//
|
||||
// The ldbValue contains the following values for each type of mutation:
|
||||
// MapSet() -- stores any graphKeys that were encoded as a hash digest in
|
||||
// the ldbKey. The mapKey if it was encoded as a hash digest in the ldbKey
|
||||
// and the value being set in the map.
|
||||
// SetInsert() -- stores any graphKeys that were encoded as a hash digest in
|
||||
// the ldbKey. The value being inserted into the set if it was encoded into the
|
||||
// ldbKey as a hash digest.
|
||||
// ListAppend() -- stores any graphKeys that were encoded as a hash digest in the
|
||||
// ldbKey. The value being appended to the list.
|
||||
//
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
@@ -26,14 +83,22 @@ type opCacheStore interface {
|
||||
}
|
||||
|
||||
type opCache interface {
|
||||
MapSet(mapKey Value, mapVal Value)
|
||||
SetInsert(val Value)
|
||||
// This method can be called from multiple go routines.
|
||||
GraphMapSet(keys ValueSlice, mapKey Value, mapVal Value)
|
||||
|
||||
// This method can be called from multiple go routines.
|
||||
GraphSetInsert(keys ValueSlice, val Value)
|
||||
|
||||
// This method can be called from multiple go routines, however items will
|
||||
// be appended to the list based on the order that routines execute
|
||||
// this method.
|
||||
GraphListAppend(keys ValueSlice, val Value)
|
||||
|
||||
NewIterator() opCacheIterator
|
||||
}
|
||||
|
||||
type opCacheIterator interface {
|
||||
MapOp() sequenceItem
|
||||
SetOp() sequenceItem
|
||||
GraphOp() (ValueSlice, NomsKind, sequenceItem)
|
||||
Next() bool
|
||||
Release()
|
||||
}
|
||||
@@ -46,9 +111,10 @@ type ldbOpCacheStore struct {
|
||||
}
|
||||
|
||||
type ldbOpCache struct {
|
||||
vrw ValueReadWriter
|
||||
colId uint32
|
||||
ldb *leveldb.DB
|
||||
vrw ValueReadWriter
|
||||
colId uint32
|
||||
listIdx int64
|
||||
ldb *leveldb.DB
|
||||
}
|
||||
|
||||
type ldbOpCacheIterator struct {
|
||||
@@ -63,8 +129,10 @@ func newLdbOpCacheStore(vrw ValueReadWriter) *ldbOpCacheStore {
|
||||
Compression: opt.NoCompression,
|
||||
Comparer: opCacheComparer{},
|
||||
OpenFilesCacheCapacity: 24,
|
||||
NoSync: true, // We don't need this data to be durable. LDB is acting as temporary storage that can be larger than main memory.
|
||||
WriteBuffer: 1 << 27, // 128MiB
|
||||
// This data does not have to be durable. LDB is acting as temporary
|
||||
// storage that can be larger than main memory.
|
||||
NoSync: true,
|
||||
WriteBuffer: 1 << 27, // 128MiB
|
||||
})
|
||||
d.Chk.NoError(err, "opening put cache in %s", dir)
|
||||
return &ldbOpCacheStore{ldb: db, dbDir: dir, vrw: vrw}
|
||||
@@ -80,40 +148,187 @@ func (store *ldbOpCacheStore) opCache() opCache {
|
||||
return &ldbOpCache{vrw: store.vrw, colId: colId, ldb: store.ldb}
|
||||
}
|
||||
|
||||
// Set can be called from any goroutine
|
||||
func (opc *ldbOpCache) MapSet(mapKey Value, mapVal Value) {
|
||||
mapKeyArray := [initialBufferSize]byte{}
|
||||
mapValArray := [initialBufferSize]byte{}
|
||||
// insertLdbOp encodes allKeys into the ldb key. Bool, Number, and String values
|
||||
// are encoded directly into the ldb key bytes. All other types are encoded as
|
||||
// their Hash() digest. Their actual value is then stored in ldb value.
|
||||
func (opc *ldbOpCache) insertLdbOp(allKeys ValueSlice, opKind NomsKind, val Value) {
|
||||
d.PanicIfTrue(len(allKeys) > 0x00FF, "Number of keys in GraphMapSet exceeds max of 256")
|
||||
ldbKeyBytes := [initialBufferSize]byte{}
|
||||
ldbValBytes := [initialBufferSize]byte{}
|
||||
|
||||
switch mapKey.Type().Kind() {
|
||||
default:
|
||||
ldbKey := ldbKeyFromValueHash(mapKey, opc.colId)
|
||||
ldbKey, valuesToEncode := encodeKeys(ldbKeyBytes[:0], opc.colId, opKind, allKeys, opc.vrw)
|
||||
|
||||
// Since we've used the ref of keyValue as our ldbKey. We need to store mapKey and mapVal in the ldb value. We use the following format for that:
|
||||
//
|
||||
// uint32 (4 bytes) bytes bytes
|
||||
// +-----------------------+---------------------+----------------------+
|
||||
// | key serialization len | serialized key | serialized value |
|
||||
// +-----------------------+---------------------+----------------------+
|
||||
|
||||
encodedMapKey := encToSlice(mapKey, mapKeyArray[:], opc.vrw)
|
||||
encodedMapVal := encToSlice(mapVal, mapValArray[:], opc.vrw)
|
||||
ldbValueArray := [initialBufferSize * 2]byte{}
|
||||
binary.LittleEndian.PutUint32(ldbValueArray[:], uint32(len(encodedMapKey)))
|
||||
ldbValue := ldbValueArray[0:4]
|
||||
ldbValue = append(ldbValue, encodedMapKey...)
|
||||
ldbValue = append(ldbValue, encodedMapVal...)
|
||||
|
||||
// TODO: Will manually batching calls to ldb.Put() help?
|
||||
err := opc.ldb.Put(ldbKey, ldbValue, nil)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
ldbKey := ldbKeyFromValue(mapKey, opc.colId, opc.vrw)
|
||||
encodedMapVal := encToSlice(mapVal, mapValArray[:], opc.vrw)
|
||||
err := opc.ldb.Put(ldbKey, encodedMapVal, nil)
|
||||
d.Chk.NoError(err)
|
||||
// val may be nil when dealing with sets, since the val is the key.
|
||||
if val != nil {
|
||||
valuesToEncode = append(valuesToEncode, val)
|
||||
}
|
||||
ldbVal := encodeValues(ldbValBytes[:0], valuesToEncode, opc.vrw)
|
||||
|
||||
err := opc.ldb.Put(ldbKey, ldbVal, nil)
|
||||
d.Chk.NoError(err)
|
||||
}
|
||||
|
||||
func (opc *ldbOpCache) GraphMapSet(graphKeys ValueSlice, mapKey, mapVal Value) {
|
||||
allKeys := append(graphKeys, mapKey)
|
||||
opc.insertLdbOp(allKeys, MapKind, mapVal)
|
||||
}
|
||||
|
||||
func (opc *ldbOpCache) GraphSetInsert(graphKeys ValueSlice, val Value) {
|
||||
allKeys := append(graphKeys, val)
|
||||
opc.insertLdbOp(allKeys, SetKind, val)
|
||||
}
|
||||
|
||||
func (opc *ldbOpCache) GraphListAppend(graphKeys ValueSlice, val Value) {
|
||||
idx := atomic.AddInt64(&opc.listIdx, 1)
|
||||
allKeys := append(graphKeys, Number(idx))
|
||||
opc.insertLdbOp(allKeys, ListKind, val)
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) GraphOp() (ValueSlice, NomsKind, sequenceItem) {
|
||||
ldbKey := i.iter.Key()
|
||||
ldbVal := i.iter.Value()
|
||||
|
||||
// skip over 4 bytes of colId and get opKind, and numKeys from bytes 4 & 5
|
||||
opKind := NomsKind(ldbKey[4])
|
||||
numKeys := uint8(ldbKey[5])
|
||||
ldbKey = ldbKey[6:]
|
||||
|
||||
// Call decodeValue for each encoded graphKey. nil will be appended to
|
||||
// graphKeys for any keys that were encoded as hash digests.
|
||||
graphKeys := ValueSlice{}
|
||||
for pos := uint8(0); pos < numKeys; pos++ {
|
||||
var gk Value
|
||||
ldbKey, gk = decodeValue(ldbKey, false, i.vr)
|
||||
graphKeys = append(graphKeys, gk)
|
||||
}
|
||||
|
||||
// Get the number of values whose value was encoded in ldbVal
|
||||
numEncodedValues := uint8(ldbVal[0])
|
||||
ldbVal = ldbVal[1:]
|
||||
|
||||
// Call decodeValue for each non-primitive key stored in ldbVal. Replace
|
||||
// the nil value in graphKeys with the new decodedValue.
|
||||
values := ValueSlice{}
|
||||
for pos := uint8(0); pos < numEncodedValues; pos++ {
|
||||
var gk Value
|
||||
ldbVal, gk = decodeValue(ldbVal, true, i.vr)
|
||||
values = append(values, gk)
|
||||
}
|
||||
|
||||
// Fold in any non-primitive key values that were stored in ldbVal
|
||||
pos := 0
|
||||
for idx, k1 := range graphKeys {
|
||||
if k1 == nil {
|
||||
graphKeys[idx] = values[pos]
|
||||
pos++
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the last key in graphKeys. The last key in graphKeys is the
|
||||
// mapkey for Maps, the item for Sets, and the index for Lists.
|
||||
key := graphKeys[len(graphKeys)-1]
|
||||
graphKeys = graphKeys[:len(graphKeys)-1]
|
||||
|
||||
var item sequenceItem
|
||||
switch opKind {
|
||||
case MapKind:
|
||||
val := values[len(values)-1]
|
||||
item = mapEntry{key, val}
|
||||
case SetKind:
|
||||
item = key
|
||||
case ListKind:
|
||||
item = values[len(values)-1]
|
||||
}
|
||||
|
||||
return graphKeys, opKind, item
|
||||
}
|
||||
|
||||
func (opc *ldbOpCache) NewIterator() opCacheIterator {
|
||||
prefix := [4]byte{}
|
||||
binary.BigEndian.PutUint32(prefix[:], opc.colId)
|
||||
return &ldbOpCacheIterator{iter: opc.ldb.NewIterator(util.BytesPrefix(prefix[:]), nil), vr: opc.vrw}
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) Next() bool {
|
||||
return i.iter.Next()
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) Release() {
|
||||
i.iter.Release()
|
||||
}
|
||||
|
||||
// encodeKeys() serializes a list of keys to the byte slice |bs|.
|
||||
func encodeKeys(bs []byte, colId uint32, opKind NomsKind, keys []Value, vrw ValueReadWriter) ([]byte, []Value) {
|
||||
// All ldb keys start with a 4-byte collection id that serves as a namespace
|
||||
// that keeps them separate from other collections.
|
||||
idHolder := [4]byte{}
|
||||
idHolderSlice := idHolder[:4]
|
||||
binary.BigEndian.PutUint32(idHolderSlice, colId)
|
||||
bs = append(bs, idHolderSlice...)
|
||||
|
||||
// bs[4] is a NomsKind value which represents the type of leaf
|
||||
// collection being operated on (i.e. MapKind, SetKind, or ListKind)
|
||||
// bs[5] is a single uint8 value representing the number of keys
|
||||
// encoded in the ldb key.
|
||||
bs = append(bs, byte(opKind), byte(len(keys)))
|
||||
|
||||
valuesToEncode := ValueSlice{}
|
||||
for _, gk := range keys {
|
||||
bs = encodeGraphKey(bs, gk, vrw)
|
||||
if !isKindOrderedByValue(gk.Type().Kind()) {
|
||||
valuesToEncode = append(valuesToEncode, gk)
|
||||
}
|
||||
}
|
||||
return bs, valuesToEncode
|
||||
}
|
||||
|
||||
func encodeValues(bs []byte, valuesToEncode []Value, vrw ValueReadWriter) []byte {
|
||||
// Encode allValues into the ldbVal byte slice.
|
||||
bs = append(bs, uint8(len(valuesToEncode)))
|
||||
for _, k := range valuesToEncode {
|
||||
bs = encodeGraphValue(bs, k, vrw)
|
||||
}
|
||||
return bs
|
||||
}
|
||||
|
||||
func encodeGraphKey(bs []byte, v Value, vrw ValueReadWriter) []byte {
|
||||
return encodeForGraph(bs, v, false, vrw)
|
||||
}
|
||||
|
||||
func encodeGraphValue(bs []byte, v Value, vrw ValueReadWriter) []byte {
|
||||
return encodeForGraph(bs, v, true, vrw)
|
||||
}
|
||||
|
||||
func encodeForGraph(bs []byte, v Value, asValue bool, vrw ValueReadWriter) []byte {
|
||||
// Note: encToSlice() and append() will both grow the backing store of |bs|
|
||||
// as necessary. Always call them when writing to |bs|.
|
||||
if asValue || isKindOrderedByValue(v.Type().Kind()) {
|
||||
// if we're encoding value, then put:
|
||||
// noms-kind(1-byte), serialization-len(4-bytes), serialization(n-bytes)
|
||||
buf := [initialBufferSize]byte{}
|
||||
uint32buf := [4]byte{}
|
||||
encodedVal := encToSlice(v, buf[:], vrw)
|
||||
binary.BigEndian.PutUint32(uint32buf[:], uint32(len(encodedVal)))
|
||||
bs = append(bs, uint8(v.Type().Kind()))
|
||||
bs = append(bs, uint32buf[:]...)
|
||||
bs = append(bs, encodedVal...)
|
||||
} else {
|
||||
// if we're encoding hash values, we know the length, so we can leave that out
|
||||
bs = append(bs, uint8(v.Type().Kind()))
|
||||
bs = append(bs, v.Hash().DigestSlice()...)
|
||||
}
|
||||
return bs
|
||||
}
|
||||
|
||||
func decodeValue(bs []byte, asValue bool, vr ValueReader) ([]byte, Value) {
|
||||
kind := NomsKind(bs[0])
|
||||
var v Value
|
||||
if asValue || isKindOrderedByValue(kind) {
|
||||
encodedLen := binary.BigEndian.Uint32(bs[1:5])
|
||||
v = DecodeFromBytes(bs[5:5+encodedLen], vr, staticTypeCache)
|
||||
return bs[5+encodedLen:], v
|
||||
}
|
||||
return bs[1+hash.ByteLen:], nil
|
||||
}
|
||||
|
||||
// Note that, if 'v' are prolly trees, any in-memory child chunks will be written to vw at this time.
|
||||
@@ -124,100 +339,3 @@ func encToSlice(v Value, initBuf []byte, vw ValueWriter) []byte {
|
||||
enc.writeValue(v)
|
||||
return w.data()
|
||||
}
|
||||
|
||||
func (opc *ldbOpCache) NewIterator() opCacheIterator {
|
||||
prefix := [4]byte{}
|
||||
binary.LittleEndian.PutUint32(prefix[:], opc.colId)
|
||||
return &ldbOpCacheIterator{iter: opc.ldb.NewIterator(util.BytesPrefix(prefix[:]), nil), vr: opc.vrw}
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) Next() bool {
|
||||
return i.iter.Next()
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) MapOp() sequenceItem {
|
||||
entry := mapEntry{}
|
||||
ldbKey := i.iter.Key()
|
||||
ldbValue := i.iter.Value()
|
||||
switch NomsKind(ldbKey[uint32Size]) {
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
entry.key = DecodeFromBytes(ldbKey[uint32Size:], i.vr, staticTypeCache)
|
||||
entry.value = DecodeFromBytes(ldbValue, i.vr, staticTypeCache)
|
||||
default:
|
||||
keyBytesLen := int(binary.LittleEndian.Uint32(ldbValue))
|
||||
entry.key = DecodeFromBytes(ldbValue[uint32Size:uint32Size+keyBytesLen], i.vr, staticTypeCache)
|
||||
entry.value = DecodeFromBytes(ldbValue[uint32Size+keyBytesLen:], i.vr, staticTypeCache)
|
||||
}
|
||||
|
||||
return entry
|
||||
}
|
||||
|
||||
// Insert can be called from any goroutine
|
||||
func (opc *ldbOpCache) SetInsert(val Value) {
|
||||
switch val.Type().Kind() {
|
||||
default:
|
||||
ldbKey := ldbKeyFromValueHash(val, opc.colId)
|
||||
valArray := [initialBufferSize]byte{}
|
||||
encodedVal := encToSlice(val, valArray[:], opc.vrw)
|
||||
err := opc.ldb.Put(ldbKey, encodedVal, nil)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
ldbKey := ldbKeyFromValue(val, opc.colId, opc.vrw)
|
||||
// Since the ldbKey contains the val, there's no reason to store anything in the ldbValue
|
||||
err := opc.ldb.Put(ldbKey, nil, nil)
|
||||
d.Chk.NoError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) SetOp() sequenceItem {
|
||||
ldbKey := i.iter.Key()
|
||||
|
||||
switch NomsKind(ldbKey[uint32Size]) {
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
return DecodeFromBytes(ldbKey[uint32Size:], i.vr, staticTypeCache)
|
||||
default:
|
||||
data := i.iter.Value()
|
||||
return DecodeFromBytes(data, i.vr, staticTypeCache)
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) Release() {
|
||||
i.iter.Release()
|
||||
}
|
||||
|
||||
// writeLdbKeyHeaderBytes writes the first 4 or 5 bytes into the ldbKey. The first 4 bytes in every
|
||||
// ldbKey are the colId. This identifies all the keys for a particular opCache and allows this opStore
|
||||
// to cache values for multiple collections. The optional 5th byte is the NomsKind of the value. In
|
||||
// cases where we're encoding the hash of an object we need to write the nomsKind manually because the
|
||||
// hash doesn't contain it. In cases were we are encoding a primitive value into the key, the first byte
|
||||
// of the value is the nomsKind so there is no reason to write it again.
|
||||
func writeLdbKeyHeaderBytes(ldbKey []byte, colId uint32, v Value) []byte {
|
||||
binary.LittleEndian.PutUint32(ldbKey, colId)
|
||||
length := uint32Size
|
||||
if v != nil {
|
||||
ldbKey[length] = byte(v.Type().Kind())
|
||||
length++
|
||||
}
|
||||
return ldbKey[0:length]
|
||||
}
|
||||
|
||||
// ldbKeys for non-primitive Nom Values (e.g. blobs, structs, lists, maps, etc) use a serialization of the values hash:
|
||||
// colId(4 bytes) + nomsKind(val)(1 byte) + val.Hash()(20 bytes).
|
||||
|
||||
func ldbKeyFromValueHash(val Value, colId uint32) []byte {
|
||||
ldbKeyArray := [uint32Size + 1 + hash.ByteLen]byte{}
|
||||
ldbKey := writeLdbKeyHeaderBytes(ldbKeyArray[:], colId, val)
|
||||
return append(ldbKey, val.Hash().DigestSlice()...)
|
||||
}
|
||||
|
||||
// ldbKeys for primitive Noms Values (e.g. bool, number, & string) consist of a byte string that encodes:
|
||||
// colId(4 bytes) + serialized value(n bytes)
|
||||
// Note: the first byte of the serialized value is the NomsKind.
|
||||
func ldbKeyFromValue(val Value, colId uint32, vrw ValueReadWriter) []byte {
|
||||
valArray := [initialBufferSize]byte{}
|
||||
ldbKeyArray := [initialBufferSize]byte{}
|
||||
ldbKey := writeLdbKeyHeaderBytes(ldbKeyArray[:], colId, nil)
|
||||
encodedVal := encToSlice(val, valArray[:], vrw)
|
||||
return append(ldbKey, encodedVal...)
|
||||
}
|
||||
|
||||
+109
-27
@@ -6,6 +6,7 @@ package types
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/binary"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
@@ -17,28 +18,115 @@ func (opCacheComparer) Compare(a, b []byte) int {
|
||||
if res := bytes.Compare(a[:uint32Size], b[:uint32Size]); res != 0 {
|
||||
return res
|
||||
}
|
||||
a, b = a[uint32Size:], b[uint32Size:]
|
||||
return compareEncodedKeys(a[uint32Size:], b[uint32Size:])
|
||||
}
|
||||
|
||||
func (opCacheComparer) Name() string {
|
||||
return "noms.OpCacheComparator"
|
||||
}
|
||||
|
||||
func (opCacheComparer) Successor(dst, b []byte) []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (opCacheComparer) Separator(dst, a, b []byte) []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func compareEncodedKeys(a, b []byte) int {
|
||||
if compared, res := compareEmpties(a, b); compared {
|
||||
return res
|
||||
}
|
||||
|
||||
// keys are encoded as either values:
|
||||
// nomsKind(1-byte) + serialized len(4-bytes) + serialized value(n-bytes)
|
||||
// or digests:
|
||||
// nomsKind(1-byte) + digest(hash.Bytelen-bytes)
|
||||
splitAfterFirstKey := func(bs []byte) ([]byte, []byte) {
|
||||
keyLen := 1 + hash.ByteLen
|
||||
if isKindOrderedByValue(NomsKind(bs[0])) {
|
||||
l := int(binary.BigEndian.Uint32(bs[1:5]))
|
||||
keyLen = 1 + uint32Size + l
|
||||
}
|
||||
return bs[:keyLen], bs[keyLen:]
|
||||
}
|
||||
|
||||
// a[0] and b[0] represent NomsKind of leafNode being operated on
|
||||
// a[1] and b[1] are the number of keys encoded in this byte slice
|
||||
numAGraphKeys, numBGraphKeys := a[1], b[1]
|
||||
minNumKeys := minByte(numAGraphKeys, numBGraphKeys)
|
||||
|
||||
a, b = a[2:], b[2:]
|
||||
cres := 0
|
||||
for pos := 0; pos < int(minNumKeys) && cres == 0; pos++ {
|
||||
aKey, aRest := splitAfterFirstKey(a)
|
||||
bKey, bRest := splitAfterFirstKey(b)
|
||||
cres = compareEncodedKey(aKey, bKey)
|
||||
a, b = aRest, bRest
|
||||
}
|
||||
|
||||
if cres == 0 {
|
||||
if numAGraphKeys < numBGraphKeys {
|
||||
return -1
|
||||
}
|
||||
if numAGraphKeys > numBGraphKeys {
|
||||
return 1
|
||||
}
|
||||
}
|
||||
return cres
|
||||
}
|
||||
|
||||
// compareEncodedKey accepts two byte slices that each contain a number of
|
||||
// encoded keys. It extracts the first key in each slice and returns the result
|
||||
// of comparing them.
|
||||
func compareEncodedKey(a, b []byte) int {
|
||||
// keys that are orderd by value are encoded as:
|
||||
// NomsKind(1-byte) + length(4-bytes) + encoding(n-bytes)
|
||||
// keys that are not ordred by value are encoded as
|
||||
// NomsKind(1-byte) + hash digest(20-bytes)
|
||||
|
||||
aKind, bKind := NomsKind(a[0]), NomsKind(b[0])
|
||||
if !isKindOrderedByValue(aKind) && !isKindOrderedByValue(bKind) {
|
||||
a, b := a[1:], b[1:]
|
||||
d.PanicIfFalse(len(a) == hash.ByteLen && len(b) == hash.ByteLen)
|
||||
res := bytes.Compare(a, b)
|
||||
d.PanicIfTrue(res == 0 && aKind != bKind, "Values of different kinds with the same hash. Whaa??")
|
||||
return res
|
||||
}
|
||||
|
||||
// Now, we know that at least one of a and b is ordered by value. So if the
|
||||
// kinds are different, we can sort just by comparing them.
|
||||
if res := compareKinds(aKind, bKind); res != 0 {
|
||||
return res
|
||||
}
|
||||
|
||||
// Now we know that we are comparing two values that are both Bools, Numbers,
|
||||
// or Strings. Extract their length and create slices that just contain their
|
||||
// Noms encodings.
|
||||
lenA := binary.BigEndian.Uint32(a[1:5])
|
||||
lenB := binary.BigEndian.Uint32(b[1:5])
|
||||
|
||||
// create a1, b1 slices that just contain encoding
|
||||
a1, b1 := a[1+uint32Size:1+uint32Size+lenA], b[1+uint32Size:1+uint32Size+lenB]
|
||||
|
||||
return compareEncodedNomsValues(a1, b1)
|
||||
}
|
||||
|
||||
// compareEncodedNomsValues compares two slices. Each slice contains a first
|
||||
// byte that holds the nomsKind of the original key and an encoding for that key.
|
||||
// This method relies on knowledge about how bytes are arranged in a Noms
|
||||
// encoding and makes use of that for companing values efficiently.
|
||||
func compareEncodedNomsValues(a, b []byte) int {
|
||||
if compared, res := compareEmpties(a, b); compared {
|
||||
return res
|
||||
}
|
||||
aKind, bKind := NomsKind(a[0]), NomsKind(b[0])
|
||||
d.PanicIfFalse(aKind == bKind, "compareEncodedNomsValues, aKind:", aKind, "!= bKind:", bKind)
|
||||
|
||||
switch aKind {
|
||||
default:
|
||||
if bKind <= StringKind {
|
||||
return 1
|
||||
}
|
||||
a, b = a[1:], b[1:]
|
||||
d.PanicIfFalse(len(a) == hash.ByteLen && len(b) == hash.ByteLen)
|
||||
res := bytes.Compare(a, b)
|
||||
d.PanicIfFalse(res != 0 || aKind == bKind)
|
||||
return res
|
||||
case BoolKind:
|
||||
return bytes.Compare(a, b)
|
||||
case NumberKind:
|
||||
if res := compareKinds(aKind, bKind); res != 0 {
|
||||
return res
|
||||
}
|
||||
reader := binaryNomsReader{a[1:], 0}
|
||||
aNum := reader.readNumber()
|
||||
reader.buff, reader.offset = b[1:], 0
|
||||
@@ -51,11 +139,10 @@ func (opCacheComparer) Compare(a, b []byte) int {
|
||||
}
|
||||
return 1
|
||||
case StringKind:
|
||||
if bKind == StringKind {
|
||||
a, b = a[1+uint32Size:], b[1+uint32Size:]
|
||||
}
|
||||
return bytes.Compare(a, b)
|
||||
res := bytes.Compare(a[1+uint32Size:], b[1+uint32Size:])
|
||||
return res
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
func compareEmpties(a, b []byte) (bool, int) {
|
||||
@@ -81,14 +168,9 @@ func compareKinds(aKind, bKind NomsKind) (res int) {
|
||||
return
|
||||
}
|
||||
|
||||
func (opCacheComparer) Name() string {
|
||||
return "noms.OpCacheComparator"
|
||||
}
|
||||
|
||||
func (opCacheComparer) Successor(dst, b []byte) []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (opCacheComparer) Separator(dst, a, b []byte) []byte {
|
||||
return nil
|
||||
func minByte(a, b byte) byte {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/testify/suite"
|
||||
)
|
||||
|
||||
@@ -43,7 +44,7 @@ func (suite *OpCacheSuite) TestMapSet() {
|
||||
}
|
||||
oc := suite.vs.opCache()
|
||||
for _, entry := range entries {
|
||||
oc.MapSet(entry.key, entry.value)
|
||||
oc.GraphMapSet(nil, entry.key, entry.value)
|
||||
}
|
||||
sort.Sort(entries)
|
||||
|
||||
@@ -51,7 +52,10 @@ func (suite *OpCacheSuite) TestMapSet() {
|
||||
iter := oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
iterated = append(iterated, iter.MapOp().(mapEntry))
|
||||
keys, kind, item := iter.GraphOp()
|
||||
d.Chk.Empty(keys)
|
||||
d.Chk.Equal(MapKind, kind)
|
||||
iterated = append(iterated, item.(mapEntry))
|
||||
}
|
||||
suite.True(entries.Equals(iterated))
|
||||
}
|
||||
@@ -74,7 +78,7 @@ func (suite *OpCacheSuite) TestSetInsert() {
|
||||
}
|
||||
oc := suite.vs.opCache()
|
||||
for _, entry := range entries {
|
||||
oc.SetInsert(entry)
|
||||
oc.GraphSetInsert(nil, entry)
|
||||
}
|
||||
sort.Sort(entries)
|
||||
|
||||
@@ -82,7 +86,43 @@ func (suite *OpCacheSuite) TestSetInsert() {
|
||||
iter := oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
iterated = append(iterated, iter.SetOp().(Value))
|
||||
keys, kind, item := iter.GraphOp()
|
||||
d.Chk.Empty(keys)
|
||||
d.Chk.Equal(SetKind, kind)
|
||||
iterated = append(iterated, item.(Value))
|
||||
}
|
||||
suite.True(entries.Equals(iterated))
|
||||
}
|
||||
|
||||
func (suite *OpCacheSuite) TestListAppend() {
|
||||
entries := ValueSlice{
|
||||
NewList(Number(8), Number(0)),
|
||||
String("ahoy"),
|
||||
NewBlob(bytes.NewBufferString("A value")),
|
||||
Number(1),
|
||||
Bool(true),
|
||||
Bool(false),
|
||||
NewBlob(bytes.NewBuffer([]byte{0xff, 0, 0})),
|
||||
NewMap(),
|
||||
Number(42),
|
||||
NewStruct("thing1", StructData{"a": Number(7)}),
|
||||
String("struct"),
|
||||
NewStruct("thing2", nil),
|
||||
String("other"),
|
||||
}
|
||||
oc := suite.vs.opCache()
|
||||
for _, entry := range entries {
|
||||
oc.GraphListAppend(nil, entry)
|
||||
}
|
||||
|
||||
iterated := ValueSlice{}
|
||||
iter := oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
keys, kind, item := iter.GraphOp()
|
||||
d.Chk.Empty(keys)
|
||||
d.Chk.Equal(ListKind, kind)
|
||||
iterated = append(iterated, item.(Value))
|
||||
}
|
||||
suite.True(entries.Equals(iterated))
|
||||
}
|
||||
|
||||
+10
-8
@@ -21,25 +21,23 @@ func newSet(seq orderedSequence) Set {
|
||||
|
||||
func NewSet(v ...Value) Set {
|
||||
data := buildSetData(v)
|
||||
seq := newEmptySequenceChunker(nil, nil, makeSetLeafChunkFn(nil), newOrderedMetaSequenceChunkFn(SetKind, nil), hashValueBytes)
|
||||
ch := newEmptySetSequenceChunker(nil, nil)
|
||||
|
||||
for _, v := range data {
|
||||
seq.Append(v)
|
||||
ch.Append(v)
|
||||
}
|
||||
|
||||
return newSet(seq.Done().(orderedSequence))
|
||||
return newSet(ch.Done().(orderedSequence))
|
||||
}
|
||||
|
||||
func NewStreamingSet(vrw ValueReadWriter, vals <-chan Value) <-chan Set {
|
||||
outChan := make(chan Set)
|
||||
go func() {
|
||||
mx := newSetMutator(vrw)
|
||||
|
||||
gb := NewGraphBuilder(vrw, SetKind, false)
|
||||
for v := range vals {
|
||||
mx.Insert(v)
|
||||
gb.SetInsert(nil, v)
|
||||
}
|
||||
|
||||
outChan <- mx.Finish()
|
||||
outChan <- gb.Build().(Set)
|
||||
}()
|
||||
return outChan
|
||||
}
|
||||
@@ -240,3 +238,7 @@ func makeSetLeafChunkFn(vr ValueReader) makeChunkFn {
|
||||
return set, key, uint64(len(items))
|
||||
}
|
||||
}
|
||||
|
||||
func newEmptySetSequenceChunker(vr ValueReader, vw ValueWriter) *sequenceChunker {
|
||||
return newEmptySequenceChunker(vr, vw, makeSetLeafChunkFn(vr), newOrderedMetaSequenceChunkFn(SetKind, vr), hashValueBytes)
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ func newSetMutator(vrw ValueReadWriter) *setMutator {
|
||||
|
||||
func (mx *setMutator) Insert(val Value) *setMutator {
|
||||
d.PanicIfFalse(mx.oc != nil, "Can't call Insert() again after Finish()")
|
||||
mx.oc.SetInsert(val)
|
||||
mx.oc.GraphSetInsert(nil, val)
|
||||
return mx
|
||||
}
|
||||
|
||||
@@ -33,7 +33,10 @@ func (mx *setMutator) Finish() Set {
|
||||
iter := mx.oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
seq.Append(iter.SetOp())
|
||||
keys, kind, item := iter.GraphOp()
|
||||
d.PanicIfFalse(0 == len(keys))
|
||||
d.PanicIfFalse(SetKind == kind)
|
||||
seq.Append(item)
|
||||
}
|
||||
return newSet(seq.Done().(orderedSequence))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user