From 5fcd3ccee44c9a45ee4669fbc46b059e25b5b640 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 22 Jul 2022 14:16:34 -0700 Subject: [PATCH] go/store/prolly/message: Move MessagePrefixSz, FinishMessage to serial package. --- go/cmd/dolt/commands/roots.go | 5 +- go/gen/fb/serial/fileidentifiers.go | 40 ++++++++++++- go/libraries/doltcore/doltdb/durable/table.go | 9 ++- .../doltdb/foreign_key_serialization.go | 11 ++-- go/libraries/doltcore/doltdb/root_val.go | 11 ++-- .../doltcore/schema/encoding/serialization.go | 7 +-- go/serial/fileidentifiers.go | 40 ++++++++++++- go/store/cmd/noms/noms_show.go | 9 ++- go/store/datas/commit.go | 23 ++++---- go/store/datas/commit_closure.go | 3 +- go/store/datas/dataset.go | 7 +-- go/store/datas/refmap.go | 11 ++-- go/store/datas/tag.go | 7 +-- go/store/datas/workingset.go | 7 +-- go/store/prolly/commit_closure_test.go | 3 +- go/store/prolly/message/address_map.go | 34 +++++------ go/store/prolly/message/commit_closure.go | 34 +++++------ go/store/prolly/message/message.go | 58 ++++--------------- go/store/prolly/message/prolly_map.go | 30 +++++----- go/store/prolly/tree/node.go | 3 +- go/store/types/noms_kind.go | 6 +- go/store/types/path.go | 5 +- go/store/types/serial_message.go | 34 +++++------ 23 files changed, 211 insertions(+), 186 deletions(-) diff --git a/go/cmd/dolt/commands/roots.go b/go/cmd/dolt/commands/roots.go index 0a305f849e..913c7d912a 100644 --- a/go/cmd/dolt/commands/roots.go +++ b/go/cmd/dolt/commands/roots.go @@ -34,7 +34,6 @@ import ( "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/nbs" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/spec" "github.com/dolthub/dolt/go/store/types" @@ -152,8 +151,8 @@ func (cmd RootsCmd) processTableFile(ctx context.Context, path string, modified cli.Println() } } else if sm, ok := value.(types.SerialMessage); ok { - if serial.GetFileID(sm[message.MessagePrefixSz:]) == serial.StoreRootFileID { - msg := serial.GetRootAsStoreRoot([]byte(sm), message.MessagePrefixSz) + if serial.GetFileID(sm) == serial.StoreRootFileID { + msg := serial.GetRootAsStoreRoot([]byte(sm), serial.MessagePrefixSz) ambytes := msg.AddressMapBytes() node := tree.NodeFromBytes(ambytes) err := tree.OutputAddressMapNode(cli.OutStream, node) diff --git a/go/gen/fb/serial/fileidentifiers.go b/go/gen/fb/serial/fileidentifiers.go index 228ffd1364..d60ee6f774 100644 --- a/go/gen/fb/serial/fileidentifiers.go +++ b/go/gen/fb/serial/fileidentifiers.go @@ -15,7 +15,10 @@ package serial import ( + "encoding/binary" "unsafe" + + fb "github.com/google/flatbuffers/go" ) // KEEP THESE IN SYNC WITH .fbs FILES! @@ -32,11 +35,44 @@ const CommitClosureFileID = "CMCL" const TableSchemaFileID = "DSCH" const ForeignKeyCollectionFileID = "DFKC" +const MessageTypesKind int = 27 + +const MessagePrefixSz = 4 + +type Message []byte + func GetFileID(bs []byte) string { - if len(bs) < 8 { + if len(bs) < 8+MessagePrefixSz { return "" } - return byteSliceToString(bs[4:8]) + return byteSliceToString(bs[MessagePrefixSz+4 : MessagePrefixSz+8]) +} + +func FinishMessage(b *fb.Builder, off fb.UOffsetT, fileID []byte) Message { + // We finish the buffer by prefixing it with: + // 1) 1 byte NomsKind == SerialMessage. + // 2) big endian 3 byte uint representing the size of the message, not + // including the kind or size prefix bytes. + // + // This allows chunks we serialize here to be read by types binary + // codec. + // + // All accessors in this package expect this prefix to be on the front + // of the message bytes as well. See |MessagePrefixSz|. + + b.Prep(1, fb.SizeInt32+4+MessagePrefixSz) + b.FinishWithFileIdentifier(off, fileID) + + var size [4]byte + binary.BigEndian.PutUint32(size[:], uint32(len(b.Bytes)-int(b.Head()))) + if size[0] != 0 { + panic("message is too large to be encoded") + } + + bytes := b.Bytes[b.Head()-MessagePrefixSz:] + bytes[0] = byte(MessageTypesKind) + copy(bytes[1:], size[1:]) + return bytes } // byteSliceToString converts a []byte to string without a heap allocation. diff --git a/go/libraries/doltcore/doltdb/durable/table.go b/go/libraries/doltcore/doltdb/durable/table.go index 430cdd6aea..45e76d5da9 100644 --- a/go/libraries/doltcore/doltdb/durable/table.go +++ b/go/libraries/doltcore/doltdb/durable/table.go @@ -30,7 +30,6 @@ import ( "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/pool" "github.com/dolthub/dolt/go/store/prolly" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/shim" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" @@ -195,12 +194,12 @@ func TableFromAddr(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeS err = errors.New("table ref is unexpected noms value; not SerialMessage") return nil, err } - id := serial.GetFileID(sm[message.MessagePrefixSz:]) + id := serial.GetFileID(sm) if id != serial.TableFileID { err = errors.New("table ref is unexpected noms value; GetFileID == " + id) return nil, err } - return doltDevTable{vrw, ns, serial.GetRootAsTable([]byte(sm), message.MessagePrefixSz)}, nil + return doltDevTable{vrw, ns, serial.GetRootAsTable([]byte(sm), serial.MessagePrefixSz)}, nil } } @@ -774,8 +773,8 @@ func (fields serialTableFields) write() *serial.Table { serial.TableAddConflicts(builder, conflictsoff) serial.TableAddViolations(builder, violationsoff) serial.TableAddArtifacts(builder, artifactsoff) - bs := message.FinishMessage(builder, serial.TableEnd(builder), []byte(serial.TableFileID)) - return serial.GetRootAsTable(bs, message.MessagePrefixSz) + bs := serial.FinishMessage(builder, serial.TableEnd(builder), []byte(serial.TableFileID)) + return serial.GetRootAsTable(bs, serial.MessagePrefixSz) } func newDoltDevTable(ctx context.Context, vrw types.ValueReadWriter, ns tree.NodeStore, sch schema.Schema, rows Index, indexes IndexSet, autoIncVal types.Value) (Table, error) { diff --git a/go/libraries/doltcore/doltdb/foreign_key_serialization.go b/go/libraries/doltcore/doltdb/foreign_key_serialization.go index 131563dc51..b24454930c 100644 --- a/go/libraries/doltcore/doltdb/foreign_key_serialization.go +++ b/go/libraries/doltcore/doltdb/foreign_key_serialization.go @@ -22,7 +22,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/marshal" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/types" ) @@ -85,11 +84,11 @@ func serializeNomsForeignKeys(ctx context.Context, vrw types.ValueReadWriter, fk // deserializeNomsForeignKeys returns a new ForeignKeyCollection using the provided map returned previously by GetMap. func deserializeFlatbufferForeignKeys(msg types.SerialMessage) (*ForeignKeyCollection, error) { - if serial.GetFileID(msg[message.MessagePrefixSz:]) != serial.ForeignKeyCollectionFileID { + if serial.GetFileID(msg) != serial.ForeignKeyCollectionFileID { return nil, fmt.Errorf("expect Serial Message with ForeignKeyCollectionFileID") } - c := serial.GetRootAsForeignKeyCollection(msg, message.MessagePrefixSz) + c := serial.GetRootAsForeignKeyCollection(msg, serial.MessagePrefixSz) collection := &ForeignKeyCollection{ foreignKeys: make(map[string]ForeignKey, c.ForeignKeysLength()), } @@ -204,7 +203,7 @@ func serializeFlatbufferForeignKeys(fkc *ForeignKeyCollection) types.SerialMessa serial.ForeignKeyCollectionStart(b) serial.ForeignKeyCollectionAddForeignKeys(b, vec) o := serial.ForeignKeyCollectionEnd(b) - return []byte(message.FinishMessage(b, o, []byte(serial.ForeignKeyCollectionFileID))) + return []byte(serial.FinishMessage(b, o, []byte(serial.ForeignKeyCollectionFileID))) } func serializeStringVector(b *fb.Builder, s []string) fb.UOffsetT { @@ -228,9 +227,9 @@ func serializeUint64Vector(b *fb.Builder, u []uint64) fb.UOffsetT { } func emptyForeignKeyCollection(msg types.SerialMessage) bool { - if serial.GetFileID(msg[message.MessagePrefixSz:]) != serial.ForeignKeyCollectionFileID { + if serial.GetFileID(msg) != serial.ForeignKeyCollectionFileID { return false } - c := serial.GetRootAsForeignKeyCollection(msg, message.MessagePrefixSz) + c := serial.GetRootAsForeignKeyCollection(msg, serial.MessagePrefixSz) return c.ForeignKeysLength() == 0 } diff --git a/go/libraries/doltcore/doltdb/root_val.go b/go/libraries/doltcore/doltdb/root_val.go index a859742698..e42c3c286d 100644 --- a/go/libraries/doltcore/doltdb/root_val.go +++ b/go/libraries/doltcore/doltdb/root_val.go @@ -29,7 +29,6 @@ import ( "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/prolly" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/shim" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" @@ -256,7 +255,7 @@ func newRootValue(vrw types.ValueReadWriter, ns tree.NodeStore, v types.Value) ( var storage rvStorage if vrw.Format().UsesFlatbuffers() { - srv := serial.GetRootAsRootValue([]byte(v.(types.SerialMessage)), message.MessagePrefixSz) + srv := serial.GetRootAsRootValue([]byte(v.(types.SerialMessage)), serial.MessagePrefixSz) storage = fbRvStorage{srv} } else { st, ok := v.(types.Struct) @@ -307,7 +306,7 @@ func decodeRootNomsValue(vrw types.ValueReadWriter, ns tree.NodeStore, val types func isRootValue(nbf *types.NomsBinFormat, val types.Value) bool { if nbf.UsesFlatbuffers() { if sm, ok := val.(types.SerialMessage); ok { - return string(serial.GetFileID(sm[message.MessagePrefixSz:])) == serial.RootValueFileID + return string(serial.GetFileID(sm)) == serial.RootValueFileID } } else { if st, ok := val.(types.Struct); ok { @@ -333,7 +332,7 @@ func EmptyRootValue(ctx context.Context, vrw types.ValueReadWriter, ns tree.Node serial.RootValueAddTables(builder, tablesoff) serial.RootValueAddForeignKeyAddr(builder, fkoff) serial.RootValueAddSuperSchemasAddr(builder, ssoff) - bs := message.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID)) + bs := serial.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID)) return newRootValue(vrw, ns, types.SerialMessage(bs)) } @@ -1269,8 +1268,8 @@ func (r fbRvStorage) EditTablesMap(ctx context.Context, vrw types.ValueReadWrite serial.RootValueAddForeignKeyAddr(builder, fkoff) serial.RootValueAddSuperSchemasAddr(builder, ssoff) - bs := message.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID)) - return fbRvStorage{serial.GetRootAsRootValue(bs, message.MessagePrefixSz)}, nil + bs := serial.FinishMessage(builder, serial.RootValueEnd(builder), []byte(serial.RootValueFileID)) + return fbRvStorage{serial.GetRootAsRootValue(bs, serial.MessagePrefixSz)}, nil } func (r fbRvStorage) SetForeignKeyMap(ctx context.Context, vrw types.ValueReadWriter, v types.Value) (rvStorage, error) { diff --git a/go/libraries/doltcore/schema/encoding/serialization.go b/go/libraries/doltcore/schema/encoding/serialization.go index 078d8d2c8b..812015898d 100644 --- a/go/libraries/doltcore/schema/encoding/serialization.go +++ b/go/libraries/doltcore/schema/encoding/serialization.go @@ -25,7 +25,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/types" ) @@ -63,7 +62,7 @@ func serializeSchemaAsFlatbuffer(sch schema.Schema) ([]byte, error) { serial.TableSchemaAddSecondaryIndexes(b, indexes) serial.TableSchemaAddChecks(b, checks) root := serial.TableSchemaEnd(b) - bs := message.FinishMessage(b, root, []byte(serial.TableSchemaFileID)) + bs := serial.FinishMessage(b, root, []byte(serial.TableSchemaFileID)) return bs, nil } @@ -76,8 +75,8 @@ func DeserializeSchema(ctx context.Context, nbf *types.NomsBinFormat, v types.Va } func deserializeSchemaFromFlatbuffer(ctx context.Context, buf []byte) (schema.Schema, error) { - assertTrue(serial.GetFileID(buf[message.MessagePrefixSz:]) == serial.TableSchemaFileID) - s := serial.GetRootAsTableSchema(buf, message.MessagePrefixSz) + assertTrue(serial.GetFileID(buf) == serial.TableSchemaFileID) + s := serial.GetRootAsTableSchema(buf, serial.MessagePrefixSz) cols, err := deserializeColumns(ctx, s) if err != nil { diff --git a/go/serial/fileidentifiers.go b/go/serial/fileidentifiers.go index 228ffd1364..59d877fa98 100644 --- a/go/serial/fileidentifiers.go +++ b/go/serial/fileidentifiers.go @@ -15,7 +15,10 @@ package serial import ( + "encoding/binary" "unsafe" + + fb "github.com/google/flatbuffers/go" ) // KEEP THESE IN SYNC WITH .fbs FILES! @@ -32,11 +35,44 @@ const CommitClosureFileID = "CMCL" const TableSchemaFileID = "DSCH" const ForeignKeyCollectionFileID = "DFKC" +const MessageTypesKind int = 27 + +const MessagePrefixSz = 4 + +type Message []byte + func GetFileID(bs []byte) string { - if len(bs) < 8 { + if len(bs) < 8 + MessagePrefixSz { return "" } - return byteSliceToString(bs[4:8]) + return byteSliceToString(bs[MessagePrefixSz+4:MessagePrefixSz+8]) +} + +func FinishMessage(b *fb.Builder, off fb.UOffsetT, fileID []byte) Message { + // We finish the buffer by prefixing it with: + // 1) 1 byte NomsKind == SerialMessage. + // 2) big endian 3 byte uint representing the size of the message, not + // including the kind or size prefix bytes. + // + // This allows chunks we serialize here to be read by types binary + // codec. + // + // All accessors in this package expect this prefix to be on the front + // of the message bytes as well. See |MessagePrefixSz|. + + b.Prep(1, fb.SizeInt32+4+MessagePrefixSz) + b.FinishWithFileIdentifier(off, fileID) + + var size [4]byte + binary.BigEndian.PutUint32(size[:], uint32(len(b.Bytes)-int(b.Head()))) + if size[0] != 0 { + panic("message is too large to be encoded") + } + + bytes := b.Bytes[b.Head()-MessagePrefixSz:] + bytes[0] = byte(MessageTypesKind) + copy(bytes[1:], size[1:]) + return bytes } // byteSliceToString converts a []byte to string without a heap allocation. diff --git a/go/store/cmd/noms/noms_show.go b/go/store/cmd/noms/noms_show.go index e75d4ad4dc..e2d4844f60 100644 --- a/go/store/cmd/noms/noms_show.go +++ b/go/store/cmd/noms/noms_show.go @@ -34,7 +34,6 @@ import ( "github.com/dolthub/dolt/go/store/cmd/noms/util" "github.com/dolthub/dolt/go/store/config" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/shim" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" @@ -128,7 +127,7 @@ func outputType(value types.Value) { var typeString string switch value := value.(type) { case types.SerialMessage: - switch serial.GetFileID(value[message.MessagePrefixSz:]) { + switch serial.GetFileID(value) { case serial.StoreRootFileID: typeString = "StoreRoot" case serial.TagFileID: @@ -162,9 +161,9 @@ func outputEncodedValue(ctx context.Context, w io.Writer, value types.Value) err switch value := value.(type) { // Some types of serial message need to be output here because of dependency cycles between types / tree package case types.SerialMessage: - switch serial.GetFileID(value[message.MessagePrefixSz:]) { + switch serial.GetFileID(value) { case serial.TableFileID: - msg := serial.GetRootAsTable(value, message.MessagePrefixSz) + msg := serial.GetRootAsTable(value, serial.MessagePrefixSz) fmt.Fprintf(w, "{\n") fmt.Fprintf(w, "\tSchema: #%s\n", hash.New(msg.SchemaBytes()).String()) @@ -191,7 +190,7 @@ func outputEncodedValue(ctx context.Context, w io.Writer, value types.Value) err return nil case serial.StoreRootFileID: - msg := serial.GetRootAsStoreRoot(value, message.MessagePrefixSz) + msg := serial.GetRootAsStoreRoot(value, serial.MessagePrefixSz) ambytes := msg.AddressMapBytes() node := tree.NodeFromBytes(ambytes) return tree.OutputAddressMapNode(w, node) diff --git a/go/store/datas/commit.go b/go/store/datas/commit.go index eff42125af..68538571e2 100644 --- a/go/store/datas/commit.go +++ b/go/store/datas/commit.go @@ -34,7 +34,6 @@ import ( "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/nomdl" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" "github.com/dolthub/dolt/go/store/val" @@ -137,7 +136,7 @@ func NewCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.Valu return newCommitForValue(ctx, cs, vrw, ns, v, opts) } -func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, parentsClosureAddr hash.Hash) (message.Message, uint64) { +func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, parentsClosureAddr hash.Hash) (serial.Message, uint64) { builder := flatbuffers.NewBuilder(1024) vaddroff := builder.CreateByteVector(vaddr[:]) @@ -176,7 +175,7 @@ func commit_flatbuffer(vaddr hash.Hash, opts CommitOptions, heights []uint64, pa serial.CommitAddTimestampMillis(builder, opts.Meta.Timestamp) serial.CommitAddUserTimestampMillis(builder, opts.Meta.UserTimestamp) - bytes := message.FinishMessage(builder, serial.CommitEnd(builder), []byte(serial.CommitFileID)) + bytes := serial.FinishMessage(builder, serial.CommitEnd(builder), []byte(serial.CommitFileID)) return bytes, maxheight + 1 } @@ -203,7 +202,7 @@ func newCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.Valu return nil, err } for i := range heights { - parents[i] = serial.GetRootAsCommit([]byte(parentValues[i].(types.SerialMessage)), message.MessagePrefixSz) + parents[i] = serial.GetRootAsCommit([]byte(parentValues[i].(types.SerialMessage)), serial.MessagePrefixSz) heights[i] = parents[i].Height() } parentClosureAddr, err := writeFbCommitParentClosure(ctx, cs, vrw, ns, parents, opts.Parents) @@ -263,7 +262,7 @@ func newCommitForValue(ctx context.Context, cs chunks.ChunkStore, vrw types.Valu func commitPtr(nbf *types.NomsBinFormat, v types.Value, r *types.Ref) (*Commit, error) { if nbf.UsesFlatbuffers() { bs := []byte(v.(types.SerialMessage)) - height := serial.GetRootAsCommit(bs, message.MessagePrefixSz).Height() + height := serial.GetRootAsCommit(bs, serial.MessagePrefixSz).Height() var addr hash.Hash if r != nil { addr = r.TargetHash() @@ -432,7 +431,7 @@ func FindClosureCommonAncestor(ctx context.Context, cl CommitClosure, cm *Commit func GetCommitParents(ctx context.Context, vr types.ValueReader, cv types.Value) ([]*Commit, error) { if sm, ok := cv.(types.SerialMessage); ok { data := []byte(sm) - if serial.GetFileID(data[message.MessagePrefixSz:]) != serial.CommitFileID { + if serial.GetFileID(data) != serial.CommitFileID { return nil, errors.New("GetCommitParents: provided value is not a commit.") } addrs, err := types.SerialCommitParentAddrs(vr.Format(), sm) @@ -445,7 +444,7 @@ func GetCommitParents(ctx context.Context, vr types.ValueReader, cv types.Value) if v == nil { return nil, fmt.Errorf("GetCommitParents: Did not find parent Commit in ValueReader: %s", addrs[i].String()) } - csm := serial.GetRootAsCommit([]byte(v.(types.SerialMessage)), message.MessagePrefixSz) + csm := serial.GetRootAsCommit([]byte(v.(types.SerialMessage)), serial.MessagePrefixSz) res[i] = &Commit{ val: v, height: csm.Height(), @@ -512,10 +511,10 @@ func GetCommitParents(ctx context.Context, vr types.ValueReader, cv types.Value) func GetCommitMeta(ctx context.Context, cv types.Value) (*CommitMeta, error) { if sm, ok := cv.(types.SerialMessage); ok { data := []byte(sm) - if serial.GetFileID(data[message.MessagePrefixSz:]) != serial.CommitFileID { + if serial.GetFileID(data) != serial.CommitFileID { return nil, errors.New("GetCommitMeta: provided value is not a commit.") } - cmsg := serial.GetRootAsCommit(data, message.MessagePrefixSz) + cmsg := serial.GetRootAsCommit(data, serial.MessagePrefixSz) ret := &CommitMeta{} ret.Name = string(cmsg.Name()) ret.Email = string(cmsg.Email()) @@ -548,10 +547,10 @@ func GetCommitMeta(ctx context.Context, cv types.Value) (*CommitMeta, error) { func GetCommittedValue(ctx context.Context, vr types.ValueReader, cv types.Value) (types.Value, error) { if sm, ok := cv.(types.SerialMessage); ok { data := []byte(sm) - if serial.GetFileID(data[message.MessagePrefixSz:]) != serial.CommitFileID { + if serial.GetFileID(data) != serial.CommitFileID { return nil, errors.New("GetCommittedValue: provided value is not a commit.") } - cmsg := serial.GetRootAsCommit(data, message.MessagePrefixSz) + cmsg := serial.GetRootAsCommit(data, serial.MessagePrefixSz) var roothash hash.Hash copy(roothash[:], cmsg.RootBytes()) return vr.ReadValue(ctx, roothash) @@ -674,7 +673,7 @@ func IsCommit(v types.Value) (bool, error) { return types.IsValueSubtypeOf(s.Format(), v, valueCommitType) } else if sm, ok := v.(types.SerialMessage); ok { data := []byte(sm) - return serial.GetFileID(data[message.MessagePrefixSz:]) == serial.CommitFileID, nil + return serial.GetFileID(data) == serial.CommitFileID, nil } else { return false, nil } diff --git a/go/store/datas/commit_closure.go b/go/store/datas/commit_closure.go index c5f5e3dc7f..5b3e59f081 100644 --- a/go/store/datas/commit_closure.go +++ b/go/store/datas/commit_closure.go @@ -24,7 +24,6 @@ import ( "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/prolly" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" ) @@ -33,7 +32,7 @@ func newParentsClosureIterator(ctx context.Context, c *Commit, vr types.ValueRea sv := c.NomsValue() if _, ok := sv.(types.SerialMessage); ok { - msg := serial.GetRootAsCommit(sv.(types.SerialMessage), message.MessagePrefixSz) + msg := serial.GetRootAsCommit(sv.(types.SerialMessage), serial.MessagePrefixSz) addr := hash.New(msg.ParentClosureBytes()) if addr.IsEmpty() { return nil, nil diff --git a/go/store/datas/dataset.go b/go/store/datas/dataset.go index 9636f71272..55cc79bc18 100644 --- a/go/store/datas/dataset.go +++ b/go/store/datas/dataset.go @@ -29,7 +29,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/types" ) @@ -149,7 +148,7 @@ type serialTagHead struct { } func newSerialTagHead(bs []byte, addr hash.Hash) serialTagHead { - return serialTagHead{serial.GetRootAsTag(bs, message.MessagePrefixSz), addr} + return serialTagHead{serial.GetRootAsTag(bs, serial.MessagePrefixSz), addr} } func (h serialTagHead) TypeName() string { @@ -186,7 +185,7 @@ type serialWorkingSetHead struct { } func newSerialWorkingSetHead(bs []byte, addr hash.Hash) serialWorkingSetHead { - return serialWorkingSetHead{serial.GetRootAsWorkingSet(bs, message.MessagePrefixSz), addr} + return serialWorkingSetHead{serial.GetRootAsWorkingSet(bs, serial.MessagePrefixSz), addr} } func (h serialWorkingSetHead) TypeName() string { @@ -305,7 +304,7 @@ func newHead(head types.Value, addr hash.Hash) (dsHead, error) { if sm, ok := head.(types.SerialMessage); ok { data := []byte(sm) - fid := serial.GetFileID(data[message.MessagePrefixSz:]) + fid := serial.GetFileID(data) if fid == serial.TagFileID { return newSerialTagHead(data, addr), nil } diff --git a/go/store/datas/refmap.go b/go/store/datas/refmap.go index 760ac2a2c1..84761e19d3 100644 --- a/go/store/datas/refmap.go +++ b/go/store/datas/refmap.go @@ -19,25 +19,24 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/prolly" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/tree" "github.com/dolthub/dolt/go/store/types" ) -func storeroot_flatbuffer(am prolly.AddressMap) message.Message { +func storeroot_flatbuffer(am prolly.AddressMap) serial.Message { builder := flatbuffers.NewBuilder(1024) ambytes := []byte(tree.ValueFromNode(am.Node()).(types.SerialMessage)) voff := builder.CreateByteVector(ambytes) serial.StoreRootStart(builder) serial.StoreRootAddAddressMap(builder, voff) - return message.FinishMessage(builder, serial.StoreRootEnd(builder), []byte(serial.StoreRootFileID)) + return serial.FinishMessage(builder, serial.StoreRootEnd(builder), []byte(serial.StoreRootFileID)) } func parse_storeroot(bs []byte, ns tree.NodeStore) prolly.AddressMap { - if serial.GetFileID(bs[message.MessagePrefixSz:]) != serial.StoreRootFileID { - panic("expected store root file id, got: " + serial.GetFileID(bs[message.MessagePrefixSz:])) + if serial.GetFileID(bs) != serial.StoreRootFileID { + panic("expected store root file id, got: " + serial.GetFileID(bs)) } - sr := serial.GetRootAsStoreRoot(bs, message.MessagePrefixSz) + sr := serial.GetRootAsStoreRoot(bs, serial.MessagePrefixSz) mapbytes := sr.AddressMapBytes() node := tree.NodeFromBytes(mapbytes) return prolly.NewAddressMap(node, ns) diff --git a/go/store/datas/tag.go b/go/store/datas/tag.go index 1cf0db75b8..42caa2a661 100644 --- a/go/store/datas/tag.go +++ b/go/store/datas/tag.go @@ -23,7 +23,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/nomdl" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/types" ) @@ -112,7 +111,7 @@ func newTag(ctx context.Context, db *database, commitAddr hash.Hash, meta *TagMe } } -func tag_flatbuffer(commitAddr hash.Hash, meta *TagMeta) message.Message { +func tag_flatbuffer(commitAddr hash.Hash, meta *TagMeta) serial.Message { builder := flatbuffers.NewBuilder(1024) addroff := builder.CreateByteVector(commitAddr[:]) var nameOff, emailOff, descOff flatbuffers.UOffsetT @@ -130,7 +129,7 @@ func tag_flatbuffer(commitAddr hash.Hash, meta *TagMeta) message.Message { serial.TagAddTimestampMillis(builder, meta.Timestamp) serial.TagAddUserTimestampMillis(builder, meta.UserTimestamp) } - return message.FinishMessage(builder, serial.TagEnd(builder), []byte(serial.TagFileID)) + return serial.FinishMessage(builder, serial.TagEnd(builder), []byte(serial.TagFileID)) } func IsTag(v types.Value) (bool, error) { @@ -138,7 +137,7 @@ func IsTag(v types.Value) (bool, error) { return types.IsValueSubtypeOf(s.Format(), v, valueTagType) } else if sm, ok := v.(types.SerialMessage); ok { data := []byte(sm) - return serial.GetFileID(data[message.MessagePrefixSz:]) == serial.TagFileID, nil + return serial.GetFileID(data) == serial.TagFileID, nil } return false, nil } diff --git a/go/store/datas/workingset.go b/go/store/datas/workingset.go index 9c1c30e92e..fe7ae71f9b 100755 --- a/go/store/datas/workingset.go +++ b/go/store/datas/workingset.go @@ -21,7 +21,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/types" ) @@ -149,7 +148,7 @@ func newWorkingSet(ctx context.Context, db *database, meta *WorkingSetMeta, work return ref.TargetHash(), ref, nil } -func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *MergeState, meta *WorkingSetMeta) message.Message { +func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *MergeState, meta *WorkingSetMeta) serial.Message { builder := flatbuffers.NewBuilder(1024) workingoff := builder.CreateByteVector(working[:]) var stagedOff, mergeStateOff flatbuffers.UOffsetT @@ -186,7 +185,7 @@ func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *Mer serial.WorkingSetAddDesc(builder, descOff) serial.WorkingSetAddTimestampMillis(builder, meta.Timestamp) } - return message.FinishMessage(builder, serial.WorkingSetEnd(builder), []byte(serial.WorkingSetFileID)) + return serial.FinishMessage(builder, serial.WorkingSetEnd(builder), []byte(serial.WorkingSetFileID)) } func NewMergeState(ctx context.Context, vrw types.ValueReadWriter, preMergeWorking types.Ref, commit *Commit) (*MergeState, error) { @@ -221,7 +220,7 @@ func IsWorkingSet(v types.Value) (bool, error) { // types.IsValueSubtypeOf is very strict about the type description. return s.Name() == workingSetName, nil } else if sm, ok := v.(types.SerialMessage); ok { - return serial.GetFileID(sm[message.MessagePrefixSz:]) == serial.WorkingSetFileID, nil + return serial.GetFileID(sm) == serial.WorkingSetFileID, nil } else { return false, nil } diff --git a/go/store/prolly/commit_closure_test.go b/go/store/prolly/commit_closure_test.go index 9264010cf4..ebd6a42e98 100644 --- a/go/store/prolly/commit_closure_test.go +++ b/go/store/prolly/commit_closure_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/prolly/tree" @@ -148,7 +149,7 @@ func TestCommitClosure(t *testing.T) { assert.Equal(t, 4096, cc.Count()) // Walk the addresses in the root. - msg := message.Message(tree.ValueFromNode(cc.closure.root).(types.SerialMessage)) + msg := serial.Message(tree.ValueFromNode(cc.closure.root).(types.SerialMessage)) numaddresses := 0 err = message.WalkAddresses(ctx, msg, func(ctx context.Context, addr hash.Hash) error { numaddresses++ diff --git a/go/store/prolly/message/address_map.go b/go/store/prolly/message/address_map.go index becab1f87a..b1fbcd6021 100644 --- a/go/store/prolly/message/address_map.go +++ b/go/store/prolly/message/address_map.go @@ -40,7 +40,7 @@ type AddressMapSerializer struct { var _ Serializer = AddressMapSerializer{} -func (s AddressMapSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) Message { +func (s AddressMapSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) serial.Message { var ( keyArr, keyOffs fb.UOffsetT addrArr, cardArr fb.UOffsetT @@ -75,25 +75,25 @@ func (s AddressMapSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, } serial.AddressMapAddTreeLevel(b, uint8(level)) - return FinishMessage(b, serial.AddressMapEnd(b), addressMapFileID) + return serial.FinishMessage(b, serial.AddressMapEnd(b), addressMapFileID) } -func getAddressMapKeys(msg Message) (keys val.SlicedBuffer) { - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) +func getAddressMapKeys(msg serial.Message) (keys val.SlicedBuffer) { + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) keys.Buf = am.KeyItemsBytes() keys.Offs = getAddressMapKeyOffsets(am) return } -func getAddressMapValues(msg Message) (values val.SlicedBuffer) { - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) +func getAddressMapValues(msg serial.Message) (values val.SlicedBuffer) { + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) values.Buf = am.AddressArrayBytes() values.Offs = offsetsForAddressArray(values.Buf) return } -func walkAddressMapAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error { - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) +func walkAddressMapAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error { + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) arr := am.AddressArrayBytes() for i := 0; i < len(arr)/hash.ByteLen; i++ { addr := hash.New(arr[i*addrSize : (i+1)*addrSize]) @@ -104,8 +104,8 @@ func walkAddressMapAddresses(ctx context.Context, msg Message, cb func(ctx conte return nil } -func getAddressMapCount(msg Message) uint16 { - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) +func getAddressMapCount(msg serial.Message) uint16 { + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) if am.KeyItemsLength() == 0 { return 0 } @@ -113,19 +113,19 @@ func getAddressMapCount(msg Message) uint16 { return uint16(am.KeyOffsetsLength() + 1) } -func getAddressMapTreeLevel(msg Message) int { - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) +func getAddressMapTreeLevel(msg serial.Message) int { + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) return int(am.TreeLevel()) } -func getAddressMapTreeCount(msg Message) int { - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) +func getAddressMapTreeCount(msg serial.Message) int { + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) return int(am.TreeCount()) } -func getAddressMapSubtrees(msg Message) []uint64 { +func getAddressMapSubtrees(msg serial.Message) []uint64 { counts := make([]uint64, getAddressMapCount(msg)) - am := serial.GetRootAsAddressMap(msg, MessagePrefixSz) + am := serial.GetRootAsAddressMap(msg, serial.MessagePrefixSz) return decodeVarints(am.SubtreeCountsBytes(), counts) } @@ -149,6 +149,6 @@ func estimateAddressMapSize(keys, addresses [][]byte, subtrees []uint64) (keySz, totalSz += len(subtrees) * binary.MaxVarintLen64 totalSz += 8 + 1 + 1 + 1 totalSz += 72 - totalSz += MessagePrefixSz + totalSz += serial.MessagePrefixSz return } diff --git a/go/store/prolly/message/commit_closure.go b/go/store/prolly/message/commit_closure.go index dfb995e148..856abab007 100644 --- a/go/store/prolly/message/commit_closure.go +++ b/go/store/prolly/message/commit_closure.go @@ -49,17 +49,17 @@ func offsetsForCommitClosureKeys(buf []byte) []byte { return commitClosureKeyOffsets[:cnt*uint16Size] } -func getCommitClosureKeys(msg Message) val.SlicedBuffer { +func getCommitClosureKeys(msg serial.Message) val.SlicedBuffer { var ret val.SlicedBuffer - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) ret.Buf = m.KeyItemsBytes() ret.Offs = offsetsForCommitClosureKeys(ret.Buf) return ret } -func getCommitClosureValues(msg Message) val.SlicedBuffer { +func getCommitClosureValues(msg serial.Message) val.SlicedBuffer { var ret val.SlicedBuffer - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) if m.AddressArrayLength() == 0 { ret.Buf = commitClosureEmptyValueBytes ret.Offs = commitClosureValueOffsets[:getCommitClosureCount(msg)*uint16Size] @@ -73,29 +73,29 @@ func getCommitClosureValues(msg Message) val.SlicedBuffer { // uint64 + hash. const commitClosureKeyLength = 8 + 20 -func getCommitClosureCount(msg Message) uint16 { - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) +func getCommitClosureCount(msg serial.Message) uint16 { + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) return uint16(m.KeyItemsLength() / commitClosureKeyLength) } -func getCommitClosureTreeLevel(msg Message) int { - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) +func getCommitClosureTreeLevel(msg serial.Message) int { + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) return int(m.TreeLevel()) } -func getCommitClosureTreeCount(msg Message) int { - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) +func getCommitClosureTreeCount(msg serial.Message) int { + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) return int(m.TreeCount()) } -func getCommitClosureSubtrees(msg Message) []uint64 { +func getCommitClosureSubtrees(msg serial.Message) []uint64 { counts := make([]uint64, getCommitClosureCount(msg)) - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) return decodeVarints(m.SubtreeCountsBytes(), counts) } -func walkCommitClosureAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error { - m := serial.GetRootAsCommitClosure(msg, MessagePrefixSz) +func walkCommitClosureAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error { + m := serial.GetRootAsCommitClosure(msg, serial.MessagePrefixSz) arr := m.AddressArrayBytes() for i := 0; i < len(arr)/hash.ByteLen; i++ { addr := hash.New(arr[i*addrSize : (i+1)*addrSize]) @@ -124,7 +124,7 @@ type CommitClosureSerializer struct { var _ Serializer = CommitClosureSerializer{} -func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) Message { +func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint64, level int) serial.Message { var keyArr, addrArr, cardArr fb.UOffsetT keySz, addrSz, totalSz := estimateCommitClosureSize(keys, addrs, subtrees) @@ -153,7 +153,7 @@ func (s CommitClosureSerializer) Serialize(keys, addrs [][]byte, subtrees []uint } serial.CommitClosureAddTreeLevel(b, uint8(level)) - return FinishMessage(b, serial.CommitClosureEnd(b), commitClosureFileID) + return serial.FinishMessage(b, serial.CommitClosureEnd(b), commitClosureFileID) } func estimateCommitClosureSize(keys, addresses [][]byte, subtrees []uint64) (keySz, addrSz, totalSz int) { @@ -163,6 +163,6 @@ func estimateCommitClosureSize(keys, addresses [][]byte, subtrees []uint64) (key totalSz += len(subtrees) * binary.MaxVarintLen64 totalSz += 8 + 1 + 1 + 1 totalSz += 72 - totalSz += MessagePrefixSz + totalSz += serial.MessagePrefixSz return } diff --git a/go/store/prolly/message/message.go b/go/store/prolly/message/message.go index 2b2afd0612..14844b61f0 100644 --- a/go/store/prolly/message/message.go +++ b/go/store/prolly/message/message.go @@ -16,55 +16,19 @@ package message import ( "context" - "encoding/binary" "fmt" - fb "github.com/google/flatbuffers/go" - "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/val" ) -const MessageTypesKind int = 27 - -const MessagePrefixSz = 4 - -type Message []byte - -func FinishMessage(b *fb.Builder, off fb.UOffsetT, fileID []byte) Message { - // We finish the buffer by prefixing it with: - // 1) 1 byte NomsKind == SerialMessage. - // 2) big endian 3 byte uint representing the size of the message, not - // including the kind or size prefix bytes. - // - // This allows chunks we serialize here to be read by types binary - // codec. - // - // All accessors in this package expect this prefix to be on the front - // of the message bytes as well. See |MessagePrefixSz|. - - b.Prep(1, fb.SizeInt32+4+MessagePrefixSz) - b.FinishWithFileIdentifier(off, fileID) - - var size [4]byte - binary.BigEndian.PutUint32(size[:], uint32(len(b.Bytes)-int(b.Head()))) - if size[0] != 0 { - panic("message is too large to be encoded") - } - - bytes := b.Bytes[b.Head()-MessagePrefixSz:] - bytes[0] = byte(MessageTypesKind) - copy(bytes[1:], size[1:]) - return bytes -} - type Serializer interface { - Serialize(keys, values [][]byte, subtrees []uint64, level int) Message + Serialize(keys, values [][]byte, subtrees []uint64, level int) serial.Message } -func GetKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) { - id := serial.GetFileID(msg[MessagePrefixSz:]) +func GetKeysAndValues(msg serial.Message) (keys, values val.SlicedBuffer, cnt uint16) { + id := serial.GetFileID(msg) if id == serial.ProllyTreeNodeFileID { return getProllyMapKeysAndValues(msg) @@ -85,8 +49,8 @@ func GetKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) { panic(fmt.Sprintf("unknown message id %s", id)) } -func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error { - id := serial.GetFileID(msg[MessagePrefixSz:]) +func WalkAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error { + id := serial.GetFileID(msg) switch id { case serial.ProllyTreeNodeFileID: return walkProllyMapAddresses(ctx, msg, cb) @@ -99,8 +63,8 @@ func WalkAddresses(ctx context.Context, msg Message, cb func(ctx context.Context } } -func GetTreeLevel(msg Message) int { - id := serial.GetFileID(msg[MessagePrefixSz:]) +func GetTreeLevel(msg serial.Message) int { + id := serial.GetFileID(msg) switch id { case serial.ProllyTreeNodeFileID: return getProllyMapTreeLevel(msg) @@ -113,8 +77,8 @@ func GetTreeLevel(msg Message) int { } } -func GetTreeCount(msg Message) int { - id := serial.GetFileID(msg[MessagePrefixSz:]) +func GetTreeCount(msg serial.Message) int { + id := serial.GetFileID(msg) switch id { case serial.ProllyTreeNodeFileID: return getProllyMapTreeCount(msg) @@ -127,8 +91,8 @@ func GetTreeCount(msg Message) int { } } -func GetSubtrees(msg Message) []uint64 { - id := serial.GetFileID(msg[MessagePrefixSz:]) +func GetSubtrees(msg serial.Message) []uint64 { + id := serial.GetFileID(msg) switch id { case serial.ProllyTreeNodeFileID: return getProllyMapSubtrees(msg) diff --git a/go/store/prolly/message/prolly_map.go b/go/store/prolly/message/prolly_map.go index 1703934cd9..016015aac8 100644 --- a/go/store/prolly/message/prolly_map.go +++ b/go/store/prolly/message/prolly_map.go @@ -44,7 +44,7 @@ type ProllyMapSerializer struct { var _ Serializer = ProllyMapSerializer{} -func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64, level int) Message { +func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64, level int) serial.Message { var ( keyTups, keyOffs fb.UOffsetT valTups, valOffs fb.UOffsetT @@ -93,11 +93,11 @@ func (s ProllyMapSerializer) Serialize(keys, values [][]byte, subtrees []uint64, serial.ProllyTreeNodeAddValueType(b, serial.ItemTypeTupleFormatAlpha) serial.ProllyTreeNodeAddTreeLevel(b, uint8(level)) - return FinishMessage(b, serial.ProllyTreeNodeEnd(b), prollyMapFileID) + return serial.FinishMessage(b, serial.ProllyTreeNodeEnd(b), prollyMapFileID) } -func getProllyMapKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt uint16) { - pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz) +func getProllyMapKeysAndValues(msg serial.Message) (keys, values val.SlicedBuffer, cnt uint16) { + pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz) keys.Buf = pm.KeyItemsBytes() keys.Offs = getProllyMapKeyOffsets(pm) @@ -119,8 +119,8 @@ func getProllyMapKeysAndValues(msg Message) (keys, values val.SlicedBuffer, cnt return } -func walkProllyMapAddresses(ctx context.Context, msg Message, cb func(ctx context.Context, addr hash.Hash) error) error { - pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz) +func walkProllyMapAddresses(ctx context.Context, msg serial.Message, cb func(ctx context.Context, addr hash.Hash) error) error { + pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz) arr := pm.AddressArrayBytes() for i := 0; i < len(arr)/hash.ByteLen; i++ { addr := hash.New(arr[i*addrSize : (i+1)*addrSize]) @@ -142,8 +142,8 @@ func walkProllyMapAddresses(ctx context.Context, msg Message, cb func(ctx contex return nil } -func getProllyMapCount(msg Message) uint16 { - pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz) +func getProllyMapCount(msg serial.Message) uint16 { + pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz) if pm.KeyItemsLength() == 0 { return 0 } @@ -151,19 +151,19 @@ func getProllyMapCount(msg Message) uint16 { return uint16(pm.KeyOffsetsLength() + 1) } -func getProllyMapTreeLevel(msg Message) int { - pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz) +func getProllyMapTreeLevel(msg serial.Message) int { + pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz) return int(pm.TreeLevel()) } -func getProllyMapTreeCount(msg Message) int { - pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz) +func getProllyMapTreeCount(msg serial.Message) int { + pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz) return int(pm.TreeCount()) } -func getProllyMapSubtrees(msg Message) []uint64 { +func getProllyMapSubtrees(msg serial.Message) []uint64 { counts := make([]uint64, getProllyMapCount(msg)) - pm := serial.GetRootAsProllyTreeNode(msg, MessagePrefixSz) + pm := serial.GetRootAsProllyTreeNode(msg, serial.MessagePrefixSz) return decodeVarints(pm.SubtreeCountsBytes(), counts) } @@ -213,7 +213,7 @@ func estimateProllyMapSize(keys, values [][]byte, subtrees []uint64, valAddrsCnt bufSz += 72 // vtable (approx) bufSz += 100 // padding? bufSz += valAddrsCnt * len(values) - bufSz += MessagePrefixSz + bufSz += serial.MessagePrefixSz return keySz, valSz, bufSz } diff --git a/go/store/prolly/tree/node.go b/go/store/prolly/tree/node.go index 4a706e0554..4f29ded057 100644 --- a/go/store/prolly/tree/node.go +++ b/go/store/prolly/tree/node.go @@ -19,6 +19,7 @@ import ( "encoding/hex" "io" + "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/prolly/message" "github.com/dolthub/dolt/go/store/types" @@ -35,7 +36,7 @@ type Node struct { keys, values val.SlicedBuffer subtrees subtreeCounts count uint16 - msg message.Message + msg serial.Message } type AddressCb func(ctx context.Context, addr hash.Hash) error diff --git a/go/store/types/noms_kind.go b/go/store/types/noms_kind.go index aabc58093e..7db225b5bd 100644 --- a/go/store/types/noms_kind.go +++ b/go/store/types/noms_kind.go @@ -22,7 +22,7 @@ package types import ( - "github.com/dolthub/dolt/go/store/prolly/message" + "github.com/dolthub/dolt/go/gen/fb/serial" ) // NomsKind allows a TypeDesc to indicate what kind of type is described. @@ -127,8 +127,8 @@ func init() { SupportedKinds[PolygonKind] = true SupportedKinds[SerialMessageKind] = true - if message.MessageTypesKind != int(SerialMessageKind) { - panic("internal error: message.MessageTypesKind != SerialMessageKind") + if serial.MessageTypesKind != int(SerialMessageKind) { + panic("internal error: serial.MessageTypesKind != SerialMessageKind") } } diff --git a/go/store/types/path.go b/go/store/types/path.go index 4250d2d172..ee17ee6e89 100644 --- a/go/store/types/path.go +++ b/go/store/types/path.go @@ -34,7 +34,6 @@ import ( "github.com/dolthub/dolt/go/gen/fb/serial" "github.com/dolthub/dolt/go/store/d" "github.com/dolthub/dolt/go/store/hash" - "github.com/dolthub/dolt/go/store/prolly/message" ) // For an annotation like @type, 1st capture group is the annotation. @@ -236,8 +235,8 @@ func (fp FieldPath) Resolve(ctx context.Context, v Value, vr ValueReader) (Value return sv, nil } case SerialMessage: - if serial.GetFileID(v[message.MessagePrefixSz:]) == serial.CommitFileID && fp.Name == "value" { - msg := serial.GetRootAsCommit(v, message.MessagePrefixSz) + if serial.GetFileID(v) == serial.CommitFileID && fp.Name == "value" { + msg := serial.GetRootAsCommit(v, serial.MessagePrefixSz) addr := hash.New(msg.RootBytes()) return vr.ReadValue(ctx, addr) } diff --git a/go/store/types/serial_message.go b/go/store/types/serial_message.go index f86a91fd48..77be38ba97 100644 --- a/go/store/types/serial_message.go +++ b/go/store/types/serial_message.go @@ -53,10 +53,10 @@ func (sm SerialMessage) Hash(nbf *NomsBinFormat) (hash.Hash, error) { } func (sm SerialMessage) HumanReadableString() string { - id := serial.GetFileID(sm[message.MessagePrefixSz:]) + id := serial.GetFileID(sm) switch id { case serial.StoreRootFileID: - msg := serial.GetRootAsStoreRoot([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsStoreRoot([]byte(sm), serial.MessagePrefixSz) ret := &strings.Builder{} mapbytes := msg.AddressMapBytes() fmt.Fprintf(ret, "StoreRoot{%s}", SerialMessage(mapbytes).HumanReadableString()) @@ -64,7 +64,7 @@ func (sm SerialMessage) HumanReadableString() string { case serial.TagFileID: return "Tag" case serial.WorkingSetFileID: - msg := serial.GetRootAsWorkingSet(sm, message.MessagePrefixSz) + msg := serial.GetRootAsWorkingSet(sm, serial.MessagePrefixSz) ret := &strings.Builder{} fmt.Fprintf(ret, "{\n") fmt.Fprintf(ret, "\tName: %s\n", msg.Name()) @@ -76,7 +76,7 @@ func (sm SerialMessage) HumanReadableString() string { fmt.Fprintf(ret, "}") return ret.String() case serial.CommitFileID: - msg := serial.GetRootAsCommit(sm, message.MessagePrefixSz) + msg := serial.GetRootAsCommit(sm, serial.MessagePrefixSz) ret := &strings.Builder{} fmt.Fprintf(ret, "{\n") fmt.Fprintf(ret, "\tName: %s\n", msg.Name()) @@ -104,7 +104,7 @@ func (sm SerialMessage) HumanReadableString() string { fmt.Fprintf(ret, "}") return ret.String() case serial.RootValueFileID: - msg := serial.GetRootAsRootValue(sm, message.MessagePrefixSz) + msg := serial.GetRootAsRootValue(sm, serial.MessagePrefixSz) ret := &strings.Builder{} fmt.Fprintf(ret, "{\n") fmt.Fprintf(ret, "\tFeatureVersion: %d\n", msg.FeatureVersion()) @@ -115,7 +115,7 @@ func (sm SerialMessage) HumanReadableString() string { fmt.Fprintf(ret, "}") return ret.String() case serial.TableFileID: - msg := serial.GetRootAsTable(sm, message.MessagePrefixSz) + msg := serial.GetRootAsTable(sm, serial.MessagePrefixSz) ret := &strings.Builder{} fmt.Fprintf(ret, "{\n") @@ -133,7 +133,7 @@ func (sm SerialMessage) HumanReadableString() string { fmt.Fprintf(ret, "}") return ret.String() case serial.AddressMapFileID: - keys, values, cnt := message.GetKeysAndValues(message.Message(sm)) + keys, values, cnt := message.GetKeysAndValues(serial.Message(sm)) var b strings.Builder b.Write([]byte("AddressMap{\n")) for i := uint16(0); i < cnt; i++ { @@ -164,15 +164,15 @@ func (sm SerialMessage) Less(nbf *NomsBinFormat, other LesserValuable) (bool, er const SerialMessageRefHeight = 1024 func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { - switch serial.GetFileID(sm[message.MessagePrefixSz:]) { + switch serial.GetFileID(sm) { case serial.StoreRootFileID: - msg := serial.GetRootAsStoreRoot([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsStoreRoot([]byte(sm), serial.MessagePrefixSz) if msg.AddressMapLength() > 0 { mapbytes := msg.AddressMapBytes() return SerialMessage(mapbytes).walkRefs(nbf, cb) } case serial.TagFileID: - msg := serial.GetRootAsTag([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsTag([]byte(sm), serial.MessagePrefixSz) addr := hash.New(msg.CommitAddrBytes()) r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight) if err != nil { @@ -180,7 +180,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { } return cb(r) case serial.WorkingSetFileID: - msg := serial.GetRootAsWorkingSet([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsWorkingSet([]byte(sm), serial.MessagePrefixSz) addr := hash.New(msg.WorkingRootAddrBytes()) r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight) if err != nil { @@ -220,7 +220,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { } } case serial.RootValueFileID: - msg := serial.GetRootAsRootValue([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsRootValue([]byte(sm), serial.MessagePrefixSz) err := SerialMessage(msg.TablesBytes()).walkRefs(nbf, cb) if err != nil { return err @@ -246,7 +246,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { } } case serial.TableFileID: - msg := serial.GetRootAsTable([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsTable([]byte(sm), serial.MessagePrefixSz) addr := hash.New(msg.SchemaBytes()) r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight) if err != nil { @@ -344,7 +344,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { return err } } - msg := serial.GetRootAsCommit([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsCommit([]byte(sm), serial.MessagePrefixSz) addr := hash.New(msg.RootBytes()) r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight) if err != nil { @@ -373,7 +373,7 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { case serial.AddressMapFileID: fallthrough case serial.CommitClosureFileID: - return message.WalkAddresses(context.TODO(), message.Message(sm), func(ctx context.Context, addr hash.Hash) error { + return message.WalkAddresses(context.TODO(), serial.Message(sm), func(ctx context.Context, addr hash.Hash) error { r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight) if err != nil { return err @@ -381,13 +381,13 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error { return cb(r) }) default: - return fmt.Errorf("unsupported SerialMessage message with FileID: %s", serial.GetFileID(sm[message.MessagePrefixSz:])) + return fmt.Errorf("unsupported SerialMessage message with FileID: %s", serial.GetFileID(sm)) } return nil } func SerialCommitParentAddrs(nbf *NomsBinFormat, sm SerialMessage) ([]hash.Hash, error) { - msg := serial.GetRootAsCommit([]byte(sm), message.MessagePrefixSz) + msg := serial.GetRootAsCommit([]byte(sm), serial.MessagePrefixSz) addrs := msg.ParentAddrsBytes() n := len(addrs) / 20 ret := make([]hash.Hash, n)