go/store/datas: Remove many uses of MaybeHeadRef.

Aaron Son
2022-03-15 14:32:54 -07:00
parent 7b680c86a0
commit d980fa15ca
23 changed files with 193 additions and 607 deletions

View File

@@ -342,14 +342,10 @@ func (ddb *DoltDB) NewPendingCommit(
return nil, err
}
nomsHeadRef, hasHead, err := ds.MaybeHeadRef()
if err != nil {
return nil, err
}
nomsHeadAddr, hasHead := ds.MaybeHeadAddr()
var parents []hash.Hash
if hasHead {
parents = append(parents, nomsHeadRef.TargetHash())
parents = append(parents, nomsHeadAddr)
}
for _, pc := range parentCommits {

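The hunk above is the template for the rest of the commit: MaybeHeadRef returned (types.Ref, bool, error) and callers immediately took TargetHash(), while MaybeHeadAddr returns (hash.Hash, bool) with no error to plumb. A minimal sketch of the resulting caller shape (headAsParents is an invented name for illustration):

package example

import (
	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/hash"
)

// headAsParents returns the current head address, if any, as a one-element
// parent list, the shape NewPendingCommit builds in the hunk above.
func headAsParents(ds datas.Dataset) []hash.Hash {
	var parents []hash.Hash
	if addr, ok := ds.MaybeHeadAddr(); ok {
		parents = append(parents, addr)
	}
	return parents
}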
View File

@@ -64,12 +64,9 @@ func (ph *PushOnWriteHook) SetLogger(ctx context.Context, wr io.Writer) error {
// pushDataset pushes a dataset from srcDB to destDB and force-sets the destDB ref to the new dataset value
func pushDataset(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
stRef, ok, err := ds.MaybeHeadRef()
if err != nil {
return err
}
addr, ok := ds.MaybeHeadAddr()
if !ok {
// No head ref, return
// No head, return
return nil
}
@@ -85,7 +82,7 @@ func pushDataset(ctx context.Context, destDB, srcDB datas.Database, tempTableDir
return err
}
puller, err := pull.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcCS, destCS, wrf, stRef.TargetHash(), nil)
puller, err := pull.NewPuller(ctx, tempTableDir, defaultChunksPerTF, srcCS, destCS, wrf, addr, nil)
if err == pull.ErrDBUpToDate {
return nil
} else if err != nil {
@@ -102,7 +99,7 @@ func pushDataset(ctx context.Context, destDB, srcDB datas.Database, tempTableDir
return err
}
_, err = destDB.SetHead(ctx, ds, stRef.TargetHash())
_, err = destDB.SetHead(ctx, ds, addr)
return err
}
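Condensed, the replication flow after this change reads as below. This is a sketch, not the hook's exact code: pushHead is an invented name, the 256*1024 literal stands in for the package's defaultChunksPerTF, and the pull import path is assumed from the in-tree layout.

package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/datas/pull"
	"github.com/dolthub/dolt/go/store/types"
)

// pushHead replicates ds's head from srcDB to destDB by address, then
// force-sets the destination ref.
func pushHead(ctx context.Context, destDB, srcDB datas.Database, tempTableDir string, ds datas.Dataset) error {
	addr, ok := ds.MaybeHeadAddr()
	if !ok {
		return nil // no head, nothing to push
	}
	srcCS := datas.ChunkStoreFromDatabase(srcDB)
	destCS := datas.ChunkStoreFromDatabase(destDB)
	wrf, err := types.WalkRefsForChunkStore(srcCS)
	if err != nil {
		return err
	}
	puller, err := pull.NewPuller(ctx, tempTableDir, 256*1024, srcCS, destCS, wrf, addr, nil)
	if err == pull.ErrDBUpToDate {
		return nil
	} else if err != nil {
		return err
	}
	if err = puller.Pull(ctx); err != nil {
		return err
	}
	_, err = destDB.SetHead(ctx, ds, addr)
	return err
}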
@@ -138,18 +135,15 @@ func NewAsyncPushOnWriteHook(bThreads *sql.BackgroundThreads, destDB *DoltDB, tm
// Execute implements CommitHook; it replicates head updates to the destDb field
func (ah *AsyncPushOnWriteHook) Execute(ctx context.Context, ds datas.Dataset, db datas.Database) error {
rf, ok, err := ds.MaybeHeadRef()
if err != nil {
return ErrHashNotFound
}
addr, ok := ds.MaybeHeadAddr()
if !ok {
return ErrHashNotFound
}
select {
case ah.ch <- PushArg{ds: ds, db: db, hash: rf.TargetHash()}:
case ah.ch <- PushArg{ds: ds, db: db, hash: addr}:
case <-ctx.Done():
ah.ch <- PushArg{ds: ds, db: db, hash: rf.TargetHash()}
ah.ch <- PushArg{ds: ds, db: db, hash: addr}
return ctx.Err()
}
return nil

View File

@@ -567,13 +567,12 @@ func (ddb *DoltDB) CommitWithParentCommits(ctx context.Context, valHash hash.Has
}
var parents []hash.Hash
headRef, hasHead, err := ds.MaybeHeadRef()
headAddr, hasHead := ds.MaybeHeadAddr()
if err != nil {
return nil, err
}
if hasHead {
parents = append(parents, headRef.TargetHash())
parents = append(parents, headAddr)
}
for _, cm := range parentCommits {

View File

@@ -79,8 +79,7 @@ func runCommit(ctx context.Context, args []string) int {
util.CheckErrorNoUsage(errors.New(fmt.Sprintf("Error resolving value: %s", path)))
}
oldCommitRef, oldCommitExists, err := ds.MaybeHeadRef()
d.PanicIfError(err)
oldCommitAddr, oldCommitExists := ds.MaybeHeadAddr()
if oldCommitExists {
head, ok, err := ds.MaybeHeadValue()
@@ -108,10 +107,7 @@ func runCommit(ctx context.Context, args []string) int {
ds, err = db.Commit(ctx, ds, value, datas.CommitOptions{Meta: meta})
util.CheckErrorNoUsage(err)
headRef, ok, err := ds.MaybeHeadRef()
d.PanicIfError(err)
headAddr, ok := ds.MaybeHeadAddr()
if !ok {
panic("commit succeeded, but dataset has no head ref")
}
@@ -119,10 +115,10 @@ func runCommit(ctx context.Context, args []string) int {
if oldCommitExists {
if ok {
fmt.Fprintf(os.Stdout, "New head #%v (was #%v)\n", headRef.TargetHash().String(), oldCommitRef.TargetHash().String())
fmt.Fprintf(os.Stdout, "New head #%v (was #%v)\n", headAddr.String(), oldCommitAddr.String())
}
} else {
fmt.Fprintf(os.Stdout, "New head #%v\n", headRef.TargetHash().String())
fmt.Fprintf(os.Stdout, "New head #%v\n", headAddr.String())
}
return 0
}

View File

@@ -50,11 +50,11 @@ func (s *nomsDiffTestSuite) TestNomsDiffOutputNotTruncated() {
ds, err := addCommit(sp.GetDataset(context.Background()), "first commit")
s.NoError(err)
r1 := spec.CreateValueSpecString("nbs", s.DBDir, "#"+mustHeadRef(ds).TargetHash().String())
r1 := spec.CreateValueSpecString("nbs", s.DBDir, "#"+mustHeadAddr(ds).String())
ds, err = addCommit(ds, "second commit")
s.NoError(err)
r2 := spec.CreateValueSpecString("nbs", s.DBDir, "#"+mustHeadRef(ds).TargetHash().String())
r2 := spec.CreateValueSpecString("nbs", s.DBDir, "#"+mustHeadAddr(ds).String())
out, _ := s.MustRun(main, []string{"diff", r1, r2})
s.True(strings.HasSuffix(out, "\"second commit\"\n }\n"), out)
@@ -71,11 +71,11 @@ func (s *nomsDiffTestSuite) TestNomsDiffStat() {
ds, err := addCommit(sp.GetDataset(context.Background()), "first commit")
s.NoError(err)
r1 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadRef(ds).TargetHash())
r1 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadAddr(ds))
ds, err = addCommit(ds, "second commit")
s.NoError(err)
r2 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadRef(ds).TargetHash())
r2 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadAddr(ds))
out, _ := s.MustRun(main, []string{"diff", "--stat", r1, r2})
s.Contains(out, "Comparing commit values")
@@ -89,13 +89,13 @@ func (s *nomsDiffTestSuite) TestNomsDiffStat() {
ds, err = datas.CommitValue(context.Background(), db, ds, l)
s.NoError(err)
r3 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadRef(ds).TargetHash()) + ".value"
r3 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadAddr(ds)) + ".value"
l, err = types.NewList(context.Background(), vrw, types.Float(1), types.Float(222), types.Float(4))
s.NoError(err)
ds, err = datas.CommitValue(context.Background(), db, ds, l)
s.NoError(err)
r4 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadRef(ds).TargetHash()) + ".value"
r4 := spec.CreateHashSpecString("nbs", s.DBDir, mustHeadAddr(ds)) + ".value"
out, _ = s.MustRun(main, []string{"diff", "--stat", r3, r4})
s.Contains(out, "1 insertion (25.00%), 2 deletions (50.00%), 0 changes (0.00%), (4 values vs 3 values)")

View File

@@ -30,7 +30,6 @@ import (
"github.com/dolthub/dolt/go/store/cmd/noms/util"
"github.com/dolthub/dolt/go/store/config"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/util/verbose"
)
@@ -60,9 +59,7 @@ func runDs(ctx context.Context, args []string) int {
util.CheckError(err)
defer db.Close()
oldCommitRef, errBool, err := set.MaybeHeadRef()
d.PanicIfError(err)
oldCommitAddr, errBool := set.MaybeHeadAddr()
if !errBool {
util.CheckError(fmt.Errorf("Dataset %v not found", set.ID()))
}
@@ -70,7 +67,7 @@ func runDs(ctx context.Context, args []string) int {
_, err = set.Database().Delete(ctx, set)
util.CheckError(err)
fmt.Printf("Deleted %v (was #%v)\n", toDelete, oldCommitRef.TargetHash().String())
fmt.Printf("Deleted %v (was #%v)\n", toDelete, oldCommitAddr.String())
} else {
dbSpec := ""
if len(args) >= 1 {

View File

@@ -112,11 +112,11 @@ func addCommitWithValue(ds datas.Dataset, v types.Value) (datas.Dataset, error)
}
func addBranchedDataset(vrw types.ValueReadWriter, newDs, parentDs datas.Dataset, v string) (datas.Dataset, error) {
return newDs.Database().Commit(context.Background(), newDs, types.String(v), datas.CommitOptions{Parents: []hash.Hash{mustHeadRef(parentDs).TargetHash()}})
return newDs.Database().Commit(context.Background(), newDs, types.String(v), datas.CommitOptions{Parents: []hash.Hash{mustHeadAddr(parentDs)}})
}
func mergeDatasets(vrw types.ValueReadWriter, ds1, ds2 datas.Dataset, v string) (datas.Dataset, error) {
return ds1.Database().Commit(context.Background(), ds1, types.String(v), datas.CommitOptions{Parents: []hash.Hash{mustHeadRef(ds1).TargetHash(), mustHeadRef(ds2).TargetHash()}})
return ds1.Database().Commit(context.Background(), ds1, types.String(v), datas.CommitOptions{Parents: []hash.Hash{mustHeadAddr(ds1), mustHeadAddr(ds2)}})
}
func mustHead(ds datas.Dataset) types.Struct {
@@ -128,15 +128,10 @@ func mustHead(ds datas.Dataset) types.Struct {
return s
}
func mustHeadRef(ds datas.Dataset) types.Ref {
hr, ok, err := ds.MaybeHeadRef()
d.PanicIfError(err)
if !ok {
panic("no head")
}
return hr
func mustHeadAddr(ds datas.Dataset) hash.Hash {
addr, ok := ds.MaybeHeadAddr()
d.PanicIfFalse(ok)
return addr
}
func mustHeadValue(ds datas.Dataset) types.Value {

View File

@@ -99,23 +99,19 @@ func runMerge(ctx context.Context, args []string) int {
closer()
util.CheckErrorNoUsage(err)
leftHeadRef, ok, err := leftDS.MaybeHeadRef()
d.PanicIfError(err)
leftHeadAddr, ok := leftDS.MaybeHeadAddr()
if !ok {
fmt.Fprintln(os.Stderr, args[1]+" has no head value.")
return 1
}
rightHeadRef, ok, err := rightDS.MaybeHeadRef()
d.PanicIfError(err)
rightHeadAddr, ok := rightDS.MaybeHeadAddr()
if !ok {
fmt.Fprintln(os.Stderr, args[2]+" has no head value.")
return 1
}
_, err = db.Commit(ctx, outDS, merged, datas.CommitOptions{Parents: []hash.Hash{leftHeadRef.TargetHash(), rightHeadRef.TargetHash()}})
_, err = db.Commit(ctx, outDS, merged, datas.CommitOptions{Parents: []hash.Hash{leftHeadAddr, rightHeadAddr}})
d.PanicIfError(err)
status.Printf("Done")

View File

@@ -82,7 +82,7 @@ func (s *nomsMergeTestSuite) TestNomsMerge_Success() {
"lst": mustValue(types.NewList(context.Background(), leftSpec.GetVRW(context.Background()), types.Float(1), types.String("foo"))),
"map": mustValue(types.NewMap(context.Background(), leftSpec.GetVRW(context.Background()), types.Float(1), types.String("foo"), types.String("foo"), types.Float(1))),
},
[]hash.Hash{p.TargetHash()})
[]hash.Hash{p})
r := s.setupMergeDataset(
rightSpec,
@@ -92,7 +92,7 @@ func (s *nomsMergeTestSuite) TestNomsMerge_Success() {
"lst": mustValue(types.NewList(context.Background(), rightSpec.GetVRW(context.Background()), types.Float(1), types.String("foo"))),
"map": mustValue(types.NewMap(context.Background(), rightSpec.GetVRW(context.Background()), types.Float(1), types.String("foo"), types.String("foo"), types.Float(1), types.Float(2), types.String("bar"))),
},
[]hash.Hash{p.TargetHash()})
[]hash.Hash{p})
expected := mustValue(types.NewStruct(parentSpec.GetVRW(context.Background()).Format(), "", types.StructData{
"num": types.Float(42),
@@ -117,22 +117,27 @@ func (s *nomsMergeTestSuite) spec(name string) spec.Spec {
return sp
}
func (s *nomsMergeTestSuite) setupMergeDataset(sp spec.Spec, data types.StructData, p []hash.Hash) types.Ref {
func (s *nomsMergeTestSuite) setupMergeDataset(sp spec.Spec, data types.StructData, p []hash.Hash) hash.Hash {
ds := sp.GetDataset(context.Background())
db := sp.GetDatabase(context.Background())
vrw := sp.GetVRW(context.Background())
ds, err := db.Commit(context.Background(), ds, mustValue(types.NewStruct(vrw.Format(), "", data)), datas.CommitOptions{Parents: p})
s.NoError(err)
return mustHeadRef(ds)
return mustHeadAddr(ds)
}
func (s *nomsMergeTestSuite) validateDataset(name string, expected types.Struct, parents ...types.Value) {
func (s *nomsMergeTestSuite) validateDataset(name string, expected types.Struct, parents ...hash.Hash) {
sp, err := spec.ForDataset(spec.CreateValueSpecString("nbs", s.DBDir, name))
vrw := sp.GetVRW(context.Background())
if s.NoError(err) {
defer sp.Close()
commit := mustHead(sp.GetDataset(context.Background()))
s.True(mustGetValue(commit.MaybeGet(datas.ParentsField)).Equals(mustSet(types.NewSet(context.Background(), vrw, parents...))))
vparents := mustGetValue(commit.MaybeGet(datas.ParentsListField)).(types.List)
i := 0
vparents.IterAll(context.TODO(), func(v types.Value, _ uint64) error {
s.True(v.(types.Ref).TargetHash() == parents[i])
i++
return nil
})
merged := mustHeadValue(sp.GetDataset(context.Background()))
s.True(expected.Equals(merged), "%s != %s", mustString(types.EncodedValue(context.Background(), expected)), mustString(types.EncodedValue(context.Background(), merged)))
}
@@ -148,8 +153,8 @@ func (s *nomsMergeTestSuite) TestNomsMerge_Left() {
defer rightSpec.Close()
p := s.setupMergeDataset(parentSpec, types.StructData{"num": types.Float(42)}, nil)
l := s.setupMergeDataset(leftSpec, types.StructData{"num": types.Float(43)}, []hash.Hash{p.TargetHash()})
r := s.setupMergeDataset(rightSpec, types.StructData{"num": types.Float(44)}, []hash.Hash{p.TargetHash()})
l := s.setupMergeDataset(leftSpec, types.StructData{"num": types.Float(43)}, []hash.Hash{p})
r := s.setupMergeDataset(rightSpec, types.StructData{"num": types.Float(44)}, []hash.Hash{p})
expected := mustValue(types.NewStruct(parentSpec.GetVRW(context.Background()).Format(), "", types.StructData{"num": types.Float(43)}))
@@ -173,8 +178,8 @@ func (s *nomsMergeTestSuite) TestNomsMerge_Right() {
defer rightSpec.Close()
p := s.setupMergeDataset(parentSpec, types.StructData{"num": types.Float(42)}, nil)
l := s.setupMergeDataset(leftSpec, types.StructData{"num": types.Float(43)}, []hash.Hash{p.TargetHash()})
r := s.setupMergeDataset(rightSpec, types.StructData{"num": types.Float(44)}, []hash.Hash{p.TargetHash()})
l := s.setupMergeDataset(leftSpec, types.StructData{"num": types.Float(43)}, []hash.Hash{p})
r := s.setupMergeDataset(rightSpec, types.StructData{"num": types.Float(44)}, []hash.Hash{p})
expected := mustValue(types.NewStruct(parentSpec.GetVRW(context.Background()).Format(), "", types.StructData{"num": types.Float(44)}))
@@ -197,8 +202,8 @@ func (s *nomsMergeTestSuite) TestNomsMerge_Conflict() {
rightSpec := s.spec(right)
defer rightSpec.Close()
p := s.setupMergeDataset(parentSpec, types.StructData{"num": types.Float(42)}, nil)
s.setupMergeDataset(leftSpec, types.StructData{"num": types.Float(43)}, []hash.Hash{p.TargetHash()})
s.setupMergeDataset(rightSpec, types.StructData{"num": types.Float(44)}, []hash.Hash{p.TargetHash()})
s.setupMergeDataset(leftSpec, types.StructData{"num": types.Float(43)}, []hash.Hash{p})
s.setupMergeDataset(rightSpec, types.StructData{"num": types.Float(44)}, []hash.Hash{p})
s.Panics(func() { s.MustRun(main, []string{"merge", s.DBDir, left, right, "output"}) })
}

View File

@@ -99,8 +99,7 @@ func runSync(ctx context.Context, args []string) int {
sourceRef, err := types.NewRef(sourceObj, sourceVRW.Format())
util.CheckError(err)
sinkRef, sinkExists, err := sinkDataset.MaybeHeadRef()
util.CheckError(err)
sinkAddr, sinkExists := sinkDataset.MaybeHeadAddr()
nonFF := false
srcCS := datas.ChunkStoreFromDatabase(sourceStore)
sinkCS := datas.ChunkStoreFromDatabase(sinkDB)
@@ -140,8 +139,8 @@ func runSync(ctx context.Context, args []string) int {
status.Done()
} else if !sinkExists {
fmt.Printf("All chunks already exist at destination! Created new dataset %s.\n", args[1])
} else if nonFF && !sourceRef.Equals(sinkRef) {
fmt.Printf("Abandoning %s; new head is %s\n", sinkRef.TargetHash(), sourceRef.TargetHash())
} else if nonFF && sourceRef.TargetHash() != sinkAddr {
fmt.Printf("Abandoning %s; new head is %s\n", sinkAddr, sourceRef.TargetHash())
} else {
fmt.Printf("Dataset %s is already up to date.\n", args[1])
}

View File

@@ -186,7 +186,7 @@ func (s *nomsSyncTestSuite) TestRewind() {
s.NoError(err)
src, err = datas.CommitValue(context.Background(), sourceDB, src, types.Float(42))
s.NoError(err)
rewindRef := mustHeadRef(src).TargetHash()
rewindRef := mustHeadAddr(src)
src, err = datas.CommitValue(context.Background(), sourceDB, src, types.Float(43))
s.NoError(err)
sourceDB.Close() // Close Database backing both Datasets

View File

@@ -152,7 +152,7 @@ func newCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.V
return newCommit(ctx, v, parentsList, parentsClosure, includeParentsClosure, metaSt)
}
func FindCommonAncestorUsingParentsList(ctx context.Context, c1, c2 types.Ref, vr1, vr2 types.ValueReader) (types.Ref, bool, error) {
func findCommonAncestorUsingParentsList(ctx context.Context, c1, c2 types.Ref, vr1, vr2 types.ValueReader) (types.Ref, bool, error) {
c1Q, c2Q := RefByHeightHeap{c1}, RefByHeightHeap{c2}
for !c1Q.Empty() && !c2Q.Empty() {
c1Ht, c2Ht := c1Q.MaxHeight(), c2Q.MaxHeight()
@@ -192,14 +192,14 @@ func FindCommonAncestorUsingParentsList(ctx context.Context, c1, c2 types.Ref, v
//
// This implementation makes use of the parents_closure field on the commit
// struct. If the commit does not have a materialized parents_closure, this
// implementation delegates to FindCommonAncestorUsingParentsList.
// implementation delegates to findCommonAncestorUsingParentsList.
func FindCommonAncestor(ctx context.Context, c1, c2 types.Ref, vr1, vr2 types.ValueReader) (types.Ref, bool, error) {
pi1, err := newParentsClosureIterator(ctx, c1, vr1)
if err != nil {
return types.Ref{}, false, err
}
if pi1 == nil {
return FindCommonAncestorUsingParentsList(ctx, c1, c2, vr1, vr2)
return findCommonAncestorUsingParentsList(ctx, c1, c2, vr1, vr2)
}
pi2, err := newParentsClosureIterator(ctx, c2, vr2)
@@ -207,7 +207,7 @@ func FindCommonAncestor(ctx context.Context, c1, c2 types.Ref, vr1, vr2 types.Va
return types.Ref{}, false, err
}
if pi2 == nil {
return FindCommonAncestorUsingParentsList(ctx, c1, c2, vr1, vr2)
return findCommonAncestorUsingParentsList(ctx, c1, c2, vr1, vr2)
}
for {

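The list-based walk that FindCommonAncestor falls back to is a height-guided frontier descent. Below is a schematic of that idea in plain Go, not Dolt's implementation: commits are reduced to (height, parents), load is a hypothetical fetch by address, and the taller frontier is lowered until the two frontiers share an address.

package example

// commitInfo reduces a commit to what the walk needs: its height and the
// addresses of its parents.
type commitInfo struct {
	height  uint64
	parents []string
}

// maxHeight returns the greatest commit height present in a frontier.
func maxHeight(frontier map[string]bool, load func(string) commitInfo) uint64 {
	var max uint64
	for addr := range frontier {
		if h := load(addr).height; h > max {
			max = h
		}
	}
	return max
}

// step replaces every max-height commit in the frontier with its parents,
// moving the frontier one level down the graph.
func step(frontier map[string]bool, load func(string) commitInfo) {
	max := maxHeight(frontier, load)
	var expand []string
	for addr := range frontier {
		if load(addr).height == max {
			expand = append(expand, addr)
		}
	}
	for _, addr := range expand {
		delete(frontier, addr)
		for _, p := range load(addr).parents {
			frontier[p] = true
		}
	}
}

// commonAncestor lowers whichever frontier is taller until the frontiers
// intersect; a shared address is a common ancestor of a and b.
func commonAncestor(a, b string, load func(string) commitInfo) (string, bool) {
	fa, fb := map[string]bool{a: true}, map[string]bool{b: true}
	for len(fa) > 0 && len(fb) > 0 {
		for addr := range fa {
			if fb[addr] {
				return addr, true
			}
		}
		if maxHeight(fa, load) >= maxHeight(fb, load) {
			step(fa, load)
		} else {
			step(fb, load)
		}
	}
	return "", false
}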
View File

@@ -45,18 +45,11 @@ func mustHead(ds Dataset) types.Struct {
return s
}
func mustHeadRef(ds Dataset) types.Ref {
hr, ok, err := ds.MaybeHeadRef()
if err != nil {
panic("error getting head")
}
if !ok {
panic("no head")
}
return hr
func mustHeight(ds Dataset) uint64 {
h, ok, err := ds.MaybeHeight()
d.PanicIfError(err)
d.PanicIfFalse(ok)
return h
}
func mustHeadValue(ds Dataset) types.Value {
@@ -323,7 +316,7 @@ func assertCommonAncestor(t *testing.T, expected, a, b types.Struct, ldb, rdb *d
"FindCommonAncestor": FindCommonAncestor,
"SetClosure": commonAncWithSetClosure,
"LazyClosure": commonAncWithLazyClosure,
"FindCommonAncestorUsingParentsList": FindCommonAncestorUsingParentsList,
"FindCommonAncestorUsingParentsList": findCommonAncestorUsingParentsList,
}
for name, method := range methods {
@@ -354,12 +347,12 @@ func assertCommonAncestor(t *testing.T, expected, a, b types.Struct, ldb, rdb *d
}
// Add a commit and return it.
func addCommit(t *testing.T, db *database, datasetID string, val string, parents ...types.Struct) (types.Struct, types.Ref) {
func addCommit(t *testing.T, db *database, datasetID string, val string, parents ...types.Struct) (types.Struct, hash.Hash) {
ds, err := db.GetDataset(context.Background(), datasetID)
assert.NoError(t, err)
ds, err = db.Commit(context.Background(), ds, types.String(val), CommitOptions{Parents: mustCommitToTargetHashes(db, parents...)})
assert.NoError(t, err)
return mustHead(ds), mustHeadRef(ds)
return mustHead(ds), mustHeadAddr(ds)
}
func assertClosureMapValue(t *testing.T, vrw types.ValueReadWriter, v types.Value, h hash.Hash) bool {
@@ -493,57 +486,57 @@ func TestCommitParentsClosure(t *testing.T) {
}
a, b, c, d := "ds-a", "ds-b", "ds-c", "ds-d"
a1, a1r := addCommit(t, db, a, "a1")
a2, a2r := addCommit(t, db, a, "a2", a1)
a3, a3r := addCommit(t, db, a, "a3", a2)
a1, a1a := addCommit(t, db, a, "a1")
a2, a2a := addCommit(t, db, a, "a2", a1)
a3, a3a := addCommit(t, db, a, "a3", a2)
b1, b1r := addCommit(t, db, b, "b1", a1)
b2, b2r := addCommit(t, db, b, "b2", b1)
b3, b3r := addCommit(t, db, b, "b3", b2)
b1, b1a := addCommit(t, db, b, "b1", a1)
b2, b2a := addCommit(t, db, b, "b2", b1)
b3, b3a := addCommit(t, db, b, "b3", b2)
c1, c1r := addCommit(t, db, c, "c1", a3, b3)
c1, c1a := addCommit(t, db, c, "c1", a3, b3)
d1, _ := addCommit(t, db, d, "d1", c1, b3)
assertCommitParentsClosure(a1, []expected{})
assertCommitParentsClosure(a2, []expected{
{1, a1r.TargetHash()},
{1, a1a},
})
assertCommitParentsClosure(a3, []expected{
{1, a1r.TargetHash()},
{2, a2r.TargetHash()},
{1, a1a},
{2, a2a},
})
assertCommitParentsClosure(b1, []expected{
{1, a1r.TargetHash()},
{1, a1a},
})
assertCommitParentsClosure(b2, []expected{
{1, a1r.TargetHash()},
{2, b1r.TargetHash()},
{1, a1a},
{2, b1a},
})
assertCommitParentsClosure(b3, []expected{
{1, a1r.TargetHash()},
{2, b1r.TargetHash()},
{3, b2r.TargetHash()},
{1, a1a},
{2, b1a},
{3, b2a},
})
assertCommitParentsClosure(c1, []expected{
{1, a1r.TargetHash()},
{2, a2r.TargetHash()},
{2, b1r.TargetHash()},
{3, a3r.TargetHash()},
{3, b2r.TargetHash()},
{4, b3r.TargetHash()},
{1, a1a},
{2, a2a},
{2, b1a},
{3, a3a},
{3, b2a},
{4, b3a},
})
assertCommitParentsClosure(d1, []expected{
{1, a1r.TargetHash()},
{2, a2r.TargetHash()},
{2, b1r.TargetHash()},
{3, a3r.TargetHash()},
{3, b2r.TargetHash()},
{4, b3r.TargetHash()},
{5, c1r.TargetHash()},
{1, a1a},
{2, a2a},
{2, b1a},
{3, a3a},
{3, b2a},
{4, b3a},
{5, c1a},
})
}

View File

@@ -96,7 +96,7 @@ type Database interface {
// before attempting to write a new value. And it does the normal optimistic locking that Commit does, assuming the
// pessimistic locking passes. After this method runs, the two datasets given in |commitDS| and |workingSetDS| are both
// updated in the new root, or neither of them is.
CommitWithWorkingSet(ctx context.Context, commitDS, workingSetDS Dataset, commit types.Value, workingSetSpec WorkingSetSpec, prevWsHash hash.Hash, opts CommitOptions) (Dataset, Dataset, error)
CommitWithWorkingSet(ctx context.Context, commitDS, workingSetDS Dataset, val types.Value, workingSetSpec WorkingSetSpec, prevWsHash hash.Hash, opts CommitOptions) (Dataset, Dataset, error)
// Delete removes the Dataset named ds.ID() from the map at the root of
// the Database. If the Dataset is already not present in the map,

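A hedged sketch of calling the method described above: either both datasets advance in the new root or neither does. commitAtomically is an invented wrapper; wsSpec and prevWsHash are assumed to come from the caller's session state.

package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/types"
)

// commitAtomically writes val as a new commit on commitDS and updates wsDS
// in the same root update, defaulting the parent to the current head.
func commitAtomically(ctx context.Context, db datas.Database, commitDS, wsDS datas.Dataset,
	val types.Value, wsSpec datas.WorkingSetSpec, prevWsHash hash.Hash) (datas.Dataset, datas.Dataset, error) {
	var parents []hash.Hash
	if addr, ok := commitDS.MaybeHeadAddr(); ok {
		parents = append(parents, addr)
	}
	return db.CommitWithWorkingSet(ctx, commitDS, wsDS, val, wsSpec, prevWsHash,
		datas.CommitOptions{Parents: parents})
}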
View File

@@ -478,7 +478,7 @@ func (db *database) doFastForward(ctx context.Context, ds Dataset, newHeadAddr h
if err != nil {
return err
}
if !found || mergeNeeded(ref, ancestorRef, newRef) {
if !found || mergeNeeded(currentHeadAddr, ancestorRef.TargetHash()) {
return ErrMergeNeeded
}
}
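Taken together with the simplified mergeNeeded below, the guard in doFastForward reduces to a single comparison: a fast-forward is legal only when a common ancestor was found and it is the current head itself. A tiny sketch of the predicate (canFastForward is an invented name):

package example

import "github.com/dolthub/dolt/go/store/hash"

// canFastForward reports whether moving a ref to a new head is a pure
// fast-forward: the common ancestor must exist and equal the current head.
func canFastForward(found bool, currentHeadAddr, ancestorAddr hash.Hash) bool {
	return found && currentHeadAddr == ancestorAddr
}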
@@ -491,12 +491,7 @@ func (db *database) doFastForward(ctx context.Context, ds Dataset, newHeadAddr h
}
func (db *database) Commit(ctx context.Context, ds Dataset, v types.Value, opts CommitOptions) (Dataset, error) {
var currentAddr hash.Hash
if currRef, ok, err := ds.MaybeHeadRef(); err != nil {
return Dataset{}, err
} else if ok {
currentAddr = currRef.TargetHash()
}
currentAddr, _ := ds.MaybeHeadAddr()
st, err := buildNewCommit(ctx, ds, v, opts)
if err != nil {
return Dataset{}, err
@@ -560,8 +555,8 @@ func (db *database) doCommit(ctx context.Context, datasetID string, datasetCurre
})
}
func mergeNeeded(currentHeadRef types.Ref, ancestorRef types.Ref, commitRef types.Ref) bool {
return currentHeadRef.TargetHash() != ancestorRef.TargetHash()
func mergeNeeded(currentAddr hash.Hash, ancestorAddr hash.Hash) bool {
return currentAddr != ancestorAddr
}
func (db *database) Tag(ctx context.Context, ds Dataset, commitAddr hash.Hash, opts TagOptions) (Dataset, error) {
@@ -688,14 +683,7 @@ func (db *database) CommitWithWorkingSet(
return Dataset{}, Dataset{}, err
}
var currDSHash hash.Hash
currDSRef, ok, err := commitDS.MaybeHeadRef()
if err != nil {
return Dataset{}, Dataset{}, err
}
if ok {
currDSHash = currDSRef.TargetHash()
}
currDSHash, _ := commitDS.MaybeHeadAddr()
err = db.update(ctx, func(ctx context.Context, datasets types.Map) (types.Map, error) {
success, err := assertDatasetHash(ctx, datasets, workingSetDS.ID(), prevWsHash)
@@ -707,21 +695,16 @@ func (db *database) CommitWithWorkingSet(
return types.Map{}, ErrOptimisticLockFailed
}
r, hasHead, err := datasets.MaybeGet(ctx, types.String(commitDS.ID()))
if err != nil {
var currDS hash.Hash
if r, hasHead, err := datasets.MaybeGet(ctx, types.String(commitDS.ID())); err != nil {
return types.Map{}, err
} else if hasHead {
currDS = r.(types.Ref).TargetHash()
}
if hasHead {
currentHeadRef := r.(types.Ref)
ancestorRef, found, err := FindCommonAncestor(ctx, commitRef, currentHeadRef, db, db)
if err != nil {
return types.Map{}, err
}
if !found || mergeNeeded(currentHeadRef, ancestorRef, commitRef) {
return types.Map{}, ErrMergeNeeded
}
if currDS != currDSHash {
return types.Map{}, ErrMergeNeeded
}
return datasets.Edit().
@@ -952,20 +935,16 @@ func (db *database) validateWorkingSet(t types.Struct) error {
func buildNewCommit(ctx context.Context, ds Dataset, v types.Value, opts CommitOptions) (types.Struct, error) {
if opts.Parents == nil || len(opts.Parents) == 0 {
if headRef, ok, err := ds.MaybeHeadRef(); err != nil {
return types.Struct{}, err
} else if ok {
opts.Parents = []hash.Hash{headRef.TargetHash()}
headAddr, ok := ds.MaybeHeadAddr()
if ok {
opts.Parents = []hash.Hash{headAddr}
}
} else {
curr, ok, err := ds.MaybeHeadRef()
if err != nil {
return types.Struct{}, err
}
curr, ok := ds.MaybeHeadAddr()
if ok {
found := false
for _, h := range opts.Parents {
if h == curr.TargetHash() {
if h == curr {
found = true
break
}

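In caller terms, the parent handling in buildNewCommit above means: leave CommitOptions.Parents empty and the current head (if any) becomes the sole parent; pass Parents explicitly and the current head must be among them, which is what the "Should be disallowed" cases in the next file's tests exercise. A hedged illustration (commitExamples and the otherHead parameter are invented):

package example

import (
	"context"

	"github.com/dolthub/dolt/go/store/datas"
	"github.com/dolthub/dolt/go/store/hash"
	"github.com/dolthub/dolt/go/store/types"
)

func commitExamples(ctx context.Context, db datas.Database, ds datas.Dataset, otherHead hash.Hash) (datas.Dataset, error) {
	// Defaulted parents: equivalent to Parents == []hash.Hash{headAddr}.
	ds, err := db.Commit(ctx, ds, types.String("a"), datas.CommitOptions{})
	if err != nil {
		return ds, err
	}
	// Explicit parents, e.g. for a merge commit: include the current head,
	// or the commit is rejected as a non-fast-forward.
	head, _ := ds.MaybeHeadAddr()
	return db.Commit(ctx, ds, types.String("merged"), datas.CommitOptions{
		Parents: []hash.Hash{head, otherHead},
	})
}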
View File

@@ -210,31 +210,31 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
// ds2 matches the Datasets Map in suite.db
suiteDS, err := suite.db.GetDataset(context.Background(), datasetID)
suite.NoError(err)
headRef := mustHeadRef(suiteDS)
suite.True(mustHeadRef(ds2).Equals(headRef))
headAddr := mustHeadAddr(suiteDS)
suite.True(mustHeadAddr(ds2) == headAddr)
// ds2 has |a| at its head
h, ok, err := ds2.MaybeHeadValue()
suite.NoError(err)
suite.True(ok)
suite.True(h.Equals(a))
suite.Equal(uint64(1), mustHeadRef(ds2).Height())
suite.Equal(uint64(1), mustHeight(ds2))
ds = ds2
aCommitRef := mustHeadRef(ds) // to be used to test disallowing of non-fastforward commits below
aCommitAddr := mustHeadAddr(ds) // to be used to test disallowing of non-fastforward commits below
// |a| <- |b|
b := types.String("b")
ds, err = CommitValue(context.Background(), suite.db, ds, b)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(b))
suite.Equal(uint64(2), mustHeadRef(ds).Height())
suite.Equal(uint64(2), mustHeight(ds))
// |a| <- |b|
// \----|c|
// Should be disallowed.
c := types.String("c")
_, err = suite.db.Commit(context.Background(), ds, c, newOpts(suite.db, aCommitRef))
_, err = suite.db.Commit(context.Background(), ds, c, newOpts(suite.db, aCommitAddr))
suite.Error(err)
suite.True(mustHeadValue(ds).Equals(b))
@@ -243,11 +243,11 @@ func (suite *DatabaseSuite) TestDatabaseCommit() {
ds, err = CommitValue(context.Background(), suite.db, ds, d)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(d))
suite.Equal(uint64(3), mustHeadRef(ds).Height())
suite.Equal(uint64(3), mustHeight(ds))
// Attempt to recommit |b| with |a| as parent.
// Should be disallowed.
_, err = suite.db.Commit(context.Background(), ds, b, newOpts(suite.db, aCommitRef))
_, err = suite.db.Commit(context.Background(), ds, b, newOpts(suite.db, aCommitAddr))
suite.Error(err)
suite.True(mustHeadValue(ds).Equals(d))
@@ -336,8 +336,8 @@ func assertMapOfStringToRefOfCommit(ctx context.Context, proposed, datasets type
d.PanicIfError(derr)
}
func newOpts(vrw types.ValueReadWriter, parent types.Ref) CommitOptions {
return CommitOptions{Parents: []hash.Hash{parent.TargetHash()}}
func newOpts(vrw types.ValueReadWriter, parent hash.Hash) CommitOptions {
return CommitOptions{Parents: []hash.Hash{parent}}
}
func (suite *DatabaseSuite) TestDatabaseDuplicateCommit() {
@@ -396,8 +396,7 @@ func (suite *DatabaseSuite) TestDatabaseDelete() {
suite.Equal(uint64(1), datasets.Len())
newDS, err := newDB.GetDataset(context.Background(), datasetID2)
suite.NoError(err)
_, present, err = newDS.MaybeHeadRef()
suite.NoError(err)
present = newDS.HasHead()
suite.True(present, "Dataset %s should be present", datasetID2)
}
@@ -496,8 +495,7 @@ func (suite *DatabaseSuite) TestDeleteWithConcurrentChunkStoreUse() {
// Attempted concurrent delete, which should proceed without a problem
ds1, err = suite.db.Delete(context.Background(), ds1)
suite.NoError(err)
_, present, err := ds1.MaybeHeadRef()
suite.NoError(err)
present := ds1.HasHead()
suite.False(present, "Dataset %s should not be present", datasetID)
}
@@ -511,19 +509,19 @@ func (suite *DatabaseSuite) TestSetHead() {
a := types.String("a")
ds, err = CommitValue(context.Background(), suite.db, ds, a)
suite.NoError(err)
aCommitRef := mustHeadRef(ds) // To use in non-FF SetHeadToCommit() below.
aCommitAddr := mustHeadAddr(ds) // To use in non-FF SetHeadToCommit() below.
b := types.String("b")
ds, err = CommitValue(context.Background(), suite.db, ds, b)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(b))
bCommitRef := mustHeadRef(ds) // To use in FF SetHeadToCommit() below.
bCommitAddr := mustHeadAddr(ds) // To use in FF SetHeadToCommit() below.
ds, err = suite.db.SetHead(context.Background(), ds, aCommitRef.TargetHash())
ds, err = suite.db.SetHead(context.Background(), ds, aCommitAddr)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(a))
ds, err = suite.db.SetHead(context.Background(), ds, bCommitRef.TargetHash())
ds, err = suite.db.SetHead(context.Background(), ds, bCommitAddr)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(b))
}
@@ -537,7 +535,7 @@ func (suite *DatabaseSuite) TestFastForward() {
a := types.String("a")
ds, err = CommitValue(context.Background(), suite.db, ds, a)
suite.NoError(err)
aCommitRef := mustHeadRef(ds) // To use in non-FF cases below.
aCommitAddr := mustHeadAddr(ds) // To use in non-FF cases below.
b := types.String("b")
ds, err = CommitValue(context.Background(), suite.db, ds, b)
@@ -548,19 +546,19 @@ func (suite *DatabaseSuite) TestFastForward() {
ds, err = CommitValue(context.Background(), suite.db, ds, c)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(c))
cCommitRef := mustHeadRef(ds) // To use in FastForward() below.
cCommitAddr := mustHeadAddr(ds) // To use in FastForward() below.
// FastForward should disallow this, as |a| is not a descendant of |c|
_, err = suite.db.FastForward(context.Background(), ds, aCommitRef.TargetHash())
_, err = suite.db.FastForward(context.Background(), ds, aCommitAddr)
suite.Error(err)
// Move Head back to something earlier in the lineage, so we can test FastForward
ds, err = suite.db.SetHead(context.Background(), ds, aCommitRef.TargetHash())
ds, err = suite.db.SetHead(context.Background(), ds, aCommitAddr)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(a))
// This should succeed, because while |a| is not a direct parent of |c|, it is an ancestor.
ds, err = suite.db.FastForward(context.Background(), ds, cCommitRef.TargetHash())
ds, err = suite.db.FastForward(context.Background(), ds, cCommitAddr)
suite.NoError(err)
suite.True(mustHeadValue(ds).Equals(c))
}

View File

@@ -237,6 +237,17 @@ func (ds Dataset) MaybeHeadAddr() (hash.Hash, bool) {
return ds.head.Addr(), true
}
func (ds Dataset) MaybeHeight() (uint64, bool, error) {
r, ok, err := ds.MaybeHeadRef()
if err != nil {
return 0, false, err
}
if !ok {
return 0, false, nil
}
return r.Height(), true, nil
}
func (ds Dataset) IsTag() bool {
return ds.head != nil && ds.head.TypeName() == TagName
}

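The new accessors compose as below; describeHead is an invented example. MaybeHeadAddr is error-free, while MaybeHeight still routes through MaybeHeadRef and so keeps its error return.

package example

import (
	"fmt"

	"github.com/dolthub/dolt/go/store/datas"
)

// describeHead renders the head address and height of a dataset, or "no
// head" for an empty ref.
func describeHead(ds datas.Dataset) (string, error) {
	addr, ok := ds.MaybeHeadAddr()
	if !ok {
		return "no head", nil
	}
	height, _, err := ds.MaybeHeight()
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("head %s at height %d", addr.String(), height), nil
}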
View File

@@ -169,17 +169,17 @@ func (suite *PullSuite) TestPullEverything() {
expectedReads := suite.sinkCS.Reads()
l := buildListOfHeight(2, suite.sourceVRW)
sourceRef := suite.commitToSource(l, nil)
sourceAddr := suite.commitToSource(l, nil)
pt := startProgressTracker()
wrf, err := types.WalkRefsForChunkStore(suite.sourceCS)
suite.NoError(err)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceRef.TargetHash(), pt.Ch)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceAddr, pt.Ch)
suite.NoError(err)
suite.True(expectedReads-suite.sinkCS.Reads() <= suite.commitReads)
pt.Validate(suite)
v := mustValue(suite.sinkVRW.ReadValue(context.Background(), sourceRef.TargetHash())).(types.Struct)
v := mustValue(suite.sinkVRW.ReadValue(context.Background(), sourceAddr)).(types.Struct)
suite.NotNil(v)
suite.True(l.Equals(mustGetValue(v.MaybeGet(datas.ValueField))))
}
@@ -209,23 +209,23 @@ func (suite *PullSuite) TestPullMultiGeneration() {
expectedReads := suite.sinkCS.Reads()
srcL := buildListOfHeight(2, suite.sourceVRW)
sourceRef := suite.commitToSource(srcL, nil)
sourceAddr := suite.commitToSource(srcL, nil)
srcL = buildListOfHeight(4, suite.sourceVRW)
sourceRef = suite.commitToSource(srcL, []hash.Hash{sourceRef.TargetHash()})
sourceAddr = suite.commitToSource(srcL, []hash.Hash{sourceAddr})
srcL = buildListOfHeight(5, suite.sourceVRW)
sourceRef = suite.commitToSource(srcL, []hash.Hash{sourceRef.TargetHash()})
sourceAddr = suite.commitToSource(srcL, []hash.Hash{sourceAddr})
pt := startProgressTracker()
wrf, err := types.WalkRefsForChunkStore(suite.sourceCS)
suite.NoError(err)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceRef.TargetHash(), pt.Ch)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceAddr, pt.Ch)
suite.NoError(err)
suite.True(expectedReads-suite.sinkCS.Reads() <= suite.commitReads)
pt.Validate(suite)
v, err := suite.sinkVRW.ReadValue(context.Background(), sourceRef.TargetHash())
v, err := suite.sinkVRW.ReadValue(context.Background(), sourceAddr)
suite.NoError(err)
suite.NotNil(v)
suite.True(srcL.Equals(mustGetValue(v.(types.Struct).MaybeGet(datas.ValueField))))
@@ -255,30 +255,30 @@ func (suite *PullSuite) TestPullMultiGeneration() {
// \ -1-> L0
func (suite *PullSuite) TestPullDivergentHistory() {
sinkL := buildListOfHeight(3, suite.sinkVRW)
sinkRef := suite.commitToSink(sinkL, nil)
sinkAddr := suite.commitToSink(sinkL, nil)
srcL := buildListOfHeight(3, suite.sourceVRW)
sourceRef := suite.commitToSource(srcL, nil)
sourceAddr := suite.commitToSource(srcL, nil)
var err error
sinkL, err = sinkL.Edit().Append(types.String("oy!")).List(context.Background())
suite.NoError(err)
sinkRef = suite.commitToSink(sinkL, []hash.Hash{sinkRef.TargetHash()})
sinkAddr = suite.commitToSink(sinkL, []hash.Hash{sinkAddr})
srcL, err = srcL.Edit().Set(1, buildListOfHeight(5, suite.sourceVRW)).List(context.Background())
suite.NoError(err)
sourceRef = suite.commitToSource(srcL, []hash.Hash{sourceRef.TargetHash()})
sourceAddr = suite.commitToSource(srcL, []hash.Hash{sourceAddr})
preReads := suite.sinkCS.Reads()
pt := startProgressTracker()
wrf, err := types.WalkRefsForChunkStore(suite.sourceCS)
suite.NoError(err)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceRef.TargetHash(), pt.Ch)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceAddr, pt.Ch)
suite.NoError(err)
suite.True(preReads-suite.sinkCS.Reads() <= suite.commitReads)
pt.Validate(suite)
v, err := suite.sinkVRW.ReadValue(context.Background(), sourceRef.TargetHash())
v, err := suite.sinkVRW.ReadValue(context.Background(), sourceAddr)
suite.NoError(err)
suite.NotNil(v)
suite.True(srcL.Equals(mustGetValue(v.(types.Struct).MaybeGet(datas.ValueField))))
@@ -306,7 +306,7 @@ func (suite *PullSuite) TestPullUpdates() {
expectedReads := suite.sinkCS.Reads()
srcL := buildListOfHeight(4, suite.sourceVRW)
sourceRef := suite.commitToSource(srcL, nil)
sourceAddr := suite.commitToSource(srcL, nil)
L3 := mustValue(mustValue(srcL.Get(context.Background(), 1)).(types.Ref).TargetValue(context.Background(), suite.sourceVRW)).(types.List)
L2 := mustValue(mustValue(L3.Get(context.Background(), 1)).(types.Ref).TargetValue(context.Background(), suite.sourceVRW)).(types.List)
L2Ed := L2.Edit().Append(mustRef(suite.sourceVRW.WriteValue(context.Background(), types.String("oy!"))))
@@ -318,40 +318,40 @@ func (suite *PullSuite) TestPullUpdates() {
srcLEd := srcL.Edit().Set(1, mustRef(suite.sourceVRW.WriteValue(context.Background(), L3)))
srcL, err = srcLEd.List(context.Background())
suite.NoError(err)
sourceRef = suite.commitToSource(srcL, []hash.Hash{sourceRef.TargetHash()})
sourceAddr = suite.commitToSource(srcL, []hash.Hash{sourceAddr})
pt := startProgressTracker()
wrf, err := types.WalkRefsForChunkStore(suite.sourceCS)
suite.NoError(err)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceRef.TargetHash(), pt.Ch)
err = Pull(context.Background(), suite.sourceCS, suite.sinkCS, wrf, sourceAddr, pt.Ch)
suite.NoError(err)
suite.True(expectedReads-suite.sinkCS.Reads() <= suite.commitReads)
pt.Validate(suite)
v, err := suite.sinkVRW.ReadValue(context.Background(), sourceRef.TargetHash())
v, err := suite.sinkVRW.ReadValue(context.Background(), sourceAddr)
suite.NoError(err)
suite.NotNil(v)
suite.True(srcL.Equals(mustGetValue(v.(types.Struct).MaybeGet(datas.ValueField))))
}
func (suite *PullSuite) commitToSource(v types.Value, p []hash.Hash) types.Ref {
func (suite *PullSuite) commitToSource(v types.Value, p []hash.Hash) hash.Hash {
db := datas.NewTypesDatabase(suite.sourceVRW.(*types.ValueStore))
ds, err := db.GetDataset(context.Background(), datasetID)
suite.NoError(err)
ds, err = db.Commit(context.Background(), ds, v, datas.CommitOptions{Parents: p})
suite.NoError(err)
return mustHeadRef(ds)
return mustHeadAddr(ds)
}
func (suite *PullSuite) commitToSink(v types.Value, p []hash.Hash) types.Ref {
func (suite *PullSuite) commitToSink(v types.Value, p []hash.Hash) hash.Hash {
db := datas.NewTypesDatabase(suite.sinkVRW.(*types.ValueStore))
ds, err := db.GetDataset(context.Background(), datasetID)
suite.NoError(err)
ds, err = db.Commit(context.Background(), ds, v, datas.CommitOptions{Parents: p})
suite.NoError(err)
return mustHeadRef(ds)
return mustHeadAddr(ds)
}
func buildListOfHeight(height int, vrw types.ValueReadWriter) types.List {
@@ -596,16 +596,8 @@ func mustRef(ref types.Ref, err error) types.Ref {
return ref
}
func mustHeadRef(ds datas.Dataset) types.Ref {
hr, ok, err := ds.MaybeHeadRef()
if err != nil {
panic("error getting head")
}
if !ok {
panic("no head")
}
return hr
func mustHeadAddr(ds datas.Dataset) hash.Hash {
addr, ok := ds.MaybeHeadAddr()
d.PanicIfFalse(ok)
return addr
}

View File

@@ -238,7 +238,7 @@ func TestPuller(t *testing.T) {
require.NoError(t, err)
var parent []hash.Hash
states := map[string]types.Ref{}
states := map[string]hash.Hash{}
for _, delta := range deltas {
for tbl, sets := range delta.sets {
rootMap, err = addTableValues(ctx, vs, rootMap, tbl, sets...)
@@ -261,13 +261,12 @@ func TestPuller(t *testing.T) {
ds, err = db.Commit(ctx, ds, rootMap, commitOpts)
require.NoError(t, err)
r, ok, err := ds.MaybeHeadRef()
require.NoError(t, err)
dsAddr, ok := ds.MaybeHeadAddr()
require.True(t, ok)
parent = []hash.Hash{r.TargetHash()}
parent = []hash.Hash{dsAddr}
states[delta.name] = r
states[delta.name] = dsAddr
}
tbl, err := makeABigTable(ctx, vs)
@@ -285,13 +284,12 @@ func TestPuller(t *testing.T) {
ds, err = db.Commit(ctx, ds, rootMap, commitOpts)
require.NoError(t, err)
r, ok, err := ds.MaybeHeadRef()
require.NoError(t, err)
addr, ok := ds.MaybeHeadAddr()
require.True(t, ok)
states["add big table"] = r
states["add big table"] = addr
for k, rootRef := range states {
for k, rootAddr := range states {
t.Run(k, func(t *testing.T) {
eventCh := make(chan PullerEvent, 128)
wg := new(sync.WaitGroup)
@@ -323,7 +321,7 @@ func TestPuller(t *testing.T) {
require.NoError(t, err)
wrf, err := types.WalkRefsForChunkStore(datas.ChunkStoreFromDatabase(db))
require.NoError(t, err)
plr, err := NewPuller(ctx, tmpDir, 128, datas.ChunkStoreFromDatabase(db), datas.ChunkStoreFromDatabase(sinkdb), wrf, rootRef.TargetHash(), eventCh)
plr, err := NewPuller(ctx, tmpDir, 128, datas.ChunkStoreFromDatabase(db), datas.ChunkStoreFromDatabase(sinkdb), wrf, rootAddr, eventCh)
require.NoError(t, err)
err = plr.Pull(ctx)
@@ -333,15 +331,14 @@ func TestPuller(t *testing.T) {
sinkDS, err := sinkdb.GetDataset(ctx, "ds")
require.NoError(t, err)
sinkDS, err = sinkdb.FastForward(ctx, sinkDS, rootRef.TargetHash())
sinkDS, err = sinkdb.FastForward(ctx, sinkDS, rootAddr)
require.NoError(t, err)
require.NoError(t, err)
sinkRootRef, ok, err := sinkDS.MaybeHeadRef()
require.NoError(t, err)
sinkRootAddr, ok := sinkDS.MaybeHeadAddr()
require.True(t, ok)
eq, err := pullerRefEquality(ctx, rootRef, sinkRootRef, vs, sinkvs)
eq, err := pullerAddrEquality(ctx, rootAddr, sinkRootAddr, vs, sinkvs)
require.NoError(t, err)
assert.True(t, eq)
@@ -371,135 +368,21 @@ func makeABigTable(ctx context.Context, vrw types.ValueReadWriter) (types.Map, e
return me.Map(ctx)
}
func pullerRefEquality(ctx context.Context, expectad, actual types.Ref, src, sink types.ValueReadWriter) (bool, error) {
expectedVal, err := expectad.TargetValue(ctx, src)
if err != nil {
return false, err
}
actualVal, err := actual.TargetValue(ctx, sink)
if err != nil {
return false, err
}
exPs, exTbls, err := parentsAndTables(expectedVal.(types.Struct))
if err != nil {
return false, err
}
actPs, actTbls, err := parentsAndTables(actualVal.(types.Struct))
if err != nil {
return false, err
}
if !exPs.Equals(actPs) {
func pullerAddrEquality(ctx context.Context, expected, actual hash.Hash, src, sink types.ValueReadWriter) (bool, error) {
if expected != actual {
return false, nil
}
err = exTbls.IterAll(ctx, func(key, exVal types.Value) error {
actVal, ok, err := actTbls.MaybeGet(ctx, key)
if err != nil {
return err
}
if !ok {
return errors.New("Missing table " + string(key.(types.String)))
}
exMapVal, err := exVal.(types.Ref).TargetValue(ctx, src)
if err != nil {
return err
}
actMapVal, err := actVal.(types.Ref).TargetValue(ctx, sink)
if err != nil {
return err
}
return errIfNotEqual(ctx, exMapVal.(types.Map), actMapVal.(types.Map))
})
expectedVal, err := src.ReadValue(ctx, expected)
if err != nil {
return false, err
}
actualVal, err := sink.ReadValue(ctx, actual)
if err != nil {
return false, err
}
return exTbls.Equals(actTbls), nil
}
var errNotEqual = errors.New("not equal")
func errIfNotEqual(ctx context.Context, ex, act types.Map) error {
exItr, err := ex.Iterator(ctx)
if err != nil {
return err
}
actItr, err := act.Iterator(ctx)
if err != nil {
return err
}
for {
exK, exV, err := exItr.Next(ctx)
if err != nil {
return err
}
actK, actV, err := actItr.Next(ctx)
if err != nil {
return err
}
if actK == nil && exK == nil {
break
} else if exK == nil || actK == nil {
return errNotEqual
}
if exV == nil && actV == nil {
continue
} else if exV == nil || actV == nil {
return errNotEqual
}
if !exK.Equals(actK) || !exV.Equals(actV) {
return errNotEqual
}
}
return nil
}
func parentsAndTables(cm types.Struct) (types.List, types.Map, error) {
ps, ok, err := cm.MaybeGet(datas.ParentsListField)
if err != nil {
return types.EmptyList, types.EmptyMap, err
}
if !ok {
return types.EmptyList, types.EmptyMap, err
}
tbls, ok, err := cm.MaybeGet("value")
if err != nil {
return types.EmptyList, types.EmptyMap, err
}
if !ok {
return types.EmptyList, types.EmptyMap, err
}
return ps.(types.List), tbls.(types.Map), nil
return expectedVal.Equals(actualVal), nil
}
func writeValAndGetRef(ctx context.Context, vrw types.ValueReadWriter, val types.Value) (types.Ref, error) {

View File

@@ -77,17 +77,10 @@ func readTupleFromDB(ctx context.Context, t require.TestingT, dsID string) (*typ
ds, err := db.GetDataset(ctx, dsID)
require.NoError(t, err)
ref, ok, err := ds.MaybeHeadRef()
val, ok, err := ds.MaybeHeadValue()
require.NoError(t, err)
require.True(t, ok)
val, err := ref.TargetValue(ctx, vrw)
require.NoError(t, err)
st := val.(types.Struct)
val, ok, err = st.MaybeGet("value")
require.NoError(t, err)
require.True(t, ok)
tup := val.(types.Tuple)
valSlice, err := tup.AsSlice()
require.NoError(t, err)

View File

@@ -21,105 +21,6 @@
package types
import (
"sort"
"github.com/dolthub/dolt/go/store/hash"
)
// RefByHeight implements sort.Interface to order by increasing HeightOrder(). It uses increasing order because this causes repeated pushes and pops of the 'tallest' Refs to re-use memory, avoiding reallocations.
// We might consider making this a firmer abstraction boundary as a part of BUG 2182
type RefByHeight []Ref
func (h RefByHeight) Len() int {
return len(h)
}
func (h RefByHeight) Less(i, j int) bool {
return !HeightOrder(h[i], h[j])
}
func (h RefByHeight) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
}
func (h *RefByHeight) PushBack(r Ref) {
*h = append(*h, r)
}
func (h *RefByHeight) PopBack() Ref {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
// DropIndices takes a slice of integer indices into h and splices out the Refs at those indices.
func (h *RefByHeight) DropIndices(indices []int) {
sort.Ints(indices)
old := *h
numIdx := len(indices)
for i, j := 0, 0; i < old.Len(); i++ {
if len(indices) > 0 && i == indices[0] {
indices = indices[1:]
continue
}
if i != j {
old[j] = old[i]
}
j++
}
*h = old[:old.Len()-numIdx]
}
func (h *RefByHeight) Unique() {
seen := hash.HashSet{}
result := make(RefByHeight, 0, cap(*h))
for _, r := range *h {
target := r.TargetHash()
if !seen.Has(target) {
result = append(result, r)
}
seen.Insert(target)
}
*h = result
}
// PopRefsOfHeight pops off and returns all refs r in h for which r.Height() == height.
func (h *RefByHeight) PopRefsOfHeight(height uint64) (refs RefSlice) {
for h.MaxHeight() == height {
r := h.PopBack()
refs = append(refs, r)
}
return
}
// MaxHeight returns the height of the 'tallest' Ref in h.
func (h RefByHeight) MaxHeight() uint64 {
if h.Empty() {
return 0
}
return h.PeekEnd().Height()
}
func (h RefByHeight) Empty() bool {
return h.Len() == 0
}
// PeekEnd returns, but does not Pop the tallest Ref in h.
func (h RefByHeight) PeekEnd() (head Ref) {
return h.PeekAt(h.Len() - 1)
}
// PeekAt returns, but does not remove, the Ref at h[idx]. If the index is out of range, returns the empty Ref.
func (h RefByHeight) PeekAt(idx int) (peek Ref) {
if idx >= 0 && idx < h.Len() {
peek = h[idx]
}
return
}
// HeightOrder returns true if a is 'higher than' b, generally if its ref-height is greater. If the two are of the same height, fall back to sorting by TargetHash.
func HeightOrder(a, b Ref) bool {
if a.Height() == b.Height() {

View File

@@ -1,137 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// This file incorporates work covered by the following copyright and
// permission notice:
//
// Copyright 2016 Attic Labs, Inc. All rights reserved.
// Licensed under the Apache License, version 2.0:
// http://www.apache.org/licenses/LICENSE-2.0
package types
import (
"sort"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestRefByHeight(t *testing.T) {
unique := 0
newRefWithHeight := func(height uint64) (Ref, error) {
v := Float(unique)
unique++
h, err := v.Hash(Format_7_18)
if err != nil {
return Ref{}, err
}
return constructRef(Format_7_18, h, PrimitiveTypeMap[FloatKind], height)
}
assert := assert.New(t)
h := RefByHeight{}
r1, err := newRefWithHeight(1)
require.NoError(t, err)
r2, err := newRefWithHeight(2)
require.NoError(t, err)
r3, err := newRefWithHeight(3)
require.NoError(t, err)
r4, err := newRefWithHeight(2)
require.NoError(t, err)
h.PushBack(r1)
assert.Equal(r1, h.PeekEnd())
assert.Equal(1, len(h))
h.PushBack(r3)
sort.Sort(&h)
assert.Equal(r3, h.PeekEnd())
assert.Equal(2, len(h))
h.PushBack(r2)
sort.Sort(&h)
assert.Equal(r3, h.PeekEnd())
assert.Equal(3, len(h))
h.PushBack(r4)
sort.Sort(&h)
assert.Equal(r3, h.PeekEnd())
assert.Equal(4, len(h))
expectedSecond, expectedThird := func() (Ref, Ref) {
if r2.TargetHash().Less(r4.TargetHash()) {
return r2, r4
}
return r4, r2
}()
assert.Equal(r3, h.PopBack())
assert.Equal(expectedSecond, h.PeekEnd())
assert.Equal(3, len(h))
assert.Equal(expectedSecond, h.PopBack())
assert.Equal(expectedThird, h.PeekEnd())
assert.Equal(2, len(h))
assert.Equal(expectedThird, h.PopBack())
assert.Equal(r1, h.PeekEnd())
assert.Equal(1, len(h))
assert.Equal(r1, h.PopBack())
assert.Equal(0, len(h))
}
func TestDropIndices(t *testing.T) {
h := &RefByHeight{}
for i := 0; i < 10; i++ {
ref, err := NewRef(Float(i), Format_7_18)
require.NoError(t, err)
h.PushBack(ref)
}
sort.Sort(h)
toDrop := []int{2, 4, 7}
expected := RefSlice{h.PeekAt(2), h.PeekAt(4), h.PeekAt(7)}
h.DropIndices(toDrop)
assert.Len(t, *h, 7)
for i, dropped := range expected {
assert.NotContains(t, *h, dropped, "Should not contain %d", toDrop[i])
}
}
func TestPopRefsOfHeight(t *testing.T) {
h := &RefByHeight{}
for i, n := range []int{6, 3, 6, 6, 2} {
hsh, err := Float(i).Hash(Format_7_18)
require.NoError(t, err)
r, err := constructRef(Format_7_18, hsh, PrimitiveTypeMap[FloatKind], uint64(n))
require.NoError(t, err)
h.PushBack(r)
}
sort.Sort(h)
expected := RefSlice{h.PeekAt(4), h.PeekAt(3), h.PeekAt(2)}
refs := h.PopRefsOfHeight(h.MaxHeight())
assert.Len(t, *h, 2)
assert.Len(t, refs, 3)
for _, popped := range expected {
assert.NotContains(t, *h, popped, "Should not contain ref of height 6")
}
}

View File

@@ -92,13 +92,9 @@ func WriteToWriter(ctx context.Context, wr io.Writer, store *FileValueStore, val
return err
}
ref, _, err := ds.MaybeHeadRef()
addr, _ := ds.MaybeHeadAddr()
if err != nil {
return err
}
err = write(wr, ref.TargetHash(), store)
err = write(wr, addr, store)
if err != nil {
return err