Allow changes to different Datasets to proceed concurrently (#2533)

Prior to this change, concurrent modification of two different
Datasets in a given Database would result in an
ErrOptimisticLockFailed for whichever process lost the race. As of
this patch, concurrent changes to the _same_ Dataset will result in an
ErrMergeNeeded for the loser, but concurrent changes to different
Datasets will succeed.

Fixes #2524
This commit is contained in:
cmasone-attic
2016-09-14 13:56:30 -07:00
committed by GitHub
parent 5edf89cf3d
commit 55caa0f519
5 changed files with 263 additions and 117 deletions

View File

@@ -96,34 +96,60 @@ func (ds *databaseCommon) doSetHead(datasetID string, commit types.Struct) error
func (ds *databaseCommon) doCommit(datasetID string, commit types.Struct) error {
d.PanicIfTrue(!IsCommitType(commit.Type()), "Can't commit a non-Commit struct to dataset %s", datasetID)
currentRootRef, currentDatasets := ds.getRootAndDatasets()
commitRef := ds.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
var err error
for err = ErrOptimisticLockFailed; err == ErrOptimisticLockFailed; {
currentRootRef, currentDatasets := ds.getRootAndDatasets()
commitRef := ds.WriteValue(commit) // will be orphaned if the tryUpdateRoot() below fails
// First commit in store is always fast-forward.
if !currentRootRef.IsEmpty() {
r, hasHead := currentDatasets.MaybeGet(types.String(datasetID))
// Allow only fast-forward commits.
// If there's nothing in the DB yet, skip all this logic.
if !currentRootRef.IsEmpty() {
r, hasHead := currentDatasets.MaybeGet(types.String(datasetID))
// First commit in dataset is always fast-forward.
if hasHead {
currentHeadRef := r.(types.Ref)
// Allow only fast-forward commits.
if commitRef.Equals(currentHeadRef) {
return nil
}
if !CommitDescendsFrom(commit, currentHeadRef, ds) {
return ErrMergeNeeded
// First commit in dataset is always fast-forward, so go through all this iff there's already a Head for datasetID.
if hasHead {
currentHeadRef := r.(types.Ref)
if commitRef.Equals(currentHeadRef) {
return nil
}
// This covers all cases where commit doesn't descend from the Head of datasetID, including the case where we hit an ErrOptimisticLockFailed and looped back around because some other process changed the Head out from under us.
if !CommitDescendsFrom(commit, currentHeadRef, ds) {
return ErrMergeNeeded
}
}
}
currentDatasets = currentDatasets.Set(types.String(datasetID), commitRef)
err = ds.tryUpdateRoot(currentDatasets, currentRootRef)
}
currentDatasets = currentDatasets.Set(types.String(datasetID), commitRef)
return ds.tryUpdateRoot(currentDatasets, currentRootRef)
return err
}
// doDelete manages concurrent access the single logical piece of mutable state: the current Root. doDelete is optimistic in that it is attempting to update head making the assumption that currentRootRef is the hash of the current head. The call to UpdateRoot below will return an 'ErrOptimisticLockFailed' error if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again.
func (ds *databaseCommon) doDelete(datasetID string) error {
func (ds *databaseCommon) doDelete(datasetIDstr string) error {
datasetID := types.String(datasetIDstr)
currentRootRef, currentDatasets := ds.getRootAndDatasets()
currentDatasets = currentDatasets.Remove(types.String(datasetID))
return ds.tryUpdateRoot(currentDatasets, currentRootRef)
var initialHead types.Ref
if r, hasHead := currentDatasets.MaybeGet(datasetID); !hasHead {
return nil
} else {
initialHead = r.(types.Ref)
}
var err error
for {
currentDatasets = currentDatasets.Remove(datasetID)
err = ds.tryUpdateRoot(currentDatasets, currentRootRef)
if err != ErrOptimisticLockFailed {
break
}
// If the optimistic lock failed because someone changed the Head of datasetID, then return ErrMergeNeeded. If it failed because someone changed a different Dataset, we should try again.
currentRootRef, currentDatasets = ds.getRootAndDatasets()
if r, hasHead := currentDatasets.MaybeGet(datasetID); !hasHead || (hasHead && !initialHead.Equals(r)) {
err = ErrMergeNeeded
break
}
}
return err
}
func (ds *databaseCommon) getRootAndDatasets() (currentRootRef hash.Hash, currentDatasets types.Map) {
@@ -138,7 +164,7 @@ func (ds *databaseCommon) getRootAndDatasets() (currentRootRef hash.Hash, curren
}
func (ds *databaseCommon) tryUpdateRoot(currentDatasets types.Map, currentRootRef hash.Hash) (err error) {
// TODO: This Commit will be orphaned if the UpdateRoot below fails
// TODO: This Map will be orphaned if the UpdateRoot below fails
newRootRef := ds.WriteValue(currentDatasets).TargetHash()
// If the root has been updated by another process in the short window since we read it, this call will fail. See issue #404
if !ds.rt.UpdateRoot(newRootRef, currentRootRef) {

View File

@@ -27,8 +27,8 @@ func TestRemoteDatabase(t *testing.T) {
type DatabaseSuite struct {
suite.Suite
cs *chunks.TestStore
ds Database
makeDs func(chunks.ChunkStore) Database
db Database
makeDb func(chunks.ChunkStore) Database
}
type LocalDatabaseSuite struct {
@@ -37,8 +37,8 @@ type LocalDatabaseSuite struct {
func (suite *LocalDatabaseSuite) SetupTest() {
suite.cs = chunks.NewTestStore()
suite.makeDs = NewDatabase
suite.ds = suite.makeDs(suite.cs)
suite.makeDb = NewDatabase
suite.db = suite.makeDb(suite.cs)
}
type RemoteDatabaseSuite struct {
@@ -47,24 +47,24 @@ type RemoteDatabaseSuite struct {
func (suite *RemoteDatabaseSuite) SetupTest() {
suite.cs = chunks.NewTestStore()
suite.makeDs = func(cs chunks.ChunkStore) Database {
suite.makeDb = func(cs chunks.ChunkStore) Database {
hbs := newHTTPBatchStoreForTest(cs)
return &RemoteDatabaseClient{newDatabaseCommon(newCachingChunkHaver(hbs), types.NewValueStore(hbs), hbs)}
}
suite.ds = suite.makeDs(suite.cs)
suite.db = suite.makeDb(suite.cs)
}
func (suite *DatabaseSuite) TearDownTest() {
suite.ds.Close()
suite.db.Close()
suite.cs.Close()
}
func (suite *DatabaseSuite) TestReadWriteCache() {
var v types.Value = types.Bool(true)
suite.NotEqual(hash.Hash{}, suite.ds.WriteValue(v))
r := suite.ds.WriteValue(v).TargetHash()
suite.NotEqual(hash.Hash{}, suite.db.WriteValue(v))
r := suite.db.WriteValue(v).TargetHash()
commit := NewCommit(v, types.NewSet(), types.EmptyStruct)
newDs, err := suite.ds.Commit("foo", commit)
newDs, err := suite.db.Commit("foo", commit)
suite.NoError(err)
suite.Equal(1, suite.cs.Writes-writesOnCommit)
@@ -75,41 +75,41 @@ func (suite *DatabaseSuite) TestReadWriteCache() {
func (suite *DatabaseSuite) TestReadWriteCachePersists() {
var err error
var v types.Value = types.Bool(true)
suite.NotEqual(hash.Hash{}, suite.ds.WriteValue(v))
r := suite.ds.WriteValue(v)
suite.NotEqual(hash.Hash{}, suite.db.WriteValue(v))
r := suite.db.WriteValue(v)
commit := NewCommit(v, types.NewSet(), types.EmptyStruct)
suite.ds, err = suite.ds.Commit("foo", commit)
suite.db, err = suite.db.Commit("foo", commit)
suite.NoError(err)
suite.Equal(1, suite.cs.Writes-writesOnCommit)
newCommit := NewCommit(r, types.NewSet(types.NewRef(commit)), types.EmptyStruct)
suite.ds, err = suite.ds.Commit("foo", newCommit)
suite.db, err = suite.db.Commit("foo", newCommit)
suite.NoError(err)
}
func (suite *DatabaseSuite) TestWriteRefToNonexistentValue() {
suite.Panics(func() { suite.ds.WriteValue(types.NewRef(types.Bool(true))) })
suite.Panics(func() { suite.db.WriteValue(types.NewRef(types.Bool(true))) })
}
func (suite *DatabaseSuite) TestTolerateUngettableRefs() {
suite.Nil(suite.ds.ReadValue(hash.Hash{}))
suite.Nil(suite.db.ReadValue(hash.Hash{}))
}
func (suite *DatabaseSuite) TestDatabaseCommit() {
datasetID := "ds1"
datasets := suite.ds.Datasets()
datasets := suite.db.Datasets()
suite.Zero(datasets.Len())
// |a|
a := types.String("a")
aCommit := NewCommit(a, types.NewSet(), types.EmptyStruct)
ds2, err := suite.ds.Commit(datasetID, aCommit)
ds2, err := suite.db.Commit(datasetID, aCommit)
suite.NoError(err)
// The old database still has no head.
_, ok := suite.ds.MaybeHead(datasetID)
_, ok := suite.db.MaybeHead(datasetID)
suite.False(ok)
_, ok = suite.ds.MaybeHeadRef(datasetID)
_, ok = suite.db.MaybeHeadRef(datasetID)
suite.False(ok)
// The new database has |a|.
@@ -118,45 +118,45 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
aRef1 := ds2.HeadRef(datasetID)
suite.Equal(aCommit1.Hash(), aRef1.TargetHash())
suite.Equal(uint64(1), aRef1.Height())
suite.ds = ds2
suite.db = ds2
// |a| <- |b|
b := types.String("b")
bCommit := NewCommit(b, types.NewSet(types.NewRef(aCommit)), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, bCommit)
suite.db, err = suite.db.Commit(datasetID, bCommit)
suite.NoError(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(b))
suite.Equal(uint64(2), suite.ds.HeadRef(datasetID).Height())
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(b))
suite.Equal(uint64(2), suite.db.HeadRef(datasetID).Height())
// |a| <- |b|
// \----|c|
// Should be disallowed.
c := types.String("c")
cCommit := NewCommit(c, types.NewSet(types.NewRef(aCommit)), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, cCommit)
suite.db, err = suite.db.Commit(datasetID, cCommit)
suite.Error(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(b))
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(b))
// |a| <- |b| <- |d|
d := types.String("d")
dCommit := NewCommit(d, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, dCommit)
suite.db, err = suite.db.Commit(datasetID, dCommit)
suite.NoError(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(d))
suite.Equal(uint64(3), suite.ds.HeadRef(datasetID).Height())
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(d))
suite.Equal(uint64(3), suite.db.HeadRef(datasetID).Height())
// Attempt to recommit |b| with |a| as parent.
// Should be disallowed.
suite.ds, err = suite.ds.Commit(datasetID, bCommit)
suite.db, err = suite.db.Commit(datasetID, bCommit)
suite.Error(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(d))
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(d))
// Add a commit to a different datasetId
_, err = suite.ds.Commit("otherDs", aCommit)
_, err = suite.db.Commit("otherDs", aCommit)
suite.NoError(err)
// Get a fresh database, and verify that both datasets are present
newDs := suite.makeDs(suite.cs)
newDs := suite.makeDb(suite.cs)
datasets2 := newDs.Datasets()
suite.Equal(uint64(2), datasets2.Len())
newDs.Close()
@@ -164,70 +164,70 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
func (suite *DatabaseSuite) TestDatabaseDelete() {
datasetID1, datasetID2 := "ds1", "ds2"
datasets := suite.ds.Datasets()
datasets := suite.db.Datasets()
suite.Zero(datasets.Len())
// |a|
var err error
a := types.String("a")
suite.ds, err = suite.ds.Commit(datasetID1, NewCommit(a, types.NewSet(), types.EmptyStruct))
suite.db, err = suite.db.Commit(datasetID1, NewCommit(a, types.NewSet(), types.EmptyStruct))
suite.NoError(err)
suite.True(suite.ds.Head(datasetID1).Get(ValueField).Equals(a))
suite.True(suite.db.Head(datasetID1).Get(ValueField).Equals(a))
// ds1; |a|, ds2: |b|
b := types.String("b")
suite.ds, err = suite.ds.Commit(datasetID2, NewCommit(b, types.NewSet(), types.EmptyStruct))
suite.db, err = suite.db.Commit(datasetID2, NewCommit(b, types.NewSet(), types.EmptyStruct))
suite.NoError(err)
suite.True(suite.ds.Head(datasetID2).Get(ValueField).Equals(b))
suite.True(suite.db.Head(datasetID2).Get(ValueField).Equals(b))
suite.ds, err = suite.ds.Delete(datasetID1)
suite.db, err = suite.db.Delete(datasetID1)
suite.NoError(err)
suite.True(suite.ds.Head(datasetID2).Get(ValueField).Equals(b))
_, present := suite.ds.MaybeHead(datasetID1)
suite.True(suite.db.Head(datasetID2).Get(ValueField).Equals(b))
_, present := suite.db.MaybeHead(datasetID1)
suite.False(present, "Dataset %s should not be present", datasetID1)
// Get a fresh database, and verify that only ds1 is present
newDs := suite.makeDs(suite.cs)
newDs := suite.makeDb(suite.cs)
datasets = newDs.Datasets()
suite.Equal(uint64(1), datasets.Len())
_, present = suite.ds.MaybeHead(datasetID2)
_, present = suite.db.MaybeHead(datasetID2)
suite.True(present, "Dataset %s should be present", datasetID2)
newDs.Close()
}
func (suite *DatabaseSuite) TestDatabaseDeleteConcurrent() {
func (suite *DatabaseSuite) TestDeleteConcurrentDatabaseUse() {
datasetID := "ds1"
suite.Zero(suite.ds.Datasets().Len())
suite.Zero(suite.db.Datasets().Len())
var err error
// |a|
a := types.String("a")
aCommit := NewCommit(a, types.NewSet(), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, aCommit)
suite.db, err = suite.db.Commit(datasetID, aCommit)
suite.NoError(err)
// |a| <- |b|
b := types.String("b")
bCommit := NewCommit(b, types.NewSet(types.NewRef(aCommit)), types.EmptyStruct)
ds2, err := suite.ds.Commit(datasetID, bCommit)
db2, err := suite.db.Commit(datasetID, bCommit)
suite.NoError(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(a))
suite.True(ds2.Head(datasetID).Get(ValueField).Equals(b))
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(a))
suite.True(db2.Head(datasetID).Get(ValueField).Equals(b))
suite.ds, err = suite.ds.Delete(datasetID)
suite.db, err = suite.db.Delete(datasetID)
suite.NoError(err)
_, present := suite.ds.MaybeHead(datasetID)
_, present := suite.db.MaybeHead(datasetID)
suite.False(present, "Dataset %s should not be present", datasetID)
_, present = ds2.MaybeHead(datasetID)
_, present = db2.MaybeHead(datasetID)
suite.True(present, "Dataset %s should be present", datasetID)
// Get a fresh database, and verify that no databases are present
newDs := suite.makeDs(suite.cs)
suite.Equal(uint64(0), newDs.Datasets().Len())
newDs.Close()
newDb := suite.makeDb(suite.cs)
suite.Equal(uint64(0), newDb.Datasets().Len())
newDb.Close()
}
func (suite *DatabaseSuite) TestDatabaseConcurrency() {
func (suite *DatabaseSuite) TestConcurrentDatabaseUse() {
datasetID := "ds1"
var err error
@@ -235,41 +235,167 @@ func (suite *DatabaseSuite) TestDatabaseConcurrency() {
// |a| <- |b|
a := types.String("a")
aCommit := NewCommit(a, types.NewSet(), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, aCommit)
suite.db, err = suite.db.Commit(datasetID, aCommit)
b := types.String("b")
bCommit := NewCommit(b, types.NewSet(types.NewRef(aCommit)), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, bCommit)
suite.db, err = suite.db.Commit(datasetID, bCommit)
suite.NoError(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(b))
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(b))
// Important to create this here.
ds2 := suite.makeDs(suite.cs)
db2 := suite.makeDb(suite.cs)
// Change 1:
// |a| <- |b| <- |c|
c := types.String("c")
cCommit := NewCommit(c, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
suite.ds, err = suite.ds.Commit(datasetID, cCommit)
suite.db, err = suite.db.Commit(datasetID, cCommit)
suite.NoError(err)
suite.True(suite.ds.Head(datasetID).Get(ValueField).Equals(c))
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(c))
// Change 2:
// |a| <- |b| <- |e|
// Should be disallowed, Database returned by Commit() should have |c| as Head.
e := types.String("e")
eCommit := NewCommit(e, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
ds2, err = ds2.Commit(datasetID, eCommit)
db2, err = db2.Commit(datasetID, eCommit)
suite.Error(err)
suite.True(ds2.Head(datasetID).Get(ValueField).Equals(c))
suite.True(db2.Head(datasetID).Get(ValueField).Equals(c))
}
type waitDuringUpdateRootChunkStore struct {
chunks.ChunkStore
preUpdateRootHook func()
}
func (w *waitDuringUpdateRootChunkStore) UpdateRoot(current, last hash.Hash) (ok bool) {
if w.preUpdateRootHook != nil {
w.preUpdateRootHook()
}
ok = w.ChunkStore.UpdateRoot(current, last)
return
}
func (suite *DatabaseSuite) TestCommitWithConcurrentChunkStoreUse() {
datasetID := "ds1"
var err error
// Setup:
// ds1: |a| <- |b|
aCommit := NewCommit(types.String("a"), types.NewSet(), types.EmptyStruct)
suite.db, err = suite.db.Commit(datasetID, aCommit)
b := types.String("b")
bCommit := NewCommit(b, types.NewSet(types.NewRef(aCommit)), types.EmptyStruct)
suite.db, err = suite.db.Commit(datasetID, bCommit)
suite.NoError(err)
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(b))
// Craft DB that will allow me to move the backing ChunkStore while suite.db isn't looking
w := &waitDuringUpdateRootChunkStore{suite.cs, nil}
db := suite.makeDb(w)
// Concurrent change, but to some other dataset. This shouldn't stop changes to ds1.
// ds1: |a| <- |b|
// ds2: |stuff|
w.preUpdateRootHook = func() {
var concErr error
e := types.String("stuff")
eCommit := NewCommit(e, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
suite.db, concErr = suite.db.Commit("ds2", eCommit)
suite.NoError(concErr)
suite.True(suite.db.Head("ds2").Get(ValueField).Equals(e))
w.preUpdateRootHook = nil
}
// Attempted Concurrent change, which should proceed without a problem
c := types.String("c")
cCommit := NewCommit(c, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
db, err = db.Commit(datasetID, cCommit)
suite.NoError(err)
suite.True(db.Head(datasetID).Get(ValueField).Equals(c))
// Concurrent change, to move root out from under my feet:
// ds1: |a| <- |b| <- |c| <- |e|
e := types.String("e")
w.preUpdateRootHook = func() {
var concErr error
eCommit := NewCommit(e, types.NewSet(types.NewRef(cCommit)), types.EmptyStruct)
suite.db, concErr = suite.db.Commit(datasetID, eCommit)
suite.NoError(concErr)
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(e))
w.preUpdateRootHook = nil
}
// Attempted Concurrent change, which should fail due to the above
nope := types.String("nope")
nopeCommit := NewCommit(nope, types.NewSet(types.NewRef(cCommit)), types.EmptyStruct)
db, err = db.Commit(datasetID, nopeCommit)
suite.Error(err)
suite.True(db.Head(datasetID).Get(ValueField).Equals(e))
}
func (suite *DatabaseSuite) TestDeleteWithConcurrentChunkStoreUse() {
datasetID := "ds1"
var err error
// Setup:
// ds1: |a| <- |b|
aCommit := NewCommit(types.String("a"), types.NewSet(), types.EmptyStruct)
suite.db, err = suite.db.Commit(datasetID, aCommit)
b := types.String("b")
bCommit := NewCommit(b, types.NewSet(types.NewRef(aCommit)), types.EmptyStruct)
suite.db, err = suite.db.Commit(datasetID, bCommit)
suite.NoError(err)
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(b))
// Craft DB that will allow me to move the backing ChunkStore while suite.db isn't looking
w := &waitDuringUpdateRootChunkStore{suite.cs, nil}
db := suite.makeDb(w)
// Concurrent change, to move root out from under my feet:
// ds1: |a| <- |b| <- |e|
e := types.String("e")
w.preUpdateRootHook = func() {
var concErr error
eCommit := NewCommit(e, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
suite.db, concErr = suite.db.Commit(datasetID, eCommit)
suite.NoError(concErr)
suite.True(suite.db.Head(datasetID).Get(ValueField).Equals(e))
w.preUpdateRootHook = nil
}
// Attempted Concurrent change, which should fail due to the above
db, err = db.Delete(datasetID)
suite.Error(err)
suite.True(db.Head(datasetID).Get(ValueField).Equals(e))
// Concurrent change, but to some other dataset. This shouldn't stop changes to ds1.
// ds1: |a| <- |b| <- |e|
// ds2: |stuff|
w.preUpdateRootHook = func() {
var concErr error
e := types.String("stuff")
eCommit := NewCommit(e, types.NewSet(types.NewRef(bCommit)), types.EmptyStruct)
suite.db, concErr = suite.db.Commit("ds2", eCommit)
suite.NoError(concErr)
suite.True(suite.db.Head("ds2").Get(ValueField).Equals(e))
w.preUpdateRootHook = nil
}
// Attempted Concurrent change, which should proceed without a problem
db, err = db.Delete(datasetID)
suite.NoError(err)
_, present := db.MaybeHead(datasetID)
suite.False(present, "Dataset %s should not be present", datasetID)
}
func (suite *DatabaseSuite) TestDatabaseHeightOfRefs() {
r1 := suite.ds.WriteValue(types.String("hello"))
r1 := suite.db.WriteValue(types.String("hello"))
suite.Equal(uint64(1), r1.Height())
r2 := suite.ds.WriteValue(r1)
r2 := suite.db.WriteValue(r1)
suite.Equal(uint64(2), r2.Height())
suite.Equal(uint64(3), suite.ds.WriteValue(r2).Height())
suite.Equal(uint64(3), suite.db.WriteValue(r2).Height())
}
func (suite *DatabaseSuite) TestDatabaseHeightOfCollections() {
@@ -280,41 +406,41 @@ func (suite *DatabaseSuite) TestDatabaseHeightOfCollections() {
v1 := types.String("hello")
v2 := types.String("world")
s1 := types.NewSet(v1, v2)
suite.Equal(uint64(1), suite.ds.WriteValue(s1).Height())
suite.Equal(uint64(1), suite.db.WriteValue(s1).Height())
// Set<Ref<String>>
s2 := types.NewSet(suite.ds.WriteValue(v1), suite.ds.WriteValue(v2))
suite.Equal(uint64(2), suite.ds.WriteValue(s2).Height())
s2 := types.NewSet(suite.db.WriteValue(v1), suite.db.WriteValue(v2))
suite.Equal(uint64(2), suite.db.WriteValue(s2).Height())
// List<Set<String>>
v3 := types.String("foo")
v4 := types.String("bar")
s3 := types.NewSet(v3, v4)
l1 := types.NewList(s1, s3)
suite.Equal(uint64(1), suite.ds.WriteValue(l1).Height())
suite.Equal(uint64(1), suite.db.WriteValue(l1).Height())
// List<Ref<Set<String>>
l2 := types.NewList(suite.ds.WriteValue(s1), suite.ds.WriteValue(s3))
suite.Equal(uint64(2), suite.ds.WriteValue(l2).Height())
l2 := types.NewList(suite.db.WriteValue(s1), suite.db.WriteValue(s3))
suite.Equal(uint64(2), suite.db.WriteValue(l2).Height())
// List<Ref<Set<Ref<String>>>
s4 := types.NewSet(suite.ds.WriteValue(v3), suite.ds.WriteValue(v4))
l3 := types.NewList(suite.ds.WriteValue(s4))
suite.Equal(uint64(3), suite.ds.WriteValue(l3).Height())
s4 := types.NewSet(suite.db.WriteValue(v3), suite.db.WriteValue(v4))
l3 := types.NewList(suite.db.WriteValue(s4))
suite.Equal(uint64(3), suite.db.WriteValue(l3).Height())
// List<Set<String> | RefValue<Set<String>>>
l4 := types.NewList(s1, suite.ds.WriteValue(s3))
suite.Equal(uint64(2), suite.ds.WriteValue(l4).Height())
l5 := types.NewList(suite.ds.WriteValue(s1), s3)
suite.Equal(uint64(2), suite.ds.WriteValue(l5).Height())
l4 := types.NewList(s1, suite.db.WriteValue(s3))
suite.Equal(uint64(2), suite.db.WriteValue(l4).Height())
l5 := types.NewList(suite.db.WriteValue(s1), s3)
suite.Equal(uint64(2), suite.db.WriteValue(l5).Height())
// Familiar with the "New Jersey Turnpike" drink? Here's the noms version of that...
everything := []types.Value{v1, v2, s1, s2, v3, v4, s3, l1, l2, s4, l3, l4, l5}
andMore := make([]types.Value, 0, len(everything)*3+2)
for _, v := range everything {
andMore = append(andMore, v, v.Type(), suite.ds.WriteValue(v))
andMore = append(andMore, v, v.Type(), suite.db.WriteValue(v))
}
andMore = append(andMore, setOfStringType, setOfRefOfStringType)
suite.ds.WriteValue(types.NewList(andMore...))
suite.db.WriteValue(types.NewList(andMore...))
}

View File

@@ -227,6 +227,7 @@ func (suite *HTTPBatchStoreSuite) TestRoot() {
func (suite *HTTPBatchStoreSuite) TestVersionMismatch() {
store := newBadVersionHTTPBatchStoreForTest(suite)
c := chunks.NewChunk([]byte("abc"))
suite.cs.Put(c)
suite.Panics(func() { store.UpdateRoot(c.Hash(), hash.Hash{}) })
}

View File

@@ -72,8 +72,8 @@ func versionCheck(hndlr Handler) Handler {
err := d.Try(func() { hndlr(w, req, ps, cs) })
if err != nil {
fmt.Printf("Returning bad request: %v\n", err)
http.Error(w, fmt.Sprintf("Error: %v", err), http.StatusBadRequest)
fmt.Printf("Returning bad request:\n%v\n", err)
http.Error(w, fmt.Sprintf("Error: %v", d.Unwrap(err)), http.StatusBadRequest)
return
}
}

View File

@@ -112,17 +112,17 @@ func (ds *Dataset) Pull(sourceDB datas.Database, sourceRef types.Ref, concurrenc
}
// FastForward takes a types.Ref to a Commit object and makes it the new Head of ds iff it is a descendant of the current Head. Intended to be used e.g. after a call to Pull(). If the update cannot be performed, e.g., because another process moved the current Head out from under you, err will be non-nil. The newest snapshot of the Dataset is always returned, so the caller an easily retry using the latest.
func (ds *Dataset) FastForward(newHeadRef types.Ref) (sink Dataset, err error) {
sink = *ds
func (ds *Dataset) FastForward(newHeadRef types.Ref) (Dataset, error) {
sink := *ds
if currentHeadRef, ok := sink.MaybeHeadRef(); ok && newHeadRef == currentHeadRef {
return
return sink, nil
} else if newHeadRef.Height() <= currentHeadRef.Height() {
return sink, datas.ErrMergeNeeded
}
for err = datas.ErrOptimisticLockFailed; err == datas.ErrOptimisticLockFailed; sink, err = sink.commitNewHead(newHeadRef) {
}
return
commit := ds.validateRefAsCommit(newHeadRef)
store, err := ds.Database().Commit(ds.id, commit)
return Dataset{store, ds.id}, err
}
// SetHead takes a types.Ref to a Commit object and makes it the new Head of ds. Intended to be used e.g. when rewinding in ds' Commit history. If the update cannot be performed, e.g., because the state of ds.Database() changed out from under you, err will be non-nil. The newest snapshot of the Dataset is always returned, so the caller an easily retry using the latest.
@@ -148,10 +148,3 @@ func (ds *Dataset) validateRefAsCommit(r types.Ref) types.Struct {
}
return v.(types.Struct)
}
// commitNewHead attempts to make the object pointed to by newHeadRef the new Head of ds. First, it checks that the object exists in ds and validates that it decodes to the correct type of value. Next, it attempts to commit the object to ds.Database(). This may fail if, for instance, the Head of ds has been changed by another goroutine or process. In the event that the commit fails, the error from Database().Commit() is returned along with a new Dataset that's at it's proper, current Head. The caller should take any necessary corrective action and try again using this new Dataset.
func (ds *Dataset) commitNewHead(newHeadRef types.Ref) (Dataset, error) {
commit := ds.validateRefAsCommit(newHeadRef)
store, err := ds.Database().Commit(ds.id, commit)
return Dataset{store, ds.id}, err
}