use MemoryStore rather than ChunkBuffer

Rafael Weinstein
2015-09-10 12:48:27 -07:00
parent 5eac97fbf9
commit a877c6fac9
7 changed files with 232 additions and 145 deletions

View File

@@ -1,8 +1,11 @@
package chunks
import (
"bytes"
"encoding/binary"
"io"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
)
@@ -24,14 +27,66 @@ type RootTracker interface {
type ChunkSource interface {
// Get gets a reader for the value of the Ref in the store. If the ref is absent from the store, nil is returned.
Get(ref ref.Ref) io.ReadCloser
// Returns true iff the value at the address |ref| is contained in the source
Has(ref ref.Ref) bool
}
// ChunkSink is a place to put chunks.
type ChunkSink interface {
Put() ChunkWriter
}
// Returns true iff the value at the address |ref| is contained in the source
Has(ref ref.Ref) bool
/*
Chunk Serialization:
Chunk 0
Chunk 1
..
Chunk N
Chunk:
Len // 4-byte int
Data // len(Data) == Len
*/
func Serialize(w io.Writer, refs map[ref.Ref]bool, cs ChunkSource) {
// TODO: If ChunkSource could provide the length of a chunk without having to buffer it, this could be completely streaming.
chunk := &bytes.Buffer{}
for r := range refs {
chunk.Reset()
reader := cs.Get(r)
if reader == nil {
continue
}
_, err := io.Copy(chunk, reader)
reader.Close()
d.Chk.NoError(err)
// Because of chunking at higher levels, no chunk should ever be more than 4GB
chunkSize := uint32(chunk.Len())
err = binary.Write(w, binary.LittleEndian, chunkSize)
d.Chk.NoError(err)
n, err := io.Copy(w, chunk)
d.Chk.NoError(err)
d.Chk.Equal(uint32(n), chunkSize)
}
}
func Deserialize(r io.Reader, cs ChunkSink) {
for {
chunkSize := uint32(0)
err := binary.Read(r, binary.LittleEndian, &chunkSize)
if err == io.EOF {
break
}
d.Chk.NoError(err)
w := cs.Put()
_, err = io.CopyN(w, r, int64(chunkSize))
d.Chk.NoError(err)
w.Close()
}
}
// NewFlags creates a new instance of Flags, which declares a number of ChunkStore-related command-line flags using the golang flag package. Call this before flag.Parse().
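
Serialize and Deserialize above are symmetric over the length-prefixed format described in the doc comment. A minimal round-trip sketch, not part of the diff, assuming MemoryStore satisfies both ChunkSource and ChunkSink and that a ChunkWriter's Ref() remains readable after Close():

func ExampleRoundTrip() {
	src := &MemoryStore{}
	w := src.Put()
	w.Write([]byte("hello"))
	w.Close() // hashes the bytes and hands them to src via its writeFn

	buf := &bytes.Buffer{}
	Serialize(buf, map[ref.Ref]bool{w.Ref(): true}, src)

	dst := &MemoryStore{}
	Deserialize(buf, dst)
	d.Chk.True(dst.Has(w.Ref())) // the chunk round-tripped intact
}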

View File

@@ -18,7 +18,7 @@ type ChunkWriter interface {
}
// ChunkWriter wraps an io.WriteCloser, additionally providing the ability to grab a Ref for all data written through the interface. Calling Ref() or Close() on an instance disallows further writing.
type writeFn func(ref ref.Ref, buff *bytes.Buffer)
type writeFn func(ref ref.Ref, data []byte)
type chunkWriter struct {
write writeFn
@@ -57,7 +57,7 @@ func (w *chunkWriter) Close() error {
}
w.ref = ref.FromHash(w.hash)
w.write(w.ref, w.buffer)
w.write(w.ref, w.buffer.Bytes())
w.buffer = nil
return nil
}
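
writeFn is a named function type, so any matching func literal can act as a sink. A hypothetical example, not in the diff, showing the new []byte contract:

func ExampleWriteFn() {
	chunks := map[ref.Ref][]byte{}
	cw := newChunkWriter(func(r ref.Ref, data []byte) {
		// chunkWriter drops its buffer after Close, so the slice can
		// be retained without copying.
		chunks[r] = data
	})
	cw.Write([]byte("payload"))
	cw.Close() // computes the ref, then calls the writeFn exactly once
}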

View File

@@ -3,10 +3,8 @@ package chunks
import (
"bytes"
"compress/gzip"
"encoding/binary"
"flag"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"net"
@@ -24,8 +22,10 @@ const (
rootPath = "/root/"
refPath = "/ref/"
getRefsPath = "/getRefs/"
postRefsPath = "/postRefs/"
targetBufferSize = 1 << 16 // 64k (compressed)
readBufferSize = 1 << 12 // 4k
writeBufferSize = 1 << 12 // 4k
readLimit = 6 // Experimentally, 5 hit diminishing returns; 1 more for good measure
writeLimit = 6
)
@@ -35,12 +35,41 @@ type readRequest struct {
ch chan io.ReadCloser
}
// readBatch represents a set of queued read requests, each of which blocks on a receive channel for a response. It implements ChunkSink so that the responses can be directly deserialized and streamed back to callers.
type readBatch map[ref.Ref][]chan io.ReadCloser
func (rrg *readBatch) Put() ChunkWriter {
return newChunkWriter(rrg.write)
}
func (rrg *readBatch) write(r ref.Ref, data []byte) {
for _, ch := range (*rrg)[r] {
ch <- ioutil.NopCloser(bytes.NewReader(data))
}
delete(*rrg, r)
}
// Callers to Get() must receive nil if the corresponding chunk wasn't in the response from the server (i.e. it wasn't found).
func (rrq *readBatch) respondToFailedReads() {
for _, chs := range *rrq {
for _, ch := range chs {
ch <- nil
}
}
}
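
To make the fan-out concrete, here is a hedged sketch of one read being satisfied; dataRef and chunkData are illustrative names, with dataRef assumed to be the ref of chunkData:

batch := readBatch{}
ch := make(chan io.ReadCloser, 1)
batch[dataRef] = append(batch[dataRef], ch)

w := batch.Put()             // readBatch is the ChunkSink for the response stream
w.Write(chunkData)
w.Close()                    // write() fans the data out to every waiter on dataRef
batch.respondToFailedReads() // refs still queued receive an explicit nil
rc := <-ch                   // the blocked caller unblocks here; rc is non-nil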
type writeRequest struct {
r ref.Ref
data []byte
}
type httpStoreClient struct {
host *url.URL
readQueue chan readRequest
cb *chunkBuffer
writeQueue chan writeRequest
wg *sync.WaitGroup
writeLimit chan int
written map[ref.Ref]bool
wmu *sync.Mutex
}
@@ -59,9 +88,9 @@ func NewHttpStoreClient(host string) *httpStoreClient {
client := &httpStoreClient{
u,
make(chan readRequest, readBufferSize),
newChunkBuffer(),
make(chan writeRequest, writeBufferSize),
&sync.WaitGroup{},
make(chan int, writeLimit),
map[ref.Ref]bool{},
&sync.Mutex{},
}
@@ -69,6 +98,10 @@ func NewHttpStoreClient(host string) *httpStoreClient {
go client.sendReadRequests()
}
for i := 0; i < writeLimit; i++ {
go client.sendWriteRequests()
}
return client
}
@@ -86,29 +119,27 @@ func (c *httpStoreClient) Get(r ref.Ref) io.ReadCloser {
func (c *httpStoreClient) sendReadRequests() {
for req := range c.readQueue {
reqs := []readRequest{req}
reqs := readBatch{}
refs := map[ref.Ref]bool{}
addReq := func(req readRequest) {
reqs[req.r] = append(reqs[req.r], req.ch)
refs[req.r] = true
}
addReq(req)
loop:
for {
select {
case req := <-c.readQueue:
reqs = append(reqs, req)
addReq(req)
default:
break loop
}
}
refs := make(map[ref.Ref]bool)
for _, req := range reqs {
refs[req.r] = true
}
cs := &MemoryStore{}
c.getRefs(refs, cs)
for _, req := range reqs {
req.ch <- cs.Get(req.r)
}
c.getRefs(refs, &reqs)
reqs.respondToFailedReads()
}
}
@@ -126,35 +157,66 @@ func (c *httpStoreClient) Put() ChunkWriter {
return newChunkWriter(c.write)
}
func (c *httpStoreClient) write(r ref.Ref, buff *bytes.Buffer) {
func (c *httpStoreClient) write(r ref.Ref, data []byte) {
c.wmu.Lock()
defer c.wmu.Unlock()
if c.written[r] {
return
}
c.written[r] = true
c.cb.appendChunk(buff)
if c.cb.isFull() {
c.flushBuffered()
c.wg.Add(1)
c.writeQueue <- writeRequest{r, data}
}
func (c *httpStoreClient) sendWriteRequests() {
for req := range c.writeQueue {
ms := &MemoryStore{}
refs := map[ref.Ref]bool{}
addReq := func(req writeRequest) {
ms.write(req.r, req.data)
refs[req.r] = true
}
addReq(req)
loop:
for {
select {
case req := <-c.writeQueue:
addReq(req)
default:
break loop
}
}
c.postRefs(refs, ms)
for range refs {
c.wg.Done()
}
}
}
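
The select-with-default loop in sendWriteRequests (and its twin in sendReadRequests) is a standard non-blocking drain; isolated into a hypothetical helper for clarity:

func drain(queue chan writeRequest, first writeRequest) []writeRequest {
	batch := []writeRequest{first}
	for {
		select {
		case req := <-queue:
			batch = append(batch, req)
		default:
			return batch // nothing else queued right now; ship the batch
		}
	}
}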
func (c *httpStoreClient) flushBuffered() {
if c.cb.count == 0 {
return
}
func (c *httpStoreClient) postRefs(refs map[ref.Ref]bool, cs ChunkSource) {
body := &bytes.Buffer{}
gw := gzip.NewWriter(body)
Serialize(gw, refs, cs)
gw.Close()
c.cb.finish()
url := *c.host
url.Path = postRefsPath
req, err := http.NewRequest("POST", url.String(), body)
d.Chk.NoError(err)
c.wg.Add(1)
c.writeLimit <- 1 // TODO: This may block writes. Fix so that when the upload limit is hit, incoming writes simply buffer but return immediately.
go func(body *bytes.Buffer) {
res := c.requestRef(ref.Ref{}, "POST", body)
defer closeResponse(res)
req.Header.Set("Content-Encoding", "gzip")
req.Header.Set("Content-Type", "application/octet-stream")
d.Chk.Equal(res.StatusCode, http.StatusCreated, "Unexpected response: %s", http.StatusText(res.StatusCode))
res, err := http.DefaultClient.Do(req)
d.Chk.NoError(err)
<-c.writeLimit
c.wg.Done()
}(c.cb.buff)
c.cb = newChunkBuffer()
d.Chk.Equal(res.StatusCode, http.StatusCreated, "Unexpected response: %s", http.StatusText(res.StatusCode))
closeResponse(res)
}
func (c *httpStoreClient) requestRef(r ref.Ref, method string, body io.Reader) *http.Response {
@@ -176,8 +238,8 @@ func (c *httpStoreClient) requestRef(r ref.Ref, method string, body io.Reader) *
return res
}
func (c *httpStoreClient) getRefs(refs map[ref.Ref]bool, cs ChunkStore) {
// POST http://<host>/getRefs/. Post query: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent.
func (c *httpStoreClient) getRefs(refs map[ref.Ref]bool, cs ChunkSink) {
// POST http://<host>/getRefs/. Post body: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent.
u := *c.host
u.Path = getRefsPath
values := &url.Values{}
@@ -186,15 +248,24 @@ func (c *httpStoreClient) getRefs(refs map[ref.Ref]bool, cs ChunkStore) {
}
req, err := http.NewRequest("POST", u.String(), strings.NewReader(values.Encode()))
req.Header.Add("Accept-Encoding", "gzip")
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
d.Chk.NoError(err)
res, err := http.DefaultClient.Do(req)
d.Chk.NoError(err)
defer closeResponse(res)
d.Chk.Equal(http.StatusOK, res.StatusCode, "Unexpected response: %s", http.StatusText(res.StatusCode))
deserializeToChunkStore(res.Body, cs)
closeResponse(res)
reader := res.Body
if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") {
gr, err := gzip.NewReader(reader)
d.Chk.NoError(err)
defer gr.Close()
reader = gr
}
Deserialize(reader, cs)
}
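
The elided loop above builds an ordinary URL-encoded form body. A hedged sketch, assuming ref.Ref has a String() method that yields the sha1 form named in the comment:

values := &url.Values{}
for r := range refs {
	values.Add("ref", r.String())
}
body := strings.NewReader(values.Encode()) // e.g. "ref=sha1-xxxx&ref=sha1-yyyy"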
func (c *httpStoreClient) Root() ref.Ref {
@@ -210,8 +281,12 @@ func (c *httpStoreClient) Root() ref.Ref {
func (c *httpStoreClient) UpdateRoot(current, last ref.Ref) bool {
// POST http://<host>root?current=<ref>&last=<ref>. Response will be 200 on success, 409 if current is outdated.
c.flushBuffered()
c.wg.Wait()
c.wmu.Lock()
c.written = map[ref.Ref]bool{}
c.wmu.Unlock()
res := c.requestRoot("POST", current, last)
defer closeResponse(res)
@@ -242,17 +317,12 @@ func (c *httpStoreClient) requestRoot(method string, current, last ref.Ref) *htt
func (c *httpStoreClient) Close() error {
c.wg.Wait()
close(c.readQueue)
close(c.writeQueue)
return nil
}
func (s *httpStoreServer) handleRef(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
if req.Method == "POST" {
deserializeToChunkStore(req.Body, s.cs)
w.WriteHeader(http.StatusCreated)
return
}
refStr := ""
pathParts := strings.Split(req.URL.Path[1:], "/")
if len(pathParts) > 1 {
@@ -279,6 +349,8 @@ func (s *httpStoreServer) handleRef(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
return
}
default:
d.Exp.Fail("Unexpected method: ", req.Method)
}
})
@@ -288,32 +360,53 @@ func (s *httpStoreServer) handleRef(w http.ResponseWriter, req *http.Request) {
}
}
func (s *httpStoreServer) handlePostRefs(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
d.Exp.Equal("POST", req.Method)
var reader io.Reader = req.Body
if strings.Contains(req.Header.Get("Content-Encoding"), "gzip") {
gr, err := gzip.NewReader(reader)
d.Exp.NoError(err)
defer gr.Close()
reader = gr
}
Deserialize(reader, s.cs)
w.WriteHeader(http.StatusCreated)
})
if err != nil {
http.Error(w, fmt.Sprintf("Error: %v", err), http.StatusBadRequest)
return
}
}
func (s *httpStoreServer) handleGetRefs(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
d.Exp.Equal("POST", req.Method)
d.Exp.NoError(req.ParseForm())
refs := req.Form["ref"]
d.Exp.True(len(refs) > 0)
refStrs := req.PostForm["ref"]
d.Exp.True(len(refStrs) > 0)
cb := newChunkBuffer()
for _, refStr := range refs {
refs := map[ref.Ref]bool{}
for _, refStr := range refStrs {
r := ref.Parse(refStr)
reader := s.cs.Get(r)
if reader != nil {
buff := &bytes.Buffer{}
_, err := io.Copy(buff, reader)
d.Chk.NoError(err)
reader.Close()
cb.appendChunk(buff)
}
refs[r] = true
}
cb.finish()
_, err := io.Copy(w, cb.buff)
d.Chk.NoError(err)
w.Header().Add("Content-Type", "application/octet-stream")
var writer io.Writer = w
if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Add("Content-Encoding", "gzip")
gw := gzip.NewWriter(w)
defer gw.Close()
writer = gw
}
Serialize(writer, refs, s.cs)
})
if err != nil {
@@ -379,6 +472,7 @@ func (s *httpStoreServer) Run() {
mux.HandleFunc(refPath, http.HandlerFunc(s.handleRef))
mux.HandleFunc(getRefsPath, http.HandlerFunc(s.handleGetRefs))
mux.HandleFunc(postRefsPath, http.HandlerFunc(s.handlePostRefs))
mux.HandleFunc(rootPath, http.HandlerFunc(s.handleRoot))
srv := &http.Server{
@@ -399,76 +493,6 @@ func (s *httpStoreServer) Stop() {
}
}
/*
ChunkBuffer:
Chunk 0
Chunk 1
..
Chunk N
Footer
Chunk:
Len // 4-byte int
Data // len(Data) == Len
The entire ChunkBuffer is gzip'd when serialized and un-gzip'd on deserializeToChunkStore
*/
var crcTable = crc32.MakeTable(crc32.Castagnoli)
type chunkBuffer struct {
buff *bytes.Buffer
w io.WriteCloser
count uint32
}
func newChunkBuffer() *chunkBuffer {
buff := &bytes.Buffer{}
return &chunkBuffer{buff, gzip.NewWriter(buff), 0}
}
func (cb *chunkBuffer) appendChunk(chunk *bytes.Buffer) {
d.Chk.True(len(chunk.Bytes()) < 1<<32) // Because of chunking at higher levels, no chunk should ever be more than 4GB
cb.count++
chunkSize := uint32(chunk.Len())
err := binary.Write(cb.w, binary.LittleEndian, chunkSize)
d.Chk.NoError(err)
n, err := io.Copy(cb.w, chunk)
d.Chk.NoError(err)
d.Chk.Equal(uint32(n), chunkSize)
}
func (cb *chunkBuffer) isFull() bool {
return cb.buff.Len() >= targetBufferSize
}
func (cb *chunkBuffer) finish() {
cb.w.Close()
cb.w = nil
}
func deserializeToChunkStore(body io.Reader, cs ChunkStore) {
r, err := gzip.NewReader(body)
d.Chk.NoError(err)
for true {
chunkSize := uint32(0)
err = binary.Read(r, binary.LittleEndian, &chunkSize)
if err == io.EOF {
break
}
d.Chk.NoError(err)
// BUG 206 - Validate the resulting refs match the client's expectation.
w := cs.Put()
_, err := io.CopyN(w, r, int64(chunkSize))
d.Chk.NoError(err)
w.Close()
}
}
type httpStoreFlags struct {
host *string
}

View File

@@ -88,13 +88,13 @@ func (l *LevelDBStore) Put() ChunkWriter {
return newChunkWriter(l.write)
}
func (l *LevelDBStore) write(ref ref.Ref, buff *bytes.Buffer) {
func (l *LevelDBStore) write(ref ref.Ref, data []byte) {
if l.Has(ref) {
return
}
key := toChunkKey(ref)
err := l.db.Put(key, buff.Bytes(), nil)
err := l.db.Put(key, data, nil)
d.Chk.NoError(err)
l.putCount += 1
}

View File

@@ -34,7 +34,7 @@ func (ms *MemoryStore) Put() ChunkWriter {
return newChunkWriter(ms.write)
}
func (ms *MemoryStore) write(r ref.Ref, buff *bytes.Buffer) {
func (ms *MemoryStore) write(r ref.Ref, data []byte) {
if ms.Has(r) {
return
}
@@ -42,7 +42,7 @@ func (ms *MemoryStore) write(r ref.Ref, buff *bytes.Buffer) {
if ms.data == nil {
ms.data = map[ref.Ref][]byte{}
}
ms.data[r] = buff.Bytes()
ms.data[r] = data
}
func (ms *MemoryStore) Len() int {

View File

@@ -1,6 +1,7 @@
package chunks
import (
"bytes"
"testing"
"github.com/attic-labs/noms/Godeps/_workspace/src/github.com/stretchr/testify/suite"
@@ -21,3 +22,11 @@ func (suite *MemoryStoreTestSuite) SetupTest() {
func (suite *MemoryStoreTestSuite) TearDownTest() {
suite.store.Close()
}
func (suite *MemoryStoreTestSuite) TestBadSerialization() {
bad := []byte{0, 1} // Not enough bytes to read first length
ms := &MemoryStore{}
suite.Panics(func() {
Deserialize(bytes.NewReader(bad), ms)
})
}
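
The panic the test expects comes from binary.Read: a stream that ends partway through the 4-byte length yields io.ErrUnexpectedEOF rather than io.EOF, so the EOF check in Deserialize falls through to d.Chk.NoError, which fails.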

View File

@@ -1,7 +1,6 @@
package chunks
import (
"bytes"
"flag"
"io"
@@ -29,7 +28,7 @@ func (ms *NopStore) Put() ChunkWriter {
return newChunkWriter(ms.write)
}
func (ms *NopStore) write(ref ref.Ref, buff *bytes.Buffer) {}
func (ms *NopStore) write(ref ref.Ref, data []byte) {}
func (ms *NopStore) Close() error {
return nil