added pkg message to abstract flatbuffers message serialization

This commit is contained in:
Andy Arthur
2022-04-29 16:27:05 -07:00
parent 63e666a2ba
commit 4598ec5173
5 changed files with 285 additions and 56 deletions

View File

@@ -0,0 +1,49 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"encoding/binary"
"math"
"github.com/dolthub/dolt/go/store/hash"
)
const (
maxChunkSz = math.MaxUint16
addrSz = hash.ByteLen
offsetCount = maxChunkSz / addrSz
uint16Size = 2
)
var addressOffsets []byte
func init() {
addressOffsets = make([]byte, offsetCount*uint16Size)
buf := addressOffsets
off := uint16(addrSz)
for len(buf) > 0 {
binary.LittleEndian.PutUint16(buf, off)
buf = buf[uint16Size:]
off += uint16(addrSz)
}
}
func offsetsForAddressArray(arr []byte) (offs []byte) {
cnt := len(arr) / addrSz
offs = addressOffsets[:cnt*uint16Size]
return
}

View File

@@ -0,0 +1,102 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"context"
"fmt"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/val"
)
type Message []byte
func GetKeys(msg Message) val.SlicedBuffer {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return getProllyMapKeys(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func GetValues(msg Message) val.SlicedBuffer {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return getProllyMapValues(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return walkProllyMapAddresses(ctx, msg, cb)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func GetCount(msg Message) uint16 {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return getProllyMapCount(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func GetTreeLevel(msg Message) int {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return getProllyMapTreeLevel(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func GetTreeCount(msg Message) int {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return getProllyMapTreeCount(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func GetSubtrees(msg Message) []byte {
id := serial.GetFileID(msg)
switch id {
case serial.ProllyTreeNodeFileID:
return getProllyMapSubtrees(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
}
func assertFalse(b bool) {
if b {
panic("assertion failed")
}
}

View File

@@ -0,0 +1,120 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"context"
fb "github.com/google/flatbuffers/go"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/val"
)
const (
// These constants are mirrored from serial.ProllyTreeNode.KeyOffsetsLength()
// and serial.ProllyTreeNode.ValueOffsetsLength() respectively.
// They are only as stable as the flatbuffers schemas that define them.
keyOffsetsVOffset = 6
valueOffsetsVOffset = 12
)
func getProllyMapKeys(msg Message) (keys val.SlicedBuffer) {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
keys.Buf = pm.KeyItemsBytes()
keys.Offs = getKeyOffsetsVector(pm)
return
}
func getProllyMapValues(msg Message) (values val.SlicedBuffer) {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
items := pm.ValueItemsBytes()
if items != nil {
values.Buf = items
values.Offs = getValueOffsetsVector(pm)
} else {
values.Buf = pm.AddressArrayBytes()
values.Offs = offsetsForAddressArray(values.Buf)
}
return
}
func walkProllyMapAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
arr := pm.AddressArrayBytes()
for i := 0; i < len(arr)/hash.ByteLen; i++ {
addr := hash.New(arr[i*addrSz : (i+1)*addrSz])
if err := cb(ctx, addr); err != nil {
return err
}
}
cnt := pm.ValueAddressOffsetsLength()
arr2 := pm.ValueItemsBytes()
for i := 0; i < cnt; i++ {
o := pm.ValueAddressOffsets(i)
addr := hash.New(arr[o : o+addrSz])
if err := cb(ctx, addr); err != nil {
return err
}
}
assertFalse((arr != nil) && (arr2 != nil))
return nil
}
func getProllyMapCount(msg Message) uint16 {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
if pm.KeyItemsLength() == 0 {
return 0
}
// zeroth offset ommitted from array
return uint16(pm.KeyOffsetsLength() + 1)
}
func getProllyMapTreeLevel(msg Message) int {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
return int(pm.TreeLevel())
}
func getProllyMapTreeCount(msg Message) int {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
return int(pm.TreeCount())
}
func getProllyMapSubtrees(msg Message) []byte {
pm := serial.GetRootAsProllyTreeNode(msg, 0)
return pm.SubtreeCountsBytes()
}
func getKeyOffsetsVector(pm *serial.ProllyTreeNode) []byte {
sz := pm.KeyOffsetsLength() * 2
tab := pm.Table()
vec := tab.Offset(keyOffsetsVOffset)
start := int(tab.Vector(fb.UOffsetT(vec)))
stop := start + sz
return tab.Bytes[start:stop]
}
func getValueOffsetsVector(pm *serial.ProllyTreeNode) []byte {
sz := pm.ValueOffsetsLength() * 2
tab := pm.Table()
vec := tab.Offset(valueOffsetsVOffset)
start := int(tab.Vector(fb.UOffsetT(vec)))
stop := start + sz
return tab.Bytes[start:stop]
}

View File

@@ -24,6 +24,7 @@ import (
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/message"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
@@ -45,8 +46,8 @@ type Node struct {
// keys and values contain sub-slices of |msg|,
// allowing faster lookups by avoiding the vtable
keys, values val.SlicedBuffer
msg serial.ProllyTreeNode
count uint16
msg message.Message
}
type AddressCb func(ctx context.Context, addr hash.Hash) error
@@ -82,30 +83,11 @@ func WalkNodes(ctx context.Context, nd Node, ns NodeStore, cb NodeCb) error {
})
}
func NodeFromBytes(buf []byte) Node {
msg := serial.GetRootAsProllyTreeNode(buf, 0)
return nodeFromFlatbuffer(*msg)
}
func nodeFromFlatbuffer(msg serial.ProllyTreeNode) Node {
keys := val.SlicedBuffer{
Buf: msg.KeyItemsBytes(),
Offs: getKeyOffsetsVector(msg),
}
values := val.SlicedBuffer{
Buf: msg.ValueItemsBytes(),
Offs: getValueOffsetsVector(msg),
}
count := msg.KeyOffsetsLength() + 1
if len(keys.Buf) == 0 {
count = 0
}
func NodeFromBytes(msg []byte) Node {
return Node{
keys: keys,
values: values,
count: uint16(count),
keys: message.GetKeys(msg),
values: message.GetValues(msg),
count: message.GetCount(msg),
msg: msg,
}
}
@@ -119,7 +101,7 @@ func (nd Node) Count() int {
}
func (nd Node) TreeCount() int {
return int(nd.msg.TreeCount())
return message.GetTreeCount(nd.msg)
}
func (nd Node) Size() int {
@@ -128,7 +110,7 @@ func (nd Node) Size() int {
// Level returns the tree Level for this node
func (nd Node) Level() int {
return int(nd.msg.TreeLevel())
return message.GetTreeLevel(nd.msg)
}
// IsLeaf returns whether this node is a leaf
@@ -143,25 +125,17 @@ func (nd Node) GetKey(i int) Item {
// getValue returns the |ith| value of this node.
func (nd Node) getValue(i int) Item {
// todo(andy): abstract value access
if nd.values.Buf != nil {
return nd.values.GetSlice(i)
} else {
r := nd.getAddress(i)
return r[:]
}
return nd.values.GetSlice(i)
}
// getAddress returns the |ith| address of this node.
// This method assumes values are 20-byte address hashes.
func (nd Node) getAddress(i int) hash.Hash {
refs := nd.msg.AddressArrayBytes()
start, stop := i*addrSz, (i+1)*addrSz
return hash.New(refs[start:stop])
return hash.New(nd.getValue(i))
}
func (nd Node) getSubtreeCounts() SubtreeCounts {
arr := nd.msg.SubtreeCountsBytes()
arr := message.GetSubtrees(nd.msg)
return readSubtreeCounts(int(nd.count), arr)
}
@@ -170,27 +144,11 @@ func (nd Node) empty() bool {
}
func (nd Node) bytes() []byte {
return nd.msg.Table().Bytes
return nd.msg
}
func walkAddresses(ctx context.Context, nd Node, cb AddressCb) (err error) {
arr := nd.msg.AddressArrayBytes()
cnt := len(arr) / addrSz
for i := 0; i < cnt; i++ {
if err = cb(ctx, nd.getAddress(i)); err != nil {
return err
}
}
cnt2 := nd.msg.ValueAddressOffsetsLength()
for i := 0; i < cnt2; i++ {
o := nd.msg.ValueAddressOffsets(i)
addr := hash.New(nd.values.Buf[o : o+addrSz])
if err = cb(ctx, addr); err != nil {
return err
}
}
return
return message.WalkAddresses(ctx, nd.msg, cb)
}
func getKeyOffsetsVector(msg serial.ProllyTreeNode) []byte {

View File

@@ -64,7 +64,7 @@ func TestRoundTripNodeItems(t *testing.T) {
func TestNodeSize(t *testing.T) {
sz := unsafe.Sizeof(Node{})
assert.Equal(t, 136, int(sz))
assert.Equal(t, 128, int(sz))
}
func TestCountArray(t *testing.T) {