go/{store/datas,libraries/doltcore/doltdb/durable}: Move __DOLT_1__ format to flatbuffers top-of-DAG.

Aaron Son
2022-05-05 12:48:58 -07:00
parent 6a5f5c6282
commit 0d25806b38
11 changed files with 63 additions and 32 deletions

View File

@@ -49,6 +49,10 @@ type Index interface {
// AddColumnToRows adds the column given to the rows data and returns the resulting rows.
// The |newCol| is present in |newSchema|.
AddColumnToRows(ctx context.Context, newCol string, newSchema schema.Schema) (Index, error)
// Returns the serialized bytes of the (top of the) index.
// Non-public. Used for flatbuffers Table persistence.
bytes() ([]byte, error)
}
// IndexSet stores a collection of secondary Indexes.
@@ -178,6 +182,15 @@ func (i nomsIndex) Format() *types.NomsBinFormat {
return i.vrw.Format()
}
// bytes implements Index.
func (i nomsIndex) bytes() ([]byte, error) {
rowschunk, err := types.EncodeValue(i.index, i.vrw.Format())
if err != nil {
return nil, err
}
return rowschunk.Data(), nil
}
func (i nomsIndex) AddColumnToRows(ctx context.Context, newCol string, newSchema schema.Schema) (Index, error) {
// no-op for noms indexes because of tag-based mapping
return i, nil
@@ -219,6 +232,11 @@ func (i prollyIndex) Format() *types.NomsBinFormat {
return i.index.Format()
}
// bytes implements Index.
func (i prollyIndex) bytes() ([]byte, error) {
return []byte(prolly.ValueFromMap(i.index).(types.TupleRowStorage)), nil
}
var _ Index = prollyIndex{}
func (i prollyIndex) AddColumnToRows(ctx context.Context, newCol string, newSchema schema.Schema) (Index, error) {
@@ -286,7 +304,7 @@ func (i prollyIndex) AddColumnToRows(ctx context.Context, newCol string, newSche
// NewIndexSet returns an empty IndexSet.
func NewIndexSet(ctx context.Context, vrw types.ValueReadWriter) IndexSet {
if vrw.Format() == types.Format_DOLT_DEV {
if vrw.Format().UsesFlatbuffers() {
builder := flatbuffers.NewBuilder(24)
serial.RefMapStart(builder)
builder.Finish(serial.RefMapEnd(builder))
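
The non-public bytes() method added above gives both index representations a single way to hand their top-of-tree serialization to the flatbuffers table message: a nomsIndex returns the EncodeValue'd chunk data of its map, while a prollyIndex returns its raw TupleRowStorage bytes. A minimal sketch of how a package-internal caller could embed that payload, assuming flatbuffers-generated helpers named serial.TableStart, serial.TableAddPrimaryIndex and serial.TableEnd (the real generated names may differ):

// tableMessageWithRows is illustrative only. It shows the intended contract
// of Index.bytes(): whatever the concrete index type, the returned bytes are
// embedded verbatim as the table message's primary index payload (compare
// PrimaryIndexBytes() in the table.go hunks below).
func tableMessageWithRows(idx Index) ([]byte, error) {
    rowsbytes, err := idx.bytes()
    if err != nil {
        return nil, err
    }
    builder := flatbuffers.NewBuilder(len(rowsbytes) + 64)
    // Byte vectors must be created before the table itself is started.
    rowsoff := builder.CreateByteVector(rowsbytes)
    serial.TableStart(builder)                    // assumed generated helper
    serial.TableAddPrimaryIndex(builder, rowsoff) // assumed generated helper
    builder.Finish(serial.TableEnd(builder))      // assumed generated helper
    return builder.FinishedBytes(), nil
}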

View File

@@ -30,6 +30,7 @@ import (
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/types"
)
@@ -138,7 +139,7 @@ func NewNomsTable(ctx context.Context, vrw types.ValueReadWriter, sch schema.Sch
// NewTable returns a new Table.
func NewTable(ctx context.Context, vrw types.ValueReadWriter, sch schema.Schema, rows Index, indexes IndexSet, autoIncVal types.Value) (Table, error) {
if vrw.Format() == types.Format_DOLT_DEV {
if vrw.Format().UsesFlatbuffers() {
return newDoltDevTable(ctx, vrw, sch, rows, indexes, autoIncVal)
}
@@ -191,7 +192,7 @@ func TableFromAddr(ctx context.Context, vrw types.ValueReadWriter, addr hash.Has
return nil, err
}
if vrw.Format() != types.Format_DOLT_DEV {
if !vrw.Format().UsesFlatbuffers() {
st, ok := val.(types.Struct)
if !ok {
err = errors.New("table ref is unexpected noms value")
@@ -655,12 +656,10 @@ func newDoltDevTable(ctx context.Context, vrw types.ValueReadWriter, sch schema.
}
schemaAddr := schemaRef.TargetHash()
rowsmap := rows.(nomsIndex).index
rowschunk, err := types.EncodeValue(rowsmap, vrw.Format())
rowsbytes, err := rows.bytes()
if err != nil {
return nil, err
}
rowsbytes := rowschunk.Data()
if indexes == nil {
indexes = NewIndexSet(ctx, vrw)
@@ -727,21 +726,28 @@ func (t doltDevTable) SetSchema(ctx context.Context, sch schema.Schema) (Table,
func (t doltDevTable) GetTableRows(ctx context.Context) (Index, error) {
rowbytes := t.msg.PrimaryIndexBytes()
rowchunk := chunks.NewChunk(rowbytes)
tv, err := types.DecodeValue(rowchunk, t.vrw)
if err != nil {
return nil, err
if t.vrw.Format() == types.Format_DOLT_DEV {
rowchunk := chunks.NewChunk(rowbytes)
tv, err := types.DecodeValue(rowchunk, t.vrw)
if err != nil {
return nil, err
}
return IndexFromNomsMap(tv.(types.Map), t.vrw), nil
} else {
sch, err := t.GetSchema(ctx)
if err != nil {
return nil, err
}
m := prolly.MapFromValue(types.TupleRowStorage(rowbytes), sch, t.vrw)
return IndexFromProllyMap(m), nil
}
return IndexFromNomsMap(tv.(types.Map), t.vrw), nil
}
func (t doltDevTable) SetTableRows(ctx context.Context, rows Index) (Table, error) {
rowsmap := rows.(nomsIndex).index
rowschunk, err := types.EncodeValue(rowsmap, t.vrw.Format())
rowsbytes, err := rows.bytes()
if err != nil {
return nil, err
}
rowsbytes := rowschunk.Data()
fields := t.fields()
fields.rows = rowsbytes
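
With the flatbuffers table message in place for both formats, the primary-index bytes are interpreted per format: __DOLT_DEV__ still stores an encoded noms Map chunk, while __DOLT_1__ stores raw prolly tuple row storage. A condensed restatement of the GetTableRows branch above, as a hypothetical helper that uses only calls visible in these hunks (imports as in the surrounding file; it assumes the caller is already on the doltDevTable path):

// decodeRows is a hypothetical restatement of the format split above.
func decodeRows(vrw types.ValueReadWriter, sch schema.Schema, rowbytes []byte) (Index, error) {
    if vrw.Format() == types.Format_DOLT_DEV {
        // __DOLT_DEV__: the payload is an encoded noms Map chunk.
        v, err := types.DecodeValue(chunks.NewChunk(rowbytes), vrw)
        if err != nil {
            return nil, err
        }
        return IndexFromNomsMap(v.(types.Map), vrw), nil
    }
    // __DOLT_1__: the payload is raw tuple row storage backing a prolly map.
    m := prolly.MapFromValue(types.TupleRowStorage(rowbytes), sch, vrw)
    return IndexFromProllyMap(m), nil
}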

View File

@@ -250,7 +250,7 @@ func (r nomsRvStorage) nomsValue() types.Value {
func newRootValue(vrw types.ValueReadWriter, v types.Value) (*RootValue, error) {
var storage rvStorage
if vrw.Format() == types.Format_DOLT_DEV {
if vrw.Format().UsesFlatbuffers() {
srv := serial.GetRootAsRootValue([]byte(v.(types.SerialMessage)), 0)
storage = fbRvStorage{srv}
} else {
@@ -278,7 +278,7 @@ func newRootValue(vrw types.ValueReadWriter, v types.Value) (*RootValue, error)
}
func isRootValue(nbf *types.NomsBinFormat, val types.Value) bool {
if nbf == types.Format_DOLT_DEV {
if nbf.UsesFlatbuffers() {
if sm, ok := val.(types.SerialMessage); ok {
return string(serial.GetFileID([]byte(sm))) == serial.RootValueFileID
}
@@ -291,7 +291,7 @@ func isRootValue(nbf *types.NomsBinFormat, val types.Value) bool {
}
func EmptyRootValue(ctx context.Context, vrw types.ValueReadWriter) (*RootValue, error) {
if vrw.Format() == types.Format_DOLT_DEV {
if vrw.Format().UsesFlatbuffers() {
builder := flatbuffers.NewBuilder(80)
var empty hash.Hash
serial.RefMapStart(builder)
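
The root-value side uses the same capability check, and detecting a flatbuffers root value reduces to a file-ID test on the serial message. A hypothetical one-function restatement of the flatbuffers branch of isRootValue above, using only identifiers that appear in these hunks:

// isFlatbuffersRootValue is illustrative: a flatbuffers root value is a
// SerialMessage whose serial file ID is RootValueFileID.
func isFlatbuffersRootValue(nbf *types.NomsBinFormat, val types.Value) bool {
    if !nbf.UsesFlatbuffers() {
        return false
    }
    sm, ok := val.(types.SerialMessage)
    return ok && string(serial.GetFileID([]byte(sm))) == serial.RootValueFileID
}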

View File

@@ -177,7 +177,7 @@ func newCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.V
opts.Meta = &CommitMeta{}
}
if vrw.Format() == types.Format_DOLT_DEV {
if vrw.Format().UsesFlatbuffers() {
r, err := vrw.WriteValue(ctx, v)
if err != nil {
return nil, err
@@ -241,7 +241,7 @@ func newCommitForValue(ctx context.Context, vrw types.ValueReadWriter, v types.V
}
func commitPtr(nbf *types.NomsBinFormat, v types.Value, r *types.Ref) (*Commit, error) {
if nbf == types.Format_DOLT_DEV {
if nbf.UsesFlatbuffers() {
bs := []byte(v.(types.SerialMessage))
height := serial.GetRootAsCommit(bs, 0).Height()
var addr hash.Hash

View File

@@ -283,7 +283,7 @@ func (db *database) Datasets(ctx context.Context) (DatasetsMap, error) {
return nil, err
}
if db.Format() == types.Format_DOLT_DEV {
if db.Format().UsesFlatbuffers() {
rm, err := db.loadDatasetsRefmap(ctx, rootHash)
if err != nil {
return nil, err
@@ -792,7 +792,7 @@ func (db *database) update(ctx context.Context,
var newRootHash hash.Hash
if db.Format() == types.Format_DOLT_DEV {
if db.Format().UsesFlatbuffers() {
datasets, err := db.loadDatasetsRefmap(ctx, root)
if err != nil {
return err

View File

@@ -278,7 +278,7 @@ func mustNomsMap(t *testing.T, dsm DatasetsMap) types.Map {
}
func (suite *DatabaseSuite) TestDatasetsMapType() {
if suite.db.Format() == types.Format_DOLT_DEV {
if suite.db.Format().UsesFlatbuffers() {
suite.T().Skip()
}

View File

@@ -52,7 +52,7 @@ type TagOptions struct {
// persists it, and returns its addr. Also returns a types.Ref to the tag, if
// the format for |db| is noms.
func newTag(ctx context.Context, db *database, commitAddr hash.Hash, meta *TagMeta) (hash.Hash, types.Ref, error) {
if db.Format() != types.Format_DOLT_DEV {
if !db.Format().UsesFlatbuffers() {
commitSt, err := db.ReadValue(ctx, commitAddr)
if err != nil {
return hash.Hash{}, types.Ref{}, err

View File

@@ -39,7 +39,7 @@ func TestNewTag(t *testing.T) {
db := NewDatabase(storage.NewViewWithDefaultFormat()).(*database)
defer db.Close()
if db.Format() == types.Format_DOLT_DEV {
if db.Format().UsesFlatbuffers() {
t.Skip()
}

View File

@@ -99,7 +99,7 @@ type WorkingSetSpec struct {
// ```
// where M is a struct type and R is a ref type.
func newWorkingSet(ctx context.Context, db *database, meta *WorkingSetMeta, workingRef, stagedRef types.Ref, mergeState *MergeState) (hash.Hash, types.Ref, error) {
if db.Format() == types.Format_DOLT_DEV {
if db.Format().UsesFlatbuffers() {
stagedAddr := stagedRef.TargetHash()
data := workingset_flatbuffer(workingRef.TargetHash(), &stagedAddr, mergeState, meta)
@@ -191,7 +191,7 @@ func workingset_flatbuffer(working hash.Hash, staged *hash.Hash, mergeState *Mer
}
func NewMergeState(ctx context.Context, vrw types.ValueReadWriter, preMergeWorking types.Ref, commit *Commit) (*MergeState, error) {
if vrw.Format() == types.Format_DOLT_DEV {
if vrw.Format().UsesFlatbuffers() {
ms := &MergeState{
preMergeWorkingAddr: new(hash.Hash),
fromCommitAddr: new(hash.Hash),

View File

@@ -120,3 +120,7 @@ func (nbf *NomsBinFormat) VersionString() string {
panic("unrecognized NomsBinFormat tag value")
}
}
func (nbf *NomsBinFormat) UsesFlatbuffers() bool {
return nbf.tag == formatTag_DOLT_1 || nbf.tag == formatTag_DOLT_DEV
}
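
UsesFlatbuffers is the pivot of the whole change: call sites stop comparing against the single __DOLT_DEV__ format value and instead ask whether the format stores its top-of-DAG values as flatbuffers, so __DOLT_1__ picks up every such path. A minimal sketch of the dispatch pattern the call sites converge on (chooseTopOfDagPath is illustrative, not a real function):

// chooseTopOfDagPath is illustrative only. With the predicate in place,
// supporting another flatbuffers-backed format means extending
// UsesFlatbuffers rather than editing every call site again.
func chooseTopOfDagPath(nbf *NomsBinFormat, flatbuffersPath, nomsPath func() error) error {
    if nbf.UsesFlatbuffers() {
        // formatTag_DOLT_1 and formatTag_DOLT_DEV both land here.
        return flatbuffersPath()
    }
    return nomsPath()
}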

View File

@@ -268,13 +268,16 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
mapbytes := msg.PrimaryIndexBytes()
dec := newValueDecoder(mapbytes, nil)
v, err := dec.readValue(nbf)
if err != nil {
return err
if nbf == Format_DOLT_DEV {
dec := newValueDecoder(mapbytes, nil)
v, err := dec.readValue(nbf)
if err != nil {
return err
}
return v.walkRefs(nbf, cb)
} else {
return TupleRowStorage(mapbytes).walkRefs(nbf, cb)
}
return v.walkRefs(nbf, cb)
case serial.CommitFileID:
parents, err := SerialCommitParentAddrs(nbf, sm)
if err != nil {