Add concurrency to use of ValidatingBatchingSink (#2684)

There are two places where ValidatingBatchingSink could be more
concurrent: Prepare(), where it's reading in hints, and Enqueue().

Making Prepare() handle many hints concurrently is easy because the
hints don't depend on one another, so that method now just spins up
a number of goroutines and runs them all at once.

Enqueue() is more complex, because while Chunk decoding and validation
of its hash can proceed concurrently, validating that a given Chunk is
'ref-complete' requires that the chunks in the writeValue payload all
be processed in order. So, this patch uses orderedparallel to run the
new Decode() method on chunks in parallel, but then return to serial
operation before calling the modified Enqueue() method.

Fixes #1935
This commit is contained in:
cmasone-attic
2016-10-10 15:33:35 -07:00
committed by GitHub
parent 23f94bdf8c
commit c9c1bb9ff5
8 changed files with 166 additions and 32 deletions
+1
View File
@@ -37,6 +37,7 @@ func setupServeFlags() *flag.FlagSet {
serveFlagSet.IntVar(&port, "port", 8000, "port to listen on for HTTP requests")
spec.RegisterDatabaseFlags(serveFlagSet)
verbose.RegisterVerboseFlags(serveFlagSet)
profile.RegisterProfileFlags(serveFlagSet)
return serveFlagSet
}
+13 -7
View File
@@ -46,7 +46,9 @@ func Serialize(chunk Chunk, writer io.Writer) {
d.PanicIfFalse(uint32(n) == chunkSize)
}
// Deserialize reads off of |reader| until EOF, sending chunks to |cs|. If |rateLimit| is non-nil, concurrency will be limited to the available capacity of the channel.
// Deserialize reads off of |reader| until EOF, sending chunks to |cs|. If
// |rateLimit| is non-nil, concurrency will be limited to the available
// capacity of the channel.
func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
wg := sync.WaitGroup{}
@@ -72,8 +74,12 @@ func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
wg.Wait()
}
// DeserializeToChan reads off of |reader| until EOF, sending chunks to chunkChan in the order they are read.
func DeserializeToChan(reader io.Reader, chunkChan chan<- *Chunk) {
// DeserializeToChan reads off of |reader| until EOF, sending chunks to
// chunkChan in the order they are read. Objects sent over chunkChan are
// *Chunk.
// The type is `chan<- interface{}` so that this is compatible with
// orderedparallel.New().
func DeserializeToChan(reader io.Reader, chunkChan chan<- interface{}) {
for {
c, success := deserializeChunk(reader)
if !success {
@@ -98,11 +104,11 @@ func deserializeChunk(reader io.Reader) (Chunk, bool) {
err = binary.Read(reader, binary.BigEndian, &chunkSize)
d.Chk.NoError(err)
w := NewChunkWriter()
n2, err := io.CopyN(w, reader, int64(chunkSize))
data := make([]byte, int(chunkSize))
n, err = io.ReadFull(reader, data)
d.Chk.NoError(err)
d.PanicIfFalse(int64(chunkSize) == n2)
c := w.Chunk()
d.PanicIfFalse(int(chunkSize) == n)
c := NewChunk(data)
d.PanicIfFalse(h == c.Hash(), "%s != %s", h, c.Hash())
return c, true
}
+3 -2
View File
@@ -28,7 +28,7 @@ func newLocalBatchStore(cs chunks.ChunkStore) *localBatchStore {
return &localBatchStore{
cs: cs,
unwrittenPuts: newOrderedChunkCache(),
vbs: types.NewValidatingBatchingSink(cs, types.NewTypeCache()),
vbs: types.NewValidatingBatchingSink(cs),
hints: types.Hints{},
hashes: hash.HashSet{},
mu: &sync.Mutex{},
@@ -101,7 +101,8 @@ func (lbs *localBatchStore) Flush() {
var bpe chunks.BackpressureError
for c := range chunkChan {
if bpe == nil {
bpe = lbs.vbs.Enqueue(*c)
dc := lbs.vbs.DecodeUnqueued(c)
bpe = lbs.vbs.Enqueue(*dc.Chunk, *dc.Value)
} else {
bpe = append(bpe, c.Hash())
}
+14 -6
View File
@@ -18,6 +18,7 @@ import (
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/orderedparallel"
"github.com/golang/snappy"
)
@@ -88,18 +89,25 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
io.Copy(ioutil.Discard, reader)
reader.Close()
}()
tc := types.NewTypeCache()
vbs := types.NewValidatingBatchingSink(cs, tc)
vbs := types.NewValidatingBatchingSink(cs)
vbs.Prepare(deserializeHints(reader))
chunkChan := make(chan *chunks.Chunk, 16)
chunkChan := make(chan interface{}, 16)
go chunks.DeserializeToChan(reader, chunkChan)
decoded := orderedparallel.New(
chunkChan,
func(c interface{}) interface{} {
return vbs.DecodeUnqueued(c.(*chunks.Chunk))
},
16)
var bpe chunks.BackpressureError
for c := range chunkChan {
for dci := range decoded {
dc := dci.(types.DecodedChunk)
if bpe == nil {
bpe = vbs.Enqueue(*c)
bpe = vbs.Enqueue(*dc.Chunk, *dc.Value)
} else {
bpe = append(bpe, c.Hash())
bpe = append(bpe, dc.Chunk.Hash())
}
// If a previous Enqueue() errored, we still need to drain chunkChan
// TODO: what about having DeserializeToChan take a 'done' channel to stop it?
+4 -4
View File
@@ -117,10 +117,10 @@ func TestBuildWriteValueRequest(t *testing.T) {
}
assert.Equal(len(hints), count)
outChunkChan := make(chan *chunks.Chunk, 16)
outChunkChan := make(chan interface{}, 16)
go chunks.DeserializeToChan(gr, outChunkChan)
for c := range outChunkChan {
assert.Equal(chnx[0].Hash(), c.Hash())
assert.Equal(chnx[0].Hash(), c.(*chunks.Chunk).Hash())
chnx = chnx[1:]
}
assert.Empty(chnx)
@@ -182,10 +182,10 @@ func TestHandleGetRefs(t *testing.T) {
)
if assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
chunkChan := make(chan *chunks.Chunk)
chunkChan := make(chan interface{})
go chunks.DeserializeToChan(w.Body, chunkChan)
for c := range chunkChan {
assert.Equal(chnx[0].Hash(), c.Hash())
assert.Equal(chnx[0].Hash(), c.(*chunks.Chunk).Hash())
chnx = chnx[1:]
}
assert.Empty(chnx)
+51 -9
View File
@@ -5,8 +5,11 @@
package types
import (
"sync"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
)
const batchSize = 100
@@ -16,28 +19,65 @@ type ValidatingBatchingSink struct {
cs chunks.ChunkStore
batch [batchSize]chunks.Chunk
count int
tc *TypeCache
pool sync.Pool
}
func NewValidatingBatchingSink(cs chunks.ChunkStore, tc *TypeCache) *ValidatingBatchingSink {
return &ValidatingBatchingSink{vs: newLocalValueStore(cs), cs: cs, tc: tc}
func NewValidatingBatchingSink(cs chunks.ChunkStore) *ValidatingBatchingSink {
return &ValidatingBatchingSink{
vs: newLocalValueStore(cs),
cs: cs,
pool: sync.Pool{New: func() interface{} { return NewTypeCache() }},
}
}
// Prepare primes the type info cache used to validate Enqueued Chunks by reading the Chunks referenced by the provided hints.
func (vbs *ValidatingBatchingSink) Prepare(hints Hints) {
rl := make(chan struct{}, batchSize)
wg := sync.WaitGroup{}
for hint := range hints {
vbs.vs.ReadValue(hint)
wg.Add(1)
rl <- struct{}{}
go func(hint hash.Hash) {
vbs.vs.ReadValue(hint)
<-rl
wg.Done()
}(hint)
}
wg.Wait()
close(rl)
}
// Enqueue adds a Chunk to the queue of Chunks waiting to be Put into vbs' backing ChunkStore. The instance keeps an internal buffer of Chunks, spilling to the ChunkStore when the buffer is full. If an attempt to Put Chunks fails, this method returns the BackpressureError from the underlying ChunkStore.
func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk) chunks.BackpressureError {
// DecodedChunk holds a pointer to a Chunk and the Value that results from
// calling DecodeFromBytes(c.Data()).
type DecodedChunk struct {
Chunk *chunks.Chunk
Value *Value
}
// DecodeUnqueued decodes c and checks that the hash of the resulting value
// matches c.Hash(). It returns a DecodedChunk holding both c and a pointer to
// the decoded Value. However, if c has already been Enqueued, DecodeUnqueued
// returns an empty DecodedChunk.
func (vbs *ValidatingBatchingSink) DecodeUnqueued(c *chunks.Chunk) DecodedChunk {
h := c.Hash()
if vbs.vs.isPresent(h) {
return nil
return DecodedChunk{}
}
v := DecodeFromBytes(c.Data(), vbs.vs, vbs.tc)
tc := vbs.pool.Get()
defer vbs.pool.Put(tc)
v := DecodeFromBytes(c.Data(), vbs.vs, tc.(*TypeCache))
d.PanicIfTrue(getHash(v) != h, "Invalid hash found")
return DecodedChunk{c, &v}
}
// Enqueue adds c to the queue of Chunks waiting to be Put into vbs' backing
// ChunkStore. It is assumed that v is the Value decoded from c, and so v can
// be used to validate the ref-completeness of c. The instance keeps an
// internal buffer of Chunks, spilling to the ChunkStore when the buffer is
// full. If an attempt to Put Chunks fails, this method returns the
// BackpressureError from the underlying ChunkStore.
func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk, v Value) chunks.BackpressureError {
h := c.Hash()
vbs.vs.ensureChunksInCache(v)
vbs.vs.set(h, hintedChunk{v.Type(), h})
@@ -49,7 +89,9 @@ func (vbs *ValidatingBatchingSink) Enqueue(c chunks.Chunk) chunks.BackpressureEr
return nil
}
// Flush Puts any Chunks buffered by Enqueue calls into the backing ChunkStore. If the attempt to Put fails, this method returns the BackpressureError returned by the underlying ChunkStore.
// Flush Puts any Chunks buffered by Enqueue calls into the backing
// ChunkStore. If the attempt to Put fails, this method returns the
// BackpressureError returned by the underlying ChunkStore.
func (vbs *ValidatingBatchingSink) Flush() (err chunks.BackpressureError) {
err = vbs.cs.PutMany(vbs.batch[:vbs.count])
vbs.count = 0
+76
View File
@@ -0,0 +1,76 @@
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"testing"
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/testify/assert"
)
// TestValidatingBatchingSinkPrepare verifies that Prepare reads every
// hinted chunk back out of the backing store exactly once.
func TestValidatingBatchingSinkPrepare(t *testing.T) {
	store := chunks.NewTestStore()
	values := []Value{
		Number(42),
		Number(-7),
		String("oy"),
		Bool(true),
		NewBlob(),
	}
	hints := Hints{}
	for _, v := range values {
		c := EncodeValue(v, nil)
		store.Put(c)
		hints[c.Hash()] = struct{}{}
	}

	vbs := NewValidatingBatchingSink(store)
	vbs.Prepare(hints)
	// One read per hint.
	assert.Equal(t, len(values), store.Reads)
}
// TestValidatingBatchingSinkDecode checks that DecodeUnqueued round-trips a
// chunk back to the Value it was encoded from.
func TestValidatingBatchingSinkDecode(t *testing.T) {
	val := Number(42)
	chunk := EncodeValue(val, nil)
	sink := NewValidatingBatchingSink(chunks.NewTestStore())

	decoded := sink.DecodeUnqueued(&chunk)
	assert.True(t, val.Equals(*decoded.Value))
}
// TestValidatingBatchingSinkDecodeAlreadyEnqueued checks that DecodeUnqueued
// returns an empty DecodedChunk for a chunk that has already been Enqueued.
func TestValidatingBatchingSinkDecodeAlreadyEnqueued(t *testing.T) {
	val := Number(42)
	chunk := EncodeValue(val, nil)
	sink := NewValidatingBatchingSink(chunks.NewTestStore())
	assert.NoError(t, sink.Enqueue(chunk, val))

	decoded := sink.DecodeUnqueued(&chunk)
	assert.Nil(t, decoded.Chunk)
	assert.Nil(t, decoded.Value)
}
func TestValidatingBatchingSinkEnqueueAndFlush(t *testing.T) {
v := Number(42)
c := EncodeValue(v, nil)
cs := chunks.NewTestStore()
vbs := NewValidatingBatchingSink(cs)
assert.NoError(t, vbs.Enqueue(c, v))
assert.NoError(t, vbs.Flush())
assert.Equal(t, 1, cs.Writes)
}
// TestValidatingBatchingSinkEnqueueImplicitFlush checks that enqueuing one
// more chunk than batchSize triggers an automatic spill to the store, and
// that the final explicit Flush writes only the single leftover chunk.
func TestValidatingBatchingSinkEnqueueImplicitFlush(t *testing.T) {
	store := chunks.NewTestStore()
	sink := NewValidatingBatchingSink(store)

	// batchSize+1 chunks: the first batchSize spill automatically.
	for n := 0; n <= batchSize; n++ {
		num := Number(n)
		assert.NoError(t, sink.Enqueue(EncodeValue(num, nil), num))
	}
	assert.Equal(t, batchSize, store.Writes)

	// The explicit Flush accounts for exactly one additional write.
	assert.NoError(t, sink.Flush())
	assert.Equal(t, 1, store.Writes-batchSize)
}
+4 -4
View File
@@ -38,7 +38,7 @@ type ValueReadWriter interface {
type ValueStore struct {
bs BatchStore
cache map[hash.Hash]chunkCacheEntry
mu *sync.Mutex
mu sync.RWMutex
valueCache *sizecache.SizeCache
opcStore opCacheStore
once sync.Once
@@ -67,7 +67,7 @@ func NewValueStore(bs BatchStore) *ValueStore {
}
func NewValueStoreWithCache(bs BatchStore, cacheSize uint64) *ValueStore {
return &ValueStore{bs, map[hash.Hash]chunkCacheEntry{}, &sync.Mutex{}, sizecache.New(cacheSize), nil, sync.Once{}}
return &ValueStore{bs, map[hash.Hash]chunkCacheEntry{}, sync.RWMutex{}, sizecache.New(cacheSize), nil, sync.Once{}}
}
func (lvs *ValueStore) BatchStore() BatchStore {
@@ -154,8 +154,8 @@ func (lvs *ValueStore) isPresent(r hash.Hash) (present bool) {
}
func (lvs *ValueStore) check(r hash.Hash) chunkCacheEntry {
lvs.mu.Lock()
defer lvs.mu.Unlock()
lvs.mu.RLock()
defer lvs.mu.RUnlock()
return lvs.cache[r]
}