From b37b5cc61d7fc3670b36aaefac2fc7371fdb7a5e Mon Sep 17 00:00:00 2001 From: cmasone-attic Date: Wed, 12 Oct 2016 08:49:12 -0700 Subject: [PATCH] Fix crash in handleWriteValue (#2696) In some cases where the same chunk appears more than once in a given writeValue request, the handleWriteValue code is able to recognize this and skip re-decoding and re-hashing it. In that case an empty result winds up percolating through the code, and I wasn't handling this correctly. Fixed and added a unit test to catch this. Fixes #2695 --- go/datas/remote_database_handlers.go | 24 +++++++++++-------- go/datas/remote_database_handlers_test.go | 28 ++++++++++++++++++++++- 2 files changed, 41 insertions(+), 11 deletions(-) diff --git a/go/datas/remote_database_handlers.go b/go/datas/remote_database_handlers.go index 32417359e7..690d746f59 100644 --- a/go/datas/remote_database_handlers.go +++ b/go/datas/remote_database_handlers.go @@ -30,7 +30,7 @@ type Handler func(w http.ResponseWriter, req *http.Request, ps URLParams, cs chu // NomsVersionHeader is the name of the header that Noms clients and servers must set in every request/response. const NomsVersionHeader = "x-noms-vers" -const nomsBaseHtml = "

Hi. This is a Noms HTTP server.

To learn more, visit our GitHub project.

" +const nomsBaseHTML = "

Hi. This is a Noms HTTP server.

To learn more, visit our GitHub project.

" var ( // HandleWriteValue is meant to handle HTTP POST requests to the writeValue/ server endpoint. The payload should be an appropriately-ordered sequence of Chunks to be validated and stored on the server. @@ -58,6 +58,8 @@ var ( HandleBaseGet = handleBaseGet ) +const writeValueConcurrency = 16 + func versionCheck(hndlr Handler) Handler { return func(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) { w.Header().Set(NomsVersionHeader, constants.NomsVersion) @@ -92,25 +94,27 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs vbs := types.NewValidatingBatchingSink(cs) vbs.Prepare(deserializeHints(reader)) - chunkChan := make(chan interface{}, 16) + chunkChan := make(chan interface{}, writeValueConcurrency) go chunks.DeserializeToChan(reader, chunkChan) decoded := orderedparallel.New( chunkChan, func(c interface{}) interface{} { return vbs.DecodeUnqueued(c.(*chunks.Chunk)) }, - 16) + writeValueConcurrency) var bpe chunks.BackpressureError for dci := range decoded { dc := dci.(types.DecodedChunk) - if bpe == nil { - bpe = vbs.Enqueue(*dc.Chunk, *dc.Value) - } else { - bpe = append(bpe, dc.Chunk.Hash()) + if dc.Chunk != nil && dc.Value != nil { + if bpe == nil { + bpe = vbs.Enqueue(*dc.Chunk, *dc.Value) + } else { + bpe = append(bpe, dc.Chunk.Hash()) + } + // If a previous Enqueue() errored, we still need to drain chunkChan + // TODO: what about having DeserializeToChan take a 'done' channel to stop it? } - // If a previous Enqueue() errored, we still need to drain chunkChan - // TODO: what about having DeserializeToChan take a 'done' channel to stop it? } if bpe == nil { bpe = vbs.Flush() @@ -271,7 +275,7 @@ func handleBaseGet(w http.ResponseWriter, req *http.Request, ps URLParams, rt ch d.PanicIfTrue(req.Method != "GET", "Expected get method.") w.Header().Add("content-type", "text/html") - fmt.Fprintf(w, nomsBaseHtml) + fmt.Fprintf(w, nomsBaseHTML) } func isMapOfStringToRefOfCommit(m types.Map) bool { diff --git a/go/datas/remote_database_handlers_test.go b/go/datas/remote_database_handlers_test.go index fac4c4a84a..23696d883e 100644 --- a/go/datas/remote_database_handlers_test.go +++ b/go/datas/remote_database_handlers_test.go @@ -57,6 +57,32 @@ func TestHandleWriteValue(t *testing.T) { } } +func TestHandleWriteValueDupChunks(t *testing.T) { + assert := assert.New(t) + cs := chunks.NewTestStore() + + newItem := types.NewEmptyBlob() + itemChunk := types.EncodeValue(newItem, nil) + + body := &bytes.Buffer{} + serializeHints(body, map[hash.Hash]struct{}{}) + // Write the same chunk to body enough times to be certain that at least one of the concurrent deserialize/decode passes completes before the last one can continue. + for i := 0; i <= writeValueConcurrency; i++ { + chunks.Serialize(itemChunk, body) + } + + w := httptest.NewRecorder() + HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs) + + if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) { + ds2 := NewDatabase(cs) + v := ds2.ReadValue(newItem.Hash()) + if assert.NotNil(v) { + assert.True(v.Equals(newItem), "%+v != %+v", v, newItem) + } + } +} + func TestHandleWriteValueBackpressure(t *testing.T) { assert := assert.New(t) cs := &backpressureCS{ChunkStore: chunks.NewMemoryStore()} @@ -260,7 +286,7 @@ func TestHandleGetBase(t *testing.T) { HandleBaseGet(w, newRequest("GET", "", "", nil, nil), params{}, cs) if assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) { - assert.Equal([]byte(nomsBaseHtml), w.Body.Bytes()) + assert.Equal([]byte(nomsBaseHTML), w.Body.Bytes()) } }