httpHintedChunkStore uses one big batch

A novel chunk may contain references to any other novel chunk, as long
as there are no cycles. This means that breaking up the stream of
novel chunks being written to the server into batches risks creating
races -- chunks in one batch might reference chunks in another,
meaning that the server would somehow need to be able to
cross-reference batches. This seems super hard, so we've just forced
the code to write in one massive batch upon Commit(). We'll evaluate
the performance of this solution and see what we need to change.

Also, there's a terrible hack in HandleWriteValue to make it so that
pulls can work by back-channeling all their chunks via postRefs/ and
then writing the final Commit object via writeValue/.
This can be fixed once we fix issue 822.
This commit is contained in:
Chris Masone
2016-04-12 09:50:53 -07:00
parent d8a2d285e9
commit 7db243745b
10 changed files with 202 additions and 191 deletions

View File

@@ -85,7 +85,7 @@ func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
wg := sync.WaitGroup{}
for {
c := deserializeWorker(reader)
c := deserializeChunk(reader)
if c.IsEmpty() {
break
}
@@ -109,7 +109,7 @@ func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
// DeserializeToChan reads off of |reader| until EOF, sending chunks to chunkChan in the order they are read.
func DeserializeToChan(reader io.Reader, chunkChan chan<- Chunk) {
for {
c := deserializeWorker(reader)
c := deserializeChunk(reader)
if c.IsEmpty() {
break
}
@@ -118,7 +118,7 @@ func DeserializeToChan(reader io.Reader, chunkChan chan<- Chunk) {
close(chunkChan)
}
func deserializeWorker(reader io.Reader) Chunk {
func deserializeChunk(reader io.Reader) Chunk {
digest := ref.Sha1Digest{}
n, err := io.ReadFull(reader, digest[:])
if err == io.EOF {

View File

@@ -25,7 +25,7 @@ const (
dynamoWriteUnitSize = 1024 // 1K
readBufferSize = 1 << 12 // 4k
writeBufferSize = 1 << 12 // 4k
writeBufferSize = dynamoMaxPutCount
dynamoTableName = "noms"
refAttr = "ref"

View File

@@ -4,6 +4,7 @@ import (
"io"
"io/ioutil"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
)
@@ -47,24 +48,17 @@ func (rts ReadThroughStore) Put(c Chunk) {
}
func (rts ReadThroughStore) PutMany(chunks []Chunk) BackpressureError {
bpe1 := rts.backingStore.PutMany(chunks)
bpe2 := rts.cachingStore.PutMany(chunks)
if bpe1 == nil {
return bpe2
} else if bpe2 == nil {
return bpe1
}
// Neither is nil
lookup := make(map[ref.Ref]bool, len(bpe1))
for _, c := range bpe1 {
bpe := rts.backingStore.PutMany(chunks)
lookup := make(map[ref.Ref]bool, len(bpe))
for _, c := range bpe {
lookup[c.Ref()] = true
}
for _, c := range bpe2 {
if !lookup[c.Ref()] {
bpe1 = append(bpe1, c)
}
toPut := make([]Chunk, 0, len(chunks)-len(bpe))
for _, c := range chunks {
toPut = append(toPut, c)
}
return bpe1
d.Chk.NoError(rts.cachingStore.PutMany(toPut))
return bpe
}
func (rts ReadThroughStore) Root() ref.Ref {

View File

@@ -28,12 +28,12 @@ var (
hostFlag = flag.String("host", "localhost:0", "Host to listen on")
)
type dataStoreRecord struct {
ds datas.DataStore
type chunkStoreRecord struct {
cs chunks.ChunkStore
alias string
}
type dataStoreRecords map[string]dataStoreRecord
type chunkStoreRecords map[string]chunkStoreRecord
func main() {
usage := func() {
@@ -59,12 +59,8 @@ func main() {
}
prefix := dsPathPrefix + "/:store"
router.GET(prefix+constants.RefPath+":ref", routeToStore(stores, datas.HandleRef))
router.OPTIONS(prefix+constants.RefPath+":ref", routeToStore(stores, datas.HandleRef))
router.POST(prefix+constants.PostRefsPath, routeToStore(stores, datas.HandlePostRefs))
router.OPTIONS(prefix+constants.PostRefsPath, routeToStore(stores, datas.HandlePostRefs))
router.POST(prefix+constants.GetHasPath, routeToStore(stores, datas.HandleGetHasRefs))
router.OPTIONS(prefix+constants.GetHasPath, routeToStore(stores, datas.HandleGetHasRefs))
router.POST(prefix+constants.GetRefsPath, routeToStore(stores, datas.HandleGetRefs))
router.OPTIONS(prefix+constants.GetRefsPath, routeToStore(stores, datas.HandleGetRefs))
router.GET(prefix+constants.RootPath, routeToStore(stores, datas.HandleRootGet))
@@ -89,9 +85,9 @@ func main() {
log.Fatal(srv.Serve(l))
}
func constructQueryString(args []string) (url.Values, dataStoreRecords) {
func constructQueryString(args []string) (url.Values, chunkStoreRecords) {
qsValues := url.Values{}
stores := dataStoreRecords{}
stores := chunkStoreRecords{}
for _, arg := range args {
k, v, ok := split2(arg, "=")
@@ -107,7 +103,7 @@ func constructQueryString(args []string) (url.Values, dataStoreRecords) {
_, path, _ := split2(v, ":")
record, ok := stores[path]
if !ok {
record.ds = datas.NewDataStore(chunks.NewLevelDBStore(path, "", 24, false))
record.cs = chunks.NewLevelDBStore(path, "", 24, false)
// Identify the stores with a (abridged) hash of the file system path,
// so that the same URL always refers to the same database.
hash := sha1.Sum([]byte(path))
@@ -123,12 +119,12 @@ func constructQueryString(args []string) (url.Values, dataStoreRecords) {
return qsValues, stores
}
func routeToStore(stores dataStoreRecords, handler datas.Handler) httprouter.Handle {
func routeToStore(stores chunkStoreRecords, handler datas.Handler) httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
store := params.ByName("store")
for _, record := range stores {
if record.alias == store {
handler(w, r, params, record.ds)
handler(w, r, params, record.cs)
return
}
}

View File

@@ -1,17 +1,27 @@
package datas
import (
"sync"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
type cachingValueStore struct {
*chunkTypeCache
hcs hintedChunkStore
hcs hintedChunkStore
cache map[ref.Ref]chunkCacheEntry
mu *sync.Mutex
}
type chunkCacheEntry interface {
Present() bool
Hint() ref.Ref
Type() types.Type
}
func newCachingValueStore(hcs hintedChunkStore) cachingValueStore {
return cachingValueStore{newChunkTypeCache(), hcs}
return cachingValueStore{hcs, map[ref.Ref]chunkCacheEntry{}, &sync.Mutex{}}
}
// WriteValue takes a Value, schedules it to be written it to cvs, and returns v.Ref(). v is not guaranteed to be actually written until after a successful Commit().
@@ -50,3 +60,99 @@ func (cvs *cachingValueStore) ReadValue(r ref.Ref) types.Value {
cvs.checkAndSet(r, entry)
return v
}
func (cvs *cachingValueStore) isPresent(r ref.Ref) (present bool) {
if entry := cvs.check(r); entry != nil && entry.Present() {
present = true
}
return
}
func (cvs *cachingValueStore) check(r ref.Ref) chunkCacheEntry {
cvs.mu.Lock()
defer cvs.mu.Unlock()
return cvs.cache[r]
}
func (cvs *cachingValueStore) set(r ref.Ref, entry chunkCacheEntry) {
cvs.mu.Lock()
defer cvs.mu.Unlock()
cvs.cache[r] = entry
}
func (cvs *cachingValueStore) checkAndSet(r ref.Ref, entry chunkCacheEntry) {
if cur := cvs.check(r); cur == nil || cur.Hint().IsEmpty() {
cvs.set(r, entry)
}
}
func (cvs *cachingValueStore) checkChunksInCache(v types.Value) map[ref.Ref]struct{} {
hints := map[ref.Ref]struct{}{}
for _, reachable := range v.Chunks() {
entry := cvs.check(reachable.TargetRef())
d.Exp.True(entry != nil && entry.Present(), "Value to write -- Type %s -- contains ref %s, which points to a non-existent Value.", v.Type().Describe(), reachable.TargetRef())
if hint := entry.Hint(); !hint.IsEmpty() {
hints[hint] = struct{}{}
}
// BUG 1121
// It's possible that entry.Type() will be simply 'Value', but that 'reachable' is actually a properly-typed object -- that is, a Ref to some specific Type. The Exp below would fail, though it's possible that the Type is actually correct. We wouldn't be able to verify without reading it, though, so we'll dig into this later.
targetType := getTargetType(reachable)
if targetType.Equals(types.MakePrimitiveType(types.ValueKind)) {
continue
}
d.Exp.True(entry.Type().Equals(targetType), "Value to write contains ref %s, which points to a value of a different type: %+v != %+v", reachable.TargetRef(), entry.Type(), targetType)
}
return hints
}
func getTargetType(refBase types.RefBase) types.Type {
refType := refBase.Type()
d.Chk.Equal(types.RefKind, refType.Kind())
return refType.Desc.(types.CompoundDesc).ElemTypes[0]
}
type presentChunk types.Type
func (p presentChunk) Present() bool {
return true
}
func (p presentChunk) Hint() (r ref.Ref) {
return
}
func (p presentChunk) Type() types.Type {
return types.Type(p)
}
type hintedChunk struct {
t types.Type
hint ref.Ref
}
func (h hintedChunk) Present() bool {
return true
}
func (h hintedChunk) Hint() (r ref.Ref) {
return h.hint
}
func (h hintedChunk) Type() types.Type {
return h.t
}
type absentChunk struct{}
func (a absentChunk) Present() bool {
return false
}
func (a absentChunk) Hint() (r ref.Ref) {
return
}
func (a absentChunk) Type() types.Type {
panic("Not reached. Should never call Type() on an absentChunk.")
}

View File

@@ -1,120 +0,0 @@
package datas
import (
"sync"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
type chunkTypeCache struct {
cache map[ref.Ref]chunkCacheEntry
mu *sync.Mutex
}
type chunkCacheEntry interface {
Present() bool
Hint() ref.Ref
Type() types.Type
}
func newChunkTypeCache() *chunkTypeCache {
return &chunkTypeCache{map[ref.Ref]chunkCacheEntry{}, &sync.Mutex{}}
}
func (c *chunkTypeCache) isPresent(r ref.Ref) (present bool) {
if entry := c.check(r); entry != nil && entry.Present() {
present = true
}
return
}
func (c *chunkTypeCache) check(r ref.Ref) chunkCacheEntry {
c.mu.Lock()
defer c.mu.Unlock()
return c.cache[r]
}
func (c *chunkTypeCache) set(r ref.Ref, entry chunkCacheEntry) {
c.mu.Lock()
defer c.mu.Unlock()
c.cache[r] = entry
}
func (c *chunkTypeCache) checkAndSet(r ref.Ref, entry chunkCacheEntry) {
if cur := c.check(r); cur == nil || cur.Hint().IsEmpty() {
c.set(r, entry)
}
}
func (c *chunkTypeCache) checkChunksInCache(v types.Value) map[ref.Ref]struct{} {
hints := map[ref.Ref]struct{}{}
for _, reachable := range v.Chunks() {
entry := c.check(reachable.TargetRef())
d.Exp.True(entry != nil && entry.Present(), "Value to write -- Type %s -- contains ref %s, which points to a non-existent Value.", v.Type().Describe(), reachable.TargetRef())
if hint := entry.Hint(); !hint.IsEmpty() {
hints[hint] = struct{}{}
}
// BUG 1121
// It's possible that entry.Type() will be simply 'Value', but that 'reachable' is actually a properly-typed object -- that is, a Ref to some specific Type. The Exp below would fail, though it's possible that the Type is actually correct. We wouldn't be able to verify without reading it, though, so we'll dig into this later.
targetType := getTargetType(reachable)
if targetType.Equals(types.MakePrimitiveType(types.ValueKind)) {
continue
}
d.Exp.True(entry.Type().Equals(targetType), "Value to write contains ref %s, which points to a value of a different type: %+v != %+v", reachable.TargetRef(), entry.Type(), targetType)
}
return hints
}
func getTargetType(refBase types.RefBase) types.Type {
refType := refBase.Type()
d.Chk.Equal(types.RefKind, refType.Kind())
return refType.Desc.(types.CompoundDesc).ElemTypes[0]
}
type presentChunk types.Type
func (p presentChunk) Present() bool {
return true
}
func (p presentChunk) Hint() (r ref.Ref) {
return
}
func (p presentChunk) Type() types.Type {
return types.Type(p)
}
type hintedChunk struct {
t types.Type
hint ref.Ref
}
func (h hintedChunk) Present() bool {
return true
}
func (h hintedChunk) Hint() (r ref.Ref) {
return h.hint
}
func (h hintedChunk) Type() types.Type {
return h.t
}
type absentChunk struct{}
func (a absentChunk) Present() bool {
return false
}
func (a absentChunk) Hint() (r ref.Ref) {
return
}
func (a absentChunk) Type() types.Type {
panic("Not reached. Should never call Type() on an absentChunk.")
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/attic-labs/noms/ref"
)
// Note that this doesn't actually implement the chunks.ChunkStore interface. This is by design, because we don't want to provide Has(), and because Put() is intended to be non-blocking and take 'hints', but I failed to come up with a different-but-still-relevant name.
type hintedChunkStore interface {
hintedChunkSink
chunks.RootTracker
@@ -16,7 +17,8 @@ type hintedChunkStore interface {
}
type hintedChunkSink interface {
// Put writes c into the ChunkSink, using the provided hints to assist in validation. c may or may not be persisted when Put() returns, but is guaranteed to be persistent after a call to Flush() or Close().
// Put writes c into the ChunkSink, using the provided hints to assist in validation. Validation requires checking that all refs embedded in c are themselves valid, which could be done by resolving each one. Instead, hints provides a (smaller) set of refs that point to chunks that themselves contain many of c's refs. Thus, by checking only the hinted chunks, c can be validated with fewer read operations.
// c may or may not be persisted when Put() returns, but is guaranteed to be persistent after a call to Flush() or Close().
Put(c chunks.Chunk, hints map[ref.Ref]struct{})
// Flush causes enqueued Puts to be persisted.

View File

@@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
@@ -20,10 +21,11 @@ import (
const (
httpChunkSinkConcurrency = 6
writeBufferSize = 64
readBufferSize = 2048
writeBufferSize = 1 << 12 // 4K
readBufferSize = 1 << 12 // 4K
)
// Note that this doesn't actually implement the chunks.ChunkStore interface. This is by design, because we don't want to provide Has(), and because Put() is intended to be non-blocking and take 'hints', but I failed to come up with a different-but-still-relevant name.
type httpHintedChunkStore struct {
host *url.URL
httpClient httpDoer
@@ -206,25 +208,23 @@ func (bhcs *httpHintedChunkStore) batchPutRequests() {
go func() {
defer bhcs.workerWg.Done()
var chunks []chunks.Chunk
numChunks := 0
hints := map[ref.Ref]struct{}{}
sendAndReset := func() {
bhcs.sendWriteRequests(chunks, hints) // Takes ownership of chunks
chunks = nil
hints = map[ref.Ref]struct{}{}
buf := makeBuffer()
gw := gzip.NewWriter(buf)
sz := chunks.NewSerializer(gw)
handleRequest := func(wr writeRequest) {
numChunks++
sz.Put(wr.c)
for hint := range wr.hints {
hints[hint] = struct{}{}
}
}
for done := false; !done; {
drainAndSend := false
select {
case wr := <-bhcs.writeQueue:
chunks = append(chunks, wr.c)
for hint := range wr.hints {
hints[hint] = struct{}{}
}
if len(chunks) == writeBufferSize {
sendAndReset()
}
handleRequest(wr)
case <-bhcs.flushChan:
drainAndSend = true
case <-bhcs.finishedChan:
@@ -236,32 +236,44 @@ func (bhcs *httpHintedChunkStore) batchPutRequests() {
for drained := false; !drained; {
select {
case wr := <-bhcs.writeQueue:
chunks = append(chunks, wr.c)
for hint := range wr.hints {
hints[hint] = struct{}{}
}
handleRequest(wr)
default:
drained = true
}
if len(chunks) == writeBufferSize || (drained && chunks != nil) {
sendAndReset()
d.Chk.NoError(sz.Close())
d.Chk.NoError(gw.Close())
_, err := buf.Seek(0, 0)
d.Chk.NoError(err, "Could not reset filesystem buffer to offset 0.")
bhcs.sendWriteRequests(buf, numChunks, hints) // Takes ownership of buf, hints
numChunks = 0
hints = map[ref.Ref]struct{}{}
buf = makeBuffer()
gw = gzip.NewWriter(buf)
sz = chunks.NewSerializer(gw)
}
}
}
}
d.Chk.Nil(chunks, "%d chunks were never sent to server", len(chunks))
}()
}
func (bhcs *httpHintedChunkStore) sendWriteRequests(chnx []chunks.Chunk, hints map[ref.Ref]struct{}) {
func makeBuffer() *os.File {
f, err := ioutil.TempFile("", "http_hinted_chunk_store_")
d.Chk.NoError(err, "Cannot create filesystem buffer for Chunks.")
return f
}
func (bhcs *httpHintedChunkStore) sendWriteRequests(serializedChunks *os.File, numChunks int, hints map[ref.Ref]struct{}) {
bhcs.rateLimit <- struct{}{}
go func() {
defer func() {
bhcs.unwrittenPuts.Clear(chnx)
bhcs.requestWg.Add(-len(chnx))
bhcs.unwrittenPuts = newUnwrittenPutCache()
bhcs.requestWg.Add(-numChunks)
d.Chk.NoError(serializedChunks.Close(), "Cannot close filesystem buffer.")
d.Chk.NoError(os.Remove(serializedChunks.Name()), "Cannot remove filesystem buffer.")
}()
body := buildWriteValueRequest(chnx, hints)
body := buildWriteValueRequest(serializedChunks, hints)
url := *bhcs.host
url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath)

View File

@@ -72,6 +72,20 @@ func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
}
v := types.DecodeChunk(c, &cvr)
d.Exp.NotNil(v, "Chunk with hash %s failed to decode", r)
// IT'S A HAAAAAACK
if datasets, ok := v.(MapOfStringToRefOfCommit); ok {
// Manually populate cvr with Dataset Heads, because shove still uses a crappy backchannel to send chunks over. This can go away once we fix BUG 822.
datasets.IterAll(func(s string, rOfC RefOfCommit) {
r := rOfC.TargetRef()
if !cvr.isPresent(r) {
fmt.Println("Manually shoving in", r)
cvr.ReadValue(r)
}
})
}
// End HAAAAAAAAACK
cvr.checkChunksInCache(v)
cvr.set(r, presentChunk(v.Type()))
orderedChunks = append(orderedChunks, c)
@@ -88,17 +102,12 @@ func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
}
// Contents of the returned io.Reader are gzipped.
func buildWriteValueRequest(chnx []chunks.Chunk, hints map[ref.Ref]struct{}) io.Reader {
func buildWriteValueRequest(serializedChunks io.Reader, hints map[ref.Ref]struct{}) io.Reader {
body := &bytes.Buffer{}
gw := gzip.NewWriter(body)
serializeHints(gw, hints)
sz := chunks.NewSerializer(gw)
for _, chunk := range chnx {
sz.Put(chunk)
}
sz.Close()
gw.Close()
return body
d.Chk.NoError(gw.Close())
return io.MultiReader(body, serializedChunks)
}
func HandleGetRefs(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) {

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"compress/gzip"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
@@ -60,11 +61,12 @@ func TestBuildWriteValueRequest(t *testing.T) {
chunks.NewChunk([]byte(input1)),
chunks.NewChunk([]byte(input2)),
}
hints := map[ref.Ref]struct{}{
ref.Parse("sha1-0000000000000000000000000000000000000002"): struct{}{},
ref.Parse("sha1-0000000000000000000000000000000000000003"): struct{}{},
}
compressed := buildWriteValueRequest(chnx, hints)
compressed := buildWriteValueRequest(serializeChunks(chnx, assert), hints)
gr, err := gzip.NewReader(compressed)
d.Exp.NoError(err)
defer gr.Close()
@@ -86,6 +88,16 @@ func TestBuildWriteValueRequest(t *testing.T) {
assert.Empty(chnx)
}
func serializeChunks(chnx []chunks.Chunk, assert *assert.Assertions) io.Reader {
body := &bytes.Buffer{}
gw := gzip.NewWriter(body)
sz := chunks.NewSerializer(gw)
assert.NoError(sz.PutMany(chnx))
assert.NoError(sz.Close())
assert.NoError(gw.Close())
return body
}
func TestBuildGetRefsRequest(t *testing.T) {
assert := assert.New(t)
refs := map[ref.Ref]struct{}{