Report backpressure over the wire

Using ChunkStore.PutMany() means that the DataStore server code
can detect when the ChunkStore it's writing to can't handle
the amount of data being pushed. This patch reports that
status back across the wire to the client that's attempting
to write a Value graph. Due to Issue #1259, the only thing the
client can currently do is retry the entire batch, but we hope
to do better in the future.
Chris Masone
2016-04-11 11:26:09 -07:00
parent d998d0cc02
commit 6ff58aa38b
10 changed files with 237 additions and 103 deletions
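
At a high level, the patch lets a caller keep re-offering whatever the ChunkStore reports back as not written. Below is a minimal sketch of that loop against the post-patch API (PutMany returning a BackpressureError of hashes, with the AsHashes accessor this commit adds); the putWithRetry helper itself is illustrative, not part of this commit, and a real caller would want to back off between attempts.

package example

import (
	"github.com/attic-labs/noms/chunks"
	"github.com/attic-labs/noms/ref"
)

// putWithRetry re-offers the chunks that the store reports back as
// not-Put until everything lands. Illustrative only; a production
// caller would add a delay or cap the number of attempts.
func putWithRetry(cs chunks.ChunkStore, chnx []chunks.Chunk) {
	pending := chnx
	for len(pending) > 0 {
		bpe := cs.PutMany(pending)
		if bpe == nil {
			return // every chunk was accepted
		}
		// Keep only the chunks whose hashes came back in the error.
		notPut := make(map[ref.Ref]bool, len(bpe))
		for _, h := range bpe.AsHashes() {
			notPut[h] = true
		}
		retry := make([]chunks.Chunk, 0, len(bpe))
		for _, c := range pending {
			if notPut[c.Ref()] {
				retry = append(retry, c)
			}
		}
		pending = retry
	}
}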


@@ -48,9 +48,13 @@ type ChunkSink interface {
 	io.Closer
 }
 
-// BackpressureError is a slice of Chunk that indicates some chunks could not be Put(). Caller is free to try to Put them again later.
-type BackpressureError []Chunk
+// BackpressureError is a slice of ref.Ref that indicates some chunks could not be Put(). Caller is free to try to Put them again later.
+type BackpressureError ref.RefSlice
 
 func (b BackpressureError) Error() string {
 	return fmt.Sprintf("Tried to Put %d too many Chunks", len(b))
 }
+
+func (b BackpressureError) AsHashes() ref.RefSlice {
+	return ref.RefSlice(b)
+}


@@ -136,7 +136,12 @@ func (s *DynamoStore) PutMany(chunks []Chunk) (e BackpressureError) {
 			s.requestWg.Add(1)
 			s.unwrittenPuts.Add(c)
 		default:
-			return BackpressureError(chunks[i:])
+			notPut := chunks[i:]
+			e = make(BackpressureError, len(notPut))
+			for j, np := range notPut {
+				e[j] = np.Ref()
+			}
+			return
 		}
 	}
 	return


@@ -50,8 +50,8 @@ func (rts ReadThroughStore) Put(c Chunk) {
 func (rts ReadThroughStore) PutMany(chunks []Chunk) BackpressureError {
 	bpe := rts.backingStore.PutMany(chunks)
 	lookup := make(map[ref.Ref]bool, len(bpe))
-	for _, c := range bpe {
-		lookup[c.Ref()] = true
+	for _, r := range bpe {
+		lookup[r] = true
 	}
 	toPut := make([]Chunk, 0, len(chunks)-len(bpe))
 	for _, c := range chunks {


@@ -24,6 +24,8 @@ const (
 	httpChunkSinkConcurrency = 6
 	writeBufferSize          = 1 << 12 // 4K
 	readBufferSize           = 1 << 12 // 4K
+
+	httpStatusTooManyRequests = 429 // This is new in Go 1.6. Once the builders have that, use it.
 )
 
 // httpBatchStore implements types.BatchStore
@@ -242,8 +244,6 @@ func (bhcs *httpBatchStore) batchPutRequests() {
 			drained = true
 			d.Chk.NoError(sz.Close())
 			d.Chk.NoError(gw.Close())
-			_, err := buf.Seek(0, 0)
-			d.Chk.NoError(err, "Could not reset filesystem buffer to offset 0.")
 			bhcs.sendWriteRequests(buf, numChunks, hints) // Takes ownership of buf, hints
 			numChunks = 0
@@ -268,26 +268,45 @@ func (bhcs *httpBatchStore) sendWriteRequests(serializedChunks *os.File, numChun
 	bhcs.rateLimit <- struct{}{}
 	go func() {
 		defer func() {
-			<-bhcs.rateLimit
 			bhcs.unwrittenPuts = newUnwrittenPutCache()
 			bhcs.requestWg.Add(-numChunks)
 			d.Chk.NoError(serializedChunks.Close(), "Cannot close filesystem buffer.")
 			d.Chk.NoError(os.Remove(serializedChunks.Name()), "Cannot remove filesystem buffer.")
 		}()
-		body := buildWriteValueRequest(serializedChunks, hints)
+		var res *http.Response
+		for tryAgain := true; tryAgain; {
+			_, err := serializedChunks.Seek(0, 0)
+			d.Chk.NoError(err, "Could not reset filesystem buffer to offset 0.")
+			body := buildWriteValueRequest(serializedChunks, hints)
 
-		url := *bhcs.host
-		url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath)
-		req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{
-			"Content-Encoding": {"gzip"},
-			"Content-Type":     {"application/octet-stream"},
-		})
+			url := *bhcs.host
+			url.Path = httprouter.CleanPath(bhcs.host.Path + constants.WriteValuePath)
+			req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{
+				"Accept-Encoding":  {"gzip"},
+				"Content-Encoding": {"gzip"},
+				"Content-Type":     {"application/octet-stream"},
+			})
 
-		res, err := bhcs.httpClient.Do(req)
-		d.Exp.NoError(err)
-		defer closeResponse(res)
+			res, err = bhcs.httpClient.Do(req)
+			d.Exp.NoError(err)
+			defer closeResponse(res)
+
+			if tryAgain = res.StatusCode == httpStatusTooManyRequests; tryAgain {
+				reader := res.Body
+				if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") {
+					gr, err := gzip.NewReader(reader)
+					d.Exp.NoError(err)
+					defer gr.Close()
+					reader = gr
+				}
+				/*hashes :=*/ deserializeHashes(reader)
+				// TODO: BUG 1259 Since the client must currently send all chunks in one batch, the only thing to do in response to backpressure is send EVERYTHING again. Once batching is again possible, this code should figure out how to resend the chunks indicated by hashes.
+			}
+		}
+
 		d.Exp.Equal(http.StatusCreated, res.StatusCode, "Unexpected response: %s", formatErrorResponse(res))
+		<-bhcs.rateLimit
 	}()
 }


@@ -1,7 +1,9 @@
 package datas
 
 import (
+	"io/ioutil"
 	"net/http"
+	"net/http/httptest"
 	"testing"
 
 	"github.com/attic-labs/noms/chunks"
@@ -27,9 +29,15 @@ type inlineServer struct {
 }
 
 func (serv inlineServer) Do(req *http.Request) (resp *http.Response, err error) {
-	w := newFakeHTTPResponseWriter()
+	w := httptest.NewRecorder()
 	serv.ServeHTTP(w, req)
-	return w.resp, nil
+	return &http.Response{
+			StatusCode: w.Code,
+			Status:     http.StatusText(w.Code),
+			Header:     w.HeaderMap,
+			Body:       ioutil.NopCloser(w.Body),
+		},
+		nil
 }
 
 func (suite *HTTPBatchStoreSuite) SetupTest() {
@@ -102,7 +110,7 @@ func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
 		types.EncodeValue(types.NewString("abc"), nil),
 		types.EncodeValue(types.NewString("def"), nil),
 	}
-	suite.cs.PutMany(chnx)
+	suite.NoError(suite.cs.PutMany(chnx))
 	l := types.NewList(types.NewRef(chnx[0].Ref()), types.NewRef(chnx[1].Ref()))
 
 	suite.store.SchedulePut(types.EncodeValue(l, nil), types.Hints{
@@ -114,6 +122,52 @@ func (suite *HTTPBatchStoreSuite) TestPutChunkWithHints() {
 	suite.Equal(3, suite.cs.Writes)
 }
 
+type backpressureCS struct {
+	chunks.ChunkStore
+	tries int
+}
+
+func (b *backpressureCS) PutMany(chnx []chunks.Chunk) chunks.BackpressureError {
+	if chnx == nil {
+		return nil
+	}
+	b.tries++
+
+	if len(chnx) <= b.tries {
+		return b.ChunkStore.PutMany(chnx)
+	}
+	if bpe := b.ChunkStore.PutMany(chnx[:b.tries]); bpe != nil {
+		return bpe
+	}
+
+	bpe := make(chunks.BackpressureError, len(chnx)-b.tries)
+	for i, c := range chnx[b.tries:] {
+		bpe[i] = c.Ref()
+	}
+	return bpe
+}
+
+func (suite *HTTPBatchStoreSuite) TestPutChunksBackpressure() {
+	bpcs := &backpressureCS{ChunkStore: suite.cs}
+	bs := newHTTPBatchStoreForTest(bpcs)
+	defer bs.Close()
+	defer bpcs.Close()
+
+	chnx := []chunks.Chunk{
+		types.EncodeValue(types.NewString("abc"), nil),
+		types.EncodeValue(types.NewString("def"), nil),
+	}
+	l := types.NewList()
+	for _, c := range chnx {
+		bs.SchedulePut(c, types.Hints{})
+		l = l.Append(types.NewRef(c.Ref()))
+	}
+	bs.SchedulePut(types.EncodeValue(l, nil), types.Hints{})
+	bs.Flush()
+
+	suite.Equal(6, suite.cs.Writes)
+}
+
 func (suite *HTTPBatchStoreSuite) TestRoot() {
 	c := chunks.NewChunk([]byte("abc"))
 	suite.True(suite.cs.UpdateRoot(c.Ref(), ref.Ref{}))


@@ -119,15 +119,17 @@ func (bhcs *notABatchSink) batchPutRequests() {
 func (bhcs *notABatchSink) sendWriteRequests(chnx []chunks.Chunk) {
 	bhcs.rateLimit <- struct{}{}
 	go func() {
+		hashes := make(ref.RefSlice, len(chnx))
 		defer func() {
-			bhcs.unwrittenPuts.Clear(chnx)
+			bhcs.unwrittenPuts.Clear(hashes)
 			bhcs.requestWg.Add(-len(chnx))
 		}()
 
 		body := &bytes.Buffer{}
 		gw := gzip.NewWriter(body)
 		sz := chunks.NewSerializer(gw)
-		for _, chunk := range chnx {
+		for i, chunk := range chnx {
+			hashes[i] = chunk.Ref()
 			sz.Put(chunk)
 		}
 		sz.Close()


@@ -43,10 +43,10 @@ func (p *unwrittenPutCache) Get(r ref.Ref) chunks.Chunk {
 	return chunks.EmptyChunk
 }
 
-func (p *unwrittenPutCache) Clear(chunks []chunks.Chunk) {
+func (p *unwrittenPutCache) Clear(hashes ref.RefSlice) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	for _, c := range chunks {
-		delete(p.unwrittenPuts, c.Ref())
+	for _, hash := range hashes {
+		delete(p.unwrittenPuts, hash)
 	}
 }


@@ -26,14 +26,8 @@ func HandlePostRefs(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
 	err := d.Try(func() {
 		d.Exp.Equal("POST", req.Method)
 
-		var reader io.Reader = req.Body
-		if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") {
-			gr, err := gzip.NewReader(reader)
-			d.Exp.NoError(err)
-			defer gr.Close()
-			reader = gr
-		}
+		reader := bodyReader(req)
+		defer reader.Close()
 		chunks.Deserialize(reader, cs, nil)
 		w.WriteHeader(http.StatusCreated)
 	})
@@ -44,17 +38,40 @@ func HandlePostRefs(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
 	}
 }
 
+func bodyReader(req *http.Request) (reader io.ReadCloser) {
+	reader = req.Body
+	if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") {
+		gr, err := gzip.NewReader(reader)
+		d.Exp.NoError(err)
+		reader = gr
+	}
+	return
+}
+
+func respWriter(req *http.Request, w http.ResponseWriter) (writer io.WriteCloser) {
+	writer = wc{w.(io.Writer)}
+	if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
+		w.Header().Add("Content-Encoding", "gzip")
+		gw := gzip.NewWriter(w)
+		writer = gw
+	}
+	return
+}
+
+type wc struct {
+	io.Writer
+}
+
+func (wc wc) Close() error {
+	return nil
+}
+
 func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) {
 	err := d.Try(func() {
 		d.Exp.Equal("POST", req.Method)
 
-		var reader io.Reader = req.Body
-		if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") {
-			gr, err := gzip.NewReader(reader)
-			d.Exp.NoError(err)
-			defer gr.Close()
-			reader = gr
-		}
+		reader := bodyReader(req)
+		defer reader.Close()
 
 		vbs := types.NewValidatingBatchingSink(cs)
 		vbs.Prepare(deserializeHints(reader))
@@ -71,7 +88,14 @@ func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
 		if bpe == nil {
 			bpe = vbs.Flush()
 		}
 
-		// TODO communicate backpressure from bpe
+		if bpe != nil {
+			w.WriteHeader(httpStatusTooManyRequests)
+			w.Header().Add("Content-Type", "application/octet-stream")
+			writer := respWriter(req, w)
+			defer writer.Close()
+			serializeHashes(writer, bpe.AsHashes())
+			return
+		}
 		w.WriteHeader(http.StatusCreated)
 	})
@@ -105,13 +129,8 @@ func HandleGetRefs(w http.ResponseWriter, req *http.Request, ps URLParams, cs ch
 	}
 
 	w.Header().Add("Content-Type", "application/octet-stream")
-	writer := w.(io.Writer)
-	if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
-		w.Header().Add("Content-Encoding", "gzip")
-		gw := gzip.NewWriter(w)
-		defer gw.Close()
-		writer = gw
-	}
+	writer := respWriter(req, w)
+	defer writer.Close()
 
 	sz := chunks.NewSerializer(writer)
 	for _, r := range refs {
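
Taken together, the handler changes define a simple response contract for backpressure: status 429, Content-Type application/octet-stream, and a body carrying the hashes that were not written. A standalone sketch of just that contract, with [20]byte digests standing in for ref.Ref and error returns elided; writeBackpressure is illustrative, not the handler above. (Note that net/http only transmits headers set before WriteHeader, so the sketch orders them that way.)

package example

import (
	"encoding/binary"
	"net/http"
)

// writeBackpressure answers a write request whose chunks could not all
// be accepted: status 429, then a little-endian uint32 count followed
// by the raw digests of the chunks that were not written.
func writeBackpressure(w http.ResponseWriter, notPut [][20]byte) {
	w.Header().Set("Content-Type", "application/octet-stream")
	w.WriteHeader(429) // http.StatusTooManyRequests, once Go 1.6 is everywhere
	// Error returns elided for brevity.
	binary.Write(w, binary.LittleEndian, uint32(len(notPut)))
	for _, digest := range notPut {
		w.Write(digest[:])
	}
}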


@@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
@@ -42,10 +43,10 @@ func TestHandleWriteValue(t *testing.T) {
 	sz.Put(listChunk)
 	sz.Close()
 
-	w := newFakeHTTPResponseWriter()
+	w := httptest.NewRecorder()
 	HandleWriteValue(w, &http.Request{Body: ioutil.NopCloser(body), Method: "POST"}, params{}, cs)
 
-	if assert.Equal(http.StatusCreated, w.resp.StatusCode, "Handler error:\n%s", string(w.buf.Bytes())) {
+	if assert.Equal(http.StatusCreated, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
 		ds2 := NewDataStore(cs)
 		v := ds2.ReadValue(l2.Ref())
 		if assert.NotNil(v) {
@@ -54,6 +55,40 @@ func TestHandleWriteValue(t *testing.T) {
 	}
 }
 
+func TestHandleWriteValueBackpressure(t *testing.T) {
+	assert := assert.New(t)
+	cs := &backpressureCS{ChunkStore: chunks.NewMemoryStore()}
+	ds := NewDataStore(cs)
+
+	l := types.NewList(
+		ds.WriteValue(types.Bool(true)),
+		ds.WriteValue(types.Bool(false)),
+	)
+	ds.WriteValue(l)
+	hint := l.Ref()
+
+	newItem := types.NewEmptyBlob()
+	itemChunk := types.EncodeValue(newItem, nil)
+	l2 := l.Insert(1, types.NewRefOfBlob(itemChunk.Ref()))
+	listChunk := types.EncodeValue(l2, nil)
+
+	body := &bytes.Buffer{}
+	serializeHints(body, map[ref.Ref]struct{}{hint: struct{}{}})
+	sz := chunks.NewSerializer(body)
+	sz.Put(itemChunk)
+	sz.Put(listChunk)
+	sz.Close()
+
+	w := httptest.NewRecorder()
+	HandleWriteValue(w, &http.Request{Body: ioutil.NopCloser(body), Method: "POST"}, params{}, cs)
+
+	if assert.Equal(httpStatusTooManyRequests, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
+		hashes := deserializeHashes(w.Body)
+		assert.Len(hashes, 1)
+		assert.Equal(l2.Ref(), hashes[0])
+	}
+}
+
 func TestBuildWriteValueRequest(t *testing.T) {
 	assert := assert.New(t)
 	input1, input2 := "abc", "def"
@@ -133,7 +168,7 @@ func TestHandleGetRefs(t *testing.T) {
 	body := strings.NewReader(fmt.Sprintf("ref=%s&ref=%s", chnx[0].Ref(), chnx[1].Ref()))
 
-	w := newFakeHTTPResponseWriter()
+	w := httptest.NewRecorder()
 	HandleGetRefs(w,
 		&http.Request{Body: ioutil.NopCloser(body), Method: "POST", Header: http.Header{
 			"Content-Type": {"application/x-www-form-urlencoded"},
@@ -142,9 +177,9 @@ func TestHandleGetRefs(t *testing.T) {
 		cs,
 	)
 
-	if assert.Equal(http.StatusOK, w.resp.StatusCode, "Handler error:\n%s", string(w.buf.Bytes())) {
+	if assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
 		chunkChan := make(chan chunks.Chunk)
-		go chunks.DeserializeToChan(w.buf, chunkChan)
+		go chunks.DeserializeToChan(w.Body, chunkChan)
 		for c := range chunkChan {
 			assert.Equal(chnx[0].Ref(), c.Ref())
 			chnx = chnx[1:]
@@ -160,11 +195,11 @@ func TestHandleGetRoot(t *testing.T) {
 	cs.Put(c)
 	assert.True(cs.UpdateRoot(c.Ref(), ref.Ref{}))
 
-	w := newFakeHTTPResponseWriter()
+	w := httptest.NewRecorder()
 	HandleRootGet(w, &http.Request{Method: "GET"}, params{}, cs)
 
-	if assert.Equal(http.StatusOK, w.resp.StatusCode, "Handler error:\n%s", string(w.buf.Bytes())) {
-		root := ref.Parse(string(w.buf.Bytes()))
+	if assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
+		root := ref.Parse(string(w.Body.Bytes()))
 		assert.Equal(c.Ref(), root)
 	}
 }
@@ -187,15 +222,15 @@ func TestHandlePostRoot(t *testing.T) {
 	queryParams.Add("current", chnx[1].Ref().String())
 	u.RawQuery = queryParams.Encode()
 
-	w := newFakeHTTPResponseWriter()
+	w := httptest.NewRecorder()
 	HandleRootPost(w, &http.Request{URL: u, Method: "POST"}, params{}, cs)
-	assert.Equal(http.StatusConflict, w.resp.StatusCode, "Handler error:\n%s", string(w.buf.Bytes()))
+	assert.Equal(http.StatusConflict, w.Code, "Handler error:\n%s", string(w.Body.Bytes()))
 
 	// Now, update the root manually to 'last' and try again.
 	assert.True(cs.UpdateRoot(chnx[0].Ref(), ref.Ref{}))
-	w = newFakeHTTPResponseWriter()
+	w = httptest.NewRecorder()
 	HandleRootPost(w, &http.Request{URL: u, Method: "POST"}, params{}, cs)
-	assert.Equal(http.StatusOK, w.resp.StatusCode, "Handler error:\n%s", string(w.buf.Bytes()))
+	assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes()))
 }
type params map[string]string
@@ -203,34 +238,3 @@ type params map[string]string
 func (p params) ByName(k string) string {
 	return p[k]
 }
-
-type fakeHTTPResponseWriter struct {
-	buf  *bytes.Buffer
-	resp *http.Response
-}
-
-func newFakeHTTPResponseWriter() *fakeHTTPResponseWriter {
-	buf := &bytes.Buffer{}
-	return &fakeHTTPResponseWriter{
-		buf: buf,
-		resp: &http.Response{
-			StatusCode: http.StatusOK,
-			Status:     http.StatusText(http.StatusOK),
-			Header:     http.Header{},
-			Body:       ioutil.NopCloser(buf),
-		},
-	}
-}
-
-func (rw *fakeHTTPResponseWriter) Header() http.Header {
-	return rw.resp.Header
-}
-
-func (rw *fakeHTTPResponseWriter) Write(b []byte) (int, error) {
-	return rw.buf.Write(b)
-}
-
-func (rw *fakeHTTPResponseWriter) WriteHeader(ret int) {
-	rw.resp.StatusCode = ret
-	rw.resp.Status = http.StatusText(ret)
-}


@@ -11,17 +11,29 @@ import (
"github.com/attic-labs/noms/types"
)
func serializeHints(w io.Writer, hints map[ref.Ref]struct{}) {
func serializeHints(w io.Writer, hints types.Hints) {
err := binary.Write(w, binary.LittleEndian, uint32(len(hints))) // 4 billion hints is probably absurd. Maybe this should be smaller?
d.Chk.NoError(err)
for r := range hints {
digest := r.Digest()
n, err := io.Copy(w, bytes.NewReader(digest[:]))
d.Chk.NoError(err)
d.Chk.Equal(int64(sha1.Size), n)
serializeHash(w, r)
}
}
func serializeHashes(w io.Writer, hashes ref.RefSlice) {
err := binary.Write(w, binary.LittleEndian, uint32(len(hashes))) // 4 billion hashes is probably absurd. Maybe this should be smaller?
d.Chk.NoError(err)
for _, r := range hashes {
serializeHash(w, r)
}
}
func serializeHash(w io.Writer, hash ref.Ref) {
digest := hash.Digest()
n, err := io.Copy(w, bytes.NewReader(digest[:]))
d.Chk.NoError(err)
d.Chk.Equal(int64(sha1.Size), n)
}
func deserializeHints(reader io.Reader) types.Hints {
numRefs := uint32(0)
err := binary.Read(reader, binary.LittleEndian, &numRefs)
@@ -29,12 +41,27 @@ func deserializeHints(reader io.Reader) types.Hints {
 	hints := make(types.Hints, numRefs)
 	for i := uint32(0); i < numRefs; i++ {
-		digest := ref.Sha1Digest{}
-		n, err := io.ReadFull(reader, digest[:])
-		d.Chk.NoError(err)
-		d.Chk.Equal(int(sha1.Size), n)
-		hints[ref.New(digest)] = struct{}{}
+		hints[deserializeHash(reader)] = struct{}{}
 	}
 	return hints
 }
+
+func deserializeHashes(reader io.Reader) ref.RefSlice {
+	numRefs := uint32(0)
+	err := binary.Read(reader, binary.LittleEndian, &numRefs)
+	d.Chk.NoError(err)
+
+	hashes := make(ref.RefSlice, numRefs)
+	for i := range hashes {
+		hashes[i] = deserializeHash(reader)
+	}
+	return hashes
+}
+
+func deserializeHash(reader io.Reader) ref.Ref {
+	digest := ref.Sha1Digest{}
+	n, err := io.ReadFull(reader, digest[:])
+	d.Chk.NoError(err)
+	d.Chk.Equal(int(sha1.Size), n)
+	return ref.New(digest)
+}
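
The payload framing on the wire is the same one serializeHints already used: a little-endian uint32 count followed by that many raw 20-byte SHA-1 digests. A self-contained decoder for that framing might look like the sketch below; deserializeHash above is the in-repo equivalent, and decodeBackpressureBody is illustrative, not part of this commit.

package example

import (
	"encoding/binary"
	"fmt"
	"io"
)

// decodeBackpressureBody reads a backpressure payload: a little-endian
// uint32 count, then `count` raw 20-byte SHA-1 digests, back to back.
func decodeBackpressureBody(body io.Reader) ([][20]byte, error) {
	var count uint32
	if err := binary.Read(body, binary.LittleEndian, &count); err != nil {
		return nil, fmt.Errorf("reading hash count: %v", err)
	}
	hashes := make([][20]byte, count)
	for i := range hashes {
		if _, err := io.ReadFull(body, hashes[i][:]); err != nil {
			return nil, fmt.Errorf("reading digest %d of %d: %v", i, count, err)
		}
	}
	return hashes, nil
}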