Allow Dataset HEAD to be set to non-fastforward commits (#2271)

The Dataset.Commit() code pathway still enforces fast-forward-only
behavior, but a new SetHead() method allows the HEAD of a Dataset to
be forced to any other Commit.

noms sync detects the case where the source Commit is not a descendant
of the provided sink Dataset's HEAD and uses the new API to force the
sink to the desired new Commit, printing out the now-abandoned old
HEAD.

Fixes #2240
This commit is contained in:
cmasone-attic
2016-08-05 15:38:41 -07:00
committed by GitHub
parent fd034f8c81
commit 65edbaabe3
11 changed files with 116 additions and 39 deletions

View File

@@ -77,10 +77,19 @@ func runSync(args []string) int {
lastProgressCh <- last
}()
sourceRef := types.NewRef(sourceObj)
sinkRef, _ := sinkDataset.MaybeHeadRef()
nonFF := false
err = d.Try(func() {
defer profile.MaybeStartProfile().Stop()
sinkDataset.Pull(sourceStore, sourceRef, p, progressCh)
var err error
sinkDataset, err = sinkDataset.Pull(sourceStore, types.NewRef(sourceObj), p, progressCh)
sinkDataset, err = sinkDataset.FastForward(sourceRef)
if err == datas.ErrMergeNeeded {
sinkDataset, err = sinkDataset.SetHead(sourceRef)
nonFF = true
}
d.PanicIfError(err)
})
@@ -92,8 +101,10 @@ func runSync(args []string) int {
if last := <-lastProgressCh; last.DoneCount > 0 {
status.Printf("Done - Synced %s in %s (%s/s)", humanize.Bytes(last.DoneBytes), since(start), bytesPerSec(last, start))
status.Done()
} else if nonFF && !sourceRef.Equals(sinkRef) {
fmt.Printf("Abandoning %s; new head is %s\n", sinkRef.TargetHash(), sourceRef.TargetHash())
} else {
fmt.Println(args[0], "is up to date.")
fmt.Println(args[1], "is up to date.")
}
return 0

View File

@@ -70,5 +70,23 @@ func (s *nomsSyncTestSuite) TestSync() {
dest = dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(ldb2dir, "", 1, false)), "bar")
s.True(types.Number(43).Equals(dest.HeadValue()))
dest.Database().Close()
}
// TestRewind exercises the non-fast-forward sync path: it commits 42 and then
// 43 to dataset "foo", then syncs the hash of the first commit (42) back onto
// "foo". Since that commit is not a descendant of the current HEAD, sync must
// force HEAD back via the new SetHead pathway; the test then verifies HEAD
// now resolves to 42.
func (s *nomsSyncTestSuite) TestRewind() {
var err error
source1 := dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false)), "foo")
source1, err = source1.CommitValue(types.Number(42))
s.NoError(err)
// Remember the hash of the first commit so we can rewind to it later.
rewindRef := source1.HeadRef().TargetHash()
source1, err = source1.CommitValue(types.Number(43))
s.NoError(err)
source1.Database().Close() // Close Database backing both Datasets
// Source spec addresses the old commit directly by hash ("#<hash>");
// sink spec addresses the dataset "foo", whose HEAD is currently 43.
sourceSpec := spec.CreateValueSpecString("ldb", s.LdbDir, "#"+rewindRef.String())
sinkDatasetSpec := spec.CreateValueSpecString("ldb", s.LdbDir, "foo")
s.Run(main, []string{"sync", sourceSpec, sinkDatasetSpec})
// Re-open the store and confirm HEAD was forced back to the 42 commit.
dest := dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false)), "foo")
s.True(types.Number(42).Equals(dest.HeadValue()))
dest.Database().Close()
}

View File

