go/store/prolly: Create a CommitClosure ordered tree. Use it in go/store/datas for the commit closure.

This commit is contained in:
Aaron Son
2022-06-15 15:18:46 -07:00
parent f608335ba7
commit ac3fc4168b
15 changed files with 817 additions and 166 deletions

View File

@@ -0,0 +1,205 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package serial
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type CommitClosure struct {
_tab flatbuffers.Table
}
func GetRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &CommitClosure{}
x.Init(buf, n+offset)
return x
}
func GetSizePrefixedRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &CommitClosure{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func (rcv *CommitClosure) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *CommitClosure) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *CommitClosure) KeyItems(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *CommitClosure) KeyItemsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *CommitClosure) KeyItemsBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *CommitClosure) MutateKeyItems(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *CommitClosure) AddressArray(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *CommitClosure) AddressArrayLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *CommitClosure) AddressArrayBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *CommitClosure) MutateAddressArray(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *CommitClosure) SubtreeCounts(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *CommitClosure) SubtreeCountsLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *CommitClosure) SubtreeCountsBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *CommitClosure) MutateSubtreeCounts(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *CommitClosure) TreeCount() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *CommitClosure) MutateTreeCount(n uint64) bool {
return rcv._tab.MutateUint64Slot(10, n)
}
func (rcv *CommitClosure) TreeLevel() byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
if o != 0 {
return rcv._tab.GetByte(o + rcv._tab.Pos)
}
return 0
}
func (rcv *CommitClosure) MutateTreeLevel(n byte) bool {
return rcv._tab.MutateByteSlot(12, n)
}
func CommitClosureStart(builder *flatbuffers.Builder) {
builder.StartObject(5)
}
func CommitClosureAddKeyItems(builder *flatbuffers.Builder, keyItems flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(keyItems), 0)
}
func CommitClosureStartKeyItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func CommitClosureAddAddressArray(builder *flatbuffers.Builder, addressArray flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(addressArray), 0)
}
func CommitClosureStartAddressArrayVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func CommitClosureAddSubtreeCounts(builder *flatbuffers.Builder, subtreeCounts flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(subtreeCounts), 0)
}
func CommitClosureStartSubtreeCountsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func CommitClosureAddTreeCount(builder *flatbuffers.Builder, treeCount uint64) {
builder.PrependUint64Slot(3, treeCount, 0)
}
func CommitClosureAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) {
builder.PrependByteSlot(4, treeLevel, 0)
}
func CommitClosureEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

View File

@@ -28,6 +28,7 @@ const RootValueFileID = "RTVL"
const TableFileID = "DTBL"
const ProllyTreeNodeFileID = "TUPM"
const AddressMapFileID = "ADRM"
const CommitClosureFileID = "CMCL"
func GetFileID(bs []byte) string {
if len(bs) < 8 {

View File

@@ -395,107 +395,3 @@ func ProllyTreeNodeAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) {
func ProllyTreeNodeEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}
type CommitClosure struct {
_tab flatbuffers.Table
}
func GetRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &CommitClosure{}
x.Init(buf, n+offset)
return x
}
func GetSizePrefixedRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &CommitClosure{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func (rcv *CommitClosure) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *CommitClosure) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *CommitClosure) RefArray(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *CommitClosure) RefArrayLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *CommitClosure) RefArrayBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *CommitClosure) MutateRefArray(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *CommitClosure) TreeCount() uint64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.GetUint64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *CommitClosure) MutateTreeCount(n uint64) bool {
return rcv._tab.MutateUint64Slot(6, n)
}
func (rcv *CommitClosure) TreeLevel() byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.GetByte(o + rcv._tab.Pos)
}
return 0
}
func (rcv *CommitClosure) MutateTreeLevel(n byte) bool {
return rcv._tab.MutateByteSlot(8, n)
}
func CommitClosureStart(builder *flatbuffers.Builder) {
builder.StartObject(3)
}
func CommitClosureAddRefArray(builder *flatbuffers.Builder, refArray flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(refArray), 0)
}
func CommitClosureStartRefArrayVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func CommitClosureAddTreeCount(builder *flatbuffers.Builder, treeCount uint64) {
builder.PrependUint64Slot(1, treeCount, 0)
}
func CommitClosureAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) {
builder.PrependByteSlot(2, treeLevel, 0)
}
func CommitClosureEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

