diff --git a/go/chunks/dynamo_store.go b/go/chunks/dynamo_store.go
index e8c7ea8d1d..24eab63eef 100644
--- a/go/chunks/dynamo_store.go
+++ b/go/chunks/dynamo_store.go
@@ -122,7 +122,9 @@ func (s *DynamoStore) Get(h hash.Hash) Chunk {
 func (s *DynamoStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
 	for h, _ := range hashes {
 		c := s.Get(h)
-		foundChunks <- &c
+		if !c.IsEmpty() {
+			foundChunks <- &c
+		}
 	}
 	return
 }
diff --git a/go/chunks/leveldb_store.go b/go/chunks/leveldb_store.go
index 53e939359d..dbc7241359 100644
--- a/go/chunks/leveldb_store.go
+++ b/go/chunks/leveldb_store.go
@@ -103,7 +103,9 @@ func (l *LevelDBStore) Get(ref hash.Hash) Chunk {
 func (l *LevelDBStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
 	for h, _ := range hashes {
 		c := l.Get(h)
-		foundChunks <- &c
+		if !c.IsEmpty() {
+			foundChunks <- &c
+		}
 	}
 	return
 }
diff --git a/go/chunks/memory_store.go b/go/chunks/memory_store.go
index ffd8207e75..b2e9582151 100644
--- a/go/chunks/memory_store.go
+++ b/go/chunks/memory_store.go
@@ -35,7 +35,9 @@ func (ms *MemoryStore) Get(h hash.Hash) Chunk {
 func (ms *MemoryStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
 	for h, _ := range hashes {
 		c := ms.Get(h)
-		foundChunks <- &c
+		if !c.IsEmpty() {
+			foundChunks <- &c
+		}
 	}
 	return
 }
diff --git a/go/datas/remote_database_handlers_test.go b/go/datas/remote_database_handlers_test.go
index 044a21c5d6..4433a6dff1 100644
--- a/go/datas/remote_database_handlers_test.go
+++ b/go/datas/remote_database_handlers_test.go
@@ -26,13 +26,15 @@ import (
 func TestHandleWriteValue(t *testing.T) {
 	assert := assert.New(t)
 	cs := chunks.NewTestStore()
-	ds := NewDatabase(cs)
+	db := NewDatabase(cs)
 
 	l := types.NewList(
-		ds.WriteValue(types.Bool(true)),
-		ds.WriteValue(types.Bool(false)),
+		db.WriteValue(types.Bool(true)),
+		db.WriteValue(types.Bool(false)),
 	)
-	ds.WriteValue(l)
+	r := db.WriteValue(l)
+	_, err := db.CommitValue(db.GetDataset("datasetID"), r)
+	assert.NoError(err)
 
 	hint := l.Hash()
 	newItem := types.NewEmptyBlob()
@@ -49,8 +51,8 @@ func TestHandleWriteValue(t *testing.T) {
 	HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)
 	if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
-		ds2 := NewDatabase(cs)
-		v := ds2.ReadValue(l2.Hash())
+		db2 := NewDatabase(cs)
+		v := db2.ReadValue(l2.Hash())
 		if assert.NotNil(v) {
 			assert.True(v.Equals(l2), "%+v != %+v", v, l2)
 		}
 	}
@@ -89,8 +91,8 @@ func TestHandleWriteValueDupChunks(t *testing.T) {
 	HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)
 	if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
-		ds2 := NewDatabase(cs)
-		v := ds2.ReadValue(newItem.Hash())
+		db := NewDatabase(cs)
+		v := db.ReadValue(newItem.Hash())
 		if assert.NotNil(v) {
 			assert.True(v.Equals(newItem), "%+v != %+v", v, newItem)
 		}
 	}
 }
@@ -100,13 +102,15 @@ func TestHandleWriteValueBackpressure(t *testing.T) {
 	assert := assert.New(t)
 	cs := &backpressureCS{ChunkStore: chunks.NewMemoryStore()}
-	ds := NewDatabase(cs)
+	db := NewDatabase(cs)
 
 	l := types.NewList(
-		ds.WriteValue(types.Bool(true)),
-		ds.WriteValue(types.Bool(false)),
+		db.WriteValue(types.Bool(true)),
+		db.WriteValue(types.Bool(false)),
 	)
-	ds.WriteValue(l)
+	r := db.WriteValue(l)
+	_, err := db.CommitValue(db.GetDataset("datasetID"), r)
+	assert.NoError(err)
 
 	hint := l.Hash()
 	newItem := types.NewEmptyBlob()
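With the three GetMany changes above, the stores no longer send empty Chunks for hashes they don't hold, so a caller receives at most one Chunk per requested hash and cannot assume one delivery per request. A minimal sketch of consuming the revised contract (not part of the diff; NewChunk, Put, and the buffered-channel sizing are assumptions based on the surrounding chunks/hash packages):

package main

import (
	"fmt"

	"github.com/attic-labs/noms/go/chunks"
	"github.com/attic-labs/noms/go/hash"
)

func main() {
	ms := chunks.NewMemoryStore()
	present := chunks.NewChunk([]byte("hello"))
	ms.Put(present)

	absent := chunks.NewChunk([]byte("never stored")) // deliberately not Put

	// hash.HashSet's underlying type is map[hash.Hash]struct{}, which is what
	// makes the Hints conversion later in this diff possible.
	requested := hash.HashSet{}
	requested[present.Hash()] = struct{}{}
	requested[absent.Hash()] = struct{}{}

	// GetMany sends synchronously, so the buffer must cover the worst case of
	// every requested hash being found.
	found := make(chan *chunks.Chunk, len(requested))
	ms.GetMany(requested, found)
	close(found)

	// Only chunks that exist in the store arrive; absent.Hash() is skipped.
	for c := range found {
		fmt.Println("found", c.Hash())
	}
}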
diff --git a/go/types/batch_store.go b/go/types/batch_store.go
index b3729a2413..04499773bd 100644
--- a/go/types/batch_store.go
+++ b/go/types/batch_store.go
@@ -44,7 +44,7 @@ type BatchStore interface {
 }
 
 // Hints are a set of hashes that should be used to speed up the validation of one or more Chunks.
-type Hints map[hash.Hash]struct{}
+type Hints hash.HashSet
 
 // BatchStoreAdaptor provides a naive implementation of BatchStore and should only be used with ChunkStores that can Put relatively quickly. It provides no actual batching or validation. Its intended use is for adapting a ChunkStore for use in something that requires a BatchStore.
 type BatchStoreAdaptor struct {
diff --git a/go/types/validating_batching_sink.go b/go/types/validating_batching_sink.go
index b6f98d9c21..f436e164e8 100644
--- a/go/types/validating_batching_sink.go
+++ b/go/types/validating_batching_sink.go
@@ -32,19 +32,23 @@ func NewValidatingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink {
 
 // Prepare primes the type info cache used to validate Enqueued Chunks by reading the Chunks referenced by the provided hints.
 func (vbs *ValidatingBatchingSink) Prepare(hints Hints) {
-	rl := make(chan struct{}, batchSize)
+	foundChunks := make(chan *chunks.Chunk, batchSize)
 	wg := sync.WaitGroup{}
-	for hint := range hints {
+	for i := 0; i < batchSize; i++ {
 		wg.Add(1)
-		rl <- struct{}{}
-		go func(hint hash.Hash) {
-			vbs.vs.ReadValue(hint)
-			<-rl
-			wg.Done()
-		}(hint)
+		go func() {
+			defer wg.Done()
+			tc := vbs.pool.Get()
+			defer vbs.pool.Put(tc)
+			for c := range foundChunks {
+				v := DecodeFromBytes(c.Data(), vbs.vs, tc.(*TypeCache))
+				vbs.vs.setHintsForReadValue(v, c.Hash())
+			}
+		}()
 	}
+	vbs.cs.GetMany(hash.HashSet(hints), foundChunks)
+	close(foundChunks)
 	wg.Wait()
-	close(rl)
 }
 
 // DecodedChunk holds a pointer to a Chunk and the Value that results from
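The rewritten Prepare above swaps a goroutine-per-hint scheme for a fixed pool of batchSize workers draining one channel, which GetMany fills synchronously and which is then closed so the workers exit. A stripped-down sketch of that shape in plain Go (not part of the diff; the worker count and placeholder work stand in for batchSize and the decode-plus-set-hints step):

package main

import (
	"fmt"
	"sync"
)

func main() {
	const workers = 16 // stands in for batchSize
	items := make(chan int, workers)

	wg := sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Each worker owns its scratch state for its whole lifetime, the
			// way Prepare checks a TypeCache out of vbs.pool once per goroutine.
			for item := range items {
				fmt.Println("processed", item) // stands in for DecodeFromBytes + setHintsForReadValue
			}
		}()
	}

	// The producer runs synchronously, like vbs.cs.GetMany, then closes the
	// channel so the workers' range loops end and wg.Wait can return.
	for i := 0; i < 100; i++ {
		items <- i
	}
	close(items)
	wg.Wait()
}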
diff --git a/go/types/value_store.go b/go/types/value_store.go
index 54870cae82..81a1759558 100644
--- a/go/types/value_store.go
+++ b/go/types/value_store.go
@@ -46,7 +46,7 @@ type ValueReadWriter interface {
 type ValueStore struct {
 	bs           BatchStore
 	cacheMu      sync.RWMutex
-	cache        map[hash.Hash]chunkCacheEntry
+	hintCache    map[hash.Hash]chunkCacheEntry
 	pendingHints map[hash.Hash]chunkCacheEntry
 	pendingMu    sync.RWMutex
 	pendingPuts  map[hash.Hash]pendingChunk
@@ -90,7 +90,7 @@ func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore {
 	return &ValueStore{
 		bs:           bs,
 		cacheMu:      sync.RWMutex{},
-		cache:        map[hash.Hash]chunkCacheEntry{},
+		hintCache:    map[hash.Hash]chunkCacheEntry{},
 		pendingHints: map[hash.Hash]chunkCacheEntry{},
 		pendingMu:    sync.RWMutex{},
 		pendingPuts:  map[hash.Hash]pendingChunk{},
@@ -132,18 +132,21 @@ func (lvs *ValueStore) ReadValue(h hash.Hash) Value {
 	v := DecodeValue(chunk, lvs)
 	lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
 
+	lvs.setHintsForReadValue(v, h)
+	return v
+}
+
+func (lvs *ValueStore) setHintsForReadValue(v Value, h hash.Hash) {
 	var entry chunkCacheEntry = absentChunk{}
 	if v != nil {
-		lvs.cacheChunks(v, h, false)
-		// h is trivially a hint for v, so consider putting that in the cache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the cache. If we later read some other chunk that references v, cacheChunks will overwrite this with a hint pointing to that chunk.
+		lvs.setHintsForReachable(v, h, false)
+		// h is trivially a hint for v, so consider putting that in the hintCache. If we got to v by reading some higher-level chunk, this entry gets dropped on the floor because h already has a hint in the hintCache. If we later read some other chunk that references v, setHintsForReachable will overwrite this with a hint pointing to that chunk.
 		// If we don't do this, top-level Values that get read but not written -- such as the existing Head of a Database upon a Commit -- can be erroneously left out during a pull.
 		entry = hintedChunk{v.Type(), h}
 	}
 	if cur := lvs.check(h); cur == nil || cur.Hint().IsEmpty() {
 		lvs.set(h, entry, false)
 	}
-	return v
 }
 
 // WriteValue takes a Value, schedules it to be written to lvs, and returns
@@ -176,7 +179,7 @@ func (lvs *ValueStore) WriteValue(v Value) Ref {
 		})
 	}()
 
-	lvs.cacheChunks(v, v.Hash(), true)
+	lvs.setHintsForReachable(v, v.Hash(), true)
 	lvs.set(h, (*presentChunk)(v.Type()), false)
 	lvs.valueCache.Drop(h)
 	return r
@@ -205,8 +208,8 @@ func (lvs *ValueStore) Close() error {
 	return lvs.bs.Close()
 }
 
-// cacheChunks looks at the Chunks reachable from v and, for each one, checks if there's a hint in the cache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint.
-func (lvs *ValueStore) cacheChunks(v Value, r hash.Hash, toPending bool) {
+// setHintsForReachable looks at the Chunks reachable from v and, for each one, checks if there's a hint in the hintCache. If there isn't, or if the hint is a self-reference, the chunk gets r set as its new hint.
+func (lvs *ValueStore) setHintsForReachable(v Value, r hash.Hash, toPending bool) {
 	v.WalkRefs(func(reachable Ref) {
 		hash := reachable.TargetHash()
 		if cur := lvs.check(hash); cur == nil || cur.Hint().IsEmpty() || cur.Hint() == hash {
@@ -225,7 +228,7 @@ func (lvs *ValueStore) isPresent(r hash.Hash) (present bool) {
 func (lvs *ValueStore) check(r hash.Hash) chunkCacheEntry {
 	lvs.cacheMu.RLock()
 	defer lvs.cacheMu.RUnlock()
-	return lvs.cache[r]
+	return lvs.hintCache[r]
 }
 
 func (lvs *ValueStore) set(r hash.Hash, entry chunkCacheEntry, toPending bool) {
@@ -234,7 +237,7 @@ func (lvs *ValueStore) set(r hash.Hash, entry chunkCacheEntry, toPending bool) {
 	if toPending {
 		lvs.pendingHints[r] = entry
 	} else {
-		lvs.cache[r] = entry
+		lvs.hintCache[r] = entry
 	}
 }
 
@@ -243,7 +246,7 @@ func (lvs *ValueStore) mergePendingHints() {
 	defer lvs.cacheMu.Unlock()
 
 	for h, entry := range lvs.pendingHints {
-		lvs.cache[h] = entry
+		lvs.hintCache[h] = entry
	}
 	lvs.pendingHints = map[hash.Hash]chunkCacheEntry{}
 }
 
@@ -266,11 +269,11 @@ func (lvs *ValueStore) opCache() opCache {
 func (lvs *ValueStore) checkChunksInCache(v Value, readValues bool) Hints {
 	hints := map[hash.Hash]struct{}{}
 	collectHints := func(reachable Ref) {
-		// First, check the type cache to see if reachable is already known to be valid.
+		// First, check the hintCache to see if reachable is already known to be valid.
 		targetHash := reachable.TargetHash()
 		entry := lvs.check(targetHash)
 
-		// If it's not already in the cache, attempt to read the value directly, which will put it and its chunks into the cache.
+		// If it's not already in the hintCache, attempt to read the value directly, which will put it and its chunks into the hintCache.
 		var reachableV Value
 		if readValues {
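Finally, redefining Hints as hash.HashSet (rather than a structurally identical but distinct map type) is what lets Prepare hand hints straight to ChunkStore.GetMany with a plain conversion. A toy sketch of the round trip (not part of the diff; the import paths follow the attic-labs/noms layout implied by the file names):

package main

import (
	"fmt"

	"github.com/attic-labs/noms/go/hash"
	"github.com/attic-labs/noms/go/types"
)

func main() {
	hs := hash.HashSet{}
	hs[hash.Hash{}] = struct{}{} // both types share the underlying map[hash.Hash]struct{}

	hints := types.Hints(hs)    // ChunkStore-level set -> BatchStore-level hints
	back := hash.HashSet(hints) // ...and back with no copying, as Prepare does before calling GetMany

	fmt.Println(len(hints), len(back))
}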