created dedicated flatbuffer message for blobs

This commit is contained in:
Andy Arthur
2022-07-26 13:14:16 -07:00
parent 753e2bcfc2
commit e99fca7e7a
10 changed files with 398 additions and 5 deletions

205
go/gen/fb/serial/blob.go Normal file
View File

@@ -0,0 +1,205 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package serial
import (
flatbuffers "github.com/google/flatbuffers/go"
)
// Blob is the generated FlatBuffers accessor for the serial.Blob table
// (schema: go/serial/blob.fbs). It wraps a raw byte buffer; all getters
// resolve fields lazily through the table's vtable.
// Generated by flatc -- do not hand-edit; regenerate from the schema.
type Blob struct {
	_tab flatbuffers.Table
}

// GetRootAsBlob returns a Blob view over the root table of buf,
// whose serialized data begins at offset.
func GetRootAsBlob(buf []byte, offset flatbuffers.UOffsetT) *Blob {
	n := flatbuffers.GetUOffsetT(buf[offset:])
	x := &Blob{}
	x.Init(buf, n+offset)
	return x
}

// GetSizePrefixedRootAsBlob is like GetRootAsBlob for buffers that
// carry a leading uint32 size prefix before the root offset.
func GetSizePrefixedRootAsBlob(buf []byte, offset flatbuffers.UOffsetT) *Blob {
	n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
	x := &Blob{}
	x.Init(buf, n+offset+flatbuffers.SizeUint32)
	return x
}

// Init points the receiver at table position i within buf.
func (rcv *Blob) Init(buf []byte, i flatbuffers.UOffsetT) {
	rcv._tab.Bytes = buf
	rcv._tab.Pos = i
}

// Table exposes the underlying flatbuffers table.
func (rcv *Blob) Table() flatbuffers.Table {
	return rcv._tab
}
// Payload returns the j-th byte of the payload vector (vtable slot 4),
// or 0 if the field is absent.
func (rcv *Blob) Payload(j int) byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
	if o != 0 {
		a := rcv._tab.Vector(o)
		return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
	}
	return 0
}

// PayloadLength returns the number of bytes in the payload vector,
// or 0 if the field is absent.
func (rcv *Blob) PayloadLength() int {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
	if o != 0 {
		return rcv._tab.VectorLen(o)
	}
	return 0
}

// PayloadBytes returns the payload vector as a zero-copy slice of the
// underlying buffer, or nil if the field is absent.
func (rcv *Blob) PayloadBytes() []byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
	if o != 0 {
		return rcv._tab.ByteVector(o + rcv._tab.Pos)
	}
	return nil
}

// MutatePayload overwrites the j-th payload byte with n in place,
// returning false if the field is absent.
func (rcv *Blob) MutatePayload(j int, n byte) bool {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
	if o != 0 {
		a := rcv._tab.Vector(o)
		return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
	}
	return false
}
// AddressArray returns the j-th byte of the address_array vector
// (vtable slot 6), or 0 if the field is absent.
func (rcv *Blob) AddressArray(j int) byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
	if o != 0 {
		a := rcv._tab.Vector(o)
		return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
	}
	return 0
}

// AddressArrayLength returns the byte length of the address_array
// vector, or 0 if the field is absent.
func (rcv *Blob) AddressArrayLength() int {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
	if o != 0 {
		return rcv._tab.VectorLen(o)
	}
	return 0
}

// AddressArrayBytes returns the address_array vector as a zero-copy
// slice of the underlying buffer, or nil if the field is absent.
func (rcv *Blob) AddressArrayBytes() []byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
	if o != 0 {
		return rcv._tab.ByteVector(o + rcv._tab.Pos)
	}
	return nil
}

// MutateAddressArray overwrites the j-th address_array byte with n in
// place, returning false if the field is absent.
func (rcv *Blob) MutateAddressArray(j int, n byte) bool {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
	if o != 0 {
		a := rcv._tab.Vector(o)
		return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
	}
	return false
}
// SubtreeSizes returns the j-th byte of the subtree_sizes vector
// (vtable slot 8), or 0 if the field is absent.
func (rcv *Blob) SubtreeSizes(j int) byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
	if o != 0 {
		a := rcv._tab.Vector(o)
		return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
	}
	return 0
}