View File

@@ -0,0 +1,37 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace serial;
table CommitClosure {
// sorted array of key items;
// key items are [uint64; address].
key_items:[ubyte] (required);
// array of subtree addresses for internal prolly tree nodes
address_array:[ubyte];
// array of uvarint encoded subtree counts
subtree_counts:[ubyte];
// total count of prolly tree
tree_count:uint64;
// prolly tree level, 0 for leaf nodes
tree_level:uint8;
}
// KEEP THIS IN SYNC WITH fileidentifiers.go
file_identifier "CMCL";
root_type CommitClosure;

View File

@@ -28,6 +28,7 @@ const RootValueFileID = "RTVL"
const TableFileID = "DTBL"
const ProllyTreeNodeFileID = "TUPM"
const AddressMapFileID = "ADRM"
const CommitClosureFileID = "CMCL"
func GetFileID(bs []byte) string {
if len(bs) < 8 {

View File

@@ -12,16 +12,17 @@ fi
# generate golang (de)serialization package
flatc -o $GEN_DIR --gen-onefile --filename-suffix "" --gen-mutable --go-namespace "serial" --go \
commit.fbs \
prolly.fbs \
addressmap.fbs \
commit.fbs \
commitclosure.fbs \
encoding.fbs \
prolly.fbs \
rootvalue.fbs \
schema.fbs \
storeroot.fbs \
table.fbs \
tag.fbs \
workingset.fbs \
encoding.fbs
workingset.fbs
# prefix files with copyright header
for FILE in $GEN_DIR/*.go;

View File

@@ -52,18 +52,6 @@ table ProllyTreeNode {
tree_level:uint8;
}
table CommitClosure {
// array of commit ref hashes
ref_array:[ubyte] (required);
// subtree member count
tree_count:uint64;
// node tree level, 0 for leaf nodes
tree_level:uint8;
}
// KEEP THIS IN SYNC WITH fileidentifiers.go
file_identifier "TUPM";

View File

@@ -56,7 +56,7 @@ func (s *nomsRootTestSuite) TestBasic() {
goldenGoodbye := "70b9adi6amrab3a5t4hcibdob0cq49m0\n"
if types.Format_Default == types.Format_DOLT_DEV {
goldenHello = "mmvr5g771a7eo56qpor2o0t5pvsolq75\n"
goldenGoodbye = "103spr29q69oq3b9a1ane7egt9uo4tkj\n"
goldenGoodbye = "e77rueijm2122jim6ah09afjtqcpaagd\n"
}
ds, _ = datas.CommitValue(context.Background(), db, ds, types.String("hello!"))

View File

@@ -23,11 +23,9 @@ import (
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
func hackVRToCS(vr types.ValueReader) chunks.ChunkStore {
@@ -58,12 +56,12 @@ func newParentsClosureIterator(ctx context.Context, c *Commit, vr types.ValueRea
}
node := tree.NodeFromBytes(v.(types.TupleRowStorage))
ns := tree.NewNodeStore(hackVRToCS(vr))
m := prolly.NewMap(node, ns, commitKeyTupleDesc, commitValueTupleDesc)
mi, err := m.IterAllReverse(ctx)
cc := prolly.NewCommitClosure(node, ns)
ci, err := cc.IterAllReverse(ctx)
if err != nil {
return nil, err
}
return &fbParentsClosureIterator{mi: mi, curr: commitToFbKeyTuple(c, ns.Pool()), err: nil}, nil
return &fbParentsClosureIterator{i: ci, curr: prolly.NewCommitClosureKey(ns.Pool(), c.Height(), c.Addr()), err: nil}, nil
}
s, ok := sv.(types.Struct)
@@ -122,14 +120,6 @@ func commitToMapKeyTuple(f *types.NomsBinFormat, c *Commit) (types.Tuple, error)
return types.NewTuple(f, types.Uint(c.Height()), types.InlineBlob(ib))
}
func commitToFbKeyTuple(c *Commit, p pool.BuffPool) val.Tuple {
tb := val.NewTupleBuilder(commitKeyTupleDesc)
tb.PutUint64(0, c.Height())
h := c.Addr()
tb.PutAddress(1, h)
return tb.Build(p)
}
type parentsClosureIter interface {
Err() error
Hash() hash.Hash
@@ -215,8 +205,8 @@ func (i *parentsClosureIterator) Next(ctx context.Context) bool {
}
type fbParentsClosureIterator struct {
mi prolly.MapIter
curr val.Tuple
i prolly.CommitClosureIter
curr prolly.CommitClosureKey
err error
}
@@ -228,23 +218,21 @@ func (i *fbParentsClosureIterator) Height() uint64 {
if i.err != nil {
return 0
}
h, _ := commitKeyTupleDesc.GetUint64(0, i.curr)
return h
return i.curr.Height()
}
func (i *fbParentsClosureIterator) Hash() hash.Hash {
if i.err != nil {
return hash.Hash{}
}
bs, _ := commitKeyTupleDesc.GetAddress(1, i.curr)
return bs
return i.curr.Addr()
}
func (i *fbParentsClosureIterator) Next(ctx context.Context) bool {
if i.err != nil {
return false
}
i.curr, _, i.err = i.mi.Next(ctx)
i.curr, _, i.err = i.i.Next(ctx)
if i.err == io.EOF {
i.err = nil
return false
@@ -254,7 +242,7 @@ func (i *fbParentsClosureIterator) Next(ctx context.Context) bool {
func (i *fbParentsClosureIterator) Less(f *types.NomsBinFormat, otherI parentsClosureIter) bool {
other := otherI.(*fbParentsClosureIterator)
return commitKeyTupleDesc.Comparator().Compare(i.curr, other.curr, commitKeyTupleDesc) == -1
return i.curr.Less(other.curr)
}
func writeTypesCommitParentClosure(ctx context.Context, vrw types.ValueReadWriter, parentRefsL types.List) (types.Ref, bool, error) {
@@ -412,46 +400,39 @@ func writeFbCommitParentClosure(ctx context.Context, cs chunks.ChunkStore, vrw t
}
// Load them as ProllyTrees.
ns := tree.NewNodeStore(cs)
maps := make([]prolly.Map, len(parents))
closures := make([]prolly.CommitClosure, len(parents))
for i := range addrs {
if !types.IsNull(vs[i]) {
node := tree.NodeFromBytes(vs[i].(types.TupleRowStorage))
maps[i] = prolly.NewMap(node, ns, commitKeyTupleDesc, commitValueTupleDesc)
closures[i] = prolly.NewCommitClosure(node, ns)
} else {
maps[i], err = prolly.NewMapFromTuples(ctx, ns, commitKeyTupleDesc, commitValueTupleDesc)
if err != nil {
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: NewMapFromTuples: %w", err)
}
closures[i] = prolly.NewEmptyCommitClosure(ns)
}
}
// Add all the missing entries from [1, ...) maps to the 0th map.
editor := maps[0].Mutate()
for i := 1; i < len(maps); i++ {
err = prolly.DiffMaps(ctx, maps[0], maps[i], func(ctx context.Context, diff tree.Diff) error {
editor := closures[0].Editor()
for i := 1; i < len(closures); i++ {
err = prolly.DiffCommitClosures(ctx, closures[0], closures[i], func(ctx context.Context, diff tree.Diff) error {
if diff.Type == tree.AddedDiff {
return editor.Put(ctx, val.Tuple(diff.Key), val.EmptyTuple)
return editor.Add(ctx, prolly.CommitClosureKey(diff.Key))
}
return nil
})
if err != nil && !errors.Is(err, io.EOF) {
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: DiffMaps: %w", err)
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: DiffCommitClosures: %w", err)
}
}
// Add the parents themselves to the new map.
tb := val.NewTupleBuilder(commitKeyTupleDesc)
for i := 0; i < len(parents); i++ {
tb.PutUint64(0, parents[i].Height())
tb.PutAddress(1, parentAddrs[i])
err = editor.Put(ctx, tb.Build(ns.Pool()), val.EmptyTuple)
err = editor.Add(ctx, prolly.NewCommitClosureKey(ns.Pool(), parents[i].Height(), parentAddrs[i]))
if err != nil {
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableMap.Put: %w", err)
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableCommitClosure.Put: %w", err)
}
tb.Recycle()
}
// This puts the map in the NodeStore as well.
res, err := editor.Map(ctx)
// This puts the closure in the NodeStore as well.
res, err := editor.Flush(ctx)
if err != nil {
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableMap.Map: %w", err)
return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableCommitClosure.Flush: %w", err)
}
return res.HashOf(), nil
}

View File

@@ -0,0 +1,159 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prolly
import (
"bytes"
"context"
"encoding/binary"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly/message"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)
type CommitClosureValue []byte
type CommitClosure struct {
closure orderedTree[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]
}
type commitClosureKeyOrdering struct{}
var _ ordering[CommitClosureKey] = commitClosureKeyOrdering{}
func (o commitClosureKeyOrdering) Compare(left, right CommitClosureKey) int {
lh, rh := left.Height(), right.Height()
if lh == rh {
return bytes.Compare(left[8:], right[8:])
} else if lh < rh {
return -1
}
return 1
}
func NewEmptyCommitClosure(ns tree.NodeStore) CommitClosure {
serializer := message.CommitClosureSerializer{Pool: ns.Pool()}
msg := serializer.Serialize(nil, nil, nil, 0)
node := tree.NodeFromBytes(msg)
return NewCommitClosure(node, ns)
}
func NewCommitClosure(node tree.Node, ns tree.NodeStore) CommitClosure {
return CommitClosure{
closure: orderedTree[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]{
root: node,
ns: ns,
order: commitClosureKeyOrdering{},
},
}
}
func (c CommitClosure) Count() int {
return c.closure.count()
}
func (c CommitClosure) Height() int {
return c.closure.height()
}
func (c CommitClosure) Node() tree.Node {
return c.closure.root
}
func (c CommitClosure) HashOf() hash.Hash {
return c.closure.hashOf()
}
func (c CommitClosure) Format() *types.NomsBinFormat {
return c.closure.ns.Format()
}
func (c CommitClosure) Editor() CommitClosureEditor {
return CommitClosureEditor{
closure: c.closure.mutate(),
}
}
func (c CommitClosure) IterAllReverse(ctx context.Context) (CommitClosureIter, error) {
return c.closure.iterAllReverse(ctx)
}
func DecodeCommitClosureKey(key []byte) (height uint64, addr hash.Hash) {
height = binary.LittleEndian.Uint64(key)
addr = hash.New(key[8:])
return
}
type CommitClosureEditor struct {
closure orderedMap[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]
}
type CommitClosureKey []byte
type CommitClosureIter kvIter[CommitClosureKey, CommitClosureValue]
func NewCommitClosureKey(p pool.BuffPool, height uint64, addr hash.Hash) CommitClosureKey {
r := p.Get(8 + 20)
binary.LittleEndian.PutUint64(r, height)
copy(r[8:], addr[:])
return CommitClosureKey(r)
}
func (k CommitClosureKey) Height() uint64 {
return binary.LittleEndian.Uint64(k)
}
func (k CommitClosureKey) Addr() hash.Hash {
return hash.New(k[8:])
}
func (k CommitClosureKey) Less(other CommitClosureKey) bool {
return commitClosureKeyOrdering{}.Compare(k, other) < 0
}
var emptyCommitClosureValue CommitClosureValue = CommitClosureValue(make([]byte, 1))
func (wr CommitClosureEditor) Add(ctx context.Context, key CommitClosureKey) error {
return wr.closure.put(ctx, key, emptyCommitClosureValue)
}
func (wr CommitClosureEditor) Delete(ctx context.Context, key CommitClosureKey) error {
return wr.closure.delete(ctx, key)
}
func (wr CommitClosureEditor) Flush(ctx context.Context) (CommitClosure, error) {
tr := wr.closure.tree
serializer := message.CommitClosureSerializer{Pool: tr.ns.Pool()}
root, err := tree.ApplyMutations(ctx, tr.ns, tr.root, serializer, wr.closure.mutations(), tr.compareItems)
if err != nil {
return CommitClosure{}, err
}
return CommitClosure{
closure: orderedTree[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]{
root: root,
ns: tr.ns,
order: tr.order,
},
}, nil
}
func DiffCommitClosures(ctx context.Context, from, to CommitClosure, cb DiffFn) error {
return diffOrderedTrees(ctx, from.closure, to.closure, cb)
}

View File

@@ -0,0 +1,192 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package prolly
import (
"context"
"errors"
"fmt"
"io"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/message"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
)
func TestCommitClosure(t *testing.T) {
ns := tree.NewTestNodeStore()
ctx := context.Background()
t.Run("Keys", func(t *testing.T) {
k0 := NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000"))
assert.Equal(t, uint64(0), k0.Height())
assert.True(t, k0.Addr().Equal(hash.Hash{}))
k0_ := NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000"))
assert.False(t, k0.Less(k0_))
h := hash.Parse("00000000000000000000000000000001")
k0_1 := NewCommitClosureKey(ns.Pool(), 0, h)
assert.True(t, k0_1.Addr().Equal(h))
assert.True(t, k0.Less(k0_1))
assert.False(t, k0_1.Less(k0_))
})
t.Run("Empty", func(t *testing.T) {
cc := NewEmptyCommitClosure(ns)
assert.NotNil(t, cc)
assert.Equal(t, 0, cc.Count())
assert.Equal(t, 0, cc.closure.root.Count())
assert.Equal(t, 1, cc.Height())
i, err := cc.IterAllReverse(ctx)
_, _, err = i.Next(ctx)
assert.Error(t, err)
assert.True(t, errors.Is(err, io.EOF))
})
t.Run("Insert", func(t *testing.T) {
cc := NewEmptyCommitClosure(ns)
e := cc.Editor()
err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
cc, err = e.Flush(ctx)
assert.NoError(t, err)
assert.Equal(t, 2, cc.Count())
i, err := cc.IterAllReverse(ctx)
assert.NoError(t, err)
k, _, err := i.Next(ctx)
assert.NoError(t, err)
assert.Equal(t, uint64(1), k.Height())
k, _, err = i.Next(ctx)
assert.NoError(t, err)
assert.Equal(t, uint64(0), k.Height())
_, _, err = i.Next(ctx)
assert.Error(t, err)
assert.True(t, errors.Is(err, io.EOF))
e = cc.Editor()
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
cc, err = e.Flush(ctx)
assert.NoError(t, err)
assert.Equal(t, 2, cc.Count())
})
t.Run("Diff", func(t *testing.T) {
ccl := NewEmptyCommitClosure(ns)
e := ccl.Editor()
err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
ccl, err = e.Flush(ctx)
assert.NoError(t, err)
assert.Equal(t, 2, ccl.Count())
ccr := NewEmptyCommitClosure(ns)
e = ccr.Editor()
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000001")))
assert.NoError(t, err)
err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 2, hash.Parse("00000000000000000000000000000000")))
assert.NoError(t, err)
ccr, err = e.Flush(ctx)
assert.NoError(t, err)
assert.Equal(t, 4, ccr.Count())
var numadds, numdels int
err = DiffCommitClosures(ctx, ccl, ccr, func(ctx context.Context, d tree.Diff) error {
if d.Type == tree.AddedDiff {
numadds++
} else if d.Type == tree.RemovedDiff {
numdels++
}
return nil
})
assert.Error(t, err)
assert.True(t, errors.Is(err, io.EOF))
assert.Equal(t, 2, numadds)
assert.Equal(t, 0, numdels)
})
t.Run("WalkAddresses", func(t *testing.T) {
cc := NewEmptyCommitClosure(ns)
e := cc.Editor()
for i := 0; i < 4096; i++ {
err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), uint64(i), hash.Parse(fmt.Sprintf("%0.32d", i))))
require.NoError(t, err)
}
cc, err := e.Flush(ctx)
require.NoError(t, err)
assert.Equal(t, 4096, cc.Count())
// Walk the addresses in the root.
msg := message.Message(tree.ValueFromNode(cc.closure.root).(types.TupleRowStorage))
numaddresses := 0
err = message.WalkAddresses(ctx, msg, func(ctx context.Context, addr hash.Hash) error {
numaddresses++
return nil
})
require.NoError(t, err)
assert.Less(t, numaddresses, 4096)
// Walk all addresses in the tree.
numaddresses = 0
err = tree.WalkAddresses(ctx, cc.closure.root, ns, func(ctx context.Context, addr hash.Hash) error {
numaddresses++
return nil
})
require.NoError(t, err)
assert.Less(t, 4096, numaddresses)
})
t.Run("WalkNodes", func(t *testing.T) {
cc := NewEmptyCommitClosure(ns)
e := cc.Editor()
for i := 0; i < 4096; i++ {
err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), uint64(i), hash.Parse(fmt.Sprintf("%0.32d", i))))
require.NoError(t, err)
}
cc, err := e.Flush(ctx)
require.NoError(t, err)
assert.Equal(t, 4096, cc.Count())
numnodes := 0
totalentries := 0
err = tree.WalkNodes(ctx, cc.closure.root, ns, func(ctx context.Context, node tree.Node) error {
numnodes++
totalentries += node.Count()
return nil
})
require.NoError(t, err)
assert.Less(t, cc.closure.root.Count(), numnodes)
assert.Less(t, 4096, totalentries)
})
}

View File

@@ -0,0 +1,168 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"context"
"encoding/binary"
fb "github.com/google/flatbuffers/go"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/val"
)
var commitClosureKeyOffsets []byte
var commitClosureValueOffsets []byte
var commitClosureEmptyValueBytes []byte
func init() {
commitClosureKeyOffsets = make([]byte, (maxChunkSz/commitClosureKeyLength)*uint16Size)
commitClosureValueOffsets = make([]byte, (maxChunkSz/commitClosureKeyLength)*uint16Size)
commitClosureEmptyValueBytes = make([]byte, 0)
buf := commitClosureKeyOffsets
off := uint16(commitClosureKeyLength)
for len(buf) > 0 {
binary.LittleEndian.PutUint16(buf, off)
buf = buf[uint16Size:]
off += uint16(commitClosureKeyLength)
}
}
func offsetsForCommitClosureKeys(buf []byte) []byte {
cnt := len(buf) / commitClosureKeyLength
return commitClosureKeyOffsets[:cnt*uint16Size]
}
func getCommitClosureKeys(msg Message) val.SlicedBuffer {
var ret val.SlicedBuffer
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
ret.Buf = m.KeyItemsBytes()
ret.Offs = offsetsForCommitClosureKeys(ret.Buf)
return ret
}
func getCommitClosureValues(msg Message) val.SlicedBuffer {
var ret val.SlicedBuffer
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
if m.AddressArrayLength() == 0 {
ret.Buf = commitClosureEmptyValueBytes
ret.Offs = commitClosureValueOffsets[:getCommitClosureCount(msg)*uint16Size]
return ret
}
ret.Buf = m.AddressArrayBytes()
ret.Offs = offsetsForAddressArray(ret.Buf)
return ret
}
// uint64 + hash.
const commitClosureKeyLength = 8 + 20
func getCommitClosureCount(msg Message) uint16 {
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
return uint16(m.KeyItemsLength() / commitClosureKeyLength)
}
func getCommitClosureTreeLevel(msg Message) int {
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
return int(m.TreeLevel())
}
func getCommitClosureTreeCount(msg Message) int {
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
return int(m.TreeCount())
}
func getCommitClosureSubtrees(msg Message) []uint64 {
counts := make([]uint64, getCommitClosureCount(msg))
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
return decodeVarints(m.SubtreeCountsBytes(), counts)
}
func walkCommitClosureAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
m := serial.GetRootAsCommitClosure(msg, messagePrefixSz)
arr := m.AddressArrayBytes()
for i := 0; i < len(arr)/hash.ByteLen; i++ {
addr := hash.New(arr[i*addrSize : (i+1)*addrSize])
if err := cb(ctx, addr); err != nil {
return err
}
}
if m.TreeLevel() == 0 {
// If Level() == 0, walk addresses in keys.
keybytes := m.KeyItemsBytes()
for i := 8; i < len(keybytes); i += commitClosureKeyLength {
addr := hash.New(keybytes[i : i+addrSize])
if err := cb(ctx, addr); err != nil {
return err
}
}
}
return nil
}
var commitClosureFileID = []byte(serial.CommitClosureFileID)
type CommitClosureSerializer struct {
Pool pool.BuffPool
}
var _ Serializer = CommitClosureSerializer{}
func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) Message {
var keyArr, addrArr, cardArr fb.UOffsetT
keySz, addrSz, totalSz := estimateCommitClosureSize(keys, addrs, subtrees)
b := getFlatbufferBuilder(s.Pool, totalSz)
// keys
keyArr = writeItemBytes(b, keys, keySz)
if level > 0 {
// addresses
addrArr = writeItemBytes(b, addrs, addrSz)
// subtree cardinalities
cardArr = writeCountArray(b, subtrees)
}
serial.CommitClosureStart(b)
serial.CommitClosureAddKeyItems(b, keyArr)
if level > 0 {
serial.CommitClosureAddAddressArray(b, addrArr)
serial.CommitClosureAddSubtreeCounts(b, cardArr)
serial.CommitClosureAddTreeCount(b, sumSubtrees(subtrees))
} else {
serial.CommitClosureAddTreeCount(b, uint64(len(keys)))
}
serial.CommitClosureAddTreeLevel(b, uint8(level))
return finishMessage(b, serial.CommitClosureEnd(b), commitClosureFileID)
}
func estimateCommitClosureSize(keys, addresses [][]byte, subtrees []uint64) (keySz, addrSz, totalSz int) {
keySz = commitClosureKeyLength * len(keys)
addrSz = addrSize * len(addresses)
totalSz += keySz + addrSz
totalSz += len(subtrees) * binary.MaxVarintLen64
totalSz += 8 + 1 + 1 + 1
totalSz += 72
totalSz += messagePrefixSz
return
}

View File

@@ -45,6 +45,12 @@ func GetKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) {
cnt = getAddressMapCount(msg)
return
}
if id == serial.CommitClosureFileID {
keys = getCommitClosureKeys(msg)
values = getCommitClosureValues(msg)
cnt = getCommitClosureCount(msg)
return
}
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -56,6 +62,8 @@ func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context
return walkProllyMapAddresses(ctx, msg, cb)
case serial.AddressMapFileID:
return walkAddressMapAddresses(ctx, msg, cb)
case serial.CommitClosureFileID:
return walkCommitClosureAddresses(ctx, msg, cb)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -68,6 +76,8 @@ func GetTreeLevel(msg Message) int {
return getProllyMapTreeLevel(msg)
case serial.AddressMapFileID:
return getAddressMapTreeLevel(msg)
case serial.CommitClosureFileID:
return getCommitClosureTreeLevel(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -80,6 +90,8 @@ func GetTreeCount(msg Message) int {
return getProllyMapTreeCount(msg)
case serial.AddressMapFileID:
return getAddressMapTreeCount(msg)
case serial.CommitClosureFileID:
return getCommitClosureTreeCount(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -92,6 +104,8 @@ func GetSubtrees(msg Message) []uint64 {
return getProllyMapSubtrees(msg)
case serial.AddressMapFileID:
return getAddressMapSubtrees(msg)
case serial.CommitClosureFileID:
return getCommitClosureSubtrees(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}

View File

@@ -43,6 +43,10 @@ func WalkAddresses(ctx context.Context, nd Node, ns NodeStore, cb AddressCb) err
return err
}
if nd.IsLeaf() {
return nil
}
child, err := ns.Read(ctx, addr)
if err != nil {
return err
@@ -59,6 +63,10 @@ func WalkNodes(ctx context.Context, nd Node, ns NodeStore, cb NodeCb) error {
return err
}
if nd.IsLeaf() {
return nil
}
return walkAddresses(ctx, nd, func(ctx context.Context, addr hash.Hash) error {
child, err := ns.Read(ctx, addr)
if err != nil {

View File

@@ -241,7 +241,7 @@ func (cur *Cursor) firstKey() Item {
}
func (cur *Cursor) lastKey() Item {
lastKeyIdx := int(cur.nd.count - 1)
lastKeyIdx := int(cur.nd.count) - 1
return cur.nd.GetKey(lastKeyIdx)
}
@@ -250,7 +250,7 @@ func (cur *Cursor) skipToNodeStart() {
}
func (cur *Cursor) skipToNodeEnd() {
lastKeyIdx := int(cur.nd.count - 1)
lastKeyIdx := int(cur.nd.count) - 1
cur.idx = lastKeyIdx
}
@@ -258,7 +258,7 @@ func (cur *Cursor) keepInBounds() {
if cur.idx < 0 {
cur.skipToNodeStart()
}
lastKeyIdx := int(cur.nd.count - 1)
lastKeyIdx := int(cur.nd.count) - 1
if cur.idx > lastKeyIdx {
cur.skipToNodeEnd()
}
@@ -271,7 +271,7 @@ func (cur *Cursor) atNodeStart() bool {
// atNodeEnd returns true if the cursor's current |idx|
// points to the last node item
func (cur *Cursor) atNodeEnd() bool {
lastKeyIdx := int(cur.nd.count - 1)
lastKeyIdx := int(cur.nd.count) - 1
return cur.idx == lastKeyIdx
}