go/libraries/doltcore/doltdb: root_val.go: Iterate on flatbuffers RootValue implementation.

This commit is contained in:
Aaron Son
2022-04-01 14:34:14 -07:00
parent 67e059d70a
commit 736bb01eb8
12 changed files with 610 additions and 133 deletions

View File

@@ -20,6 +20,7 @@ const StoreRootFileID = "STRT"
const TagFileID = "DTAG"
const WorkingSetFileID = "WRST"
const CommitFileID = "DCMT"
const RootValueFileID = "RTVL"
func GetFileID(bs []byte) string {
if len(bs) < 8 {

View File

@@ -0,0 +1,190 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Code generated by the FlatBuffers compiler. DO NOT EDIT.
package serial
import (
flatbuffers "github.com/google/flatbuffers/go"
)
type RootValue struct {
_tab flatbuffers.Table
}
func GetRootAsRootValue(buf []byte, offset flatbuffers.UOffsetT) *RootValue {
n := flatbuffers.GetUOffsetT(buf[offset:])
x := &RootValue{}
x.Init(buf, n+offset)
return x
}
func GetSizePrefixedRootAsRootValue(buf []byte, offset flatbuffers.UOffsetT) *RootValue {
n := flatbuffers.GetUOffsetT(buf[offset+flatbuffers.SizeUint32:])
x := &RootValue{}
x.Init(buf, n+offset+flatbuffers.SizeUint32)
return x
}
func (rcv *RootValue) Init(buf []byte, i flatbuffers.UOffsetT) {
rcv._tab.Bytes = buf
rcv._tab.Pos = i
}
func (rcv *RootValue) Table() flatbuffers.Table {
return rcv._tab
}
func (rcv *RootValue) FeatureVersion() int64 {
o := flatbuffers.UOffsetT(rcv._tab.Offset(4))
if o != 0 {
return rcv._tab.GetInt64(o + rcv._tab.Pos)
}
return 0
}
func (rcv *RootValue) MutateFeatureVersion(n int64) bool {
return rcv._tab.MutateInt64Slot(4, n)
}
func (rcv *RootValue) TablesAddr(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *RootValue) TablesAddrLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *RootValue) TablesAddrBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *RootValue) MutateTablesAddr(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(6))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *RootValue) ForeignKeyAddr(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *RootValue) ForeignKeyAddrLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *RootValue) ForeignKeyAddrBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *RootValue) MutateForeignKeyAddr(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(8))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func (rcv *RootValue) SuperSchemasAddr(j int) byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.GetByte(a + flatbuffers.UOffsetT(j*1))
}
return 0
}
func (rcv *RootValue) SuperSchemasAddrLength() int {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.VectorLen(o)
}
return 0
}
func (rcv *RootValue) SuperSchemasAddrBytes() []byte {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
return rcv._tab.ByteVector(o + rcv._tab.Pos)
}
return nil
}
func (rcv *RootValue) MutateSuperSchemasAddr(j int, n byte) bool {
o := flatbuffers.UOffsetT(rcv._tab.Offset(10))
if o != 0 {
a := rcv._tab.Vector(o)
return rcv._tab.MutateByte(a+flatbuffers.UOffsetT(j*1), n)
}
return false
}
func RootValueStart(builder *flatbuffers.Builder) {
builder.StartObject(4)
}
func RootValueAddFeatureVersion(builder *flatbuffers.Builder, featureVersion int64) {
builder.PrependInt64Slot(0, featureVersion, 0)
}
func RootValueAddTablesAddr(builder *flatbuffers.Builder, tablesAddr flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(1, flatbuffers.UOffsetT(tablesAddr), 0)
}
func RootValueStartTablesAddrVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func RootValueAddForeignKeyAddr(builder *flatbuffers.Builder, foreignKeyAddr flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(2, flatbuffers.UOffsetT(foreignKeyAddr), 0)
}
func RootValueStartForeignKeyAddrVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func RootValueAddSuperSchemasAddr(builder *flatbuffers.Builder, superSchemasAddr flatbuffers.UOffsetT) {
builder.PrependUOffsetTSlot(3, flatbuffers.UOffsetT(superSchemasAddr), 0)
}
func RootValueStartSuperSchemasAddrVector(builder *flatbuffers.Builder, numElems int) flatbuffers.UOffsetT {
return builder.StartVector(1, numElems, 1)
}
func RootValueEnd(builder *flatbuffers.Builder) flatbuffers.UOffsetT {
return builder.EndObject()
}

View File