// SubtreeSizesLength returns the byte length of the subtree_sizes
// vector, or 0 if the field is absent.
func (rcv *Blob) SubtreeSizesLength() int {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
	if o != 0 {
		return rcv._tab.VectorLen(o)
	}
	return 0
}

// SubtreeSizesBytes returns the subtree_sizes vector as a zero-copy
// slice of the underlying buffer, or nil if the field is absent.
func (rcv *Blob) SubtreeSizesBytes() []byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
	if o != 0 {
		return rcv._tab.ByteVector(o + rcv._tab.Pos)
	}
	return nil
}

// MutateSubtreeSizes overwrites the j-th subtree_sizes byte with n in
// place, returning false if the field is absent.
func (rcv *Blob) MutateSubtreeSizes(j int, n byte) bool {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
	if o != 0 {
		a := rcv._tab.Vector(o)
		return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
	}
	return false
}
// TreeSize returns the tree_size scalar (vtable slot 10), or the
// schema default 0 if the field is absent.
func (rcv *Blob) TreeSize() uint64 {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
	if o != 0 {
		return rcv._tab.GetUint64(o + rcv._tab.Pos)
	}
	return 0
}

// MutateTreeSize overwrites tree_size in place; returns false if the
// field was not written (absent fields cannot be mutated).
func (rcv *Blob) MutateTreeSize(n uint64) bool {
	return rcv._tab.MutateUint64Slot(10, n)
}

// TreeLevel returns the tree_level scalar (vtable slot 12), or the
// schema default 0 if the field is absent.
func (rcv *Blob) TreeLevel() byte {
	o := flatbuffers.UOffsetT(rcv._tab.Offset(12))
	if o != 0 {
		return rcv._tab.GetByte(o + rcv._tab.Pos)
	}
	return 0
}

// MutateTreeLevel overwrites tree_level in place; returns false if the
// field was not written.
func (rcv *Blob) MutateTreeLevel(n byte) bool {
	return rcv._tab.MutateByteSlot(12, n)
}
// BlobStart begins construction of a Blob table with 5 fields.
// All vector fields must be created before calling this (flatbuffers
// forbids nested construction).
func BlobStart(builder *flatbuffers.Builder) {
	builder.StartObject(5)
}

// BlobAddPayload writes the payload vector offset into slot 0.
func BlobAddPayload(builder *flatbuffers.Builder, payload flatbuffers.UOffsetT) {
	builder.PrependUOffsetTSlot(0, flatbuffers.UOffsetT(payload), 0)
}

// BlobStartPayloadVector begins a ubyte vector of numElems for payload.
func BlobStartPayloadVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
	return builder.StartVector(1, numElems, 1)
}

// BlobAddAddressArray writes the address_array vector offset into slot 1.
func BlobAddAddressArray(builder *flatbuffers.Builder, addressArray flatbuffers.UOffsetT) {
	builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(addressArray), 0)
}

// BlobStartAddressArrayVector begins a ubyte vector of numElems for
// address_array.
func BlobStartAddressArrayVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
	return builder.StartVector(1, numElems, 1)
}

// BlobAddSubtreeSizes writes the subtree_sizes vector offset into slot 2.
func BlobAddSubtreeSizes(builder *flatbuffers.Builder, subtreeSizes flatbuffers.UOffsetT) {
	builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(subtreeSizes), 0)
}

// BlobStartSubtreeSizesVector begins a ubyte vector of numElems for
// subtree_sizes.
func BlobStartSubtreeSizesVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
	return builder.StartVector(1, numElems, 1)
}

// BlobAddTreeSize writes the tree_size scalar into slot 3 (default 0).
func BlobAddTreeSize(builder *flatbuffers.Builder, treeSize uint64) {
	builder.PrependUint64Slot(3, treeSize, 0)
}

// BlobAddTreeLevel writes the tree_level scalar into slot 4 (default 0).
func BlobAddTreeLevel(builder *flatbuffers.Builder, treeLevel byte) {
	builder.PrependByteSlot(4, treeLevel, 0)
}

