Add debug logging to HandleWriteValue (#2846)

This patch introduces optional debug logging in util/verbose, and adds
some usage of it to HandleWriteValue and the httpBatchStore
SchedulePut code path. It also modifies chunks.DeserializeToChan() so
that callers can better recover from panics in there.

https://github.com/attic-labs/attic/issues/103
This commit is contained in:
cmasone-attic
2016-11-21 15:11:34 -08:00
committed by GitHub
parent ff4ee3c3a9
commit 0cf72d5b85
5 changed files with 127 additions and 42 deletions

View File

@@ -87,7 +87,6 @@ func DeserializeToChan(reader io.Reader, chunkChan chan<- interface{}) {
}
chunkChan <- &c
}
close(chunkChan)
}
func deserializeChunk(reader io.Reader) (Chunk, bool) {

View File

@@ -22,6 +22,7 @@ import (
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/verbose"
"github.com/golang/snappy"
"github.com/julienschmidt/httprouter"
)
@@ -30,8 +31,6 @@ const (
httpChunkSinkConcurrency = 6
writeBufferSize = 1 << 12 // 4K
readBufferSize = 1 << 12 // 4K
httpStatusTooManyRequests = 429 // This is new in Go 1.6. Once the builders have that, use it.
)
// httpBatchStore implements types.BatchStore
@@ -387,6 +386,7 @@ func (bhcs *httpBatchStore) sendWriteRequests(hashes hash.HashSet, hints types.H
var res *http.Response
var err error
for tryAgain := true; tryAgain; {
verbose.Log("Sending %d chunks", len(hashes))
chunkChan := make(chan *chunks.Chunk, 1024)
go func() {
bhcs.unwrittenPuts.ExtractChunks(hashes, chunkChan)
@@ -408,7 +408,7 @@ func (bhcs *httpBatchStore) sendWriteRequests(hashes hash.HashSet, hints types.H
expectVersion(res)
defer closeResponse(res.Body)
if tryAgain = res.StatusCode == httpStatusTooManyRequests; tryAgain {
if tryAgain = res.StatusCode == http.StatusTooManyRequests; tryAgain {
reader := res.Body
if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") {
gr, err := gzip.NewReader(reader)
@@ -418,12 +418,14 @@ func (bhcs *httpBatchStore) sendWriteRequests(hashes hash.HashSet, hints types.H
}
/*hashes :=*/ deserializeHashes(reader)
// TODO: BUG 1259 Since the client must currently send all chunks in one batch, the only thing to do in response to backpressure is send EVERYTHING again. Once batching is again possible, this code should figure out how to resend the chunks indicated by hashes.
verbose.Log("Retrying...")
}
}
if http.StatusCreated != res.StatusCode {
d.Panic("Unexpected response: %s", formatErrorResponse(res))
}
verbose.Log("Finished sending %d hashes", len(hashes))
}()
}

View File

@@ -19,6 +19,7 @@ import (
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/noms/go/util/orderedparallel"
"github.com/attic-labs/noms/go/util/verbose"
"github.com/golang/snappy"
)
@@ -28,43 +29,61 @@ type URLParams interface {
type Handler func(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore)
// NomsVersionHeader is the name of the header that Noms clients and servers must set in every request/response.
const NomsVersionHeader = "x-noms-vers"
const nomsBaseHTML = "<html><head></head><body><p>Hi. This is a Noms HTTP server.</p><p>To learn more, visit <a href=\"https://github.com/attic-labs/noms\">our GitHub project</a>.</p></body></html>"
var (
// HandleWriteValue is meant to handle HTTP POST requests to the writeValue/ server endpoint. The payload should be an appropriately-ordered sequence of Chunks to be validated and stored on the server.
// TODO: Nice comment about what headers it expects/honors, payload format, and error responses.
HandleWriteValue = versionCheck(handleWriteValue)
// HandleGetRefs is meant to handle HTTP POST requests to the getRefs/ server endpoint. Given a sequence of Chunk hashes, the server will fetch and return them.
// TODO: Nice comment about what headers it expects/honors, payload format, and responses.
HandleGetRefs = versionCheck(handleGetRefs)
// HandleWriteValue is meant to handle HTTP POST requests to the hasRefs/ server endpoint. Given a sequence of Chunk hashes, the server check for their presence and return a list of true/false responses.
// TODO: Nice comment about what headers it expects/honors, payload format, and responses.
HandleHasRefs = versionCheck(handleHasRefs)
// HandleRootGet is meant to handle HTTP GET requests to the root/ server endpoint. The server returns the hash of the Root as a string.
// TODO: Nice comment about what headers it expects/honors, payload format, and responses.
HandleRootGet = versionCheck(handleRootGet)
// HandleWriteValue is meant to handle HTTP POST requests to the root/ server endpoint. This is used to update the Root to point to a new Chunk.
// TODO: Nice comment about what headers it expects/honors, payload format, and error responses.
HandleRootPost = versionCheck(handleRootPost)
// HandleBaseGet is meant to handle HTTP GET requests to the / server endpoint. This is used to give a friendly message to users.
// TODO: Nice comment about what headers it expects/honors, payload format, and error responses.
HandleBaseGet = handleBaseGet
const (
// NomsVersionHeader is the name of the header that Noms clients and
// servers must set in every request/response.
NomsVersionHeader = "x-noms-vers"
nomsBaseHTML = "<html><head></head><body><p>Hi. This is a Noms HTTP server.</p><p>To learn more, visit <a href=\"https://github.com/attic-labs/noms\">our GitHub project</a>.</p></body></html>"
writeValueConcurrency = 16
)
const writeValueConcurrency = 16
var (
// HandleWriteValue is meant to handle HTTP POST requests to the
// writeValue/ server endpoint. The payload should be an appropriately-
// ordered sequence of Chunks to be validated and stored on the server.
// TODO: Nice comment about what headers it expects/honors, payload
// format, and error responses.
HandleWriteValue = versionCheck(handleWriteValue)
// HandleGetRefs is meant to handle HTTP POST requests to the getRefs/
// server endpoint. Given a sequence of Chunk hashes, the server will
// fetch and return them.
// TODO: Nice comment about what headers it
// expects/honors, payload format, and responses.
HandleGetRefs = versionCheck(handleGetRefs)
// HandleWriteValue is meant to handle HTTP POST requests to the hasRefs/
// server endpoint. Given a sequence of Chunk hashes, the server check for
// their presence and return a list of true/false responses.
// TODO: Nice comment about what headers it expects/honors, payload
// format, and responses.
HandleHasRefs = versionCheck(handleHasRefs)
// HandleRootGet is meant to handle HTTP GET requests to the root/ server
// endpoint. The server returns the hash of the Root as a string.
// TODO: Nice comment about what headers it expects/honors, payload
// format, and responses.
HandleRootGet = versionCheck(handleRootGet)
// HandleWriteValue is meant to handle HTTP POST requests to the root/
// server endpoint. This is used to update the Root to point to a new
// Chunk.
// TODO: Nice comment about what headers it expects/honors, payload
// format, and error responses.
HandleRootPost = versionCheck(handleRootPost)
// HandleBaseGet is meant to handle HTTP GET requests to the / server
// endpoint. This is used to give a friendly message to users.
// TODO: Nice comment about what headers it expects/honors, payload
// format, and error responses.
HandleBaseGet = handleBaseGet
)
func versionCheck(hndlr Handler) Handler {
return func(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) {
w.Header().Set(NomsVersionHeader, constants.NomsVersion)
if req.Header.Get(NomsVersionHeader) != constants.NomsVersion {
fmt.Println("Returning version mismatch error")
verbose.Log("Returning version mismatch error")
http.Error(
w,
fmt.Sprintf("Error: SDK version %s is incompatible with data of version %s", req.Header.Get(NomsVersionHeader), constants.NomsVersion),
@@ -75,18 +94,36 @@ func versionCheck(hndlr Handler) Handler {
err := d.Try(func() { hndlr(w, req, ps, cs) })
if err != nil {
fmt.Printf("Returning bad request:\n%v\n", err)
http.Error(w, fmt.Sprintf("Error: %v", d.Unwrap(err)), http.StatusBadRequest)
err = d.Unwrap(err)
verbose.Log("Returning bad request:\n%v\n", err)
http.Error(w, fmt.Sprintf("Error: %v", err), http.StatusBadRequest)
return
}
}
}
func recoverToError(errChan chan<- d.WrappedError) {
var we d.WrappedError
if r := recover(); r != nil {
switch err := r.(type) {
case d.WrappedError:
we = err
case error:
we = d.Wrap(err)
default:
we = d.Wrap(fmt.Errorf("Error: %v", err))
}
}
errChan <- we
}
func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs chunks.ChunkStore) {
if req.Method != "POST" {
d.Panic("Expected post method.")
}
verbose.Log("Handling WriteValue from " + req.RemoteAddr)
defer verbose.Log("Finished handling WriteValue from " + req.RemoteAddr)
reader := bodyReader(req)
defer func() {
// Ensure all data on reader is consumed
@@ -96,8 +133,16 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
vbs := types.NewValidatingBatchingSink(cs)
vbs.Prepare(deserializeHints(reader))
// Deserialize chunks from reader in background, recovering from errors
errChan := make(chan d.WrappedError)
defer close(errChan)
chunkChan := make(chan interface{}, writeValueConcurrency)
go chunks.DeserializeToChan(reader, chunkChan)
go func() {
defer recoverToError(errChan)
defer close(chunkChan)
chunks.DeserializeToChan(reader, chunkChan)
}()
decoded := orderedparallel.New(
chunkChan,
func(c interface{}) interface{} {
@@ -105,12 +150,17 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
},
writeValueConcurrency)
count := 0
var bpe chunks.BackpressureError
for dci := range decoded {
dc := dci.(types.DecodedChunk)
if dc.Chunk != nil && dc.Value != nil {
if bpe == nil {
bpe = vbs.Enqueue(*dc.Chunk, *dc.Value)
count++
if count%100 == 0 {
verbose.Log("Enqueued %d chunks", count)
}
} else {
bpe = append(bpe, dc.Chunk.Hash())
}
@@ -118,11 +168,17 @@ func handleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
// TODO: what about having DeserializeToChan take a 'done' channel to stop it?
}
}
// If there was an error during chunk deserialization, log and respond
if err := <-errChan; err != nil {
panic(err)
}
if bpe == nil {
bpe = vbs.Flush()
}
if bpe != nil {
w.WriteHeader(httpStatusTooManyRequests)
w.WriteHeader(http.StatusTooManyRequests)
w.Header().Add("Content-Type", "application/octet-stream")
writer := respWriter(req, w)
defer writer.Close()
@@ -282,7 +338,7 @@ func handleRootPost(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
}
m := v.(types.Map)
if !m.Empty() && !isMapOfStringToRefOfCommit(m) {
panic(d.Wrap(fmt.Errorf("Root of a Database must be a Map<String, Ref<Commit>>, not %s", m.Type().Describe())))
d.Panic("Root of a Database must be a Map<String, Ref<Commit>>, not %s", m.Type().Describe())
}
if !cs.UpdateRoot(current, last) {

View File

@@ -57,6 +57,20 @@ func TestHandleWriteValue(t *testing.T) {
}
}
func TestHandleWriteValuePanic(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
body := &bytes.Buffer{}
serializeHints(body, types.Hints{})
body.WriteString("Bogus")
w := httptest.NewRecorder()
HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)
assert.Equal(http.StatusBadRequest, w.Code, "Handler error:\n%s", string(w.Body.Bytes()))
}
func TestHandleWriteValueDupChunks(t *testing.T) {
assert := assert.New(t)
cs := chunks.NewTestStore()
@@ -108,7 +122,7 @@ func TestHandleWriteValueBackpressure(t *testing.T) {
w := httptest.NewRecorder()
HandleWriteValue(w, newRequest("POST", "", "", body, nil), params{}, cs)
if assert.Equal(httpStatusTooManyRequests, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
if assert.Equal(http.StatusTooManyRequests, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
hashes := deserializeHashes(w.Body)
assert.Len(hashes, 1)
assert.Equal(l2.Hash(), hashes[0])
@@ -144,7 +158,7 @@ func TestBuildWriteValueRequest(t *testing.T) {
assert.Equal(len(hints), count)
outChunkChan := make(chan interface{}, 16)
go chunks.DeserializeToChan(gr, outChunkChan)
go deserializeToChan(gr, outChunkChan)
for c := range outChunkChan {
assert.Equal(chnx[0].Hash(), c.(*chunks.Chunk).Hash())
chnx = chnx[1:]
@@ -152,6 +166,11 @@ func TestBuildWriteValueRequest(t *testing.T) {
assert.Empty(chnx)
}
func deserializeToChan(reader io.Reader, chunkChan chan<- interface{}) {
chunks.DeserializeToChan(reader, chunkChan)
close(chunkChan)
}
func serializeChunks(chnx []chunks.Chunk, assert *assert.Assertions) io.Reader {
body := &bytes.Buffer{}
sw := snappy.NewBufferedWriter(body)
@@ -209,7 +228,7 @@ func TestHandleGetRefs(t *testing.T) {
if assert.Equal(http.StatusOK, w.Code, "Handler error:\n%s", string(w.Body.Bytes())) {
chunkChan := make(chan interface{})
go chunks.DeserializeToChan(w.Body, chunkChan)
go deserializeToChan(w.Body, chunkChan)
for c := range chunkChan {
assert.Equal(chnx[0].Hash(), c.(*chunks.Chunk).Hash())
chnx = chnx[1:]

View File

@@ -5,6 +5,8 @@
package verbose
import (
"fmt"
flag "github.com/juju/gnuflag"
)
@@ -30,3 +32,10 @@ func Verbose() bool {
func Quiet() bool {
return quiet
}
// Log calls Printf(format, args...) iff Verbose() returns true.
func Log(format string, args ...interface{}) {
if Verbose() {
fmt.Printf(format+"\n", args...)
}
}