RemoteDataStore / CopyReachableChunksP

This commit is contained in:
Rafael Weinstein
2015-09-24 11:06:50 -07:00
parent 670fbc4b06
commit ec8461e8fd
13 changed files with 308 additions and 210 deletions
+40 -36
View File
@@ -1,4 +1,4 @@
package http
package chunks
import (
"bytes"
@@ -12,7 +12,7 @@ import (
"strings"
"sync"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/constants"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
)
@@ -27,13 +27,13 @@ const (
type readRequest struct {
r ref.Ref
ch chan chunks.Chunk
ch chan Chunk
}
// readBatch represents a set of queued read requests, each of which are blocking on a receive channel for a response.
type readBatch map[ref.Ref][]chan chunks.Chunk
type readBatch map[ref.Ref][]chan Chunk
func (rrg *readBatch) Put(c chunks.Chunk) {
func (rrg *readBatch) Put(c Chunk) {
for _, ch := range (*rrg)[c.Ref()] {
ch <- c
}
@@ -45,30 +45,30 @@ func (rrg *readBatch) Put(c chunks.Chunk) {
func (rrq *readBatch) Close() error {
for _, chs := range *rrq {
for _, ch := range chs {
ch <- chunks.EmptyChunk
ch <- EmptyChunk
}
}
return nil
}
type HttpClient struct {
type HttpStore struct {
host *url.URL
readQueue chan readRequest
writeQueue chan chunks.Chunk
writeQueue chan Chunk
wg *sync.WaitGroup
written map[ref.Ref]bool
wmu *sync.Mutex
}
func NewHttpClient(host string) *HttpClient {
func NewHttpStore(host string) *HttpStore {
u, err := url.Parse(host)
d.Exp.NoError(err)
d.Exp.True(u.Scheme == "http" || u.Scheme == "https")
d.Exp.Equal(*u, url.URL{Scheme: u.Scheme, Host: u.Host})
client := &HttpClient{
client := &HttpStore{
u,
make(chan readRequest, readBufferSize),
make(chan chunks.Chunk, writeBufferSize),
make(chan Chunk, writeBufferSize),
&sync.WaitGroup{},
map[ref.Ref]bool{},
&sync.Mutex{},
@@ -85,13 +85,17 @@ func NewHttpClient(host string) *HttpClient {
return client
}
func (c *HttpClient) Get(r ref.Ref) chunks.Chunk {
ch := make(chan chunks.Chunk)
func (c *HttpStore) Host() *url.URL {
return c.host
}
func (c *HttpStore) Get(r ref.Ref) Chunk {
ch := make(chan Chunk)
c.readQueue <- readRequest{r, ch}
return <-ch
}
func (c *HttpClient) sendReadRequests() {
func (c *HttpStore) sendReadRequests() {
for req := range c.readQueue {
reqs := readBatch{}
refs := map[ref.Ref]bool{}
@@ -117,7 +121,7 @@ func (c *HttpClient) sendReadRequests() {
}
}
func (c *HttpClient) Has(ref ref.Ref) bool {
func (c *HttpStore) Has(ref ref.Ref) bool {
// HEAD http://<host>/ref/<sha1-xxx>. Response will be 200 if present, 404 if absent.
res := c.requestRef(ref, "HEAD", nil)
defer closeResponse(res)
@@ -126,7 +130,7 @@ func (c *HttpClient) Has(ref ref.Ref) bool {
return res.StatusCode == http.StatusOK
}
func (c *HttpClient) Put(chunk chunks.Chunk) {
func (c *HttpStore) Put(chunk Chunk) {
// POST http://<host>/ref/. Body is a serialized chunkBuffer. Response will be 201.
c.wmu.Lock()
defer c.wmu.Unlock()
@@ -139,9 +143,9 @@ func (c *HttpClient) Put(chunk chunks.Chunk) {
c.writeQueue <- chunk
}
func (c *HttpClient) sendWriteRequests() {
func (c *HttpStore) sendWriteRequests() {
for chunk := range c.writeQueue {
chs := make([]chunks.Chunk, 0)
chs := make([]Chunk, 0)
chs = append(chs, chunk)
loop:
@@ -161,10 +165,10 @@ func (c *HttpClient) sendWriteRequests() {
}
}
func (c *HttpClient) postRefs(chs []chunks.Chunk) {
func (c *HttpStore) postRefs(chs []Chunk) {
body := &bytes.Buffer{}
gw := gzip.NewWriter(body)
sz := chunks.NewSerializer(gw)
sz := NewSerializer(gw)
for _, chunk := range chs {
sz.Put(chunk)
}
@@ -172,7 +176,7 @@ func (c *HttpClient) postRefs(chs []chunks.Chunk) {
gw.Close()
url := *c.host
url.Path = postRefsPath
url.Path = constants.PostRefsPath
req, err := http.NewRequest("POST", url.String(), body)
d.Chk.NoError(err)
@@ -186,9 +190,9 @@ func (c *HttpClient) postRefs(chs []chunks.Chunk) {
closeResponse(res)
}
func (c *HttpClient) requestRef(r ref.Ref, method string, body io.Reader) *http.Response {
func (c *HttpStore) requestRef(r ref.Ref, method string, body io.Reader) *http.Response {
url := *c.host
url.Path = refPath
url.Path = constants.RefPath
if (r != ref.Ref{}) {
url.Path = path.Join(url.Path, r.String())
}
@@ -205,10 +209,10 @@ func (c *HttpClient) requestRef(r ref.Ref, method string, body io.Reader) *http.
return res
}
func (c *HttpClient) getRefs(refs map[ref.Ref]bool, cs chunks.ChunkSink) {
func (c *HttpStore) getRefs(refs map[ref.Ref]bool, cs ChunkSink) {
// POST http://<host>/getRefs/. Post body: ref=sha1---&ref=sha1---& Response will be chunk data if present, 404 if absent.
u := *c.host
u.Path = getRefsPath
u.Path = constants.GetRefsPath
values := &url.Values{}
for r, _ := range refs {
values.Add("ref", r.String())
@@ -232,10 +236,10 @@ func (c *HttpClient) getRefs(refs map[ref.Ref]bool, cs chunks.ChunkSink) {
reader = gr
}
chunks.Deserialize(reader, cs)
Deserialize(reader, cs)
}
func (c *HttpClient) Root() ref.Ref {
func (c *HttpStore) Root() ref.Ref {
// GET http://<host>/root. Response will be ref of root.
res := c.requestRoot("GET", ref.Ref{}, ref.Ref{})
defer closeResponse(res)
@@ -246,7 +250,7 @@ func (c *HttpClient) Root() ref.Ref {
return ref.Parse(string(data))
}
func (c *HttpClient) UpdateRoot(current, last ref.Ref) bool {
func (c *HttpStore) UpdateRoot(current, last ref.Ref) bool {
// POST http://<host>root?current=<ref>&last=<ref>. Response will be 200 on success, 409 if current is outdated.
c.wg.Wait()
@@ -261,9 +265,9 @@ func (c *HttpClient) UpdateRoot(current, last ref.Ref) bool {
return res.StatusCode == http.StatusOK
}
func (c *HttpClient) requestRoot(method string, current, last ref.Ref) *http.Response {
func (c *HttpStore) requestRoot(method string, current, last ref.Ref) *http.Response {
u := *c.host
u.Path = rootPath
u.Path = constants.RootPath
if method == "POST" {
d.Exp.True(current != ref.Ref{})
params := url.Values{}
@@ -281,7 +285,7 @@ func (c *HttpClient) requestRoot(method string, current, last ref.Ref) *http.Res
return res
}
func (c *HttpClient) Close() error {
func (c *HttpStore) Close() error {
c.wg.Wait()
close(c.readQueue)
close(c.writeQueue)
@@ -296,20 +300,20 @@ func closeResponse(res *http.Response) error {
return res.Body.Close()
}
type Flags struct {
type HttpStoreFlags struct {
host *string
}
func NewFlagsWithPrefix(prefix string) Flags {
return Flags{
func HttpFlags(prefix string) HttpStoreFlags {
return HttpStoreFlags{
flag.String(prefix+"h", "", "http host to connect to"),
}
}
func (h Flags) CreateClient() *HttpClient {
func (h HttpStoreFlags) CreateStore() ChunkStore {
if *h.host == "" {
return nil
} else {
return NewHttpClient(*h.host)
return NewHttpStore(*h.host)
}
}
+2 -3
View File
@@ -167,9 +167,8 @@ func main() {
if util.MaybeStartCPUProfile() {
defer util.StopCPUProfile()
}
dataStore := datas.NewDataStore(ds)
inputDataset := dataset.NewDataset(dataStore, *inputID)
outputDataset := dataset.NewDataset(dataStore, *outputID)
inputDataset := dataset.NewDataset(ds, *inputID)
outputDataset := dataset.NewDataset(ds, *outputID)
input := types.ListFromVal(inputDataset.Head().Value())
output := getIndex(input).NomsValue()
+1 -2
View File
@@ -9,7 +9,6 @@ import (
"github.com/attic-labs/noms/clients/util"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/datas"
"github.com/attic-labs/noms/http"
)
var (
@@ -25,7 +24,7 @@ func main() {
return
}
server := http.NewHttpServer(ds, *port)
server := datas.NewDataStoreServer(ds, *port)
// Shutdown server gracefully so that profile may be written
c := make(chan os.Signal, 1)
+1 -16
View File
@@ -8,8 +8,6 @@ import (
"github.com/attic-labs/noms/clients/util"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/dataset"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/sync"
)
var (
@@ -38,20 +36,7 @@ func main() {
defer util.StopCPUProfile()
}
sourceHeadRef := source.Head().Ref()
sinkHeadRef := ref.Ref{}
if currentHead, ok := sink.MaybeHead(); ok {
sinkHeadRef = currentHead.Ref()
}
if sourceHeadRef == sinkHeadRef {
return
}
sync.CopyReachableChunksP(sourceHeadRef, sinkHeadRef, source.Store(), sink.Store(), int(*p))
for ok := false; !ok; *sink, ok = sync.SetNewHead(sourceHeadRef, *sink) {
continue
}
*sink = sink.Pull(*source, int(*p))
util.MaybeWriteMemProfile()
})
+8
View File
@@ -0,0 +1,8 @@
package constants
// URL paths shared by the HTTP chunk-store client and the datastore server.
const (
	RootPath     = "/root/"     // GET the store's root ref; POST to update it
	RefPath      = "/ref/"      // GET/HEAD a single chunk by ref
	GetRefsPath  = "/getRefs/"  // POST a batch of refs, response streams the chunks
	PostRefsPath = "/postRefs/" // POST serialized chunks for storage
)
+12 -7
View File
@@ -2,7 +2,7 @@ package datas
import (
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/http"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
@@ -21,6 +21,8 @@ type DataStore interface {
// CommitWithParents updates the commit that a datastore points at. The new Commit is constructed using v and p. If the update cannot be performed, e.g., because of a conflict, CommitWithParents returns 'false'. The newest snapshot of the datastore is always returned.
CommitWithParents(v types.Value, p SetOfCommit) (DataStore, bool)
CopyReachableChunksP(r, exclude ref.Ref, sink chunks.ChunkSink, concurrency int)
}
func NewDataStore(cs chunks.ChunkStore) DataStore {
@@ -31,7 +33,7 @@ type Flags struct {
ldb chunks.LevelDBStoreFlags
memory chunks.MemoryStoreFlags
nop chunks.NopStoreFlags
hflags http.Flags
hflags chunks.HttpStoreFlags
}
func NewFlags() Flags {
@@ -43,7 +45,7 @@ func NewFlagsWithPrefix(prefix string) Flags {
chunks.LevelDBFlags(prefix),
chunks.MemoryFlags(prefix),
chunks.NopFlags(prefix),
http.NewFlagsWithPrefix(prefix),
chunks.HttpFlags(prefix),
}
}
@@ -52,12 +54,15 @@ func (f Flags) CreateDataStore() (DataStore, bool) {
if cs = f.ldb.CreateStore(); cs != nil {
} else if cs = f.memory.CreateStore(); cs != nil {
} else if cs = f.nop.CreateStore(); cs != nil {
} else if cs = f.hflags.CreateClient(); cs != nil {
}
if cs == nil {
return &LocalDataStore{}, false
if cs != nil {
return newLocalDataStore(cs), true
}
return newLocalDataStore(cs), true
if cs = f.hflags.CreateStore(); cs != nil {
return newRemoteDataStore(cs), true
}
return &LocalDataStore{}, false
}
@@ -1,4 +1,4 @@
package http
package datas
import (
"bytes"
@@ -10,32 +10,46 @@ import (
"strings"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/constants"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
)
const (
rootPath = "/root/"
refPath = "/ref/"
getRefsPath = "/getRefs/"
postRefsPath = "/postRefs/"
maxConcurrentPuts = 64
)
type httpServer struct {
cs chunks.ChunkStore
type dataStoreServer struct {
ds DataStore
port int
l *net.Listener
conns map[net.Conn]http.ConnState
}
func NewHttpServer(cs chunks.ChunkStore, port int) *httpServer {
return &httpServer{
cs, port, nil, map[net.Conn]http.ConnState{},
func NewDataStoreServer(ds DataStore, port int) *dataStoreServer {
return &dataStoreServer{
ds, port, nil, map[net.Conn]http.ConnState{},
}
}
func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) {
func (s *dataStoreServer) handleGetReachable(r ref.Ref, w http.ResponseWriter, req *http.Request) {
excludeRef := ref.Ref{}
exclude := req.URL.Query().Get("exclude")
if exclude != "" {
excludeRef = ref.Parse(exclude)
}
w.Header().Add("Content-Type", "application/octet-stream")
writer := w.(io.Writer)
if strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") {
w.Header().Add("Content-Encoding", "gzip")
gw := gzip.NewWriter(w)
defer gw.Close()
writer = gw
}
sz := chunks.NewSerializer(writer)
s.ds.CopyReachableChunksP(r, excludeRef, sz, 512)
sz.Close()
}
func (s *dataStoreServer) handleRef(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
refStr := ""
pathParts := strings.Split(req.URL.Path[1:], "/")
@@ -46,7 +60,12 @@ func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) {
switch req.Method {
case "GET":
chunk := s.cs.Get(r)
all := req.URL.Query().Get("all")
if all == "true" {
s.handleGetReachable(r, w, req)
return
}
chunk := s.ds.Get(r)
if chunk.IsEmpty() {
w.WriteHeader(http.StatusNotFound)
return
@@ -58,7 +77,7 @@ func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) {
w.Header().Add("Cache-Control", "max-age=31536000") // 1 year
case "HEAD":
if !s.cs.Has(r) {
if !s.ds.Has(r) {
w.WriteHeader(http.StatusNotFound)
return
}
@@ -73,7 +92,7 @@ func (s *httpServer) handleRef(w http.ResponseWriter, req *http.Request) {
}
}
func (s *httpServer) handlePostRefs(w http.ResponseWriter, req *http.Request) {
func (s *dataStoreServer) handlePostRefs(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
d.Exp.Equal("POST", req.Method)
@@ -85,7 +104,7 @@ func (s *httpServer) handlePostRefs(w http.ResponseWriter, req *http.Request) {
reader = gr
}
chunks.Deserialize(reader, s.cs)
chunks.Deserialize(reader, s.ds)
w.WriteHeader(http.StatusCreated)
})
@@ -95,7 +114,7 @@ func (s *httpServer) handlePostRefs(w http.ResponseWriter, req *http.Request) {
}
}
func (s *httpServer) handleGetRefs(w http.ResponseWriter, req *http.Request) {
func (s *dataStoreServer) handleGetRefs(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
d.Exp.Equal("POST", req.Method)
@@ -119,7 +138,7 @@ func (s *httpServer) handleGetRefs(w http.ResponseWriter, req *http.Request) {
sz := chunks.NewSerializer(writer)
for _, r := range refs {
c := s.cs.Get(r)
c := s.ds.Get(r)
if !c.IsEmpty() {
sz.Put(c)
}
@@ -133,11 +152,11 @@ func (s *httpServer) handleGetRefs(w http.ResponseWriter, req *http.Request) {
}
}
func (s *httpServer) handleRoot(w http.ResponseWriter, req *http.Request) {
func (s *dataStoreServer) handleRoot(w http.ResponseWriter, req *http.Request) {
err := d.Try(func() {
switch req.Method {
case "GET":
rootRef := s.cs.Root()
rootRef := s.ds.Root()
fmt.Fprintf(w, "%v", rootRef.String())
w.Header().Add("content-type", "text/plain")
@@ -150,7 +169,7 @@ func (s *httpServer) handleRoot(w http.ResponseWriter, req *http.Request) {
d.Exp.Len(tokens, 1)
current := ref.Parse(tokens[0])
if !s.cs.UpdateRoot(current, last) {
if !s.ds.UpdateRoot(current, last) {
w.WriteHeader(http.StatusConflict)
return
}
@@ -163,7 +182,7 @@ func (s *httpServer) handleRoot(w http.ResponseWriter, req *http.Request) {
}
}
func (s *httpServer) connState(c net.Conn, cs http.ConnState) {
func (s *dataStoreServer) connState(c net.Conn, cs http.ConnState) {
switch cs {
case http.StateNew, http.StateActive, http.StateIdle:
s.conns[c] = cs
@@ -172,18 +191,18 @@ func (s *httpServer) connState(c net.Conn, cs http.ConnState) {
}
}
// Blocks while the server is listening. Running on a separate go routine is supported.
func (s *httpServer) Run() {
// Blocks while the dataStoreServer is listening. Running on a separate go routine is supported.
func (s *dataStoreServer) Run() {
l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port))
d.Chk.NoError(err)
s.l = &l
mux := http.NewServeMux()
mux.HandleFunc(refPath, http.HandlerFunc(s.handleRef))
mux.HandleFunc(getRefsPath, http.HandlerFunc(s.handleGetRefs))
mux.HandleFunc(postRefsPath, http.HandlerFunc(s.handlePostRefs))
mux.HandleFunc(rootPath, http.HandlerFunc(s.handleRoot))
mux.HandleFunc(constants.RefPath, http.HandlerFunc(s.handleRef))
mux.HandleFunc(constants.GetRefsPath, http.HandlerFunc(s.handleGetRefs))
mux.HandleFunc(constants.PostRefsPath, http.HandlerFunc(s.handlePostRefs))
mux.HandleFunc(constants.RootPath, http.HandlerFunc(s.handleRoot))
srv := &http.Server{
Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -195,8 +214,8 @@ func (s *httpServer) Run() {
srv.Serve(l)
}
// Will cause the server to stop listening and an existing call to Run() to continue.
func (s *httpServer) Stop() {
// Will cause the dataStoreServer to stop listening and an existing call to Run() to continue.
func (s *dataStoreServer) Stop() {
(*s.l).Close()
for c, _ := range s.conns {
c.Close()
+48 -7
View File
@@ -4,6 +4,7 @@ import (
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
"github.com/attic-labs/noms/walk"
)
// DataStore provides versioned storage for noms values. Each DataStore instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new DataStore, representing a new moment in history.
@@ -20,13 +21,6 @@ func newLocalDataStore(cs chunks.ChunkStore) *LocalDataStore {
return &LocalDataStore{dataStoreCommon{cs, commitFromRef(rootRef, cs)}}
}
func newDataStoreInternal(cs chunks.ChunkStore) dataStoreCommon {
if (cs.Root() == ref.Ref{}) {
return dataStoreCommon{cs, nil}
}
return dataStoreCommon{cs, commitFromRef(cs.Root(), cs)}
}
func (lds *LocalDataStore) Commit(v types.Value) (DataStore, bool) {
ok := lds.commit(v)
return newLocalDataStore(lds.ChunkStore), ok
@@ -36,3 +30,50 @@ func (lds *LocalDataStore) CommitWithParents(v types.Value, p SetOfCommit) (Data
ok := lds.commitWithParents(v, p)
return newLocalDataStore(lds.ChunkStore), ok
}
// Copies all chunks reachable from (and including) |r|, but excluding all
// chunks reachable from (and including) |exclude|, from lds to |sink|.
// |concurrency| bounds the number of parallel walkers.
func (lds *LocalDataStore) CopyReachableChunksP(r, exclude ref.Ref, sink chunks.ChunkSink, concurrency int) {
	// Set of refs reachable from |exclude|; the copy walk is pruned at these.
	excludeRefs := map[ref.Ref]bool{}

	hasRef := func(r ref.Ref) bool {
		return excludeRefs[r]
	}

	if exclude != (ref.Ref{}) {
		refChan := make(chan ref.Ref, 1024)
		addRef := func(r ref.Ref) {
			refChan <- r
		}
		// Walk the exclude graph concurrently; the channel funnels refs back
		// to this goroutine, which is the sole writer of the map (no lock).
		go func() {
			walk.AllP(exclude, lds, addRef, concurrency)
			close(refChan)
		}()
		for r := range refChan {
			excludeRefs[r] = true
		}
	}

	// Every chunk read by the walk below is tee'd into |sink|; SomeP stops
	// descending wherever hasRef reports an excluded ref.
	tcs := &teeChunkSource{lds, sink}
	walk.SomeP(r, tcs, hasRef, concurrency)
}
// teeChunkSource wraps a ChunkSource so that every chunk successfully read
// from |source| is also written to |sink|.
type teeChunkSource struct {
	source chunks.ChunkSource
	sink   chunks.ChunkSink
}

// Get reads the chunk for |ref| from the underlying source; if it was found,
// it is forwarded to the sink before being returned.
func (trs *teeChunkSource) Get(ref ref.Ref) chunks.Chunk {
	c := trs.source.Get(ref)
	if c.IsEmpty() {
		// Absent chunks are not forwarded.
		return c
	}
	trs.sink.Put(c)
	return c
}

// Has reports whether the underlying source has |ref|; the sink is untouched.
func (trs *teeChunkSource) Has(ref ref.Ref) bool {
	return trs.source.Has(ref)
}
+88
View File
@@ -0,0 +1,88 @@
package datas
import (
"compress/gzip"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"path"
"strings"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/constants"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
// RemoteDataStore is a DataStore whose chunks live on a remote server reached
// over HTTP; its ChunkStore is expected to be a *chunks.HttpStore (see host()).
// Like every DataStore, each instance represents one moment in history, and
// Commit() returns a new instance representing a new moment.
type RemoteDataStore struct {
	dataStoreCommon
}
// newRemoteDataStore builds a RemoteDataStore on top of cs. A store whose
// root ref is still empty starts out with a nil head commit.
func newRemoteDataStore(cs chunks.ChunkStore) *RemoteDataStore {
	if rootRef := cs.Root(); rootRef != (ref.Ref{}) {
		return &RemoteDataStore{dataStoreCommon{cs, commitFromRef(rootRef, cs)}}
	}
	return &RemoteDataStore{dataStoreCommon{cs, nil}}
}
// host returns the URL of the HTTP server backing this store's ChunkStore.
func (lds *RemoteDataStore) host() *url.URL {
	httpStore := lds.dataStoreCommon.ChunkStore.(*chunks.HttpStore)
	return httpStore.Host()
}
// Commit commits v as the new head, returning a fresh DataStore snapshot and
// whether the commit succeeded.
func (lds *RemoteDataStore) Commit(v types.Value) (DataStore, bool) {
	success := lds.commit(v)
	return newRemoteDataStore(lds.ChunkStore), success
}
// CommitWithParents commits v with the explicit parent set p, returning a
// fresh DataStore snapshot and whether the commit succeeded.
func (lds *RemoteDataStore) CommitWithParents(v types.Value, p SetOfCommit) (DataStore, bool) {
	success := lds.commitWithParents(v, p)
	return newRemoteDataStore(lds.ChunkStore), success
}
// CopyReachableChunksP asks the remote server to stream every chunk reachable
// from (and including) |r|, excluding everything reachable from (and
// including) |exclude|, and deserializes the response into |cs|.
// |concurrency| is unused here; the server picks its own walk concurrency.
func (lds *RemoteDataStore) CopyReachableChunksP(r, exclude ref.Ref, cs chunks.ChunkSink, concurrency int) {
	// GET http://<host>/ref/<sha1>?all=true&exclude=<sha1>. Response will be
	// chunk data if present, 404 if absent.
	u := *lds.host() // copy: Host() hands back the shared URL, which must not be mutated
	u.Path = path.Join(constants.RefPath, r.String())
	values := &url.Values{}
	values.Add("all", "true")
	if exclude != (ref.Ref{}) {
		values.Add("exclude", exclude.String())
	}
	u.RawQuery = values.Encode()

	req, err := http.NewRequest("GET", u.String(), nil)
	d.Chk.NoError(err) // check before touching req
	req.Header.Add("Accept-Encoding", "gzip")

	res, err := http.DefaultClient.Do(req)
	d.Chk.NoError(err)
	defer closeResponse(res)
	d.Chk.Equal(http.StatusOK, res.StatusCode, "Unexpected response: %s", http.StatusText(res.StatusCode))

	reader := res.Body
	if strings.Contains(res.Header.Get("Content-Encoding"), "gzip") {
		gr, err := gzip.NewReader(reader)
		d.Chk.NoError(err)
		defer gr.Close()
		reader = gr
	}
	chunks.Deserialize(reader, cs)
}
// closeResponse closes res.Body after asserting it was fully consumed.
// Reading every response to EOF is required for the transport's keep-alive
// connection reuse. TODO: consider a timeout so that a server which leaves
// its connection open can't eat up all of our ports.
func closeResponse(res *http.Response) error {
	leftover, err := ioutil.ReadAll(res.Body)
	d.Chk.NoError(err)
	d.Chk.Equal(0, len(leftover))
	return res.Body.Close()
}
@@ -1,4 +1,4 @@
package http
package datas
import (
"net/http"
@@ -14,12 +14,12 @@ func TestHttpStoreTestSuite(t *testing.T) {
type HttpStoreTestSuite struct {
chunks.ChunkStoreTestSuite
server *httpServer
server *dataStoreServer
}
func (suite *HttpStoreTestSuite) SetupTest() {
suite.Store = NewHttpClient("http://localhost:8000")
suite.server = NewHttpServer(chunks.NewMemoryStore(), 8000)
suite.Store = chunks.NewHttpStore("http://localhost:8000")
suite.server = NewDataStoreServer(NewDataStore(chunks.NewMemoryStore()), 8000)
go suite.server.Run()
// This call to a non-existing URL allows us to exit being sure that the server started. Otherwise, we sometimes get races with Stop() below.
+42
View File
@@ -6,6 +6,7 @@ import (
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/datas"
"github.com/attic-labs/noms/dataset/mgmt"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
@@ -59,6 +60,47 @@ func (ds *Dataset) CommitWithParents(v types.Value, p datas.SetOfCommit) (Datase
return Dataset{store, ds.id}, ok
}
// Pull copies into this Dataset every chunk reachable from source's head that
// is not already reachable from the sink's head, then fast-forwards the sink's
// head to source's head, retrying until the head update succeeds. Returns the
// updated Dataset; if the heads already match, returns the receiver unchanged.
func (ds *Dataset) Pull(source Dataset, concurrency int) Dataset {
	sink := *ds
	sourceHeadRef := source.Head().Ref()
	// An empty ref means the sink has no head yet; CopyReachableChunksP then
	// copies everything reachable from the source head.
	sinkHeadRef := ref.Ref{}
	if currentHead, ok := sink.MaybeHead(); ok {
		sinkHeadRef = currentHead.Ref()
	}
	if sourceHeadRef == sinkHeadRef {
		// Already up to date.
		return sink
	}
	source.Store().CopyReachableChunksP(sourceHeadRef, sinkHeadRef, sink.Store(), concurrency)
	// SetNewHead can lose a commit race; on failure it returns a freshened
	// Dataset, so keep retrying against that until the update lands.
	for ok := false; !ok; sink, ok = sink.SetNewHead(sourceHeadRef) {
		continue
	}
	return sink
}
// validateRefAsCommit reads the value at |r| from ds's store and returns it
// as a Commit. Fails (via d.Exp) if no value exists at |r| or if the value
// does not decode as a Commit.
func (ds *Dataset) validateRefAsCommit(r ref.Ref) datas.Commit {
	v := types.ReadValue(r, ds.store)
	d.Exp.NotNil(v, "%v cannot be found", r)

	// TODO: Replace this weird recover stuff below once we have a way to determine if a Value is an instance of a custom struct type. BUG #133
	defer func() {
		if r := recover(); r != nil {
			// CommitFromVal panicked, so v is not a Commit.
			d.Exp.Fail("Not a Commit:", "%+v", v)
		}
	}()

	return datas.CommitFromVal(v)
}
// SetNewHead takes the Ref of the desired new Head of ds; the chunk for that
// Ref must already exist in the Dataset. It validates that the Ref points to
// an existing chunk that decodes to the correct type of value and then
// commits it to ds. On success it returns a new Dataset with newHeadRef set
// and ok == true. If the commit fails, ok is false and a new, up-to-date
// Dataset WITHOUT newHeadRef is returned; the caller should retry using that
// Dataset.
func (ds *Dataset) SetNewHead(newHeadRef ref.Ref) (Dataset, bool) {
	newHead := ds.validateRefAsCommit(newHeadRef)
	return ds.CommitWithParents(newHead.Value(), newHead.Parents())
}
// Close closes the Dataset's underlying store.
func (ds *Dataset) Close() {
	ds.store.Close()
}
+10 -24
View File
@@ -1,4 +1,4 @@
package sync
package dataset
import (
"testing"
@@ -6,21 +6,19 @@ import (
"github.com/attic-labs/noms/Godeps/_workspace/src/github.com/stretchr/testify/assert"
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/datas"
"github.com/attic-labs/noms/dataset"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
)
func createTestDataset(name string) dataset.Dataset {
t := chunks.NewTestStore()
return dataset.NewDataset(datas.NewDataStore(t), name)
func createTestDataset(name string) Dataset {
return NewDataset(datas.NewDataStore(chunks.NewTestStore()), name)
}
func TestValidateRef(t *testing.T) {
cs := chunks.NewTestStore()
r := types.WriteValue(types.Bool(true), cs)
ds := createTestDataset("test")
r := types.WriteValue(types.Bool(true), ds.Store())
assert.Panics(t, func() { validateRefAsCommit(r, cs) })
assert.Panics(t, func() { ds.validateRefAsCommit(r) })
}
func TestPull(t *testing.T) {
@@ -52,8 +50,7 @@ func TestPull(t *testing.T) {
source, ok = source.Commit(updatedValue)
assert.True(ok)
CopyReachableChunksP(source.Head().Ref(), sink.Head().Ref(), source.Store(), sink.Store(), 1)
sink, ok = SetNewHead(source.Head().Ref(), sink)
sink = sink.Pull(source, 1)
assert.True(ok)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -71,24 +68,13 @@ func TestPullFirstCommit(t *testing.T) {
source, ok := source.Commit(initialValue)
assert.True(ok)
sinkHeadRef := func() ref.Ref {
head, ok := sink.MaybeHead()
if ok {
return head.Ref()
}
return ref.Ref{}
}()
CopyReachableChunksP(source.Head().Ref(), sinkHeadRef, source.Store(), sink.Store(), 1)
CopyReachableChunksP(source.Head().Ref(), sinkHeadRef, source.Store(), sink.Store(), 1)
sink, ok = SetNewHead(source.Head().Ref(), sink)
assert.True(ok)
sink = sink.Pull(source, 1)
assert.True(source.Head().Equals(sink.Head()))
}
func TestFailedCopyChunks(t *testing.T) {
cs := chunks.NewMemoryStore()
ds := createTestDataset("test")
r := ref.Parse("sha1-0000000000000000000000000000000000000000")
assert.Panics(t, func() { CopyReachableChunksP(r, ref.Ref{}, cs, cs, 1) })
assert.Panics(t, func() { ds.Store().CopyReachableChunksP(r, ref.Ref{}, ds.Store(), 1) })
}
-78
View File
@@ -1,78 +0,0 @@
package sync
import (
"github.com/attic-labs/noms/chunks"
"github.com/attic-labs/noms/d"
"github.com/attic-labs/noms/datas"
"github.com/attic-labs/noms/dataset"
"github.com/attic-labs/noms/ref"
"github.com/attic-labs/noms/types"
"github.com/attic-labs/noms/walk"
)
func validateRefAsCommit(r ref.Ref, cs chunks.ChunkSource) datas.Commit {
v := types.ReadValue(r, cs)
d.Exp.NotNil(v, "%v cannot be found", r)
// TODO: Replace this weird recover stuff below once we have a way to determine if a Value is an instance of a custom struct type. BUG #133
defer func() {
if r := recover(); r != nil {
d.Exp.Fail("Not a Commit:", "%+v", v)
}
}()
return datas.CommitFromVal(v)
}
// SetNewHead takes the Ref of the desired new Head of ds, the chunk for which should already exist in the Dataset. It validates that the Ref points to an existing chunk that decodes to the correct type of value and then commits it to ds, returning a new Dataset with newHeadRef set and ok set to true. In the event that the commit fails, ok is set to false and a new up-to-date Dataset is returned WITHOUT newHeadRef in it. The caller should try again using this new Dataset.
func SetNewHead(newHeadRef ref.Ref, ds dataset.Dataset) (dataset.Dataset, bool) {
commit := validateRefAsCommit(newHeadRef, ds.Store())
return ds.CommitWithParents(commit.Value(), commit.Parents())
}
// Copys all chunks reachable from (and including) |r| but excluding all chunks reachable from (and including) |exclude| in |source| to |sink|.
func CopyReachableChunksP(r, exclude ref.Ref, source chunks.ChunkSource, sink chunks.ChunkSink, concurrency int) {
excludeRefs := map[ref.Ref]bool{}
hasRef := func(r ref.Ref) bool {
return excludeRefs[r]
}
if exclude != (ref.Ref{}) {
refChan := make(chan ref.Ref, 1024)
addRef := func(r ref.Ref) {
refChan <- r
}
go func() {
walk.AllP(exclude, source, addRef, concurrency)
close(refChan)
}()
for r := range refChan {
excludeRefs[r] = true
}
}
tcs := &teeChunkSource{source, sink}
walk.SomeP(r, tcs, hasRef, concurrency)
}
// teeChunkSource just serves the purpose of writing to |sink| every chunk that is read from |source|.
type teeChunkSource struct {
source chunks.ChunkSource
sink chunks.ChunkSink
}
func (trs *teeChunkSource) Get(ref ref.Ref) chunks.Chunk {
c := trs.source.Get(ref)
if c.IsEmpty() {
return c
}
trs.sink.Put(c)
return c
}
func (trs *teeChunkSource) Has(ref ref.Ref) bool {
return trs.source.Has(ref)
}