// BlobEnd finishes the table and returns its offset.
func BlobEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
	return builder.EndObject()
}

View File

@@ -35,6 +35,7 @@ const CommitClosureFileID = "CMCL"
const TableSchemaFileID = "DSCH"
const ForeignKeyCollectionFileID = "DFKC"
const MergeArtifactsFileID = "ARTM"
const BlobFileID = "BLOB"
const MessageTypesKind int = 27

33
go/serial/blob.fbs Normal file
View File

@@ -0,0 +1,33 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace serial;
// Blob is a node of a blob (large byte string) prolly tree:
// leaves carry raw bytes, internal nodes carry child addresses.
table Blob {
  // leaf node payload; unset for internal nodes
  payload:[ubyte];

  // array of subtree addresses for internal tree nodes
  address_array:[ubyte];

  // array of uvarint encoded subtree sizes, one per address above
  subtree_sizes:[ubyte];

  // total item count of the tree rooted at this node
  // (sum of subtree_sizes — see BlobSerializer.Serialize)
  tree_size:uint64;

  // height of this node; 0 for leaves
  tree_level:uint8;
}

// KEEP THIS IN SYNC WITH fileidentifiers.go
file_identifier "BLOB";

root_type Blob;

View File

@@ -35,6 +35,7 @@ const CommitClosureFileID = "CMCL"
const TableSchemaFileID = "DSCH"
const ForeignKeyCollectionFileID = "DFKC"
const MergeArtifactsFileID = "ARTM"
const BlobFileID = "BLOB"
const MessageTypesKind int = 27

View File

@@ -13,6 +13,7 @@ fi
# generate golang (de)serialization package
flatc -o $GEN_DIR --gen-onefile --filename-suffix "" --gen-mutable --go-namespace "serial" --go \
addressmap.fbs \
blob.fbs \
commit.fbs \
commitclosure.fbs \
encoding.fbs \

View File

@@ -0,0 +1,139 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package message
import (
"context"
"encoding/binary"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/val"
)
// blobFileID is the 4-byte flatbuffers file identifier ("BLOB") stamped
// on every serialized Blob message; see serial.BlobFileID.
var blobFileID = []byte(serial.BlobFileID)

// BlobSerializer encodes blob tree nodes as serial.Blob messages.
type BlobSerializer struct {
	// Pool supplies reusable buffers for flatbuffer construction.
	Pool pool.BuffPool
}