@@ -84,8 +84,7 @@ func (c *Commit) GetRootValue(ctx context.Context) (*RootValue, error) {
if rootV == nil {
return nil, errHasNoRootValue
}
// TODO: Get rid of this types.Struct assert.
return newRootValue(c.vrw, rootV.(types.Struct))
return newRootValue(c.vrw, rootV)
}
func (c *Commit) GetParent(ctx context.Context, idx int) (*Commit, error) {

View File

@@ -178,7 +178,7 @@ func (ddb *DoltDB) WriteEmptyRepoWithCommitTimeAndDefaultBranch(
return err
}
firstCommit, err := ddb.db.Commit(ctx, ds, rv.valueSt, commitOpts)
firstCommit, err := ddb.db.Commit(ctx, ds, rv.nomsValue(), commitOpts)
if err != nil {
return err
@@ -382,13 +382,11 @@ func (ddb *DoltDB) WriteRootValue(ctx context.Context, rv *RootValue) (hash.Hash
// This method is the primary place in doltcore that handles setting the FeatureVersion of root values to the current
// value, so all writes of RootValues should happen here or via WriteRootValue.
func (ddb *DoltDB) writeRootValue(ctx context.Context, rv *RootValue) (types.Ref, error) {
var err error
rv.valueSt, err = rv.valueSt.Set(featureVersKey, types.Int(DoltFeatureVersion))
rv, err := rv.setFeatureVersion(DoltFeatureVersion)
if err != nil {
return types.Ref{}, err
}
return ddb.vrw.WriteValue(ctx, rv.valueSt)
return ddb.vrw.WriteValue(ctx, rv.nomsValue())
}
// ReadRootValue reads the RootValue associated with the hash given and returns it. Returns an error if the value cannot
@@ -495,7 +493,7 @@ func (ddb *DoltDB) CommitWithParentCommits(ctx context.Context, valHash hash.Has
return nil, err
}
if st, ok := val.(types.Struct); !ok || st.Name() != ddbRootStructName {
if !isRootValue(ddb.vrw.Format(), val) {
return nil, errors.New("can't commit a value that is not a valid root value")
}
@@ -559,7 +557,7 @@ func (ddb *DoltDB) CommitDanglingWithParentCommits(ctx context.Context, valHash
if err != nil {
return nil, err
}
if st, ok := val.(types.Struct); !ok || st.Name() != ddbRootStructName {
if !isRootValue(ddb.vrw.Format(), val) {
return nil, errors.New("can't commit a value that is not a valid root value")
}
@@ -988,7 +986,7 @@ func (ddb *DoltDB) CommitWithWorkingSet(
return nil, err
}
commitDataset, _, err := ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.valueSt, datas.WorkingSetSpec{
commitDataset, _, err := ddb.db.CommitWithWorkingSet(ctx, headDs, wsDs, commit.Roots.Staged.nomsValue(), datas.WorkingSetSpec{
Meta: meta,
WorkingRoot: workingRootRef,
StagedRoot: stagedRef,

View File

@@ -21,8 +21,11 @@ import (
"fmt"
"strings"
"github.com/google/flatbuffers/go"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/gen/fb/serial"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/encoding"
"github.com/dolthub/dolt/go/libraries/utils/set"
@@ -45,20 +48,142 @@ type FeatureVersion int64
// only variable for testing.
var DoltFeatureVersion FeatureVersion = 2 // last bumped when changing TEXT types to use noms Blobs
// RootValue defines the structure used inside all Dolthub noms dbs
// RootValue is the value of the Database and is the committed value in every Dolt commit.
type RootValue struct {
vrw types.ValueReadWriter
valueSt types.Struct
st rvStorage
fkc *ForeignKeyCollection // cache the first load
}
func newRootValue(vrw types.ValueReadWriter, st types.Struct) (*RootValue, error) {
v, ok, err := st.MaybeGet(featureVersKey)
type rvStorage interface {
GetFeatureVersion() (FeatureVersion, bool, error)
GetTablesMap(ctx context.Context, vr types.ValueReader) (types.Map, bool, error)
GetSuperSchemaMap(ctx context.Context, vr types.ValueReader) (types.Map, bool, error)
GetForeignKeyMap(ctx context.Context, vr types.ValueReader) (types.Map, bool, error)
SetSuperSchemaMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error)
SetTablesMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error)
SetForeignKeyMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error)
SetFeatureVersion(v FeatureVersion) (rvStorage, error)
DebugString(ctx context.Context) string
nomsValue() types.Value
}
type nomsRvStorage struct {
valueSt types.Struct
}
func (r nomsRvStorage) GetFeatureVersion() (FeatureVersion, bool, error) {
v, ok, err := r.valueSt.MaybeGet(featureVersKey)
if err != nil {
return 0, false, err
}
if ok {
return FeatureVersion(v.(types.Int)), true, nil
} else {
return 0, false, nil
}
}
func (r nomsRvStorage) GetTablesMap(context.Context, types.ValueReader) (types.Map, bool, error) {
v, found, err := r.valueSt.MaybeGet(tablesKey)
if err != nil {
return types.Map{}, false, err
}
if !found {
return types.Map{}, false, nil
}
return v.(types.Map), true, nil
}
func (r nomsRvStorage) GetSuperSchemaMap(context.Context, types.ValueReader) (types.Map, bool, error) {
v, found, err := r.valueSt.MaybeGet(superSchemasKey)
if err != nil {
return types.Map{}, false, err
}
if !found {
return types.Map{}, false, nil
}
return v.(types.Map), true, nil
}
func (r nomsRvStorage) GetForeignKeyMap(context.Context, types.ValueReader) (types.Map, bool, error) {
v, found, err := r.valueSt.MaybeGet(foreignKeyKey)
if err != nil {
return types.Map{}, false, err
}
if !found {
return types.Map{}, false, err
}
return v.(types.Map), true, nil
}
func (r nomsRvStorage) SetSuperSchemaMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error) {
st, err := r.valueSt.Set(superSchemasKey, m)
if err != nil {
return nomsRvStorage{}, err
}
return nomsRvStorage{st}, nil
}
func (r nomsRvStorage) SetTablesMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error) {
st, err := r.valueSt.Set(tablesKey, m)
if err != nil {
return nomsRvStorage{}, err
}
return nomsRvStorage{st}, nil
}
func (r nomsRvStorage) SetForeignKeyMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error) {
st, err := r.valueSt.Set(foreignKeyKey, m)
if err != nil {
return nomsRvStorage{}, err
}
return nomsRvStorage{st}, nil
}
func (r nomsRvStorage) SetFeatureVersion(v FeatureVersion) (rvStorage, error) {
st, err := r.valueSt.Set(featureVersKey, types.Int(v))
if err != nil {
return nomsRvStorage{}, err
}
return nomsRvStorage{st}, nil
}
func (r nomsRvStorage) DebugString(ctx context.Context) string {
var buf bytes.Buffer
err := types.WriteEncodedValue(ctx, &buf, r.valueSt)
if err != nil {
panic(err)
}
return buf.String()
}
func (r nomsRvStorage) nomsValue() types.Value {
return r.valueSt
}
func newRootValue(vrw types.ValueReadWriter, v types.Value) (*RootValue, error) {
var storage rvStorage
if vrw.Format() == types.Format_DOLT_DEV {
srv := serial.GetRootAsRootValue([]byte(v.(types.SerialMessage)), 0)
storage = fbRvStorage{srv}
} else {
st, ok := v.(types.Struct)
if !ok {
return nil, errors.New("invalid value passed to newRootValue")
}
storage = nomsRvStorage{st}
}
ver, ok, err := storage.GetFeatureVersion()
if err != nil {
return nil, err
}
if ok {
ver := FeatureVersion(v.(types.Int))
if DoltFeatureVersion < ver {
return nil, ErrClientOutOfDate{
ClientVer: DoltFeatureVersion,
@@ -67,10 +192,39 @@ func newRootValue(vrw types.ValueReadWriter, st types.Struct) (*RootValue, error
}
}
return &RootValue{vrw, st, nil}, nil
return &RootValue{vrw, storage, nil}, nil
}
func isRootValue(nbf *types.NomsBinFormat, val types.Value) bool {
if nbf == types.Format_DOLT_DEV {
if sm, ok := val.(types.SerialMessage); ok {
return string(serial.GetFileID([]byte(sm))) == serial.RootValueFileID
}
} else {
if st, ok := val.(types.Struct); ok {
return st.Name() == ddbRootStructName
}
}
return false
}
func EmptyRootValue(ctx context.Context, vrw types.ValueReadWriter) (*RootValue, error) {
if vrw.Format() == types.Format_DOLT_DEV {
builder := flatbuffers.NewBuilder(80)
var empty hash.Hash
tablesoff := builder.CreateByteVector(empty[:])
fkoff := builder.CreateByteVector(empty[:])
ssoff := builder.CreateByteVector(empty[:])
serial.RootValueStart(builder)
serial.RootValueAddFeatureVersion(builder, int64(DoltFeatureVersion))
serial.RootValueAddTablesAddr(builder, tablesoff)
serial.RootValueAddForeignKeyAddr(builder, fkoff)
serial.RootValueAddSuperSchemasAddr(builder, ssoff)
builder.FinishWithFileIdentifier(serial.RootValueEnd(builder), []byte(serial.RootValueFileID))
bs := builder.FinishedBytes()
return newRootValue(vrw, types.SerialMessage(bs))
}
empty, err := types.NewMap(ctx, vrw)
if err != nil {
return nil, err
@@ -97,39 +251,28 @@ func (root *RootValue) VRW() types.ValueReadWriter {
// GetFeatureVersion returns the feature version of this root, if one is written
func (root *RootValue) GetFeatureVersion(ctx context.Context) (ver FeatureVersion, ok bool, err error) {
v, ok, err := root.valueSt.MaybeGet(featureVersKey)
if err != nil || !ok {
return ver, ok, err
return root.st.GetFeatureVersion()
}
func (root *RootValue) setFeatureVersion(v FeatureVersion) (*RootValue, error) {
newStorage, err := root.st.SetFeatureVersion(v)
if err != nil {
return nil, err
}
ver = FeatureVersion(v.(types.Int))
return ver, ok, err
return root.withStorage(newStorage), nil
}
func (root *RootValue) HasTable(ctx context.Context, tName string) (bool, error) {
val, found, err := root.valueSt.MaybeGet(tablesKey)
tableMap, found, err := root.st.GetTablesMap(ctx, root.vrw)
if err != nil {
return false, err
}
if !found {
return false, nil
}
tableMap := val.(types.Map)
return tableMap.Has(ctx, types.String(tName))
}
// TableNameInUse checks if a name can be used to create a new table. The most recent
// names of all current tables and all previously existing tables cannot be used.
func (root *RootValue) TableNameInUse(ctx context.Context, tName string) (bool, error) {
_, ok, err := root.GetSuperSchema(ctx, tName)
if err != nil {
return false, err
}
return ok, nil
}
// GetSuperSchema returns the SuperSchema for the table name specified if that table exists.
func (root *RootValue) GetSuperSchema(ctx context.Context, tName string) (*schema.SuperSchema, bool, error) {
// SuperSchema is only persisted on Commit()
@@ -311,19 +454,15 @@ func (root *RootValue) getSuperSchemaAtLastCommit(ctx context.Context, tName str
}
func (root *RootValue) getOrCreateSuperSchemaMap(ctx context.Context) (types.Map, error) {
v, found, err := root.valueSt.MaybeGet(superSchemasKey)
m, found, err := root.st.GetSuperSchemaMap(ctx, root.vrw)
if err != nil {
return types.EmptyMap, err
return types.Map{}, err
}
if found {
return m, nil
}
var ssm types.Map
if found {
ssm = v.(types.Map)
} else {
ssm, err = types.NewMap(ctx, root.vrw)
}
return ssm, err
return types.NewMap(ctx, root.vrw)
}
func (root *RootValue) GetAllSchemas(ctx context.Context) (map[string]schema.Schema, error) {
@@ -341,7 +480,7 @@ func (root *RootValue) GetAllSchemas(ctx context.Context) (map[string]schema.Sch
}
func (root *RootValue) GetTableHash(ctx context.Context, tName string) (hash.Hash, bool, error) {
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return hash.Hash{}, false, err
}
@@ -378,7 +517,7 @@ func (root *RootValue) SetTableHash(ctx context.Context, tName string, h hash.Ha
// ResolveTableName resolves a case-insensitive name to the exact name as stored in Dolt. Returns false if no matching
// name was found.
func (root *RootValue) ResolveTableName(ctx context.Context, tName string) (string, bool, error) {
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return "", false, err
}
@@ -408,7 +547,7 @@ func (root *RootValue) ResolveTableName(ctx context.Context, tName string) (stri
// GetTable will retrieve a table by its case-sensitive name.
func (root *RootValue) GetTable(ctx context.Context, tName string) (*Table, bool, error) {
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return nil, false, err
@@ -479,7 +618,7 @@ func (root *RootValue) GetTableByColTag(ctx context.Context, tag uint64) (tbl *T
// GetTableNames retrieves the lists of all tables for a RootValue
func (root *RootValue) GetTableNames(ctx context.Context) ([]string, error) {
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return nil, err
@@ -500,19 +639,15 @@ func (root *RootValue) GetTableNames(ctx context.Context) ([]string, error) {
return names, nil
}
func (root *RootValue) getTableMap() (types.Map, error) {
val, found, err := root.valueSt.MaybeGet(tablesKey)
func (root *RootValue) getTableMap(ctx context.Context) (types.Map, error) {
m, found, err := root.st.GetTablesMap(ctx, root.vrw)
if err != nil {
return types.EmptyMap, err
return types.Map{}, err
}
if !found || val == nil {
return types.EmptyMap, err
if !found {
return types.NewMap(ctx, root.vrw)
}
tableMap := val.(types.Map)
return tableMap, err
return m, nil
}
func (root *RootValue) TablesInConflict(ctx context.Context) ([]string, error) {
@@ -588,7 +723,7 @@ func (root *RootValue) HasConstraintViolations(ctx context.Context) (bool, error
// IterTables calls the callback function cb on each table in this RootValue.
func (root *RootValue) IterTables(ctx context.Context, cb func(name string, table *Table, sch schema.Schema) (stop bool, err error)) error {
tm, err := root.getTableMap()
tm, err := root.getTableMap(ctx)
if err != nil {
return err
@@ -648,20 +783,19 @@ func (root *RootValue) iterSuperSchemas(ctx context.Context, cb func(name string
// PutSuperSchema writes a new map entry for the table name and super schema supplied, it will overwrite an existing entry.
func (root *RootValue) PutSuperSchema(ctx context.Context, tName string, ss *schema.SuperSchema) (*RootValue, error) {
newRoot := root
ssm, err := newRoot.getOrCreateSuperSchemaMap(ctx)
ssm, err := root.getOrCreateSuperSchemaMap(ctx)
if err != nil {
return nil, err
}
ssVal, err := encoding.MarshalSuperSchemaAsNomsValue(ctx, newRoot.VRW(), ss)
ssVal, err := encoding.MarshalSuperSchemaAsNomsValue(ctx, root.VRW(), ss)
if err != nil {
return nil, err
}
ssRef, err := WriteValAndGetRef(ctx, newRoot.VRW(), ssVal)
ssRef, err := WriteValAndGetRef(ctx, root.VRW(), ssVal)
if err != nil {
return nil, err
@@ -673,14 +807,20 @@ func (root *RootValue) PutSuperSchema(ctx context.Context, tName string, ss *sch
return nil, err
}
newRootSt := newRoot.valueSt
newRootSt, err = newRootSt.Set(superSchemasKey, m)
newStorage, err := root.st.SetSuperSchemaMap(ctx, root.vrw, m)
if err != nil {
return nil, err
}
return newRootValue(root.vrw, newRootSt)
return root.withStorage(newStorage), nil
}
func (root *RootValue) withStorage(st rvStorage) *RootValue {
return &RootValue{root.vrw, st, nil}
}
func (root *RootValue) nomsValue() types.Value {
return root.st.nomsValue()
}
// PutTable inserts a table by name into the map of tables. If a table already exists with that name it will be replaced
@@ -703,7 +843,7 @@ func putTable(ctx context.Context, root *RootValue, tName string, tableRef types
panic("Don't attempt to put a table with a name that fails the IsValidTableName check")
}
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return nil, err
@@ -718,14 +858,12 @@ func putTable(ctx context.Context, root *RootValue, tName string, tableRef types
return nil, err
}
rootValSt := root.valueSt
rootValSt, err = rootValSt.Set(tablesKey, m)
newStorage, err := root.st.SetTablesMap(ctx, root.vrw, m)
if err != nil {
return nil, err
}
return newRootValue(root.vrw, rootValSt)
return root.withStorage(newStorage), nil
}
// CreateEmptyTable creates an empty table in this root with the name and schema given, returning the new root value.
@@ -760,13 +898,12 @@ func (root *RootValue) CreateEmptyTable(ctx context.Context, tName string, sch s
// HashOf gets the hash of the root value
func (root *RootValue) HashOf() (hash.Hash, error) {
return root.valueSt.Hash(root.vrw.Format())
return root.st.nomsValue().Hash(root.vrw.Format())
}
// UpdateSuperSchemasFromOther updates SuperSchemas of tblNames using SuperSchemas from other.
func (root *RootValue) UpdateSuperSchemasFromOther(ctx context.Context, tblNames []string, other *RootValue) (*RootValue, error) {
newRoot := root
ssm, err := newRoot.getOrCreateSuperSchemaMap(ctx)
ssm, err := root.getOrCreateSuperSchemaMap(ctx)
if err != nil {
return nil, err
@@ -805,13 +942,13 @@ func (root *RootValue) UpdateSuperSchemasFromOther(ctx context.Context, tblNames
return nil, err
}
ssVal, err := encoding.MarshalSuperSchemaAsNomsValue(ctx, newRoot.VRW(), newSS)
ssVal, err := encoding.MarshalSuperSchemaAsNomsValue(ctx, root.VRW(), newSS)
if err != nil {
return nil, err
}
ssRef, err := WriteValAndGetRef(ctx, newRoot.VRW(), ssVal)
ssRef, err := WriteValAndGetRef(ctx, root.VRW(), ssVal)
if err != nil {
return nil, err
@@ -826,20 +963,18 @@ func (root *RootValue) UpdateSuperSchemasFromOther(ctx context.Context, tblNames
return nil, err
}
newRootSt := newRoot.valueSt
newRootSt, err = newRootSt.Set(superSchemasKey, m)
newStorage, err := root.st.SetSuperSchemaMap(ctx, root.vrw, m)
if err != nil {
return nil, err
}
return newRootValue(root.vrw, newRootSt)
return root.withStorage(newStorage), nil
}
// RenameTable renames a table by changing its string key in the RootValue's table map. In order to preserve
// column tag information, use this method instead of a table drop + add.
func (root *RootValue) RenameTable(ctx context.Context, oldName, newName string) (*RootValue, error) {
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return nil, err
}
@@ -866,8 +1001,8 @@ func (root *RootValue) RenameTable(ctx context.Context, oldName, newName string)
if err != nil {
return nil, err
}
rootValSt := root.valueSt
rootValSt, err = rootValSt.Set(tablesKey, tableMap)
newStorage, err := root.st.SetTablesMap(ctx, root.vrw, tableMap)
if err != nil {
return nil, err
}
@@ -881,7 +1016,7 @@ func (root *RootValue) RenameTable(ctx context.Context, oldName, newName string)
if err != nil {
return nil, err
}
rootValSt, err = rootValSt.Set(foreignKeyKey, fkMap)
newStorage, err = newStorage.SetForeignKeyMap(ctx, root.vrw, fkMap)
if err != nil {
return nil, err
}
@@ -903,18 +1038,17 @@ func (root *RootValue) RenameTable(ctx context.Context, oldName, newName string)
return nil, err
}
rootValSt, err = rootValSt.Set(superSchemasKey, ssMap)
newStorage, err = newStorage.SetSuperSchemaMap(ctx, root.vrw, ssMap)
if err != nil {
return nil, err
}
return newRootValue(root.vrw, rootValSt)
}
return newRootValue(root.vrw, rootValSt)
return root.withStorage(newStorage), nil
}
func (root *RootValue) RemoveTables(ctx context.Context, allowDroppingFKReferenced bool, tables ...string) (*RootValue, error) {
tableMap, err := root.getTableMap()
tableMap, err := root.getTableMap(ctx)
if err != nil {
return nil, err
@@ -939,19 +1073,14 @@ func (root *RootValue) RemoveTables(ctx context.Context, allowDroppingFKReferenc
return nil, err
}
rootValSt, err := root.valueSt.Set(tablesKey, m)
newStorage, err := root.st.SetTablesMap(ctx, root.vrw, m)
if err != nil {
return nil, err
}
newRoot, err := newRootValue(root.vrw, rootValSt)
if err != nil {
return nil, err
}
newRoot := root.withStorage(newStorage)
fkc, err := newRoot.GetForeignKeyCollection(ctx)
if err != nil {
return nil, err
}
@@ -989,20 +1118,18 @@ func (root *RootValue) GetForeignKeyCollection(ctx context.Context) (*ForeignKey
// is to retrieve a ForeignKeyCollection in particular, it is advised to call GetForeignKeyCollection as it caches the
// result for performance.
func (root *RootValue) GetForeignKeyCollectionMap(ctx context.Context) (types.Map, error) {
v, found, err := root.valueSt.MaybeGet(foreignKeyKey)
fkMap, found, err := root.st.GetForeignKeyMap(ctx, root.vrw)
if err != nil {
return types.EmptyMap, err
return types.Map{}, err
}
var fkMap types.Map
if found {
fkMap = v.(types.Map)
} else {
if !found {
fkMap, err = types.NewMap(ctx, root.vrw)
if err != nil {
return types.EmptyMap, err
return types.Map{}, err
}
}
return fkMap, nil
}
@@ -1012,11 +1139,11 @@ func (root *RootValue) PutForeignKeyCollection(ctx context.Context, fkc *Foreign
if err != nil {
return nil, err
}
rootValSt, err := root.valueSt.Set(foreignKeyKey, fkMap)
newStorage, err := root.st.SetForeignKeyMap(ctx, root.vrw, fkMap)
if err != nil {
return nil, err
}
return &RootValue{root.vrw, rootValSt, fkc.copy()}, nil
return root.withStorage(newStorage), nil
}
// ValidateForeignKeysOnSchemas ensures that all foreign keys' tables are present, removing any foreign keys where the declared
@@ -1073,18 +1200,6 @@ func (root *RootValue) ValidateForeignKeysOnSchemas(ctx context.Context) (*RootV
return root.PutForeignKeyCollection(ctx, fkCollection)
}
// RootNeedsUniqueTagsMigration determines if this root needs to be migrated to uniquify its tags.
func RootNeedsUniqueTagsMigration(root *RootValue) (bool, error) {
// SuperSchemas were added in the same update that required unique tags. If a root does not have a
// SuperSchema map then it was created before the unique tags constraint was enforced.
_, found, err := root.valueSt.MaybeGet(superSchemasKey)
if err != nil {
return false, err
}
needToMigrate := !found
return needToMigrate, nil
}
// GetRootValueSuperSchema creates a SuperSchema with every column in history of root.
func GetRootValueSuperSchema(ctx context.Context, root *RootValue) (*schema.SuperSchema, error) {
ssMap, err := root.getOrCreateSuperSchemaMap(ctx)
@@ -1122,7 +1237,7 @@ func GetRootValueSuperSchema(ctx context.Context, root *RootValue) (*schema.Supe
}
// super schemas are only persisted on commit, so add in working schemas
tblMap, err := root.getTableMap()
tblMap, err := root.getTableMap(ctx)
if err != nil {
return nil, err
}
@@ -1224,10 +1339,7 @@ func validateTagUniqueness(ctx context.Context, root *RootValue, tableName strin
// when debugging tests.
func (root *RootValue) DebugString(ctx context.Context, transitive bool) string {
var buf bytes.Buffer
err := types.WriteEncodedValue(ctx, &buf, root.valueSt)
if err != nil {
panic(err)
}
buf.WriteString(root.st.DebugString(ctx))
if transitive {
buf.WriteString("\nTables:")
@@ -1272,3 +1384,116 @@ func (root *RootValue) MapTableHashes(ctx context.Context) (map[string]hash.Hash
}
return nameToHash, nil
}
type fbRvStorage struct {
srv *serial.RootValue
}
func (r fbRvStorage) GetFeatureVersion() (FeatureVersion, bool, error) {
return FeatureVersion(r.srv.FeatureVersion()), true, nil
}
func (r fbRvStorage) GetTablesMap(ctx context.Context, vr types.ValueReader) (types.Map, bool, error) {
addr := hash.New(r.srv.TablesAddrBytes())
if addr.IsEmpty() {
return types.Map{}, false, nil
}
v, err := vr.ReadValue(ctx, addr)
if err != nil {
return types.Map{}, false, err
}
return v.(types.Map), true, nil
}
func (r fbRvStorage) GetSuperSchemaMap(ctx context.Context, vr types.ValueReader) (types.Map, bool, error) {
addr := hash.New(r.srv.SuperSchemasAddrBytes())
if addr.IsEmpty() {
return types.Map{}, false, nil
}
v, err := vr.ReadValue(ctx, addr)
if err != nil {
return types.Map{}, false, err
}
return v.(types.Map), true, nil
}
func (r fbRvStorage) GetForeignKeyMap(ctx context.Context, vr types.ValueReader) (types.Map, bool, error) {
addr := hash.New(r.srv.ForeignKeyAddrBytes())
if addr.IsEmpty() {
return types.Map{}, false, nil
}
v, err := vr.ReadValue(ctx, addr)
if err != nil {
return types.Map{}, false, err
}
return v.(types.Map), true, nil
}
func (r fbRvStorage) SetSuperSchemaMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error) {
var h hash.Hash
if !m.Empty() {
ref, err := vrw.WriteValue(ctx, m)
if err != nil {
return nil, err
}
h = ref.TargetHash()
}
ret := r.clone()
copy(ret.srv.SuperSchemasAddrBytes(), h[:])
return ret, nil
}
func (r fbRvStorage) SetTablesMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error) {
var h hash.Hash
if !m.Empty() {
ref, err := vrw.WriteValue(ctx, m)
if err != nil {
return nil, err
}
h = ref.TargetHash()
}
ret := r.clone()
copy(ret.srv.TablesAddrBytes(), h[:])
return ret, nil
}
func (r fbRvStorage) SetForeignKeyMap(ctx context.Context, vrw types.ValueReadWriter, m types.Map) (rvStorage, error) {
var h hash.Hash
if !m.Empty() {
ref, err := vrw.WriteValue(ctx, m)
if err != nil {
return nil, err
}
h = ref.TargetHash()
}
ret := r.clone()
copy(ret.srv.ForeignKeyAddrBytes(), h[:])
return ret, nil
}
func (r fbRvStorage) SetFeatureVersion(v FeatureVersion) (rvStorage, error) {
ret := r.clone()
ret.srv.MutateFeatureVersion(int64(v))
return ret, nil
}
func (r fbRvStorage) clone() fbRvStorage {
bs := make([]byte, len(r.srv.Table().Bytes))
copy(bs, r.srv.Table().Bytes)
var ret serial.RootValue
ret.Init(bs, r.srv.Table().Pos)
return fbRvStorage{&ret}
}
func (r fbRvStorage) DebugString(ctx context.Context) string {
return fmt.Sprintf("fbRvStorage[%d, %s, %s, %s]",
r.srv.FeatureVersion(),
hash.New(r.srv.TablesAddrBytes()).String(),
hash.New(r.srv.ForeignKeyAddrBytes()).String(),
hash.New(r.srv.SuperSchemasAddrBytes()).String())
return "fbRvStorage ["
}
func (r fbRvStorage) nomsValue() types.Value {
return types.SerialMessage(r.srv.Table().Bytes)
}

View File

@@ -79,7 +79,7 @@ func newMergeState(ctx context.Context, vrw types.ValueReadWriter, mergeState ty
return nil, err
}
workingRoot, err := newRootValue(vrw, workingRootValSt.(types.Struct))
workingRoot, err := newRootValue(vrw, workingRootValSt)
if err != nil {
return nil, err
}
@@ -187,23 +187,23 @@ func NewWorkingSet(ctx context.Context, name string, vrw types.ValueReadWriter,
}
}
workingRootValSt, err := vrw.ReadValue(ctx, dsws.WorkingAddr)
workingRootVal, err := vrw.ReadValue(ctx, dsws.WorkingAddr)
if err != nil {
return nil, err
}
workingRoot, err := newRootValue(vrw, workingRootValSt.(types.Struct))
workingRoot, err := newRootValue(vrw, workingRootVal)
if err != nil {
return nil, err
}
var stagedRoot *RootValue
if dsws.StagedAddr != nil {
stagedRootValSt, err := vrw.ReadValue(ctx, *dsws.StagedAddr)
stagedRootVal, err := vrw.ReadValue(ctx, *dsws.StagedAddr)
if err != nil {
return nil, err
}
stagedRoot, err = newRootValue(vrw, stagedRootValSt.(types.Struct))
stagedRoot, err = newRootValue(vrw, stagedRootVal)
if err != nil {
return nil, err
}

View File

@@ -145,7 +145,7 @@ func mergeIntoWorkingSet(ctx *sql.Context, sess *dsess.DoltSession, roots doltdb
return ws, noConflicts, doltdb.ErrMergeActive
}
err := checkForUncommittedChanges(roots.Working, roots.Head)
err := checkForUncommittedChanges(ctx, roots.Working, roots.Head)
if err != nil {
return ws, noConflicts, err
}
@@ -409,7 +409,7 @@ func mergeRootToWorking(
return ws, nil
}
func checkForUncommittedChanges(root *doltdb.RootValue, headRoot *doltdb.RootValue) error {
func checkForUncommittedChanges(ctx *sql.Context, root *doltdb.RootValue, headRoot *doltdb.RootValue) error {
rh, err := root.HashOf()
if err != nil {
@@ -423,6 +423,7 @@ func checkForUncommittedChanges(root *doltdb.RootValue, headRoot *doltdb.RootVal
}
if rh != hrh {
fmt.Printf("root: %s\nheadRoot: %s\n", root.DebugString(ctx, true), headRoot.DebugString(ctx, true))
return ErrUncommittedChanges.New()
}
return nil

View File

@@ -88,7 +88,7 @@ func LoadedLocalLocation() *time.Location {
func BasicSelectTests() []SelectTest {
headCommitHash := "so275enkvulb96mkckbun1kjo9seg7c9"
if types.Format_Default == types.Format_DOLT_DEV {
headCommitHash = "pi04gco3l5jb373lhohv1ggoif4o7d8p"
headCommitHash = "k5r059st4fjs8lmksast0npbi89kek27"
}
return []SelectTest{
{

View File

@@ -20,6 +20,7 @@ const StoreRootFileID = "STRT"
const TagFileID = "DTAG"
const WorkingSetFileID = "WRST"
const CommitFileID = "DCMT"
const RootValueFileID = "RTVL"
func GetFileID(bs []byte) string {
if len(bs) < 8 {

View File

@@ -10,10 +10,11 @@ flatc -o $GEN_DIR --gen-onefile --filename-suffix "" --gen-mutable --go-namespac
commit.fbs \
database.fbs \
prolly.fbs \
rootvalue.fbs \
schema.fbs \
storeroot.fbs \
tag.fbs \
table.fbs \
tag.fbs \
workingset.fbs
# prefix files with copyright header

29
go/serial/rootvalue.fbs Normal file
View File

@@ -0,0 +1,29 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
namespace serial;
table RootValue {
feature_version:int64;
tables_addr:[ubyte];
foreign_key_addr:[ubyte];
super_schemas_addr:[ubyte];
}
// KEEP THIS IN SYNC WITH fileidentifiers.go
file_identifier "RTVL";
root_type RootValue;

View File

@@ -139,6 +139,38 @@ func (sm SerialMessage) walkRefs(nbf *NomsBinFormat, cb RefCallback) error {
return err
}
}
case serial.RootValueFileID:
msg := serial.GetRootAsRootValue([]byte(sm), 0)
addr := hash.New(msg.TablesAddrBytes())
if !addr.IsEmpty() {
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
if err != nil {
return err
}
if err = cb(r); err != nil {
return err
}
}
addr = hash.New(msg.ForeignKeyAddrBytes())
if !addr.IsEmpty() {
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
if err != nil {
return err
}
if err = cb(r); err != nil {
return err
}
}
addr = hash.New(msg.SuperSchemasAddrBytes())
if !addr.IsEmpty() {
r, err := constructRef(nbf, addr, PrimitiveTypeMap[ValueKind], SerialMessageRefHeight)
if err != nil {
return err
}
if err = cb(r); err != nil {
return err
}
}
case serial.CommitFileID:
parents, err := SerialCommitParentRefs(nbf, sm)
if err != nil {