mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-25 11:39:32 -05:00
Go: Use unified ref-height-based Pull algorithm (#1722)
Change Dataset.Pull to use a single algorithm to pull data from a source to a sink, regardless of which (if any) is local. The basic algorithm is described in the first section of pulling.md. This implementation is equivalent but phrased a bit differently. The algorithm actually used is described in the second section of pulling.md The main changes: - datas.Pull(), which implements the new pulling algorithm - RefHeap, a priority queue that sorts types.Ref by ref-height and then by ref.TargetHash() - Add has() to both Database implementations. Cache has() checks. - Switch Dataset to use new datas.Pull(). Currently not concurrent. Toward #1568 Mostly, prune reachableChunks
This commit is contained in:
+1
-1
@@ -17,7 +17,7 @@ type Chunk struct {
|
||||
data []byte
|
||||
}
|
||||
|
||||
var EmptyChunk = Chunk{}
|
||||
var EmptyChunk = NewChunk([]byte{})
|
||||
|
||||
func (c Chunk) Hash() hash.Hash {
|
||||
return c.r
|
||||
|
||||
@@ -23,7 +23,7 @@ import (
|
||||
Chunk N
|
||||
|
||||
Chunk:
|
||||
Hash // 20-byte sha1 hash
|
||||
Hash // 20-byte sha1 hash
|
||||
Len // 4-byte int
|
||||
Data // len(Data) == Len
|
||||
*/
|
||||
@@ -89,8 +89,8 @@ func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for {
|
||||
c := deserializeChunk(reader)
|
||||
if c.IsEmpty() {
|
||||
c, success := deserializeChunk(reader)
|
||||
if !success {
|
||||
break
|
||||
}
|
||||
|
||||
@@ -113,8 +113,8 @@ func Deserialize(reader io.Reader, cs ChunkSink, rateLimit chan struct{}) {
|
||||
// DeserializeToChan reads off of |reader| until EOF, sending chunks to chunkChan in the order they are read.
|
||||
func DeserializeToChan(reader io.Reader, chunkChan chan<- Chunk) {
|
||||
for {
|
||||
c := deserializeChunk(reader)
|
||||
if c.IsEmpty() {
|
||||
c, success := deserializeChunk(reader)
|
||||
if !success {
|
||||
break
|
||||
}
|
||||
chunkChan <- c
|
||||
@@ -122,11 +122,11 @@ func DeserializeToChan(reader io.Reader, chunkChan chan<- Chunk) {
|
||||
close(chunkChan)
|
||||
}
|
||||
|
||||
func deserializeChunk(reader io.Reader) Chunk {
|
||||
func deserializeChunk(reader io.Reader) (Chunk, bool) {
|
||||
digest := hash.Sha1Digest{}
|
||||
n, err := io.ReadFull(reader, digest[:])
|
||||
if err == io.EOF {
|
||||
return EmptyChunk
|
||||
return EmptyChunk, false
|
||||
}
|
||||
d.Chk.NoError(err)
|
||||
d.Chk.True(int(sha1.Size) == n)
|
||||
@@ -141,6 +141,6 @@ func deserializeChunk(reader io.Reader) Chunk {
|
||||
d.Chk.NoError(err)
|
||||
d.Chk.True(int64(chunkSize) == n2)
|
||||
c := w.Chunk()
|
||||
d.Chk.True(h == c.Hash())
|
||||
return c
|
||||
d.Chk.True(h == c.Hash(), "%s != %s", h, c.Hash())
|
||||
return c, true
|
||||
}
|
||||
|
||||
@@ -7,18 +7,21 @@ package datas
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
)
|
||||
|
||||
type cachingChunkHaver struct {
|
||||
backing chunks.ChunkSource
|
||||
hasCache map[hash.Hash]bool
|
||||
mu *sync.Mutex
|
||||
type chunkHaver interface {
|
||||
Has(h hash.Hash) bool
|
||||
}
|
||||
|
||||
func newCachingChunkHaver(cs chunks.ChunkSource) *cachingChunkHaver {
|
||||
return &cachingChunkHaver{cs, map[hash.Hash]bool{}, &sync.Mutex{}}
|
||||
type cachingChunkHaver struct {
|
||||
backing chunkHaver
|
||||
hasCache map[hash.Hash]bool
|
||||
mu *sync.RWMutex
|
||||
}
|
||||
|
||||
func newCachingChunkHaver(cs chunkHaver) *cachingChunkHaver {
|
||||
return &cachingChunkHaver{cs, map[hash.Hash]bool{}, &sync.RWMutex{}}
|
||||
}
|
||||
|
||||
func (ccs *cachingChunkHaver) Has(r hash.Hash) bool {
|
||||
@@ -31,8 +34,8 @@ func (ccs *cachingChunkHaver) Has(r hash.Hash) bool {
|
||||
}
|
||||
|
||||
func checkCache(ccs *cachingChunkHaver, r hash.Hash) (has, ok bool) {
|
||||
ccs.mu.Lock()
|
||||
defer ccs.mu.Unlock()
|
||||
ccs.mu.RLock()
|
||||
defer ccs.mu.RUnlock()
|
||||
has, ok = ccs.hasCache[r]
|
||||
return
|
||||
}
|
||||
|
||||
+4
-2
@@ -7,6 +7,7 @@ package datas
|
||||
import "github.com/attic-labs/noms/go/types"
|
||||
|
||||
var commitType *types.Type
|
||||
var refOfCommitType *types.Type
|
||||
|
||||
const (
|
||||
ParentsField = "parents"
|
||||
@@ -27,6 +28,7 @@ func init() {
|
||||
}
|
||||
commitType = types.MakeStructType(structName, fieldTypes)
|
||||
commitType.Desc.(types.StructDesc).SetField(ParentsField, types.MakeSetType(types.MakeRefType(commitType)))
|
||||
refOfCommitType = types.MakeRefType(commitType)
|
||||
}
|
||||
|
||||
func NewCommit() types.Struct {
|
||||
@@ -39,7 +41,7 @@ func NewCommit() types.Struct {
|
||||
}
|
||||
|
||||
func typeForMapOfStringToRefOfCommit() *types.Type {
|
||||
return types.MakeMapType(types.StringType, types.MakeRefType(commitType))
|
||||
return types.MakeMapType(types.StringType, refOfCommitType)
|
||||
}
|
||||
|
||||
func NewMapOfStringToRefOfCommit() types.Map {
|
||||
@@ -47,7 +49,7 @@ func NewMapOfStringToRefOfCommit() types.Map {
|
||||
}
|
||||
|
||||
func typeForSetOfRefOfCommit() *types.Type {
|
||||
return types.MakeSetType(types.MakeRefType(commitType))
|
||||
return types.MakeSetType(refOfCommitType)
|
||||
}
|
||||
|
||||
func CommitType() *types.Type {
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"io"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
)
|
||||
|
||||
@@ -39,17 +40,10 @@ type Database interface {
|
||||
// Delete removes the Dataset named datasetID from the map at the root of the Database. The Dataset data is not necessarily cleaned up at this time, but may be garbage collected in the future. If the update cannot be performed, e.g., because of a conflict, error will non-nil. The newest snapshot of the database is always returned.
|
||||
Delete(datasetID string) (Database, error)
|
||||
|
||||
batchSink() batchSink
|
||||
has(hash hash.Hash) bool
|
||||
batchStore() types.BatchStore
|
||||
}
|
||||
|
||||
// This interface exists solely to allow RemoteDatabaseClient to pass back a gross side-channel thing for the purposes of pull.
|
||||
type batchSink interface {
|
||||
SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints)
|
||||
Flush()
|
||||
io.Closer
|
||||
}
|
||||
|
||||
func NewDatabase(cs chunks.ChunkStore) Database {
|
||||
return newLocalDatabase(cs)
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
)
|
||||
|
||||
type databaseCommon struct {
|
||||
cch *cachingChunkHaver
|
||||
vs *types.ValueStore
|
||||
rt chunks.RootTracker
|
||||
rootRef hash.Hash
|
||||
@@ -25,8 +26,8 @@ var (
|
||||
ErrMergeNeeded = errors.New("Dataset head is not ancestor of commit")
|
||||
)
|
||||
|
||||
func newDatabaseCommon(vs *types.ValueStore, rt chunks.RootTracker) databaseCommon {
|
||||
return databaseCommon{vs: vs, rt: rt, rootRef: rt.Root()}
|
||||
func newDatabaseCommon(cch *cachingChunkHaver, vs *types.ValueStore, rt chunks.RootTracker) databaseCommon {
|
||||
return databaseCommon{cch: cch, vs: vs, rt: rt, rootRef: rt.Root()}
|
||||
}
|
||||
|
||||
func (ds *databaseCommon) MaybeHead(datasetID string) (types.Struct, bool) {
|
||||
@@ -68,6 +69,10 @@ func (ds *databaseCommon) Datasets() types.Map {
|
||||
return *ds.datasets
|
||||
}
|
||||
|
||||
func (ds *databaseCommon) has(h hash.Hash) bool {
|
||||
return ds.cch.Has(h)
|
||||
}
|
||||
|
||||
func (ds *databaseCommon) ReadValue(r hash.Hash) types.Value {
|
||||
return ds.vs.ReadValue(r)
|
||||
}
|
||||
|
||||
@@ -48,8 +48,8 @@ type RemoteDatabaseSuite struct {
|
||||
func (suite *RemoteDatabaseSuite) SetupTest() {
|
||||
suite.cs = chunks.NewTestStore()
|
||||
suite.makeDs = func(cs chunks.ChunkStore) Database {
|
||||
hbs := newHTTPBatchStoreForTest(suite.cs)
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(types.NewValueStore(hbs), hbs)}
|
||||
hbs := newHTTPBatchStoreForTest(cs)
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(hbs), types.NewValueStore(hbs), hbs)}
|
||||
}
|
||||
suite.ds = suite.makeDs(suite.cs)
|
||||
}
|
||||
|
||||
@@ -77,8 +77,9 @@ type httpDoer interface {
|
||||
}
|
||||
|
||||
type writeRequest struct {
|
||||
hash hash.Hash
|
||||
hints types.Hints
|
||||
hash hash.Hash
|
||||
hints types.Hints
|
||||
justHints bool
|
||||
}
|
||||
|
||||
// Use a custom http client rather than http.DefaultClient. We limit ourselves to a maximum of |requestLimit| concurrent http requests, the custom httpClient ups the maxIdleConnsPerHost value so that one connection stays open for each concurrent request.
|
||||
@@ -274,7 +275,11 @@ func (bhcs *httpBatchStore) SchedulePut(c chunks.Chunk, refHeight uint64, hints
|
||||
}
|
||||
|
||||
bhcs.requestWg.Add(1)
|
||||
bhcs.writeQueue <- writeRequest{c.Hash(), hints}
|
||||
bhcs.writeQueue <- writeRequest{c.Hash(), hints, false}
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) AddHints(hints types.Hints) {
|
||||
bhcs.writeQueue <- writeRequest{hash.Hash{}, hints, true}
|
||||
}
|
||||
|
||||
func (bhcs *httpBatchStore) batchPutRequests() {
|
||||
@@ -285,10 +290,12 @@ func (bhcs *httpBatchStore) batchPutRequests() {
|
||||
hints := types.Hints{}
|
||||
hashes := hashSet{}
|
||||
handleRequest := func(wr writeRequest) {
|
||||
if hashes.Has(wr.hash) {
|
||||
bhcs.requestWg.Done() // Already have a put enqueued for wr.hash.
|
||||
} else {
|
||||
hashes.Insert(wr.hash)
|
||||
if !wr.justHints {
|
||||
if hashes.Has(wr.hash) {
|
||||
bhcs.requestWg.Done() // Already have a put enqueued for wr.hash.
|
||||
} else {
|
||||
hashes.Insert(wr.hash)
|
||||
}
|
||||
}
|
||||
for hint := range wr.hints {
|
||||
hints[hint] = struct{}{}
|
||||
@@ -347,6 +354,7 @@ func (bhcs *httpBatchStore) sendWriteRequests(hashes hashSet, hints types.Hints)
|
||||
gw.Close()
|
||||
pw.Close()
|
||||
errChan <- err
|
||||
close(errChan)
|
||||
}()
|
||||
body := buildWriteValueRequest(serializedChunks, hints)
|
||||
|
||||
|
||||
@@ -6,47 +6,30 @@ package datas
|
||||
|
||||
import (
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
)
|
||||
|
||||
// Database provides versioned storage for noms values. Each Database instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new Database, representing a new moment in history.
|
||||
type LocalDatabase struct {
|
||||
cch *cachingChunkHaver
|
||||
databaseCommon
|
||||
}
|
||||
|
||||
func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase {
|
||||
return &LocalDatabase{
|
||||
newCachingChunkHaver(cs),
|
||||
newDatabaseCommon(types.NewValueStore(types.NewBatchStoreAdaptor(cs)), cs),
|
||||
newDatabaseCommon(newCachingChunkHaver(cs), types.NewValueStore(types.NewBatchStoreAdaptor(cs)), cs),
|
||||
}
|
||||
}
|
||||
|
||||
func (lds *LocalDatabase) Commit(datasetID string, commit types.Struct) (Database, error) {
|
||||
err := lds.commit(datasetID, commit)
|
||||
lds.vs.Flush()
|
||||
return &LocalDatabase{
|
||||
lds.cch,
|
||||
newDatabaseCommon(lds.vs, lds.rt),
|
||||
}, err
|
||||
return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt)}, err
|
||||
}
|
||||
|
||||
func (lds *LocalDatabase) Delete(datasetID string) (Database, error) {
|
||||
err := lds.doDelete(datasetID)
|
||||
lds.vs.Flush()
|
||||
return &LocalDatabase{
|
||||
lds.cch,
|
||||
newDatabaseCommon(lds.vs, lds.rt),
|
||||
}, err
|
||||
}
|
||||
|
||||
func (lds *LocalDatabase) has(r hash.Hash) bool {
|
||||
return lds.cch.Has(r)
|
||||
}
|
||||
|
||||
func (lds *LocalDatabase) batchSink() batchSink {
|
||||
return lds.vs.BatchStore()
|
||||
return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt)}, err
|
||||
}
|
||||
|
||||
func (lds *LocalDatabase) batchStore() types.BatchStore {
|
||||
|
||||
@@ -1,164 +0,0 @@
|
||||
// Copyright 2016 The Noms Authors. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/constants"
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/julienschmidt/httprouter"
|
||||
)
|
||||
|
||||
// notABatchSink exists solely to provide a way to pull chunks into a remote data store without validation, because doing it with validation efficiently requires some serialization changes we have yet to make. Once we land BUG 822, we can come back here and undo this.
|
||||
type notABatchSink struct {
|
||||
host *url.URL
|
||||
httpClient httpDoer
|
||||
auth string
|
||||
writeQueue chan chunks.Chunk
|
||||
flushChan chan struct{}
|
||||
finishedChan chan struct{}
|
||||
rateLimit chan struct{}
|
||||
requestWg *sync.WaitGroup
|
||||
workerWg *sync.WaitGroup
|
||||
unwrittenPuts *orderedChunkCache
|
||||
}
|
||||
|
||||
func newNotABatchSink(host *url.URL, auth string) *notABatchSink {
|
||||
sink := ¬ABatchSink{
|
||||
host: host,
|
||||
httpClient: makeHTTPClient(httpChunkSinkConcurrency),
|
||||
auth: auth,
|
||||
writeQueue: make(chan chunks.Chunk, writeBufferSize),
|
||||
flushChan: make(chan struct{}),
|
||||
finishedChan: make(chan struct{}),
|
||||
rateLimit: make(chan struct{}, httpChunkSinkConcurrency),
|
||||
requestWg: &sync.WaitGroup{},
|
||||
workerWg: &sync.WaitGroup{},
|
||||
unwrittenPuts: newOrderedChunkCache(),
|
||||
}
|
||||
sink.batchPutRequests()
|
||||
return sink
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) Flush() {
|
||||
bhcs.flushChan <- struct{}{}
|
||||
bhcs.requestWg.Wait()
|
||||
return
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) Close() (e error) {
|
||||
close(bhcs.finishedChan)
|
||||
bhcs.requestWg.Wait()
|
||||
bhcs.workerWg.Wait()
|
||||
|
||||
close(bhcs.flushChan)
|
||||
close(bhcs.writeQueue)
|
||||
close(bhcs.rateLimit)
|
||||
return
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) SchedulePut(c chunks.Chunk, refHeight uint64, hints types.Hints) {
|
||||
if !bhcs.unwrittenPuts.Insert(c, refHeight) {
|
||||
return
|
||||
}
|
||||
|
||||
bhcs.requestWg.Add(1)
|
||||
bhcs.writeQueue <- c
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) batchPutRequests() {
|
||||
bhcs.workerWg.Add(1)
|
||||
go func() {
|
||||
defer bhcs.workerWg.Done()
|
||||
|
||||
var chunks []chunks.Chunk
|
||||
sendAndReset := func() {
|
||||
bhcs.sendWriteRequests(chunks) // Takes ownership of chunks
|
||||
chunks = nil
|
||||
}
|
||||
|
||||
for done := false; !done; {
|
||||
drainAndSend := false
|
||||
select {
|
||||
case c := <-bhcs.writeQueue:
|
||||
chunks = append(chunks, c)
|
||||
if len(chunks) == writeBufferSize {
|
||||
sendAndReset()
|
||||
}
|
||||
case <-bhcs.flushChan:
|
||||
drainAndSend = true
|
||||
case <-bhcs.finishedChan:
|
||||
drainAndSend = true
|
||||
done = true
|
||||
}
|
||||
|
||||
if drainAndSend {
|
||||
for drained := false; !drained; {
|
||||
select {
|
||||
case c := <-bhcs.writeQueue:
|
||||
chunks = append(chunks, c)
|
||||
default:
|
||||
drained = true
|
||||
}
|
||||
if len(chunks) == writeBufferSize || (drained && chunks != nil) {
|
||||
sendAndReset()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
d.Chk.Nil(chunks, "%d chunks were never sent to server", len(chunks))
|
||||
}()
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) sendWriteRequests(chnx []chunks.Chunk) {
|
||||
bhcs.rateLimit <- struct{}{}
|
||||
go func() {
|
||||
hashes := make(hashSet, len(chnx))
|
||||
defer func() {
|
||||
bhcs.unwrittenPuts.Clear(hashes)
|
||||
bhcs.requestWg.Add(-len(chnx))
|
||||
}()
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
gw := gzip.NewWriter(body)
|
||||
sz := chunks.NewSerializer(gw)
|
||||
for _, chunk := range chnx {
|
||||
hashes.Insert(chunk.Hash())
|
||||
sz.Put(chunk)
|
||||
}
|
||||
sz.Close()
|
||||
gw.Close()
|
||||
|
||||
url := *bhcs.host
|
||||
url.Path = httprouter.CleanPath(bhcs.host.Path + constants.PostRefsPath)
|
||||
req := newRequest("POST", bhcs.auth, url.String(), body, http.Header{
|
||||
"Content-Encoding": {"gzip"},
|
||||
"Content-Type": {"application/octet-stream"},
|
||||
})
|
||||
|
||||
res, err := bhcs.httpClient.Do(req)
|
||||
d.Chk.NoError(err)
|
||||
|
||||
d.Chk.True(res.StatusCode == http.StatusCreated, "Unexpected response: %s", http.StatusText(res.StatusCode))
|
||||
closeResponse(res)
|
||||
<-bhcs.rateLimit
|
||||
}()
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) Root() hash.Hash {
|
||||
panic("Not Reached")
|
||||
}
|
||||
|
||||
func (bhcs *notABatchSink) UpdateRoot(current, last hash.Hash) bool {
|
||||
panic("Not Reached")
|
||||
}
|
||||
@@ -1,67 +0,0 @@
|
||||
// Copyright 2016 The Noms Authors. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/constants"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/attic-labs/testify/suite"
|
||||
"github.com/julienschmidt/httprouter"
|
||||
)
|
||||
|
||||
func TestNotABatchSink(t *testing.T) {
|
||||
suite.Run(t, &NotABatchSinkSuite{})
|
||||
}
|
||||
|
||||
type NotABatchSinkSuite struct {
|
||||
suite.Suite
|
||||
cs *chunks.TestStore
|
||||
store batchSink
|
||||
}
|
||||
|
||||
func (suite *NotABatchSinkSuite) SetupTest() {
|
||||
suite.cs = chunks.NewTestStore()
|
||||
suite.store = newNotAHintedBatchSinkForTest(suite.cs)
|
||||
}
|
||||
|
||||
func newNotAHintedBatchSinkForTest(cs chunks.ChunkStore) batchSink {
|
||||
serv := inlineServer{httprouter.New()}
|
||||
serv.POST(
|
||||
constants.PostRefsPath,
|
||||
func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
|
||||
HandlePostRefs(w, req, ps, cs)
|
||||
},
|
||||
)
|
||||
u, _ := url.Parse("http://localhost:9000")
|
||||
hcs := newNotABatchSink(u, "")
|
||||
hcs.httpClient = serv
|
||||
return hcs
|
||||
}
|
||||
|
||||
func (suite *NotABatchSinkSuite) TearDownTest() {
|
||||
suite.store.Close()
|
||||
suite.cs.Close()
|
||||
}
|
||||
|
||||
func (suite *NotABatchSinkSuite) TestPutChunks() {
|
||||
vals := []types.Value{
|
||||
types.NewString("abc"),
|
||||
types.NewString("def"),
|
||||
}
|
||||
l := types.NewList()
|
||||
for _, v := range vals {
|
||||
suite.store.SchedulePut(types.EncodeValue(v, nil), 1, types.Hints{})
|
||||
l = l.Append(types.NewRef(v))
|
||||
}
|
||||
suite.store.SchedulePut(types.EncodeValue(l, nil), 2, types.Hints{})
|
||||
suite.store.Flush()
|
||||
|
||||
suite.Equal(3, suite.cs.Writes)
|
||||
}
|
||||
+95
-31
@@ -5,50 +5,114 @@
|
||||
package datas
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"container/heap"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/d"
|
||||
"github.com/attic-labs/noms/go/hash"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/attic-labs/noms/go/walk"
|
||||
)
|
||||
|
||||
// CopyMissingChunksP copies to |sink| all chunks in source that are reachable from (and including) |r|, skipping chunks that |sink| already has
|
||||
func CopyMissingChunksP(source Database, sink *LocalDatabase, sourceRef types.Ref, concurrency int) {
|
||||
stopCallback := func(r types.Ref) bool {
|
||||
return sink.has(r.TargetHash())
|
||||
// Pull objects that descends from sourceRef from srcDB to sinkDB. sinkHeadRef should point to a Commit (in sinkDB) that's an ancestor of sourceRef. This allows the algorithm to figure out which portions of data are already present in sinkDB and skip copying them.
|
||||
// TODO: Figure out how to add concurrency.
|
||||
func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref) {
|
||||
srcQ, sinkQ := types.RefHeap{sourceRef}, types.RefHeap{sinkHeadRef}
|
||||
heap.Init(&srcQ)
|
||||
heap.Init(&sinkQ)
|
||||
|
||||
// We generally expect that sourceRef descends from sinkHeadRef, so that walking down from sinkHeadRef yields useful hints. If it's not even in the srcDB, then just clear out sinkQ right now and don't bother.
|
||||
if !srcDB.has(sinkHeadRef.TargetHash()) {
|
||||
heap.Pop(&sinkQ)
|
||||
}
|
||||
copyWorker(source, sink, sourceRef, stopCallback, concurrency)
|
||||
}
|
||||
|
||||
// CopyReachableChunksP copies to |sink| all chunks reachable from (and including) |r|, but that are not in the subtree rooted at |exclude|
|
||||
func CopyReachableChunksP(source, sink Database, sourceRef, exclude types.Ref, concurrency int) {
|
||||
excludeRefs := map[hash.Hash]bool{}
|
||||
hc := hintCache{}
|
||||
reachableChunks := hashSet{}
|
||||
|
||||
if !exclude.TargetHash().IsEmpty() {
|
||||
mu := sync.Mutex{}
|
||||
excludeCallback := func(r types.Ref) bool {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
excludeRefs[r.TargetHash()] = true
|
||||
return false
|
||||
// Since we expect sourceRef to descend from sinkHeadRef, we assume srcDB has a superset of the data in sinkDB. There are some cases where, logically, the code wants to read data it knows to be in sinkDB. In this case, it doesn't actually matter which Database the data comes from, so as an optimization we use whichever is a LocalDatabase -- if either is.
|
||||
mostLocalDB := srcDB
|
||||
if _, ok := sinkDB.(*LocalDatabase); ok {
|
||||
mostLocalDB = sinkDB
|
||||
}
|
||||
|
||||
for !srcQ.Empty() {
|
||||
srcRef := srcQ[0]
|
||||
|
||||
// If the head of one Q is "higher" than the other, traverse it and then loop again. "HigherThan" sorts first by greater ref-height, then orders Refs by TargetHash.
|
||||
if sinkQ.Empty() || types.HigherThan(srcRef, sinkQ[0]) {
|
||||
traverseSource(&srcQ, srcDB, sinkDB, reachableChunks)
|
||||
continue
|
||||
}
|
||||
// Either the head of sinkQ is higher, or the heads of both queues are equal.
|
||||
if types.HigherThan(sinkQ[0], srcRef) {
|
||||
traverseSink(&sinkQ, mostLocalDB, hc)
|
||||
continue
|
||||
}
|
||||
|
||||
walk.SomeChunksP(exclude, source.batchStore(), excludeCallback, nil, concurrency)
|
||||
// The heads of both Qs are the same.
|
||||
d.Chk.True(!sinkQ.Empty(), "The heads should be the same, but sinkQ is empty!")
|
||||
traverseCommon(sinkHeadRef, &sinkQ, &srcQ, mostLocalDB, hc)
|
||||
}
|
||||
|
||||
stopCallback := func(r types.Ref) bool {
|
||||
return excludeRefs[r.TargetHash()]
|
||||
hints := types.Hints{}
|
||||
for hash := range reachableChunks {
|
||||
if hint, present := hc[hash]; present {
|
||||
hints[hint] = struct{}{}
|
||||
}
|
||||
}
|
||||
copyWorker(source, sink, sourceRef, stopCallback, concurrency)
|
||||
sinkDB.batchStore().AddHints(hints)
|
||||
}
|
||||
|
||||
func copyWorker(source, sink Database, sourceRef types.Ref, stopCb walk.SomeChunksStopCallback, concurrency int) {
|
||||
bs := sink.batchSink()
|
||||
type hintCache map[hash.Hash]hash.Hash
|
||||
|
||||
walk.SomeChunksP(sourceRef, source.batchStore(), stopCb, func(r types.Ref, c chunks.Chunk) {
|
||||
bs.SchedulePut(c, r.Height(), types.Hints{})
|
||||
}, concurrency)
|
||||
|
||||
bs.Flush()
|
||||
func traverseSource(srcQ *types.RefHeap, src Database, sinkDB Database, reachableChunks hashSet) {
|
||||
srcRef := heap.Pop(srcQ).(types.Ref)
|
||||
h := srcRef.TargetHash()
|
||||
if !sinkDB.has(h) {
|
||||
srcBS := src.batchStore()
|
||||
c := srcBS.Get(h)
|
||||
v := types.DecodeValue(c, src)
|
||||
d.Chk.True(v != nil, "Expected decoded chunk to be non-nil.")
|
||||
for _, reachable := range v.Chunks() {
|
||||
heap.Push(srcQ, reachable)
|
||||
reachableChunks.Insert(reachable.TargetHash())
|
||||
}
|
||||
sinkDB.batchStore().SchedulePut(c, srcRef.Height(), types.Hints{})
|
||||
delete(reachableChunks, h)
|
||||
}
|
||||
}
|
||||
|
||||
func traverseSink(sinkQ *types.RefHeap, db Database, hc hintCache) {
|
||||
sinkRef := heap.Pop(sinkQ).(types.Ref)
|
||||
if sinkRef.Height() > 1 {
|
||||
h := sinkRef.TargetHash()
|
||||
for _, reachable := range sinkRef.TargetValue(db).Chunks() {
|
||||
heap.Push(sinkQ, reachable)
|
||||
hc[reachable.TargetHash()] = h
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func traverseCommon(sinkHead types.Ref, sinkQ, srcQ *types.RefHeap, db Database, hc hintCache) {
|
||||
comRef, sinkRef := heap.Pop(srcQ).(types.Ref), heap.Pop(sinkQ).(types.Ref)
|
||||
d.Chk.True(comRef.Equals(sinkRef), "traverseCommon expects refs to be equal: %s != %s", comRef.TargetHash(), sinkRef.TargetHash())
|
||||
if comRef.Height() == 1 {
|
||||
return
|
||||
}
|
||||
if comRef.Type().Equals(refOfCommitType) {
|
||||
commit := comRef.TargetValue(db).(types.Struct)
|
||||
// We don't want to traverse the parents of sinkHead, but we still want to traverse its Value on the sinkDB side. We also still want to traverse all children, in both the srcDB and sinkDB, of any common Commit that is not at the Head of sinkDB.
|
||||
isHeadOfSink := comRef.Equals(sinkHead)
|
||||
exclusionSet := types.NewSet()
|
||||
if isHeadOfSink {
|
||||
exclusionSet = commit.Get(ParentsField).(types.Set)
|
||||
}
|
||||
commitHash := comRef.TargetHash()
|
||||
for _, reachable := range commit.Chunks() {
|
||||
if !exclusionSet.Has(reachable) {
|
||||
heap.Push(sinkQ, reachable)
|
||||
if !isHeadOfSink {
|
||||
heap.Push(srcQ, reachable)
|
||||
}
|
||||
hc[reachable.TargetHash()] = commitHash
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,278 @@
|
||||
// Copyright 2016 The Noms Authors. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package datas
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/noms/go/chunks"
|
||||
"github.com/attic-labs/noms/go/types"
|
||||
"github.com/attic-labs/testify/suite"
|
||||
)
|
||||
|
||||
const dsID = "ds1"
|
||||
|
||||
func TestLocalToLocalPulls(t *testing.T) {
|
||||
suite.Run(t, &LocalToLocalSuite{})
|
||||
}
|
||||
|
||||
func TestRemoteToLocalPulls(t *testing.T) {
|
||||
suite.Run(t, &RemoteToLocalSuite{})
|
||||
}
|
||||
|
||||
func TestLocalToRemotePulls(t *testing.T) {
|
||||
suite.Run(t, &LocalToRemoteSuite{})
|
||||
}
|
||||
|
||||
func TestRemoteToRemotePulls(t *testing.T) {
|
||||
suite.Run(t, &RemoteToRemoteSuite{})
|
||||
}
|
||||
|
||||
type PullSuite struct {
|
||||
suite.Suite
|
||||
sinkCS *chunks.TestStore
|
||||
sourceCS *chunks.TestStore
|
||||
sink Database
|
||||
source Database
|
||||
}
|
||||
|
||||
type LocalToLocalSuite struct {
|
||||
PullSuite
|
||||
}
|
||||
|
||||
func (suite *LocalToLocalSuite) SetupTest() {
|
||||
suite.sinkCS = chunks.NewTestStore()
|
||||
suite.sourceCS = chunks.NewTestStore()
|
||||
suite.sink = NewDatabase(suite.sinkCS)
|
||||
suite.source = NewDatabase(suite.sourceCS)
|
||||
}
|
||||
|
||||
type RemoteToLocalSuite struct {
|
||||
PullSuite
|
||||
}
|
||||
|
||||
func (suite *RemoteToLocalSuite) SetupTest() {
|
||||
suite.sinkCS = chunks.NewTestStore()
|
||||
suite.sourceCS = chunks.NewTestStore()
|
||||
suite.sink = NewDatabase(suite.sinkCS)
|
||||
suite.source = makeRemoteDb(suite.sourceCS)
|
||||
}
|
||||
|
||||
type LocalToRemoteSuite struct {
|
||||
PullSuite
|
||||
}
|
||||
|
||||
func (suite *LocalToRemoteSuite) SetupTest() {
|
||||
suite.sinkCS = chunks.NewTestStore()
|
||||
suite.sourceCS = chunks.NewTestStore()
|
||||
suite.sink = makeRemoteDb(suite.sinkCS)
|
||||
suite.source = NewDatabase(suite.sourceCS)
|
||||
}
|
||||
|
||||
type RemoteToRemoteSuite struct {
|
||||
PullSuite
|
||||
}
|
||||
|
||||
func (suite *RemoteToRemoteSuite) SetupTest() {
|
||||
suite.sinkCS = chunks.NewTestStore()
|
||||
suite.sourceCS = chunks.NewTestStore()
|
||||
suite.sink = makeRemoteDb(suite.sinkCS)
|
||||
suite.source = makeRemoteDb(suite.sourceCS)
|
||||
}
|
||||
|
||||
func makeRemoteDb(cs chunks.ChunkStore) Database {
|
||||
hbs := newHTTPBatchStoreForTest(cs)
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(hbs), types.NewValueStore(hbs), hbs)}
|
||||
}
|
||||
|
||||
func (suite *PullSuite) sinkIsLocal() bool {
|
||||
_, isLocal := suite.sink.(*LocalDatabase)
|
||||
return isLocal
|
||||
}
|
||||
|
||||
func (suite *PullSuite) TearDownTest() {
|
||||
suite.sink.Close()
|
||||
suite.source.Close()
|
||||
suite.sinkCS.Close()
|
||||
suite.sourceCS.Close()
|
||||
}
|
||||
|
||||
// Source: -3-> C(L2) -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
//
|
||||
// Sink: Nada
|
||||
func (suite *PullSuite) TestPullEverything() {
|
||||
l := buildListOfHeight(2, suite.source)
|
||||
sourceRef := suite.commitToSource(l, types.NewSet())
|
||||
|
||||
Pull(suite.source, suite.sink, sourceRef, types.Ref{})
|
||||
suite.Equal(0, suite.sinkCS.Reads)
|
||||
|
||||
suite.sink.batchStore().Flush()
|
||||
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
|
||||
suite.NotNil(v)
|
||||
suite.True(l.Equals(v.Get(ValueField)))
|
||||
}
|
||||
|
||||
// Source: -6-> C3(L5) -1-> N
|
||||
// . \ -5-> L4 -1-> N
|
||||
// . \ -4-> L3 -1-> N
|
||||
// . \ -3-> L2 -1-> N
|
||||
// 5 \ -2-> L1 -1-> N
|
||||
// . \ -1-> L0
|
||||
// C2(L4) -1-> N
|
||||
// . \ -4-> L3 -1-> N
|
||||
// . \ -3-> L2 -1-> N
|
||||
// . \ -2-> L1 -1-> N
|
||||
// 3 \ -1-> L0
|
||||
// .
|
||||
// C1(L2) -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
//
|
||||
// Sink: -3-> C1(L2) -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
func (suite *PullSuite) TestPullMultiGeneration() {
|
||||
sinkL := buildListOfHeight(2, suite.sink)
|
||||
sinkRef := suite.commitToSink(sinkL, types.NewSet())
|
||||
expectedReads := suite.sinkCS.Reads
|
||||
|
||||
srcL := buildListOfHeight(2, suite.source)
|
||||
sourceRef := suite.commitToSource(srcL, types.NewSet())
|
||||
srcL = buildListOfHeight(4, suite.source)
|
||||
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
|
||||
srcL = buildListOfHeight(5, suite.source)
|
||||
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
|
||||
|
||||
Pull(suite.source, suite.sink, sourceRef, sinkRef)
|
||||
if suite.sinkIsLocal() {
|
||||
// C1 gets read from most-local DB
|
||||
expectedReads++
|
||||
}
|
||||
suite.Equal(expectedReads, suite.sinkCS.Reads)
|
||||
|
||||
suite.sink.batchStore().Flush()
|
||||
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
|
||||
suite.NotNil(v)
|
||||
suite.True(srcL.Equals(v.Get(ValueField)))
|
||||
}
|
||||
|
||||
// Source: -6-> C2(L5) -1-> N
|
||||
// . \ -5-> L4 -1-> N
|
||||
// . \ -4-> L3 -1-> N
|
||||
// . \ -3-> L2 -1-> N
|
||||
// 4 \ -2-> L1 -1-> N
|
||||
// . \ -1-> L0
|
||||
// C1(L3) -1-> N
|
||||
// \ -3-> L2 -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
//
|
||||
// Sink: -5-> C3(L3') -1-> N
|
||||
// . \ -3-> L2 -1-> N
|
||||
// . \ \ -2-> L1 -1-> N
|
||||
// . \ \ -1-> L0
|
||||
// . \ - "oy!"
|
||||
// 4
|
||||
// .
|
||||
// C1(L3) -1-> N
|
||||
// \ -3-> L2 -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
func (suite *PullSuite) TestPullDivergentHistory() {
|
||||
sinkL := buildListOfHeight(3, suite.sink)
|
||||
sinkRef := suite.commitToSink(sinkL, types.NewSet())
|
||||
srcL := buildListOfHeight(3, suite.source)
|
||||
sourceRef := suite.commitToSource(srcL, types.NewSet())
|
||||
|
||||
sinkL = sinkL.Append(types.NewString("oy!"))
|
||||
sinkRef = suite.commitToSink(sinkL, types.NewSet(sinkRef))
|
||||
srcL = srcL.Set(1, buildListOfHeight(5, suite.source))
|
||||
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
|
||||
preReads := suite.sinkCS.Reads
|
||||
|
||||
Pull(suite.source, suite.sink, sourceRef, sinkRef)
|
||||
|
||||
// No objects read from sink, since sink Head is not an ancestor of source HEAD.
|
||||
suite.Equal(preReads, suite.sinkCS.Reads)
|
||||
|
||||
suite.sink.batchStore().Flush()
|
||||
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
|
||||
suite.NotNil(v)
|
||||
suite.True(srcL.Equals(v.Get(ValueField)))
|
||||
}
|
||||
|
||||
// Source: -6-> C2(L4) -1-> N
|
||||
// . \ -4-> L3 -1-> N
|
||||
// . \ -3-> L2 -1-> N
|
||||
// . \ - "oy!"
|
||||
// 5 \ -2-> L1 -1-> N
|
||||
// . \ -1-> L0
|
||||
// C1(L4) -1-> N
|
||||
// \ -4-> L3 -1-> N
|
||||
// \ -3-> L2 -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
// Sink: -5-> C1(L4) -1-> N
|
||||
// \ -4-> L3 -1-> N
|
||||
// \ -3-> L2 -1-> N
|
||||
// \ -2-> L1 -1-> N
|
||||
// \ -1-> L0
|
||||
func (suite *PullSuite) TestPullUpdates() {
|
||||
sinkL := buildListOfHeight(4, suite.sink)
|
||||
sinkRef := suite.commitToSink(sinkL, types.NewSet())
|
||||
expectedReads := suite.sinkCS.Reads
|
||||
|
||||
srcL := buildListOfHeight(4, suite.source)
|
||||
sourceRef := suite.commitToSource(srcL, types.NewSet())
|
||||
L3 := srcL.Get(1).(types.Ref).TargetValue(suite.source).(types.List)
|
||||
L2 := L3.Get(1).(types.Ref).TargetValue(suite.source).(types.List)
|
||||
L2 = L2.Append(suite.source.WriteValue(types.NewString("oy!")))
|
||||
L3 = L3.Set(1, suite.source.WriteValue(L2))
|
||||
srcL = srcL.Set(1, suite.source.WriteValue(L3))
|
||||
sourceRef = suite.commitToSource(srcL, types.NewSet(sourceRef))
|
||||
|
||||
Pull(suite.source, suite.sink, sourceRef, sinkRef)
|
||||
|
||||
if suite.sinkIsLocal() {
|
||||
// 3 objects read from sink: L3, L2 and C1 (when considering the shared commit).
|
||||
expectedReads += 3
|
||||
}
|
||||
suite.Equal(expectedReads, suite.sinkCS.Reads)
|
||||
|
||||
suite.sink.batchStore().Flush()
|
||||
v := suite.sink.ReadValue(sourceRef.TargetHash()).(types.Struct)
|
||||
suite.NotNil(v)
|
||||
suite.True(srcL.Equals(v.Get(ValueField)))
|
||||
}
|
||||
|
||||
func (suite *PullSuite) commitToSource(v types.Value, p types.Set) types.Ref {
|
||||
var err error
|
||||
suite.source, err = suite.source.Commit(dsID, NewCommit().Set(ValueField, v).Set(ParentsField, p))
|
||||
suite.NoError(err)
|
||||
return suite.source.HeadRef(dsID)
|
||||
}
|
||||
|
||||
func (suite *PullSuite) commitToSink(v types.Value, p types.Set) types.Ref {
|
||||
var err error
|
||||
suite.sink, err = suite.sink.Commit(dsID, NewCommit().Set(ValueField, v).Set(ParentsField, p))
|
||||
suite.NoError(err)
|
||||
return suite.sink.HeadRef(dsID)
|
||||
}
|
||||
|
||||
func buildListOfHeight(height int, vw types.ValueWriter) types.List {
|
||||
unique := 0
|
||||
l := types.NewList(types.Number(unique), types.Number(unique+1))
|
||||
unique += 2
|
||||
|
||||
for i := 0; i < height; i++ {
|
||||
r1, r2 := vw.WriteValue(types.Number(unique)), vw.WriteValue(l)
|
||||
unique++
|
||||
l = types.NewList(r1, r2)
|
||||
}
|
||||
return l
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
# Dataset pulling algorithm
|
||||
The approach is to explore the chunk graph of both sink and source in order of decreasing ref-height. As the code walks, it uses the knowledge gained about which chunks are present in the sink to both prune the source-graph-walk and build up a set of `hints` that can be sent to a remote Database to aid in chunk validation.
|
||||
|
||||
## Basic algorithm
|
||||
|
||||
- let `sink` be the *sink* database
|
||||
- let `source` be the *source* database
|
||||
- let `snkQ` and `srcQ` be priority queues of `Ref` prioritized by highest `Ref.height`
|
||||
- let `hints` be a map of `hash => hash`
|
||||
- let `reachableChunks` be a set of hashes
|
||||
- let `snkHdRef` be the ref (of `Commit`) of the head of the *sink* dataset
|
||||
- let `srcHdRef` be the ref of the *source* `Commit`, which must descend from the `Commit` indicated by `snkHdRef`
|
||||
|
||||
- let `traverseSource(srcRef, srcQ, sink, source, reachableChunks)` be
|
||||
- pop `srcRef` from `srcQ`
|
||||
- if `!sink.has(srcRef)`
|
||||
- let `c` = `source.batchStore().Get(srcRef.targetHash)`
|
||||
- let `v` = `types.DecodeValue(c, source)`
|
||||
- insert all child refs, `cr`, from `v` into `srcQ` and into reachableRefs
|
||||
- `sink.batchStore().SchedulePut(c, srcRef.height, no hints)`
|
||||
- (hints will all be gathered and handed to sink.batchStore at the end)
|
||||
|
||||
|
||||
- let `traverseSink(sinkRef, snkQ, sink, hints)` be
|
||||
- pop `snkRef` from `snkQ`
|
||||
- if `snkRef.height` > 1
|
||||
- let `v` = `sink.readValue(snkRef.targetHash)`
|
||||
- insert all child refs, `cr`, from `v` into `snkQ` and `hints[cr] = snkRef`
|
||||
|
||||
|
||||
- let `traverseCommon(comRef, snkHdRef, snkQ, srcQ, sink, hints)` be
|
||||
- pop `comRef` from both `snkQ` and `srcQ`
|
||||
- if `comRef.height` > 1
|
||||
- if `comRef` is a `Ref` of `Commit`
|
||||
- let `v` = `sink.readValue(comRef.targetHash)`
|
||||
- if `comRef` == snkHdRef
|
||||
- *ignore all parent refs*
|
||||
- insert each other child ref `cr` from `v` into `snkQ` *only*, set `hints[cr] = comRef`
|
||||
- else
|
||||
- insert each child ref `cr` from `v` into both `snkQ` and `srcQ`, set `hints[cr] = comRef`
|
||||
|
||||
|
||||
- let `pull(source, sink, srcHdRef, sinkHdRef)
|
||||
- insert `snkHdRef` into `snkQ` and `srcHdRef` into `srcQ`
|
||||
- create empty `hints` and `reachableChunks`
|
||||
- while `srcQ` is non-empty
|
||||
- let `srcHt` and `snkHt` be the respective heights of the *top* `Ref` in each of `srcQ` and `snkQ`
|
||||
- if `srcHt` > `snkHt`, for every `srcHdRef` in `srcQ` which is of greater height than `snkHt`
|
||||
- `traverseSource(srcHdRef, srcQ, sink, source)`
|
||||
- else if `snkHt` > `srcHt`, for every `snkHdRef` in `snkQ` which is of greater height than `srcHt`
|
||||
- `traverseSink(snkHdRef, snkQ, sink)`
|
||||
- else
|
||||
- for every `comRef` in which is common to `snkQ` and `srcQ` which is of height `srcHt` (and `snkHt`)
|
||||
- `traverseCommon(comRef, snkHdRef, snkQ, srcQ, sink, hints)`
|
||||
- for every `ref` in `srcQ` which is of height `srcHt`
|
||||
- `traverseSource(ref, srcQ, sink, source, reachableChunks)`
|
||||
- for every `ref` in `snkQ` which is of height `snkHt`
|
||||
- `traverseSink(ref, snkQ, sink, hints)`
|
||||
- for all `hash` in `reachableChunks`
|
||||
- sink.batchStore().addHint(hints[hash])
|
||||
|
||||
|
||||
## Isomorphic, but less clear, algorithm
|
||||
|
||||
- let all identifiers be as above
|
||||
- let `traverseSource`, `traverseSink`, and `traverseCommon` be as above
|
||||
|
||||
- let `higherThan(refA, refB)` be
|
||||
- if refA.height == refB.height
|
||||
- return refA.targetHash < refB.targetHash
|
||||
- return refA.height > refB.height
|
||||
|
||||
- let `pull(source, sink, srcHdRef, sinkHdRef)
|
||||
- insert `snkHdRef` into `snkQ` and `srcHdRef` into `srcQ`
|
||||
- create empty `hints` and `reachableChunks`
|
||||
- while `srcQ` is non-empty
|
||||
- if `sinkQ` is empty
|
||||
- pop `ref` from `srcQ`
|
||||
- `traverseSource(ref, srcQ, sink, source, reachableChunks))
|
||||
- else if `higherThan(head of srcQ, head of snkQ)`
|
||||
- pop `ref` from `srcQ`
|
||||
- `traverseSource(ref, srcQ, sink, source, reachableChunks))
|
||||
- else if `higherThan(head of snkQ, head of srcQ)`
|
||||
- pop `ref` from `snkQ`
|
||||
- `traverseSink(ref, snkQ, sink, hints)`
|
||||
- else, heads of both queues are the same
|
||||
- pop `comRef` from `snkQ` and `srcQ`
|
||||
- `traverseCommon(comRef, snkHdRef, snkQ, srcQ, sink, hints)`
|
||||
- for all `hash` in `reachableChunks`
|
||||
- sink.batchStore().addHint(hints[hash])
|
||||
|
||||
|
||||
@@ -16,12 +16,7 @@ type RemoteDatabaseClient struct {
|
||||
|
||||
func NewRemoteDatabase(baseURL, auth string) *RemoteDatabaseClient {
|
||||
httpBS := newHTTPBatchStore(baseURL, auth)
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(types.NewValueStore(httpBS), httpBS)}
|
||||
}
|
||||
|
||||
func (rds *RemoteDatabaseClient) batchSink() batchSink {
|
||||
httpBS := rds.vs.BatchStore().(*httpBatchStore)
|
||||
return newNotABatchSink(httpBS.host, httpBS.auth)
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(httpBS), types.NewValueStore(httpBS), httpBS)}
|
||||
}
|
||||
|
||||
func (rds *RemoteDatabaseClient) batchStore() types.BatchStore {
|
||||
@@ -31,13 +26,13 @@ func (rds *RemoteDatabaseClient) batchStore() types.BatchStore {
|
||||
func (rds *RemoteDatabaseClient) Commit(datasetID string, commit types.Struct) (Database, error) {
|
||||
err := rds.commit(datasetID, commit)
|
||||
rds.vs.Flush()
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(rds.vs, rds.rt)}, err
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err
|
||||
}
|
||||
|
||||
func (rds *RemoteDatabaseClient) Delete(datasetID string) (Database, error) {
|
||||
err := rds.doDelete(datasetID)
|
||||
rds.vs.Flush()
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(rds.vs, rds.rt)}, err
|
||||
return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err
|
||||
}
|
||||
|
||||
func (f RemoteStoreFactory) CreateStore(ns string) Database {
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@@ -76,7 +77,11 @@ func HandleWriteValue(w http.ResponseWriter, req *http.Request, ps URLParams, cs
|
||||
d.Exp.Equal("POST", req.Method)
|
||||
|
||||
reader := bodyReader(req)
|
||||
defer reader.Close()
|
||||
defer func() {
|
||||
// Ensure all data on reader is consumed
|
||||
io.Copy(ioutil.Discard, reader)
|
||||
reader.Close()
|
||||
}()
|
||||
vbs := types.NewValidatingBatchingSink(cs)
|
||||
vbs.Prepare(deserializeHints(reader))
|
||||
|
||||
|
||||
@@ -74,11 +74,10 @@ func (ds *Dataset) CommitWithParents(v types.Value, p types.Set) (Dataset, error
|
||||
}
|
||||
|
||||
func (ds *Dataset) Pull(sourceStore datas.Database, sourceRef types.Ref, concurrency int) (Dataset, error) {
|
||||
_, topDown := ds.Database().(*datas.LocalDatabase)
|
||||
return ds.pull(sourceStore, sourceRef, concurrency, topDown)
|
||||
return ds.pull(sourceStore, sourceRef, concurrency)
|
||||
}
|
||||
|
||||
func (ds *Dataset) pull(source datas.Database, sourceRef types.Ref, concurrency int, topDown bool) (Dataset, error) {
|
||||
func (ds *Dataset) pull(source datas.Database, sourceRef types.Ref, concurrency int) (Dataset, error) {
|
||||
sink := *ds
|
||||
|
||||
sinkHeadRef := types.Ref{}
|
||||
@@ -90,12 +89,7 @@ func (ds *Dataset) pull(source datas.Database, sourceRef types.Ref, concurrency
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
if topDown {
|
||||
datas.CopyMissingChunksP(source, sink.Database().(*datas.LocalDatabase), sourceRef, concurrency)
|
||||
} else {
|
||||
datas.CopyReachableChunksP(source, sink.Database(), sourceRef, sinkHeadRef, concurrency)
|
||||
}
|
||||
|
||||
datas.Pull(source, sink.Database(), sourceRef, sinkHeadRef)
|
||||
err := datas.ErrOptimisticLockFailed
|
||||
for ; err == datas.ErrOptimisticLockFailed; sink, err = sink.setNewHead(sourceRef) {
|
||||
}
|
||||
|
||||
+6
-30
@@ -41,7 +41,7 @@ func NewSet(ds Dataset, vs ...types.Value) types.Ref {
|
||||
return ds.Database().WriteValue(v)
|
||||
}
|
||||
|
||||
func pullTest(t *testing.T, topdown bool) {
|
||||
func TestPullTopDown(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
sink := createTestDataset("sink")
|
||||
@@ -73,20 +73,12 @@ func pullTest(t *testing.T, topdown bool) {
|
||||
source, err = source.Commit(updatedValue)
|
||||
assert.NoError(err)
|
||||
|
||||
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1, topdown)
|
||||
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1)
|
||||
assert.NoError(err)
|
||||
assert.True(source.Head().Equals(sink.Head()))
|
||||
}
|
||||
|
||||
func TestPullTopDown(t *testing.T) {
|
||||
pullTest(t, true)
|
||||
}
|
||||
|
||||
func TestPullExclude(t *testing.T) {
|
||||
pullTest(t, false)
|
||||
}
|
||||
|
||||
func pullFirstCommit(t *testing.T, topdown bool) {
|
||||
func TestPullFirstCommitTopDown(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
sink := createTestDataset("sink")
|
||||
@@ -102,20 +94,12 @@ func pullFirstCommit(t *testing.T, topdown bool) {
|
||||
source, err := source.Commit(sourceInitialValue)
|
||||
assert.NoError(err)
|
||||
|
||||
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1, topdown)
|
||||
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1)
|
||||
assert.NoError(err)
|
||||
assert.True(source.Head().Equals(sink.Head()))
|
||||
}
|
||||
|
||||
func TestPullFirstCommitTopDown(t *testing.T) {
|
||||
pullFirstCommit(t, true)
|
||||
}
|
||||
|
||||
func TestPullFirstCommitExclude(t *testing.T) {
|
||||
pullFirstCommit(t, false)
|
||||
}
|
||||
|
||||
func pullDeepRef(t *testing.T, topdown bool) {
|
||||
func TestPullDeepRefTopDown(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
|
||||
sink := createTestDataset("sink")
|
||||
@@ -129,15 +113,7 @@ func pullDeepRef(t *testing.T, topdown bool) {
|
||||
source, err := source.Commit(sourceInitialValue)
|
||||
assert.NoError(err)
|
||||
|
||||
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1, topdown)
|
||||
sink, err = sink.pull(source.Database(), types.NewRef(source.Head()), 1)
|
||||
assert.NoError(err)
|
||||
assert.True(source.Head().Equals(sink.Head()))
|
||||
}
|
||||
|
||||
func TestPullDeepRefTopDown(t *testing.T) {
|
||||
pullDeepRef(t, true)
|
||||
}
|
||||
|
||||
func TestPullDeepRefExclude(t *testing.T) {
|
||||
pullDeepRef(t, false)
|
||||
}
|
||||
|
||||
+14
-8
@@ -20,6 +20,9 @@ type BatchStore interface {
|
||||
// c may or may not be persisted when Put() returns, but is guaranteed to be persistent after a call to Flush() or Close().
|
||||
SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints)
|
||||
|
||||
// AddHints allows additional hints, as used by SchedulePut, to be added for use in the current batch.
|
||||
AddHints(hints Hints)
|
||||
|
||||
// Flush causes enqueued Puts to be persisted.
|
||||
Flush()
|
||||
io.Closer
|
||||
@@ -39,20 +42,23 @@ func NewBatchStoreAdaptor(cs chunks.ChunkStore) BatchStore {
|
||||
}
|
||||
|
||||
// Get simply proxies to the backing ChunkStore
|
||||
func (lbs *BatchStoreAdaptor) Get(h hash.Hash) chunks.Chunk {
|
||||
return lbs.cs.Get(h)
|
||||
func (bsa *BatchStoreAdaptor) Get(h hash.Hash) chunks.Chunk {
|
||||
return bsa.cs.Get(h)
|
||||
}
|
||||
|
||||
// SchedulePut simply calls Put on the underlying ChunkStore, and ignores hints.
|
||||
func (lbs *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
|
||||
lbs.cs.Put(c)
|
||||
func (bsa *BatchStoreAdaptor) SchedulePut(c chunks.Chunk, refHeight uint64, hints Hints) {
|
||||
bsa.cs.Put(c)
|
||||
}
|
||||
|
||||
// AddHints is a noop.
|
||||
func (bsa *BatchStoreAdaptor) AddHints(hints Hints) {}
|
||||
|
||||
// Flush is a noop.
|
||||
func (lbs *BatchStoreAdaptor) Flush() {}
|
||||
func (bsa *BatchStoreAdaptor) Flush() {}
|
||||
|
||||
// Close closes the underlying ChunkStore
|
||||
func (lbs *BatchStoreAdaptor) Close() error {
|
||||
lbs.Flush()
|
||||
return lbs.cs.Close()
|
||||
func (bsa *BatchStoreAdaptor) Close() error {
|
||||
bsa.Flush()
|
||||
return bsa.cs.Close()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
// Copyright 2016 The Noms Authors. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package types
|
||||
|
||||
// RefHeap implements heap.Interface (which includes sort.Interface) as a height based priority queue.
|
||||
type RefHeap []Ref
|
||||
|
||||
func (h RefHeap) Len() int {
|
||||
return len(h)
|
||||
}
|
||||
|
||||
func (h RefHeap) Less(i, j int) bool {
|
||||
return HigherThan(h[i], h[j])
|
||||
}
|
||||
|
||||
func (h RefHeap) Swap(i, j int) {
|
||||
h[i], h[j] = h[j], h[i]
|
||||
}
|
||||
|
||||
func (h *RefHeap) Push(r interface{}) {
|
||||
*h = append(*h, r.(Ref))
|
||||
}
|
||||
|
||||
func (h *RefHeap) Pop() interface{} {
|
||||
old := *h
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
*h = old[0 : n-1]
|
||||
return x
|
||||
}
|
||||
|
||||
func (h *RefHeap) Empty() bool {
|
||||
return len(*h) == 0
|
||||
}
|
||||
|
||||
// HigherThan returns true if a is 'higher than' b, generally if its ref-height is greater. If the two are of the same height, fall back to sorting by TargetHash.
|
||||
func HigherThan(a, b Ref) bool {
|
||||
if a.Height() == b.Height() {
|
||||
return a.TargetHash().Less(b.TargetHash())
|
||||
}
|
||||
// > because we want the larger heights to be at the start of the queue.
|
||||
return a.Height() > b.Height()
|
||||
|
||||
}
|
||||
|
||||
// RefSlice implements sort.Interface to order by target ref.
|
||||
type RefSlice []Ref
|
||||
|
||||
func (s RefSlice) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s RefSlice) Less(i, j int) bool {
|
||||
return s[i].TargetHash().Less(s[j].TargetHash())
|
||||
}
|
||||
|
||||
func (s RefSlice) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
@@ -0,0 +1,70 @@
|
||||
// Copyright 2016 The Noms Authors. All rights reserved.
|
||||
// Licensed under the Apache License, version 2.0:
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
package types
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"testing"
|
||||
|
||||
"github.com/attic-labs/testify/assert"
|
||||
)
|
||||
|
||||
func TestRefHeap(t *testing.T) {
|
||||
unique := 0
|
||||
newRefWithHeight := func(height uint64) Ref {
|
||||
r := NewRef(Number(unique))
|
||||
unique++
|
||||
r.height = height
|
||||
return r
|
||||
}
|
||||
|
||||
assert := assert.New(t)
|
||||
|
||||
h := RefHeap{}
|
||||
heap.Init(&h)
|
||||
|
||||
r1 := newRefWithHeight(1)
|
||||
r2 := newRefWithHeight(2)
|
||||
r3 := newRefWithHeight(3)
|
||||
r4 := newRefWithHeight(2)
|
||||
|
||||
heap.Push(&h, r1)
|
||||
assert.Equal(r1, h[0])
|
||||
assert.Equal(1, len(h))
|
||||
|
||||
heap.Push(&h, r3)
|
||||
assert.Equal(r3, h[0])
|
||||
assert.Equal(2, len(h))
|
||||
|
||||
heap.Push(&h, r2)
|
||||
assert.Equal(r3, h[0])
|
||||
assert.Equal(3, len(h))
|
||||
|
||||
heap.Push(&h, r4)
|
||||
assert.Equal(r3, h[0])
|
||||
assert.Equal(4, len(h))
|
||||
|
||||
expectedSecond, expectedThird := func() (Ref, Ref) {
|
||||
if r2.TargetHash().Less(r4.TargetHash()) {
|
||||
return r2, r4
|
||||
}
|
||||
return r4, r2
|
||||
}()
|
||||
|
||||
assert.Equal(r3, heap.Pop(&h).(Ref))
|
||||
assert.Equal(expectedSecond, h[0])
|
||||
assert.Equal(3, len(h))
|
||||
|
||||
assert.Equal(expectedSecond, heap.Pop(&h).(Ref))
|
||||
assert.Equal(expectedThird, h[0])
|
||||
assert.Equal(2, len(h))
|
||||
|
||||
assert.Equal(expectedThird, heap.Pop(&h).(Ref))
|
||||
assert.Equal(r1, h[0])
|
||||
assert.Equal(1, len(h))
|
||||
|
||||
assert.Equal(r1, heap.Pop(&h).(Ref))
|
||||
assert.Equal(0, len(h))
|
||||
}
|
||||
Reference in New Issue
Block a user