// compile-time assertion that BlobSerializer satisfies Serializer
var _ Serializer = BlobSerializer{}
// Serialize encodes one blob tree node. Leaf nodes (level 0) carry a
// single raw payload chunk; internal nodes carry a packed array of
// fixed-width child addresses plus uvarint-encoded subtree sizes.
// The keys argument is unused: blob items are keyed implicitly by
// ordinal position.
//
// NOTE: vectors must be created before serial.BlobStart is called —
// flatbuffers builders forbid nested table construction, so the
// statement order below is significant.
func (s BlobSerializer) Serialize(keys, values [][]byte, subtrees []uint64, level int) serial.Message {
	bufSz := estimateBlobSize(values, subtrees)
	b := getFlatbufferBuilder(s.Pool, bufSz)
	if level == 0 {
		// a leaf holds exactly one payload chunk
		assertTrue(len(values) == 1)
		assertTrue(len(subtrees) == 1)
		payload := b.CreateByteVector(values[0])
		serial.BlobStart(b)
		serial.BlobAddPayload(b, payload)
	} else {
		// internal node: child addresses (hash.ByteLen each) and
		// their subtree cardinalities
		addrs := writeItemBytes(b, values, len(values)*hash.ByteLen)
		cards := writeCountArray(b, subtrees)
		serial.BlobStart(b)
		serial.BlobAddAddressArray(b, addrs)
		serial.BlobAddSubtreeSizes(b, cards)
	}
	serial.BlobAddTreeSize(b, sumSubtrees(subtrees))
	serial.BlobAddTreeLevel(b, uint8(level))
	return serial.FinishMessage(b, serial.BlobEnd(b), blobFileID)
}
// getBlobKeys synthesizes keys for a blob node. Blob nodes store no
// keys: each of the cnt items is keyed by its ordinal, so we fabricate
// a zero-filled buffer with cnt little-endian uint16 offsets 0..cnt-1.
// NOTE(review): the SlicedBuffer offset convention is defined
// elsewhere; this mirrors the original construction exactly.
func getBlobKeys(msg serial.Message) val.SlicedBuffer {
	cnt := getBlobCount(msg)
	// make() already zero-fills; no explicit clearing loop is needed
	buf := make([]byte, cnt)
	offs := make([]byte, cnt*2)
	for i := 0; i < int(cnt); i++ {
		binary.LittleEndian.PutUint16(offs[i*2:(i+1)*2], uint16(i))
	}
	return val.SlicedBuffer{
		Buf:  buf,
		Offs: offs,
	}
}
// getBlobValues returns the node's values: for a leaf, the raw payload
// as one undivided span (empty offsets); for an internal node, the
// packed child address array with fixed-width address offsets.
func getBlobValues(msg serial.Message) val.SlicedBuffer {
	blb := serial.GetRootAsBlob(msg, serial.MessagePrefixSz)
	if blb.TreeLevel() == 0 {
		// leaf: a single payload chunk, no slicing
		return val.SlicedBuffer{
			Buf:  blb.PayloadBytes(),
			Offs: []byte{},
		}
	}
	addrs := blb.AddressArrayBytes()
	return val.SlicedBuffer{
		Buf:  addrs,
		Offs: offsetsForAddressArray(addrs),
	}
}
// getBlobCount returns the number of items in the node: one payload
// chunk for leaves, or the number of fixed-width child addresses for
// internal nodes.
func getBlobCount(msg serial.Message) uint16 {
	blb := serial.GetRootAsBlob(msg, serial.MessagePrefixSz)
	if blb.TreeLevel() > 0 {
		return uint16(blb.AddressArrayLength() / hash.ByteLen)
	}
	// leaves always hold a single payload item
	return 1
}
// walkBlobAddresses invokes cb once per child address stored in msg's
// address array. Leaves store no addresses, so cb is never called for
// them. Iteration stops on the first error from cb.
func walkBlobAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error {
	b := serial.GetRootAsBlob(msg, serial.MessagePrefixSz)
	arr := b.AddressArrayBytes()
	// use hash.ByteLen consistently for both the count and the slice
	// width (the original mixed hash.ByteLen with addrSize)
	for i := 0; i < len(arr)/hash.ByteLen; i++ {
		addr := hash.New(arr[i*hash.ByteLen : (i+1)*hash.ByteLen])
		if err := cb(ctx, addr); err != nil {
			return err
		}
	}
	return nil
}
// getBlobTreeLevel reports the height of the node encoded in msg
// (0 for leaves).
func getBlobTreeLevel(msg serial.Message) int {
	blb := serial.GetRootAsBlob(msg, serial.MessagePrefixSz)
	lvl := blb.TreeLevel()
	return int(lvl)
}
// getBlobTreeCount reports the total item count of the subtree rooted
// at the node encoded in msg (the serialized tree_size field).
func getBlobTreeCount(msg serial.Message) int {
	blb := serial.GetRootAsBlob(msg, serial.MessagePrefixSz)
	sz := blb.TreeSize()
	return int(sz)
}
// getBlobSubtrees decodes the per-child subtree cardinalities of an
// internal node, one per fixed-width address. Returns nil for leaves,
// which have no children.
func getBlobSubtrees(msg serial.Message) []uint64 {
	blb := serial.GetRootAsBlob(msg, serial.MessagePrefixSz)
	if blb.TreeLevel() == 0 {
		return nil
	}
	n := blb.AddressArrayLength() / hash.ByteLen
	return decodeVarints(blb.SubtreeSizesBytes(), make([]uint64, n))
}
// estimateBlobSize returns an upper-bound buffer size for serializing
// a blob node holding |values| and |subtrees|: the sum of all value
// bytes, a worst-case varint encoding per subtree count, and a fixed
// allowance for flatbuffer overhead (vtable, metadata, padding).
func estimateBlobSize(values [][]byte, subtrees []uint64) int {
	sz := 200 // flatbuffer overhead allowance
	for _, v := range values {
		sz += len(v)
	}
	// each subtree count occupies at most MaxVarintLen64 bytes
	return sz + len(subtrees)*binary.MaxVarintLen64
}

