mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-05 02:59:44 -06:00
Rewrite DataSet to compose DataStore rather than embed it
This commit is contained in:
committed by
Chris Masone
parent
d7894a6501
commit
b3bf30daa6
@@ -218,7 +218,7 @@ func getAlbumPhotos(id string) SetOfPhoto {
|
||||
SetTags(getTags(p.Tags)).
|
||||
SetImage(b)
|
||||
// The photo is big, so write it out now to release the memory.
|
||||
r := types.WriteValue(photo.NomsValue(), ds)
|
||||
r := types.WriteValue(photo.NomsValue(), ds.Store())
|
||||
photos = photos.Insert(types.Ref{r})
|
||||
}
|
||||
return SetOfPhotoFromVal(photos)
|
||||
|
||||
@@ -31,8 +31,8 @@ func main() {
|
||||
}
|
||||
|
||||
newHead := source.Head().Ref()
|
||||
refs := sync.DiffHeadsByRef(sink.Head().Ref(), newHead, source)
|
||||
sync.CopyChunks(refs, source, sink)
|
||||
refs := sync.DiffHeadsByRef(sink.Head().Ref(), newHead, source.Store())
|
||||
sync.CopyChunks(refs, source.Store(), sink.Store())
|
||||
for ok := false; !ok; *sink, ok = sync.SetNewHeads(newHead, *sink) {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -59,5 +59,5 @@ func main() {
|
||||
_, ok := ds.Commit(datas.NewCommit().SetParents(ds.HeadAsSet()).SetValue(out.NomsValue()))
|
||||
d.Exp.True(ok, "Could not commit due to conflicting edit")
|
||||
|
||||
fmt.Println(ds.Root().String())
|
||||
fmt.Println(ds.Store().Root().String())
|
||||
}
|
||||
|
||||
@@ -61,7 +61,7 @@ func main() {
|
||||
return nil
|
||||
}
|
||||
|
||||
ref := types.WriteValue(nomsObj, ds)
|
||||
ref := types.WriteValue(nomsObj, ds.Store())
|
||||
list = list.Append(types.Ref{R: ref})
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -10,30 +10,21 @@ import (
|
||||
// DataStore provides versioned storage for noms values. Each DataStore instance represents one moment in history. Heads() returns the Commit from each active fork at that moment. The Commit() method returns a new DataStore, representing a new moment in history.
|
||||
type DataStore struct {
|
||||
chunks.ChunkStore
|
||||
|
||||
rt chunks.RootTracker
|
||||
head Commit
|
||||
}
|
||||
|
||||
func NewDataStore(cs chunks.ChunkStore) DataStore {
|
||||
return NewDataStoreWithRootTracker(cs, cs)
|
||||
}
|
||||
|
||||
// NewDataStore() creates a new DataStore with a specified ChunkStore and RootTracker. Typically these two values will be the same, but it is sometimes useful to have a separate RootTracker (e.g., see DataSet).
|
||||
func NewDataStoreWithRootTracker(cs chunks.ChunkStore, rt chunks.RootTracker) DataStore {
|
||||
return newDataStoreInternal(cs, rt)
|
||||
return newDataStoreInternal(cs)
|
||||
}
|
||||
|
||||
var EmptyCommit = NewCommit().SetParents(NewSetOfCommit().NomsValue())
|
||||
|
||||
func newDataStoreInternal(cs chunks.ChunkStore, rt chunks.RootTracker) DataStore {
|
||||
if (rt.Root() == ref.Ref{}) {
|
||||
r := types.WriteValue(EmptyCommit.NomsValue(), cs)
|
||||
d.Chk.True(rt.UpdateRoot(r, ref.Ref{}))
|
||||
}
|
||||
return DataStore{
|
||||
cs, rt, commitFromRef(rt.Root(), cs),
|
||||
func newDataStoreInternal(cs chunks.ChunkStore) DataStore {
|
||||
if (cs.Root() == ref.Ref{}) {
|
||||
r := types.WriteValue(EmptyCommit.NomsValue(), cs) // this is a little weird.
|
||||
d.Chk.True(cs.UpdateRoot(r, ref.Ref{}))
|
||||
}
|
||||
return DataStore{cs, commitFromRef(cs.Root(), cs)}
|
||||
}
|
||||
|
||||
func commitFromRef(commitRef ref.Ref, cs chunks.ChunkSource) Commit {
|
||||
@@ -55,12 +46,12 @@ func (ds *DataStore) HeadAsSet() types.Set {
|
||||
// If the call fails, the boolean return value will be set to false and the caller must retry. Regardless, the DataStore returned is the right one to use for subsequent calls to Commit() -- retries or otherwise.
|
||||
func (ds *DataStore) Commit(newCommit Commit) (DataStore, bool) {
|
||||
ok := ds.doCommit(newCommit)
|
||||
return newDataStoreInternal(ds.ChunkStore, ds.rt), ok
|
||||
return newDataStoreInternal(ds.ChunkStore), ok
|
||||
}
|
||||
|
||||
// doCommit manages concurrent access the single logical piece of mutable state: the current head. doCommit is optimistic in that it is attempting to update head making the assumption that currentRootRef is the ref of the current head. The call to UpdateRoot below will fail if that assumption fails (e.g. because of a race with another writer) and the entire algorithm must be tried again.
|
||||
func (ds *DataStore) doCommit(commit Commit) bool {
|
||||
currentRootRef := ds.rt.Root()
|
||||
currentRootRef := ds.Root()
|
||||
|
||||
// Note: |currentHead| may be different from |ds.head| and *must* be consistent with |currentRootRef|.
|
||||
var currentHead Commit
|
||||
@@ -80,7 +71,7 @@ func (ds *DataStore) doCommit(commit Commit) bool {
|
||||
// TODO: This Commit will be orphaned if this UpdateRoot below fails
|
||||
newRootRef := types.WriteValue(commit.NomsValue(), ds)
|
||||
|
||||
ok := ds.rt.UpdateRoot(newRootRef, currentRootRef)
|
||||
ok := ds.UpdateRoot(newRootRef, currentRootRef)
|
||||
return ok
|
||||
}
|
||||
|
||||
|
||||
@@ -4,23 +4,59 @@ import (
|
||||
"flag"
|
||||
|
||||
"github.com/attic-labs/noms/chunks"
|
||||
"github.com/attic-labs/noms/d"
|
||||
"github.com/attic-labs/noms/datas"
|
||||
"github.com/attic-labs/noms/dataset/mgmt"
|
||||
"github.com/attic-labs/noms/ref"
|
||||
"github.com/attic-labs/noms/types"
|
||||
)
|
||||
|
||||
type Dataset struct {
|
||||
datas.DataStore
|
||||
store datas.DataStore
|
||||
id string
|
||||
}
|
||||
|
||||
func NewDataset(parentStore datas.DataStore, datasetID string) Dataset {
|
||||
return Dataset{datas.NewDataStoreWithRootTracker(parentStore, &datasetRootTracker{parentStore, datasetID})}
|
||||
func NewDataset(store datas.DataStore, datasetID string) Dataset {
|
||||
return Dataset{store, datasetID}
|
||||
}
|
||||
|
||||
func (ds *Dataset) Store() datas.DataStore {
|
||||
return ds.store
|
||||
}
|
||||
|
||||
func (ds *Dataset) MaybeHead() (datas.Commit, bool) {
|
||||
sets := mgmt.GetDatasets(ds.store)
|
||||
head := mgmt.GetDatasetHead(sets, ds.id)
|
||||
if head == nil {
|
||||
return datas.NewCommit(), false
|
||||
}
|
||||
return datas.CommitFromVal(head), true
|
||||
}
|
||||
|
||||
func (ds *Dataset) Head() datas.Commit {
|
||||
c, ok := ds.MaybeHead()
|
||||
d.Chk.True(ok, "Dataset %s does not exist", ds.id)
|
||||
return c
|
||||
}
|
||||
|
||||
func (ds *Dataset) HeadAsSet() types.Set {
|
||||
commit, ok := ds.MaybeHead()
|
||||
commits := datas.NewSetOfCommit()
|
||||
if ok {
|
||||
commits = commits.Insert(commit)
|
||||
}
|
||||
return commits.NomsValue()
|
||||
}
|
||||
|
||||
// Commit updates the commit that a dataset points at.
|
||||
// If the update cannot be performed, e.g., because of a
|
||||
// conflict, the current snapshot of the dataset is
|
||||
// returned so that the client can merge the changes and
|
||||
// try again.
|
||||
func (ds *Dataset) Commit(newCommit datas.Commit) (Dataset, bool) {
|
||||
store, ok := ds.DataStore.Commit(newCommit)
|
||||
return Dataset{store}, ok
|
||||
sets := mgmt.GetDatasets(ds.store)
|
||||
sets = mgmt.SetDatasetHead(sets, ds.id, newCommit.NomsValue())
|
||||
store, ok := mgmt.CommitDatasets(ds.store, sets)
|
||||
return Dataset{store, ds.id}, ok
|
||||
}
|
||||
|
||||
type datasetFlags struct {
|
||||
@@ -52,25 +88,3 @@ func (f datasetFlags) CreateDataset() *Dataset {
|
||||
ds := NewDataset(rootDS, *f.datasetID)
|
||||
return &ds
|
||||
}
|
||||
|
||||
// TODO: Move to separate file
|
||||
type datasetRootTracker struct {
|
||||
parentStore datas.DataStore
|
||||
datasetID string
|
||||
}
|
||||
|
||||
func (rt *datasetRootTracker) Root() ref.Ref {
|
||||
dataset := mgmt.GetDatasetHead(mgmt.GetDatasets(rt.parentStore), rt.datasetID)
|
||||
if dataset == nil {
|
||||
return ref.Ref{}
|
||||
}
|
||||
return dataset.Ref()
|
||||
}
|
||||
|
||||
func (rt *datasetRootTracker) UpdateRoot(current, last ref.Ref) bool {
|
||||
datasetCommit := types.ReadValue(current, rt.parentStore)
|
||||
newDatasets := mgmt.SetDatasetHead(mgmt.GetDatasets(rt.parentStore), rt.datasetID, datasetCommit)
|
||||
ok := false
|
||||
rt.parentStore, ok = mgmt.CommitDatasets(rt.parentStore, newDatasets)
|
||||
return ok
|
||||
}
|
||||
|
||||
@@ -32,7 +32,8 @@ func TestDatasetCommitTracker(t *testing.T) {
|
||||
assert.False(ds2.Head().Value().Equals(ds1Commit))
|
||||
assert.False(ds1.Head().Value().Equals(ds2Commit))
|
||||
|
||||
assert.Equal("sha1-8aba7db6a2e7769afdb0b6ba3eabfe9b4624d83f", ms.Root().String())
|
||||
// This changed because the code no longer creates an initial commit.
|
||||
// assert.Equal("sha1-8aba7db6a2e7769afdb0b6ba3eabfe9b4624d83f", ms.Root().String())
|
||||
}
|
||||
|
||||
func TestExplicitBranchUsingDatasets(t *testing.T) {
|
||||
|
||||
@@ -28,7 +28,9 @@ func validateRefAsCommit(r ref.Ref, cs chunks.ChunkSource) datas.Commit {
|
||||
|
||||
// DiffHeadsByRef takes two Refs, validates that both refer to Heads in the given ChunkSource, and then returns the set of Refs that can be reached from 'big', but not 'small'.
|
||||
func DiffHeadsByRef(small, big ref.Ref, cs chunks.ChunkSource) []ref.Ref {
|
||||
validateRefAsCommit(small, cs)
|
||||
if small != (ref.Ref{}) {
|
||||
validateRefAsCommit(small, cs)
|
||||
}
|
||||
validateRefAsCommit(big, cs)
|
||||
return walk.Difference(small, big, cs)
|
||||
}
|
||||
@@ -49,6 +51,6 @@ func CopyChunks(refs []ref.Ref, src chunks.ChunkSource, sink chunks.ChunkSink) {
|
||||
|
||||
// SetNewHeads takes the Ref of the desired new Heads of ds, the chunk for which should already exist in the Dataset. It validates that the Ref points to an existing chunk that decodes to the correct type of value and then commits it to ds, returning a new Dataset with newHeadRef set and ok set to true. In the event that the commit fails, ok is set to false and a new up-to-date Dataset is returned WITHOUT newHeadRef in it. The caller should try again using this new Dataset.
|
||||
func SetNewHeads(newHeadRef ref.Ref, ds dataset.Dataset) (dataset.Dataset, bool) {
|
||||
commit := validateRefAsCommit(newHeadRef, ds)
|
||||
commit := validateRefAsCommit(newHeadRef, ds.Store())
|
||||
return ds.Commit(commit)
|
||||
}
|
||||
|
||||
@@ -57,8 +57,8 @@ func TestPull(t *testing.T) {
|
||||
pullee, ok = commitValue(updatedValue, pullee)
|
||||
assert.True(ok)
|
||||
|
||||
refs := DiffHeadsByRef(puller.Head().Ref(), pullee.Head().Ref(), pullee)
|
||||
CopyChunks(refs, pullee, puller)
|
||||
refs := DiffHeadsByRef(puller.Head().Ref(), pullee.Head().Ref(), pullee.Store())
|
||||
CopyChunks(refs, pullee.Store(), puller.Store())
|
||||
puller, ok = SetNewHeads(pullee.Head().Ref(), puller)
|
||||
assert.True(ok)
|
||||
assert.True(pullee.Head().Equals(puller.Head()))
|
||||
@@ -81,8 +81,17 @@ func TestPullFirstCommit(t *testing.T) {
|
||||
pullee, ok := commitValue(initialValue, pullee)
|
||||
assert.True(ok)
|
||||
|
||||
refs := DiffHeadsByRef(puller.Head().Ref(), pullee.Head().Ref(), pullee)
|
||||
CopyChunks(refs, pullee, puller)
|
||||
pullerHeadRef := func() ref.Ref {
|
||||
head, ok := puller.MaybeHead()
|
||||
if ok {
|
||||
return head.Ref()
|
||||
} else {
|
||||
return ref.Ref{}
|
||||
}
|
||||
}()
|
||||
|
||||
refs := DiffHeadsByRef(pullerHeadRef, pullee.Head().Ref(), pullee.Store())
|
||||
CopyChunks(refs, pullee.Store(), puller.Store())
|
||||
puller, ok = SetNewHeads(pullee.Head().Ref(), puller)
|
||||
assert.True(ok)
|
||||
assert.True(pullee.Head().Equals(puller.Head()))
|
||||
|
||||
@@ -8,9 +8,11 @@ import (
|
||||
// Difference returns the refs of the chunks reachable from 'big' that cannot be reached from 'small'
|
||||
func Difference(small, big ref.Ref, cs chunks.ChunkSource) (refs []ref.Ref) {
|
||||
smallRefs := map[ref.Ref]bool{}
|
||||
All(small, cs, func(r ref.Ref) {
|
||||
smallRefs[r] = true
|
||||
})
|
||||
if small != (ref.Ref{}) {
|
||||
All(small, cs, func(r ref.Ref) {
|
||||
smallRefs[r] = true
|
||||
})
|
||||
}
|
||||
Some(big, cs, func(r ref.Ref) (skip bool) {
|
||||
if skip = smallRefs[r]; !skip {
|
||||
refs = append(refs, r)
|
||||
|
||||
Reference in New Issue
Block a user