Merge pull request #3607 from dolthub/aaron/dolt_dev-parent-closure-in-commits

[no-release-notes] go/store/datas: Store parent commit closure in DOLT_DEV, DOLT_1 formats.
This commit is contained in:
Aaron Son
2022-06-14 10:40:29 -07:00
committed by GitHub
16 changed files with 714 additions and 455 deletions

View File

@@ -575,7 +575,7 @@ func (ddb *DoltDB) CommitDanglingWithParentCommits(ctx context.Context, valHash
}
commitOpts := datas.CommitOptions{Parents: parents, Meta: cm}
dcommit, err := datas.NewCommitForValue(ctx, ddb.vrw, val, commitOpts)
dcommit, err := datas.NewCommitForValue(ctx, datas.ChunkStoreFromDatabase(ddb.db), ddb.vrw, val, commitOpts)
if err != nil {
return nil, err
}

View File

@@ -88,7 +88,7 @@ func LoadedLocalLocation() *time.Location {
func BasicSelectTests() []SelectTest {
headCommitHash := "73hc2robs4v0kt9taoe3m5hd49dmrgun"
if types.Format_Default == types.Format_DOLT_DEV {
headCommitHash = "hqcfvhaumjtetijsqdnnjhj87f0lesbo"
headCommitHash = "r5hevva9fc9ul414fm5lo11r8vcqifc1"
}
return []SelectTest{
{

View File

@@ -64,8 +64,8 @@ func (s *nomsDsTestSuite) TestNomsDs() {
golden1 := "oetp3jigkp5pid2f5c4mknpo17mso31b"
golden2 := "tsbj1qq88llk3k8qqqb5n3188sbpiu7r"
if types.Format_Default == types.Format_DOLT_DEV {
golden1 = "v5hvf758gu5gkbe987gcju2de2sunkul"
golden2 = "c5j2ve21fdtr8vmqu548lc7546tmv6sl"
golden1 = "mr1ksgmbdqfpa5s4kbv6rqs25c8pklqs"
golden2 = "339b2ojpn2a14jpf0t7cq9f6v0tnt2a0"
}
s.NoError(err)

View File

@@ -55,8 +55,8 @@ func (s *nomsRootTestSuite) TestBasic() {
goldenHello := "u8g2r4qg97kkqn42lvao77st2mv3bpl0\n"
goldenGoodbye := "70b9adi6amrab3a5t4hcibdob0cq49m0\n"
if types.Format_Default == types.Format_DOLT_DEV {
goldenHello = "jeps3m1q780r3cv8tgfv7658ft9cbt24\n"
goldenGoodbye = "5cu08e3onbbna253menst2koc9sq5q0l\n"
goldenHello = "mmvr5g771a7eo56qpor2o0t5pvsolq75\n"
goldenGoodbye = "103spr29q69oq3b9a1ane7egt9uo4tkj\n"
}
ds, _ = datas.CommitValue(context.Background(), db, ds, types.String("hello!"))

View File

@@ -30,10 +30,12 @@ import (
flatbuffers "github.com/google/flatbuffers/go"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/d"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nomdl"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
const (
@@ -125,15 +127,15 @@ func newCommit(ctx context.Context, value types.Value, parentsList types.List, p
}
}
func NewCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.Value, opts CommitOptions) (*Commit, error) {
func NewCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.ValueReadWriter, v types.Value, opts CommitOptions) (*Commit, error) {
if opts.Parents == nil || len(opts.Parents) == 0 {
return nil, errors.New("cannot create commit without parents")
}
return newCommitForValue(ctx, vrw, v, opts)
return newCommitForValue(ctx, cs, vrw, v, opts)
}
func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64) ([]byte, uint64) {
func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, parentsClosureAddr hash.Hash) ([]byte, uint64) {
builder := flatbuffers.NewBuilder(1024)
vaddroff := builder.CreateByteVector(vaddr[:])
@@ -156,6 +158,8 @@ func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64) ([
}
}
pcaddroff := builder.CreateByteVector(parentsClosureAddr[:])
nameoff := builder.CreateString(opts.Meta.Name)
emailoff := builder.CreateString(opts.Meta.Email)
descoff := builder.CreateString(opts.Meta.Description)
@@ -163,6 +167,7 @@ func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64) ([
serial.CommitAddRoot(builder, vaddroff)
serial.CommitAddHeight(builder, maxheight+1)
serial.CommitAddParentAddrs(builder, parentaddrsoff)
serial.CommitAddParentClosure(builder, pcaddroff)
serial.CommitAddName(builder, nameoff)
serial.CommitAddEmail(builder, emailoff)
serial.CommitAddDescription(builder, descoff)
@@ -172,7 +177,13 @@ func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64) ([
return builder.FinishedBytes(), maxheight + 1
}
func newCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.Value, opts CommitOptions) (*Commit, error) {
var commitKeyTupleDesc = val.NewTupleDescriptor(
val.Type{Enc: val.Uint64Enc, Nullable: false},
val.Type{Enc: val.AddressEnc, Nullable: false},
)
var commitValueTupleDesc = val.NewTupleDescriptor()
func newCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.ValueReadWriter, v types.Value, opts CommitOptions) (*Commit, error) {
if opts.Meta == nil {
opts.Meta = &CommitMeta{}
}
@@ -182,15 +193,21 @@ func newCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.V
if err != nil {
return nil, err
}
parents := make([]*serial.Commit, len(opts.Parents))
heights := make([]uint64, len(opts.Parents))
parents, err := vrw.ReadManyValues(ctx, opts.Parents)
parentValues, err := vrw.ReadManyValues(ctx, opts.Parents)
if err != nil {
return nil, err
}
for i := range heights {
heights[i] = serial.GetRootAsCommit([]byte(parents[i].(types.SerialMessage)), 0).Height()
parents[i] = serial.GetRootAsCommit([]byte(parentValues[i].(types.SerialMessage)), 0)
heights[i] = parents[i].Height()
}
bs, height := commit_flatbuffer(r.TargetHash(), opts, heights)
parentClosureAddr, err := writeFbCommitParentClosure(ctx, cs, vrw, parents, opts.Parents)
if err != nil {
return nil, err
}
bs, height := commit_flatbuffer(r.TargetHash(), opts, heights, parentClosureAddr)
v := types.SerialMessage(bs)
addr, err := v.Hash(vrw.Format())
if err != nil {
@@ -224,7 +241,7 @@ func newCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.V
return nil, err
}
parentsClosure, includeParentsClosure, err := getParentsClosure(ctx, vrw, parentsList)
parentsClosure, includeParentsClosure, err := writeTypesCommitParentClosure(ctx, vrw, parentsList)
if err != nil {
return nil, err
}
@@ -638,76 +655,6 @@ func getRefElementType(t *types.Type) *types.Type {
return t.Desc.(types.CompoundDesc).ElemTypes[0]
}
type parentsClosureIterator struct {
mi types.MapIterator
err error
curr types.Tuple
}
func (i *parentsClosureIterator) Err() error {
return i.err
}
func (i *parentsClosureIterator) Hash() hash.Hash {
if i.err != nil {
return hash.Hash{}
}
var h hash.Hash
field, err := i.curr.Get(1)
if err != nil {
i.err = err
return hash.Hash{}
}
ib, ok := field.(types.InlineBlob)
if !ok {
i.err = fmt.Errorf("second field of tuple key parents closure should have been InlineBlob")
return hash.Hash{}
}
copy(h[:], []byte(ib))
return h
}
func (i *parentsClosureIterator) Less(f *types.NomsBinFormat, other *parentsClosureIterator) bool {
if i.err != nil || other.err != nil {
return false
}
ret, err := i.curr.Less(f, other.curr)
if err != nil {
i.err = err
other.err = err
return false
}
return ret
}
func (i *parentsClosureIterator) Next(ctx context.Context) bool {
if i.err != nil {
return false
}
n, _, err := i.mi.Next(ctx)
if err != nil {
i.err = err
return false
}
if n == nil || types.IsNull(n) {
return false
}
t, ok := n.(types.Tuple)
if !ok {
i.err = fmt.Errorf("key value of parents closure map should have been Tuple")
return false
}
i.curr = t
return true
}
func commitToMapKeyTuple(f *types.NomsBinFormat, c *Commit) (types.Tuple, error) {
h := c.Addr()
ib := make([]byte, len(hash.Hash{}))
copy(ib, h[:])
return types.NewTuple(f, types.Uint(c.Height()), types.InlineBlob(ib))
}
func firstError(l, r error) error {
if l != nil {
return l
@@ -715,63 +662,6 @@ func firstError(l, r error) error {
return r
}
func newParentsClosureIterator(ctx context.Context, c *Commit, vr types.ValueReader) (*parentsClosureIterator, error) {
sv := c.NomsValue()
if _, ok := sv.(types.SerialMessage); ok {
// TODO: __DOLT_DEV__ should support parent closures.
return nil, nil
}
s, ok := sv.(types.Struct)
if !ok {
return nil, fmt.Errorf("target ref is not struct: %v", sv)
}
if s.Name() != commitName {
return nil, fmt.Errorf("target ref is not commit: %v", sv)
}
fv, ok, err := s.MaybeGet(parentsClosureField)
if err != nil {
return nil, err
}
if !ok {
return nil, nil
}
mr, ok := fv.(types.Ref)
if !ok {
return nil, fmt.Errorf("value of parents_closure field is not Ref: %v", fv)
}
mv, err := mr.TargetValue(ctx, vr)
if err != nil {
return nil, err
}
m, ok := mv.(types.Map)
if !ok {
return nil, fmt.Errorf("target value of parents_closure Ref is not Map: %v", mv)
}
maxKeyTuple, err := types.NewTuple(vr.Format(), types.Uint(18446744073709551615))
if err != nil {
return nil, err
}
mi, err := m.IteratorBackFrom(ctx, maxKeyTuple)
if err != nil {
return nil, err
}
initialCurr, err := commitToMapKeyTuple(vr.Format(), c)
if err != nil {
return nil, err
}
return &parentsClosureIterator{mi, nil, initialCurr}, nil
}
func IsCommitType(nbf *types.NomsBinFormat, t *types.Type) bool {
return types.IsSubtype(nbf, valueCommitType, t)
}

View File

@@ -0,0 +1,457 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datas
import (
"context"
"errors"
"fmt"
"io"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
// hackVRToCS recovers the underlying chunks.ChunkStore from a
// types.ValueReader. Only the two concrete implementations this package
// hands around are supported; any other implementation is a programmer
// error and panics.
func hackVRToCS(vr types.ValueReader) chunks.ChunkStore {
	if db, ok := vr.(Database); ok {
		return ChunkStoreFromDatabase(db)
	}
	if vs, ok := vr.(*types.ValueStore); ok {
		return vs.ChunkStore()
	}
	panic("unknown ValueReader implementation...")
}
// newParentsClosureIterator returns an iterator over the parent closure of
// commit c, visiting (height, address) entries in reverse key order
// (greatest first). It returns (nil, nil) when the commit records no
// closure — e.g. a parent-less commit, or a struct commit written before
// closures were recorded.
//
// Two storage formats are handled:
//   - types.SerialMessage (flatbuffer commits): the closure is a prolly map
//     addressed by the message's ParentClosure field.
//   - types.Struct (noms commits): the closure is a types.Map behind a Ref
//     stored in the parents_closure field.
func newParentsClosureIterator(ctx context.Context, c *Commit, vr types.ValueReader) (parentsClosureIter, error) {
	sv := c.NomsValue()
	if _, ok := sv.(types.SerialMessage); ok {
		msg := serial.GetRootAsCommit(sv.(types.SerialMessage), 0)
		addr := hash.New(msg.ParentClosureBytes())
		if addr.IsEmpty() {
			// No closure was written for this commit.
			return nil, nil
		}
		v, err := vr.ReadValue(ctx, addr)
		if err != nil {
			return nil, err
		}
		if types.IsNull(v) {
			return nil, fmt.Errorf("internal error or data loss: dangling commit parent closure for addr %s or commit %s", addr.String(), c.Addr().String())
		}
		node := tree.NodeFromBytes(v.(types.TupleRowStorage))
		ns := tree.NewNodeStore(hackVRToCS(vr))
		m := prolly.NewMap(node, ns, commitKeyTupleDesc, commitValueTupleDesc)
		mi, err := m.IterAllReverse(ctx)
		if err != nil {
			return nil, err
		}
		// Seed curr with the commit's own key so Height/Hash are valid
		// before the first Next call.
		return &fbParentsClosureIterator{mi: mi, curr: commitToFbKeyTuple(c, ns.Pool()), err: nil}, nil
	}
	s, ok := sv.(types.Struct)
	if !ok {
		return nil, fmt.Errorf("target ref is not struct: %v", sv)
	}
	if s.Name() != commitName {
		return nil, fmt.Errorf("target ref is not commit: %v", sv)
	}
	fv, ok, err := s.MaybeGet(parentsClosureField)
	if err != nil {
		return nil, err
	}
	if !ok {
		// Struct commit without a recorded closure.
		return nil, nil
	}
	mr, ok := fv.(types.Ref)
	if !ok {
		return nil, fmt.Errorf("value of parents_closure field is not Ref: %v", fv)
	}
	mv, err := mr.TargetValue(ctx, vr)
	if err != nil {
		return nil, err
	}
	m, ok := mv.(types.Map)
	if !ok {
		return nil, fmt.Errorf("target value of parents_closure Ref is not Map: %v", mv)
	}
	// Iterate backwards starting from a key greater than any real entry
	// (height = max uint64).
	maxKeyTuple, err := types.NewTuple(vr.Format(), types.Uint(18446744073709551615))
	if err != nil {
		return nil, err
	}
	mi, err := m.IteratorBackFrom(ctx, maxKeyTuple)
	if err != nil {
		return nil, err
	}
	initialCurr, err := commitToMapKeyTuple(vr.Format(), c)
	if err != nil {
		return nil, err
	}
	return &parentsClosureIterator{mi, nil, initialCurr}, nil
}
// commitToMapKeyTuple builds the (height Uint, addr InlineBlob) key tuple
// under which commit c would appear in a struct-format parent closure map.
func commitToMapKeyTuple(f *types.NomsBinFormat, c *Commit) (types.Tuple, error) {
	h := c.Addr()
	// Copy the address into a fresh buffer for the InlineBlob.
	ib := make([]byte, len(hash.Hash{}))
	copy(ib, h[:])
	return types.NewTuple(f, types.Uint(c.Height()), types.InlineBlob(ib))
}
// commitToFbKeyTuple builds the (height, addr) val.Tuple key under which
// commit c would appear in a flatbuffer-format parent closure map,
// allocating the tuple from pool p.
func commitToFbKeyTuple(c *Commit, p pool.BuffPool) val.Tuple {
	tb := val.NewTupleBuilder(commitKeyTupleDesc)
	tb.PutUint64(0, c.Height())
	h := c.Addr()
	tb.PutAddress(1, h[:])
	return tb.Build(p)
}
// parentsClosureIter abstracts reverse-order iteration over a commit's
// parent closure, independent of the underlying storage format.
type parentsClosureIter interface {
	// Err returns the first error encountered during iteration, if any.
	Err() error
	// Hash returns the commit address of the current entry.
	Hash() hash.Hash
	// Height returns the commit height of the current entry.
	Height() uint64
	// Less reports whether this iterator's current key sorts before the
	// other's; both must be the same concrete type.
	Less(*types.NomsBinFormat, parentsClosureIter) bool
	// Next advances the iterator, reporting false at exhaustion or on error.
	Next(context.Context) bool
}
// parentsClosureIterator implements parentsClosureIter over the noms
// types.Map closure used by struct-format commits. curr holds the current
// (height Uint, addr InlineBlob) key tuple. err latches the first error
// seen; after that, all accessors return zero values and Next returns false.
type parentsClosureIterator struct {
	mi   types.MapIterator
	err  error
	curr types.Tuple
}

// Err returns the first error encountered during iteration, if any.
func (i *parentsClosureIterator) Err() error {
	return i.err
}

// Height returns the commit height from the current key tuple's first
// field, or 0 after an error.
func (i *parentsClosureIterator) Height() uint64 {
	if i.err != nil {
		return 0
	}
	field, err := i.curr.Get(0)
	if err != nil {
		i.err = err
		return 0
	}
	return uint64(field.(types.Uint))
}

// Hash returns the commit address from the current key tuple's second
// field, or the zero hash after an error.
func (i *parentsClosureIterator) Hash() hash.Hash {
	if i.err != nil {
		return hash.Hash{}
	}
	var h hash.Hash
	field, err := i.curr.Get(1)
	if err != nil {
		i.err = err
		return hash.Hash{}
	}
	ib, ok := field.(types.InlineBlob)
	if !ok {
		i.err = fmt.Errorf("second field of tuple key parents closure should have been InlineBlob")
		return hash.Hash{}
	}
	copy(h[:], []byte(ib))
	return h
}

// Less reports whether this iterator's current key sorts before otherI's.
// otherI must also be a *parentsClosureIterator. An error on either side
// makes Less return false.
func (i *parentsClosureIterator) Less(f *types.NomsBinFormat, otherI parentsClosureIter) bool {
	other := otherI.(*parentsClosureIterator)
	if i.err != nil || other.err != nil {
		return false
	}
	ret, err := i.curr.Less(f, other.curr)
	if err != nil {
		// Latch the error on both iterators so either caller observes it.
		i.err = err
		other.err = err
		return false
	}
	return ret
}

// Next advances the underlying map iterator and stores the new key tuple
// in curr, reporting false at exhaustion or on error.
func (i *parentsClosureIterator) Next(ctx context.Context) bool {
	if i.err != nil {
		return false
	}
	n, _, err := i.mi.Next(ctx)
	if err != nil {
		i.err = err
		return false
	}
	if n == nil || types.IsNull(n) {
		// Iterator exhausted.
		return false
	}
	t, ok := n.(types.Tuple)
	if !ok {
		i.err = fmt.Errorf("key value of parents closure map should have been Tuple")
		return false
	}
	i.curr = t
	return true
}
// fbParentsClosureIterator implements parentsClosureIter over the prolly
// map closure used by flatbuffer-format commits. curr holds the current
// (height, addr) key; err latches the first error encountered.
type fbParentsClosureIterator struct {
	mi   prolly.MapIter
	curr val.Tuple
	err  error
}

// Err returns the first error encountered during iteration, if any.
func (i *fbParentsClosureIterator) Err() error {
	return i.err
}

// Height returns the commit height from the current key tuple, or 0 after
// an error.
func (i *fbParentsClosureIterator) Height() uint64 {
	if i.err != nil {
		return 0
	}
	h, _ := commitKeyTupleDesc.GetUint64(0, i.curr)
	return h
}

// Hash returns the commit address from the current key tuple, or the zero
// hash after an error.
func (i *fbParentsClosureIterator) Hash() hash.Hash {
	if i.err != nil {
		return hash.Hash{}
	}
	bs, _ := commitKeyTupleDesc.GetAddress(1, i.curr)
	return hash.New(bs)
}

// Next advances the underlying prolly iterator. io.EOF signals normal
// exhaustion and is not surfaced through Err.
func (i *fbParentsClosureIterator) Next(ctx context.Context) bool {
	if i.err != nil {
		return false
	}
	i.curr, _, i.err = i.mi.Next(ctx)
	if i.err == io.EOF {
		i.err = nil
		return false
	}
	return true
}

// Less reports whether this iterator's current key sorts before otherI's.
// otherI must also be a *fbParentsClosureIterator. The format parameter is
// unused: val.Tuple keys compare through the tuple descriptor.
func (i *fbParentsClosureIterator) Less(f *types.NomsBinFormat, otherI parentsClosureIter) bool {
	other := otherI.(*fbParentsClosureIterator)
	return commitKeyTupleDesc.Comparator().Compare(i.curr, other.curr, commitKeyTupleDesc) == -1
}
// writeTypesCommitParentClosure computes and persists the parent closure
// map for a new struct-format (noms) commit whose parents are the refs in
// parentRefsL. The closure is a types.Map from (height Uint, addr
// InlineBlob) key tuples to List<Ref<Value>> of that parent's own parents,
// formed as the union of every parent's closure plus an entry for each
// parent itself.
//
// It returns (ref-to-map, true, nil) on success, and (zero, false, nil)
// when no closure should be recorded: either there are no parents, or one
// of the parents predates closure recording (it has a non-empty parents
// list but an empty closure).
func writeTypesCommitParentClosure(ctx context.Context, vrw types.ValueReadWriter, parentRefsL types.List) (types.Ref, bool, error) {
	parentRefs := make([]types.Ref, int(parentRefsL.Len()))
	parents := make([]types.Struct, len(parentRefs))
	if len(parents) == 0 {
		return types.Ref{}, false, nil
	}
	// Materialize each parent Ref and its target commit struct.
	err := parentRefsL.IterAll(ctx, func(v types.Value, i uint64) error {
		r, ok := v.(types.Ref)
		if !ok {
			return errors.New("parentsRef element was not a Ref")
		}
		parentRefs[int(i)] = r
		tv, err := r.TargetValue(ctx, vrw)
		if err != nil {
			return err
		}
		s, ok := tv.(types.Struct)
		if !ok {
			return errors.New("parentRef target value was not a Struct")
		}
		parents[int(i)] = s
		return nil
	})
	if err != nil {
		return types.Ref{}, false, err
	}
	// Load each parent's closure map and parents list, substituting empty
	// collections when a field is absent or null.
	parentMaps := make([]types.Map, len(parents))
	parentParentLists := make([]types.List, len(parents))
	for i, p := range parents {
		v, ok, err := p.MaybeGet(parentsClosureField)
		if err != nil {
			return types.Ref{}, false, err
		}
		if !ok || types.IsNull(v) {
			empty, err := types.NewMap(ctx, vrw)
			if err != nil {
				return types.Ref{}, false, err
			}
			parentMaps[i] = empty
		} else {
			r, ok := v.(types.Ref)
			if !ok {
				return types.Ref{}, false, errors.New("unexpected field value type for parents_closure in commit struct")
			}
			tv, err := r.TargetValue(ctx, vrw)
			if err != nil {
				return types.Ref{}, false, err
			}
			parentMaps[i], ok = tv.(types.Map)
			if !ok {
				return types.Ref{}, false, fmt.Errorf("unexpected target value type for parents_closure in commit struct: %v", tv)
			}
		}
		v, ok, err = p.MaybeGet(parentsListField)
		if err != nil {
			return types.Ref{}, false, err
		}
		if !ok || types.IsNull(v) {
			empty, err := types.NewList(ctx, vrw)
			if err != nil {
				return types.Ref{}, false, err
			}
			parentParentLists[i] = empty
		} else {
			parentParentLists[i], ok = v.(types.List)
			if !ok {
				return types.Ref{}, false, errors.New("unexpected field value or type for parents_list in commit struct")
			}
		}
		if parentMaps[i].Len() == 0 && parentParentLists[i].Len() != 0 {
			// If one of the commits has an empty parents_closure, but non-empty parents, we will not record
			// a parents_closure here.
			return types.Ref{}, false, nil
		}
	}
	// Convert parent lists to List<Ref<Value>>
	for i, l := range parentParentLists {
		newRefs := make([]types.Value, int(l.Len()))
		err := l.IterAll(ctx, func(v types.Value, i uint64) error {
			r, ok := v.(types.Ref)
			if !ok {
				return errors.New("unexpected entry type for parents_list in commit struct")
			}
			// NOTE(review): this assigns the function-scope err declared
			// above — the loop-scoped err is not yet in scope inside this
			// literal. Behavior is correct because the closure returns err
			// when it is non-nil.
			newRefs[int(i)], err = types.ToRefOfValue(r, vrw.Format())
			if err != nil {
				return err
			}
			return nil
		})
		if err != nil {
			return types.Ref{}, false, err
		}
		parentParentLists[i], err = types.NewList(ctx, vrw, newRefs...)
		if err != nil {
			return types.Ref{}, false, err
		}
	}
	// Seed the result with the first parent's closure, then add every
	// parent itself under its (height, addr) key.
	editor := parentMaps[0].Edit()
	for i, r := range parentRefs {
		h := r.TargetHash()
		key, err := types.NewTuple(vrw.Format(), types.Uint(r.Height()), types.InlineBlob(h[:]))
		if err != nil {
			editor.Close(ctx)
			return types.Ref{}, false, err
		}
		editor.Set(key, parentParentLists[i])
	}
	// Union in the remaining parents' closures: diff each against the
	// first parent's closure and apply the additions.
	for i := 1; i < len(parentMaps); i++ {
		i := i // capture per-iteration value for the goroutine (pre-Go-1.22 semantics)
		changes := make(chan types.ValueChanged)
		var derr error
		go func() {
			defer close(changes)
			// BUGFIX: was parentMaps[1], which silently ignored the
			// closures of the third and later parents for commits with
			// more than two parents.
			derr = parentMaps[i].Diff(ctx, parentMaps[0], changes)
		}()
		for c := range changes {
			if c.ChangeType == types.DiffChangeAdded {
				editor.Set(c.Key, c.NewValue)
			}
		}
		if derr != nil {
			editor.Close(ctx)
			return types.Ref{}, false, derr
		}
	}
	m, err := editor.Map(ctx)
	if err != nil {
		return types.Ref{}, false, err
	}
	r, err := vrw.WriteValue(ctx, m)
	if err != nil {
		return types.Ref{}, false, err
	}
	r, err = types.ToRefOfValue(r, vrw.Format())
	if err != nil {
		return types.Ref{}, false, err
	}
	return r, true, nil
}
// writeFbCommitParentClosure computes and persists the parent closure for a
// new flatbuffer-format commit. parents are the parent commit messages and
// parentAddrs their addresses, in matching order. The closure is a prolly
// map keyed by (height, addr) tuples with empty values: the union of every
// parent's closure plus an entry for each parent itself. It returns the
// address of the persisted map, or the zero hash for a parent-less commit.
func writeFbCommitParentClosure(ctx context.Context, cs chunks.ChunkStore, vrw types.ValueReadWriter, parents []*serial.Commit, parentAddrs []hash.Hash) (hash.Hash, error) {
	if len(parents) == 0 {
		// We write an empty hash for parent-less commits of height 1.
		return hash.Hash{}, nil
	}
	// Fetch the parent closures of our parents.
	addrs := make([]hash.Hash, len(parents))
	for i := range parents {
		addrs[i] = hash.New(parents[i].ParentClosureBytes())
	}
	vs, err := vrw.ReadManyValues(ctx, addrs)
	if err != nil {
		return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: ReadManyValues: %w", err)
	}
	// Load them as ProllyTrees.
	ns := tree.NewNodeStore(cs)
	maps := make([]prolly.Map, len(parents))
	for i := range addrs {
		if !types.IsNull(vs[i]) {
			node := tree.NodeFromBytes(vs[i].(types.TupleRowStorage))
			maps[i] = prolly.NewMap(node, ns, commitKeyTupleDesc, commitValueTupleDesc)
		} else {
			// Parent recorded no closure (its address was empty); use an
			// empty map in its place.
			maps[i], err = prolly.NewMapFromTuples(ctx, ns, commitKeyTupleDesc, commitValueTupleDesc)
			if err != nil {
				return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: NewMapFromTuples: %w", err)
			}
		}
	}
	// Add all the missing entries from [1, ...) maps to the 0th map.
	editor := maps[0].Mutate()
	for i := 1; i < len(maps); i++ {
		err = prolly.DiffMaps(ctx, maps[0], maps[i], func(ctx context.Context, diff tree.Diff) error {
			if diff.Type == tree.AddedDiff {
				return editor.Put(ctx, val.Tuple(diff.Key), val.EmptyTuple)
			}
			return nil
		})
		// io.EOF signals normal completion of the diff stream.
		if err != nil && !errors.Is(err, io.EOF) {
			return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: DiffMaps: %w", err)
		}
	}
	// Add the parents themselves to the new map.
	tb := val.NewTupleBuilder(commitKeyTupleDesc)
	for i := 0; i < len(parents); i++ {
		tb.PutUint64(0, parents[i].Height())
		tb.PutAddress(1, parentAddrs[i][:])
		err = editor.Put(ctx, tb.Build(ns.Pool()), val.EmptyTuple)
		if err != nil {
			return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableMap.Put: %w", err)
		}
		tb.Recycle()
	}
	// This puts the map in the NodeStore as well.
	res, err := editor.Map(ctx)
	if err != nil {
		return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableMap.Map: %w", err)
	}
	return res.HashOf(), nil
}

View File

@@ -129,7 +129,7 @@ func TestNewCommit(t *testing.T) {
defer db.Close()
parents := mustList(types.NewList(context.Background(), db))
parentsClosure := mustParentsClosure(t, false)(getParentsClosure(context.Background(), db, parents))
parentsClosure := mustParentsClosure(t, false)(writeTypesCommitParentClosure(context.Background(), db, parents))
commit, err := newCommit(context.Background(), types.Float(1), parents, parentsClosure, false, types.EmptyStruct(db.Format()))
assert.NoError(err)
at, err := types.TypeOf(commit)
@@ -150,7 +150,7 @@ func TestNewCommit(t *testing.T) {
// Committing another Float
parents = mustList(types.NewList(context.Background(), db, mustRef(types.NewRef(commit, db.Format()))))
parentsClosure = mustParentsClosure(t, true)(getParentsClosure(context.Background(), db, parents))
parentsClosure = mustParentsClosure(t, true)(writeTypesCommitParentClosure(context.Background(), db, parents))
commit2, err := newCommit(context.Background(), types.Float(2), parents, parentsClosure, true, types.EmptyStruct(db.Format()))
assert.NoError(err)
at2, err := types.TypeOf(commit2)
@@ -169,7 +169,7 @@ func TestNewCommit(t *testing.T) {
// Now commit a String
parents = mustList(types.NewList(context.Background(), db, mustRef(types.NewRef(commit2, db.Format()))))
parentsClosure = mustParentsClosure(t, true)(getParentsClosure(context.Background(), db, parents))
parentsClosure = mustParentsClosure(t, true)(writeTypesCommitParentClosure(context.Background(), db, parents))
commit3, err := newCommit(context.Background(), types.String("Hi"), parents, parentsClosure, true, types.EmptyStruct(db.Format()))
assert.NoError(err)
at3, err := types.TypeOf(commit3)
@@ -195,7 +195,7 @@ func TestNewCommit(t *testing.T) {
}`)
assertTypeEquals(metaType, mustType(types.TypeOf(meta)))
parents = mustList(types.NewList(context.Background(), db, mustRef(types.NewRef(commit2, db.Format()))))
parentsClosure = mustParentsClosure(t, true)(getParentsClosure(context.Background(), db, parents))
parentsClosure = mustParentsClosure(t, true)(writeTypesCommitParentClosure(context.Background(), db, parents))
commit4, err := newCommit(context.Background(), types.String("Hi"), parents, parentsClosure, true, meta)
assert.NoError(err)
at4, err := types.TypeOf(commit4)
@@ -219,7 +219,7 @@ func TestNewCommit(t *testing.T) {
parents = mustList(types.NewList(context.Background(), db,
mustRef(types.NewRef(commit2, db.Format())),
mustRef(types.NewRef(commit3, db.Format()))))
parentsClosure = mustParentsClosure(t, true)(getParentsClosure(context.Background(), db, parents))
parentsClosure = mustParentsClosure(t, true)(writeTypesCommitParentClosure(context.Background(), db, parents))
commit5, err := newCommit(
context.Background(),
types.String("Hi"),
@@ -303,7 +303,7 @@ func commonAncWithLazyClosure(ctx context.Context, c1, c2 *Commit, vr1, vr2 type
}
// Assert that c is the common ancestor of a and b, using multiple common ancestor methods.
func assertCommonAncestor(t *testing.T, expected, a, b types.Value, ldb, rdb *database) {
func assertCommonAncestor(t *testing.T, expected, a, b types.Value, ldb, rdb *database, desc string) {
type caFinder func(ctx context.Context, c1, c2 *Commit, vr1, vr2 types.ValueReader) (a hash.Hash, ok bool, err error)
methods := map[string]caFinder{
@@ -321,7 +321,7 @@ func assertCommonAncestor(t *testing.T, expected, a, b types.Value, ldb, rdb *da
require.NoError(t, err)
for name, method := range methods {
t.Run(fmt.Sprintf("%s/%s_%s", name, aref.TargetHash().String(), bref.TargetHash().String()), func(t *testing.T) {
t.Run(fmt.Sprintf("%s/%s", name, desc), func(t *testing.T) {
assert := assert.New(t)
ctx := context.Background()
found, ok, err := method(ctx, ac, bc, ldb, rdb)
@@ -412,6 +412,7 @@ func TestCommitParentsClosure(t *testing.T) {
storage := &chunks.TestStorage{}
db := NewDatabase(storage.NewViewWithDefaultFormat()).(*database)
ctx := context.Background()
type expected struct {
height int
@@ -419,88 +420,46 @@ func TestCommitParentsClosure(t *testing.T) {
}
assertCommitParentsClosure := func(v types.Value, es []expected) {
if _, ok := v.(types.SerialMessage); ok {
t.Skip("__DOLT_DEV__ does not implement ParentsClosure yet.")
}
s, ok := v.(types.Struct)
if !assert.True(ok) {
c, err := commitPtr(db.Format(), v, nil)
if !assert.NoError(err) {
return
}
v, ok, err := s.MaybeGet(parentsClosureField)
iter, err := newParentsClosureIterator(ctx, c, db)
if !assert.NoError(err) {
return
}
if len(es) == 0 {
assert.False(ok, "must not find parents_closure field when its length is 0")
assert.Nil(iter)
return
}
if !assert.True(ok, "must find parents_closure field in commit.") {
return
for _, e := range es {
if !assert.True(iter.Next(ctx)) {
return
}
if !assert.Equal(e.hash, iter.Hash()) {
return
}
if !assert.Equal(uint64(e.height), iter.Height()) {
return
}
}
r, ok := v.(types.Ref)
if !assert.True(ok, "parents_closure field must contain a ref value.") {
return
}
tv, err := r.TargetValue(context.Background(), db)
if !assert.NoError(err, "getting target value of parents_closure field must not error") {
return
}
m, ok := tv.(types.Map)
if !assert.True(ok, "parents_closure ref target value must contain a map value.") {
return
}
if !assert.Equal(len(es), int(m.Len()), "expected length %v and got %v", len(es), m.Len()) {
return
}
i := 0
err = m.IterAll(context.Background(), func(k, v types.Value) error {
j := i
i++
kt, ok := k.(types.Tuple)
if !assert.True(ok, "key type must be Tuple") {
return nil
}
if !assert.Equal(2, int(kt.Len()), "key must have length 2") {
return nil
}
hv, err := kt.Get(0)
if !assert.NoError(err) {
return nil
}
h, ok := hv.(types.Uint)
if !assert.True(ok, "key first field must be Uint") {
return nil
}
if !assert.Equal(es[j].height, int(uint64(h))) {
return nil
}
hv, err = kt.Get(1)
if !assert.NoError(err) {
return nil
}
b, ok := hv.(types.InlineBlob)
if !assert.True(ok, "key second field must be InlineBlob") {
return nil
}
var fh hash.Hash
copy(fh[:], []byte(b))
if !assert.Equal(es[j].hash, fh, "hash for idx %d did not match", j) {
return nil
}
assertClosureMapValue(t, db, v, fh)
return nil
})
assert.NoError(err)
assert.False(iter.Next(ctx))
assert.NoError(iter.Err())
}
// TODO: These tests rely on the hash values of the commits
// to assert the order of commits that are at the same height in the
// parent closure map. The values have been tweaked to currently pass
// with LD_1 and DOLT_DEV.
a, b, c, d := "ds-a", "ds-b", "ds-c", "ds-d"
a1, a1a := addCommit(t, db, a, "a1")
a2, a2a := addCommit(t, db, a, "a2", a1)
a3, a3a := addCommit(t, db, a, "a3", a2)
a3, a3a := addCommit(t, db, a, "a3 ", a2)
b1, b1a := addCommit(t, db, b, "b1", a1)
b2, b2a := addCommit(t, db, b, "b2", b1)
b3, b3a := addCommit(t, db, b, "b3", b2)
b2, b2a := addCommit(t, db, b, "b2 ", b1)
b3, b3a := addCommit(t, db, b, "b3 ", b2)
c1, c1a := addCommit(t, db, c, "c1", a3, b3)
@@ -511,40 +470,40 @@ func TestCommitParentsClosure(t *testing.T) {
{1, a1a},
})
assertCommitParentsClosure(a3, []expected{
{1, a1a},
{2, a2a},
{1, a1a},
})
assertCommitParentsClosure(b1, []expected{
{1, a1a},
})
assertCommitParentsClosure(b2, []expected{
{1, a1a},
{2, b1a},
{1, a1a},
})
assertCommitParentsClosure(b3, []expected{
{1, a1a},
{2, b1a},
{3, b2a},
{2, b1a},
{1, a1a},
})
assertCommitParentsClosure(c1, []expected{
{1, a1a},
{2, a2a},
{2, b1a},
{3, a3a},
{3, b2a},
{4, b3a},
{3, b2a},
{3, a3a},
{2, b1a},
{2, a2a},
{1, a1a},
})
assertCommitParentsClosure(d1, []expected{
{1, a1a},
{2, a2a},
{2, b1a},
{3, a3a},
{3, b2a},
{4, b3a},
{5, c1a},
{4, b3a},
{3, b2a},
{3, a3a},
{2, b1a},
{2, a2a},
{1, a1a},
})
}
@@ -584,11 +543,11 @@ func TestFindCommonAncestor(t *testing.T) {
b5, _ := addCommit(t, db, b, "b5", b4, a3)
a6, _ := addCommit(t, db, a, "a6", a5, b5)
assertCommonAncestor(t, a1, a1, a1, db, db) // All self
assertCommonAncestor(t, a1, a1, a2, db, db) // One side self
assertCommonAncestor(t, a2, a3, b3, db, db) // Common parent
assertCommonAncestor(t, a2, a4, b4, db, db) // Common grandparent
assertCommonAncestor(t, a1, a6, c3, db, db) // Traversing multiple parents on both sides
assertCommonAncestor(t, a1, a1, a1, db, db, "all self")
assertCommonAncestor(t, a1, a1, a2, db, db, "one side self")
assertCommonAncestor(t, a2, a3, b3, db, db, "common parent")
assertCommonAncestor(t, a2, a4, b4, db, db, "common grandeparent")
assertCommonAncestor(t, a1, a6, c3, db, db, "traversing multiple parents on both sides")
// No common ancestor
ctx := context.Background()
@@ -616,69 +575,71 @@ func TestFindCommonAncestor(t *testing.T) {
assert.NoError(db.Close())
storage = &chunks.TestStorage{}
db = NewDatabase(storage.NewViewWithDefaultFormat()).(*database)
defer db.Close()
t.Run("DifferentVRWs", func(t *testing.T) {
storage = &chunks.TestStorage{}
db = NewDatabase(storage.NewViewWithDefaultFormat()).(*database)
defer db.Close()
rstorage := &chunks.TestStorage{}
rdb := NewDatabase(rstorage.NewViewWithDefaultFormat()).(*database)
defer rdb.Close()
rstorage := &chunks.TestStorage{}
rdb := NewDatabase(rstorage.NewViewWithDefaultFormat()).(*database)
defer rdb.Close()
// Rerun the tests when using two difference Databases for left and
// right commits. Both databases have all the previous commits.
a, b, c, d = "ds-a", "ds-b", "ds-c", "ds-d"
a1, _ = addCommit(t, db, a, "a1")
d1, _ = addCommit(t, db, d, "d1")
a2, _ = addCommit(t, db, a, "a2", a1)
c2, _ = addCommit(t, db, c, "c2", a1)
d2, _ = addCommit(t, db, d, "d2", d1)
a3, _ = addCommit(t, db, a, "a3", a2)
b3, _ = addCommit(t, db, b, "b3", a2)
c3, _ = addCommit(t, db, c, "c3", c2, d2)
a4, _ = addCommit(t, db, a, "a4", a3)
b4, _ = addCommit(t, db, b, "b4", b3)
a5, _ = addCommit(t, db, a, "a5", a4)
b5, _ = addCommit(t, db, b, "b5", b4, a3)
a6, _ = addCommit(t, db, a, "a6", a5, b5)
// Rerun the tests when using two difference Databases for left and
// right commits. Both databases have all the previous commits.
a, b, c, d = "ds-a", "ds-b", "ds-c", "ds-d"
a1, _ = addCommit(t, db, a, "a1")
d1, _ = addCommit(t, db, d, "d1")
a2, _ = addCommit(t, db, a, "a2", a1)
c2, _ = addCommit(t, db, c, "c2", a1)
d2, _ = addCommit(t, db, d, "d2", d1)
a3, _ = addCommit(t, db, a, "a3", a2)
b3, _ = addCommit(t, db, b, "b3", a2)
c3, _ = addCommit(t, db, c, "c3", c2, d2)
a4, _ = addCommit(t, db, a, "a4", a3)
b4, _ = addCommit(t, db, b, "b4", b3)
a5, _ = addCommit(t, db, a, "a5", a4)
b5, _ = addCommit(t, db, b, "b5", b4, a3)
a6, _ = addCommit(t, db, a, "a6", a5, b5)
addCommit(t, rdb, a, "a1")
addCommit(t, rdb, d, "d1")
addCommit(t, rdb, a, "a2", a1)
addCommit(t, rdb, c, "c2", a1)
addCommit(t, rdb, d, "d2", d1)
addCommit(t, rdb, a, "a3", a2)
addCommit(t, rdb, b, "b3", a2)
addCommit(t, rdb, c, "c3", c2, d2)
addCommit(t, rdb, a, "a4", a3)
addCommit(t, rdb, b, "b4", b3)
addCommit(t, rdb, a, "a5", a4)
addCommit(t, rdb, b, "b5", b4, a3)
addCommit(t, rdb, a, "a6", a5, b5)
addCommit(t, rdb, a, "a1")
addCommit(t, rdb, d, "d1")
addCommit(t, rdb, a, "a2", a1)
addCommit(t, rdb, c, "c2", a1)
addCommit(t, rdb, d, "d2", d1)
addCommit(t, rdb, a, "a3", a2)
addCommit(t, rdb, b, "b3", a2)
addCommit(t, rdb, c, "c3", c2, d2)
addCommit(t, rdb, a, "a4", a3)
addCommit(t, rdb, b, "b4", b3)
addCommit(t, rdb, a, "a5", a4)
addCommit(t, rdb, b, "b5", b4, a3)
addCommit(t, rdb, a, "a6", a5, b5)
// Additionally, |db| has a6<-a7<-a8<-a9.
// |rdb| has a6<-ra7<-ra8<-ra9.
a7, _ := addCommit(t, db, a, "a7", a6)
a8, _ := addCommit(t, db, a, "a8", a7)
a9, _ := addCommit(t, db, a, "a9", a8)
// Additionally, |db| has a6<-a7<-a8<-a9.
// |rdb| has a6<-ra7<-ra8<-ra9.
a7, _ := addCommit(t, db, a, "a7", a6)
a8, _ := addCommit(t, db, a, "a8", a7)
a9, _ := addCommit(t, db, a, "a9", a8)
ra7, _ := addCommit(t, rdb, a, "ra7", a6)
ra8, _ := addCommit(t, rdb, a, "ra8", ra7)
ra9, _ := addCommit(t, rdb, a, "ra9", ra8)
ra7, _ := addCommit(t, rdb, a, "ra7", a6)
ra8, _ := addCommit(t, rdb, a, "ra8", ra7)
ra9, _ := addCommit(t, rdb, a, "ra9", ra8)
assertCommonAncestor(t, a1, a1, a1, db, rdb) // All self
assertCommonAncestor(t, a1, a1, a2, db, rdb) // One side self
assertCommonAncestor(t, a2, a3, b3, db, rdb) // Common parent
assertCommonAncestor(t, a2, a4, b4, db, rdb) // Common grandparent
assertCommonAncestor(t, a1, a6, c3, db, rdb) // Traversing multiple parents on both sides
assertCommonAncestor(t, a1, a1, a1, db, rdb, "all self")
assertCommonAncestor(t, a1, a1, a2, db, rdb, "one side self")
assertCommonAncestor(t, a2, a3, b3, db, rdb, "common parent")
assertCommonAncestor(t, a2, a4, b4, db, rdb, "common grandeparent")
assertCommonAncestor(t, a1, a6, c3, db, rdb, "traversing multiple parents on both sides")
assertCommonAncestor(t, a6, a9, ra9, db, rdb) // Common third parent
assertCommonAncestor(t, a6, a9, ra9, db, rdb, "common third parent")
a9c, err := commitFromValue(db.Format(), a9)
require.NoError(t, err)
ra9c, err := commitFromValue(rdb.Format(), ra9)
require.NoError(t, err)
_, _, err = FindCommonAncestor(context.Background(), ra9c, a9c, db, rdb)
assert.Error(err)
a9c, err := commitFromValue(db.Format(), a9)
require.NoError(t, err)
ra9c, err := commitFromValue(rdb.Format(), ra9)
require.NoError(t, err)
_, _, err = FindCommonAncestor(context.Background(), ra9c, a9c, db, rdb)
assert.Error(err)
})
}
func TestNewCommitRegressionTest(t *testing.T) {
@@ -687,7 +648,7 @@ func TestNewCommitRegressionTest(t *testing.T) {
defer db.Close()
parents := mustList(types.NewList(context.Background(), db))
parentsClosure := mustParentsClosure(t, false)(getParentsClosure(context.Background(), db, parents))
parentsClosure := mustParentsClosure(t, false)(writeTypesCommitParentClosure(context.Background(), db, parents))
c1, err := newCommit(context.Background(), types.String("one"), parents, parentsClosure, false, types.EmptyStruct(db.Format()))
assert.NoError(t, err)
cx, err := newCommit(context.Background(), types.Bool(true), parents, parentsClosure, false, types.EmptyStruct(db.Format()))
@@ -699,7 +660,7 @@ func TestNewCommitRegressionTest(t *testing.T) {
value := types.String("two")
parents, err = types.NewList(context.Background(), db, mustRef(types.NewRef(c1, db.Format())))
assert.NoError(t, err)
parentsClosure = mustParentsClosure(t, true)(getParentsClosure(context.Background(), db, parents))
parentsClosure = mustParentsClosure(t, true)(writeTypesCommitParentClosure(context.Background(), db, parents))
meta, err := types.NewStruct(db.Format(), "", types.StructData{
"basis": cx,
})

View File

@@ -102,145 +102,6 @@ func (db *database) loadDatasetsRefmap(ctx context.Context, rootHash hash.Hash)
return parse_storeroot([]byte(val.(types.SerialMessage)), db.chunkStore()), nil
}
// getParentsClosure builds the parent-commit closure map for a commit with the
// given parents and writes it to |vrw|.
//
// The closure is a types.Map keyed by (height, commit hash) tuples; each value
// is that ancestor commit's parents list (as List<Ref<Value>>). It is built by
// starting from the first parent's closure, merging in the closures of the
// remaining parents, and then adding one entry per direct parent.
//
// Returns (ref, true, nil) with a Ref<Value> to the written map, or
// (zero, false, nil) when no closure should be recorded: either the commit has
// no parents, or one of the parents has a non-empty parents list but an empty
// closure (a legacy commit), in which case any closure we built here would be
// incomplete.
func getParentsClosure(ctx context.Context, vrw types.ValueReadWriter, parentRefsL types.List) (types.Ref, bool, error) {
	parentRefs := make([]types.Ref, int(parentRefsL.Len()))
	parents := make([]types.Struct, len(parentRefs))
	if len(parents) == 0 {
		// No parents --> no closure.
		return types.Ref{}, false, nil
	}
	// Resolve each parent ref to its commit struct.
	err := parentRefsL.IterAll(ctx, func(v types.Value, i uint64) error {
		r, ok := v.(types.Ref)
		if !ok {
			return errors.New("parentsRef element was not a Ref")
		}
		parentRefs[int(i)] = r
		tv, err := r.TargetValue(ctx, vrw)
		if err != nil {
			return err
		}
		s, ok := tv.(types.Struct)
		if !ok {
			return errors.New("parentRef target value was not a Struct")
		}
		parents[int(i)] = s
		return nil
	})
	if err != nil {
		return types.Ref{}, false, err
	}
	// Gather each parent's existing closure map and its own parents list.
	parentMaps := make([]types.Map, len(parents))
	parentParentLists := make([]types.List, len(parents))
	for i, p := range parents {
		v, ok, err := p.MaybeGet(parentsClosureField)
		if err != nil {
			return types.Ref{}, false, err
		}
		if !ok || types.IsNull(v) {
			// Parent has no closure field; treat it as empty.
			empty, err := types.NewMap(ctx, vrw)
			if err != nil {
				return types.Ref{}, false, err
			}
			parentMaps[i] = empty
		} else {
			r, ok := v.(types.Ref)
			if !ok {
				return types.Ref{}, false, errors.New("unexpected field value type for parents_closure in commit struct")
			}
			tv, err := r.TargetValue(ctx, vrw)
			if err != nil {
				return types.Ref{}, false, err
			}
			parentMaps[i], ok = tv.(types.Map)
			if !ok {
				return types.Ref{}, false, fmt.Errorf("unexpected target value type for parents_closure in commit struct: %v", tv)
			}
		}
		v, ok, err = p.MaybeGet(parentsListField)
		if err != nil {
			return types.Ref{}, false, err
		}
		if !ok || types.IsNull(v) {
			empty, err := types.NewList(ctx, vrw)
			if err != nil {
				return types.Ref{}, false, err
			}
			parentParentLists[i] = empty
		} else {
			parentParentLists[i], ok = v.(types.List)
			if !ok {
				return types.Ref{}, false, errors.New("unexpected field value or type for parents_list in commit struct")
			}
		}
		if parentMaps[i].Len() == 0 && parentParentLists[i].Len() != 0 {
			// If one of the commits has an empty parents_closure, but non-empty parents, we will not record
			// a parents_closure here.
			return types.Ref{}, false, nil
		}
	}
	// Convert parent lists to List<Ref<Value>> so every closure entry has a
	// uniform value type.
	for i, l := range parentParentLists {
		newRefs := make([]types.Value, int(l.Len()))
		err := l.IterAll(ctx, func(v types.Value, i uint64) error {
			r, ok := v.(types.Ref)
			if !ok {
				return errors.New("unexpected entry type for parents_list in commit struct")
			}
			// Use a locally-scoped error; the original assigned through to the
			// function-level |err|, which compiled but was fragile.
			rv, err := types.ToRefOfValue(r, vrw.Format())
			if err != nil {
				return err
			}
			newRefs[int(i)] = rv
			return nil
		})
		if err != nil {
			return types.Ref{}, false, err
		}
		parentParentLists[i], err = types.NewList(ctx, vrw, newRefs...)
		if err != nil {
			return types.Ref{}, false, err
		}
	}
	// Start from the first parent's closure and add one entry per direct
	// parent, keyed by (height, hash).
	editor := parentMaps[0].Edit()
	for i, r := range parentRefs {
		h := r.TargetHash()
		key, err := types.NewTuple(vrw.Format(), types.Uint(r.Height()), types.InlineBlob(h[:]))
		if err != nil {
			editor.Close(ctx)
			return types.Ref{}, false, err
		}
		editor.Set(key, parentParentLists[i])
	}
	// Merge in the closures of the remaining parents by diffing each against
	// the first parent's closure and applying the additions.
	for i := 1; i < len(parentMaps); i++ {
		changes := make(chan types.ValueChanged)
		var derr error
		go func(i int) {
			defer close(changes)
			// BUG FIX: diff parentMaps[i], not parentMaps[1]. The original
			// always diffed the second parent's closure, so for commits with
			// three or more parents, ancestors reachable only through the
			// third-and-later parents were silently dropped from the merged
			// closure. (Passing |i| as an argument also sidesteps pre-Go-1.22
			// loop-variable capture.)
			derr = parentMaps[i].Diff(ctx, parentMaps[0], changes)
		}(i)
		for c := range changes {
			if c.ChangeType == types.DiffChangeAdded {
				editor.Set(c.Key, c.NewValue)
			}
		}
		if derr != nil {
			editor.Close(ctx)
			return types.Ref{}, false, derr
		}
	}
	m, err := editor.Map(ctx)
	if err != nil {
		return types.Ref{}, false, err
	}
	// Persist the merged closure map and hand back a Ref<Value> to it.
	r, err := vrw.WriteValue(ctx, m)
	if err != nil {
		return types.Ref{}, false, err
	}
	r, err = types.ToRefOfValue(r, vrw.Format())
	if err != nil {
		return types.Ref{}, false, err
	}
	return r, true, nil
}
type refmapDatasetsMap struct {
am prolly.AddressMap
}
@@ -1004,7 +865,7 @@ func buildNewCommit(ctx context.Context, ds Dataset, v types.Value, opts CommitO
}
}
return newCommitForValue(ctx, ds.db, v, opts)
return newCommitForValue(ctx, ds.db.chunkStore(), ds.db, v, opts)
}
func (db *database) doHeadUpdate(ctx context.Context, ds Dataset, updateFunc func(ds Dataset) error) (Dataset, error) {

View File

@@ -44,7 +44,7 @@ func TestNewTag(t *testing.T) {
}
parents := mustList(types.NewList(ctx, db))
parentsClosure := mustParentsClosure(t, false)(getParentsClosure(ctx, db, parents))
parentsClosure := mustParentsClosure(t, false)(writeTypesCommitParentClosure(ctx, db, parents))
commit, err := newCommit(ctx, types.Float(1), parents, parentsClosure, false, types.EmptyStruct(db.Format()))
require.NoError(t, err)

View File

@@ -147,11 +147,16 @@ func (m Map) Last(ctx context.Context) (key, value val.Tuple, err error) {
return m.tuples.last(ctx)
}
// IterAll returns a MapIter that iterates over the entire Map in key order.
func (m Map) IterAll(ctx context.Context) (MapIter, error) {
	return m.tuples.iterAll(ctx)
}
// IterAllReverse returns a MapIter that iterates over the entire Map from the
// last entry back to the first.
func (m Map) IterAllReverse(ctx context.Context) (MapIter, error) {
	return m.tuples.iterAllReverse(ctx)
}
// IterOrdinalRange returns a MapIter for the ordinal range beginning at |start| and ending before |stop|.
func (m Map) IterOrdinalRange(ctx context.Context, start, stop uint64) (MapIter, error) {
return m.tuples.iterOrdinalRange(ctx, start, stop)
@@ -232,11 +237,15 @@ func treeIterFromRange(
return nil, err
}
if start.Compare(stop) >= 0 {
stopF := func(curr *tree.Cursor) bool {
return curr.Compare(stop) >= 0
}
if stopF(start) {
start = nil // empty range
}
return &orderedTreeIter[val.Tuple, val.Tuple]{curr: start, stop: stop}, nil
return &orderedTreeIter[val.Tuple, val.Tuple]{curr: start, stop: stopF, step: start.Advance}, nil
}
type pointLookup struct {

View File

@@ -170,11 +170,43 @@ func (t orderedTree[K, V, O]) iterAll(ctx context.Context) (*orderedTreeIter[K,
return nil, err
}
if c.Compare(s) >= 0 {
c = nil // empty range
stop := func(curr *tree.Cursor) bool {
return curr.Compare(s) >= 0
}
return &orderedTreeIter[K, V]{curr: c, stop: s}, nil
if stop(c) {
// empty range
return &orderedTreeIter[K, V]{curr: nil}, nil
}
return &orderedTreeIter[K, V]{curr: c, stop: stop, step: c.Advance}, nil
}
// iterAllReverse returns an iterator over the whole tree that walks from the
// last entry back to the first, stepping with Cursor.Retreat.
func (t orderedTree[K, V, O]) iterAllReverse(ctx context.Context) (*orderedTreeIter[K, V], error) {
	// |beginning| is a sentinel positioned one step BEFORE the first entry;
	// iteration stops once the cursor reaches or passes it.
	beginning, err := tree.NewCursorAtStart(ctx, t.ns, t.root)
	if err != nil {
		return nil, err
	}
	err = beginning.Retreat(ctx)
	if err != nil {
		return nil, err
	}
	end, err := tree.NewCursorAtEnd(ctx, t.ns, t.root)
	if err != nil {
		return nil, err
	}
	// Stop predicate: true once |curr| is at or before the pre-start sentinel.
	stop := func(curr *tree.Cursor) bool {
		return curr.Compare(beginning) <= 0
	}
	if stop(end) {
		// empty range
		return &orderedTreeIter[K, V]{curr: nil}, nil
	}
	return &orderedTreeIter[K, V]{curr: end, stop: stop, step: end.Retreat}, nil
}
func (t orderedTree[K, V, O]) iterOrdinalRange(ctx context.Context, start, stop uint64) (*orderedTreeIter[K, V], error) {
@@ -197,7 +229,11 @@ func (t orderedTree[K, V, O]) iterOrdinalRange(ctx context.Context, start, stop
return nil, err
}
return &orderedTreeIter[K, V]{curr: lo, stop: hi}, nil
stopF := func(curr *tree.Cursor) bool {
return curr.Compare(hi) >= 0
}
return &orderedTreeIter[K, V]{curr: lo, stop: stopF, step: lo.Advance}, nil
}
// searchNode returns the smallest index where nd[i] >= query
@@ -232,8 +268,11 @@ var _ tree.CompareFn = orderedTree[tree.Item, tree.Item, ordering[tree.Item]]{}.
type orderedTreeIter[K, V ~[]byte] struct {
// current tuple location
curr *tree.Cursor
// non-inclusive range stop
stop *tree.Cursor
// the function called to moved |curr| forward in the direction of iteration.
step func(context.Context) error
// should return |true| if the passed in cursor is past the iteration's stopping point.
stop func(*tree.Cursor) bool
}
func (it *orderedTreeIter[K, V]) Next(ctx context.Context) (key K, value V, err error) {
@@ -244,11 +283,11 @@ func (it *orderedTreeIter[K, V]) Next(ctx context.Context) (key K, value V, err
k, v := tree.CurrentCursorItems(it.curr)
key, value = K(k), V(v)
err = it.curr.Advance(ctx)
err = it.step(ctx)
if err != nil {
return nil, nil, err
}
if it.curr.Compare(it.stop) >= 0 {
if it.stop(it.curr) {
// past the end of the range
it.curr = nil
}
@@ -266,12 +305,12 @@ func (it *orderedTreeIter[K, V]) current() (key K, value V) {
}
func (it *orderedTreeIter[K, V]) iterate(ctx context.Context) (err error) {
err = it.curr.Advance(ctx)
err = it.step(ctx)
if err != nil {
return err
}
if it.curr.Compare(it.stop) >= 0 {
if it.stop(it.curr) {
// past the end of the range
it.curr = nil
}

View File

@@ -220,7 +220,7 @@ func (tc *chunker[S]) AdvanceTo(ctx context.Context, next *Cursor) error {
if err != nil {
return err
}
tc.cur.invalidate()
tc.cur.invalidateAtEnd()
// no more pending chunks at this level, recurse
// into parent

View File

@@ -183,7 +183,7 @@ func skipCommonParents(ctx context.Context, from, to *Cursor) (err error) {
}
from.skipToNodeStart()
} else {
from.invalidate()
from.invalidateAtEnd()
}
if to.parent.Valid() {
@@ -192,7 +192,7 @@ func skipCommonParents(ctx context.Context, from, to *Cursor) (err error) {
}
to.skipToNodeStart()
} else {
to.invalidate()
to.invalidateAtEnd()
}
return

View File

@@ -325,11 +325,16 @@ func (cur *Cursor) search(item Item, cb CompareFn) (idx int) {
return idx
}
// invalidate sets the cursor's index to the node count.
func (cur *Cursor) invalidate() {
// invalidateAtEnd sets the cursor's index to the node count.
func (cur *Cursor) invalidateAtEnd() {
cur.idx = int(cur.nd.count)
}
// invalidateAtStart sets the cursor's index to -1, marking the cursor invalid
// after it has retreated past the first item of its node.
func (cur *Cursor) invalidateAtStart() {
	cur.idx = -1
}
// hasNext returns true if we do not need to recursively
// check the parent to know that the current cursor
// has more keys. hasNext can be false even if parent
@@ -375,7 +380,7 @@ func (cur *Cursor) Advance(ctx context.Context) error {
}
if cur.parent == nil {
cur.invalidate()
cur.invalidateAtEnd()
return nil
}
@@ -387,7 +392,7 @@ func (cur *Cursor) Advance(ctx context.Context) error {
if cur.parent.outOfBounds() {
// exhausted every parent cursor
cur.invalidate()
cur.invalidateAtEnd()
return nil
}
@@ -412,7 +417,7 @@ func (cur *Cursor) Retreat(ctx context.Context) error {
}
if cur.parent == nil {
cur.invalidate()
cur.invalidateAtStart()
return nil
}
@@ -424,7 +429,7 @@ func (cur *Cursor) Retreat(ctx context.Context) error {
if cur.parent.outOfBounds() {
// exhausted every parent cursor
cur.invalidate()
cur.invalidateAtStart()
return nil
}

View File

@@ -33,6 +33,33 @@ func TestNodeCursor(t *testing.T) {
testNewCursorAtItem(t, 1000)
testNewCursorAtItem(t, 10_000)
})
	// Verifies that retreating a cursor before the first entry invalidates it,
	// that an invalidated-at-start cursor compares strictly before the start
	// cursor, and that reverse iteration from the end visits every entry.
	t.Run("retreat past beginning", func(t *testing.T) {
		ctx := context.Background()
		root, _, ns := randomTree(t, 10_000)
		assert.NotNil(t, root)
		// Retreating from the start position must leave the cursor invalid.
		before, err := NewCursorAtStart(ctx, ns, root)
		assert.NoError(t, err)
		err = before.Retreat(ctx)
		assert.NoError(t, err)
		assert.False(t, before.Valid())
		// The pre-start sentinel orders strictly before a fresh start cursor.
		start, err := NewCursorAtStart(ctx, ns, root)
		assert.NoError(t, err)
		assert.True(t, start.Compare(before) > 0, "start is after before")
		assert.True(t, before.Compare(start) < 0, "before is before start")
		// Backwards iteration...
		end, err := NewCursorAtEnd(ctx, ns, root)
		assert.NoError(t, err)
		i := 0
		for end.Compare(before) > 0 {
			i++
			err = end.Retreat(ctx)
			assert.NoError(t, err)
		}
		// NOTE(review): randomTree(t, n) appears to yield n/2 entries (n
		// items as key/value pairs) — hence n/2 retreat steps; confirm
		// against randomTree's definition.
		assert.Equal(t, 10_000/2, i)
	})
}
func testNewCursorAtItem(t *testing.T, count int) {

View File

@@ -340,7 +340,17 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
if err = cb(r); err != nil {
return err
}
// TODO: cb for parent closure.
addr = hash.New(msg.ParentClosureBytes())
if !addr.IsEmpty() {
r, err = constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
if err != nil {
return err
}
if err = cb(r); err != nil {
return err
}
}
default:
return fmt.Errorf("unsupported SerialMessage message with FileID: %s", serial.GetFileID([]byte(sm)))
}