mirror of
https://github.com/dolthub/dolt.git
synced 2026-03-01 10:09:41 -06:00
go/store/prolly/message: Move MessagePrefixSz, FinishMessage to serial package.
This commit is contained in:
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/nbs"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/spec"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
@@ -152,8 +151,8 @@ func (cmd RootsCmd) processTableFile(ctx context.Context, path string, modified
|
||||
cli.Println()
|
||||
}
|
||||
} else if sm, ok := value.(types.SerialMessage); ok {
|
||||
if serial.GetFileID(sm[message.MessagePrefixSz:]) == serial.StoreRootFileID {
|
||||
msg := serial.GetRootAsStoreRoot([]byte(sm), message.MessagePrefixSz)
|
||||
if serial.GetFileID(sm) == serial.StoreRootFileID {
|
||||
msg := serial.GetRootAsStoreRoot([]byte(sm), serial.MessagePrefixSz)
|
||||
ambytes := msg.AddressMapBytes()
|
||||
node := tree.NodeFromBytes(ambytes)
|
||||
err := tree.OutputAddressMapNode(cli.OutStream, node)
|
||||
|
||||
@@ -15,7 +15,10 @@
|
||||
package serial
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"unsafe"
|
||||
|
||||
fb "github.com/google/flatbuffers/go"
|
||||
)
|
||||
|
||||
// KEEP THESE IN SYNC WITH .fbs FILES!
|
||||
@@ -32,11 +35,44 @@ const CommitClosureFileID = "CMCL"
|
||||
const TableSchemaFileID = "DSCH"
|
||||
const ForeignKeyCollectionFileID = "DFKC"
|
||||
|
||||
const MessageTypesKind int = 27
|
||||
|
||||
const MessagePrefixSz = 4
|
||||
|
||||
type Message []byte
|
||||
|
||||
func GetFileID(bs []byte) string {
|
||||
if len(bs) < 8 {
|
||||
if len(bs) < 8+MessagePrefixSz {
|
||||
return ""
|
||||
}
|
||||
return byteSliceToString(bs[4:8])
|
||||
return byteSliceToString(bs[MessagePrefixSz+4 : MessagePrefixSz+8])
|
||||
}
|
||||
|
||||
func FinishMessage(b *fb.Builder, off fb.UOffsetT, fileID []byte) Message {
|
||||
// We finish the buffer by prefixing it with:
|
||||
// 1) 1 byte NomsKind == SerialMessage.
|
||||
// 2) big endian 3 byte uint representing the size of the message, not
|
||||
// including the kind or size prefix bytes.
|
||||
//
|
||||
// This allows chunks we serialize here to be read by types binary
|
||||
// codec.
|
||||
//
|
||||
// All accessors in this package expect this prefix to be on the front
|
||||
// of the message bytes as well. See |MessagePrefixSz|.
|
||||
|
||||
b.Prep(1, fb.SizeInt32+4+MessagePrefixSz)
|
||||
b.FinishWithFileIdentifier(off, fileID)
|
||||
|
||||
var size [4]byte
|
||||
binary.BigEndian.PutUint32(size[:], uint32(len(b.Bytes)-int(b.Head())))
|
||||
if size[0] != 0 {
|
||||
panic("message is too large to be encoded")
|
||||
}
|
||||
|
||||
bytes := b.Bytes[b.Head()-MessagePrefixSz:]
|
||||
bytes[0] = byte(MessageTypesKind)
|
||||
copy(bytes[1:], size[1:])
|
||||
return bytes
|
||||
}
|
||||
|
||||
// byteSliceToString converts a []byte to string without a heap allocation.
|
||||
|
||||
@@ -30,7 +30,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/pool"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/shim"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
@@ -195,12 +194,12 @@ func TableFromAddr(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeS
|
||||
err = errors.New("table ref is unexpected noms value; not SerialMessage")
|
||||
return nil, err
|
||||
}
|
||||
id := serial.GetFileID(sm[message.MessagePrefixSz:])
|
||||
id := serial.GetFileID(sm)
|
||||
if id != serial.TableFileID {
|
||||
err = errors.New("table ref is unexpected noms value; GetFileID == " + id)
|
||||
return nil, err
|
||||
}
|
||||
return doltDevTable{vrw, ns, serial.GetRootAsTable([]byte(sm), message.MessagePrefixSz)}, nil
|
||||
return doltDevTable{vrw, ns, serial.GetRootAsTable([]byte(sm), serial.MessagePrefixSz)}, nil
|
||||
}
|
||||
}
|
||||
|
||||
@@ -774,8 +773,8 @@ func (fields serialTableFields) write() *serial.Table {
|
||||
serial.TableAddConflicts(builder, conflictsoff)
|
||||
serial.TableAddViolations(builder, violationsoff)
|
||||
serial.TableAddArtifacts(builder, artifactsoff)
|
||||
bs := message.FinishMessage(builder, serial.TableEnd(builder), []byte(serial.TableFileID))
|
||||
return serial.GetRootAsTable(bs, message.MessagePrefixSz)
|
||||
bs := serial.FinishMessage(builder, serial.TableEnd(builder), []byte(serial.TableFileID))
|
||||
return serial.GetRootAsTable(bs, serial.MessagePrefixSz)
|
||||
}
|
||||
|
||||
func newDoltDevTable(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeStore, sch schema.Schema, rows Index, indexes IndexSet, autoIncVal types.Value) (Table, error) {
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/marshal"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -85,11 +84,11 @@ func serializeNomsForeignKeys(ctx context.Context, vrw types.ValueReadWriter, fk
|
||||
|
||||
// deserializeNomsForeignKeys returns a new ForeignKeyCollection using the provided map returned previously by GetMap.
|
||||
func deserializeFlatbufferForeignKeys(msg types.SerialMessage) (*ForeignKeyCollection, error) {
|
||||
if serial.GetFileID(msg[message.MessagePrefixSz:]) != serial.ForeignKeyCollectionFileID {
|
||||
if serial.GetFileID(msg) != serial.ForeignKeyCollectionFileID {
|
||||
return nil, fmt.Errorf("expect Serial Message with ForeignKeyCollectionFileID")
|
||||
}
|
||||
|
||||
c := serial.GetRootAsForeignKeyCollection(msg, message.MessagePrefixSz)
|
||||
c := serial.GetRootAsForeignKeyCollection(msg, serial.MessagePrefixSz)
|
||||
collection := &ForeignKeyCollection{
|
||||
foreignKeys: make(map[string]ForeignKey, c.ForeignKeysLength()),
|
||||
}
|
||||
@@ -204,7 +203,7 @@ func serializeFlatbufferForeignKeys(fkc *ForeignKeyCollection) types.SerialMessa
|
||||
serial.ForeignKeyCollectionStart(b)
|
||||
serial.ForeignKeyCollectionAddForeignKeys(b, vec)
|
||||
o := serial.ForeignKeyCollectionEnd(b)
|
||||
return []byte(message.FinishMessage(b, o, []byte(serial.ForeignKeyCollectionFileID)))
|
||||
return []byte(serial.FinishMessage(b, o, []byte(serial.ForeignKeyCollectionFileID)))
|
||||
}
|
||||
|
||||
func serializeStringVector(b *fb.Builder, s []string) fb.UOffsetT {
|
||||
@@ -228,9 +227,9 @@ func serializeUint64Vector(b *fb.Builder, u []uint64) fb.UOffsetT {
|
||||
}
|
||||
|
||||
func emptyForeignKeyCollection(msg types.SerialMessage) bool {
|
||||
if serial.GetFileID(msg[message.MessagePrefixSz:]) != serial.ForeignKeyCollectionFileID {
|
||||
if serial.GetFileID(msg) != serial.ForeignKeyCollectionFileID {
|
||||
return false
|
||||
}
|
||||
c := serial.GetRootAsForeignKeyCollection(msg, message.MessagePrefixSz)
|
||||
c := serial.GetRootAsForeignKeyCollection(msg, serial.MessagePrefixSz)
|
||||
return c.ForeignKeysLength() == 0
|
||||
}
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/datas"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/shim"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
@@ -256,7 +255,7 @@ func newRootValue(vrw types.ValueReadWriter, ns tree.NodeStore, v types.Value) (
|
||||
var storage rvStorage
|
||||
|
||||
if vrw.Format().UsesFlatbuffers() {
|
||||
srv := serial.GetRootAsRootValue([]byte(v.(types.SerialMessage)), message.MessagePrefixSz)
|
||||
srv := serial.GetRootAsRootValue([]byte(v.(types.SerialMessage)), serial.MessagePrefixSz)
|
||||
storage = fbRvStorage{srv}
|
||||
} else {
|
||||
st, ok := v.(types.Struct)
|
||||
@@ -307,7 +306,7 @@ func decodeRootNomsValue(vrw types.ValueReadWriter, ns tree.NodeStore, val types
|
||||
func isRootValue(nbf *types.NomsBinFormat, val types.Value) bool {
|
||||
if nbf.UsesFlatbuffers() {
|
||||
if sm, ok := val.(types.SerialMessage); ok {
|
||||
return string(serial.GetFileID(sm[message.MessagePrefixSz:])) == serial.RootValueFileID
|
||||
return string(serial.GetFileID(sm)) == serial.RootValueFileID
|
||||
}
|
||||
} else {
|
||||
if st, ok := val.(types.Struct); ok {
|
||||
@@ -333,7 +332,7 @@ func EmptyRootValue(ctx context.Context, vrw types.ValueReadWriter, ns tree.Node
|
||||
serial.RootValueAddTables(builder, tablesoff)
|
||||
serial.RootValueAddForeignKeyAddr(builder, fkoff)
|
||||
serial.RootValueAddSuperSchemasAddr(builder, ssoff)
|
||||
bs := message.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID))
|
||||
bs := serial.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID))
|
||||
return newRootValue(vrw, ns, types.SerialMessage(bs))
|
||||
}
|
||||
|
||||
@@ -1269,8 +1268,8 @@ func (r fbRvStorage) EditTablesMap(ctx context.Context, vrw types.ValueReadWrite
|
||||
serial.RootValueAddForeignKeyAddr(builder, fkoff)
|
||||
serial.RootValueAddSuperSchemasAddr(builder, ssoff)
|
||||
|
||||
bs := message.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID))
|
||||
return fbRvStorage{serial.GetRootAsRootValue(bs, message.MessagePrefixSz)}, nil
|
||||
bs := serial.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID))
|
||||
return fbRvStorage{serial.GetRootAsRootValue(bs, serial.MessagePrefixSz)}, nil
|
||||
}
|
||||
|
||||
func (r fbRvStorage) SetForeignKeyMap(ctx context.Context, vrw types.ValueReadWriter, v types.Value) (rvStorage, error) {
|
||||
|
||||
@@ -25,7 +25,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -63,7 +62,7 @@ func serializeSchemaAsFlatbuffer(sch schema.Schema) ([]byte, error) {
|
||||
serial.TableSchemaAddSecondaryIndexes(b, indexes)
|
||||
serial.TableSchemaAddChecks(b, checks)
|
||||
root := serial.TableSchemaEnd(b)
|
||||
bs := message.FinishMessage(b, root, []byte(serial.TableSchemaFileID))
|
||||
bs := serial.FinishMessage(b, root, []byte(serial.TableSchemaFileID))
|
||||
return bs, nil
|
||||
}
|
||||
|
||||
@@ -76,8 +75,8 @@ func DeserializeSchema(ctx context.Context, nbf *types.NomsBinFormat, v types.Va
|
||||
}
|
||||
|
||||
func deserializeSchemaFromFlatbuffer(ctx context.Context, buf []byte) (schema.Schema, error) {
|
||||
assertTrue(serial.GetFileID(buf[message.MessagePrefixSz:]) == serial.TableSchemaFileID)
|
||||
s := serial.GetRootAsTableSchema(buf, message.MessagePrefixSz)
|
||||
assertTrue(serial.GetFileID(buf) == serial.TableSchemaFileID)
|
||||
s := serial.GetRootAsTableSchema(buf, serial.MessagePrefixSz)
|
||||
|
||||
cols, err := deserializeColumns(ctx, s)
|
||||
if err != nil {
|
||||
|
||||
@@ -15,7 +15,10 @@
|
||||
package serial
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"unsafe"
|
||||
|
||||
fb "github.com/google/flatbuffers/go"
|
||||
)
|
||||
|
||||
// KEEP THESE IN SYNC WITH .fbs FILES!
|
||||
@@ -32,11 +35,44 @@ const CommitClosureFileID = "CMCL"
|
||||
const TableSchemaFileID = "DSCH"
|
||||
const ForeignKeyCollectionFileID = "DFKC"
|
||||
|
||||
const MessageTypesKind int = 27
|
||||
|
||||
const MessagePrefixSz = 4
|
||||
|
||||
type Message []byte
|
||||
|
||||
func GetFileID(bs []byte) string {
|
||||
if len(bs) < 8 {
|
||||
if len(bs) < 8 + MessagePrefixSz {
|
||||
return ""
|
||||
}
|
||||
return byteSliceToString(bs[4:8])
|
||||
return byteSliceToString(bs[MessagePrefixSz+4:MessagePrefixSz+8])
|
||||
}
|
||||
|
||||
func FinishMessage(b *fb.Builder, off fb.UOffsetT, fileID []byte) Message {
|
||||
// We finish the buffer by prefixing it with:
|
||||
// 1) 1 byte NomsKind == SerialMessage.
|
||||
// 2) big endian 3 byte uint representing the size of the message, not
|
||||
// including the kind or size prefix bytes.
|
||||
//
|
||||
// This allows chunks we serialize here to be read by types binary
|
||||
// codec.
|
||||
//
|
||||
// All accessors in this package expect this prefix to be on the front
|
||||
// of the message bytes as well. See |MessagePrefixSz|.
|
||||
|
||||
b.Prep(1, fb.SizeInt32+4+MessagePrefixSz)
|
||||
b.FinishWithFileIdentifier(off, fileID)
|
||||
|
||||
var size [4]byte
|
||||
binary.BigEndian.PutUint32(size[:], uint32(len(b.Bytes)-int(b.Head())))
|
||||
if size[0] != 0 {
|
||||
panic("message is too large to be encoded")
|
||||
}
|
||||
|
||||
bytes := b.Bytes[b.Head()-MessagePrefixSz:]
|
||||
bytes[0] = byte(MessageTypesKind)
|
||||
copy(bytes[1:], size[1:])
|
||||
return bytes
|
||||
}
|
||||
|
||||
// byteSliceToString converts a []byte to string without a heap allocation.
|
||||
|
||||
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/cmd/noms/util"
|
||||
"github.com/dolthub/dolt/go/store/config"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/shim"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
@@ -128,7 +127,7 @@ func outputType(value types.Value) {
|
||||
var typeString string
|
||||
switch value := value.(type) {
|
||||
case types.SerialMessage:
|
||||
switch serial.GetFileID(value[message.MessagePrefixSz:]) {
|
||||
switch serial.GetFileID(value) {
|
||||
case serial.StoreRootFileID:
|
||||
typeString = "StoreRoot"
|
||||
case serial.TagFileID:
|
||||
@@ -162,9 +161,9 @@ func outputEncodedValue(ctx context.Context, w io.Writer, value types.Value) err
|
||||
switch value := value.(type) {
|
||||
// Some types of serial message need to be output here because of dependency cycles between types / tree package
|
||||
case types.SerialMessage:
|
||||
switch serial.GetFileID(value[message.MessagePrefixSz:]) {
|
||||
switch serial.GetFileID(value) {
|
||||
case serial.TableFileID:
|
||||
msg := serial.GetRootAsTable(value, message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsTable(value, serial.MessagePrefixSz)
|
||||
|
||||
fmt.Fprintf(w, "{\n")
|
||||
fmt.Fprintf(w, "\tSchema: #%s\n", hash.New(msg.SchemaBytes()).String())
|
||||
@@ -191,7 +190,7 @@ func outputEncodedValue(ctx context.Context, w io.Writer, value types.Value) err
|
||||
|
||||
return nil
|
||||
case serial.StoreRootFileID:
|
||||
msg := serial.GetRootAsStoreRoot(value, message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsStoreRoot(value, serial.MessagePrefixSz)
|
||||
ambytes := msg.AddressMapBytes()
|
||||
node := tree.NodeFromBytes(ambytes)
|
||||
return tree.OutputAddressMapNode(w, node)
|
||||
|
||||
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nomdl"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
@@ -137,7 +136,7 @@ func NewCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.Valu
|
||||
return newCommitForValue(ctx, cs, vrw, ns, v, opts)
|
||||
}
|
||||
|
||||
func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, parentsClosureAddr hash.Hash) (message.Message, uint64) {
|
||||
func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, parentsClosureAddr hash.Hash) (serial.Message, uint64) {
|
||||
builder := flatbuffers.NewBuilder(1024)
|
||||
vaddroff := builder.CreateByteVector(vaddr[:])
|
||||
|
||||
@@ -176,7 +175,7 @@ func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, pa
|
||||
serial.CommitAddTimestampMillis(builder, opts.Meta.Timestamp)
|
||||
serial.CommitAddUserTimestampMillis(builder, opts.Meta.UserTimestamp)
|
||||
|
||||
bytes := message.FinishMessage(builder, serial.CommitEnd(builder), []byte(serial.CommitFileID))
|
||||
bytes := serial.FinishMessage(builder, serial.CommitEnd(builder), []byte(serial.CommitFileID))
|
||||
return bytes, maxheight + 1
|
||||
}
|
||||
|
||||
@@ -203,7 +202,7 @@ func newCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.Valu
|
||||
return nil, err
|
||||
}
|
||||
for i := range heights {
|
||||
parents[i] = serial.GetRootAsCommit([]byte(parentValues[i].(types.SerialMessage)), message.MessagePrefixSz)
|
||||
parents[i] = serial.GetRootAsCommit([]byte(parentValues[i].(types.SerialMessage)), serial.MessagePrefixSz)
|
||||
heights[i] = parents[i].Height()
|
||||
}
|
||||
parentClosureAddr, err := writeFbCommitParentClosure(ctx, cs, vrw, ns, parents, opts.Parents)
|
||||
@@ -263,7 +262,7 @@ func newCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.Valu
|
||||
func commitPtr(nbf *types.NomsBinFormat, v types.Value, r *types.Ref) (*Commit, error) {
|
||||
if nbf.UsesFlatbuffers() {
|
||||
bs := []byte(v.(types.SerialMessage))
|
||||
height := serial.GetRootAsCommit(bs, message.MessagePrefixSz).Height()
|
||||
height := serial.GetRootAsCommit(bs, serial.MessagePrefixSz).Height()
|
||||
var addr hash.Hash
|
||||
if r != nil {
|
||||
addr = r.TargetHash()
|
||||
@@ -432,7 +431,7 @@ func FindClosureCommonAncestor(ctx context.Context, cl CommitClosure, cm *Commit
|
||||
func GetCommitParents(ctx context.Context, vr types.ValueReader, cv types.Value) ([]*Commit, error) {
|
||||
if sm, ok := cv.(types.SerialMessage); ok {
|
||||
data := []byte(sm)
|
||||
if serial.GetFileID(data[message.MessagePrefixSz:]) != serial.CommitFileID {
|
||||
if serial.GetFileID(data) != serial.CommitFileID {
|
||||
return nil, errors.New("GetCommitParents: provided value is not a commit.")
|
||||
}
|
||||
addrs, err := types.SerialCommitParentAddrs(vr.Format(), sm)
|
||||
@@ -445,7 +444,7 @@ func GetCommitParents(ctx context.Context, vr types.ValueReader, cv types.Value)
|
||||
if v == nil {
|
||||
return nil, fmt.Errorf("GetCommitParents: Did not find parent Commit in ValueReader: %s", addrs[i].String())
|
||||
}
|
||||
csm := serial.GetRootAsCommit([]byte(v.(types.SerialMessage)), message.MessagePrefixSz)
|
||||
csm := serial.GetRootAsCommit([]byte(v.(types.SerialMessage)), serial.MessagePrefixSz)
|
||||
res[i] = &Commit{
|
||||
val: v,
|
||||
height: csm.Height(),
|
||||
@@ -512,10 +511,10 @@ func GetCommitParents(ctx context.Context, vr types.ValueReader, cv types.Value)
|
||||
func GetCommitMeta(ctx context.Context, cv types.Value) (*CommitMeta, error) {
|
||||
if sm, ok := cv.(types.SerialMessage); ok {
|
||||
data := []byte(sm)
|
||||
if serial.GetFileID(data[message.MessagePrefixSz:]) != serial.CommitFileID {
|
||||
if serial.GetFileID(data) != serial.CommitFileID {
|
||||
return nil, errors.New("GetCommitMeta: provided value is not a commit.")
|
||||
}
|
||||
cmsg := serial.GetRootAsCommit(data, message.MessagePrefixSz)
|
||||
cmsg := serial.GetRootAsCommit(data, serial.MessagePrefixSz)
|
||||
ret := &CommitMeta{}
|
||||
ret.Name = string(cmsg.Name())
|
||||
ret.Email = string(cmsg.Email())
|
||||
@@ -548,10 +547,10 @@ func GetCommitMeta(ctx context.Context, cv types.Value) (*CommitMeta, error) {
|
||||
func GetCommittedValue(ctx context.Context, vr types.ValueReader, cv types.Value) (types.Value, error) {
|
||||
if sm, ok := cv.(types.SerialMessage); ok {
|
||||
data := []byte(sm)
|
||||
if serial.GetFileID(data[message.MessagePrefixSz:]) != serial.CommitFileID {
|
||||
if serial.GetFileID(data) != serial.CommitFileID {
|
||||
return nil, errors.New("GetCommittedValue: provided value is not a commit.")
|
||||
}
|
||||
cmsg := serial.GetRootAsCommit(data, message.MessagePrefixSz)
|
||||
cmsg := serial.GetRootAsCommit(data, serial.MessagePrefixSz)
|
||||
var roothash hash.Hash
|
||||
copy(roothash[:], cmsg.RootBytes())
|
||||
return vr.ReadValue(ctx, roothash)
|
||||
@@ -674,7 +673,7 @@ func IsCommit(v types.Value) (bool, error) {
|
||||
return types.IsValueSubtypeOf(s.Format(), v, valueCommitType)
|
||||
} else if sm, ok := v.(types.SerialMessage); ok {
|
||||
data := []byte(sm)
|
||||
return serial.GetFileID(data[message.MessagePrefixSz:]) == serial.CommitFileID, nil
|
||||
return serial.GetFileID(data) == serial.CommitFileID, nil
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -24,7 +24,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/store/chunks"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
@@ -33,7 +32,7 @@ func newParentsClosureIterator(ctx context.Context, c *Commit, vr types.ValueRea
|
||||
sv := c.NomsValue()
|
||||
|
||||
if _, ok := sv.(types.SerialMessage); ok {
|
||||
msg := serial.GetRootAsCommit(sv.(types.SerialMessage), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsCommit(sv.(types.SerialMessage), serial.MessagePrefixSz)
|
||||
addr := hash.New(msg.ParentClosureBytes())
|
||||
if addr.IsEmpty() {
|
||||
return nil, nil
|
||||
|
||||
@@ -29,7 +29,6 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -149,7 +148,7 @@ type serialTagHead struct {
|
||||
}
|
||||
|
||||
func newSerialTagHead(bs []byte, addr hash.Hash) serialTagHead {
|
||||
return serialTagHead{serial.GetRootAsTag(bs, message.MessagePrefixSz), addr}
|
||||
return serialTagHead{serial.GetRootAsTag(bs, serial.MessagePrefixSz), addr}
|
||||
}
|
||||
|
||||
func (h serialTagHead) TypeName() string {
|
||||
@@ -186,7 +185,7 @@ type serialWorkingSetHead struct {
|
||||
}
|
||||
|
||||
func newSerialWorkingSetHead(bs []byte, addr hash.Hash) serialWorkingSetHead {
|
||||
return serialWorkingSetHead{serial.GetRootAsWorkingSet(bs, message.MessagePrefixSz), addr}
|
||||
return serialWorkingSetHead{serial.GetRootAsWorkingSet(bs, serial.MessagePrefixSz), addr}
|
||||
}
|
||||
|
||||
func (h serialWorkingSetHead) TypeName() string {
|
||||
@@ -305,7 +304,7 @@ func newHead(head types.Value, addr hash.Hash) (dsHead, error) {
|
||||
|
||||
if sm, ok := head.(types.SerialMessage); ok {
|
||||
data := []byte(sm)
|
||||
fid := serial.GetFileID(data[message.MessagePrefixSz:])
|
||||
fid := serial.GetFileID(data)
|
||||
if fid == serial.TagFileID {
|
||||
return newSerialTagHead(data, addr), nil
|
||||
}
|
||||
|
||||
@@ -19,25 +19,24 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/prolly"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
func storeroot_flatbuffer(am prolly.AddressMap) message.Message {
|
||||
func storeroot_flatbuffer(am prolly.AddressMap) serial.Message {
|
||||
builder := flatbuffers.NewBuilder(1024)
|
||||
ambytes := []byte(tree.ValueFromNode(am.Node()).(types.SerialMessage))
|
||||
voff := builder.CreateByteVector(ambytes)
|
||||
serial.StoreRootStart(builder)
|
||||
serial.StoreRootAddAddressMap(builder, voff)
|
||||
return message.FinishMessage(builder, serial.StoreRootEnd(builder), []byte(serial.StoreRootFileID))
|
||||
return serial.FinishMessage(builder, serial.StoreRootEnd(builder), []byte(serial.StoreRootFileID))
|
||||
}
|
||||
|
||||
func parse_storeroot(bs []byte, ns tree.NodeStore) prolly.AddressMap {
|
||||
if serial.GetFileID(bs[message.MessagePrefixSz:]) != serial.StoreRootFileID {
|
||||
panic("expected store root file id, got: " + serial.GetFileID(bs[message.MessagePrefixSz:]))
|
||||
if serial.GetFileID(bs) != serial.StoreRootFileID {
|
||||
panic("expected store root file id, got: " + serial.GetFileID(bs))
|
||||
}
|
||||
sr := serial.GetRootAsStoreRoot(bs, message.MessagePrefixSz)
|
||||
sr := serial.GetRootAsStoreRoot(bs, serial.MessagePrefixSz)
|
||||
mapbytes := sr.AddressMapBytes()
|
||||
node := tree.NodeFromBytes(mapbytes)
|
||||
return prolly.NewAddressMap(node, ns)
|
||||
|
||||
@@ -23,7 +23,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/nomdl"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -112,7 +111,7 @@ func newTag(ctx context.Context, db *database, commitAddr hash.Hash, meta *TagMe
|
||||
}
|
||||
}
|
||||
|
||||
func tag_flatbuffer(commitAddr hash.Hash, meta *TagMeta) message.Message {
|
||||
func tag_flatbuffer(commitAddr hash.Hash, meta *TagMeta) serial.Message {
|
||||
builder := flatbuffers.NewBuilder(1024)
|
||||
addroff := builder.CreateByteVector(commitAddr[:])
|
||||
var nameOff, emailOff, descOff flatbuffers.UOffsetT
|
||||
@@ -130,7 +129,7 @@ func tag_flatbuffer(commitAddr hash.Hash, meta *TagMeta) message.Message {
|
||||
serial.TagAddTimestampMillis(builder, meta.Timestamp)
|
||||
serial.TagAddUserTimestampMillis(builder, meta.UserTimestamp)
|
||||
}
|
||||
return message.FinishMessage(builder, serial.TagEnd(builder), []byte(serial.TagFileID))
|
||||
return serial.FinishMessage(builder, serial.TagEnd(builder), []byte(serial.TagFileID))
|
||||
}
|
||||
|
||||
func IsTag(v types.Value) (bool, error) {
|
||||
@@ -138,7 +137,7 @@ func IsTag(v types.Value) (bool, error) {
|
||||
return types.IsValueSubtypeOf(s.Format(), v, valueTagType)
|
||||
} else if sm, ok := v.(types.SerialMessage); ok {
|
||||
data := []byte(sm)
|
||||
return serial.GetFileID(data[message.MessagePrefixSz:]) == serial.TagFileID, nil
|
||||
return serial.GetFileID(data) == serial.TagFileID, nil
|
||||
}
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -21,7 +21,6 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
|
||||
@@ -149,7 +148,7 @@ func newWorkingSet(ctx context.Context, db *database, meta *WorkingSetMeta, work
|
||||
return ref.TargetHash(), ref, nil
|
||||
}
|
||||
|
||||
func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *MergeState, meta *WorkingSetMeta) message.Message {
|
||||
func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *MergeState, meta *WorkingSetMeta) serial.Message {
|
||||
builder := flatbuffers.NewBuilder(1024)
|
||||
workingoff := builder.CreateByteVector(working[:])
|
||||
var stagedOff, mergeStateOff flatbuffers.UOffsetT
|
||||
@@ -186,7 +185,7 @@ func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *Mer
|
||||
serial.WorkingSetAddDesc(builder, descOff)
|
||||
serial.WorkingSetAddTimestampMillis(builder, meta.Timestamp)
|
||||
}
|
||||
return message.FinishMessage(builder, serial.WorkingSetEnd(builder), []byte(serial.WorkingSetFileID))
|
||||
return serial.FinishMessage(builder, serial.WorkingSetEnd(builder), []byte(serial.WorkingSetFileID))
|
||||
}
|
||||
|
||||
func NewMergeState(ctx context.Context, vrw types.ValueReadWriter, preMergeWorking types.Ref, commit *Commit) (*MergeState, error) {
|
||||
@@ -221,7 +220,7 @@ func IsWorkingSet(v types.Value) (bool, error) {
|
||||
// types.IsValueSubtypeOf is very strict about the type description.
|
||||
return s.Name() == workingSetName, nil
|
||||
} else if sm, ok := v.(types.SerialMessage); ok {
|
||||
return serial.GetFileID(sm[message.MessagePrefixSz:]) == serial.WorkingSetFileID, nil
|
||||
return serial.GetFileID(sm) == serial.WorkingSetFileID, nil
|
||||
} else {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ import (
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/prolly/tree"
|
||||
@@ -148,7 +149,7 @@ func TestCommitClosure(t *testing.T) {
|
||||
assert.Equal(t, 4096, cc.Count())
|
||||
|
||||
// Walk the addresses in the root.
|
||||
msg := message.Message(tree.ValueFromNode(cc.closure.root).(types.SerialMessage))
|
||||
msg := serial.Message(tree.ValueFromNode(cc.closure.root).(types.SerialMessage))
|
||||
numaddresses := 0
|
||||
err = message.WalkAddresses(ctx, msg, func(ctx context.Context, addr hash.Hash) error {
|
||||
numaddresses++
|
||||
|
||||
@@ -40,7 +40,7 @@ type AddressMapSerializer struct {
|
||||
|
||||
var _ Serializer = AddressMapSerializer{}
|
||||
|
||||
func (s AddressMapSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) Message {
|
||||
func (s AddressMapSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) serial.Message {
|
||||
var (
|
||||
keyArr, keyOffs fb.UOffsetT
|
||||
addrArr, cardArr fb.UOffsetT
|
||||
@@ -75,25 +75,25 @@ func (s AddressMapSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64,
|
||||
}
|
||||
serial.AddressMapAddTreeLevel(b, uint8(level))
|
||||
|
||||
return FinishMessage(b, serial.AddressMapEnd(b), addressMapFileID)
|
||||
return serial.FinishMessage(b, serial.AddressMapEnd(b), addressMapFileID)
|
||||
}
|
||||
|
||||
func getAddressMapKeys(msg Message) (keys val.SlicedBuffer) {
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
func getAddressMapKeys(msg serial.Message) (keys val.SlicedBuffer) {
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
keys.Buf = am.KeyItemsBytes()
|
||||
keys.Offs = getAddressMapKeyOffsets(am)
|
||||
return
|
||||
}
|
||||
|
||||
func getAddressMapValues(msg Message) (values val.SlicedBuffer) {
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
func getAddressMapValues(msg serial.Message) (values val.SlicedBuffer) {
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
values.Buf = am.AddressArrayBytes()
|
||||
values.Offs = offsetsForAddressArray(values.Buf)
|
||||
return
|
||||
}
|
||||
|
||||
func walkAddressMapAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
func walkAddressMapAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
arr := am.AddressArrayBytes()
|
||||
for i := 0; i < len(arr)/hash.ByteLen; i++ {
|
||||
addr := hash.New(arr[i*addrSize : (i+1)*addrSize])
|
||||
@@ -104,8 +104,8 @@ func walkAddressMapAddresses(ctx context.Context, msg Message, cb func(ctx conte
|
||||
return nil
|
||||
}
|
||||
|
||||
func getAddressMapCount(msg Message) uint16 {
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
func getAddressMapCount(msg serial.Message) uint16 {
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
if am.KeyItemsLength() == 0 {
|
||||
return 0
|
||||
}
|
||||
@@ -113,19 +113,19 @@ func getAddressMapCount(msg Message) uint16 {
|
||||
return uint16(am.KeyOffsetsLength() + 1)
|
||||
}
|
||||
|
||||
func getAddressMapTreeLevel(msg Message) int {
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
func getAddressMapTreeLevel(msg serial.Message) int {
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
return int(am.TreeLevel())
|
||||
}
|
||||
|
||||
func getAddressMapTreeCount(msg Message) int {
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
func getAddressMapTreeCount(msg serial.Message) int {
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
return int(am.TreeCount())
|
||||
}
|
||||
|
||||
func getAddressMapSubtrees(msg Message) []uint64 {
|
||||
func getAddressMapSubtrees(msg serial.Message) []uint64 {
|
||||
counts := make([]uint64, getAddressMapCount(msg))
|
||||
am := serial.GetRootAsAddressMap(msg, MessagePrefixSz)
|
||||
am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz)
|
||||
return decodeVarints(am.SubtreeCountsBytes(), counts)
|
||||
}
|
||||
|
||||
@@ -149,6 +149,6 @@ func estimateAddressMapSize(keys, addresses [][]byte, subtrees []uint64) (keySz,
|
||||
totalSz += len(subtrees) * binary.MaxVarintLen64
|
||||
totalSz += 8 + 1 + 1 + 1
|
||||
totalSz += 72
|
||||
totalSz += MessagePrefixSz
|
||||
totalSz += serial.MessagePrefixSz
|
||||
return
|
||||
}
|
||||
|
||||
@@ -49,17 +49,17 @@ func offsetsForCommitClosureKeys(buf []byte) []byte {
|
||||
return commitClosureKeyOffsets[:cnt*uint16Size]
|
||||
}
|
||||
|
||||
func getCommitClosureKeys(msg Message) val.SlicedBuffer {
|
||||
func getCommitClosureKeys(msg serial.Message) val.SlicedBuffer {
|
||||
var ret val.SlicedBuffer
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
ret.Buf = m.KeyItemsBytes()
|
||||
ret.Offs = offsetsForCommitClosureKeys(ret.Buf)
|
||||
return ret
|
||||
}
|
||||
|
||||
func getCommitClosureValues(msg Message) val.SlicedBuffer {
|
||||
func getCommitClosureValues(msg serial.Message) val.SlicedBuffer {
|
||||
var ret val.SlicedBuffer
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
if m.AddressArrayLength() == 0 {
|
||||
ret.Buf = commitClosureEmptyValueBytes
|
||||
ret.Offs = commitClosureValueOffsets[:getCommitClosureCount(msg)*uint16Size]
|
||||
@@ -73,29 +73,29 @@ func getCommitClosureValues(msg Message) val.SlicedBuffer {
|
||||
// uint64 + hash.
|
||||
const commitClosureKeyLength = 8 + 20
|
||||
|
||||
func getCommitClosureCount(msg Message) uint16 {
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
func getCommitClosureCount(msg serial.Message) uint16 {
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
return uint16(m.KeyItemsLength() / commitClosureKeyLength)
|
||||
}
|
||||
|
||||
func getCommitClosureTreeLevel(msg Message) int {
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
func getCommitClosureTreeLevel(msg serial.Message) int {
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
return int(m.TreeLevel())
|
||||
}
|
||||
|
||||
func getCommitClosureTreeCount(msg Message) int {
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
func getCommitClosureTreeCount(msg serial.Message) int {
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
return int(m.TreeCount())
|
||||
}
|
||||
|
||||
func getCommitClosureSubtrees(msg Message) []uint64 {
|
||||
func getCommitClosureSubtrees(msg serial.Message) []uint64 {
|
||||
counts := make([]uint64, getCommitClosureCount(msg))
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
return decodeVarints(m.SubtreeCountsBytes(), counts)
|
||||
}
|
||||
|
||||
func walkCommitClosureAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz)
|
||||
func walkCommitClosureAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz)
|
||||
arr := m.AddressArrayBytes()
|
||||
for i := 0; i < len(arr)/hash.ByteLen; i++ {
|
||||
addr := hash.New(arr[i*addrSize : (i+1)*addrSize])
|
||||
@@ -124,7 +124,7 @@ type CommitClosureSerializer struct {
|
||||
|
||||
var _ Serializer = CommitClosureSerializer{}
|
||||
|
||||
func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) Message {
|
||||
func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) serial.Message {
|
||||
var keyArr, addrArr, cardArr fb.UOffsetT
|
||||
|
||||
keySz, addrSz, totalSz := estimateCommitClosureSize(keys, addrs, subtrees)
|
||||
@@ -153,7 +153,7 @@ func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint
|
||||
}
|
||||
serial.CommitClosureAddTreeLevel(b, uint8(level))
|
||||
|
||||
return FinishMessage(b, serial.CommitClosureEnd(b), commitClosureFileID)
|
||||
return serial.FinishMessage(b, serial.CommitClosureEnd(b), commitClosureFileID)
|
||||
}
|
||||
|
||||
func estimateCommitClosureSize(keys, addresses [][]byte, subtrees []uint64) (keySz, addrSz, totalSz int) {
|
||||
@@ -163,6 +163,6 @@ func estimateCommitClosureSize(keys, addresses [][]byte, subtrees []uint64) (key
|
||||
totalSz += len(subtrees) * binary.MaxVarintLen64
|
||||
totalSz += 8 + 1 + 1 + 1
|
||||
totalSz += 72
|
||||
totalSz += MessagePrefixSz
|
||||
totalSz += serial.MessagePrefixSz
|
||||
return
|
||||
}
|
||||
|
||||
@@ -16,55 +16,19 @@ package message
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
|
||||
fb "github.com/google/flatbuffers/go"
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/val"
|
||||
)
|
||||
|
||||
const MessageTypesKind int = 27
|
||||
|
||||
const MessagePrefixSz = 4
|
||||
|
||||
type Message []byte
|
||||
|
||||
func FinishMessage(b *fb.Builder, off fb.UOffsetT, fileID []byte) Message {
|
||||
// We finish the buffer by prefixing it with:
|
||||
// 1) 1 byte NomsKind == SerialMessage.
|
||||
// 2) big endian 3 byte uint representing the size of the message, not
|
||||
// including the kind or size prefix bytes.
|
||||
//
|
||||
// This allows chunks we serialize here to be read by types binary
|
||||
// codec.
|
||||
//
|
||||
// All accessors in this package expect this prefix to be on the front
|
||||
// of the message bytes as well. See |MessagePrefixSz|.
|
||||
|
||||
b.Prep(1, fb.SizeInt32+4+MessagePrefixSz)
|
||||
b.FinishWithFileIdentifier(off, fileID)
|
||||
|
||||
var size [4]byte
|
||||
binary.BigEndian.PutUint32(size[:], uint32(len(b.Bytes)-int(b.Head())))
|
||||
if size[0] != 0 {
|
||||
panic("message is too large to be encoded")
|
||||
}
|
||||
|
||||
bytes := b.Bytes[b.Head()-MessagePrefixSz:]
|
||||
bytes[0] = byte(MessageTypesKind)
|
||||
copy(bytes[1:], size[1:])
|
||||
return bytes
|
||||
}
|
||||
|
||||
type Serializer interface {
|
||||
Serialize(keys, values [][]byte, subtrees []uint64, level int) Message
|
||||
Serialize(keys, values [][]byte, subtrees []uint64, level int) serial.Message
|
||||
}
|
||||
|
||||
func GetKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) {
|
||||
id := serial.GetFileID(msg[MessagePrefixSz:])
|
||||
func GetKeysAndValues(msg serial.Message) (keys, values val.SlicedBuffer, cnt uint16) {
|
||||
id := serial.GetFileID(msg)
|
||||
|
||||
if id == serial.ProllyTreeNodeFileID {
|
||||
return getProllyMapKeysAndValues(msg)
|
||||
@@ -85,8 +49,8 @@ func GetKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) {
|
||||
panic(fmt.Sprintf("unknown message id %s", id))
|
||||
}
|
||||
|
||||
func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
id := serial.GetFileID(msg[MessagePrefixSz:])
|
||||
func WalkAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
id := serial.GetFileID(msg)
|
||||
switch id {
|
||||
case serial.ProllyTreeNodeFileID:
|
||||
return walkProllyMapAddresses(ctx, msg, cb)
|
||||
@@ -99,8 +63,8 @@ func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context
|
||||
}
|
||||
}
|
||||
|
||||
func GetTreeLevel(msg Message) int {
|
||||
id := serial.GetFileID(msg[MessagePrefixSz:])
|
||||
func GetTreeLevel(msg serial.Message) int {
|
||||
id := serial.GetFileID(msg)
|
||||
switch id {
|
||||
case serial.ProllyTreeNodeFileID:
|
||||
return getProllyMapTreeLevel(msg)
|
||||
@@ -113,8 +77,8 @@ func GetTreeLevel(msg Message) int {
|
||||
}
|
||||
}
|
||||
|
||||
func GetTreeCount(msg Message) int {
|
||||
id := serial.GetFileID(msg[MessagePrefixSz:])
|
||||
func GetTreeCount(msg serial.Message) int {
|
||||
id := serial.GetFileID(msg)
|
||||
switch id {
|
||||
case serial.ProllyTreeNodeFileID:
|
||||
return getProllyMapTreeCount(msg)
|
||||
@@ -127,8 +91,8 @@ func GetTreeCount(msg Message) int {
|
||||
}
|
||||
}
|
||||
|
||||
func GetSubtrees(msg Message) []uint64 {
|
||||
id := serial.GetFileID(msg[MessagePrefixSz:])
|
||||
func GetSubtrees(msg serial.Message) []uint64 {
|
||||
id := serial.GetFileID(msg)
|
||||
switch id {
|
||||
case serial.ProllyTreeNodeFileID:
|
||||
return getProllyMapSubtrees(msg)
|
||||
|
||||
@@ -44,7 +44,7 @@ type ProllyMapSerializer struct {
|
||||
|
||||
var _ Serializer = ProllyMapSerializer{}
|
||||
|
||||
func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64, level int) Message {
|
||||
func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64, level int) serial.Message {
|
||||
var (
|
||||
keyTups, keyOffs fb.UOffsetT
|
||||
valTups, valOffs fb.UOffsetT
|
||||
@@ -93,11 +93,11 @@ func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64,
|
||||
serial.ProllyTreeNodeAddValueType(b, serial.ItemTypeTupleFormatAlpha)
|
||||
serial.ProllyTreeNodeAddTreeLevel(b, uint8(level))
|
||||
|
||||
return FinishMessage(b, serial.ProllyTreeNodeEnd(b), prollyMapFileID)
|
||||
return serial.FinishMessage(b, serial.ProllyTreeNodeEnd(b), prollyMapFileID)
|
||||
}
|
||||
|
||||
func getProllyMapKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz)
|
||||
func getProllyMapKeysAndValues(msg serial.Message) (keys, values val.SlicedBuffer, cnt uint16) {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz)
|
||||
|
||||
keys.Buf = pm.KeyItemsBytes()
|
||||
keys.Offs = getProllyMapKeyOffsets(pm)
|
||||
@@ -119,8 +119,8 @@ func getProllyMapKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt
|
||||
return
|
||||
}
|
||||
|
||||
func walkProllyMapAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz)
|
||||
func walkProllyMapAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz)
|
||||
arr := pm.AddressArrayBytes()
|
||||
for i := 0; i < len(arr)/hash.ByteLen; i++ {
|
||||
addr := hash.New(arr[i*addrSize : (i+1)*addrSize])
|
||||
@@ -142,8 +142,8 @@ func walkProllyMapAddresses(ctx context.Context, msg Message, cb func(ctx contex
|
||||
return nil
|
||||
}
|
||||
|
||||
func getProllyMapCount(msg Message) uint16 {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz)
|
||||
func getProllyMapCount(msg serial.Message) uint16 {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz)
|
||||
if pm.KeyItemsLength() == 0 {
|
||||
return 0
|
||||
}
|
||||
@@ -151,19 +151,19 @@ func getProllyMapCount(msg Message) uint16 {
|
||||
return uint16(pm.KeyOffsetsLength() + 1)
|
||||
}
|
||||
|
||||
func getProllyMapTreeLevel(msg Message) int {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz)
|
||||
func getProllyMapTreeLevel(msg serial.Message) int {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz)
|
||||
return int(pm.TreeLevel())
|
||||
}
|
||||
|
||||
func getProllyMapTreeCount(msg Message) int {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz)
|
||||
func getProllyMapTreeCount(msg serial.Message) int {
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz)
|
||||
return int(pm.TreeCount())
|
||||
}
|
||||
|
||||
func getProllyMapSubtrees(msg Message) []uint64 {
|
||||
func getProllyMapSubtrees(msg serial.Message) []uint64 {
|
||||
counts := make([]uint64, getProllyMapCount(msg))
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz)
|
||||
pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz)
|
||||
return decodeVarints(pm.SubtreeCountsBytes(), counts)
|
||||
}
|
||||
|
||||
@@ -213,7 +213,7 @@ func estimateProllyMapSize(keys, values [][]byte, subtrees []uint64, valAddrsCnt
|
||||
bufSz += 72 // vtable (approx)
|
||||
bufSz += 100 // padding?
|
||||
bufSz += valAddrsCnt * len(values)
|
||||
bufSz += MessagePrefixSz
|
||||
bufSz += serial.MessagePrefixSz
|
||||
|
||||
return keySz, valSz, bufSz
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"encoding/hex"
|
||||
"io"
|
||||
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
@@ -35,7 +36,7 @@ type Node struct {
|
||||
keys, values val.SlicedBuffer
|
||||
subtrees subtreeCounts
|
||||
count uint16
|
||||
msg message.Message
|
||||
msg serial.Message
|
||||
}
|
||||
|
||||
type AddressCb func(ctx context.Context, addr hash.Hash) error
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
)
|
||||
|
||||
// NomsKind allows a TypeDesc to indicate what kind of type is described.
|
||||
@@ -127,8 +127,8 @@ func init() {
|
||||
SupportedKinds[PolygonKind] = true
|
||||
SupportedKinds[SerialMessageKind] = true
|
||||
|
||||
if message.MessageTypesKind != int(SerialMessageKind) {
|
||||
panic("internal error: message.MessageTypesKind != SerialMessageKind")
|
||||
if serial.MessageTypesKind != int(SerialMessageKind) {
|
||||
panic("internal error: serial.MessageTypesKind != SerialMessageKind")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,6 @@ import (
|
||||
"github.com/dolthub/dolt/go/gen/fb/serial"
|
||||
"github.com/dolthub/dolt/go/store/d"
|
||||
"github.com/dolthub/dolt/go/store/hash"
|
||||
"github.com/dolthub/dolt/go/store/prolly/message"
|
||||
)
|
||||
|
||||
// For an annotation like @type, 1st capture group is the annotation.
|
||||
@@ -236,8 +235,8 @@ func (fp FieldPath) Resolve(ctx context.Context, v Value, vr ValueReader) (Value
|
||||
return sv, nil
|
||||
}
|
||||
case SerialMessage:
|
||||
if serial.GetFileID(v[message.MessagePrefixSz:]) == serial.CommitFileID && fp.Name == "value" {
|
||||
msg := serial.GetRootAsCommit(v, message.MessagePrefixSz)
|
||||
if serial.GetFileID(v) == serial.CommitFileID && fp.Name == "value" {
|
||||
msg := serial.GetRootAsCommit(v, serial.MessagePrefixSz)
|
||||
addr := hash.New(msg.RootBytes())
|
||||
return vr.ReadValue(ctx, addr)
|
||||
}
|
||||
|
||||
@@ -53,10 +53,10 @@ func (sm SerialMessage) Hash(nbf *NomsBinFormat) (hash.Hash, error) {
|
||||
}
|
||||
|
||||
func (sm SerialMessage) HumanReadableString() string {
|
||||
id := serial.GetFileID(sm[message.MessagePrefixSz:])
|
||||
id := serial.GetFileID(sm)
|
||||
switch id {
|
||||
case serial.StoreRootFileID:
|
||||
msg := serial.GetRootAsStoreRoot([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsStoreRoot([]byte(sm), serial.MessagePrefixSz)
|
||||
ret := &strings.Builder{}
|
||||
mapbytes := msg.AddressMapBytes()
|
||||
fmt.Fprintf(ret, "StoreRoot{%s}", SerialMessage(mapbytes).HumanReadableString())
|
||||
@@ -64,7 +64,7 @@ func (sm SerialMessage) HumanReadableString() string {
|
||||
case serial.TagFileID:
|
||||
return "Tag"
|
||||
case serial.WorkingSetFileID:
|
||||
msg := serial.GetRootAsWorkingSet(sm, message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsWorkingSet(sm, serial.MessagePrefixSz)
|
||||
ret := &strings.Builder{}
|
||||
fmt.Fprintf(ret, "{\n")
|
||||
fmt.Fprintf(ret, "\tName: %s\n", msg.Name())
|
||||
@@ -76,7 +76,7 @@ func (sm SerialMessage) HumanReadableString() string {
|
||||
fmt.Fprintf(ret, "}")
|
||||
return ret.String()
|
||||
case serial.CommitFileID:
|
||||
msg := serial.GetRootAsCommit(sm, message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsCommit(sm, serial.MessagePrefixSz)
|
||||
ret := &strings.Builder{}
|
||||
fmt.Fprintf(ret, "{\n")
|
||||
fmt.Fprintf(ret, "\tName: %s\n", msg.Name())
|
||||
@@ -104,7 +104,7 @@ func (sm SerialMessage) HumanReadableString() string {
|
||||
fmt.Fprintf(ret, "}")
|
||||
return ret.String()
|
||||
case serial.RootValueFileID:
|
||||
msg := serial.GetRootAsRootValue(sm, message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsRootValue(sm, serial.MessagePrefixSz)
|
||||
ret := &strings.Builder{}
|
||||
fmt.Fprintf(ret, "{\n")
|
||||
fmt.Fprintf(ret, "\tFeatureVersion: %d\n", msg.FeatureVersion())
|
||||
@@ -115,7 +115,7 @@ func (sm SerialMessage) HumanReadableString() string {
|
||||
fmt.Fprintf(ret, "}")
|
||||
return ret.String()
|
||||
case serial.TableFileID:
|
||||
msg := serial.GetRootAsTable(sm, message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsTable(sm, serial.MessagePrefixSz)
|
||||
ret := &strings.Builder{}
|
||||
|
||||
fmt.Fprintf(ret, "{\n")
|
||||
@@ -133,7 +133,7 @@ func (sm SerialMessage) HumanReadableString() string {
|
||||
fmt.Fprintf(ret, "}")
|
||||
return ret.String()
|
||||
case serial.AddressMapFileID:
|
||||
keys, values, cnt := message.GetKeysAndValues(message.Message(sm))
|
||||
keys, values, cnt := message.GetKeysAndValues(serial.Message(sm))
|
||||
var b strings.Builder
|
||||
b.Write([]byte("AddressMap{\n"))
|
||||
for i := uint16(0); i < cnt; i++ {
|
||||
@@ -164,15 +164,15 @@ func (sm SerialMessage) Less(nbf *NomsBinFormat, other LesserValuable) (bool, er
|
||||
const SerialMessageRefHeight = 1024
|
||||
|
||||
func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
switch serial.GetFileID(sm[message.MessagePrefixSz:]) {
|
||||
switch serial.GetFileID(sm) {
|
||||
case serial.StoreRootFileID:
|
||||
msg := serial.GetRootAsStoreRoot([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsStoreRoot([]byte(sm), serial.MessagePrefixSz)
|
||||
if msg.AddressMapLength() > 0 {
|
||||
mapbytes := msg.AddressMapBytes()
|
||||
return SerialMessage(mapbytes).walkRefs(nbf, cb)
|
||||
}
|
||||
case serial.TagFileID:
|
||||
msg := serial.GetRootAsTag([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsTag([]byte(sm), serial.MessagePrefixSz)
|
||||
addr := hash.New(msg.CommitAddrBytes())
|
||||
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
|
||||
if err != nil {
|
||||
@@ -180,7 +180,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
}
|
||||
return cb(r)
|
||||
case serial.WorkingSetFileID:
|
||||
msg := serial.GetRootAsWorkingSet([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsWorkingSet([]byte(sm), serial.MessagePrefixSz)
|
||||
addr := hash.New(msg.WorkingRootAddrBytes())
|
||||
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
|
||||
if err != nil {
|
||||
@@ -220,7 +220,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
}
|
||||
}
|
||||
case serial.RootValueFileID:
|
||||
msg := serial.GetRootAsRootValue([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsRootValue([]byte(sm), serial.MessagePrefixSz)
|
||||
err := SerialMessage(msg.TablesBytes()).walkRefs(nbf, cb)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -246,7 +246,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
}
|
||||
}
|
||||
case serial.TableFileID:
|
||||
msg := serial.GetRootAsTable([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsTable([]byte(sm), serial.MessagePrefixSz)
|
||||
addr := hash.New(msg.SchemaBytes())
|
||||
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
|
||||
if err != nil {
|
||||
@@ -344,7 +344,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
return err
|
||||
}
|
||||
}
|
||||
msg := serial.GetRootAsCommit([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsCommit([]byte(sm), serial.MessagePrefixSz)
|
||||
addr := hash.New(msg.RootBytes())
|
||||
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
|
||||
if err != nil {
|
||||
@@ -373,7 +373,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
case serial.AddressMapFileID:
|
||||
fallthrough
|
||||
case serial.CommitClosureFileID:
|
||||
return message.WalkAddresses(context.TODO(), message.Message(sm), func(ctx context.Context, addr hash.Hash) error {
|
||||
return message.WalkAddresses(context.TODO(), serial.Message(sm), func(ctx context.Context, addr hash.Hash) error {
|
||||
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -381,13 +381,13 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
|
||||
return cb(r)
|
||||
})
|
||||
default:
|
||||
return fmt.Errorf("unsupported SerialMessage message with FileID: %s", serial.GetFileID(sm[message.MessagePrefixSz:]))
|
||||
return fmt.Errorf("unsupported SerialMessage message with FileID: %s", serial.GetFileID(sm))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SerialCommitParentAddrs(nbf *NomsBinFormat, sm SerialMessage) ([]hash.Hash, error) {
|
||||
msg := serial.GetRootAsCommit([]byte(sm), message.MessagePrefixSz)
|
||||
msg := serial.GetRootAsCommit([]byte(sm), serial.MessagePrefixSz)
|
||||
addrs := msg.ParentAddrsBytes()
|
||||
n := len(addrs) / 20
|
||||
ret := make([]hash.Hash, n)
|
||||
|
||||
Reference in New Issue
Block a user