From 65edbaabe321e44131ad5ddcb54ce012ff505442 Mon Sep 17 00:00:00 2001 From: cmasone-attic Date: Fri, 5 Aug 2016 15:38:41 -0700 Subject: [PATCH] Allow Dataset HEAD to be set to non-fastforward commits (#2271) The Dataset.Commit() code pathway still enforces fast-forward-only behavior, but a new SetHead() method allows the HEAD of a Dataset to be forced to any other Commit. noms sync detects the case where the source Commit is not a descendent of the provided sink Dataset's HEAD and uses the new API to force the sink to the desired new Commit, printing out the now-abandoned old HEAD. Fixes #2240 --- cmd/noms/noms_sync.go | 15 ++++++++-- cmd/noms/noms_sync_test.go | 20 +++++++++++++- go/datas/commit.go | 2 +- go/datas/database.go | 5 +++- go/datas/database_common.go | 25 ++++++++++------- go/datas/local_database.go | 7 ++++- go/datas/pull.go | 9 ++++-- go/datas/remote_database_client.go | 7 ++++- go/datas/remote_database_handlers.go | 4 +-- go/dataset/dataset.go | 41 +++++++++++++++++++--------- go/dataset/pull_test.go | 20 ++++++++++---- 11 files changed, 116 insertions(+), 39 deletions(-) diff --git a/cmd/noms/noms_sync.go b/cmd/noms/noms_sync.go index 76cd055ea7..c94f9e3fe1 100644 --- a/cmd/noms/noms_sync.go +++ b/cmd/noms/noms_sync.go @@ -77,10 +77,19 @@ func runSync(args []string) int { lastProgressCh <- last }() + sourceRef := types.NewRef(sourceObj) + sinkRef, _ := sinkDataset.MaybeHeadRef() + nonFF := false err = d.Try(func() { defer profile.MaybeStartProfile().Stop() + sinkDataset.Pull(sourceStore, sourceRef, p, progressCh) + var err error - sinkDataset, err = sinkDataset.Pull(sourceStore, types.NewRef(sourceObj), p, progressCh) + sinkDataset, err = sinkDataset.FastForward(sourceRef) + if err == datas.ErrMergeNeeded { + sinkDataset, err = sinkDataset.SetHead(sourceRef) + nonFF = true + } d.PanicIfError(err) }) @@ -92,8 +101,10 @@ func runSync(args []string) int { if last := <-lastProgressCh; last.DoneCount > 0 { status.Printf("Done - Synced %s in %s (%s/s)", 
humanize.Bytes(last.DoneBytes), since(start), bytesPerSec(last, start)) status.Done() + } else if nonFF && !sourceRef.Equals(sinkRef) { + fmt.Printf("Abandoning %s; new head is %s\n", sinkRef.TargetHash(), sourceRef.TargetHash()) } else { - fmt.Println(args[0], "is up to date.") + fmt.Println(args[1], "is up to date.") } return 0 diff --git a/cmd/noms/noms_sync_test.go b/cmd/noms/noms_sync_test.go index 5a68c34bb8..eba0351c18 100644 --- a/cmd/noms/noms_sync_test.go +++ b/cmd/noms/noms_sync_test.go @@ -70,5 +70,23 @@ func (s *nomsSyncTestSuite) TestSync() { dest = dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(ldb2dir, "", 1, false)), "bar") s.True(types.Number(43).Equals(dest.HeadValue())) dest.Database().Close() - +} + +func (s *nomsSyncTestSuite) TestRewind() { + var err error + source1 := dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false)), "foo") + source1, err = source1.CommitValue(types.Number(42)) + s.NoError(err) + rewindRef := source1.HeadRef().TargetHash() + source1, err = source1.CommitValue(types.Number(43)) + s.NoError(err) + source1.Database().Close() // Close Database backing both Datasets + + sourceSpec := spec.CreateValueSpecString("ldb", s.LdbDir, "#"+rewindRef.String()) + sinkDatasetSpec := spec.CreateValueSpecString("ldb", s.LdbDir, "foo") + s.Run(main, []string{"sync", sourceSpec, sinkDatasetSpec}) + + dest := dataset.NewDataset(datas.NewDatabase(chunks.NewLevelDBStore(s.LdbDir, "", 1, false)), "foo") + s.True(types.Number(42).Equals(dest.HeadValue())) + dest.Database().Close() } diff --git a/go/datas/commit.go b/go/datas/commit.go index 8ce4d00e37..b485cfe6c8 100644 --- a/go/datas/commit.go +++ b/go/datas/commit.go @@ -122,6 +122,6 @@ func IsCommitType(t *types.Type) bool { return types.IsSubtype(valueCommitType, t) } -func isRefOfCommitType(t *types.Type) bool { +func IsRefOfCommitType(t *types.Type) bool { return t.Kind() == types.RefKind && IsCommitType(getRefElementType(t)) } diff --git 
a/go/datas/database.go b/go/datas/database.go index b150ee8df1..83ce978170 100644 --- a/go/datas/database.go +++ b/go/datas/database.go @@ -34,12 +34,15 @@ type Database interface { // Datasets returns the root of the database which is a MapOfStringToRefOfCommit where string is a datasetID. Datasets() types.Map - // Commit updates the Commit that datasetID in this database points at. All Values that have been written to this Database are guaranteed to be persistent after Commit(). If the update cannot be performed, e.g., because of a conflict, error will non-nil. The newest snapshot of the database is always returned. + // Commit updates the Commit that datasetID in this database points at. All Values that have been written to this Database are guaranteed to be persistent after Commit(). If the update cannot be performed, e.g., because of a conflict, error will be non-nil. The newest snapshot of the database is always returned. Commit(datasetID string, commit types.Struct) (Database, error) // Delete removes the Dataset named datasetID from the map at the root of the Database. The Dataset data is not necessarily cleaned up at this time, but may be garbage collected in the future. If the update cannot be performed, e.g., because of a conflict, error will non-nil. The newest snapshot of the database is always returned. Delete(datasetID string) (Database, error) + // SetHead sets the Commit that datasetID in this database points at. All Values that have been written to this Database are guaranteed to be persistent after SetHead(). If the update cannot be performed, e.g., because of a conflict, error will be non-nil. The newest snapshot of the database is always returned. 
+ SetHead(datasetID string, commit types.Struct) (Database, error) + has(hash hash.Hash) bool validatingBatchStore() types.BatchStore } diff --git a/go/datas/database_common.go b/go/datas/database_common.go index 4814b841c9..cf384c99fd 100644 --- a/go/datas/database_common.go +++ b/go/datas/database_common.go @@ -69,6 +69,11 @@ func (ds *databaseCommon) Datasets() types.Map { return *ds.datasets } +func (ds *databaseCommon) datasetsFromRef(datasetsRef hash.Hash) *types.Map { + c := ds.ReadValue(datasetsRef).(types.Map) + return &c +} + func (ds *databaseCommon) has(h hash.Hash) bool { return ds.cch.Has(h) } @@ -85,22 +90,22 @@ func (ds *databaseCommon) Close() error { return ds.vs.Close() } -func (ds *databaseCommon) datasetsFromRef(datasetsRef hash.Hash) *types.Map { - c := ds.ReadValue(datasetsRef).(types.Map) - return &c -} - -func (ds *databaseCommon) commit(datasetID string, commit types.Struct) error { +func (ds *databaseCommon) doSetHead(datasetID string, commit types.Struct) error { d.PanicIfTrue(!IsCommitType(commit.Type()), "Can't commit a non-Commit struct to dataset %s", datasetID) - return ds.doCommit(datasetID, commit) + + currentRootRef, currentDatasets := ds.getRootAndDatasets() + commitRef := ds.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails + + currentDatasets = currentDatasets.Set(types.String(datasetID), commitRef) + return ds.tryUpdateRoot(currentDatasets, currentRootRef) } // doCommit manages concurrent access the single logical piece of mutable state: the current Root. doCommit is optimistic in that it is attempting to update head making the assumption that currentRootRef is the hash of the current head. The call to UpdateRoot below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again. 
This method will also fail and return an 'ErrMergeNeeded' error if the |commit| is not a descendent of the current dataset head func (ds *databaseCommon) doCommit(datasetID string, commit types.Struct) error { - currentRootRef, currentDatasets := ds.getRootAndDatasets() + d.PanicIfTrue(!IsCommitType(commit.Type()), "Can't commit a non-Commit struct to dataset %s", datasetID) - // TODO: This Commit will be orphaned if the tryUpdateRoot() below fails - commitRef := ds.WriteValue(commit) + currentRootRef, currentDatasets := ds.getRootAndDatasets() + commitRef := ds.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails // First commit in store is always fast-foward. if !currentRootRef.IsEmpty() { diff --git a/go/datas/local_database.go b/go/datas/local_database.go index df4101db17..afdc717543 100644 --- a/go/datas/local_database.go +++ b/go/datas/local_database.go @@ -25,7 +25,7 @@ func newLocalDatabase(cs chunks.ChunkStore) *LocalDatabase { } func (lds *LocalDatabase) Commit(datasetID string, commit types.Struct) (Database, error) { - err := lds.commit(datasetID, commit) + err := lds.doCommit(datasetID, commit) return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt), lds.cs}, err } @@ -34,6 +34,11 @@ func (lds *LocalDatabase) Delete(datasetID string) (Database, error) { return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt), lds.cs}, err } +func (lds *LocalDatabase) SetHead(datasetID string, commit types.Struct) (Database, error) { + err := lds.doSetHead(datasetID, commit) + return &LocalDatabase{newDatabaseCommon(lds.cch, lds.vs, lds.rt), lds.cs}, err +} + func (lds *LocalDatabase) validatingBatchStore() (bs types.BatchStore) { bs = lds.vs.BatchStore() if !bs.IsValidating() { diff --git a/go/datas/pull.go b/go/datas/pull.go index 23b09e1e6e..efe5f66754 100644 --- a/go/datas/pull.go +++ b/go/datas/pull.go @@ -21,12 +21,17 @@ type PullProgress struct { func Pull(srcDB, sinkDB Database, sourceRef, sinkHeadRef types.Ref, concurrency 
int, progressCh chan PullProgress) { srcQ, sinkQ := &types.RefByHeight{sourceRef}, &types.RefByHeight{sinkHeadRef} + // If the sourceRef points to an object already in sinkDB, there's nothing to do. + if sinkDB.has(sourceRef.TargetHash()) { + return + } + // We generally expect that sourceRef descends from sinkHeadRef, so that walking down from sinkHeadRef yields useful hints. If it's not even in the srcDB, then just clear out sinkQ right now and don't bother. if !srcDB.has(sinkHeadRef.TargetHash()) { sinkQ.PopBack() } - // Since we expect sourceRef to descend from sinkHeadRef, we assume srcDB has a superset of the data in sinkDB. There are some cases where, logically, the code wants to read data it knows to be in sinkDB. In this case, it doesn't actually matter which Database the data comes from, so as an optimization we use whichever is a LocalDatabase -- if either is. + // Since we expect sinkHeadRef to descend from sourceRef, we assume srcDB has a superset of the data in sinkDB. There are some cases where, logically, the code wants to read data it knows to be in sinkDB. In this case, it doesn't actually matter which Database the data comes from, so as an optimization we use whichever is a LocalDatabase -- if either is. mostLocalDB := srcDB if _, ok := sinkDB.(*LocalDatabase); ok { mostLocalDB = sinkDB @@ -239,7 +244,7 @@ func traverseSink(sinkRef types.Ref, db Database) traverseResult { } func traverseCommon(comRef, sinkHead types.Ref, db Database) traverseResult { - if comRef.Height() > 1 && isRefOfCommitType(comRef.Type()) { + if comRef.Height() > 1 && IsRefOfCommitType(comRef.Type()) { commit := comRef.TargetValue(db).(types.Struct) // We don't want to traverse the parents of sinkHead, but we still want to traverse its Value on the sinkDB side. We also still want to traverse all children, in both the srcDB and sinkDB, of any common Commit that is not at the Head of sinkDB. 
exclusionSet := types.NewSet() diff --git a/go/datas/remote_database_client.go b/go/datas/remote_database_client.go index a305be73c0..e52b989d59 100644 --- a/go/datas/remote_database_client.go +++ b/go/datas/remote_database_client.go @@ -27,7 +27,7 @@ func (rds *RemoteDatabaseClient) validatingBatchStore() (bs types.BatchStore) { } func (rds *RemoteDatabaseClient) Commit(datasetID string, commit types.Struct) (Database, error) { - err := rds.commit(datasetID, commit) + err := rds.doCommit(datasetID, commit) return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err } @@ -36,6 +36,11 @@ func (rds *RemoteDatabaseClient) Delete(datasetID string) (Database, error) { return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err } +func (rds *RemoteDatabaseClient) SetHead(datasetID string, commit types.Struct) (Database, error) { + err := rds.doSetHead(datasetID, commit) + return &RemoteDatabaseClient{newDatabaseCommon(rds.cch, rds.vs, rds.rt)}, err +} + func (f RemoteStoreFactory) CreateStore(ns string) Database { return NewRemoteDatabase(f.host+httprouter.CleanPath(ns), f.auth) } diff --git a/go/datas/remote_database_handlers.go b/go/datas/remote_database_handlers.go index 08d600daea..593dae9ef4 100644 --- a/go/datas/remote_database_handlers.go +++ b/go/datas/remote_database_handlers.go @@ -255,7 +255,7 @@ func handleRootPost(w http.ResponseWriter, req *http.Request, ps URLParams, cs c func isMapOfStringToRefOfCommit(m types.Map) bool { mapTypes := m.Type().Desc.(types.CompoundDesc).ElemTypes keyType, valType := mapTypes[0], mapTypes[1] - return keyType.Kind() == types.StringKind && (isRefOfCommitType(valType) || isUnionOfRefOfCommitType(valType)) + return keyType.Kind() == types.StringKind && (IsRefOfCommitType(valType) || isUnionOfRefOfCommitType(valType)) } func isUnionOfRefOfCommitType(t *types.Type) bool { @@ -263,7 +263,7 @@ func isUnionOfRefOfCommitType(t *types.Type) bool { return false } for _, et := range 
t.Desc.(types.CompoundDesc).ElemTypes { - if !isRefOfCommitType(et) { + if !IsRefOfCommitType(et) { return false } } diff --git a/go/dataset/dataset.go b/go/dataset/dataset.go index 31d09e94ce..a09c1ed034 100644 --- a/go/dataset/dataset.go +++ b/go/dataset/dataset.go @@ -102,28 +102,43 @@ func (ds *Dataset) Commit(v types.Value, opts CommitOptions) (Dataset, error) { return Dataset{store, ds.id}, err } -func (ds *Dataset) Pull(sourceStore datas.Database, sourceRef types.Ref, concurrency int, progressCh chan datas.PullProgress) (Dataset, error) { - sink := *ds - +// Pull objects that descend from sourceRef in srcDB into sinkDB, using at most the given degree of concurrency. Progress will be reported over progressCh as the algorithm works. Objects that are already present in ds will not be pulled over. +func (ds *Dataset) Pull(sourceDB datas.Database, sourceRef types.Ref, concurrency int, progressCh chan datas.PullProgress) { sinkHeadRef := types.Ref{} - if currentHeadRef, ok := sink.MaybeHeadRef(); ok { + if currentHeadRef, ok := ds.MaybeHeadRef(); ok { sinkHeadRef = currentHeadRef } + datas.Pull(sourceDB, ds.Database(), sourceRef, sinkHeadRef, concurrency, progressCh) +} - if sourceRef == sinkHeadRef { - return sink, nil +// FastForward takes a types.Ref to a Commit object and makes it the new Head of ds iff it is a descendant of the current Head. Intended to be used e.g. after a call to Pull(). If the update cannot be performed, e.g., because another process moved the current Head out from under you, err will be non-nil. The newest snapshot of the Dataset is always returned, so the caller an easily retry using the latest. 
+func (ds *Dataset) FastForward(newHeadRef types.Ref) (sink Dataset, err error) { + sink = *ds + if currentHeadRef, ok := sink.MaybeHeadRef(); ok && newHeadRef == currentHeadRef { + return + } else if newHeadRef.Height() <= currentHeadRef.Height() { + return sink, datas.ErrMergeNeeded } - datas.Pull(sourceStore, sink.Database(), sourceRef, sinkHeadRef, concurrency, progressCh) - err := datas.ErrOptimisticLockFailed - for ; err == datas.ErrOptimisticLockFailed; sink, err = sink.setNewHead(sourceRef) { + for err = datas.ErrOptimisticLockFailed; err == datas.ErrOptimisticLockFailed; sink, err = sink.commitNewHead(newHeadRef) { + } + return +} + +// SetHead takes a types.Ref to a Commit object and makes it the new Head of ds. Intended to be used e.g. when rewinding in ds' Commit history. If the update cannot be performed, e.g., because the state of ds.Database() changed out from under you, err will be non-nil. The newest snapshot of the Dataset is always returned, so the caller can easily retry using the latest. +func (ds *Dataset) SetHead(newHeadRef types.Ref) (sink Dataset, err error) { + sink = *ds + if currentHeadRef, ok := sink.MaybeHeadRef(); ok && newHeadRef == currentHeadRef { + return } - return sink, err + commit := sink.validateRefAsCommit(newHeadRef) + store, err := sink.Database().SetHead(sink.id, commit) + return Dataset{store, sink.id}, err } func (ds *Dataset) validateRefAsCommit(r types.Ref) types.Struct { - v := ds.store.ReadValue(r.TargetHash()) + v := ds.Database().ReadValue(r.TargetHash()) if v == nil { panic(r.TargetHash().String() + " not found") @@ -134,8 +149,8 @@ func (ds *Dataset) validateRefAsCommit(r types.Ref) types.Struct { return v.(types.Struct) } -// setNewHead attempts to make the object pointed to by newHeadRef the new Head of ds. First, it checks that the object exists in ds and validates that it decodes to the correct type of value. Next, it attempts to commit the object to ds.Database().
This may fail if, for instance, the Head of ds has been changed by another goroutine or process. In the event that the commit fails, the error from Database().Commit() is returned along with a new Dataset that's at it's proper, current Head. The caller should take any necessary corrective action and try again using this new Dataset. -func (ds *Dataset) setNewHead(newHeadRef types.Ref) (Dataset, error) { +// commitNewHead attempts to make the object pointed to by newHeadRef the new Head of ds. First, it checks that the object exists in ds and validates that it decodes to the correct type of value. Next, it attempts to commit the object to ds.Database(). This may fail if, for instance, the Head of ds has been changed by another goroutine or process. In the event that the commit fails, the error from Database().Commit() is returned along with a new Dataset that's at its proper, current Head. The caller should take any necessary corrective action and try again using this new Dataset. +func (ds *Dataset) commitNewHead(newHeadRef types.Ref) (Dataset, error) { + commit := ds.validateRefAsCommit(newHeadRef) + store, err := ds.Database().Commit(ds.id, commit) + return Dataset{store, ds.id}, err } diff --git a/go/dataset/pull_test.go b/go/dataset/pull_test.go index 7c4ff629f1..1fc5794a08 100644 --- a/go/dataset/pull_test.go +++ b/go/dataset/pull_test.go @@ -73,7 +73,9 @@ func TestPullTopDown(t *testing.T) { source, err = source.CommitValue(updatedValue) assert.NoError(err) - sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil) + srcHeadRef := types.NewRef(source.Head()) + sink.Pull(source.Database(), srcHeadRef, 1, nil) + sink, err = sink.FastForward(srcHeadRef) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) } @@ -94,7 +96,9 @@ func TestPullFirstCommitTopDown(t *testing.T) { source, err := source.CommitValue(sourceInitialValue) assert.NoError(err) - sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil) +
srcHeadRef := types.NewRef(source.Head()) + sink.Pull(source.Database(), srcHeadRef, 1, nil) + sink, err = sink.FastForward(srcHeadRef) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) } @@ -113,7 +117,9 @@ func TestPullDeepRefTopDown(t *testing.T) { source, err := source.CommitValue(sourceInitialValue) assert.NoError(err) - sink, err = sink.Pull(source.Database(), types.NewRef(source.Head()), 1, nil) + srcHeadRef := types.NewRef(source.Head()) + sink.Pull(source.Database(), srcHeadRef, 1, nil) + sink, err = sink.FastForward(srcHeadRef) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) } @@ -154,10 +160,14 @@ func TestPullWithMeta(t *testing.T) { assert.NoError(err) h4 := source.Head() - sink, err = sink.Pull(source.Database(), types.NewRef(h2), 1, nil) + srcHeadRef := types.NewRef(h2) + sink.Pull(source.Database(), srcHeadRef, 1, nil) + sink, err = sink.FastForward(srcHeadRef) assert.NoError(err) - sink, err = sink.Pull(source.Database(), types.NewRef(h4), 1, nil) + srcHeadRef = types.NewRef(h4) + sink.Pull(source.Database(), srcHeadRef, 1, nil) + sink, err = sink.FastForward(srcHeadRef) assert.NoError(err) assert.True(source.Head().Equals(sink.Head())) }