@@ -122,6 +122,6 @@ func IsCommitType(t *types.Type) bool {
return types.IsSubtype(valueCommitType, t)
}
func isRefOfCommitType(t *types.Type) bool {
func IsRefOfCommitType(t *types.Type) bool {
return t.Kind() == types.RefKind && IsCommitType(getRefElementType(t))
}

View File

@@ -34,12 +34,15 @@ type Database interface {
// Datasets returns the root of the database which is a MapOfStringToRefOfCommit where string is a datasetID.
Datasets() types.Map
// Commit updates the Commit that datasetID in this database points at. All Values that have been written to this Database are guaranteed to be persistent after Commit(). If the update cannot be performed, e.g., because of a conflict, error will non-nil. The newest snapshot of the database is always returned.
// Commit updates the Commit that datasetID in this database points at. All Values that have been written to this Database are guaranteed to be persistent after Commit(). If the update cannot be performed, e.g., because of a conflict, error will be non-nil. The newest snapshot of the database is always returned.
Commit(datasetID string, commit types.Struct) (Database, error)
// Delete removes the Dataset named datasetID from the map at the root of the Database. The Dataset data is not necessarily cleaned up at this time, but may be garbage collected in the future. If the update cannot be performed, e.g., because of a conflict, error will be non-nil. The newest snapshot of the database is always returned.
Delete(datasetID string) (Database, error)
// SetHead sets the Commit that datasetID in this database points at. All Values that have been written to this Database are guaranteed to be persistent after SetHead(). If the update cannot be performed, e.g., because of a conflict, error will be non-nil. The newest snapshot of the database is always returned.
SetHead(datasetID string, commit types.Struct) (Database, error)
has(hash hash.Hash) bool
validatingBatchStore() types.BatchStore
}

View File

@@ -69,6 +69,11 @@ func (ds *databaseCommon) Datasets() types.Map {
return *ds.datasets
}
// datasetsFromRef reads the value at |datasetsRef| and returns it as the
// datasets map (datasetID -> ref-of-Commit). Assumes the hash refers to a
// value that decodes to a types.Map — the type assertion panics otherwise;
// presumably callers only pass a valid datasets root (TODO confirm).
func (ds *databaseCommon) datasetsFromRef(datasetsRef hash.Hash) *types.Map {
c := ds.ReadValue(datasetsRef).(types.Map)
return &c
}
func (ds *databaseCommon) has(h hash.Hash) bool {
return ds.cch.Has(h)
}
@@ -85,22 +90,22 @@ func (ds *databaseCommon) Close() error {
return ds.vs.Close()
}
func (ds *databaseCommon) datasetsFromRef(datasetsRef hash.Hash) *types.Map {
c := ds.ReadValue(datasetsRef).(types.Map)
return &c
}
func (ds *databaseCommon) commit(datasetID string, commit types.Struct) error {
func (ds *databaseCommon) doSetHead(datasetID string, commit types.Struct) error {
d.PanicIfTrue(!IsCommitType(commit.Type()), "Can't commit a non-Commit struct to dataset %s", datasetID)
return ds.doCommit(datasetID, commit)
currentRootRef, currentDatasets := ds.getRootAndDatasets()
commitRef := ds.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
currentDatasets = currentDatasets.Set(types.String(datasetID), commitRef)
return ds.tryUpdateRoot(currentDatasets, currentRootRef)
}
// doCommit manages concurrent access the single logical piece of mutable state: the current Root. doCommit is optimistic in that it is attempting to update head making the assumption that currentRootRef is the hash of the current head. The call to UpdateRoot below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again. This method will also fail and return an 'ErrMergeNeeded' error if the |commit| is not a descendant of the current dataset head.
func (ds *databaseCommon) doCommit(datasetID string, commit types.Struct) error {
currentRootRef, currentDatasets := ds.getRootAndDatasets()
d.PanicIfTrue(!IsCommitType(commit.Type()), "Can't commit a non-Commit struct to dataset %s", datasetID)
// TODO: This Commit will be orphaned if the tryUpdateRoot() below fails
commitRef := ds.WriteValue(commit)
currentRootRef, currentDatasets := ds.getRootAndDatasets()
commitRef := ds.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
// First commit in store is always fast-forward.
if !currentRootRef.IsEmpty() {

View File

@@ -25,7 +25,7 @@ func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase {
}
func (lds *LocalDatabase) Commit(datasetID string, commit types.Struct) (Database, error) {
err := lds.commit(datasetID, commit)
err := lds.doCommit(datasetID, commit)
return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt), lds.cs}, err
}
@@ -34,6 +34,11 @@ func (lds *LocalDatabase) Delete(datasetID string) (Database, error) {
return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt), lds.cs}, err
}
// SetHead moves the Dataset named datasetID to point at |commit| without
// requiring a fast-forward, delegating to the shared doSetHead logic. A fresh
// LocalDatabase snapshot is always returned alongside any error, so callers
// can retry against the latest state.
func (lds *LocalDatabase) SetHead(datasetID string, commit types.Struct) (Database, error) {
err := lds.doSetHead(datasetID, commit)
return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt), lds.cs}, err
}
func (lds *LocalDatabase) validatingBatchStore() (bs types.BatchStore) {
bs = lds.vs.BatchStore()
if !bs.IsValidating() {

View File

@@ -21,12 +21,17 @@ type PullProgress struct {
func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency int, progressCh chan PullProgress) {
srcQ, sinkQ := &types.RefByHeight{sourceRef}, &types.RefByHeight{sinkHeadRef}
// If the sourceRef points to an object already in sinkDB, there's nothing to do.
if sinkDB.has(sourceRef.TargetHash()) {
return
}
// We generally expect that sourceRef descends from sinkHeadRef, so that walking down from sinkHeadRef yields useful hints. If it's not even in the srcDB, then just clear out sinkQ right now and don't bother.
if !srcDB.has(sinkHeadRef.TargetHash()) {
sinkQ.PopBack()
}
// Since we expect sourceRef to descend from sinkHeadRef, we assume srcDB has a superset of the data in sinkDB. There are some cases where, logically, the code wants to read data it knows to be in sinkDB. In this case, it doesn't actually matter which Database the data comes from, so as an optimization we use whichever is a LocalDatabase -- if either is.
// Since we expect sinkHeadRef to descend from sourceRef, we assume srcDB has a superset of the data in sinkDB. There are some cases where, logically, the code wants to read data it knows to be in sinkDB. In this case, it doesn't actually matter which Database the data comes from, so as an optimization we use whichever is a LocalDatabase -- if either is.
mostLocalDB := srcDB
if _, ok := sinkDB.(*LocalDatabase); ok {
mostLocalDB = sinkDB
@@ -239,7 +244,7 @@ func traverseSink(sinkRef types.Ref, db Database) traverseResult {
}
func traverseCommon(comRef, sinkHead types.Ref, db Database) traverseResult {
if comRef.Height() > 1 && isRefOfCommitType(comRef.Type()) {
if comRef.Height() > 1 && IsRefOfCommitType(comRef.Type()) {
commit := comRef.TargetValue(db).(types.Struct)
// We don't want to traverse the parents of sinkHead, but we still want to traverse its Value on the sinkDB side. We also still want to traverse all children, in both the srcDB and sinkDB, of any common Commit that is not at the Head of sinkDB.
exclusionSet := types.NewSet()

View File

@@ -27,7 +27,7 @@ func (rds *RemoteDatabaseClient) validatingBatchStore() (bs types.BatchStore) {
}
func (rds *RemoteDatabaseClient) Commit(datasetID string, commit types.Struct) (Database, error) {
err := rds.commit(datasetID, commit)
err := rds.doCommit(datasetID, commit)
return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err
}
@@ -36,6 +36,11 @@ func (rds *RemoteDatabaseClient) Delete(datasetID string) (Database, error) {
return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err
}
// SetHead moves the Dataset named datasetID to point at |commit| without
// requiring a fast-forward, delegating to the shared doSetHead logic. As with
// Commit and Delete, a fresh RemoteDatabaseClient snapshot is always returned
// alongside any error, so callers can retry against the latest state.
func (rds *RemoteDatabaseClient) SetHead(datasetID string, commit types.Struct) (Database, error) {
err := rds.doSetHead(datasetID, commit)
return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err
}
func (f RemoteStoreFactory) CreateStore(ns string) Database {
return NewRemoteDatabase(f.host+httprouter.CleanPath(ns), f.auth)
}

View File

@@ -255,7 +255,7 @@ func handleRootPost(w http.ResponseWriter, req *http.Request, ps URLParams, cs c
func isMapOfStringToRefOfCommit(m types.Map) bool {
mapTypes := m.Type().Desc.(types.CompoundDesc).ElemTypes
keyType, valType := mapTypes[0], mapTypes[1]
return keyType.Kind() == types.StringKind && (isRefOfCommitType(valType) || isUnionOfRefOfCommitType(valType))
return keyType.Kind() == types.StringKind && (IsRefOfCommitType(valType) || isUnionOfRefOfCommitType(valType))
}
func isUnionOfRefOfCommitType(t *types.Type) bool {
@@ -263,7 +263,7 @@ func isUnionOfRefOfCommitType(t *types.Type) bool {
return false
}
for _, et := range t.Desc.(types.CompoundDesc).ElemTypes {
if !isRefOfCommitType(et) {
if !IsRefOfCommitType(et) {
return false
}
}

View File

@@ -102,28 +102,43 @@ func (ds *Dataset) Commit(v types.Value, opts CommitOptions) (Dataset, error) {
return Dataset{store, ds.id}, err
}
func (ds *Dataset) Pull(sourceStore datas.Database, sourceRef types.Ref, concurrency int, progressCh chan datas.PullProgress) (Dataset, error) {
sink := *ds
// Pull objects that descend from sourceRef in srcDB into sinkDB, using at most the given degree of concurrency. Progress will be reported over progressCh as the algorithm works. Objects that are already present in ds will not be pulled over.
func (ds *Dataset) Pull(sourceDB datas.Database, sourceRef types.Ref, concurrency int, progressCh chan datas.PullProgress) {
sinkHeadRef := types.Ref{}
if currentHeadRef, ok := sink.MaybeHeadRef(); ok {
if currentHeadRef, ok := ds.MaybeHeadRef(); ok {
sinkHeadRef = currentHeadRef
}
datas.Pull(sourceDB, ds.Database(), sourceRef, sinkHeadRef, concurrency, progressCh)
}
if sourceRef == sinkHeadRef {
return sink, nil
// FastForward takes a types.Ref to a Commit object and makes it the new Head of ds iff it is a descendant of the current Head. Intended to be used e.g. after a call to Pull(). If the update cannot be performed, e.g., because another process moved the current Head out from under you, err will be non-nil. The newest snapshot of the Dataset is always returned, so the caller can easily retry using the latest.
func (ds *Dataset) FastForward(newHeadRef types.Ref) (sink Dataset, err error) {
sink = *ds
if currentHeadRef, ok := sink.MaybeHeadRef(); ok && newHeadRef == currentHeadRef {
return
} else if newHeadRef.Height() <= currentHeadRef.Height() {
return sink, datas.ErrMergeNeeded
}
datas.Pull(sourceStore, sink.Database(), sourceRef, sinkHeadRef, concurrency, progressCh)
err := datas.ErrOptimisticLockFailed
for ; err == datas.ErrOptimisticLockFailed; sink, err = sink.setNewHead(sourceRef) {
for err = datas.ErrOptimisticLockFailed; err == datas.ErrOptimisticLockFailed; sink, err = sink.commitNewHead(newHeadRef) {
}
return
}
// SetHead takes a types.Ref to a Commit object and makes it the new Head of ds. Intended to be used e.g. when rewinding in ds' Commit history. If the update cannot be performed, e.g., because the state of ds.Database() changed out from under you, err will be non-nil. The newest snapshot of the Dataset is always returned, so the caller can easily retry using the latest.
func (ds *Dataset) SetHead(newHeadRef types.Ref) (sink Dataset, err error) {
sink = *ds
if currentHeadRef, ok := sink.MaybeHeadRef(); ok && newHeadRef == currentHeadRef {
return
}
return sink, err
commit := sink.validateRefAsCommit(newHeadRef)
store, err := sink.Database().SetHead(sink.id, commit)
return Dataset{store, sink.id}, err
}
func (ds *Dataset) validateRefAsCommit(r types.Ref) types.Struct {
v := ds.store.ReadValue(r.TargetHash())
v := ds.Database().ReadValue(r.TargetHash())
if v == nil {
panic(r.TargetHash().String() + " not found")
@@ -134,8 +149,8 @@ func (ds *Dataset) validateRefAsCommit(r types.Ref) types.Struct {
return v.(types.Struct)
}
// setNewHead attempts to make the object pointed to by newHeadRef the new Head of ds. First, it checks that the object exists in ds and validates that it decodes to the correct type of value. Next, it attempts to commit the object to ds.Database(). This may fail if, for instance, the Head of ds has been changed by another goroutine or process. In the event that the commit fails, the error from Database().Commit() is returned along with a new Dataset that's at its proper, current Head. The caller should take any necessary corrective action and try again using this new Dataset.
func (ds *Dataset) setNewHead(newHeadRef types.Ref) (Dataset, error) {
// commitNewHead attempts to make the object pointed to by newHeadRef the new Head of ds. First, it checks that the object exists in ds and validates that it decodes to the correct type of value. Next, it attempts to commit the object to ds.Database(). This may fail if, for instance, the Head of ds has been changed by another goroutine or process. In the event that the commit fails, the error from Database().Commit() is returned along with a new Dataset that's at its proper, current Head. The caller should take any necessary corrective action and try again using this new Dataset.
func (ds *Dataset) commitNewHead(newHeadRef types.Ref) (Dataset, error) {
commit := ds.validateRefAsCommit(newHeadRef)
store, err := ds.Database().Commit(ds.id, commit)
return Dataset{store, ds.id}, err

View File

@@ -73,7 +73,9 @@ func TestPullTopDown(t *testing.T) {
source, err = source.CommitValue(updatedValue)
assert.NoError(err)
sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil)
srcHeadRef := types.NewRef(source.Head())
sink.Pull(source.Database(), srcHeadRef, 1, nil)
sink, err = sink.FastForward(srcHeadRef)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -94,7 +96,9 @@ func TestPullFirstCommitTopDown(t *testing.T) {
source, err := source.CommitValue(sourceInitialValue)
assert.NoError(err)
sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil)
srcHeadRef := types.NewRef(source.Head())
sink.Pull(source.Database(), srcHeadRef, 1, nil)
sink, err = sink.FastForward(srcHeadRef)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -113,7 +117,9 @@ func TestPullDeepRefTopDown(t *testing.T) {
source, err := source.CommitValue(sourceInitialValue)
assert.NoError(err)
sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil)
srcHeadRef := types.NewRef(source.Head())
sink.Pull(source.Database(), srcHeadRef, 1, nil)
sink, err = sink.FastForward(srcHeadRef)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}
@@ -154,10 +160,14 @@ func TestPullWithMeta(t *testing.T) {
assert.NoError(err)
h4 := source.Head()
sink, err = sink.Pull(source.Database(), types.NewRef(h2), 1, nil)
srcHeadRef := types.NewRef(h2)
sink.Pull(source.Database(), srcHeadRef, 1, nil)
sink, err = sink.FastForward(srcHeadRef)
assert.NoError(err)
sink, err = sink.Pull(source.Database(), types.NewRef(h4), 1, nil)
srcHeadRef = types.NewRef(h4)
sink.Pull(source.Database(), srcHeadRef, 1, nil)
sink, err = sink.FastForward(srcHeadRef)
assert.NoError(err)
assert.True(source.Head().Equals(sink.Head()))
}