Avoid loading full commit history of all new commits superceed at least one current root

This commit is contained in:
Rafael Weinstein
2015-07-17 13:58:54 -07:00
parent 04fea40e72
commit f18ae3ef22
2 changed files with 50 additions and 30 deletions
+47 -28
View File
@@ -42,41 +42,41 @@ func (ds *DataStore) Roots() RootSet {
func (ds *DataStore) Commit(newRoots RootSet) DataStore {
Chk.True(newRoots.Len() > 0)
parentsList := make([]types.Set, newRoots.Len())
i := uint64(0)
newRoots.Iter(func(root Root) (stop bool) {
parentsList[i] = root.Parents()
i++
return
})
superceded := types.NewSet().Union(parentsList...)
for !ds.doCommit(newRoots, RootSet{superceded}) {
// TODO: We probably shouldn't let this go *forever*. Considrer putting a limit and... I know don't...panicing?
for !ds.doCommit(newRoots) {
}
return newDataStoreInternal(ds.ChunkStore, ds.rt, ds.rc)
}
func (ds *DataStore) doCommit(add, remove RootSet) bool {
// Note that |oldRoots| may be different from |ds.Roots| if someone else has commited since this Datastore was created. This computation must be based on the *current root* not the root associated with this Datastore.
currentRootRef := ds.rt.Root()
oldRoots := rootSetFromRef(currentRootRef, ds)
// doCommit manages concurrent access the single logical piece of mutable state: the current root (rootSet). doCommit is optimistic in that it is attempting to create a new root making the assumption that currentRootRef is the existing root. The call to UpdateRoot below will fail if that assumption fails (e.g. because of a race with another writer) and the entire algorigthm must be tried again.
func (ds *DataStore) doCommit(roots RootSet) bool {
Chk.True(roots.Len() > 0)
prexisting := make([]Root, 0)
ds.rc.Update(currentRootRef)
add.Iter(func(r Root) (stop bool) {
if ds.rc.Contains(r.Ref()) {
prexisting = append(prexisting, r)
}
return
})
add = add.Remove(prexisting...)
if add.Len() == 0 {
return true
currentRootRef := ds.rt.Root()
// Note: |currentRoots| may be different from |ds.roots| and *must* be consistent with |currentRootRef|.
var currentRoots RootSet
if currentRootRef == ds.roots.Ref() {
currentRoots = ds.roots
} else {
currentRoots = rootSetFromRef(currentRootRef, ds)
}
newRoots := oldRoots.Subtract(remove).Union(add)
newRoots := roots.Union(currentRoots)
roots.Iter(func(root Root) (stop bool) {
if ds.isPrexisting(root, currentRoots) {
newRoots = newRoots.Remove(root)
} else {
newRoots = RootSetFromVal(newRoots.NomsValue().Subtract(root.Parents()))
}
return
})
if newRoots.Len() == 0 || newRoots.Equals(currentRoots) {
return true
}
// TODO: This set will be orphaned if this UpdateRoot below fails
newRootRef, err := types.WriteValue(newRoots.NomsValue(), ds)
@@ -84,3 +84,22 @@ func (ds *DataStore) doCommit(add, remove RootSet) bool {
return ds.rt.UpdateRoot(newRootRef, currentRootRef)
}
func (ds *DataStore) isPrexisting(root Root, currentRoots RootSet) bool {
if currentRoots.Has(root) {
return true
}
// If a new root directly superceeds an existing current root, it can't have already been committed because its hash would be uncomputable.
superceedsCurrentRoot := false
root.Parents().Iter(func(parent types.Value) (stop bool) {
superceedsCurrentRoot = currentRoots.Has(RootFromVal(parent))
return superceedsCurrentRoot
})
if superceedsCurrentRoot {
return false
}
ds.rc.Update(currentRoots)
return ds.rc.Contains(root.Ref())
}
+3 -2
View File
@@ -25,11 +25,12 @@ func (cache *rootCache) updateFromCommit(root Root) {
cache.refs[root.Ref()] = true
}
func (cache *rootCache) Update(rootsRef ref.Ref) {
if rootsRef == (ref.Ref{}) {
func (cache *rootCache) Update(currentRoots RootSet) {
if currentRoots.Len() == 0 {
return
}
rootsRef := currentRoots.Ref()
if _, ok := cache.refs[rootsRef]; ok {
return
}