Add ValueStore.ReadManyValues() (#3036)

The more code that can use GetMany(), the better performance gets on top of
NBS. To this end, add a method to ValueStore that allows code to read
many values concurrently. This can be used, e.g., by read-ahead code
that's navigating prolly trees, to increase performance.

Fixes #3019
This commit is contained in:
cmasone-attic
2017-01-08 14:37:37 -08:00
committed by Rafael Weinstein
parent a4ffa5ba9b
commit e7a96c3748
15 changed files with 309 additions and 70 deletions

View File

@@ -19,8 +19,9 @@ type ChunkStore interface {
RootTracker
}
// Factory allows the creation of namespaced ChunkStore instances. The details of how namespaces
// are separated is left up to the particular implementation of Factory and ChunkStore.
// Factory allows the creation of namespaced ChunkStore instances. The details
// of how namespaces are separated are left up to the particular implementation
// of Factory and ChunkStore.
type Factory interface {
CreateStore(ns string) ChunkStore
@@ -28,10 +29,11 @@ type Factory interface {
Shutter()
}
// RootTracker allows querying and management of the root of an entire tree of references. The
// "root" is the single mutable variable in a ChunkStore. It can store any hash, but it is
// typically used by higher layers (such as Database) to store a hash to a value that represents
// the current state and entire history of a database.
// RootTracker allows querying and management of the root of an entire tree of
// references. The "root" is the single mutable variable in a ChunkStore. It
// can store any hash, but it is typically used by higher layers (such as
// Database) to store a hash to a value that represents the current state and
// entire history of a database.
type RootTracker interface {
Root() hash.Hash
UpdateRoot(current, last hash.Hash) bool
@@ -39,12 +41,13 @@ type RootTracker interface {
// ChunkSource is a place to get chunks from.
type ChunkSource interface {
// Get the Chunk for the value of the hash in the store. If the hash is absent from the store nil
// is returned.
// Get the Chunk for the value of the hash in the store. If the hash is
// absent from the store nil is returned.
Get(h hash.Hash) Chunk
// On return, |foundChunks| will have been fully sent all chunks which have been found. Any
// non-present chunks will silently be ignored.
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
GetMany(hashes hash.HashSet, foundChunks chan *Chunk)
// Returns true iff the value at the address |h| is contained in the source
@@ -59,8 +62,9 @@ type ChunkSink interface {
// Put writes c into the ChunkSink, blocking until the operation is complete.
Put(c Chunk)
// PutMany tries to write chunks into the sink. It will block as it handles as many as possible,
// then return a BackpressureError containing the rest (if any).
// PutMany tries to write chunks into the sink. It will block as it
// handles as many as possible, then return a BackpressureError containing
// the rest (if any).
PutMany(chunks []Chunk) BackpressureError
// On return, any previously Put chunks should be durable
@@ -69,8 +73,8 @@ type ChunkSink interface {
io.Closer
}
// BackpressureError is a slice of hash.Hash that indicates some chunks could not be Put(). Caller
// is free to try to Put them again later.
// BackpressureError is a slice of hash.Hash that indicates some chunks could
// not be Put(). Caller is free to try to Put them again later.
type BackpressureError hash.HashSlice
func (b BackpressureError) Error() string {

View File

@@ -113,10 +113,10 @@ func (s *DynamoStore) Get(h hash.Hash) Chunk {
return pending
}
ch := make(chan Chunk)
ch := make(chan *Chunk)
s.requestWg.Add(1)
s.readQueue <- GetRequest{h, ch}
return <-ch
s.readQueue <- NewGetRequest(h, ch)
return *(<-ch)
}
func (s *DynamoStore) GetMany(hashes hash.HashSet, foundChunks chan *Chunk) {
@@ -137,7 +137,7 @@ func (s *DynamoStore) Has(h hash.Hash) bool {
ch := make(chan bool)
s.requestWg.Add(1)
s.readQueue <- HasRequest{h, ch}
s.readQueue <- NewHasRequest(h, ch)
return <-ch
}
@@ -197,9 +197,10 @@ func (s *DynamoStore) sendGetRequests(req ReadRequest) {
refs := map[hash.Hash]bool{}
addReq := func(req ReadRequest) {
r := req.Hash()
batch[r] = append(batch[r], req.Outstanding())
refs[r] = true
for h := range req.Hashes() {
batch[h] = append(batch[h], req.Outstanding())
refs[h] = true
}
s.requestWg.Done()
}
addReq(req)
@@ -260,7 +261,7 @@ func (s *DynamoStore) processResponses(responses []map[string]*dynamodb.Attribut
}
c := NewChunkWithHash(r, b)
for _, reqChan := range batch[r] {
reqChan.Satisfy(c)
reqChan.Satisfy(&c)
}
delete(batch, r)
}

View File

@@ -4,41 +4,63 @@
package chunks
import "github.com/attic-labs/noms/go/hash"
import (
"sync"
"github.com/attic-labs/noms/go/hash"
)
type ReadRequest interface {
Hash() hash.Hash
Hashes() hash.HashSet
Outstanding() OutstandingRequest
}
func NewGetRequest(r hash.Hash, ch chan Chunk) GetRequest {
return GetRequest{r, ch}
func NewGetRequest(r hash.Hash, ch chan<- *Chunk) GetRequest {
return GetRequest{hash.HashSet{r: struct{}{}}, ch}
}
type GetRequest struct {
r hash.Hash
ch chan Chunk
hashes hash.HashSet
ch chan<- *Chunk
}
func NewHasRequest(r hash.Hash, ch chan bool) HasRequest {
return HasRequest{r, ch}
// NewGetManyRequest creates a GetManyRequest for |hashes|. Each found Chunk
// is sent on |ch|; |wg| is decremented once per requested hash as each is
// satisfied or failed, so the caller should wg.Add(len(hashes)) before
// queueing the request and wg.Wait() to learn when it has been fully served.
func NewGetManyRequest(hashes hash.HashSet, wg *sync.WaitGroup, ch chan<- *Chunk) GetManyRequest {
	return GetManyRequest{hashes, wg, ch}
}

// GetManyRequest is a ReadRequest for a batch of chunks that share one
// result channel and one WaitGroup for completion accounting.
type GetManyRequest struct {
	hashes hash.HashSet
	wg     *sync.WaitGroup
	ch     chan<- *Chunk
}
func NewHasRequest(r hash.Hash, ch chan<- bool) HasRequest {
return HasRequest{hash.HashSet{r: struct{}{}}, ch}
}
type HasRequest struct {
r hash.Hash
ch chan bool
hashes hash.HashSet
ch chan<- bool
}
func (g GetRequest) Hash() hash.Hash {
return g.r
func (g GetRequest) Hashes() hash.HashSet {
return g.hashes
}
func (g GetRequest) Outstanding() OutstandingRequest {
return OutstandingGet(g.ch)
}
func (h HasRequest) Hash() hash.Hash {
return h.r
// Hashes returns the set of hashes this request is asking for.
func (g GetManyRequest) Hashes() hash.HashSet {
	return g.hashes
}

// Outstanding wraps this request's WaitGroup and result channel so that a
// batcher can satisfy or fail each requested hash individually.
func (g GetManyRequest) Outstanding() OutstandingRequest {
	return OutstandingGetMany{g.wg, g.ch}
}
func (h HasRequest) Hashes() hash.HashSet {
return h.hashes
}
func (h HasRequest) Outstanding() OutstandingRequest {
@@ -46,24 +68,37 @@ func (h HasRequest) Outstanding() OutstandingRequest {
}
type OutstandingRequest interface {
Satisfy(c Chunk)
Satisfy(c *Chunk)
Fail()
}
type OutstandingGet chan Chunk
type OutstandingHas chan bool
type OutstandingGet chan<- *Chunk
type OutstandingGetMany struct {
wg *sync.WaitGroup
ch chan<- *Chunk
}
type OutstandingHas chan<- bool
func (r OutstandingGet) Satisfy(c Chunk) {
func (r OutstandingGet) Satisfy(c *Chunk) {
r <- c
close(r)
}
func (r OutstandingGet) Fail() {
r <- EmptyChunk
r <- &EmptyChunk
close(r)
}
func (h OutstandingHas) Satisfy(c Chunk) {
// Satisfy sends the found chunk |c| to the requester and marks one
// requested hash as complete. Note that, unlike OutstandingGet, the shared
// channel is NOT closed here; the requester closes it once the WaitGroup
// drains.
func (ogm OutstandingGetMany) Satisfy(c *Chunk) {
	ogm.ch <- c
	ogm.wg.Done()
}

// Fail marks one requested hash as complete without sending anything;
// absent chunks are silently ignored.
func (ogm OutstandingGetMany) Fail() {
	ogm.wg.Done()
}
func (h OutstandingHas) Satisfy(c *Chunk) {
h <- true
close(h)
}

View File

@@ -5,6 +5,7 @@
package chunks
import (
"sync"
"testing"
"github.com/attic-labs/noms/go/hash"
@@ -28,10 +29,10 @@ func TestGetRequestBatch(t *testing.T) {
}
req0chan := make(chan bool, 1)
req1chan := make(chan Chunk, 1)
req1chan := make(chan *Chunk, 1)
req2chan := make(chan bool, 1)
req3chan := make(chan bool, 1)
req4chan := make(chan Chunk, 1)
req4chan := make(chan *Chunk, 1)
batch := ReadBatch{
r0: []OutstandingRequest{OutstandingHas(req0chan), OutstandingGet(req1chan)},
@@ -42,10 +43,10 @@ func TestGetRequestBatch(t *testing.T) {
for requestedRef, reqs := range batch {
for _, req := range reqs {
if requestedRef == r1 {
req.Satisfy(c1)
req.Satisfy(&c1)
delete(batch, r1)
} else if requestedRef == r2 {
req.Satisfy(c2)
req.Satisfy(&c2)
delete(batch, r2)
}
}
@@ -78,3 +79,45 @@ func TestGetRequestBatch(t *testing.T) {
assert.Equal(0, r0True)
assert.Equal(1, r0False)
}
// TestGetManyRequestBatch verifies that a ReadBatch built from a single
// GetManyRequest delivers found chunks on the shared channel, and that
// Close() releases the entries that were never satisfied, leaving only the
// absent hash unaccounted for at the end.
func TestGetManyRequestBatch(t *testing.T) {
	assert := assert.New(t)
	// h0 is never stored anywhere, so it should remain unsatisfied.
	h0 := hash.Parse("00000000000000000000000000000000")
	c1 := NewChunk([]byte("abc"))
	h1 := c1.Hash()
	c2 := NewChunk([]byte("123"))
	h2 := c2.Hash()

	chunks := make(chan *Chunk)
	hashes := hash.NewHashSet(h0, h1, h2)
	// One WaitGroup tick per requested hash; |chunks| is closed only after
	// every hash has been either satisfied or failed.
	wg := &sync.WaitGroup{}
	wg.Add(len(hashes))
	go func() { wg.Wait(); close(chunks) }()

	req := NewGetManyRequest(hashes, wg, chunks)
	batch := ReadBatch{
		h0: {req.Outstanding()},
		h1: {req.Outstanding()},
		h2: {req.Outstanding()},
	}
	go func() {
		for reqHash, reqs := range batch {
			for _, req := range reqs {
				if reqHash == h1 {
					req.Satisfy(&c1)
					delete(batch, h1)
				} else if reqHash == h2 {
					req.Satisfy(&c2)
					delete(batch, h2)
				}
			}
		}
		// Close() is expected to Fail() the remaining h0 entry, releasing
		// the last WaitGroup tick so the range below can terminate.
		batch.Close()
	}()

	for c := range chunks {
		hashes.Remove(c.Hash())
	}
	assert.Len(hashes, 1)
	assert.True(hashes.Has(h0))
}

View File

@@ -138,10 +138,34 @@ func (bhcs *httpBatchStore) Get(h hash.Hash) chunks.Chunk {
return pending
}
ch := make(chan chunks.Chunk)
ch := make(chan *chunks.Chunk)
bhcs.requestWg.Add(1)
bhcs.getQueue <- chunks.NewGetRequest(h, ch)
return <-ch
return *(<-ch)
}
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
func (bhcs *httpBatchStore) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
	// Stream out whatever the unwritten-puts cache can satisfy first.
	cachedChunks := make(chan *chunks.Chunk)
	go func() {
		bhcs.cacheMu.RLock()
		defer bhcs.cacheMu.RUnlock()
		defer close(cachedChunks)
		bhcs.unwrittenPuts.GetMany(hashes, cachedChunks)
	}()

	// Everything the cache couldn't satisfy must be requested remotely.
	remaining := hash.HashSet{}
	for h := range hashes {
		remaining.Insert(h)
	}
	for c := range cachedChunks {
		remaining.Remove(c.Hash())
		foundChunks <- c
	}
	if len(remaining) == 0 {
		return
	}

	// |wg| is sized to |remaining|, so the request must cover exactly those
	// hashes. Requesting |hashes| here would create a batch entry for every
	// hash — including cache hits — and each entry eventually gets
	// Satisfy() or Fail(), i.e. a wg.Done(), driving the counter negative.
	wg := &sync.WaitGroup{}
	wg.Add(len(remaining))
	bhcs.requestWg.Add(1)
	bhcs.getQueue <- chunks.NewGetManyRequest(remaining, wg, foundChunks)
	wg.Wait()
}
func (bhcs *httpBatchStore) batchGetRequests() {
@@ -199,9 +223,10 @@ func (bhcs *httpBatchStore) sendReadRequests(req chunks.ReadRequest, queue <-cha
count := 0
addReq := func(req chunks.ReadRequest) {
hash := req.Hash()
batch[hash] = append(batch[hash], req.Outstanding())
hashes.Insert(hash)
for h := range req.Hashes() {
batch[h] = append(batch[h], req.Outstanding())
hashes.Insert(h)
}
count++
}
@@ -259,7 +284,7 @@ type readBatchChunkSink struct {
func (rb *readBatchChunkSink) Put(c chunks.Chunk) {
rb.mu.RLock()
for _, or := range (*(rb.batch))[c.Hash()] {
or.Satisfy(c)
or.Satisfy(&c)
}
rb.mu.RUnlock()
@@ -311,7 +336,7 @@ func (bhcs *httpBatchStore) hasRefs(hashes hash.HashSet, batch chunks.ReadBatch)
if scanner.Text() == "true" {
for _, outstanding := range batch[h] {
// This is a little gross, but OutstandingHas.Satisfy() expects a chunk. It ignores it, though, and just sends 'true' over the channel it's holding.
outstanding.Satisfy(chunks.EmptyChunk)
outstanding.Satisfy(&chunks.EmptyChunk)
}
} else {
for _, outstanding := range batch[h] {

View File

@@ -273,6 +273,25 @@ func (suite *HTTPBatchStoreSuite) TestGet() {
suite.Equal(chnx[1].Hash(), got.Hash())
}
// TestGetMany verifies that GetMany() sends back every chunk that is
// present in the store and silently skips the one that is not.
func (suite *HTTPBatchStoreSuite) TestGetMany() {
	stored := []chunks.Chunk{
		chunks.NewChunk([]byte("abc")),
		chunks.NewChunk([]byte("def")),
	}
	suite.NoError(suite.cs.PutMany(stored))

	absent := chunks.NewChunk([]byte("ghi")).Hash()
	toRead := hash.NewHashSet(stored[0].Hash(), stored[1].Hash(), absent)

	found := make(chan *chunks.Chunk)
	go func() {
		suite.store.GetMany(toRead, found)
		close(found)
	}()
	for c := range found {
		toRead.Remove(c.Hash())
	}

	// Only the never-stored hash should be left over.
	suite.Len(toRead, 1)
	suite.True(toRead.Has(absent))
}
func (suite *HTTPBatchStoreSuite) TestGetSame() {
chnx := []chunks.Chunk{
chunks.NewChunk([]byte("def")),

View File

@@ -114,6 +114,14 @@ func (h Hash) Greater(other Hash) bool {
// HashSet is a set of Hashes.
type HashSet map[Hash]struct{}
// NewHashSet returns a HashSet containing each Hash in |hashes|.
func NewHashSet(hashes ...Hash) HashSet {
	hs := make(HashSet, len(hashes))
	for _, h := range hashes {
		hs[h] = struct{}{}
	}
	return hs
}
// Insert adds a Hash to the set.
func (hs HashSet) Insert(hash Hash) {
hs[hash] = struct{}{}

View File

@@ -14,12 +14,7 @@ import (
"github.com/attic-labs/testify/assert"
)
type blockStore interface {
types.BatchStore
GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk)
}
type storeOpenFn func() blockStore
type storeOpenFn func() types.BatchStore
func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.TestingT) bool {
store := refreshStore()
@@ -28,7 +23,7 @@ func benchmarkNovelWrite(refreshStore storeOpenFn, src *dataSource, t assert.Tes
return true
}
func writeToEmptyStore(store blockStore, src *dataSource, t assert.TestingT) {
func writeToEmptyStore(store types.BatchStore, src *dataSource, t assert.TestingT) {
root := store.Root()
assert.Equal(t, hash.Hash{}, root)

View File

@@ -20,7 +20,7 @@ type fileBlockStore struct {
w io.WriteCloser
}
func newFileBlockStore(w io.WriteCloser) blockStore {
func newFileBlockStore(w io.WriteCloser) types.BatchStore {
return fileBlockStore{bufio.NewWriterSize(w, humanize.MiByte), w}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/nbs"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/profile"
"github.com/attic-labs/testify/assert"
"github.com/aws/aws-sdk-go/aws"
@@ -73,19 +74,19 @@ func main() {
open := newNullBlockStore
wrote := false
var writeDB func()
var refresh func() blockStore
var refresh func() types.BatchStore
if *toNBS != "" || *toFile != "" || *toAWS != "" {
var reset func()
if *toNBS != "" {
dir := makeTempDir(*toNBS, pb)
defer os.RemoveAll(dir)
open = func() blockStore { return nbs.NewLocalStore(dir, bufSize) }
open = func() types.BatchStore { return nbs.NewLocalStore(dir, bufSize) }
reset = func() { os.RemoveAll(dir); os.MkdirAll(dir, 0777) }
} else if *toFile != "" {
dir := makeTempDir(*toFile, pb)
defer os.RemoveAll(dir)
open = func() blockStore {
open = func() types.BatchStore {
f, err := ioutil.TempFile(dir, "")
d.Chk.NoError(err)
return newFileBlockStore(f)
@@ -94,7 +95,7 @@ func main() {
} else if *toAWS != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
open = func() blockStore {
open = func() types.BatchStore {
return nbs.NewAWSStore(dynamoTable, *toAWS, s3Bucket, sess, bufSize)
}
reset = func() {
@@ -110,21 +111,21 @@ func main() {
}
writeDB = func() { wrote = ensureNovelWrite(wrote, open, src, pb) }
refresh = func() blockStore {
refresh = func() types.BatchStore {
reset()
return open()
}
} else {
if *useNBS != "" {
open = func() blockStore { return nbs.NewLocalStore(*useNBS, bufSize) }
open = func() types.BatchStore { return nbs.NewLocalStore(*useNBS, bufSize) }
} else if *useAWS != "" {
sess := session.Must(session.NewSession(aws.NewConfig().WithRegion("us-west-2")))
open = func() blockStore {
open = func() types.BatchStore {
return nbs.NewAWSStore(dynamoTable, *useAWS, s3Bucket, sess, bufSize)
}
}
writeDB = func() {}
refresh = func() blockStore { panic("WriteNovel unsupported with --useLDB and --useNBS") }
refresh = func() types.BatchStore { panic("WriteNovel unsupported with --useLDB and --useNBS") }
}
benchmarks := []struct {

View File

@@ -14,7 +14,7 @@ type nullBlockStore struct {
bogus int32
}
func newNullBlockStore() blockStore {
func newNullBlockStore() types.BatchStore {
return nullBlockStore{}
}

View File

@@ -47,6 +47,13 @@ func (nbc *NomsBlockCache) Get(hash hash.Hash) chunks.Chunk {
return nbc.chunks.Get(hash)
}
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
func (nbc *NomsBlockCache) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
	// Simply proxies to the backing chunk store.
	nbc.chunks.GetMany(hashes, foundChunks)
}
// ExtractChunks writes the entire contents of the cache to chunkChan. The
// chunks are extracted in insertion order.
func (nbc *NomsBlockCache) ExtractChunks(order EnumerationOrder, chunkChan chan *chunks.Chunk) {

View File

@@ -20,6 +20,11 @@ type BatchStore interface {
// from the store, chunks.EmptyChunk is returned.
Get(h hash.Hash) chunks.Chunk
// GetMany gets the Chunks with |hashes| from the store. On return,
// |foundChunks| will have been fully sent all chunks which have been
// found. Any non-present chunks will silently be ignored.
GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk)
// SchedulePut enqueues a write for the Chunk c with the given refHeight.
// c must be visible to subsequent Get() calls upon return. Typically, the
// Value which was encoded to provide c can also be queried for its
@@ -63,6 +68,12 @@ func (bsa *BatchStoreAdaptor) Get(h hash.Hash) chunks.Chunk {
return bsa.cs.Get(h)
}
// GetMany simply proxies to the backing ChunkStore.
func (bsa *BatchStoreAdaptor) GetMany(hashes hash.HashSet, foundChunks chan *chunks.Chunk) {
	// Lazily check the store's version exactly once, before first use.
	bsa.once.Do(bsa.expectVersion)
	bsa.cs.GetMany(hashes, foundChunks)
}
// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints.
func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
bsa.once.Do(bsa.expectVersion)

View File

@@ -18,6 +18,7 @@ import (
// package that implements Value reading.
type ValueReader interface {
ReadValue(h hash.Hash) Value
ReadManyValues(hashes hash.HashSet, foundValues chan<- Value)
}
// ValueWriter is an interface that knows how to write Noms Values, e.g.
@@ -149,6 +150,58 @@ func (lvs *ValueStore) setHintsForReadValue(v Value, h hash.Hash) {
}
}
// ReadManyValues reads and decodes Values indicated by |hashes| from lvs. On
// return, |foundValues| will have been fully sent all Values which have been
// found. Any non-present Values will silently be ignored.
func (lvs *ValueStore) ReadManyValues(hashes hash.HashSet, foundValues chan<- Value) {
	// decode turns a found chunk into a Value, priming the Value cache and
	// write hints along the way.
	decode := func(h hash.Hash, chunk *chunks.Chunk) Value {
		v := DecodeValue(*chunk, lvs)
		lvs.valueCache.Add(h, uint64(len(chunk.Data())), v)
		lvs.setHintsForReadValue(v, h)
		return v
	}

	// First, see which hashes can be found in either the Value cache or
	// pendingPuts. Put the rest into a new HashSet to be requested en masse
	// from the BatchStore.
	remaining := hash.HashSet{}
	for h := range hashes {
		if v, ok := lvs.valueCache.Get(h); ok {
			// A cached nil means the hash was previously determined to be
			// absent; skip it silently.
			if v != nil {
				foundValues <- v.(Value)
			}
			continue
		}

		chunk := func() chunks.Chunk {
			lvs.pendingMu.RLock()
			defer lvs.pendingMu.RUnlock()
			if pc, ok := lvs.pendingPuts[h]; ok {
				return pc.c
			}
			return chunks.EmptyChunk
		}()
		if !chunk.IsEmpty() {
			foundValues <- decode(h, &chunk)
			continue
		}

		remaining.Insert(h)
	}

	// Request the remaining hashes from the BatchStore, processing the found
	// chunks as they come in. The goroutine below iterates |remaining|
	// concurrently with this loop, so |remaining| must not be mutated here;
	// accumulate the hashes that come back in a separate set instead.
	// (Removing from |remaining| in this loop would be a data race on the
	// underlying map.)
	foundChunks := make(chan *chunks.Chunk, 16)
	go func() { lvs.bs.GetMany(remaining, foundChunks); close(foundChunks) }()

	found := hash.HashSet{}
	for c := range foundChunks {
		h := c.Hash()
		found.Insert(h)
		foundValues <- decode(h, c)
	}

	// Any hashes that weren't found in the BatchStore are recorded as not
	// present, so subsequent reads can skip the store entirely.
	for h := range remaining {
		if !found.Has(h) {
			lvs.valueCache.Add(h, 0, nil)
		}
	}
}
// WriteValue takes a Value, schedules it to be written it to lvs, and returns
// an appropriately-typed types.Ref. v is not guaranteed to be actually
// written until after Flush().

View File

@@ -19,10 +19,47 @@ func TestValueReadWriteRead(t *testing.T) {
vs := NewTestValueStore()
assert.Nil(vs.ReadValue(s.Hash())) // nil
r := vs.WriteValue(s)
vs.Flush()
v := vs.ReadValue(r.TargetHash()) // non-nil
assert.True(s.Equals(v))
}
// TestValueReadMany exercises ReadManyValues() across all the places a
// Value can live: the BatchStore, the Value cache, pendingPuts, and
// nowhere at all.
func TestValueReadMany(t *testing.T) {
	assert := assert.New(t)
	written := ValueSlice{String("hello"), Bool(true), Number(42)}
	vs := NewTestValueStore()
	toRead := hash.HashSet{}
	for _, v := range written {
		toRead.Insert(vs.WriteValue(v).TargetHash())
	}
	vs.Flush()

	// Pull one Value into vs's Value cache.
	vs.ReadValue(written[0].Hash())

	// Leave one Value sitting in vs's pendingPuts.
	pending := Number(3)
	written = append(written, pending)
	vs.WriteValue(pending)
	toRead.Insert(pending.Hash())

	// Ask for one Value that vs has never seen.
	toRead.Insert(Bool(false).Hash())

	got := map[hash.Hash]Value{}
	foundValues := make(chan Value, len(written))
	go func() { vs.ReadManyValues(toRead, foundValues); close(foundValues) }()
	for v := range foundValues {
		got[v.Hash()] = v
	}

	// Every written Value comes back; the unknown hash is silently skipped.
	assert.Len(got, len(written))
	for _, v := range written {
		assert.True(v.Equals(got[v.Hash()]))
	}
}
func TestCheckChunksInCache(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()