mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-20 03:00:43 -05:00
Fix crash in handleWriteValue (#2696)
In some cases where the same chunk appears more than once in a given writeValue request, the handleWriteValue code is able to recognize this and skip re-decoding and re-hashing it. In that case an empty result winds up percolating through the code, and I wasn't handling this correctly. Fixed and added a unit test to catch this. Fixes #2695
This commit is contained in:
@@ -30,7 +30,7 @@ type Handler func(w http.ResponseWriter, req *http.Request, ps URLParams, cs chu
|
||||
|
||||
// NomsVersionHeader is the name of the header that Noms clients and servers must set in every request/response.
|
||||
const NomsVersionHeader = "x-noms-vers"
|
||||
const nomsBaseHtml = "<html><head></head><body><p>Hi. This is a Noms HTTP server.</p><p>To learn more, visit <a href=\"https://github.com/attic-labs/noms\">our GitHub project</a>.</p></body></html>"
|
||||
const nomsBaseHTML = "<html><head></head><body><p>Hi. This is a Noms HTTP server.</p><p>To learn more, visit <a href=\"https://github.com/attic-labs/noms\">our GitHub project</a>.</p></body></html>"
|
||||
|
||||
var (
|
||||
// HandleWriteValue is meant to handle HTTP POST requests to the writeValue/ server endpoint. The payload should be an appropriately-ordered sequence of Chunks to be validated and stored on the server.
|
||||
@@ -58,6 +58,8 @@ var (
|
||||
HandleBaseGet = handleBaseGet
|
||||
)
|
||||
|
||||
const writeValueConcurrency = 16
|
||||
|
||||
func versionCheck(hndlr Handler) Handler {
|
||||
return func(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) {
|
||||
w.Header().Set(NomsVersionHeader, constants.NomsVersion)
|
||||
@@ -92,25 +94,27 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
vbs := types.NewValidatingBatchingSink(cs)
|
||||
vbs.Prepare(deserializeHints(reader))
|
||||
|
||||
chunkChan := make(chan interface{}, 16)
|
||||
chunkChan := make(chan interface{}, writeValueConcurrency)
|
||||
go chunks.DeserializeToChan(reader, chunkChan)
|
||||
decoded := orderedparallel.New(
|
||||
chunkChan,
|
||||
func(c interface{}) interface{} {
|
||||
return vbs.DecodeUnqueued(c.(*chunks.Chunk))
|
||||
},
|
||||
16)
|
||||
writeValueConcurrency)
|
||||
|
||||
var bpe chunks.BackpressureError
|
||||
for dci := range decoded {
|
||||
dc := dci.(types.DecodedChunk)
|
||||
if bpe == nil {
|
||||
bpe = vbs.Enqueue(*dc.Chunk, *dc.Value)
|
||||
} else {
|
||||
bpe = append(bpe, dc.Chunk.Hash())
|
||||
if dc.Chunk != nil && dc.Value != nil {
|
||||
if bpe == nil {
|
||||
bpe = vbs.Enqueue(*dc.Chunk, *dc.Value)
|
||||
} else {
|
||||
bpe = append(bpe, dc.Chunk.Hash())
|
||||
}
|
||||
// If a previous Enqueue() errored, we still need to drain chunkChan
|
||||
// TODO: what about having DeserializeToChan take a 'done' channel to stop it?
|
||||
}
|
||||
// If a previous Enqueue() errored, we still need to drain chunkChan
|
||||
// TODO: what about having DeserializeToChan take a 'done' channel to stop it?
|
||||
}
|
||||
if bpe == nil {
|
||||
bpe = vbs.Flush()
|
||||
@@ -271,7 +275,7 @@ func handleBaseGet(w http.ResponseWriter, req *http.Request, ps URLParams, rt ch
|
||||
d.PanicIfTrue(req.Method != "GET", "Expected get method.")
|
||||
|
||||
w.Header().Add("content-type", "text/html")
|
||||
fmt.Fprintf(w, nomsBaseHtml)
|
||||
fmt.Fprintf(w, nomsBaseHTML)
|
||||
}
|
||||
|
||||
func isMapOfStringToRefOfCommit(m types.Map) bool {
|
||||
|
||||
@@ -57,6 +57,32 @@ func TestHandleWriteValue(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleWriteValueDupChunks(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := chunks.NewTestStore()
|
||||
|
||||
newItem := types.NewEmptyBlob()
|
||||
itemChunk := types.EncodeValue(newItem, nil)
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
serializeHints(body, map[hash.Hash]struct{}{})
|
||||
// Write the same chunk to body enough times to be certain that at least one of the concurrent deserialize/decode passes completes before the last one can continue.
|
||||
for i := 0; i <= writeValueConcurrency; i++ {
|
||||
chunks.Serialize(itemChunk, body)
|
||||
}
|
||||
|
||||
w := httptest.NewRecorder()
|
||||
HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)
|
||||
|
||||
if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
|
||||
ds2 := NewDatabase(cs)
|
||||
v := ds2.ReadValue(newItem.Hash())
|
||||
if assert.NotNil(v) {
|
||||
assert.True(v.Equals(newItem), "%+v != %+v", v, newItem)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleWriteValueBackpressure(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
cs := &backpressureCS{ChunkStore: chunks.NewMemoryStore()}
|
||||
@@ -260,7 +286,7 @@ func TestHandleGetBase(t *testing.T) {
|
||||
HandleBaseGet(w, newRequest("GET", "", "", nil, nil), params{}, cs)
|
||||
|
||||
if assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
|
||||
assert.Equal([]byte(nomsBaseHtml), w.Body.Bytes())
|
||||
assert.Equal([]byte(nomsBaseHTML), w.Body.Bytes())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user