From ac3fc4168b883a3dc560e7a9b9bbb6b044c0f86f Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 15 Jun 2022 15:18:46 -0700 Subject: [PATCH] go/store/prolly: Create a CommitClosure ordered tree. Use it in go/store/datas for the commit closure. --- go/gen/fb/serial/commitclosure.go | 205 ++++++++++++++++++++++ go/gen/fb/serial/fileidentifiers.go | 1 + go/gen/fb/serial/prolly.go | 104 ----------- go/serial/commitclosure.fbs | 37 ++++ go/serial/fileidentifiers.go | 1 + go/serial/generate.sh | 9 +- go/serial/prolly.fbs | 12 -- go/store/cmd/noms/noms_root_test.go | 2 +- go/store/datas/commit_closure.go | 63 +++---- go/store/prolly/commit_closure.go | 159 +++++++++++++++++ go/store/prolly/commit_closure_test.go | 192 ++++++++++++++++++++ go/store/prolly/message/commit_closure.go | 168 ++++++++++++++++++ go/store/prolly/message/message.go | 14 ++ go/store/prolly/tree/node.go | 8 + go/store/prolly/tree/node_cursor.go | 8 +- 15 files changed, 817 insertions(+), 166 deletions(-) create mode 100644 go/gen/fb/serial/commitclosure.go create mode 100644 go/serial/commitclosure.fbs create mode 100644 go/store/prolly/commit_closure.go create mode 100644 go/store/prolly/commit_closure_test.go create mode 100644 go/store/prolly/message/commit_closure.go diff --git a/go/gen/fb/serial/commitclosure.go b/go/gen/fb/serial/commitclosure.go new file mode 100644 index 0000000000..22cbf09ecb --- /dev/null +++ b/go/gen/fb/serial/commitclosure.go @@ -0,0 +1,205 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by the FlatBuffers compiler. DO NOT EDIT. + +package serial + +import ( + flatbuffers "github.com/google/flatbuffers/go" +) + +type CommitClosure struct { + _tab flatbuffers.Table +} + +func GetRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure { + n := flatbuffers.GetUOffsetT(buf[offset:]) + x := &CommitClosure{} + x.Init(buf, n+offset) + return x +} + +func GetSizePrefixedRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure { + n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) + x := &CommitClosure{} + x.Init(buf, n+offset+flatbuffers.SizeUint32) + return x +} + +func (rcv *CommitClosure) Init(buf []byte, i flatbuffers.UOffsetT) { + rcv._tab.Bytes = buf + rcv._tab.Pos = i +} + +func (rcv *CommitClosure) Table() flatbuffers.Table { + return rcv._tab +} + +func (rcv *CommitClosure) KeyItems(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *CommitClosure) KeyItemsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *CommitClosure) KeyItemsBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *CommitClosure) MutateKeyItems(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *CommitClosure) AddressArray(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *CommitClosure) AddressArrayLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *CommitClosure) AddressArrayBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *CommitClosure) MutateAddressArray(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *CommitClosure) SubtreeCounts(j int) byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) + } + return 0 +} + +func (rcv *CommitClosure) SubtreeCountsLength() int { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.VectorLen(o) + } + return 0 +} + +func (rcv *CommitClosure) SubtreeCountsBytes() []byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + return rcv._tab.ByteVector(o + rcv._tab.Pos) + } + return nil +} + +func (rcv *CommitClosure) MutateSubtreeCounts(j int, n byte) bool { + o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) + if o != 0 { + a := rcv._tab.Vector(o) + return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) + } + return false +} + +func (rcv *CommitClosure) TreeCount() uint64 { + o := flatbuffers.UOffsetT(rcv._tab.Offset(10)) + if o != 0 { + return rcv._tab.GetUint64(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *CommitClosure) MutateTreeCount(n uint64) bool { + return rcv._tab.MutateUint64Slot(10, n) +} + +func (rcv *CommitClosure) TreeLevel() byte { + o := flatbuffers.UOffsetT(rcv._tab.Offset(12)) + if o != 0 { + return rcv._tab.GetByte(o + rcv._tab.Pos) + } + return 0 +} + +func (rcv *CommitClosure) MutateTreeLevel(n byte) bool { + return rcv._tab.MutateByteSlot(12, n) +} + +func CommitClosureStart(builder *flatbuffers.Builder) { + builder.StartObject(5) +} +func CommitClosureAddKeyItems(builder *flatbuffers.Builder, keyItems flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(keyItems), 0) +} +func CommitClosureStartKeyItemsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func CommitClosureAddAddressArray(builder *flatbuffers.Builder, addressArray flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(addressArray), 0) +} +func CommitClosureStartAddressArrayVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func CommitClosureAddSubtreeCounts(builder *flatbuffers.Builder, subtreeCounts flatbuffers.UOffsetT) { + builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(subtreeCounts), 0) +} +func CommitClosureStartSubtreeCountsVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { + return builder.StartVector(1, numElems, 1) +} +func CommitClosureAddTreeCount(builder *flatbuffers.Builder, treeCount uint64) { + builder.PrependUint64Slot(3, treeCount, 0) +} +func CommitClosureAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) { + builder.PrependByteSlot(4, treeLevel, 0) +} +func CommitClosureEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { + return builder.EndObject() +} diff --git a/go/gen/fb/serial/fileidentifiers.go b/go/gen/fb/serial/fileidentifiers.go index a937b54178..749a7cb9c1 100644 --- a/go/gen/fb/serial/fileidentifiers.go +++ b/go/gen/fb/serial/fileidentifiers.go @@ -28,6 +28,7 @@ const RootValueFileID = "RTVL" const TableFileID = "DTBL" const ProllyTreeNodeFileID = "TUPM" const AddressMapFileID = "ADRM" +const CommitClosureFileID = "CMCL" func GetFileID(bs []byte) string { if len(bs) < 8 { diff --git a/go/gen/fb/serial/prolly.go b/go/gen/fb/serial/prolly.go index cbb0523aa8..a4a24b1921 100644 --- a/go/gen/fb/serial/prolly.go +++ b/go/gen/fb/serial/prolly.go @@ -395,107 +395,3 @@ func ProllyTreeNodeAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) { func ProllyTreeNodeEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { return builder.EndObject() } - -type CommitClosure struct { - _tab flatbuffers.Table -} - -func GetRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure { - n := flatbuffers.GetUOffsetT(buf[offset:]) - x := &CommitClosure{} - x.Init(buf, n+offset) - return x -} - -func GetSizePrefixedRootAsCommitClosure(buf []byte, offset flatbuffers.UOffsetT) *CommitClosure { - n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:]) - x := &CommitClosure{} - x.Init(buf, n+offset+flatbuffers.SizeUint32) - return x -} - -func (rcv *CommitClosure) Init(buf []byte, i flatbuffers.UOffsetT) { - rcv._tab.Bytes = buf - rcv._tab.Pos = i -} - -func (rcv *CommitClosure) Table() flatbuffers.Table { - return rcv._tab -} - -func (rcv *CommitClosure) RefArray(j int) byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1)) - } - return 0 -} - -func (rcv *CommitClosure) RefArrayLength() int { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.VectorLen(o) - } - return 0 -} - -func (rcv *CommitClosure) RefArrayBytes() []byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - return rcv._tab.ByteVector(o + rcv._tab.Pos) - } - return nil -} - -func (rcv *CommitClosure) MutateRefArray(j int, n byte) bool { - o := flatbuffers.UOffsetT(rcv._tab.Offset(4)) - if o != 0 { - a := rcv._tab.Vector(o) - return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n) - } - return false -} - -func (rcv *CommitClosure) TreeCount() uint64 { - o := flatbuffers.UOffsetT(rcv._tab.Offset(6)) - if o != 0 { - return rcv._tab.GetUint64(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *CommitClosure) MutateTreeCount(n uint64) bool { - return rcv._tab.MutateUint64Slot(6, n) -} - -func (rcv *CommitClosure) TreeLevel() byte { - o := flatbuffers.UOffsetT(rcv._tab.Offset(8)) - if o != 0 { - return rcv._tab.GetByte(o + rcv._tab.Pos) - } - return 0 -} - -func (rcv *CommitClosure) MutateTreeLevel(n byte) bool { - return rcv._tab.MutateByteSlot(8, n) -} - -func CommitClosureStart(builder *flatbuffers.Builder) { - builder.StartObject(3) -} -func CommitClosureAddRefArray(builder *flatbuffers.Builder, refArray flatbuffers.UOffsetT) { - builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(refArray), 0) -} -func CommitClosureStartRefArrayVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT { - return builder.StartVector(1, numElems, 1) -} -func CommitClosureAddTreeCount(builder *flatbuffers.Builder, treeCount uint64) { - builder.PrependUint64Slot(1, treeCount, 0) -} -func CommitClosureAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) { - builder.PrependByteSlot(2, treeLevel, 0) -} -func CommitClosureEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT { - return builder.EndObject() -} diff --git a/go/serial/commitclosure.fbs b/go/serial/commitclosure.fbs new file mode 100644 index 0000000000..f4bba59a48 --- /dev/null +++ b/go/serial/commitclosure.fbs @@ -0,0 +1,37 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace serial; + +table CommitClosure { + // sorted array of key items; + // key items are [uint64; address]. + key_items:[ubyte] (required); + + // array of subtree addresses for internal prolly tree nodes + address_array:[ubyte]; + + // array of uvarint encoded subtree counts + subtree_counts:[ubyte]; + // total count of prolly tree + tree_count:uint64; + // prolly tree level, 0 for leaf nodes + tree_level:uint8; +} + + +// KEEP THIS IN SYNC WITH fileidentifiers.go +file_identifier "CMCL"; + +root_type CommitClosure; diff --git a/go/serial/fileidentifiers.go b/go/serial/fileidentifiers.go index a937b54178..749a7cb9c1 100644 --- a/go/serial/fileidentifiers.go +++ b/go/serial/fileidentifiers.go @@ -28,6 +28,7 @@ const RootValueFileID = "RTVL" const TableFileID = "DTBL" const ProllyTreeNodeFileID = "TUPM" const AddressMapFileID = "ADRM" +const CommitClosureFileID = "CMCL" func GetFileID(bs []byte) string { if len(bs) < 8 { diff --git a/go/serial/generate.sh b/go/serial/generate.sh index a73e337045..df347c500b 100755 --- a/go/serial/generate.sh +++ b/go/serial/generate.sh @@ -12,16 +12,17 @@ fi # generate golang (de)serialization package flatc -o $GEN_DIR --gen-onefile --filename-suffix "" --gen-mutable --go-namespace "serial" --go \ - commit.fbs \ - prolly.fbs \ addressmap.fbs \ + commit.fbs \ + commitclosure.fbs \ + encoding.fbs \ + prolly.fbs \ rootvalue.fbs \ schema.fbs \ storeroot.fbs \ table.fbs \ tag.fbs \ - workingset.fbs \ - encoding.fbs + workingset.fbs # prefix files with copyright header for FILE in $GEN_DIR/*.go; diff --git a/go/serial/prolly.fbs b/go/serial/prolly.fbs index 0260ce2bb8..b69633d967 100644 --- a/go/serial/prolly.fbs +++ b/go/serial/prolly.fbs @@ -52,18 +52,6 @@ table ProllyTreeNode { tree_level:uint8; } -table CommitClosure { - // array of commit ref hashes - ref_array:[ubyte] (required); - - // subtree member count - tree_count:uint64; - - // node tree level, 0 for leaf nodes - tree_level:uint8; -} - - // KEEP THIS IN SYNC WITH fileidentifiers.go file_identifier "TUPM"; diff --git a/go/store/cmd/noms/noms_root_test.go b/go/store/cmd/noms/noms_root_test.go index 368145d5b4..49b6ce0694 100644 --- a/go/store/cmd/noms/noms_root_test.go +++ b/go/store/cmd/noms/noms_root_test.go @@ -56,7 +56,7 @@ func (s *nomsRootTestSuite) TestBasic() { goldenGoodbye := "70b9adi6amrab3a5t4hcibdob0cq49m0\n" if types.Format_Default == types.Format_DOLT_DEV { goldenHello = "mmvr5g771a7eo56qpor2o0t5pvsolq75\n" - goldenGoodbye = "103spr29q69oq3b9a1ane7egt9uo4tkj\n" + goldenGoodbye = "e77rueijm2122jim6ah09afjtqcpaagd\n" } ds, _ = datas.CommitValue(context.Background(), db, ds, types.String("hello!")) diff --git a/go/store/datas/commit_closure.go b/go/store/datas/commit_closure.go index d6550e1ac6..6e49355a4a 100644 --- a/go/store/datas/commit_closure.go +++ b/go/store/datas/commit_closure.go @@ -23,11 +23,9 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/pool" "github.com/dolthub/dolt/go/store/prolly" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" - "github.com/dolthub/dolt/go/store/val" ) func hackVRToCS(vr types.ValueReader) chunks.ChunkStore { @@ -58,12 +56,12 @@ func newParentsClosureIterator(ctx context.Context, c *Commit, vr types.ValueRea } node := tree.NodeFromBytes(v.(types.TupleRowStorage)) ns := tree.NewNodeStore(hackVRToCS(vr)) - m := prolly.NewMap(node, ns, commitKeyTupleDesc, commitValueTupleDesc) - mi, err := m.IterAllReverse(ctx) + cc := prolly.NewCommitClosure(node, ns) + ci, err := cc.IterAllReverse(ctx) if err != nil { return nil, err } - return &fbParentsClosureIterator{mi: mi, curr: commitToFbKeyTuple(c, ns.Pool()), err: nil}, nil + return &fbParentsClosureIterator{i: ci, curr: prolly.NewCommitClosureKey(ns.Pool(), c.Height(), c.Addr()), err: nil}, nil } s, ok := sv.(types.Struct) @@ -122,14 +120,6 @@ func commitToMapKeyTuple(f *types.NomsBinFormat, c *Commit) (types.Tuple, error) return types.NewTuple(f, types.Uint(c.Height()), types.InlineBlob(ib)) } -func commitToFbKeyTuple(c *Commit, p pool.BuffPool) val.Tuple { - tb := val.NewTupleBuilder(commitKeyTupleDesc) - tb.PutUint64(0, c.Height()) - h := c.Addr() - tb.PutAddress(1, h) - return tb.Build(p) -} - type parentsClosureIter interface { Err() error Hash() hash.Hash @@ -215,8 +205,8 @@ func (i *parentsClosureIterator) Next(ctx context.Context) bool { } type fbParentsClosureIterator struct { - mi prolly.MapIter - curr val.Tuple + i prolly.CommitClosureIter + curr prolly.CommitClosureKey err error } @@ -228,23 +218,21 @@ func (i *fbParentsClosureIterator) Height() uint64 { if i.err != nil { return 0 } - h, _ := commitKeyTupleDesc.GetUint64(0, i.curr) - return h + return i.curr.Height() } func (i *fbParentsClosureIterator) Hash() hash.Hash { if i.err != nil { return hash.Hash{} } - bs, _ := commitKeyTupleDesc.GetAddress(1, i.curr) - return bs + return i.curr.Addr() } func (i *fbParentsClosureIterator) Next(ctx context.Context) bool { if i.err != nil { return false } - i.curr, _, i.err = i.mi.Next(ctx) + i.curr, _, i.err = i.i.Next(ctx) if i.err == io.EOF { i.err = nil return false @@ -254,7 +242,7 @@ func (i *fbParentsClosureIterator) Next(ctx context.Context) bool { func (i *fbParentsClosureIterator) Less(f *types.NomsBinFormat, otherI parentsClosureIter) bool { other := otherI.(*fbParentsClosureIterator) - return commitKeyTupleDesc.Comparator().Compare(i.curr, other.curr, commitKeyTupleDesc) == -1 + return i.curr.Less(other.curr) } func writeTypesCommitParentClosure(ctx context.Context, vrw types.ValueReadWriter, parentRefsL types.List) (types.Ref, bool, error) { @@ -412,46 +400,39 @@ func writeFbCommitParentClosure(ctx context.Context, cs chunks.ChunkStore, vrw t } // Load them as ProllyTrees. ns := tree.NewNodeStore(cs) - maps := make([]prolly.Map, len(parents)) + closures := make([]prolly.CommitClosure, len(parents)) for i := range addrs { if !types.IsNull(vs[i]) { node := tree.NodeFromBytes(vs[i].(types.TupleRowStorage)) - maps[i] = prolly.NewMap(node, ns, commitKeyTupleDesc, commitValueTupleDesc) + closures[i] = prolly.NewCommitClosure(node, ns) } else { - maps[i], err = prolly.NewMapFromTuples(ctx, ns, commitKeyTupleDesc, commitValueTupleDesc) - if err != nil { - return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: NewMapFromTuples: %w", err) - } + closures[i] = prolly.NewEmptyCommitClosure(ns) } } // Add all the missing entries from [1, ...) maps to the 0th map. - editor := maps[0].Mutate() - for i := 1; i < len(maps); i++ { - err = prolly.DiffMaps(ctx, maps[0], maps[i], func(ctx context.Context, diff tree.Diff) error { + editor := closures[0].Editor() + for i := 1; i < len(closures); i++ { + err = prolly.DiffCommitClosures(ctx, closures[0], closures[i], func(ctx context.Context, diff tree.Diff) error { if diff.Type == tree.AddedDiff { - return editor.Put(ctx, val.Tuple(diff.Key), val.EmptyTuple) + return editor.Add(ctx, prolly.CommitClosureKey(diff.Key)) } return nil }) if err != nil && !errors.Is(err, io.EOF) { - return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: DiffMaps: %w", err) + return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: DiffCommitClosures: %w", err) } } // Add the parents themselves to the new map. - tb := val.NewTupleBuilder(commitKeyTupleDesc) for i := 0; i < len(parents); i++ { - tb.PutUint64(0, parents[i].Height()) - tb.PutAddress(1, parentAddrs[i]) - err = editor.Put(ctx, tb.Build(ns.Pool()), val.EmptyTuple) + err = editor.Add(ctx, prolly.NewCommitClosureKey(ns.Pool(), parents[i].Height(), parentAddrs[i])) if err != nil { - return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableMap.Put: %w", err) + return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableCommitClosure.Put: %w", err) } - tb.Recycle() } - // This puts the map in the NodeStore as well. - res, err := editor.Map(ctx) + // This puts the closure in the NodeStore as well. + res, err := editor.Flush(ctx) if err != nil { - return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableMap.Map: %w", err) + return hash.Hash{}, fmt.Errorf("writeCommitParentClosure: MutableCommitClosure.Flush: %w", err) } return res.HashOf(), nil } diff --git a/go/store/prolly/commit_closure.go b/go/store/prolly/commit_closure.go new file mode 100644 index 0000000000..289790124a --- /dev/null +++ b/go/store/prolly/commit_closure.go @@ -0,0 +1,159 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prolly + +import ( + "bytes" + "context" + "encoding/binary" + + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/pool" + "github.com/dolthub/dolt/go/store/prolly/message" + "github.com/dolthub/dolt/go/store/prolly/tree" + "github.com/dolthub/dolt/go/store/types" +) + +type CommitClosureValue []byte + +type CommitClosure struct { + closure orderedTree[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering] +} + +type commitClosureKeyOrdering struct{} + +var _ ordering[CommitClosureKey] = commitClosureKeyOrdering{} + +func (o commitClosureKeyOrdering) Compare(left, right CommitClosureKey) int { + lh, rh := left.Height(), right.Height() + if lh == rh { + return bytes.Compare(left[8:], right[8:]) + } else if lh < rh { + return -1 + } + return 1 +} + +func NewEmptyCommitClosure(ns tree.NodeStore) CommitClosure { + serializer := message.CommitClosureSerializer{Pool: ns.Pool()} + msg := serializer.Serialize(nil, nil, nil, 0) + node := tree.NodeFromBytes(msg) + return NewCommitClosure(node, ns) +} + +func NewCommitClosure(node tree.Node, ns tree.NodeStore) CommitClosure { + return CommitClosure{ + closure: orderedTree[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]{ + root: node, + ns: ns, + order: commitClosureKeyOrdering{}, + }, + } +} + +func (c CommitClosure) Count() int { + return c.closure.count() +} + +func (c CommitClosure) Height() int { + return c.closure.height() +} + +func (c CommitClosure) Node() tree.Node { + return c.closure.root +} + +func (c CommitClosure) HashOf() hash.Hash { + return c.closure.hashOf() +} + +func (c CommitClosure) Format() *types.NomsBinFormat { + return c.closure.ns.Format() +} + +func (c CommitClosure) Editor() CommitClosureEditor { + return CommitClosureEditor{ + closure: c.closure.mutate(), + } +} + +func (c CommitClosure) IterAllReverse(ctx context.Context) (CommitClosureIter, error) { + return c.closure.iterAllReverse(ctx) +} + +func DecodeCommitClosureKey(key []byte) (height uint64, addr hash.Hash) { + height = binary.LittleEndian.Uint64(key) + addr = hash.New(key[8:]) + return +} + +type CommitClosureEditor struct { + closure orderedMap[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering] +} + +type CommitClosureKey []byte + +type CommitClosureIter kvIter[CommitClosureKey, CommitClosureValue] + +func NewCommitClosureKey(p pool.BuffPool, height uint64, addr hash.Hash) CommitClosureKey { + r := p.Get(8 + 20) + binary.LittleEndian.PutUint64(r, height) + copy(r[8:], addr[:]) + return CommitClosureKey(r) +} + +func (k CommitClosureKey) Height() uint64 { + return binary.LittleEndian.Uint64(k) +} + +func (k CommitClosureKey) Addr() hash.Hash { + return hash.New(k[8:]) +} + +func (k CommitClosureKey) Less(other CommitClosureKey) bool { + return commitClosureKeyOrdering{}.Compare(k, other) < 0 +} + +var emptyCommitClosureValue CommitClosureValue = CommitClosureValue(make([]byte, 1)) + +func (wr CommitClosureEditor) Add(ctx context.Context, key CommitClosureKey) error { + return wr.closure.put(ctx, key, emptyCommitClosureValue) +} + +func (wr CommitClosureEditor) Delete(ctx context.Context, key CommitClosureKey) error { + return wr.closure.delete(ctx, key) +} + +func (wr CommitClosureEditor) Flush(ctx context.Context) (CommitClosure, error) { + tr := wr.closure.tree + serializer := message.CommitClosureSerializer{Pool: tr.ns.Pool()} + + root, err := tree.ApplyMutations(ctx, tr.ns, tr.root, serializer, wr.closure.mutations(), tr.compareItems) + if err != nil { + return CommitClosure{}, err + } + + return CommitClosure{ + closure: orderedTree[CommitClosureKey, CommitClosureValue, commitClosureKeyOrdering]{ + root: root, + ns: tr.ns, + order: tr.order, + }, + }, nil +} + +func DiffCommitClosures(ctx context.Context, from, to CommitClosure, cb DiffFn) error { + return diffOrderedTrees(ctx, from.closure, to.closure, cb) +} diff --git a/go/store/prolly/commit_closure_test.go b/go/store/prolly/commit_closure_test.go new file mode 100644 index 0000000000..bdc09d998c --- /dev/null +++ b/go/store/prolly/commit_closure_test.go @@ -0,0 +1,192 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package prolly + +import ( + "context" + "errors" + "fmt" + "io" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/prolly/message" + "github.com/dolthub/dolt/go/store/prolly/tree" + "github.com/dolthub/dolt/go/store/types" +) + +func TestCommitClosure(t *testing.T) { + ns := tree.NewTestNodeStore() + ctx := context.Background() + + t.Run("Keys", func(t *testing.T) { + k0 := NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000")) + assert.Equal(t, uint64(0), k0.Height()) + assert.True(t, k0.Addr().Equal(hash.Hash{})) + + k0_ := NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000")) + assert.False(t, k0.Less(k0_)) + + h := hash.Parse("00000000000000000000000000000001") + k0_1 := NewCommitClosureKey(ns.Pool(), 0, h) + assert.True(t, k0_1.Addr().Equal(h)) + assert.True(t, k0.Less(k0_1)) + assert.False(t, k0_1.Less(k0_)) + + }) + + t.Run("Empty", func(t *testing.T) { + cc := NewEmptyCommitClosure(ns) + assert.NotNil(t, cc) + assert.Equal(t, 0, cc.Count()) + assert.Equal(t, 0, cc.closure.root.Count()) + assert.Equal(t, 1, cc.Height()) + + i, err := cc.IterAllReverse(ctx) + _, _, err = i.Next(ctx) + assert.Error(t, err) + assert.True(t, errors.Is(err, io.EOF)) + }) + + t.Run("Insert", func(t *testing.T) { + cc := NewEmptyCommitClosure(ns) + e := cc.Editor() + err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + cc, err = e.Flush(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, cc.Count()) + + i, err := cc.IterAllReverse(ctx) + assert.NoError(t, err) + k, _, err := i.Next(ctx) + assert.NoError(t, err) + assert.Equal(t, uint64(1), k.Height()) + k, _, err = i.Next(ctx) + assert.NoError(t, err) + assert.Equal(t, uint64(0), k.Height()) + _, _, err = i.Next(ctx) + assert.Error(t, err) + assert.True(t, errors.Is(err, io.EOF)) + + e = cc.Editor() + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + cc, err = e.Flush(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, cc.Count()) + }) + + t.Run("Diff", func(t *testing.T) { + ccl := NewEmptyCommitClosure(ns) + e := ccl.Editor() + err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + ccl, err = e.Flush(ctx) + assert.NoError(t, err) + assert.Equal(t, 2, ccl.Count()) + + ccr := NewEmptyCommitClosure(ns) + e = ccr.Editor() + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 0, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 1, hash.Parse("00000000000000000000000000000001"))) + assert.NoError(t, err) + err = e.Add(ctx, NewCommitClosureKey(ns.Pool(), 2, hash.Parse("00000000000000000000000000000000"))) + assert.NoError(t, err) + ccr, err = e.Flush(ctx) + assert.NoError(t, err) + assert.Equal(t, 4, ccr.Count()) + + var numadds, numdels int + err = DiffCommitClosures(ctx, ccl, ccr, func(ctx context.Context, d tree.Diff) error { + if d.Type == tree.AddedDiff { + numadds++ + } else if d.Type == tree.RemovedDiff { + numdels++ + } + return nil + }) + assert.Error(t, err) + assert.True(t, errors.Is(err, io.EOF)) + assert.Equal(t, 2, numadds) + assert.Equal(t, 0, numdels) + }) + + t.Run("WalkAddresses", func(t *testing.T) { + cc := NewEmptyCommitClosure(ns) + e := cc.Editor() + for i := 0; i < 4096; i++ { + err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), uint64(i), hash.Parse(fmt.Sprintf("%0.32d", i)))) + require.NoError(t, err) + } + cc, err := e.Flush(ctx) + require.NoError(t, err) + assert.Equal(t, 4096, cc.Count()) + + // Walk the addresses in the root. + msg := message.Message(tree.ValueFromNode(cc.closure.root).(types.TupleRowStorage)) + numaddresses := 0 + err = message.WalkAddresses(ctx, msg, func(ctx context.Context, addr hash.Hash) error { + numaddresses++ + return nil + }) + require.NoError(t, err) + assert.Less(t, numaddresses, 4096) + + // Walk all addresses in the tree. + numaddresses = 0 + err = tree.WalkAddresses(ctx, cc.closure.root, ns, func(ctx context.Context, addr hash.Hash) error { + numaddresses++ + return nil + }) + require.NoError(t, err) + assert.Less(t, 4096, numaddresses) + }) + + t.Run("WalkNodes", func(t *testing.T) { + cc := NewEmptyCommitClosure(ns) + e := cc.Editor() + for i := 0; i < 4096; i++ { + err := e.Add(ctx, NewCommitClosureKey(ns.Pool(), uint64(i), hash.Parse(fmt.Sprintf("%0.32d", i)))) + require.NoError(t, err) + } + cc, err := e.Flush(ctx) + require.NoError(t, err) + assert.Equal(t, 4096, cc.Count()) + + numnodes := 0 + totalentries := 0 + err = tree.WalkNodes(ctx, cc.closure.root, ns, func(ctx context.Context, node tree.Node) error { + numnodes++ + totalentries += node.Count() + return nil + }) + require.NoError(t, err) + assert.Less(t, cc.closure.root.Count(), numnodes) + assert.Less(t, 4096, totalentries) + }) +} diff --git a/go/store/prolly/message/commit_closure.go b/go/store/prolly/message/commit_closure.go new file mode 100644 index 0000000000..090cf98a9c --- /dev/null +++ b/go/store/prolly/message/commit_closure.go @@ -0,0 +1,168 @@ +// Copyright 2022 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package message + +import ( + "context" + "encoding/binary" + + fb "github.com/google/flatbuffers/go" + + "github.com/dolthub/dolt/go/gen/fb/serial" + "github.com/dolthub/dolt/go/store/hash" + "github.com/dolthub/dolt/go/store/pool" + "github.com/dolthub/dolt/go/store/val" +) + +var commitClosureKeyOffsets []byte +var commitClosureValueOffsets []byte +var commitClosureEmptyValueBytes []byte + +func init() { + commitClosureKeyOffsets = make([]byte, (maxChunkSz/commitClosureKeyLength)*uint16Size) + commitClosureValueOffsets = make([]byte, (maxChunkSz/commitClosureKeyLength)*uint16Size) + commitClosureEmptyValueBytes = make([]byte, 0) + + buf := commitClosureKeyOffsets + off := uint16(commitClosureKeyLength) + for len(buf) > 0 { + binary.LittleEndian.PutUint16(buf, off) + buf = buf[uint16Size:] + off += uint16(commitClosureKeyLength) + } +} + +func offsetsForCommitClosureKeys(buf []byte) []byte { + cnt := len(buf) / commitClosureKeyLength + return commitClosureKeyOffsets[:cnt*uint16Size] +} + +func getCommitClosureKeys(msg Message) val.SlicedBuffer { + var ret val.SlicedBuffer + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + ret.Buf = m.KeyItemsBytes() + ret.Offs = offsetsForCommitClosureKeys(ret.Buf) + return ret +} + +func getCommitClosureValues(msg Message) val.SlicedBuffer { + var ret val.SlicedBuffer + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + if m.AddressArrayLength() == 0 { + ret.Buf = commitClosureEmptyValueBytes + ret.Offs = commitClosureValueOffsets[:getCommitClosureCount(msg)*uint16Size] + return ret + } + ret.Buf = m.AddressArrayBytes() + ret.Offs = offsetsForAddressArray(ret.Buf) + return ret +} + +// uint64 + hash. +const commitClosureKeyLength = 8 + 20 + +func getCommitClosureCount(msg Message) uint16 { + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + return uint16(m.KeyItemsLength() / commitClosureKeyLength) +} + +func getCommitClosureTreeLevel(msg Message) int { + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + return int(m.TreeLevel()) +} + +func getCommitClosureTreeCount(msg Message) int { + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + return int(m.TreeCount()) +} + +func getCommitClosureSubtrees(msg Message) []uint64 { + counts := make([]uint64, getCommitClosureCount(msg)) + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + return decodeVarints(m.SubtreeCountsBytes(), counts) +} + +func walkCommitClosureAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error { + m := serial.GetRootAsCommitClosure(msg, messagePrefixSz) + arr := m.AddressArrayBytes() + for i := 0; i < len(arr)/hash.ByteLen; i++ { + addr := hash.New(arr[i*addrSize : (i+1)*addrSize]) + if err := cb(ctx, addr); err != nil { + return err + } + } + if m.TreeLevel() == 0 { + // If Level() == 0, walk addresses in keys. + keybytes := m.KeyItemsBytes() + for i := 8; i < len(keybytes); i += commitClosureKeyLength { + addr := hash.New(keybytes[i : i+addrSize]) + if err := cb(ctx, addr); err != nil { + return err + } + } + } + return nil +} + +var commitClosureFileID = []byte(serial.CommitClosureFileID) + +type CommitClosureSerializer struct { + Pool pool.BuffPool +} + +var _ Serializer = CommitClosureSerializer{} + +func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) Message { + var keyArr, addrArr, cardArr fb.UOffsetT + + keySz, addrSz, totalSz := estimateCommitClosureSize(keys, addrs, subtrees) + b := getFlatbufferBuilder(s.Pool, totalSz) + + // keys + keyArr = writeItemBytes(b, keys, keySz) + + if level > 0 { + // addresses + addrArr = writeItemBytes(b, addrs, addrSz) + + // subtree cardinalities + cardArr = writeCountArray(b, subtrees) + } + + serial.CommitClosureStart(b) + serial.CommitClosureAddKeyItems(b, keyArr) + + if level > 0 { + serial.CommitClosureAddAddressArray(b, addrArr) + serial.CommitClosureAddSubtreeCounts(b, cardArr) + serial.CommitClosureAddTreeCount(b, sumSubtrees(subtrees)) + } else { + serial.CommitClosureAddTreeCount(b, uint64(len(keys))) + } + serial.CommitClosureAddTreeLevel(b, uint8(level)) + + return finishMessage(b, serial.CommitClosureEnd(b), commitClosureFileID) +} + +func estimateCommitClosureSize(keys, addresses [][]byte, subtrees []uint64) (keySz, addrSz, totalSz int) { + keySz = commitClosureKeyLength * len(keys) + addrSz = addrSize * len(addresses) + totalSz += keySz + addrSz + totalSz += len(subtrees) * binary.MaxVarintLen64 + totalSz += 8 + 1 + 1 + 1 + totalSz += 72 + totalSz += messagePrefixSz + return +} diff --git a/go/store/prolly/message/message.go b/go/store/prolly/message/message.go index 5e9d11ed11..dc2e2e3e61 100644 --- a/go/store/prolly/message/message.go +++ b/go/store/prolly/message/message.go @@ -45,6 +45,12 @@ func GetKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) { cnt = getAddressMapCount(msg) return } + if id == serial.CommitClosureFileID { + keys = getCommitClosureKeys(msg) + values = getCommitClosureValues(msg) + cnt = getCommitClosureCount(msg) + return + } panic(fmt.Sprintf("unknown message id %s", id)) } @@ -56,6 +62,8 @@ func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context return walkProllyMapAddresses(ctx, msg, cb) case serial.AddressMapFileID: return walkAddressMapAddresses(ctx, msg, cb) + case serial.CommitClosureFileID: + return walkCommitClosureAddresses(ctx, msg, cb) default: panic(fmt.Sprintf("unknown message id %s", id)) } @@ -68,6 +76,8 @@ func GetTreeLevel(msg Message) int { return getProllyMapTreeLevel(msg) case serial.AddressMapFileID: return getAddressMapTreeLevel(msg) + case serial.CommitClosureFileID: + return getCommitClosureTreeLevel(msg) default: panic(fmt.Sprintf("unknown message id %s", id)) } @@ -80,6 +90,8 @@ func GetTreeCount(msg Message) int { return getProllyMapTreeCount(msg) case serial.AddressMapFileID: return getAddressMapTreeCount(msg) + case serial.CommitClosureFileID: + return getCommitClosureTreeCount(msg) default: panic(fmt.Sprintf("unknown message id %s", id)) } @@ -92,6 +104,8 @@ func GetSubtrees(msg Message) []uint64 { return getProllyMapSubtrees(msg) case serial.AddressMapFileID: return getAddressMapSubtrees(msg) + case serial.CommitClosureFileID: + return getCommitClosureSubtrees(msg) default: panic(fmt.Sprintf("unknown message id %s", id)) } diff --git a/go/store/prolly/tree/node.go b/go/store/prolly/tree/node.go index 1baf1dbeca..2572ec3110 100644 --- a/go/store/prolly/tree/node.go +++ b/go/store/prolly/tree/node.go @@ -43,6 +43,10 @@ func WalkAddresses(ctx context.Context, nd Node, ns NodeStore, cb AddressCb) err return err } + if nd.IsLeaf() { + return nil + } + child, err := ns.Read(ctx, addr) if err != nil { return err @@ -59,6 +63,10 @@ func WalkNodes(ctx context.Context, nd Node, ns NodeStore, cb NodeCb) error { return err } + if nd.IsLeaf() { + return nil + } + return walkAddresses(ctx, nd, func(ctx context.Context, addr hash.Hash) error { child, err := ns.Read(ctx, addr) if err != nil { diff --git a/go/store/prolly/tree/node_cursor.go b/go/store/prolly/tree/node_cursor.go index 403bffc999..d510503a3c 100644 --- a/go/store/prolly/tree/node_cursor.go +++ b/go/store/prolly/tree/node_cursor.go @@ -241,7 +241,7 @@ func (cur *Cursor) firstKey() Item { } func (cur *Cursor) lastKey() Item { - lastKeyIdx := int(cur.nd.count - 1) + lastKeyIdx := int(cur.nd.count) - 1 return cur.nd.GetKey(lastKeyIdx) } @@ -250,7 +250,7 @@ func (cur *Cursor) skipToNodeStart() { } func (cur *Cursor) skipToNodeEnd() { - lastKeyIdx := int(cur.nd.count - 1) + lastKeyIdx := int(cur.nd.count) - 1 cur.idx = lastKeyIdx } @@ -258,7 +258,7 @@ func (cur *Cursor) keepInBounds() { if cur.idx < 0 { cur.skipToNodeStart() } - lastKeyIdx := int(cur.nd.count - 1) + lastKeyIdx := int(cur.nd.count) - 1 if cur.idx > lastKeyIdx { cur.skipToNodeEnd() } @@ -271,7 +271,7 @@ func (cur *Cursor) atNodeStart() bool { // atNodeEnd returns true if the cursor's current |idx| // points to the last node item func (cur *Cursor) atNodeEnd() bool { - lastKeyIdx := int(cur.nd.count - 1) + lastKeyIdx := int(cur.nd.count) - 1 return cur.idx == lastKeyIdx }