mirror of
https://github.com/dolthub/dolt.git
synced 2026-01-28 18:59:00 -06:00
Add streaming set and improve streaming collection performance.
Streaming collections now share one ldb instance rather than having 1 ldb instance per collection. Towards: #2114 (nomsdex)
This commit is contained in:
@@ -12,6 +12,8 @@ import (
|
||||
"github.com/attic-labs/testify/assert"
|
||||
)
|
||||
|
||||
var prefix = []byte{0x01, 0x02, 0x03, 0x04}
|
||||
|
||||
func TestTotalOrdering(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
@@ -48,25 +50,18 @@ func TestTotalOrdering(t *testing.T) {
|
||||
func TestCompareEmpties(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
comp := opCacheComparer{}
|
||||
assert.Equal(-1, comp.Compare(nil, []byte{0xff}))
|
||||
assert.Equal(-1, comp.Compare([]byte{}, []byte{0xff}))
|
||||
|
||||
assert.Equal(0, comp.Compare(nil, nil))
|
||||
assert.Equal(0, comp.Compare(nil, []byte{}))
|
||||
assert.Equal(0, comp.Compare([]byte{}, []byte{}))
|
||||
assert.Equal(0, comp.Compare([]byte{}, nil))
|
||||
|
||||
assert.Equal(1, comp.Compare([]byte{0xff}, nil))
|
||||
assert.Equal(1, comp.Compare([]byte{0xff}, []byte{}))
|
||||
assert.Equal(-1, comp.Compare(prefix, append(prefix, 0xff)))
|
||||
assert.Equal(0, comp.Compare(prefix, prefix))
|
||||
assert.Equal(1, comp.Compare(append(prefix, 0xff), prefix))
|
||||
}
|
||||
|
||||
func TestCompareDifferentPrimitiveTypes(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
comp := opCacheComparer{}
|
||||
|
||||
b := []byte{byte(BoolKind), 0x00}
|
||||
n := []byte{byte(NumberKind), 0x00}
|
||||
s := []byte{byte(StringKind), 'a'}
|
||||
b := append(prefix, byte(BoolKind), 0x00)
|
||||
n := append(prefix, byte(NumberKind), 0x00)
|
||||
s := append(prefix, byte(StringKind), 'a')
|
||||
|
||||
assert.Equal(-1, comp.Compare(b, n))
|
||||
assert.Equal(-1, comp.Compare(b, s))
|
||||
@@ -109,9 +104,9 @@ func TestCompareHashes(t *testing.T) {
|
||||
one := encode(Number(1))
|
||||
hey := encode(String("hey"))
|
||||
|
||||
minHash := append([]byte{byte(BlobKind)}, bytes.Repeat([]byte{0}, hash.ByteLen)...)
|
||||
maxHash := append([]byte{byte(BlobKind)}, bytes.Repeat([]byte{0xff}, hash.ByteLen)...)
|
||||
almostMaxHash := append([]byte{byte(BlobKind)}, append(bytes.Repeat([]byte{0xff}, hash.ByteLen-1), 0xfe)...)
|
||||
minHash := append(prefix, append([]byte{byte(BlobKind)}, bytes.Repeat([]byte{0}, hash.ByteLen)...)...)
|
||||
maxHash := append(prefix, append([]byte{byte(BlobKind)}, bytes.Repeat([]byte{0xff}, hash.ByteLen)...)...)
|
||||
almostMaxHash := append(prefix, append([]byte{byte(BlobKind)}, append(bytes.Repeat([]byte{0xff}, hash.ByteLen-1), 0xfe)...)...)
|
||||
|
||||
assert.Equal(-1, comp.Compare(tru, minHash))
|
||||
assert.Equal(-1, comp.Compare(one, minHash))
|
||||
@@ -135,13 +130,15 @@ func TestCompareHashes(t *testing.T) {
|
||||
assert.Equal(1, comp.Compare(maxHash, almostMaxHash))
|
||||
assert.Equal(1, comp.Compare(almostMaxHash, minHash))
|
||||
|
||||
almostMaxHash[0]++
|
||||
almostMaxHash[5]++
|
||||
assert.Equal(1, comp.Compare(maxHash, almostMaxHash))
|
||||
|
||||
almostMaxHash[0]++
|
||||
assert.Equal(-1, comp.Compare(maxHash, almostMaxHash))
|
||||
}
|
||||
|
||||
func encode(v Value) []byte {
|
||||
w := &binaryNomsWriter{make([]byte, 128, 128), 0}
|
||||
newValueEncoder(w, nil).writeValue(v)
|
||||
return w.data()
|
||||
return append(prefix, w.data()...)
|
||||
}
|
||||
|
||||
@@ -42,7 +42,7 @@ func NewStreamingMap(vrw ValueReadWriter, kvs <-chan Value) <-chan Map {
|
||||
|
||||
outChan := make(chan Map)
|
||||
go func() {
|
||||
mx := newMutator(vrw)
|
||||
mx := newMapMutator(vrw)
|
||||
|
||||
for v := range kvs {
|
||||
if k == nil {
|
||||
|
||||
@@ -7,24 +7,23 @@ package types
|
||||
import "github.com/attic-labs/noms/go/d"
|
||||
|
||||
type mapMutator struct {
|
||||
oc *opCache
|
||||
oc opCache
|
||||
vrw ValueReadWriter
|
||||
}
|
||||
|
||||
func newMutator(vrw ValueReadWriter) *mapMutator {
|
||||
return &mapMutator{newOpCache(vrw), vrw}
|
||||
func newMapMutator(vrw ValueReadWriter) *mapMutator {
|
||||
return &mapMutator{vrw.opCache(), vrw}
|
||||
}
|
||||
|
||||
func (mx *mapMutator) Set(key Value, val Value) *mapMutator {
|
||||
d.Chk.True(mx.oc != nil, "Can't call Set() again after Finish()")
|
||||
mx.oc.Set(key, val)
|
||||
mx.oc.MapSet(key, val)
|
||||
return mx
|
||||
}
|
||||
|
||||
func (mx *mapMutator) Finish() Map {
|
||||
d.Chk.True(mx.oc != nil, "Can only call Finish() once")
|
||||
defer func() {
|
||||
mx.oc.Destroy()
|
||||
mx.oc = nil
|
||||
}()
|
||||
|
||||
@@ -34,7 +33,7 @@ func (mx *mapMutator) Finish() Map {
|
||||
iter := mx.oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
seq.Append(iter.Op())
|
||||
seq.Append(iter.MapOp())
|
||||
}
|
||||
return newMap(seq.Done().(orderedSequence))
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/testify/assert"
|
||||
@@ -212,12 +213,11 @@ func newMapTestSuite(size uint, expectRefStr string, expectChunkCount int, expec
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *mapTestSuite) TestStreamingMap() {
|
||||
func (suite *mapTestSuite) createStreamingMap(vs *ValueStore) {
|
||||
randomized := make(mapEntrySlice, len(suite.elems.entries))
|
||||
for i, j := range rand.Perm(len(randomized)) {
|
||||
randomized[j] = suite.elems.entries[i]
|
||||
}
|
||||
vs := NewTestValueStore()
|
||||
|
||||
kvChan := make(chan Value)
|
||||
mapChan := NewStreamingMap(vs, kvChan)
|
||||
@@ -226,7 +226,30 @@ func (suite *mapTestSuite) TestStreamingMap() {
|
||||
kvChan <- entry.value
|
||||
}
|
||||
close(kvChan)
|
||||
suite.validate(<-mapChan)
|
||||
suite.True(suite.validate(<-mapChan))
|
||||
}
|
||||
|
||||
func (suite *mapTestSuite) TestStreamingMap() {
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
suite.createStreamingMap(vs)
|
||||
}
|
||||
|
||||
func (suite *mapTestSuite) TestStreamingMap2() {
|
||||
wg := sync.WaitGroup{}
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
suite.createStreamingMap(vs)
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
suite.createStreamingMap(vs)
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestMapSuite1K(t *testing.T) {
|
||||
|
||||
@@ -8,15 +8,55 @@ import (
|
||||
"encoding/binary"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/syndtr/goleveldb/leveldb"
|
||||
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||||
"github.com/syndtr/goleveldb/leveldb/opt"
|
||||
"github.com/syndtr/goleveldb/leveldb/util"
|
||||
)
|
||||
|
||||
func newOpCache(vrw ValueReadWriter) *opCache {
|
||||
const uint32Size = 4
|
||||
|
||||
type opCacheStore interface {
|
||||
opCache() opCache
|
||||
destroy() error
|
||||
}
|
||||
|
||||
type opCache interface {
|
||||
MapSet(mapKey Value, mapVal Value)
|
||||
SetInsert(val Value)
|
||||
NewIterator() opCacheIterator
|
||||
}
|
||||
|
||||
type opCacheIterator interface {
|
||||
MapOp() sequenceItem
|
||||
SetOp() sequenceItem
|
||||
Next() bool
|
||||
Release()
|
||||
}
|
||||
|
||||
type ldbOpCacheStore struct {
|
||||
ldb *leveldb.DB
|
||||
dbDir string
|
||||
collectionId uint32
|
||||
vrw ValueReadWriter
|
||||
}
|
||||
|
||||
type ldbOpCache struct {
|
||||
vrw ValueReadWriter
|
||||
colId uint32
|
||||
ldb *leveldb.DB
|
||||
}
|
||||
|
||||
type ldbOpCacheIterator struct {
|
||||
iter iterator.Iterator
|
||||
vr ValueReader
|
||||
}
|
||||
|
||||
func newLdbOpCacheStore(vrw ValueReadWriter) *ldbOpCacheStore {
|
||||
dir, err := ioutil.TempDir("", "")
|
||||
d.Chk.NoError(err)
|
||||
db, err := leveldb.OpenFile(dir, &opt.Options{
|
||||
@@ -27,62 +67,56 @@ func newOpCache(vrw ValueReadWriter) *opCache {
|
||||
WriteBuffer: 1 << 27, // 128MiB
|
||||
})
|
||||
d.Chk.NoError(err, "opening put cache in %s", dir)
|
||||
return &opCache{ops: db, dbDir: dir, vrw: vrw}
|
||||
return &ldbOpCacheStore{ldb: db, dbDir: dir, vrw: vrw}
|
||||
}
|
||||
|
||||
type opCache struct {
|
||||
ops *leveldb.DB
|
||||
dbDir string
|
||||
vrw ValueReadWriter
|
||||
ldbKeyScratch [1 + hash.ByteLen]byte
|
||||
keyScratch [initialBufferSize]byte
|
||||
valScratch [initialBufferSize]byte
|
||||
func (store *ldbOpCacheStore) destroy() error {
|
||||
d.Chk.NoError(store.ldb.Close())
|
||||
return os.RemoveAll(store.dbDir)
|
||||
}
|
||||
|
||||
type opCacheIterator struct {
|
||||
iter iterator.Iterator
|
||||
vr ValueReader
|
||||
func (store *ldbOpCacheStore) opCache() opCache {
|
||||
colId := atomic.AddUint32(&store.collectionId, 1)
|
||||
return &ldbOpCache{vrw: store.vrw, colId: colId, ldb: store.ldb}
|
||||
}
|
||||
|
||||
var uint32Size = binary.Size(uint32(0))
|
||||
|
||||
// Set can be called from any goroutine
|
||||
func (p *opCache) Set(mapKey Value, mapVal Value) {
|
||||
func (opc *ldbOpCache) MapSet(mapKey Value, mapVal Value) {
|
||||
mapKeyArray := [initialBufferSize]byte{}
|
||||
mapValArray := [initialBufferSize]byte{}
|
||||
|
||||
switch mapKey.Type().Kind() {
|
||||
default:
|
||||
// This is the complicated case. For non-primitives, we want the ldb key to be the hash of mapKey, but we obviously need to get both mapKey and mapVal into ldb somehow. The simplest thing is just to do this:
|
||||
ldbKey := ldbKeyFromValueHash(mapKey, opc.colId)
|
||||
|
||||
// Since we've used the ref of keyValue as our ldbKey. We need to store mapKey and mapVal in the ldb value. We use the following format for that:
|
||||
//
|
||||
// uint32 (4 bytes) bytes bytes
|
||||
// +-----------------------+---------------------+----------------------+
|
||||
// | key serialization len | serialized key | serialized value |
|
||||
// +-----------------------+---------------------+----------------------+
|
||||
|
||||
// Note that, if mapKey and/or mapVal are prolly trees, any in-memory child chunks will be written to vrw at this time.
|
||||
p.ldbKeyScratch[0] = byte(mapKey.Type().Kind())
|
||||
copy(p.ldbKeyScratch[1:], mapKey.Hash().DigestSlice())
|
||||
mapKeyData := encToSlice(mapKey, p.keyScratch[:], p.vrw)
|
||||
mapValData := encToSlice(mapVal, p.valScratch[:], p.vrw)
|
||||
encodedMapKey := encToSlice(mapKey, mapKeyArray[:], opc.vrw)
|
||||
encodedMapVal := encToSlice(mapVal, mapValArray[:], opc.vrw)
|
||||
ldbValueArray := [initialBufferSize * 2]byte{}
|
||||
binary.LittleEndian.PutUint32(ldbValueArray[:], uint32(len(encodedMapKey)))
|
||||
ldbValue := ldbValueArray[0:4]
|
||||
ldbValue = append(ldbValue, encodedMapKey...)
|
||||
ldbValue = append(ldbValue, encodedMapVal...)
|
||||
|
||||
mapKeyByteLen := len(mapKeyData)
|
||||
data := make([]byte, uint32Size+mapKeyByteLen+len(mapValData))
|
||||
binary.LittleEndian.PutUint32(data, uint32(mapKeyByteLen))
|
||||
copy(data[uint32Size:], mapKeyData)
|
||||
copy(data[uint32Size+mapKeyByteLen:], mapValData)
|
||||
|
||||
// TODO: Will manually batching these help?
|
||||
err := p.ops.Put(p.ldbKeyScratch[:], data, nil)
|
||||
// TODO: Will manually batching calls to ldb.Put() help?
|
||||
err := opc.ldb.Put(ldbKey, ldbValue, nil)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
// In this case, we can just serialize mapKey and use it as the ldb key, so we can also just serialize mapVal and dump that into the DB.
|
||||
keyData := encToSlice(mapKey, p.keyScratch[:], p.vrw)
|
||||
valData := encToSlice(mapVal, p.valScratch[:], p.vrw)
|
||||
// TODO: Will manually batching these help?
|
||||
err := p.ops.Put(keyData, valData, nil)
|
||||
ldbKey := ldbKeyFromValue(mapKey, opc.colId, opc.vrw)
|
||||
encodedMapVal := encToSlice(mapVal, mapValArray[:], opc.vrw)
|
||||
err := opc.ldb.Put(ldbKey, encodedMapVal, nil)
|
||||
d.Chk.NoError(err)
|
||||
}
|
||||
}
|
||||
|
||||
// Note that, if 'v' are prolly trees, any in-memory child chunks will be written to vw at this time.
|
||||
func encToSlice(v Value, initBuf []byte, vw ValueWriter) []byte {
|
||||
// TODO: Are there enough calls to this that it's worth re-using a nomsWriter and valueEncoder?
|
||||
w := &binaryNomsWriter{initBuf, 0}
|
||||
@@ -91,37 +125,99 @@ func encToSlice(v Value, initBuf []byte, vw ValueWriter) []byte {
|
||||
return w.data()
|
||||
}
|
||||
|
||||
func (p *opCache) NewIterator() *opCacheIterator {
|
||||
return &opCacheIterator{p.ops.NewIterator(nil, nil), p.vrw}
|
||||
func (opc *ldbOpCache) NewIterator() opCacheIterator {
|
||||
prefix := [4]byte{}
|
||||
binary.LittleEndian.PutUint32(prefix[:], opc.colId)
|
||||
return &ldbOpCacheIterator{iter: opc.ldb.NewIterator(util.BytesPrefix(prefix[:]), nil), vr: opc.vrw}
|
||||
}
|
||||
|
||||
func (p *opCache) Destroy() error {
|
||||
d.Chk.NoError(p.ops.Close())
|
||||
return os.RemoveAll(p.dbDir)
|
||||
}
|
||||
|
||||
func (i *opCacheIterator) Next() bool {
|
||||
func (i *ldbOpCacheIterator) Next() bool {
|
||||
return i.iter.Next()
|
||||
}
|
||||
|
||||
func (i *opCacheIterator) Op() sequenceItem {
|
||||
func (i *ldbOpCacheIterator) MapOp() sequenceItem {
|
||||
entry := mapEntry{}
|
||||
ldbKey := i.iter.Key()
|
||||
data := i.iter.Value()
|
||||
dataOffset := 0
|
||||
switch NomsKind(ldbKey[0]) {
|
||||
ldbValue := i.iter.Value()
|
||||
switch NomsKind(ldbKey[uint32Size]) {
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
entry.key = DecodeFromBytes(ldbKey, i.vr, staticTypeCache)
|
||||
entry.key = DecodeFromBytes(ldbKey[uint32Size:], i.vr, staticTypeCache)
|
||||
entry.value = DecodeFromBytes(ldbValue, i.vr, staticTypeCache)
|
||||
default:
|
||||
keyBytesLen := int(binary.LittleEndian.Uint32(data))
|
||||
entry.key = DecodeFromBytes(data[uint32Size:uint32Size+keyBytesLen], i.vr, staticTypeCache)
|
||||
dataOffset = uint32Size + keyBytesLen
|
||||
keyBytesLen := int(binary.LittleEndian.Uint32(ldbValue))
|
||||
entry.key = DecodeFromBytes(ldbValue[uint32Size:uint32Size+keyBytesLen], i.vr, staticTypeCache)
|
||||
entry.value = DecodeFromBytes(ldbValue[uint32Size+keyBytesLen:], i.vr, staticTypeCache)
|
||||
}
|
||||
|
||||
entry.value = DecodeFromBytes(data[dataOffset:], i.vr, staticTypeCache)
|
||||
return entry
|
||||
}
|
||||
|
||||
func (i *opCacheIterator) Release() {
|
||||
// Insert can be called from any goroutine
|
||||
func (opc *ldbOpCache) SetInsert(val Value) {
|
||||
switch val.Type().Kind() {
|
||||
default:
|
||||
ldbKey := ldbKeyFromValueHash(val, opc.colId)
|
||||
valArray := [initialBufferSize]byte{}
|
||||
encodedVal := encToSlice(val, valArray[:], opc.vrw)
|
||||
err := opc.ldb.Put(ldbKey, encodedVal, nil)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
ldbKey := ldbKeyFromValue(val, opc.colId, opc.vrw)
|
||||
// Since the ldbKey contains the val, there's no reason to store anything in the ldbValue
|
||||
err := opc.ldb.Put(ldbKey, nil, nil)
|
||||
d.Chk.NoError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) SetOp() sequenceItem {
|
||||
ldbKey := i.iter.Key()
|
||||
|
||||
switch NomsKind(ldbKey[uint32Size]) {
|
||||
case BoolKind, NumberKind, StringKind:
|
||||
return DecodeFromBytes(ldbKey[uint32Size:], i.vr, staticTypeCache)
|
||||
default:
|
||||
data := i.iter.Value()
|
||||
return DecodeFromBytes(data, i.vr, staticTypeCache)
|
||||
}
|
||||
}
|
||||
|
||||
func (i *ldbOpCacheIterator) Release() {
|
||||
i.iter.Release()
|
||||
}
|
||||
|
||||
// writeLdbKeyHeaderBytes writes the first 4 or 5 bytes into the ldbKey. The first 4 bytes in every
|
||||
// ldbKey are the colId. This identifies all the keys for a particular opCache and allows this opStore
|
||||
// to cache values for multiple collections. The optional 5th byte is the NomsKind of the value. In
|
||||
// cases where we're encoding the hash of an object we need to write the nomsKind manually because the
|
||||
// hash doesn't contain it. In cases were we are encoding a primitive value into the key, the first byte
|
||||
// of the value is the nomsKind so there is no reason to write it again.
|
||||
func writeLdbKeyHeaderBytes(ldbKey []byte, colId uint32, v Value) []byte {
|
||||
binary.LittleEndian.PutUint32(ldbKey, colId)
|
||||
length := uint32Size
|
||||
if v != nil {
|
||||
ldbKey[length] = byte(v.Type().Kind())
|
||||
length++
|
||||
}
|
||||
return ldbKey[0:length]
|
||||
}
|
||||
|
||||
// ldbKeys for non-primitive Nom Values (e.g. blobs, structs, lists, maps, etc) use a serialization of the values hash:
|
||||
// colId(4 bytes) + nomsKind(val)(1 byte) + val.Hash()(20 bytes).
|
||||
|
||||
func ldbKeyFromValueHash(val Value, colId uint32) []byte {
|
||||
ldbKeyArray := [uint32Size + 1 + hash.ByteLen]byte{}
|
||||
ldbKey := writeLdbKeyHeaderBytes(ldbKeyArray[:], colId, val)
|
||||
return append(ldbKey, val.Hash().DigestSlice()...)
|
||||
}
|
||||
|
||||
// ldbKeys for primitive Noms Values (e.g. bool, number, & string) consist of a byte string that encodes:
|
||||
// colId(4 bytes) + serialized value(n bytes)
|
||||
// Note: the first byte of the serialized value is the NomsKind.
|
||||
func ldbKeyFromValue(val Value, colId uint32, vrw ValueReadWriter) []byte {
|
||||
valArray := [initialBufferSize]byte{}
|
||||
ldbKeyArray := [initialBufferSize]byte{}
|
||||
ldbKey := writeLdbKeyHeaderBytes(ldbKeyArray[:], colId, nil)
|
||||
encodedVal := encToSlice(val, valArray[:], vrw)
|
||||
return append(ldbKey, encodedVal...)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,11 @@ import (
|
||||
type opCacheComparer struct{}
|
||||
|
||||
func (opCacheComparer) Compare(a, b []byte) int {
|
||||
if res := bytes.Compare(a[:uint32Size], b[:uint32Size]); res != 0 {
|
||||
return res
|
||||
}
|
||||
a, b = a[uint32Size:], b[uint32Size:]
|
||||
|
||||
if compared, res := compareEmpties(a, b); compared {
|
||||
return res
|
||||
}
|
||||
|
||||
@@ -19,20 +19,17 @@ func TestOpCache(t *testing.T) {
|
||||
type OpCacheSuite struct {
|
||||
suite.Suite
|
||||
vs *ValueStore
|
||||
oc *opCache
|
||||
}
|
||||
|
||||
func (suite *OpCacheSuite) SetupTest() {
|
||||
suite.vs = NewTestValueStore()
|
||||
suite.oc = newOpCache(suite.vs)
|
||||
}
|
||||
|
||||
func (suite *OpCacheSuite) TearDownTest() {
|
||||
suite.vs.Close()
|
||||
suite.oc.Destroy()
|
||||
}
|
||||
|
||||
func (suite *OpCacheSuite) TestSet() {
|
||||
func (suite *OpCacheSuite) TestMapSet() {
|
||||
entries := mapEntrySlice{
|
||||
{NewList(Number(8), Number(0)), String("ahoy")},
|
||||
{String("A key"), NewBlob(bytes.NewBufferString("A value"))},
|
||||
@@ -44,16 +41,48 @@ func (suite *OpCacheSuite) TestSet() {
|
||||
{String("struct"), NewStruct("thing2", nil)},
|
||||
{Number(42), String("other")},
|
||||
}
|
||||
oc := suite.vs.opCache()
|
||||
for _, entry := range entries {
|
||||
suite.oc.Set(entry.key, entry.value)
|
||||
oc.MapSet(entry.key, entry.value)
|
||||
}
|
||||
sort.Sort(entries)
|
||||
|
||||
iterated := mapEntrySlice{}
|
||||
iter := suite.oc.NewIterator()
|
||||
iter := oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
iterated = append(iterated, iter.Op().(mapEntry))
|
||||
iterated = append(iterated, iter.MapOp().(mapEntry))
|
||||
}
|
||||
suite.True(entries.Equals(iterated))
|
||||
}
|
||||
|
||||
func (suite *OpCacheSuite) TestSetInsert() {
|
||||
entries := ValueSlice{
|
||||
NewList(Number(8), Number(0)),
|
||||
String("ahoy"),
|
||||
NewBlob(bytes.NewBufferString("A value")),
|
||||
Number(1),
|
||||
Bool(true),
|
||||
Bool(false),
|
||||
NewBlob(bytes.NewBuffer([]byte{0xff, 0, 0})),
|
||||
NewMap(),
|
||||
Number(42),
|
||||
NewStruct("thing1", StructData{"a": Number(7)}),
|
||||
String("struct"),
|
||||
NewStruct("thing2", nil),
|
||||
String("other"),
|
||||
}
|
||||
oc := suite.vs.opCache()
|
||||
for _, entry := range entries {
|
||||
oc.SetInsert(entry)
|
||||
}
|
||||
sort.Sort(entries)
|
||||
|
||||
iterated := ValueSlice{}
|
||||
iter := oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
iterated = append(iterated, iter.SetOp().(Value))
|
||||
}
|
||||
suite.True(entries.Equals(iterated))
|
||||
}
|
||||
|
||||
@@ -30,6 +30,20 @@ func NewSet(v ...Value) Set {
|
||||
return newSet(seq.Done().(orderedSequence))
|
||||
}
|
||||
|
||||
func NewStreamingSet(vrw ValueReadWriter, vals <-chan Value) <-chan Set {
|
||||
outChan := make(chan Set)
|
||||
go func() {
|
||||
mx := newSetMutator(vrw)
|
||||
|
||||
for v := range vals {
|
||||
mx.Insert(v)
|
||||
}
|
||||
|
||||
outChan <- mx.Finish()
|
||||
}()
|
||||
return outChan
|
||||
}
|
||||
|
||||
// Computes the diff from |last| to |s| using "best" algorithm, which balances returning results early vs completing quickly.
|
||||
func (s Set) Diff(last Set, changes chan<- ValueChanged, closeChan <-chan struct{}) {
|
||||
if s.Equals(last) {
|
||||
|
||||
39
go/types/setMutator.go
Normal file
39
go/types/setMutator.go
Normal file
@@ -0,0 +1,39 @@
|
||||
// Copyright 2016 Attic Labs, Inc. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package types
|
||||
|
||||
import "github.com/attic-labs/noms/go/d"
|
||||
|
||||
type setMutator struct {
|
||||
oc opCache
|
||||
vrw ValueReadWriter
|
||||
}
|
||||
|
||||
func newSetMutator(vrw ValueReadWriter) *setMutator {
|
||||
return &setMutator{vrw.opCache(), vrw}
|
||||
}
|
||||
|
||||
func (mx *setMutator) Insert(val Value) *setMutator {
|
||||
d.Chk.True(mx.oc != nil, "Can't call Insert() again after Finish()")
|
||||
mx.oc.SetInsert(val)
|
||||
return mx
|
||||
}
|
||||
|
||||
func (mx *setMutator) Finish() Set {
|
||||
d.Chk.True(mx.oc != nil, "Can only call Finish() once")
|
||||
defer func() {
|
||||
mx.oc = nil
|
||||
}()
|
||||
|
||||
seq := newEmptySequenceChunker(mx.vrw, mx.vrw, makeSetLeafChunkFn(mx.vrw), newOrderedMetaSequenceChunkFn(SetKind, mx.vrw), hashValueBytes)
|
||||
|
||||
// I tried splitting this up so that the iteration ran in a separate goroutine from the Append'ing, but it actually made things a bit slower when I ran a test.
|
||||
iter := mx.oc.NewIterator()
|
||||
defer iter.Release()
|
||||
for iter.Next() {
|
||||
seq.Append(iter.SetOp())
|
||||
}
|
||||
return newSet(seq.Done().(orderedSequence))
|
||||
}
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/testify/assert"
|
||||
@@ -156,6 +157,43 @@ func newSetTestSuite(size uint, expectRefStr string, expectChunkCount int, expec
|
||||
}
|
||||
}
|
||||
|
||||
func (suite *setTestSuite) createStreamingSet(vs *ValueStore) {
|
||||
randomized := make(ValueSlice, len(suite.elems))
|
||||
for i, j := range rand.Perm(len(randomized)) {
|
||||
randomized[j] = suite.elems[i]
|
||||
}
|
||||
|
||||
vChan := make(chan Value)
|
||||
setChan := NewStreamingSet(vs, vChan)
|
||||
for _, entry := range randomized {
|
||||
vChan <- entry
|
||||
}
|
||||
close(vChan)
|
||||
suite.True(suite.validate(<-setChan))
|
||||
}
|
||||
|
||||
func (suite *setTestSuite) TestStreamingSet() {
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
suite.createStreamingSet(vs)
|
||||
}
|
||||
|
||||
func (suite *setTestSuite) TestStreamingSet2() {
|
||||
vs := NewTestValueStore()
|
||||
defer vs.Close()
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
suite.createStreamingSet(vs)
|
||||
wg.Done()
|
||||
}()
|
||||
go func() {
|
||||
suite.createStreamingSet(vs)
|
||||
wg.Done()
|
||||
}()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func TestSetSuite1K(t *testing.T) {
|
||||
suite.Run(t, newSetTestSuite(10, "n99i86gc4s23ol7ctmjuc1p4jk4msr4i", 0, 0, 0, newNumber))
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ type ValueWriter interface {
|
||||
type ValueReadWriter interface {
|
||||
ValueReader
|
||||
ValueWriter
|
||||
opCache() opCache
|
||||
}
|
||||
|
||||
// ValueStore provides methods to read and write Noms Values to a BatchStore. It validates Values as they are written, but does not guarantee that these Values are persisted to the BatchStore until a subsequent Flush. or Close.
|
||||
@@ -39,6 +40,8 @@ type ValueStore struct {
|
||||
cache map[hash.Hash]chunkCacheEntry
|
||||
mu *sync.Mutex
|
||||
valueCache *sizecache.SizeCache
|
||||
opcStore opCacheStore
|
||||
once sync.Once
|
||||
}
|
||||
|
||||
const defaultValueCacheSize = 1 << 25 // 32MB
|
||||
@@ -64,7 +67,7 @@ func NewValueStore(bs BatchStore) *ValueStore {
|
||||
}
|
||||
|
||||
func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore {
|
||||
return &ValueStore{bs, map[hash.Hash]chunkCacheEntry{}, &sync.Mutex{}, sizecache.New(cacheSize)}
|
||||
return &ValueStore{bs, map[hash.Hash]chunkCacheEntry{}, &sync.Mutex{}, sizecache.New(cacheSize), nil, sync.Once{}}
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) BatchStore() BatchStore {
|
||||
@@ -125,6 +128,11 @@ func (lvs *ValueStore) Flush() {
|
||||
// Close closes the underlying BatchStore
|
||||
func (lvs *ValueStore) Close() error {
|
||||
lvs.Flush()
|
||||
if lvs.opcStore != nil {
|
||||
err := lvs.opcStore.destroy()
|
||||
d.Chk.NoError(err, "Attempt to clean up opCacheStore failed, error: %s\n", err)
|
||||
lvs.opcStore = nil
|
||||
}
|
||||
return lvs.bs.Close()
|
||||
}
|
||||
|
||||
@@ -171,6 +179,13 @@ func (lvs *ValueStore) ensureChunksInCache(v Value) {
|
||||
lvs.checkChunksInCache(v, true)
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) opCache() opCache {
|
||||
lvs.once.Do(func() {
|
||||
lvs.opcStore = newLdbOpCacheStore(lvs)
|
||||
})
|
||||
return lvs.opcStore.opCache()
|
||||
}
|
||||
|
||||
func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
|
||||
hints := map[hash.Hash]struct{}{}
|
||||
for _, reachable := range v.Chunks() {
|
||||
|
||||
Reference in New Issue
Block a user