Go: add heuristic merge option to db.Commit() (#2759)

This patch adds an optional MergePolicy field to CommitOptions. It's a
callback. If the caller sets it, then the commit code will look for a
common ancestor between the Dataset HEAD and the provided Commit. If
the caller-provided Commit descends from HEAD, then Commit proceeds as
normal.
If it does not, but there is a common ancestor, the code runs
merge.ThreeWay() on the values of the provided Commit, HEAD, and the
common ancestor, invoking the MergePolicy callback to resolve
conflicts. If merge succeeds, a merge Commit is created that descends
from both HEAD and the caller-provided Commit. This becomes the new
HEAD of the Dataset.

Fixes #2534
This commit is contained in:
cmasone-attic
2016-10-27 11:52:46 -07:00
committed by GitHub
parent b9db5d1bc2
commit e954427903
9 changed files with 156 additions and 130 deletions

View File

@@ -56,33 +56,20 @@ func NewCommit(value types.Value, parents types.Set, meta types.Struct) types.St
return types.NewStructWithType(t, types.ValueSlice{meta, parents, value})
}
// CommitDescendsFrom returns true if commit descends from ancestor
func CommitDescendsFrom(commit types.Struct, ancestor types.Ref, vr types.ValueReader) bool {
// BFS because the common case is that the ancestor is only a step or two away
ancestors := commit.Get(ParentsField).(types.Set)
for !ancestors.Has(ancestor) {
if ancestors.Empty() {
return false
}
ancestors = getAncestors(ancestors, ancestor.Height(), vr)
}
return true
}
// FindCommonAncestor returns the most recent common ancestor of c1 and c2, if
// one exists, setting ok to true. If there is no common ancestor, ok is set
// to false.
func FindCommonAncestor(c1, c2 types.Struct, vr types.ValueReader) (a types.Struct, ok bool) {
d.PanicIfFalse(IsCommitType(c1.Type()), "FindCommonAncestor() called on %s", c1.Type().Describe())
d.PanicIfFalse(IsCommitType(c2.Type()), "FindCommonAncestor() called on %s", c2.Type().Describe())
func FindCommonAncestor(c1, c2 types.Ref, vr types.ValueReader) (a types.Ref, ok bool) {
d.PanicIfFalse(IsRefOfCommitType(c1.Type()), "FindCommonAncestor() called on %s", c1.Type().Describe())
d.PanicIfFalse(IsRefOfCommitType(c2.Type()), "FindCommonAncestor() called on %s", c2.Type().Describe())
c1Q, c2Q := &types.RefByHeight{types.NewRef(c1)}, &types.RefByHeight{types.NewRef(c2)}
c1Q, c2Q := &types.RefByHeight{c1}, &types.RefByHeight{c2}
for !c1Q.Empty() && !c2Q.Empty() {
c1Ht, c2Ht := c1Q.MaxHeight(), c2Q.MaxHeight()
if c1Ht == c2Ht {
c1Parents, c2Parents := c1Q.PopRefsOfHeight(c1Ht), c2Q.PopRefsOfHeight(c2Ht)
if common := findCommonRef(c1Parents, c2Parents); (common != types.Ref{}) {
return common.TargetValue(vr).(types.Struct), true
return common, true
}
parentsToQueue(c1Parents, c1Q, vr)
parentsToQueue(c2Parents, c2Q, vr)
@@ -124,29 +111,6 @@ func findCommonRef(a, b types.RefSlice) types.Ref {
return types.Ref{}
}
// getAncestors returns set of direct ancestors with height >= minHeight
func getAncestors(commits types.Set, minHeight uint64, vr types.ValueReader) types.Set {
ancestors := types.NewSet()
commits.IterAll(func(v types.Value) {
r := v.(types.Ref)
c := r.TargetValue(vr).(types.Struct)
// only consider commit-refs greater than minHeight; commit-refs at same height
// can be ignored since their parent heights will be < minHeight
if r.Height() > minHeight {
next := []types.Value{}
c.Get(ParentsField).(types.Set).IterAll(func(v types.Value) {
r := v.(types.Ref)
// only consider parent commit-refs >= minHeight
if r.Height() >= minHeight {
next = append(next, v)
}
})
ancestors = ancestors.Insert(next...)
}
})
return ancestors
}
func makeCommitType(valueType *types.Type, parentsValueTypes []*types.Type, metaType *types.Type, parentsMetaTypes []*types.Type) *types.Type {
tmp := make([]*types.Type, len(parentsValueTypes), len(parentsValueTypes)+1)
copy(tmp, parentsValueTypes)

View File

@@ -4,11 +4,24 @@
package datas
import "github.com/attic-labs/noms/go/types"
import (
"github.com/attic-labs/noms/go/merge"
"github.com/attic-labs/noms/go/types"
)
// CommitOptions is used to pass options into Commit.
type CommitOptions struct {
// Parents, if provided is the parent commits of the commit we are creating.
// Parents, if provided is the parent commits of the commit we are
// creating.
Parents types.Set
Meta types.Struct
// Meta is a Struct that describes arbitrary metadata about this Commit,
// e.g. a timestamp or descriptive text.
Meta types.Struct
// Policy will be called to attempt to merge this Commit with the current
// Head, if this is not a fast-forward. If Policy is nil, no merging will
// be attempted. Note that because Commit() retries in some cases, Policy
// might also be called multiple times with different values.
Policy merge.Policy
}

View File

@@ -134,14 +134,15 @@ func TestFindCommonAncestor(t *testing.T) {
// Assert that c is the common ancestor of a and b
assertCommonAncestor := func(expected, a, b types.Struct) {
if found, ok := FindCommonAncestor(a, b, db); assert.True(ok) {
if found, ok := FindCommonAncestor(types.NewRef(a), types.NewRef(b), db); assert.True(ok) {
ancestor := found.TargetValue(db).(types.Struct)
assert.True(
expected.Equals(found),
expected.Equals(ancestor),
"%s should be common ancestor of %s, %s. Got %s",
expected.Get(ValueField),
a.Get(ValueField),
b.Get(ValueField),
found.Get(ValueField),
ancestor.Get(ValueField),
)
}
}
@@ -183,79 +184,13 @@ func TestFindCommonAncestor(t *testing.T) {
assertCommonAncestor(a1, a6, c3) // Traversing multiple parents on both sides
// No common ancestor
if found, ok := FindCommonAncestor(d2, a6, db); !assert.False(ok) {
if found, ok := FindCommonAncestor(types.NewRef(d2), types.NewRef(a6), db); !assert.False(ok) {
assert.Fail(
"Unexpected common ancestor!",
"Should be no common ancestor of %s, %s. Got %s",
d2.Get(ValueField),
a6.Get(ValueField),
found.Get(ValueField),
found.TargetValue(db).(types.Struct).Get(ValueField),
)
}
}
func TestCommitDescendsFrom(t *testing.T) {
assert := assert.New(t)
db := NewDatabase(chunks.NewTestStore())
defer db.Close()
// Add a commit and return it
addCommit := func(datasetID string, val string, parents ...types.Struct) types.Struct {
ds := db.GetDataset(datasetID)
var err error
ds, err = db.Commit(ds, types.String(val), CommitOptions{Parents: toRefSet(parents...)})
assert.NoError(err)
return ds.Head()
}
// Assert that c does/doesn't descend from a
assertDescendsFrom := func(c types.Struct, a types.Struct, expected bool) {
assert.Equal(expected, CommitDescendsFrom(c, types.NewRef(a), db),
"Test: CommitDescendsFrom(%s, %s)", c.Get("value"), a.Get("value"))
}
// Assert that children have immediate ancestors with height >= minHeight
assertAncestors := func(children []types.Struct, minLevel uint64, expected []types.Struct) {
exp := toRefSet(expected...)
ancestors := getAncestors(toRefSet(children...), minLevel, db)
assert.True(exp.Equals(ancestors), "expected: [%s]; got: [%s]", toValuesString(exp, db), toValuesString(ancestors, db))
}
// Build commit DAG
//
// ds-a: a1<-a2<-a3<-a4<-a5<-a6
// ^ /
// \ /-------/
// \ V
// ds-b: b2
//
a := "ds-a"
b := "ds-b"
a1 := addCommit(a, "a1")
a2 := addCommit(a, "a2", a1)
b2 := addCommit(b, "b2", a1)
a3 := addCommit(a, "a3", a2)
a4 := addCommit(a, "a4", a3)
a5 := addCommit(a, "a5", a4, b2)
a6 := addCommit(a, "a6", a5)
// Positive tests
assertDescendsFrom(a3, a2, true) // parent
assertDescendsFrom(a3, a1, true) // grandparent
assertDescendsFrom(a3, a1, true) // origin
assertDescendsFrom(a6, b2, true) // merge ancestor
assertDescendsFrom(a5, a3, true) // exercise prune parent
assertDescendsFrom(a6, a3, true) // exercise prune grandparent
// Negative tests
assertDescendsFrom(a4, a5, false) // sanity
assertDescendsFrom(a6, a6, false) // self
assertDescendsFrom(a4, b2, false) // different branch
// Verify pruning
assertAncestors([]types.Struct{a6}, 5, []types.Struct{a5}) // no pruning; one parent
assertAncestors([]types.Struct{a5}, 2, []types.Struct{a4, b2}) // no pruning; 2 parents
assertAncestors([]types.Struct{a5}, 4, []types.Struct{a4}) // prune 1 parent
assertAncestors([]types.Struct{a5}, 5, []types.Struct{}) // prune child b/c child.Height <= minHeight
assertAncestors([]types.Struct{a4, b2}, 3, []types.Struct{a3}) // prune 1 child b/c child.Height <= minHeight
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/d"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/merge"
"github.com/attic-labs/noms/go/types"
)
@@ -93,11 +94,11 @@ func (dbc *databaseCommon) doFastForward(ds Dataset, newHeadRef types.Ref) error
}
commit := dbc.validateRefAsCommit(newHeadRef)
return dbc.doCommit(ds.ID(), commit)
return dbc.doCommit(ds.ID(), commit, nil)
}
// doCommit manages concurrent access the single logical piece of mutable state: the current Root. doCommit is optimistic in that it is attempting to update head making the assumption that currentRootHash is the hash of the current head. The call to UpdateRoot below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again. This method will also fail and return an 'ErrMergeNeeded' error if the |commit| is not a descendent of the current dataset head
func (dbc *databaseCommon) doCommit(datasetID string, commit types.Struct) error {
func (dbc *databaseCommon) doCommit(datasetID string, commit types.Struct, mergePolicy merge.Policy) error {
d.PanicIfFalse(IsCommitType(commit.Type()), "Can't commit a non-Commit struct to dataset %s", datasetID)
defer func() { dbc.rootHash, dbc.datasets = dbc.rt.Root(), nil }()
@@ -107,17 +108,33 @@ func (dbc *databaseCommon) doCommit(datasetID string, commit types.Struct) error
currentRootHash, currentDatasets := dbc.getRootAndDatasets()
commitRef := dbc.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
// Allow only fast-forward commits.
// If there's nothing in the DB yet, skip all this logic.
if !currentRootHash.IsEmpty() {
r, hasHead := currentDatasets.MaybeGet(types.String(datasetID))
// First commit in dataset is always fast-forward, so go through all this iff there's already a Head for datasetID.
if hasHead {
// This covers all cases where commit doesn't descend from the Head of datasetID, including the case where we hit an ErrOptimisticLockFailed and looped back around because some other process changed the Head out from under us.
if !CommitDescendsFrom(commit, r.(types.Ref), dbc) {
currentHeadRef := r.(types.Ref)
ancestorRef, found := FindCommonAncestor(commitRef, currentHeadRef, dbc)
if !found {
return ErrMergeNeeded
}
// This covers all cases where currentHeadRef is not an ancestor of commit, including the following edge cases:
// - commit is a duplicate of currentHead.
// - we hit an ErrOptimisticLockFailed and looped back around because some other process changed the Head out from under us.
if !currentHeadRef.Equals(ancestorRef) || currentHeadRef.Equals(commitRef) {
if mergePolicy == nil {
return ErrMergeNeeded
}
ancestor, currentHead := dbc.validateRefAsCommit(ancestorRef), dbc.validateRefAsCommit(currentHeadRef)
merged, err := mergePolicy(commit.Get(ValueField), currentHead.Get(ValueField), ancestor.Get(ValueField), dbc, nil)
if err != nil {
return err
}
commitRef = dbc.WriteValue(NewCommit(merged, types.NewSet(commitRef, currentHeadRef), types.EmptyStruct))
}
}
}
currentDatasets = currentDatasets.Set(types.String(datasetID), commitRef)

View File

@@ -9,6 +9,7 @@ import (
"github.com/attic-labs/noms/go/chunks"
"github.com/attic-labs/noms/go/hash"
"github.com/attic-labs/noms/go/merge"
"github.com/attic-labs/noms/go/types"
"github.com/attic-labs/testify/assert"
"github.com/attic-labs/testify/suite"
@@ -163,7 +164,7 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
// \----|c|
// Should be disallowed.
c := types.String("c")
ds, err = suite.db.Commit(ds, c, CommitOptions{Parents: types.NewSet(aCommitRef)})
ds, err = suite.db.Commit(ds, c, newOpts(aCommitRef))
suite.Error(err)
suite.True(ds.HeadValue().Equals(b))
@@ -176,7 +177,7 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
// Attempt to recommit |b| with |a| as parent.
// Should be disallowed.
ds, err = suite.db.Commit(ds, b, CommitOptions{Parents: types.NewSet(aCommitRef)})
ds, err = suite.db.Commit(ds, b, newOpts(aCommitRef))
suite.Error(err)
suite.True(ds.HeadValue().Equals(d))
@@ -191,6 +192,10 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
newDB.Close()
}
func newOpts(parents ...types.Value) CommitOptions {
return CommitOptions{Parents: types.NewSet(parents...)}
}
func (suite *DatabaseSuite) TestDatabaseDuplicateCommit() {
datasetID := "ds1"
ds := suite.db.GetDataset(datasetID)
@@ -202,7 +207,49 @@ func (suite *DatabaseSuite) TestDatabaseDuplicateCommit() {
suite.NoError(err)
_, err = suite.db.CommitValue(ds, v)
suite.Error(err)
suite.IsType(ErrMergeNeeded, err)
}
func (suite *DatabaseSuite) TestDatabaseCommitMerge() {
datasetID1, datasetID2 := "ds1", "ds2"
ds1, ds2 := suite.db.GetDataset(datasetID1), suite.db.GetDataset(datasetID2)
var err error
v := types.NewMap(types.String("Hello"), types.Number(42))
ds1, err = suite.db.CommitValue(ds1, v)
ds1First := ds1
suite.NoError(err)
ds1, err = suite.db.CommitValue(ds1, v.Set(types.String("Friends"), types.Bool(true)))
suite.NoError(err)
ds2, err = suite.db.CommitValue(ds2, types.String("Goodbye"))
suite.NoError(err)
// No common ancestor
_, err = suite.db.Commit(ds1, types.Number(47), newOpts(ds2.HeadRef()))
suite.IsType(ErrMergeNeeded, err, "%s", err)
// Unmergeable
_, err = suite.db.Commit(ds1, types.Number(47), newOptsWithMerge(merge.None, ds1First.HeadRef()))
suite.IsType(&merge.ErrMergeConflict{}, err, "%s", err)
// Merge policies
newV := v.Set(types.String("Friends"), types.Bool(false))
_, err = suite.db.Commit(ds1, newV, newOptsWithMerge(merge.None, ds1First.HeadRef()))
suite.IsType(&merge.ErrMergeConflict{}, err, "%s", err)
theirs, err := suite.db.Commit(ds1, newV, newOptsWithMerge(merge.Theirs, ds1First.HeadRef()))
suite.NoError(err)
suite.True(types.Bool(true).Equals(theirs.HeadValue().(types.Map).Get(types.String("Friends"))))
newV = v.Set(types.String("Friends"), types.Number(47))
ours, err := suite.db.Commit(ds1First, newV, newOptsWithMerge(merge.Ours, ds1First.HeadRef()))
suite.NoError(err)
suite.True(types.Number(47).Equals(ours.HeadValue().(types.Map).Get(types.String("Friends"))))
}
func newOptsWithMerge(policy merge.ResolveFunc, parents ...types.Value) CommitOptions {
return CommitOptions{Parents: types.NewSet(parents...), Policy: merge.NewThreeWay(policy)}
}
func (suite *DatabaseSuite) TestDatabaseDelete() {

View File

@@ -32,7 +32,7 @@ func (ldb *LocalDatabase) GetDataset(datasetID string) Dataset {
func (ldb *LocalDatabase) Commit(ds Dataset, v types.Value, opts CommitOptions) (Dataset, error) {
return ldb.doHeadUpdate(
ds,
func(ds Dataset) error { return ldb.doCommit(ds.ID(), buildNewCommit(ds, v, opts)) },
func(ds Dataset) error { return ldb.doCommit(ds.ID(), buildNewCommit(ds, v, opts), opts.Policy) },
)
}

View File

@@ -29,7 +29,7 @@ func (rdb *RemoteDatabaseClient) GetDataset(datasetID string) Dataset {
}
func (rdb *RemoteDatabaseClient) Commit(ds Dataset, v types.Value, opts CommitOptions) (Dataset, error) {
err := rdb.doCommit(ds.ID(), buildNewCommit(ds, v, opts))
err := rdb.doCommit(ds.ID(), buildNewCommit(ds, v, opts), opts.Policy)
return rdb.GetDataset(ds.ID()), err
}

View File

@@ -11,6 +11,12 @@ import (
"github.com/attic-labs/noms/go/types"
)
// Policy functors are used to merge two values (a and b) against a common
// ancestor. All three Values and their must by wholly readable from vrw.
// Whenever a change is merged, implementations should send a struct{} over
// progress.
type Policy func(a, b, ancestor types.Value, vrw types.ValueReadWriter, progress chan struct{}) (merged types.Value, err error)
// ResolveFunc is the type for custom merge-conflict resolution callbacks.
// When the merge algorithm encounters two non-mergeable changes (aChange and
// bChange) at the same path, it calls the ResolveFunc passed into ThreeWay().
@@ -22,6 +28,21 @@ import (
// to be used (if any), and true.
type ResolveFunc func(aChange, bChange types.DiffChangeType, a, b types.Value, path types.Path) (change types.DiffChangeType, merged types.Value, ok bool)
// None is the no-op ResolveFunc. Any conflict results in a merge failure.
func None(aChange, bChange types.DiffChangeType, a, b types.Value, path types.Path) (change types.DiffChangeType, merged types.Value, ok bool) {
return change, merged, false
}
// Ours resolves conflicts by preferring changes from the Value currently being committed.
func Ours(aChange, bChange types.DiffChangeType, a, b types.Value, path types.Path) (change types.DiffChangeType, merged types.Value, ok bool) {
return aChange, a, true
}
// Theirs resolves conflicts by preferring changes in the current HEAD.
func Theirs(aChange, bChange types.DiffChangeType, a, b types.Value, path types.Path) (change types.DiffChangeType, merged types.Value, ok bool) {
return bChange, b, true
}
// ErrMergeConflict indicates that a merge attempt failed and must be resolved
// manually for the provided reason.
type ErrMergeConflict struct {
@@ -36,6 +57,13 @@ func newMergeConflict(format string, args ...interface{}) *ErrMergeConflict {
return &ErrMergeConflict{fmt.Sprintf(format, args...)}
}
// Creates a new Policy based on ThreeWay using the provided ResolveFunc.
func NewThreeWay(resolve ResolveFunc) Policy {
return func(a, b, parent types.Value, vrw types.ValueReadWriter, progress chan struct{}) (merged types.Value, err error) {
return ThreeWay(a, b, parent, vrw, resolve, progress)
}
}
// ThreeWay attempts a three-way merge between two _candidate_ values that
// have both changed with respect to a common _parent_ value. The result of
// the algorithm is a _merged_ value or an error if merging could not be done.
@@ -119,7 +147,7 @@ func ThreeWay(a, b, parent types.Value, vrw types.ValueReadWriter, resolve Resol
}
if resolve == nil {
resolve = defaultResolve
resolve = None
}
m := &merger{vrw, resolve, progress}
return m.threeWay(a, b, parent, types.Path{})
@@ -140,10 +168,6 @@ type merger struct {
progress chan<- struct{}
}
func defaultResolve(aChange, bChange types.DiffChangeType, a, b types.Value, p types.Path) (change types.DiffChangeType, merged types.Value, ok bool) {
return
}
func updateProgress(progress chan<- struct{}) {
// TODO: Eventually we'll want more information than a single bit :).
if progress != nil {

View File

@@ -179,6 +179,32 @@ func (s *ThreeWayKeyValMergeSuite) TestThreeWayMerge_CustomMerge() {
}
}
func (s *ThreeWayKeyValMergeSuite) TestThreeWayMerge_MergeOurs() {
p := kvs{"k1", "k-one"}
a := kvs{"k1", "k-won"}
b := kvs{"k1", "k-too", "k2", "k-two"}
exp := kvs{"k1", "k-won", "k2", "k-two"}
merged, err := ThreeWay(s.create(a), s.create(b), s.create(p), s.vs, Ours, nil)
if s.NoError(err) {
expected := s.create(exp)
s.True(expected.Equals(merged), "%s != %s", types.EncodedValue(expected), types.EncodedValue(merged))
}
}
func (s *ThreeWayKeyValMergeSuite) TestThreeWayMerge_MergeTheirs() {
p := kvs{"k1", "k-one"}
a := kvs{"k1", "k-won"}
b := kvs{"k1", "k-too", "k2", "k-two"}
exp := kvs{"k1", "k-too", "k2", "k-two"}
merged, err := ThreeWay(s.create(a), s.create(b), s.create(p), s.vs, Theirs, nil)
if s.NoError(err) {
expected := s.create(exp)
s.True(expected.Equals(merged), "%s != %s", types.EncodedValue(expected), types.EncodedValue(merged))
}
}
func (s *ThreeWayKeyValMergeSuite) TestThreeWayMerge_NilConflict() {
s.tryThreeWayConflict(nil, s.create(mm2b), s.create(mm2), "Cannot merge nil Value with")
s.tryThreeWayConflict(s.create(mm2a), nil, s.create(mm2), "with nil Value.")