diff --git a/http/http_client.go b/chunks/http_store.go similarity index 81% rename from http/http_client.go rename to chunks/http_store.go index 9b043ef4c5..897da67ef9 100644 --- a/http/http_client.go +++ b/chunks/http_store.go @@ -1,4 +1,4 @@ -package http +package chunks import ( "bytes" @@ -12,7 +12,7 @@ import ( "strings" "sync" - "github.com/attic-labs/noms/chunks" + "github.com/attic-labs/noms/constants" "github.com/attic-labs/noms/d" "github.com/attic-labs/noms/ref" ) @@ -27,13 +27,13 @@ const ( type readRequest struct { r ref.Ref - ch chan chunks.Chunk + ch chan Chunk } // readBatch represents a set of queued read requests, each of which are blocking on a receive channel for a response. -type readBatch map[ref.Ref][]chan chunks.Chunk +type readBatch map[ref.Ref][]chan Chunk -func (rrg *readBatch) Put(c chunks.Chunk) { +func (rrg *readBatch) Put(c Chunk) { for _, ch := range (*rrg)[c.Ref()] { ch <- c } @@ -45,30 +45,30 @@ func (rrg *readBatch) Put(c chunks.Chunk) { func (rrq *readBatch) Close() error { for _, chs := range *rrq { for _, ch := range chs { - ch <- chunks.EmptyChunk + ch <- EmptyChunk } } return nil } -type HttpClient struct { +type HttpStore struct { host *url.URL readQueue chan readRequest - writeQueue chan chunks.Chunk + writeQueue chan Chunk wg *sync.WaitGroup written map[ref.Ref]bool wmu *sync.Mutex } -func NewHttpClient(host string) *HttpClient { +func NewHttpStore(host string) *HttpStore { u, err := url.Parse(host) d.Exp.NoError(err) d.Exp.True(u.Scheme == "http" || u.Scheme == "https") d.Exp.Equal(*u, url.URL{Scheme: u.Scheme, Host: u.Host}) - client := &HttpClient{ + client := &HttpStore{ u, make(chan readRequest, readBufferSize), - make(chan chunks.Chunk, writeBufferSize), + make(chan Chunk, writeBufferSize), &sync.WaitGroup{}, map[ref.Ref]bool{}, &sync.Mutex{}, @@ -85,13 +85,17 @@ func NewHttpClient(host string) *HttpClient { return client } -func (c *HttpClient) Get(r ref.Ref) chunks.Chunk { - ch := make(chan chunks.Chunk) +func (c *HttpStore) Host() *url.URL { + return c.host +} + +func (c *HttpStore) Get(r ref.Ref) Chunk { + ch := make(chan Chunk) c.readQueue <- readRequest{r, ch} return <-ch } -func (c *HttpClient) sendReadRequests() { +func (c *HttpStore) sendReadRequests() { for req := range c.readQueue { reqs := readBatch{} refs := map[ref.Ref]bool{} @@ -117,7 +121,7 @@ func (c *HttpClient) sendReadRequests() { } } -func (c *HttpClient) Has(ref ref.Ref) bool { +func (c *HttpStore) Has(ref ref.Ref) bool { // HEAD http:///ref/. Response will be 200 if present, 404 if absent. res := c.requestRef(ref, "HEAD", nil) defer closeResponse(res) @@ -126,7 +130,7 @@ func (c *HttpClient) Has(ref ref.Ref) bool { return res.StatusCode == http.StatusOK } -func (c *HttpClient) Put(chunk chunks.Chunk) { +func (c *HttpStore) Put(chunk Chunk) { // POST http:///ref/. Body is a serialized chunkBuffer. Response will be 201. c.wmu.Lock() defer c.wmu.Unlock() @@ -139,9 +143,9 @@ func (c *HttpClient) Put(chunk chunks.Chunk) { c.writeQueue <- chunk } -func (c *HttpClient) sendWriteRequests() { +func (c *HttpStore) sendWriteRequests() { for chunk := range c.writeQueue { - chs := make([]chunks.Chunk, 0) + chs := make([]Chunk, 0) chs = append(chs, chunk) loop: @@ -161,10 +165,10 @@ func (c *HttpClient) sendWriteRequests() { } } -func (c *HttpClient) postRefs(chs []chunks.Chunk) { +func (c *HttpStore) postRefs(chs []Chunk) { body := &bytes.Buffer{} gw := gzip.NewWriter(body) - sz := chunks.NewSerializer(gw) + sz := NewSerializer(gw) for _, chunk := range chs { sz.Put(chunk) } @@ -172,7 +176,7 @@ func (c *HttpClient) postRefs(chs []chunks.Chunk) { gw.Close() url := *c.host - url.Path = postRefsPath + url.Path = constants.PostRefsPath req, err := http.NewRequest("POST", url.String(), body) d.Chk.NoError(err) @@ -186,9 +190,9 @@ func (c *HttpClient) postRefs(chs []chunks.Chunk) { closeResponse(res) } -func (c *HttpClient) requestRef(r ref.Ref, method string, body io.Reader) *http.Response { +func (c *HttpStore) requestRef(r ref.Ref, method string, body io.Reader) *http.Response { url := *c.host - url.Path = refPath + url.Path = constants.RefPath if (r != ref.Ref{}) { url.Path = path.Join(url.Path, r.String()) } @@ -205,10 +209,10 @@ func (c *HttpClient) requestRef(r ref.Ref, method string, body io.Reader) *http. return res } -func (c *HttpClient) getRefs(refs map[ref.Ref]bool, cs chunks.ChunkSink) { +func (c *HttpStore) getRefs(refs map[ref.Ref]bool, cs ChunkSink) { // POST http:///getRefs/. Post body: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent. u := *c.host - u.Path = getRefsPath + u.Path = constants.GetRefsPath values := &url.Values{} for r, _ := range refs { values.Add("ref", r.String()) @@ -232,10 +236,10 @@ func (c *HttpClient) getRefs(refs map[ref.Ref]bool, cs chunks.ChunkSink) { reader = gr } - chunks.Deserialize(reader, cs) + Deserialize(reader, cs) } -func (c *HttpClient) Root() ref.Ref { +func (c *HttpStore) Root() ref.Ref { // GET http:///root. Response will be ref of root. res := c.requestRoot("GET", ref.Ref{}, ref.Ref{}) defer closeResponse(res) @@ -246,7 +250,7 @@ func (c *HttpClient) Root() ref.Ref { return ref.Parse(string(data)) } -func (c *HttpClient) UpdateRoot(current, last ref.Ref) bool { +func (c *HttpStore) UpdateRoot(current, last ref.Ref) bool { // POST http://root?current=&last=. Response will be 200 on success, 409 if current is outdated. c.wg.Wait() @@ -261,9 +265,9 @@ func (c *HttpClient) UpdateRoot(current, last ref.Ref) bool { return res.StatusCode == http.StatusOK } -func (c *HttpClient) requestRoot(method string, current, last ref.Ref) *http.Response { +func (c *HttpStore) requestRoot(method string, current, last ref.Ref) *http.Response { u := *c.host - u.Path = rootPath + u.Path = constants.RootPath if method == "POST" { d.Exp.True(current != ref.Ref{}) params := url.Values{} @@ -281,7 +285,7 @@ func (c *HttpClient) requestRoot(method string, current, last ref.Ref) *http.Res return res } -func (c *HttpClient) Close() error { +func (c *HttpStore) Close() error { c.wg.Wait() close(c.readQueue) close(c.writeQueue) @@ -296,20 +300,20 @@ func closeResponse(res *http.Response) error { return res.Body.Close() } -type Flags struct { +type HttpStoreFlags struct { host *string } -func NewFlagsWithPrefix(prefix string) Flags { - return Flags{ +func HttpFlags(prefix string) HttpStoreFlags { + return HttpStoreFlags{ flag.String(prefix+"h", "", "http host to connect to"), } } -func (h Flags) CreateClient() *HttpClient { +func (h HttpStoreFlags) CreateStore() ChunkStore { if *h.host == "" { return nil } else { - return NewHttpClient(*h.host) + return NewHttpStore(*h.host) } } diff --git a/clients/pitchmap/index/index.go b/clients/pitchmap/index/index.go index 1171cb0113..63e41d3275 100644 --- a/clients/pitchmap/index/index.go +++ b/clients/pitchmap/index/index.go @@ -167,9 +167,8 @@ func main() { if util.MaybeStartCPUProfile() { defer util.StopCPUProfile() } - dataStore := datas.NewDataStore(ds) - inputDataset := dataset.NewDataset(dataStore, *inputID) - outputDataset := dataset.NewDataset(dataStore, *outputID) + inputDataset := dataset.NewDataset(ds, *inputID) + outputDataset := dataset.NewDataset(ds, *outputID) input := types.ListFromVal(inputDataset.Head().Value()) output := getIndex(input).NomsValue() diff --git a/clients/server/server.go b/clients/server/server.go index 633fe68537..05a4bc3682 100644 --- a/clients/server/server.go +++ b/clients/server/server.go @@ -9,7 +9,6 @@ import ( "github.com/attic-labs/noms/clients/util" "github.com/attic-labs/noms/d" "github.com/attic-labs/noms/datas" - "github.com/attic-labs/noms/http" ) var ( @@ -25,7 +24,7 @@ func main() { return } - server := http.NewHttpServer(ds, *port) + server := datas.NewDataStoreServer(ds, *port) // Shutdown server gracefully so that profile may be written c := make(chan os.Signal, 1) diff --git a/clients/shove/shove.go b/clients/shove/shove.go index 6092cf2f3e..c9dd7f23ae 100644 --- a/clients/shove/shove.go +++ b/clients/shove/shove.go @@ -8,8 +8,6 @@ import ( "github.com/attic-labs/noms/clients/util" "github.com/attic-labs/noms/d" "github.com/attic-labs/noms/dataset" - "github.com/attic-labs/noms/ref" - "github.com/attic-labs/noms/sync" ) var ( @@ -38,20 +36,7 @@ func main() { defer util.StopCPUProfile() } - sourceHeadRef := source.Head().Ref() - sinkHeadRef := ref.Ref{} - if currentHead, ok := sink.MaybeHead(); ok { - sinkHeadRef = currentHead.Ref() - } - - if sourceHeadRef == sinkHeadRef { - return - } - - sync.CopyReachableChunksP(sourceHeadRef, sinkHeadRef, source.Store(), sink.Store(), int(*p)) - for ok := false; !ok; *sink, ok = sync.SetNewHead(sourceHeadRef, *sink) { - continue - } + *sink = sink.Pull(*source, int(*p)) util.MaybeWriteMemProfile() }) diff --git a/constants/http.go b/constants/http.go new file mode 100644 index 0000000000..724e157d63 --- /dev/null +++ b/constants/http.go @@ -0,0 +1,8 @@ +package constants + +const ( + RootPath = "/root/" + RefPath = "/ref/" + GetRefsPath = "/getRefs/" + PostRefsPath = "/postRefs/" +) diff --git a/datas/datastore.go b/datas/datastore.go index a3864597e4..a73d3e74ea 100644 --- a/datas/datastore.go +++ b/datas/datastore.go @@ -2,7 +2,7 @@ package datas import ( "github.com/attic-labs/noms/chunks" - "github.com/attic-labs/noms/http" + "github.com/attic-labs/noms/ref" "github.com/attic-labs/noms/types" ) @@ -21,6 +21,8 @@ type DataStore interface { // CommitWithParents updates the commit that a datastore points at. The new Commit is constructed using v and p. If the update cannot be performed, e.g., because of a conflict, CommitWithParents returns 'false'. The newest snapshot of the datastore is always returned. CommitWithParents(v types.Value, p SetOfCommit) (DataStore, bool) + + CopyReachableChunksP(r, exclude ref.Ref, sink chunks.ChunkSink, concurrency int) } func NewDataStore(cs chunks.ChunkStore) DataStore { @@ -31,7 +33,7 @@ type Flags struct { ldb chunks.LevelDBStoreFlags memory chunks.MemoryStoreFlags nop chunks.NopStoreFlags - hflags http.Flags + hflags chunks.HttpStoreFlags } func NewFlags() Flags { @@ -43,7 +45,7 @@ func NewFlagsWithPrefix(prefix string) Flags { chunks.LevelDBFlags(prefix), chunks.MemoryFlags(prefix), chunks.NopFlags(prefix), - http.NewFlagsWithPrefix(prefix), + chunks.HttpFlags(prefix), } } @@ -52,12 +54,15 @@ func (f Flags) CreateDataStore() (DataStore, bool) { if cs = f.ldb.CreateStore(); cs != nil { } else if cs = f.memory.CreateStore(); cs != nil { } else if cs = f.nop.CreateStore(); cs != nil { - } else if cs = f.hflags.CreateClient(); cs != nil { } - if cs == nil { - return &LocalDataStore{}, false + if cs != nil { + return newLocalDataStore(cs), true } - return newLocalDataStore(cs), true + if cs = f.hflags.CreateStore(); cs != nil { + return newRemoteDataStore(cs), true + } + + return &LocalDataStore{}, false } diff --git a/http/http_server.go b/datas/datastore_server.go similarity index 61% rename from http/http_server.go rename to datas/datastore_server.go index 52a8a314bd..78e8eeb34c 100644 --- a/http/http_server.go +++ b/datas/datastore_server.go @@ -1,4 +1,4 @@ -package http +package datas import ( "bytes" @@ -10,32 +10,46 @@ import ( "strings" "github.com/attic-labs/noms/chunks" + "github.com/attic-labs/noms/constants" "github.com/attic-labs/noms/d" "github.com/attic-labs/noms/ref" ) -const ( - rootPath = "/root/" - refPath = "/ref/" - getRefsPath = "/getRefs/" - postRefsPath = "/postRefs/" - maxConcurrentPuts = 64 -) - -type httpServer struct { - cs chunks.ChunkStore +type dataStoreServer struct { + ds DataStore port int l *net.Listener conns map[net.Conn]http.ConnState } -func NewHttpServer(cs chunks.ChunkStore, port int) *httpServer { - return &httpServer{ - cs, port, nil, map[net.Conn]http.ConnState{}, +func NewDataStoreServer(ds DataStore, port int) *dataStoreServer { + return &dataStoreServer{ + ds, port, nil, map[net.Conn]http.ConnState{}, } } -func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) { +func (s *dataStoreServer) handleGetReachable(r ref.Ref, w http.ResponseWriter, req *http.Request) { + excludeRef := ref.Ref{} + exclude := req.URL.Query().Get("exclude") + if exclude != "" { + excludeRef = ref.Parse(exclude) + } + + w.Header().Add("Content-Type", "application/octet-stream") + writer := w.(io.Writer) + if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Add("Content-Encoding", "gzip") + gw := gzip.NewWriter(w) + defer gw.Close() + writer = gw + } + + sz := chunks.NewSerializer(writer) + s.ds.CopyReachableChunksP(r, excludeRef, sz, 512) + sz.Close() +} + +func (s *dataStoreServer) handleRef(w http.ResponseWriter, req *http.Request) { err := d.Try(func() { refStr := "" pathParts := strings.Split(req.URL.Path[1:], "/") @@ -46,7 +60,12 @@ func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) { switch req.Method { case "GET": - chunk := s.cs.Get(r) + all := req.URL.Query().Get("all") + if all == "true" { + s.handleGetReachable(r, w, req) + return + } + chunk := s.ds.Get(r) if chunk.IsEmpty() { w.WriteHeader(http.StatusNotFound) return @@ -58,7 +77,7 @@ func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) { w.Header().Add("Cache-Control", "max-age=31536000") // 1 year case "HEAD": - if !s.cs.Has(r) { + if !s.ds.Has(r) { w.WriteHeader(http.StatusNotFound) return } @@ -73,7 +92,7 @@ func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) handlePostRefs(w http.ResponseWriter, req *http.Request) { +func (s *dataStoreServer) handlePostRefs(w http.ResponseWriter, req *http.Request) { err := d.Try(func() { d.Exp.Equal("POST", req.Method) @@ -85,7 +104,7 @@ func (s *httpServer) handlePostRefs(w http.ResponseWriter, req *http.Request) { reader = gr } - chunks.Deserialize(reader, s.cs) + chunks.Deserialize(reader, s.ds) w.WriteHeader(http.StatusCreated) }) @@ -95,7 +114,7 @@ func (s *httpServer) handlePostRefs(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) handleGetRefs(w http.ResponseWriter, req *http.Request) { +func (s *dataStoreServer) handleGetRefs(w http.ResponseWriter, req *http.Request) { err := d.Try(func() { d.Exp.Equal("POST", req.Method) @@ -119,7 +138,7 @@ func (s *httpServer) handleGetRefs(w http.ResponseWriter, req *http.Request) { sz := chunks.NewSerializer(writer) for _, r := range refs { - c := s.cs.Get(r) + c := s.ds.Get(r) if !c.IsEmpty() { sz.Put(c) } @@ -133,11 +152,11 @@ func (s *httpServer) handleGetRefs(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) handleRoot(w http.ResponseWriter, req *http.Request) { +func (s *dataStoreServer) handleRoot(w http.ResponseWriter, req *http.Request) { err := d.Try(func() { switch req.Method { case "GET": - rootRef := s.cs.Root() + rootRef := s.ds.Root() fmt.Fprintf(w, "%v", rootRef.String()) w.Header().Add("content-type", "text/plain") @@ -150,7 +169,7 @@ func (s *httpServer) handleRoot(w http.ResponseWriter, req *http.Request) { d.Exp.Len(tokens, 1) current := ref.Parse(tokens[0]) - if !s.cs.UpdateRoot(current, last) { + if !s.ds.UpdateRoot(current, last) { w.WriteHeader(http.StatusConflict) return } @@ -163,7 +182,7 @@ func (s *httpServer) handleRoot(w http.ResponseWriter, req *http.Request) { } } -func (s *httpServer) connState(c net.Conn, cs http.ConnState) { +func (s *dataStoreServer) connState(c net.Conn, cs http.ConnState) { switch cs { case http.StateNew, http.StateActive, http.StateIdle: s.conns[c] = cs @@ -172,18 +191,18 @@ func (s *httpServer) connState(c net.Conn, cs http.ConnState) { } } -// Blocks while the server is listening. Running on a separate go routine is supported. -func (s *httpServer) Run() { +// Blocks while the dataStoreServer is listening. Running on a separate go routine is supported. +func (s *dataStoreServer) Run() { l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) d.Chk.NoError(err) s.l = &l mux := http.NewServeMux() - mux.HandleFunc(refPath, http.HandlerFunc(s.handleRef)) - mux.HandleFunc(getRefsPath, http.HandlerFunc(s.handleGetRefs)) - mux.HandleFunc(postRefsPath, http.HandlerFunc(s.handlePostRefs)) - mux.HandleFunc(rootPath, http.HandlerFunc(s.handleRoot)) + mux.HandleFunc(constants.RefPath, http.HandlerFunc(s.handleRef)) + mux.HandleFunc(constants.GetRefsPath, http.HandlerFunc(s.handleGetRefs)) + mux.HandleFunc(constants.PostRefsPath, http.HandlerFunc(s.handlePostRefs)) + mux.HandleFunc(constants.RootPath, http.HandlerFunc(s.handleRoot)) srv := &http.Server{ Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -195,8 +214,8 @@ func (s *httpServer) Run() { srv.Serve(l) } -// Will cause the server to stop listening and an existing call to Run() to continue. -func (s *httpServer) Stop() { +// Will cause the dataStoreServer to stop listening and an existing call to Run() to continue. +func (s *dataStoreServer) Stop() { (*s.l).Close() for c, _ := range s.conns { c.Close() diff --git a/datas/local_datastore.go b/datas/local_datastore.go index 90755b1b51..d36c72cdcb 100644 --- a/datas/local_datastore.go +++ b/datas/local_datastore.go @@ -4,6 +4,7 @@ import ( "github.com/attic-labs/noms/chunks" "github.com/attic-labs/noms/ref" "github.com/attic-labs/noms/types" + "github.com/attic-labs/noms/walk" ) // DataStore provides versioned storage for noms values. Each DataStore instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new DataStore, representing a new moment in history. @@ -20,13 +21,6 @@ func newLocalDataStore(cs chunks.ChunkStore) *LocalDataStore { return &LocalDataStore{dataStoreCommon{cs, commitFromRef(rootRef, cs)}} } -func newDataStoreInternal(cs chunks.ChunkStore) dataStoreCommon { - if (cs.Root() == ref.Ref{}) { - return dataStoreCommon{cs, nil} - } - return dataStoreCommon{cs, commitFromRef(cs.Root(), cs)} -} - func (lds *LocalDataStore) Commit(v types.Value) (DataStore, bool) { ok := lds.commit(v) return newLocalDataStore(lds.ChunkStore), ok @@ -36,3 +30,50 @@ func (lds *LocalDataStore) CommitWithParents(v types.Value, p SetOfCommit) (Data ok := lds.commitWithParents(v, p) return newLocalDataStore(lds.ChunkStore), ok } + +// Copys all chunks reachable from (and including) |r| but excluding all chunks reachable from (and including) |exclude| in |source| to |sink|. +func (lds *LocalDataStore) CopyReachableChunksP(r, exclude ref.Ref, sink chunks.ChunkSink, concurrency int) { + excludeRefs := map[ref.Ref]bool{} + hasRef := func(r ref.Ref) bool { + return excludeRefs[r] + } + + if exclude != (ref.Ref{}) { + refChan := make(chan ref.Ref, 1024) + addRef := func(r ref.Ref) { + refChan <- r + } + + go func() { + walk.AllP(exclude, lds, addRef, concurrency) + close(refChan) + }() + + for r := range refChan { + excludeRefs[r] = true + } + } + + tcs := &teeChunkSource{lds, sink} + walk.SomeP(r, tcs, hasRef, concurrency) +} + +// teeChunkSource just serves the purpose of writing to |sink| every chunk that is read from |source|. +type teeChunkSource struct { + source chunks.ChunkSource + sink chunks.ChunkSink +} + +func (trs *teeChunkSource) Get(ref ref.Ref) chunks.Chunk { + c := trs.source.Get(ref) + if c.IsEmpty() { + return c + } + + trs.sink.Put(c) + return c +} + +func (trs *teeChunkSource) Has(ref ref.Ref) bool { + return trs.source.Has(ref) +} diff --git a/datas/remote_datastore.go b/datas/remote_datastore.go new file mode 100644 index 0000000000..ee54be7c89 --- /dev/null +++ b/datas/remote_datastore.go @@ -0,0 +1,88 @@ +package datas + +import ( + "compress/gzip" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "path" + "strings" + + "github.com/attic-labs/noms/chunks" + "github.com/attic-labs/noms/constants" + "github.com/attic-labs/noms/d" + "github.com/attic-labs/noms/ref" + "github.com/attic-labs/noms/types" +) + +// DataStore provides versioned storage for noms values. Each DataStore instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new DataStore, representing a new moment in history. +type RemoteDataStore struct { + dataStoreCommon +} + +func newRemoteDataStore(cs chunks.ChunkStore) *RemoteDataStore { + rootRef := cs.Root() + if rootRef == (ref.Ref{}) { + return &RemoteDataStore{dataStoreCommon{cs, nil}} + } + + return &RemoteDataStore{dataStoreCommon{cs, commitFromRef(rootRef, cs)}} +} + +func (lds *RemoteDataStore) host() *url.URL { + return lds.dataStoreCommon.ChunkStore.(*chunks.HttpStore).Host() +} + +func (lds *RemoteDataStore) Commit(v types.Value) (DataStore, bool) { + ok := lds.commit(v) + return newRemoteDataStore(lds.ChunkStore), ok +} + +func (lds *RemoteDataStore) CommitWithParents(v types.Value, p SetOfCommit) (DataStore, bool) { + ok := lds.commitWithParents(v, p) + return newRemoteDataStore(lds.ChunkStore), ok +} + +func (lds *RemoteDataStore) CopyReachableChunksP(r, exclude ref.Ref, cs chunks.ChunkSink, concurrency int) { + fmt.Println("Starting CopyReachableChunksP") + // POST http:///ref/sha1----?all=true&exclude=sha1----. Response will be chunk data if present, 404 if absent. + u := lds.host() + u.Path = path.Join(constants.RefPath, r.String()) + + values := &url.Values{} + values.Add("all", "true") + if exclude != (ref.Ref{}) { + values.Add("exclude", exclude.String()) + } + u.RawQuery = values.Encode() + + req, err := http.NewRequest("GET", u.String(), nil) + req.Header.Add("Accept-Encoding", "gzip") + d.Chk.NoError(err) + + res, err := http.DefaultClient.Do(req) + d.Chk.NoError(err) + defer closeResponse(res) + d.Chk.Equal(http.StatusOK, res.StatusCode, "Unexpected response: %s", http.StatusText(res.StatusCode)) + + reader := res.Body + if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") { + gr, err := gzip.NewReader(reader) + d.Chk.NoError(err) + defer gr.Close() + reader = gr + } + + fmt.Println("Receiving") + chunks.Deserialize(reader, cs) + fmt.Println("Done") +} + +// In order for keep alive to work we must read to EOF on every response. We may want to add a timeout so that a server that left its connection open can't cause all of ports to be eaten up. +func closeResponse(res *http.Response) error { + data, err := ioutil.ReadAll(res.Body) + d.Chk.NoError(err) + d.Chk.Equal(0, len(data)) + return res.Body.Close() +} diff --git a/http/http_store_test.go b/datas/remote_datastore_tests.go similarity index 84% rename from http/http_store_test.go rename to datas/remote_datastore_tests.go index 05ef4e9e2a..7a44459f7f 100644 --- a/http/http_store_test.go +++ b/datas/remote_datastore_tests.go @@ -1,4 +1,4 @@ -package http +package datas import ( "net/http" @@ -14,12 +14,12 @@ func TestHttpStoreTestSuite(t *testing.T) { type HttpStoreTestSuite struct { chunks.ChunkStoreTestSuite - server *httpServer + server *dataStoreServer } func (suite *HttpStoreTestSuite) SetupTest() { - suite.Store = NewHttpClient("http://localhost:8000") - suite.server = NewHttpServer(chunks.NewMemoryStore(), 8000) + suite.Store = chunks.NewHttpStore("http://localhost:8000") + suite.server = NewDataStoreServer(NewDataStore(chunks.NewMemoryStore()), 8000) go suite.server.Run() // This call to a non-existing URL allows us to exit being sure that the server started. Otherwise, we sometimes get races with Stop() below. diff --git a/dataset/dataset.go b/dataset/dataset.go index 8652023e1b..c6b48dafa2 100644 --- a/dataset/dataset.go +++ b/dataset/dataset.go @@ -6,6 +6,7 @@ import ( "github.com/attic-labs/noms/d" "github.com/attic-labs/noms/datas" "github.com/attic-labs/noms/dataset/mgmt" + "github.com/attic-labs/noms/ref" "github.com/attic-labs/noms/types" ) @@ -59,6 +60,47 @@ func (ds *Dataset) CommitWithParents(v types.Value, p datas.SetOfCommit) (Datase return Dataset{store, ds.id}, ok } +func (ds *Dataset) Pull(source Dataset, concurrency int) Dataset { + sink := *ds + sourceHeadRef := source.Head().Ref() + sinkHeadRef := ref.Ref{} + if currentHead, ok := sink.MaybeHead(); ok { + sinkHeadRef = currentHead.Ref() + } + + if sourceHeadRef == sinkHeadRef { + return sink + } + + source.Store().CopyReachableChunksP(sourceHeadRef, sinkHeadRef, sink.Store(), concurrency) + for ok := false; !ok; sink, ok = sink.SetNewHead(sourceHeadRef) { + continue + } + + return sink +} + +func (ds *Dataset) validateRefAsCommit(r ref.Ref) datas.Commit { + v := types.ReadValue(r, ds.store) + + d.Exp.NotNil(v, "%v cannot be found", r) + + // TODO: Replace this weird recover stuff below once we have a way to determine if a Value is an instance of a custom struct type. BUG #133 + defer func() { + if r := recover(); r != nil { + d.Exp.Fail("Not a Commit:", "%+v", v) + } + }() + + return datas.CommitFromVal(v) +} + +// SetNewHead takes the Ref of the desired new Head of ds, the chunk for which should already exist in the Dataset. It validates that the Ref points to an existing chunk that decodes to the correct type of value and then commits it to ds, returning a new Dataset with newHeadRef set and ok set to true. In the event that the commit fails, ok is set to false and a new up-to-date Dataset is returned WITHOUT newHeadRef in it. The caller should try again using this new Dataset. +func (ds *Dataset) SetNewHead(newHeadRef ref.Ref) (Dataset, bool) { + commit := ds.validateRefAsCommit(newHeadRef) + return ds.CommitWithParents(commit.Value(), commit.Parents()) +} + func (ds *Dataset) Close() { ds.store.Close() } diff --git a/sync/pull_test.go b/dataset/pull_test.go similarity index 64% rename from sync/pull_test.go rename to dataset/pull_test.go index e125fcf3b0..ee42c371a2 100644 --- a/sync/pull_test.go +++ b/dataset/pull_test.go @@ -1,4 +1,4 @@ -package sync +package dataset import ( "testing" @@ -6,21 +6,19 @@ import ( "github.com/attic-labs/noms/Godeps/_workspace/src/github.com/stretchr/testify/assert" "github.com/attic-labs/noms/chunks" "github.com/attic-labs/noms/datas" - "github.com/attic-labs/noms/dataset" "github.com/attic-labs/noms/ref" "github.com/attic-labs/noms/types" ) -func createTestDataset(name string) dataset.Dataset { - t := chunks.NewTestStore() - return dataset.NewDataset(datas.NewDataStore(t), name) +func createTestDataset(name string) Dataset { + return NewDataset(datas.NewDataStore(chunks.NewTestStore()), name) } func TestValidateRef(t *testing.T) { - cs := chunks.NewTestStore() - r := types.WriteValue(types.Bool(true), cs) + ds := createTestDataset("test") + r := types.WriteValue(types.Bool(true), ds.Store()) - assert.Panics(t, func() { validateRefAsCommit(r, cs) }) + assert.Panics(t, func() { ds.validateRefAsCommit(r) }) } func TestPull(t *testing.T) { @@ -52,8 +50,7 @@ func TestPull(t *testing.T) { source, ok = source.Commit(updatedValue) assert.True(ok) - CopyReachableChunksP(source.Head().Ref(), sink.Head().Ref(), source.Store(), sink.Store(), 1) - sink, ok = SetNewHead(source.Head().Ref(), sink) + sink = sink.Pull(source, 1) assert.True(ok) assert.True(source.Head().Equals(sink.Head())) } @@ -71,24 +68,13 @@ func TestPullFirstCommit(t *testing.T) { source, ok := source.Commit(initialValue) assert.True(ok) - sinkHeadRef := func() ref.Ref { - head, ok := sink.MaybeHead() - if ok { - return head.Ref() - } - return ref.Ref{} - }() - - CopyReachableChunksP(source.Head().Ref(), sinkHeadRef, source.Store(), sink.Store(), 1) - CopyReachableChunksP(source.Head().Ref(), sinkHeadRef, source.Store(), sink.Store(), 1) - sink, ok = SetNewHead(source.Head().Ref(), sink) - assert.True(ok) + sink = sink.Pull(source, 1) assert.True(source.Head().Equals(sink.Head())) } func TestFailedCopyChunks(t *testing.T) { - cs := chunks.NewMemoryStore() + ds := createTestDataset("test") r := ref.Parse("sha1-0000000000000000000000000000000000000000") - assert.Panics(t, func() { CopyReachableChunksP(r, ref.Ref{}, cs, cs, 1) }) + assert.Panics(t, func() { ds.Store().CopyReachableChunksP(r, ref.Ref{}, ds.Store(), 1) }) } diff --git a/sync/pull.go b/sync/pull.go deleted file mode 100644 index 98782120e6..0000000000 --- a/sync/pull.go +++ /dev/null @@ -1,78 +0,0 @@ -package sync - -import ( - "github.com/attic-labs/noms/chunks" - "github.com/attic-labs/noms/d" - "github.com/attic-labs/noms/datas" - "github.com/attic-labs/noms/dataset" - "github.com/attic-labs/noms/ref" - "github.com/attic-labs/noms/types" - "github.com/attic-labs/noms/walk" -) - -func validateRefAsCommit(r ref.Ref, cs chunks.ChunkSource) datas.Commit { - v := types.ReadValue(r, cs) - - d.Exp.NotNil(v, "%v cannot be found", r) - - // TODO: Replace this weird recover stuff below once we have a way to determine if a Value is an instance of a custom struct type. BUG #133 - defer func() { - if r := recover(); r != nil { - d.Exp.Fail("Not a Commit:", "%+v", v) - } - }() - return datas.CommitFromVal(v) -} - -// SetNewHead takes the Ref of the desired new Head of ds, the chunk for which should already exist in the Dataset. It validates that the Ref points to an existing chunk that decodes to the correct type of value and then commits it to ds, returning a new Dataset with newHeadRef set and ok set to true. In the event that the commit fails, ok is set to false and a new up-to-date Dataset is returned WITHOUT newHeadRef in it. The caller should try again using this new Dataset. -func SetNewHead(newHeadRef ref.Ref, ds dataset.Dataset) (dataset.Dataset, bool) { - commit := validateRefAsCommit(newHeadRef, ds.Store()) - return ds.CommitWithParents(commit.Value(), commit.Parents()) -} - -// Copys all chunks reachable from (and including) |r| but excluding all chunks reachable from (and including) |exclude| in |source| to |sink|. -func CopyReachableChunksP(r, exclude ref.Ref, source chunks.ChunkSource, sink chunks.ChunkSink, concurrency int) { - excludeRefs := map[ref.Ref]bool{} - hasRef := func(r ref.Ref) bool { - return excludeRefs[r] - } - - if exclude != (ref.Ref{}) { - refChan := make(chan ref.Ref, 1024) - addRef := func(r ref.Ref) { - refChan <- r - } - - go func() { - walk.AllP(exclude, source, addRef, concurrency) - close(refChan) - }() - - for r := range refChan { - excludeRefs[r] = true - } - } - - tcs := &teeChunkSource{source, sink} - walk.SomeP(r, tcs, hasRef, concurrency) -} - -// teeChunkSource just serves the purpose of writing to |sink| every chunk that is read from |source|. -type teeChunkSource struct { - source chunks.ChunkSource - sink chunks.ChunkSink -} - -func (trs *teeChunkSource) Get(ref ref.Ref) chunks.Chunk { - c := trs.source.Get(ref) - if c.IsEmpty() { - return c - } - - trs.sink.Put(c) - return c -} - -func (trs *teeChunkSource) Has(ref ref.Ref) bool { - return trs.source.Has(ref) -}