View File

@@ -51,6 +51,12 @@ func GetKeysAndValues(msg serial.Message) (keys, values val.SlicedBuffer, cnt ui
cnt = getCommitClosureCount(msg)
return
}
if id == serial.BlobFileID {
keys = getBlobKeys(msg)
values = getBlobValues(msg)
cnt = getBlobCount(msg)
return
}
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -66,6 +72,8 @@ func WalkAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.
return walkMergeArtifactAddresses(ctx, msg, cb)
case serial.CommitClosureFileID:
return walkCommitClosureAddresses(ctx, msg, cb)
case serial.BlobFileID:
return walkBlobAddresses(ctx, msg, cb)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -82,6 +90,8 @@ func GetTreeLevel(msg serial.Message) int {
return getMergeArtifactTreeLevel(msg)
case serial.CommitClosureFileID:
return getCommitClosureTreeLevel(msg)
case serial.BlobFileID:
return getBlobTreeLevel(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -98,6 +108,8 @@ func GetTreeCount(msg serial.Message) int {
return getMergeArtifactTreeCount(msg)
case serial.CommitClosureFileID:
return getCommitClosureTreeCount(msg)
case serial.BlobFileID:
return getBlobTreeCount(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}
@@ -114,6 +126,8 @@ func GetSubtrees(msg serial.Message) []uint64 {
return getMergeArtifactSubtrees(msg)
case serial.CommitClosureFileID:
return getCommitClosureSubtrees(msg)
case serial.BlobFileID:
return getBlobSubtrees(msg)
default:
panic(fmt.Sprintf("unknown message id %s", id))
}

View File

@@ -196,7 +196,7 @@ func estimateProllyMapSize(keys, values [][]byte, subtrees []uint64, valAddrsCnt
keySz += len(keys[i])
valSz += len(values[i])
}
refCntSz := len(subtrees) * binary.MaxVarintLen64
subtreesSz := len(subtrees) * binary.MaxVarintLen64
// constraints enforced upstream
if keySz > int(MaxVectorOffset) {
@@ -208,7 +208,7 @@ func estimateProllyMapSize(keys, values [][]byte, subtrees []uint64, valAddrsCnt
// todo(andy): better estimates
bufSz += keySz + valSz // tuples
bufSz += refCntSz // subtree counts
bufSz += subtreesSz // subtree counts
bufSz += len(keys)*2 + len(values)*2 // offsets
bufSz += 8 + 1 + 1 + 1 // metadata
bufSz += 72 // vtable (approx)

View File

@@ -25,7 +25,6 @@ import (
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly/message"
"github.com/dolthub/dolt/go/store/val"
)
const DefaultFixedChunkLength = 4000
@@ -243,7 +242,7 @@ type ImmutableTree struct {
}
func NewImmutableTreeFromReader(ctx context.Context, r io.Reader, ns NodeStore, chunkSize int) (*ImmutableTree, error) {
s := message.ProllyMapSerializer{Pool: ns.Pool(), ValDesc: val.TupleDesc{}}
s := message.BlobSerializer{Pool: ns.Pool()}
root, err := buildImmutableTree(ctx, r, ns, s, chunkSize)
if errors.Is(err, io.EOF) {
return &ImmutableTree{Addr: hash.Hash{}}, nil

View File

@@ -138,7 +138,7 @@ func TestWriteImmutableTree(t *testing.T) {
ctx := context.Background()
r := bytes.NewReader(buf)
ns := NewTestNodeStore()
serializer := message.ProllyMapSerializer{Pool: ns.Pool()}
serializer := message.BlobSerializer{Pool: ns.Pool()}
root, err := buildImmutableTree(ctx, r, ns, serializer, tt.chunkSize)
if tt.err != nil {
require.True(t, errors.Is(err, tt.err))