From e7a96c3748c3ec4dd1b3b69e6a9979762bc9decc Mon Sep 17 00:00:00 2001 From: cmasone-attic Date: Sun, 8 Jan 2017 14:37:37 -0800 Subject: [PATCH] Add ValueStore.ReadManyValues() (#3036) The more code can use GetMany(), the better performance gets on top of NBS. To this end, add a call to ValueStore that allows code to read many values concurrently. This can be used e.g. by read-ahead code that's navigating prolly trees to increase performance. Fixes #3019 --- go/chunks/chunk_store.go | 32 +++++---- go/chunks/dynamo_store.go | 17 ++--- go/chunks/remote_requests.go | 75 +++++++++++++++------ go/chunks/remote_requests_test.go | 51 ++++++++++++-- go/datas/http_batch_store.go | 39 +++++++++-- go/datas/http_batch_store_test.go | 19 ++++++ go/hash/hash.go | 8 +++ go/nbs/benchmarks/block_store_benchmarks.go | 9 +-- go/nbs/benchmarks/file_block_store.go | 2 +- go/nbs/benchmarks/main.go | 17 ++--- go/nbs/benchmarks/null_block_store.go | 2 +- go/nbs/cache.go | 7 ++ go/types/batch_store.go | 11 +++ go/types/value_store.go | 53 +++++++++++++++ go/types/value_store_test.go | 37 ++++++++++ 15 files changed, 309 insertions(+), 70 deletions(-) diff --git a/go/chunks/chunk_store.go b/go/chunks/chunk_store.go index 85b3c5bfe8..e6d6474bb7 100644 --- a/go/chunks/chunk_store.go +++ b/go/chunks/chunk_store.go @@ -19,8 +19,9 @@ type ChunkStore interface { RootTracker } -// Factory allows the creation of namespaced ChunkStore instances. The details of how namespaces -// are separated is left up to the particular implementation of Factory and ChunkStore. +// Factory allows the creation of namespaced ChunkStore instances. The details +// of how namespaces are separated is left up to the particular implementation +// of Factory and ChunkStore. type Factory interface { CreateStore(ns string) ChunkStore @@ -28,10 +29,11 @@ type Factory interface { Shutter() } -// RootTracker allows querying and management of the root of an entire tree of references. 
The -// "root" is the single mutable variable in a ChunkStore. It can store any hash, but it is -// typically used by higher layers (such as Database) to store a hash to a value that represents -// the current state and entire history of a database. +// RootTracker allows querying and management of the root of an entire tree of +// references. The "root" is the single mutable variable in a ChunkStore. It +// can store any hash, but it is typically used by higher layers (such as +// Database) to store a hash to a value that represents the current state and +// entire history of a database. type RootTracker interface { Root() hash.Hash UpdateRoot(current, last hash.Hash) bool @@ -39,12 +41,13 @@ type RootTracker interface { // ChunkSource is a place to get chunks from. type ChunkSource interface { - // Get the Chunk for the value of the hash in the store. If the hash is absent from the store nil - // is returned. + // Get the Chunk for the value of the hash in the store. If the hash is + // absent from the store nil is returned. Get(h hash.Hash) Chunk - // On return, |foundChunks| will have been fully sent all chunks which have been found. Any - // non-present chunks will silently be ignored. + // GetMany gets the Chunks with |hashes| from the store. On return, + // |foundChunks| will have been fully sent all chunks which have been + // found. Any non-present chunks will silently be ignored. GetMany(hashes hash.HashSet, foundChunks chan *Chunk) // Returns true iff the value at the address |h| is contained in the source @@ -59,8 +62,9 @@ type ChunkSink interface { // Put writes c into the ChunkSink, blocking until the operation is complete. Put(c Chunk) - // PutMany tries to write chunks into the sink. It will block as it handles as many as possible, - // then return a BackpressureError containing the rest (if any). + // PutMany tries to write chunks into the sink. 
It will block as it + // handles as many as possible, then return a BackpressureError containing + // the rest (if any). PutMany(chunks []Chunk) BackpressureError // On return, any previously Put chunks should be durable @@ -69,8 +73,8 @@ type ChunkSink interface { io.Closer } -// BackpressureError is a slice of hash.Hash that indicates some chunks could not be Put(). Caller -// is free to try to Put them again later. +// BackpressureError is a slice of hash.Hash that indicates some chunks could +// not be Put(). Caller is free to try to Put them again later. type BackpressureError hash.HashSlice func (b BackpressureError) Error() string { diff --git a/go/chunks/dynamo_store.go b/go/chunks/dynamo_store.go index a4ba2f4813..ba570d2357 100644 --- a/go/chunks/dynamo_store.go +++ b/go/chunks/dynamo_store.go @@ -113,10 +113,10 @@ func (s *DynamoStore) Get(h hash.Hash) Chunk { return pending } - ch := make(chan Chunk) + ch := make(chan *Chunk) s.requestWg.Add(1) - s.readQueue <- GetRequest{h, ch} - return <-ch + s.readQueue <- NewGetRequest(h, ch) + return *(<-ch) } func (s *DynamoStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) { @@ -137,7 +137,7 @@ func (s *DynamoStore) Has(h hash.Hash) bool { ch := make(chan bool) s.requestWg.Add(1) - s.readQueue <- HasRequest{h, ch} + s.readQueue <- NewHasRequest(h, ch) return <-ch } @@ -197,9 +197,10 @@ func (s *DynamoStore) sendGetRequests(req ReadRequest) { refs := map[hash.Hash]bool{} addReq := func(req ReadRequest) { - r := req.Hash() - batch[r] = append(batch[r], req.Outstanding()) - refs[r] = true + for h := range req.Hashes() { + batch[h] = append(batch[h], req.Outstanding()) + refs[h] = true + } s.requestWg.Done() } addReq(req) @@ -260,7 +261,7 @@ func (s *DynamoStore) processResponses(responses []map[string]*dynamodb.Attribut } c := NewChunkWithHash(r, b) for _, reqChan := range batch[r] { - reqChan.Satisfy(c) + reqChan.Satisfy(&c) } delete(batch, r) } diff --git a/go/chunks/remote_requests.go 
b/go/chunks/remote_requests.go index dec96a7d04..876f5e2531 100644 --- a/go/chunks/remote_requests.go +++ b/go/chunks/remote_requests.go @@ -4,41 +4,63 @@ package chunks -import "github.com/attic-labs/noms/go/hash" +import ( + "sync" + + "github.com/attic-labs/noms/go/hash" +) type ReadRequest interface { - Hash() hash.Hash + Hashes() hash.HashSet Outstanding() OutstandingRequest } -func NewGetRequest(r hash.Hash, ch chan Chunk) GetRequest { - return GetRequest{r, ch} +func NewGetRequest(r hash.Hash, ch chan<- *Chunk) GetRequest { + return GetRequest{hash.HashSet{r: struct{}{}}, ch} } type GetRequest struct { - r hash.Hash - ch chan Chunk + hashes hash.HashSet + ch chan<- *Chunk } -func NewHasRequest(r hash.Hash, ch chan bool) HasRequest { - return HasRequest{r, ch} +func NewGetManyRequest(hashes hash.HashSet, wg *sync.WaitGroup, ch chan<- *Chunk) GetManyRequest { + return GetManyRequest{hashes, wg, ch} +} + +type GetManyRequest struct { + hashes hash.HashSet + wg *sync.WaitGroup + ch chan<- *Chunk +} + +func NewHasRequest(r hash.Hash, ch chan<- bool) HasRequest { + return HasRequest{hash.HashSet{r: struct{}{}}, ch} } type HasRequest struct { - r hash.Hash - ch chan bool + hashes hash.HashSet + ch chan<- bool } -func (g GetRequest) Hash() hash.Hash { - return g.r +func (g GetRequest) Hashes() hash.HashSet { + return g.hashes } func (g GetRequest) Outstanding() OutstandingRequest { return OutstandingGet(g.ch) } -func (h HasRequest) Hash() hash.Hash { - return h.r +func (g GetManyRequest) Hashes() hash.HashSet { + return g.hashes +} + +func (g GetManyRequest) Outstanding() OutstandingRequest { + return OutstandingGetMany{g.wg, g.ch} +} + +func (h HasRequest) Hashes() hash.HashSet { + return h.hashes } func (h HasRequest) Outstanding() OutstandingRequest { @@ -46,24 +68,37 @@ func (h HasRequest) Outstanding() OutstandingRequest { } type OutstandingRequest interface { - Satisfy(c Chunk) + Satisfy(c *Chunk) Fail() } -type OutstandingGet chan Chunk -type OutstandingHas 
chan bool +type OutstandingGet chan<- *Chunk +type OutstandingGetMany struct { + wg *sync.WaitGroup + ch chan<- *Chunk +} +type OutstandingHas chan<- bool -func (r OutstandingGet) Satisfy(c Chunk) { +func (r OutstandingGet) Satisfy(c *Chunk) { r <- c close(r) } func (r OutstandingGet) Fail() { - r <- EmptyChunk + r <- &EmptyChunk close(r) } -func (h OutstandingHas) Satisfy(c Chunk) { +func (ogm OutstandingGetMany) Satisfy(c *Chunk) { + ogm.ch <- c + ogm.wg.Done() +} + +func (ogm OutstandingGetMany) Fail() { + ogm.wg.Done() +} + +func (h OutstandingHas) Satisfy(c *Chunk) { h <- true close(h) } diff --git a/go/chunks/remote_requests_test.go b/go/chunks/remote_requests_test.go index e0f804b6ee..9016bde659 100644 --- a/go/chunks/remote_requests_test.go +++ b/go/chunks/remote_requests_test.go @@ -5,6 +5,7 @@ package chunks import ( + "sync" "testing" "github.com/attic-labs/noms/go/hash" @@ -28,10 +29,10 @@ func TestGetRequestBatch(t *testing.T) { } req0chan := make(chan bool, 1) - req1chan := make(chan Chunk, 1) + req1chan := make(chan *Chunk, 1) req2chan := make(chan bool, 1) req3chan := make(chan bool, 1) - req4chan := make(chan Chunk, 1) + req4chan := make(chan *Chunk, 1) batch := ReadBatch{ r0: []OutstandingRequest{OutstandingHas(req0chan), OutstandingGet(req1chan)}, @@ -42,10 +43,10 @@ func TestGetRequestBatch(t *testing.T) { for requestedRef, reqs := range batch { for _, req := range reqs { if requestedRef == r1 { - req.Satisfy(c1) + req.Satisfy(&c1) delete(batch, r1) } else if requestedRef == r2 { - req.Satisfy(c2) + req.Satisfy(&c2) delete(batch, r2) } } @@ -78,3 +79,45 @@ func TestGetRequestBatch(t *testing.T) { assert.Equal(0, r0True) assert.Equal(1, r0False) } + +func TestGetManyRequestBatch(t *testing.T) { + assert := assert.New(t) + h0 := hash.Parse("00000000000000000000000000000000") + c1 := NewChunk([]byte("abc")) + h1 := c1.Hash() + c2 := NewChunk([]byte("123")) + h2 := c2.Hash() + + chunks := make(chan *Chunk) + hashes := hash.NewHashSet(h0, h1, h2) + 
wg := &sync.WaitGroup{} + wg.Add(len(hashes)) + go func() { wg.Wait(); close(chunks) }() + + req := NewGetManyRequest(hashes, wg, chunks) + batch := ReadBatch{ + h0: {req.Outstanding()}, + h1: {req.Outstanding()}, + h2: {req.Outstanding()}, + } + go func() { + for reqHash, reqs := range batch { + for _, req := range reqs { + if reqHash == h1 { + req.Satisfy(&c1) + delete(batch, h1) + } else if reqHash == h2 { + req.Satisfy(&c2) + delete(batch, h2) + } + } + } + batch.Close() + }() + + for c := range chunks { + hashes.Remove(c.Hash()) + } + assert.Len(hashes, 1) + assert.True(hashes.Has(h0)) +} diff --git a/go/datas/http_batch_store.go b/go/datas/http_batch_store.go index 29fe04012d..9901d592af 100644 --- a/go/datas/http_batch_store.go +++ b/go/datas/http_batch_store.go @@ -138,10 +138,34 @@ func (bhcs *httpBatchStore) Get(h hash.Hash) chunks.Chunk { return pending } - ch := make(chan chunks.Chunk) + ch := make(chan *chunks.Chunk) bhcs.requestWg.Add(1) bhcs.getQueue <- chunks.NewGetRequest(h, ch) - return <-ch + return *(<-ch) +} + +func (bhcs *httpBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) { + cachedChunks := make(chan *chunks.Chunk) + go func() { + bhcs.cacheMu.RLock() + defer bhcs.cacheMu.RUnlock() + defer close(cachedChunks) + bhcs.unwrittenPuts.GetMany(hashes, cachedChunks) + }() + remaining := hash.HashSet{} + for h := range hashes { + remaining.Insert(h) + } + for c := range cachedChunks { + remaining.Remove(c.Hash()) + foundChunks <- c + } + + wg := &sync.WaitGroup{} + wg.Add(len(remaining)) + bhcs.requestWg.Add(1) + bhcs.getQueue <- chunks.NewGetManyRequest(hashes, wg, foundChunks) + wg.Wait() } func (bhcs *httpBatchStore) batchGetRequests() { @@ -199,9 +223,10 @@ func (bhcs *httpBatchStore) sendReadRequests(req chunks.ReadRequest, queue <-cha count := 0 addReq := func(req chunks.ReadRequest) { - hash := req.Hash() - batch[hash] = append(batch[hash], req.Outstanding()) - hashes.Insert(hash) + for h := range req.Hashes() { + 
batch[h] = append(batch[h], req.Outstanding()) + hashes.Insert(h) + } count++ } @@ -259,7 +284,7 @@ type readBatchChunkSink struct { func (rb *readBatchChunkSink) Put(c chunks.Chunk) { rb.mu.RLock() for _, or := range (*(rb.batch))[c.Hash()] { - or.Satisfy(c) + or.Satisfy(&c) } rb.mu.RUnlock() @@ -311,7 +336,7 @@ func (bhcs *httpBatchStore) hasRefs(hashes hash.HashSet, batch chunks.ReadBatch) if scanner.Text() == "true" { for _, outstanding := range batch[h] { // This is a little gross, but OutstandingHas.Satisfy() expects a chunk. It ignores it, though, and just sends 'true' over the channel it's holding. - outstanding.Satisfy(chunks.EmptyChunk) + outstanding.Satisfy(&chunks.EmptyChunk) } } else { for _, outstanding := range batch[h] { diff --git a/go/datas/http_batch_store_test.go b/go/datas/http_batch_store_test.go index 69bec2808f..576e7eb868 100644 --- a/go/datas/http_batch_store_test.go +++ b/go/datas/http_batch_store_test.go @@ -273,6 +273,25 @@ func (suite *HTTPBatchStoreSuite) TestGet() { suite.Equal(chnx[1].Hash(), got.Hash()) } +func (suite *HTTPBatchStoreSuite) TestGetMany() { + chnx := []chunks.Chunk{ + chunks.NewChunk([]byte("abc")), + chunks.NewChunk([]byte("def")), + } + notPresent := chunks.NewChunk([]byte("ghi")).Hash() + suite.NoError(suite.cs.PutMany(chnx)) + + hashes := hash.NewHashSet(chnx[0].Hash(), chnx[1].Hash(), notPresent) + foundChunks := make(chan *chunks.Chunk) + go func() { suite.store.GetMany(hashes, foundChunks); close(foundChunks) }() + + for c := range foundChunks { + hashes.Remove(c.Hash()) + } + suite.Len(hashes, 1) + suite.True(hashes.Has(notPresent)) +} + func (suite *HTTPBatchStoreSuite) TestGetSame() { chnx := []chunks.Chunk{ chunks.NewChunk([]byte("def")), diff --git a/go/hash/hash.go b/go/hash/hash.go index 0b0398169f..b7c963cc7e 100644 --- a/go/hash/hash.go +++ b/go/hash/hash.go @@ -114,6 +114,14 @@ func (h Hash) Greater(other Hash) bool { // HashSet is a set of Hashes. 
type HashSet map[Hash]struct{} +func NewHashSet(hashes ...Hash) HashSet { + out := HashSet{} + for _, h := range hashes { + out.Insert(h) + } + return out +} + // Insert adds a Hash to the set. func (hs HashSet) Insert(hash Hash) { hs[hash] = struct{}{} diff --git a/go/nbs/benchmarks/block_store_benchmarks.go b/go/nbs/benchmarks/block_store_benchmarks.go index d7262d141c..d8c931dbe3 100644 --- a/go/nbs/benchmarks/block_store_benchmarks.go +++ b/go/nbs/benchmarks/block_store_benchmarks.go @@ -14,12 +14,7 @@ import ( "github.com/attic-labs/testify/assert" ) -type blockStore interface { - types.BatchStore - GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) -} - -type storeOpenFn func() blockStore +type storeOpenFn func() types.BatchStore func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.TestingT) bool { store := refreshStore() @@ -28,7 +23,7 @@ func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.Tes return true } -func writeToEmptyStore(store blockStore, src *dataSource, t assert.TestingT) { +func writeToEmptyStore(store types.BatchStore, src *dataSource, t assert.TestingT) { root := store.Root() assert.Equal(t, hash.Hash{}, root) diff --git a/go/nbs/benchmarks/file_block_store.go b/go/nbs/benchmarks/file_block_store.go index 700b4df242..05c6587368 100644 --- a/go/nbs/benchmarks/file_block_store.go +++ b/go/nbs/benchmarks/file_block_store.go @@ -20,7 +20,7 @@ type fileBlockStore struct { w io.WriteCloser } -func newFileBlockStore(w io.WriteCloser) blockStore { +func newFileBlockStore(w io.WriteCloser) types.BatchStore { return fileBlockStore{bufio.NewWriterSize(w, humanize.MiByte), w} } diff --git a/go/nbs/benchmarks/main.go b/go/nbs/benchmarks/main.go index 68120f3768..ac3364b35e 100644 --- a/go/nbs/benchmarks/main.go +++ b/go/nbs/benchmarks/main.go @@ -14,6 +14,7 @@ import ( "github.com/attic-labs/noms/go/d" "github.com/attic-labs/noms/go/nbs" + "github.com/attic-labs/noms/go/types" 
"github.com/attic-labs/noms/go/util/profile" "github.com/attic-labs/testify/assert" "github.com/aws/aws-sdk-go/aws" @@ -73,19 +74,19 @@ func main() { open := newNullBlockStore wrote := false var writeDB func() - var refresh func() blockStore + var refresh func() types.BatchStore if *toNBS != "" || *toFile != "" || *toAWS != "" { var reset func() if *toNBS != "" { dir := makeTempDir(*toNBS, pb) defer os.RemoveAll(dir) - open = func() blockStore { return nbs.NewLocalStore(dir, bufSize) } + open = func() types.BatchStore { return nbs.NewLocalStore(dir, bufSize) } reset = func() { os.RemoveAll(dir); os.MkdirAll(dir, 0777) } } else if *toFile != "" { dir := makeTempDir(*toFile, pb) defer os.RemoveAll(dir) - open = func() blockStore { + open = func() types.BatchStore { f, err := ioutil.TempFile(dir, "") d.Chk.NoError(err) return newFileBlockStore(f) @@ -94,7 +95,7 @@ func main() { } else if *toAWS != "" { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))) - open = func() blockStore { + open = func() types.BatchStore { return nbs.NewAWSStore(dynamoTable, *toAWS, s3Bucket, sess, bufSize) } reset = func() { @@ -110,21 +111,21 @@ func main() { } writeDB = func() { wrote = ensureNovelWrite(wrote, open, src, pb) } - refresh = func() blockStore { + refresh = func() types.BatchStore { reset() return open() } } else { if *useNBS != "" { - open = func() blockStore { return nbs.NewLocalStore(*useNBS, bufSize) } + open = func() types.BatchStore { return nbs.NewLocalStore(*useNBS, bufSize) } } else if *useAWS != "" { sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2"))) - open = func() blockStore { + open = func() types.BatchStore { return nbs.NewAWSStore(dynamoTable, *useAWS, s3Bucket, sess, bufSize) } } writeDB = func() {} - refresh = func() blockStore { panic("WriteNovel unsupported with --useLDB and --useNBS") } + refresh = func() types.BatchStore { panic("WriteNovel unsupported with --useLDB and --useNBS") } } benchmarks 
:= []struct { diff --git a/go/nbs/benchmarks/null_block_store.go b/go/nbs/benchmarks/null_block_store.go index 6b838e7dc4..f711565a0f 100644 --- a/go/nbs/benchmarks/null_block_store.go +++ b/go/nbs/benchmarks/null_block_store.go @@ -14,7 +14,7 @@ type nullBlockStore struct { bogus int32 } -func newNullBlockStore() blockStore { +func newNullBlockStore() types.BatchStore { return nullBlockStore{} } diff --git a/go/nbs/cache.go b/go/nbs/cache.go index b50d75a3f1..f280c4270e 100644 --- a/go/nbs/cache.go +++ b/go/nbs/cache.go @@ -47,6 +47,13 @@ func (nbc *NomsBlockCache) Get(hash hash.Hash) chunks.Chunk { return nbc.chunks.Get(hash) } +// GetMany gets the Chunks with |hashes| from the store. On return, +// |foundChunks| will have been fully sent all chunks which have been +// found. Any non-present chunks will silently be ignored. +func (nbc *NomsBlockCache) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) { + nbc.chunks.GetMany(hashes, foundChunks) +} + // ExtractChunks writes the entire contents of the cache to chunkChan. The // chunks are extracted in insertion order. func (nbc *NomsBlockCache) ExtractChunks(order EnumerationOrder, chunkChan chan *chunks.Chunk) { diff --git a/go/types/batch_store.go b/go/types/batch_store.go index c18968b45f..7f447021c2 100644 --- a/go/types/batch_store.go +++ b/go/types/batch_store.go @@ -20,6 +20,11 @@ type BatchStore interface { // from the store, chunks.EmptyChunk is returned. Get(h hash.Hash) chunks.Chunk + // GetMany gets the Chunks with |hashes| from the store. On return, + // |foundChunks| will have been fully sent all chunks which have been + // found. Any non-present chunks will silently be ignored. + GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) + // SchedulePut enqueues a write for the Chunk c with the given refHeight. // c must be visible to subsequent Get() calls upon return. 
Typically, the // Value which was encoded to provide c can also be queried for its @@ -63,6 +68,12 @@ func (bsa *BatchStoreAdaptor) Get(h hash.Hash) chunks.Chunk { return bsa.cs.Get(h) } +// GetMany simply proxies to the backing ChunkStore +func (bsa *BatchStoreAdaptor) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) { + bsa.once.Do(bsa.expectVersion) + bsa.cs.GetMany(hashes, foundChunks) +} + // SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints. func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) { bsa.once.Do(bsa.expectVersion) diff --git a/go/types/value_store.go b/go/types/value_store.go index 81a1759558..a847042384 100644 --- a/go/types/value_store.go +++ b/go/types/value_store.go @@ -18,6 +18,7 @@ import ( // package that implements Value reading. type ValueReader interface { ReadValue(h hash.Hash) Value + ReadManyValues(hashes hash.HashSet, foundValues chan<- Value) } // ValueWriter is an interface that knows how to write Noms Values, e.g. @@ -149,6 +150,58 @@ func (lvs *ValueStore) setHintsForReadValue(v Value, h hash.Hash) { } } +// ReadManyValues reads and decodes Values indicated by |hashes| from lvs. On +// return, |foundValues| will have been fully sent all Values which have been +// found. Any non-present Values will silently be ignored. +func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Value) { + decode := func(h hash.Hash, chunk *chunks.Chunk) Value { + v := DecodeValue(*chunk, lvs) + lvs.valueCache.Add(h, uint64(len(chunk.Data())), v) + lvs.setHintsForReadValue(v, h) + return v + } + + // First, see which hashes can be found in either the Value cache or pendingPuts. Put the rest into a new HashSet to be requested en masse from the BatchStore. 
+ remaining := hash.HashSet{} + for h := range hashes { + if v, ok := lvs.valueCache.Get(h); ok { + if v != nil { + foundValues <- v.(Value) + } + continue + } + + chunk := func() chunks.Chunk { + lvs.pendingMu.RLock() + defer lvs.pendingMu.RUnlock() + if pc, ok := lvs.pendingPuts[h]; ok { + return pc.c + } + return chunks.EmptyChunk + }() + if !chunk.IsEmpty() { + foundValues <- decode(h, &chunk) + continue + } + + remaining.Insert(h) + } + + // Request remaining hashes from BatchStore, processing the found chunks as they come in. + foundChunks := make(chan *chunks.Chunk, 16) + go func() { lvs.bs.GetMany(remaining, foundChunks); close(foundChunks) }() + for c := range foundChunks { + h := c.Hash() + foundValues <- decode(h, c) + remaining.Remove(h) + } + + // Any remaining hashes that weren't found in the BatchStore should be recorded as not present. + for h := range remaining { + lvs.valueCache.Add(h, 0, nil) + } +} + // WriteValue takes a Value, schedules it to be written it to lvs, and returns // an appropriately-typed types.Ref. v is not guaranteed to be actually // written until after Flush(). 
diff --git a/go/types/value_store_test.go b/go/types/value_store_test.go index 6b90ec6283..1a72248e0d 100644 --- a/go/types/value_store_test.go +++ b/go/types/value_store_test.go @@ -19,10 +19,47 @@ func TestValueReadWriteRead(t *testing.T) { vs := NewTestValueStore() assert.Nil(vs.ReadValue(s.Hash())) // nil r := vs.WriteValue(s) + vs.Flush() v := vs.ReadValue(r.TargetHash()) // non-nil assert.True(s.Equals(v)) } +func TestValueReadMany(t *testing.T) { + assert := assert.New(t) + + vals := ValueSlice{String("hello"), Bool(true), Number(42)} + vs := NewTestValueStore() + hashes := hash.HashSet{} + for _, v := range vals { + hashes.Insert(vs.WriteValue(v).TargetHash()) + } + vs.Flush() + + // Get one Value into vs's Value cache + vs.ReadValue(vals[0].Hash()) + + // Get one Value into vs's pendingPuts + three := Number(3) + vals = append(vals, three) + vs.WriteValue(three) + hashes.Insert(three.Hash()) + + // Add one Value to request that's not in vs + hashes.Insert(Bool(false).Hash()) + + found := map[hash.Hash]Value{} + foundValues := make(chan Value, len(vals)) + go func() { vs.ReadManyValues(hashes, foundValues); close(foundValues) }() + for v := range foundValues { + found[v.Hash()] = v + } + + assert.Len(found, len(vals)) + for _, v := range vals { + assert.True(v.Equals(found[v.Hash()])) + } +} + func TestCheckChunksInCache(t *testing.T) { assert := assert.New(t) cs := chunks.NewTestStore()