diff --git a/chunks/chunk_store.go b/chunks/chunk_store.go index 6b96e617a4..458cb576d2 100644 --- a/chunks/chunk_store.go +++ b/chunks/chunk_store.go @@ -1,8 +1,11 @@ package chunks import ( + "bytes" + "encoding/binary" "io" + "github.com/attic-labs/noms/d" "github.com/attic-labs/noms/ref" ) @@ -24,14 +27,66 @@ type RootTracker interface { type ChunkSource interface { // Get gets a reader for the value of the Ref in the store. If the ref is absent from the store nil is returned. Get(ref ref.Ref) io.ReadCloser + + // Returns true iff the value at the address |ref| is contained in the source + Has(ref ref.Ref) bool } // ChunkSink is a place to put chunks. type ChunkSink interface { Put() ChunkWriter +} - // Returns true iff the value at the address |ref| is contained in the source - Has(ref ref.Ref) bool +/* + Chunk Serialization: + Chunk 0 + Chunk 1 + .. + Chunk N + + Chunk: + Len // 4-byte int + Data // len(Data) == Len +*/ + +func Serialize(w io.Writer, refs map[ref.Ref]bool, cs ChunkSource) { + // TODO: If ChunkSource could provide the length of a chunk without having to buffer it, this could be completely streaming. 
+	chunk := &bytes.Buffer{}
+	for r, _ := range refs {
+		chunk.Reset()
+		r := cs.Get(r)
+		if r == nil {
+			continue
+		}
+
+		_, err := io.Copy(chunk, r)
+		d.Chk.NoError(err)
+
+		// Because of chunking at higher levels, no chunk should ever be more than 4GB
+		chunkSize := uint32(len(chunk.Bytes()))
+		err = binary.Write(w, binary.LittleEndian, chunkSize)
+		d.Chk.NoError(err)
+
+		n, err := io.Copy(w, chunk)
+		d.Chk.NoError(err)
+		d.Chk.Equal(uint32(n), chunkSize)
+	}
+}
+
+func Deserialize(r io.Reader, cs ChunkSink) {
+	for {
+		chunkSize := uint32(0)
+		err := binary.Read(r, binary.LittleEndian, &chunkSize)
+		if err == io.EOF {
+			break
+		}
+		d.Chk.NoError(err)
+
+		w := cs.Put()
+		_, err = io.CopyN(w, r, int64(chunkSize))
+		d.Chk.NoError(err)
+		w.Close()
+	}
 }
 
 // NewFlags creates a new instance of Flags, which declares a number of ChunkStore-related command-line flags using the golang flag package. Call this before flag.Parse().
diff --git a/chunks/chunk_writer.go b/chunks/chunk_writer.go
index d7e86ae614..4009965ace 100644
--- a/chunks/chunk_writer.go
+++ b/chunks/chunk_writer.go
@@ -18,7 +18,7 @@ type ChunkWriter interface {
 }
 
 // ChunkWriter wraps an io.WriteCloser, additionally providing the ability to grab a Ref for all data written through the interface. Calling Ref() or Close() on an instance disallows further writing.
-type writeFn func(ref ref.Ref, buff *bytes.Buffer) +type writeFn func(ref ref.Ref, data []byte) type chunkWriter struct { write writeFn @@ -57,7 +57,7 @@ func (w *chunkWriter) Close() error { } w.ref = ref.FromHash(w.hash) - w.write(w.ref, w.buffer) + w.write(w.ref, w.buffer.Bytes()) w.buffer = nil return nil } diff --git a/chunks/http_store.go b/chunks/http_store.go index d6310c4363..b64c6d4dd6 100644 --- a/chunks/http_store.go +++ b/chunks/http_store.go @@ -3,10 +3,8 @@ package chunks import ( "bytes" "compress/gzip" - "encoding/binary" "flag" "fmt" - "hash/crc32" "io" "io/ioutil" "net" @@ -24,8 +22,10 @@ const ( rootPath = "/root/" refPath = "/ref/" getRefsPath = "/getRefs/" + postRefsPath = "/postRefs/" targetBufferSize = 1 << 16 // 64k (compressed) readBufferSize = 1 << 12 // 4k + writeBufferSize = 1 << 12 // 4k readLimit = 6 // Experimentally, 5 was dimishing returns, 1 for good measure writeLimit = 6 ) @@ -35,12 +35,41 @@ type readRequest struct { ch chan io.ReadCloser } +// readBatch represents a set of queued read requests, each of which are blocking on a receive channel for a response. It implements ChunkSink so that the responses can be directly deserialized and streamed back to callers. +type readBatch map[ref.Ref][]chan io.ReadCloser + +func (rrg *readBatch) Put() ChunkWriter { + return newChunkWriter(rrg.write) +} + +func (rrg *readBatch) write(r ref.Ref, data []byte) { + for _, ch := range (*rrg)[r] { + ch <- ioutil.NopCloser(bytes.NewReader(data)) + } + + delete(*rrg, r) +} + +// Callers to Get() must receive nil if the corresponding chunk wasn't in the response from the server (i.e. it wasn't found). 
+func (rrq *readBatch) respondToFailedReads() { + for _, chs := range *rrq { + for _, ch := range chs { + ch <- nil + } + } +} + +type writeRequest struct { + r ref.Ref + data []byte +} + type httpStoreClient struct { host *url.URL readQueue chan readRequest - cb *chunkBuffer + writeQueue chan writeRequest wg *sync.WaitGroup - writeLimit chan int + written map[ref.Ref]bool wmu *sync.Mutex } @@ -59,9 +88,9 @@ func NewHttpStoreClient(host string) *httpStoreClient { client := &httpStoreClient{ u, make(chan readRequest, readBufferSize), - newChunkBuffer(), + make(chan writeRequest, writeBufferSize), &sync.WaitGroup{}, - make(chan int, writeLimit), + map[ref.Ref]bool{}, &sync.Mutex{}, } @@ -69,6 +98,10 @@ func NewHttpStoreClient(host string) *httpStoreClient { go client.sendReadRequests() } + for i := 0; i < writeLimit; i++ { + go client.sendWriteRequests() + } + return client } @@ -86,29 +119,27 @@ func (c *httpStoreClient) Get(r ref.Ref) io.ReadCloser { func (c *httpStoreClient) sendReadRequests() { for req := range c.readQueue { - reqs := []readRequest{req} + reqs := readBatch{} + refs := map[ref.Ref]bool{} + + addReq := func(req readRequest) { + reqs[req.r] = append(reqs[req.r], req.ch) + refs[req.r] = true + } + addReq(req) loop: for { select { case req := <-c.readQueue: - reqs = append(reqs, req) + addReq(req) default: break loop } } - refs := make(map[ref.Ref]bool) - for _, req := range reqs { - refs[req.r] = true - } - - cs := &MemoryStore{} - c.getRefs(refs, cs) - - for _, req := range reqs { - req.ch <- cs.Get(req.r) - } + c.getRefs(refs, &reqs) + reqs.respondToFailedReads() } } @@ -126,35 +157,66 @@ func (c *httpStoreClient) Put() ChunkWriter { return newChunkWriter(c.write) } -func (c *httpStoreClient) write(r ref.Ref, buff *bytes.Buffer) { +func (c *httpStoreClient) write(r ref.Ref, data []byte) { c.wmu.Lock() defer c.wmu.Unlock() + if c.written[r] { + return + } + c.written[r] = true - c.cb.appendChunk(buff) - if c.cb.isFull() { - c.flushBuffered() + 
c.wg.Add(1) + c.writeQueue <- writeRequest{r, data} +} + +func (c *httpStoreClient) sendWriteRequests() { + for req := range c.writeQueue { + ms := &MemoryStore{} + refs := map[ref.Ref]bool{} + + addReq := func(req writeRequest) { + ms.write(req.r, req.data) + refs[req.r] = true + } + addReq(req) + + loop: + for { + select { + case req := <-c.writeQueue: + addReq(req) + default: + break loop + } + } + + c.postRefs(refs, ms) + + for _, _ = range refs { + c.wg.Done() + } } } -func (c *httpStoreClient) flushBuffered() { - if c.cb.count == 0 { - return - } +func (c *httpStoreClient) postRefs(refs map[ref.Ref]bool, cs ChunkSource) { + body := &bytes.Buffer{} + gw := gzip.NewWriter(body) + Serialize(gw, refs, cs) + gw.Close() - c.cb.finish() + url := *c.host + url.Path = postRefsPath + req, err := http.NewRequest("POST", url.String(), body) + d.Chk.NoError(err) - c.wg.Add(1) - c.writeLimit <- 1 // TODO: This may block writes, fix so that when the upload limit is hit, incoming writes simply buffer but return immediately - go func(body *bytes.Buffer) { - res := c.requestRef(ref.Ref{}, "POST", body) - defer closeResponse(res) + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("Content-Type", "application/octet-stream") - d.Chk.Equal(res.StatusCode, http.StatusCreated, "Unexpected response: %s", http.StatusText(res.StatusCode)) + res, err := http.DefaultClient.Do(req) + d.Chk.NoError(err) - <-c.writeLimit - c.wg.Done() - }(c.cb.buff) - c.cb = newChunkBuffer() + d.Chk.Equal(res.StatusCode, http.StatusCreated, "Unexpected response: %s", http.StatusText(res.StatusCode)) + closeResponse(res) } func (c *httpStoreClient) requestRef(r ref.Ref, method string, body io.Reader) *http.Response { @@ -176,8 +238,8 @@ func (c *httpStoreClient) requestRef(r ref.Ref, method string, body io.Reader) * return res } -func (c *httpStoreClient) getRefs(refs map[ref.Ref]bool, cs ChunkStore) { - // POST http:///getRefs/. 
Post query: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent. +func (c *httpStoreClient) getRefs(refs map[ref.Ref]bool, cs ChunkSink) { + // POST http:///getRefs/. Post body: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent. u := *c.host u.Path = getRefsPath values := &url.Values{} @@ -186,15 +248,24 @@ func (c *httpStoreClient) getRefs(refs map[ref.Ref]bool, cs ChunkStore) { } req, err := http.NewRequest("POST", u.String(), strings.NewReader(values.Encode())) + req.Header.Add("Accept-Encoding", "gzip") req.Header.Add("Content-Type", "application/x-www-form-urlencoded") d.Chk.NoError(err) res, err := http.DefaultClient.Do(req) d.Chk.NoError(err) + defer closeResponse(res) d.Chk.Equal(http.StatusOK, res.StatusCode, "Unexpected response: %s", http.StatusText(res.StatusCode)) - deserializeToChunkStore(res.Body, cs) - closeResponse(res) + reader := res.Body + if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") { + gr, err := gzip.NewReader(reader) + d.Chk.NoError(err) + defer gr.Close() + reader = gr + } + + Deserialize(reader, cs) } func (c *httpStoreClient) Root() ref.Ref { @@ -210,8 +281,12 @@ func (c *httpStoreClient) Root() ref.Ref { func (c *httpStoreClient) UpdateRoot(current, last ref.Ref) bool { // POST http://root?current=&last=. Response will be 200 on success, 409 if current is outdated. 
- c.flushBuffered() c.wg.Wait() + + c.wmu.Lock() + c.written = map[ref.Ref]bool{} + c.wmu.Unlock() + res := c.requestRoot("POST", current, last) defer closeResponse(res) @@ -242,17 +317,12 @@ func (c *httpStoreClient) requestRoot(method string, current, last ref.Ref) *htt func (c *httpStoreClient) Close() error { c.wg.Wait() close(c.readQueue) + close(c.writeQueue) return nil } func (s *httpStoreServer) handleRef(w http.ResponseWriter, req *http.Request) { err := d.Try(func() { - if req.Method == "POST" { - deserializeToChunkStore(req.Body, s.cs) - w.WriteHeader(http.StatusCreated) - return - } - refStr := "" pathParts := strings.Split(req.URL.Path[1:], "/") if len(pathParts) > 1 { @@ -279,6 +349,8 @@ func (s *httpStoreServer) handleRef(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusNotFound) return } + default: + d.Exp.Fail("Unexpected method: ", req.Method) } }) @@ -288,32 +360,53 @@ func (s *httpStoreServer) handleRef(w http.ResponseWriter, req *http.Request) { } } +func (s *httpStoreServer) handlePostRefs(w http.ResponseWriter, req *http.Request) { + err := d.Try(func() { + d.Exp.Equal("POST", req.Method) + + var reader io.Reader = req.Body + if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") { + gr, err := gzip.NewReader(reader) + d.Exp.NoError(err) + defer gr.Close() + reader = gr + } + + Deserialize(reader, s.cs) + w.WriteHeader(http.StatusCreated) + }) + + if err != nil { + http.Error(w, fmt.Sprintf("Error: %v", err), http.StatusBadRequest) + return + } +} + func (s *httpStoreServer) handleGetRefs(w http.ResponseWriter, req *http.Request) { err := d.Try(func() { d.Exp.Equal("POST", req.Method) req.ParseForm() - refs := req.Form["ref"] - d.Exp.True(len(refs) > 0) + refStrs := req.PostForm["ref"] + d.Exp.True(len(refStrs) > 0) - cb := newChunkBuffer() - for _, refStr := range refs { + refs := map[ref.Ref]bool{} + for _, refStr := range refStrs { r := ref.Parse(refStr) - reader := s.cs.Get(r) - if reader != nil { - buff := 
&bytes.Buffer{} - _, err := io.Copy(buff, reader) - d.Chk.NoError(err) - reader.Close() - cb.appendChunk(buff) - } + refs[r] = true } - cb.finish() - - _, err := io.Copy(w, cb.buff) - d.Chk.NoError(err) w.Header().Add("Content-Type", "application/octet-stream") + + writer := w.(io.Writer) + if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Add("Content-Encoding", "gzip") + gw := gzip.NewWriter(w) + defer gw.Close() + writer = gw + } + + Serialize(writer, refs, s.cs) }) if err != nil { @@ -379,6 +472,7 @@ func (s *httpStoreServer) Run() { mux.HandleFunc(refPath, http.HandlerFunc(s.handleRef)) mux.HandleFunc(getRefsPath, http.HandlerFunc(s.handleGetRefs)) + mux.HandleFunc(postRefsPath, http.HandlerFunc(s.handlePostRefs)) mux.HandleFunc(rootPath, http.HandlerFunc(s.handleRoot)) srv := &http.Server{ @@ -399,76 +493,6 @@ func (s *httpStoreServer) Stop() { } } -/* - ChunkBuffer: - Chunk 0 - Chunk 1 - .. - Chunk N - Footer - - Chunk: - Len // 4-byte int - Data // len(Data) == Len - - The entire ChunkBuffer is gzip'd when serialized and un-gzip'd on deserializeToChunkStore -*/ - -var crcTable = crc32.MakeTable(crc32.Castagnoli) - -type chunkBuffer struct { - buff *bytes.Buffer - w io.WriteCloser - count uint32 -} - -func newChunkBuffer() *chunkBuffer { - buff := &bytes.Buffer{} - return &chunkBuffer{buff, gzip.NewWriter(buff), 0} -} - -func (cb *chunkBuffer) appendChunk(chunk *bytes.Buffer) { - d.Chk.True(len(chunk.Bytes()) < 1<<32) // Because of chunking at higher levels, no chunk should never be more than 4GB - cb.count++ - - chunkSize := uint32(chunk.Len()) - err := binary.Write(cb.w, binary.LittleEndian, chunkSize) - d.Chk.NoError(err) - - n, err := io.Copy(cb.w, chunk) - d.Chk.NoError(err) - d.Chk.Equal(uint32(n), chunkSize) -} - -func (cb *chunkBuffer) isFull() bool { - return cb.buff.Len() >= targetBufferSize -} - -func (cb *chunkBuffer) finish() { - cb.w.Close() - cb.w = nil -} - -func deserializeToChunkStore(body io.Reader, cs 
ChunkStore) { - r, err := gzip.NewReader(body) - d.Chk.NoError(err) - - for true { - chunkSize := uint32(0) - err = binary.Read(r, binary.LittleEndian, &chunkSize) - if err == io.EOF { - break - } - d.Chk.NoError(err) - - // BUG 206 - Validate the resulting refs match the client's expectation. - w := cs.Put() - _, err := io.CopyN(w, r, int64(chunkSize)) - d.Chk.NoError(err) - w.Close() - } -} - type httpStoreFlags struct { host *string } diff --git a/chunks/leveldb_store.go b/chunks/leveldb_store.go index 515d11ce96..79160705fe 100644 --- a/chunks/leveldb_store.go +++ b/chunks/leveldb_store.go @@ -88,13 +88,13 @@ func (l *LevelDBStore) Put() ChunkWriter { return newChunkWriter(l.write) } -func (l *LevelDBStore) write(ref ref.Ref, buff *bytes.Buffer) { +func (l *LevelDBStore) write(ref ref.Ref, data []byte) { if l.Has(ref) { return } key := toChunkKey(ref) - err := l.db.Put(key, buff.Bytes(), nil) + err := l.db.Put(key, data, nil) d.Chk.NoError(err) l.putCount += 1 } diff --git a/chunks/memory_store.go b/chunks/memory_store.go index 200387dc61..7c85d1595d 100644 --- a/chunks/memory_store.go +++ b/chunks/memory_store.go @@ -34,7 +34,7 @@ func (ms *MemoryStore) Put() ChunkWriter { return newChunkWriter(ms.write) } -func (ms *MemoryStore) write(r ref.Ref, buff *bytes.Buffer) { +func (ms *MemoryStore) write(r ref.Ref, data []byte) { if ms.Has(r) { return } @@ -42,7 +42,7 @@ func (ms *MemoryStore) write(r ref.Ref, buff *bytes.Buffer) { if ms.data == nil { ms.data = map[ref.Ref][]byte{} } - ms.data[r] = buff.Bytes() + ms.data[r] = data } func (ms *MemoryStore) Len() int { diff --git a/chunks/memory_store_test.go b/chunks/memory_store_test.go index 67e80d255c..0849d7dc5a 100644 --- a/chunks/memory_store_test.go +++ b/chunks/memory_store_test.go @@ -1,6 +1,7 @@ package chunks import ( + "bytes" "testing" "github.com/attic-labs/noms/Godeps/_workspace/src/github.com/stretchr/testify/suite" @@ -21,3 +22,11 @@ func (suite *MemoryStoreTestSuite) SetupTest() { func (suite 
*MemoryStoreTestSuite) TearDownTest() { suite.store.Close() } + +func (suite *MemoryStoreTestSuite) TestBadSerialization() { + bad := []byte{0, 1} // Not enough bytes to read first length + ms := &MemoryStore{} + suite.Panics(func() { + Deserialize(bytes.NewReader(bad), ms) + }) +} diff --git a/chunks/nop_store.go b/chunks/nop_store.go index 09cf579825..d31ec8edab 100644 --- a/chunks/nop_store.go +++ b/chunks/nop_store.go @@ -1,7 +1,6 @@ package chunks import ( - "bytes" "flag" "io" @@ -29,7 +28,7 @@ func (ms *NopStore) Put() ChunkWriter { return newChunkWriter(ms.write) } -func (ms *NopStore) write(ref ref.Ref, buff *bytes.Buffer) {} +func (ms *NopStore) write(ref ref.Ref, data []byte) {} func (ms *NopStore) Close() error { return nil