first pass at migrating types.Map to prolly.Map

This commit is contained in:
Andy Arthur
2022-07-27 16:35:50 -07:00
parent c24ae5d370
commit c325d4f497
5 changed files with 376 additions and 45 deletions
+1 -1
View File
@@ -39,7 +39,7 @@ const (
)
var (
targetFormat = types.Format_DOLT_DEV
targetFormat = types.Format_DOLT_1
migrationMsg = fmt.Sprintf("migrating database to Noms Binary Format %s", targetFormat.VersionString())
)
+163 -43
View File
@@ -16,8 +16,11 @@ package migrate
import (
"context"
"fmt"
"io"
"github.com/dolthub/vitess/go/vt/proto/query"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
@@ -25,7 +28,10 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/prolly"
"github.com/dolthub/dolt/go/store/prolly/tree"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
func migrateWorkingSet(ctx context.Context, wsRef ref.WorkingSetRef, old, new *doltdb.DoltDB, prog Progress) error {
@@ -195,6 +201,13 @@ func migrateRoot(ctx context.Context, root *doltdb.RootValue, new *doltdb.DoltDB
return true, err
}
ok, err := tbl.HasConflicts(ctx)
if err != nil {
return true, err
} else if ok {
return true, fmt.Errorf("cannot migrate table with conflicts (%s)", name)
}
migrated, err = migrated.PutTable(ctx, name, mtbl)
if err != nil {
return true, err
@@ -213,22 +226,6 @@ func migrateRoot(ctx context.Context, root *doltdb.RootValue, new *doltdb.DoltDB
}
func migrateTable(ctx context.Context, name string, table *doltdb.Table, new *doltdb.DoltDB) (*doltdb.Table, error) {
rows, err := table.GetRowData(ctx)
if err != nil {
return nil, err
}
err = migrateNomsMap(ctx, rows, table.ValueReadWriter(), new.ValueReadWriter())
if err != nil {
return nil, err
}
ai, err := table.GetAutoIncrementValue(ctx)
if err != nil {
return nil, err
}
autoInc := types.Uint(ai)
sch, err := table.GetSchema(ctx)
if err != nil {
return nil, err
@@ -245,6 +242,16 @@ func migrateTable(ctx context.Context, name string, table *doltdb.Table, new *do
return nil, err
}
rows, err := table.GetRowData(ctx)
if err != nil {
return nil, err
}
newRows, err := migrateIndex(ctx, sch, rows, table.ValueReadWriter(), new.NodeStore())
if err != nil {
return nil, err
}
oldSet, err := table.GetIndexSet(ctx)
if err != nil {
return nil, err
@@ -255,7 +262,13 @@ func migrateTable(ctx context.Context, name string, table *doltdb.Table, new *do
return nil, err
}
return doltdb.NewTable(ctx, new.ValueReadWriter(), new.NodeStore(), sch, rows, newSet, autoInc)
ai, err := table.GetAutoIncrementValue(ctx)
if err != nil {
return nil, err
}
autoInc := types.Uint(ai)
return doltdb.NewTable(ctx, new.ValueReadWriter(), new.NodeStore(), sch, newRows, newSet, autoInc)
}
// patchMigrateSchema attempts to correct irregularities in existing schemas
@@ -285,11 +298,13 @@ func migrateIndexSet(ctx context.Context, sch schema.Schema, oldSet durable.Inde
if err != nil {
return nil, err
}
if err = migrateNomsMap(ctx, idx, old, new.ValueReadWriter()); err != nil {
newIdx, err := migrateIndex(ctx, def.Schema(), idx, old, new.NodeStore())
if err != nil {
return nil, err
}
newSet, err = newSet.PutIndex(ctx, def.Name(), idx)
newSet, err = newSet.PutIndex(ctx, def.Name(), newIdx)
if err != nil {
return nil, err
}
@@ -297,36 +312,141 @@ func migrateIndexSet(ctx context.Context, sch schema.Schema, oldSet durable.Inde
return newSet, nil
}
func migrateNomsMap(ctx context.Context, idx durable.Index, old, new types.ValueReadWriter) error {
m := durable.NomsMapFromIndex(idx)
return copyTreeFromValue(ctx, m, old, new)
func migrateIndex(ctx context.Context, sch schema.Schema, idx durable.Index, old types.ValueReadWriter, ns tree.NodeStore) (durable.Index, error) {
eg, ctx := errgroup.WithContext(ctx)
reader := make(chan types.Tuple, 256)
writer := make(chan val.Tuple, 256)
kt, vt := tupleTranslatorsFromSchema(sch, old)
kd := kt.builder.Desc
vd := vt.builder.Desc
var oldMap = durable.NomsMapFromIndex(idx)
var newMap prolly.Map
// read old noms map
eg.Go(func() error {
defer close(reader)
return readNomsMap(ctx, oldMap, reader)
})
// translate noms tuples to prolly tuples
eg.Go(func() error {
defer close(writer)
return translateTuples(ctx, kt, vt, reader, writer)
})
// write tuples in new prolly map
eg.Go(func() (err error) {
newMap, err = writeProllyMap(ctx, ns, kd, vd, writer)
return
})
if err := eg.Wait(); err != nil {
return nil, err
}
return durable.IndexFromProllyMap(newMap), nil
}
// copyTreeFromValue recursively copies |v| and all its children from |old| to |new|.
func copyTreeFromValue(ctx context.Context, v types.Value, old, new types.ValueReadWriter) error {
if _, err := new.WriteValue(ctx, v); err != nil {
return err
}
return types.WalkAddrs(v, old.Format(), func(h hash.Hash, isleaf bool) error {
if err := copyValue(ctx, h, old, new); err != nil {
return err
func readNomsMap(ctx context.Context, m types.Map, reader chan<- types.Tuple) error {
return m.Iter(ctx, func(key, value types.Value) (stop bool, err error) {
select {
case reader <- key.(types.Tuple):
case _ = <-ctx.Done():
stop = true
return
}
if isleaf {
return nil
select {
case reader <- value.(types.Tuple):
case _ = <-ctx.Done():
stop = true
return
}
val, err := old.ReadValue(ctx, h)
if err != nil {
return err
}
return copyTreeFromValue(ctx, val, old, new)
return
})
}
func copyValue(ctx context.Context, addr hash.Hash, old, new types.ValueReadWriter) (err error) {
var v types.Value
if v, err = old.ReadValue(ctx, addr); err != nil {
return err
func translateTuples(ctx context.Context, kt, vt translator, reader <-chan types.Tuple, writer chan<- val.Tuple) error {
for {
var (
oldKey types.Tuple
oldVal types.Tuple
ok bool
)
select {
case oldKey, ok = <-reader:
if !ok {
return nil // done
}
case _ = <-ctx.Done():
return nil
}
newKey, err := kt.TranslateTuple(ctx, oldKey)
if err != nil {
return err
}
select {
case writer <- newKey:
case _ = <-ctx.Done():
return nil
}
select {
case oldVal, ok = <-reader:
assertTrue(ok)
case _ = <-ctx.Done():
return nil
}
newVal, err := vt.TranslateTuple(ctx, oldVal)
if err != nil {
return err
}
select {
case writer <- newVal:
case _ = <-ctx.Done():
return nil
}
}
_, err = new.WriteValue(ctx, v)
return
}
func writeProllyMap(ctx context.Context, ns tree.NodeStore, kd, vd val.TupleDesc, writer <-chan val.Tuple) (prolly.Map, error) {
pro := channelProvider{tuples: writer}
return prolly.NewMapFromProvider(ctx, ns, kd, vd, pro)
}
type channelProvider struct {
tuples <-chan val.Tuple
}
var _ prolly.TupleProvider = channelProvider{}
func (p channelProvider) Next(ctx context.Context) (val.Tuple, val.Tuple, error) {
var (
k, v val.Tuple
ok bool
)
select {
case k, ok = <-p.tuples:
if !ok {
return nil, nil, io.EOF // done
}
case _ = <-ctx.Done():
return nil, nil, nil
}
select {
case v, ok = <-p.tuples:
assertTrue(ok)
case _ = <-ctx.Done():
return nil, nil, nil
}
return k, v, nil
}
+205
View File
@@ -0,0 +1,205 @@
// Copyright 2022 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package migrate
import (
"context"
"fmt"
"time"
"github.com/shopspring/decimal"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/store/pool"
"github.com/dolthub/dolt/go/store/prolly/shim"
"github.com/dolthub/dolt/go/store/types"
"github.com/dolthub/dolt/go/store/val"
)
type translator struct {
builder *val.TupleBuilder
mapping map[uint64]int
pool pool.BuffPool
}
func tupleTranslatorsFromSchema(sch schema.Schema, old types.ValueReadWriter) (kt, vt translator) {
kd := shim.KeyDescriptorFromSchema(sch)
kt = newTupleTranslator(sch.GetPKCols(), kd)
vd := shim.ValueDescriptorFromSchema(sch)
vt = newTupleTranslator(sch.GetNonPKCols(), vd)
return
}
func newTupleTranslator(cols *schema.ColCollection, desc val.TupleDesc) translator {
return translator{
builder: val.NewTupleBuilder(desc),
mapping: cols.TagToIdx,
pool: pool.NewBuffPool(),
}
}
// TranslateTuple translates a types.Tuple into a val.Tuple.
func (t translator) TranslateTuple(ctx context.Context, tup types.Tuple) (val.Tuple, error) {
if !isEven(tup.Len()) {
return nil, fmt.Errorf("expected even-legnth tuple (len %d)", tup.Len())
}
var tag uint64
err := tup.IterFields(func(i uint64, value types.Value) (stop bool, err error) {
// even fields are column tags, odd fields are column values
if isEven(i) {
tag = uint64(value.(types.Uint))
} else {
// |tag| set in previous iteration
pos := t.mapping[tag]
err = translateNomsField(value, pos, t.builder)
stop = err != nil
}
return
})
if err != nil {
return nil, err
}
return t.builder.Build(t.pool), nil
}
func translateNomsField(value types.Value, idx int, b *val.TupleBuilder) error {
nk := value.Kind()
switch nk {
case types.UintKind:
translateUintField(value.(types.Uint), idx, b)
case types.IntKind:
translateIntField(value.(types.Int), idx, b)
case types.FloatKind:
translateFloatField(value.(types.Float), idx, b)
case types.TimestampKind:
translateTimestampField(value.(types.Timestamp), idx, b)
case types.BoolKind:
b.PutBool(idx, bool(value.(types.Bool)))
case types.StringKind:
b.PutString(idx, string(value.(types.String)))
case types.UUIDKind:
uuid := value.(types.UUID)
b.PutHash128(idx, uuid[:])
case types.InlineBlobKind:
b.PutByteString(idx, value.(types.InlineBlob))
case types.DecimalKind:
b.PutDecimal(idx, decimal.Decimal(value.(types.Decimal)))
case types.PointKind, types.LineStringKind,
types.PolygonKind, types.GeometryKind:
translateGeometryField(value, idx, b)
case types.JSONKind:
translateJSONField(value.(types.JSON), idx, b)
case types.BlobKind:
translateBlobField(value.(types.Blob), idx, b)
default:
return fmt.Errorf("encountered unexpected NomsKind %s",
types.KindToString[nk])
}
return nil
}
func translateUintField(value types.Uint, idx int, b *val.TupleBuilder) {
typ := b.Desc.Types[idx]
switch typ.Enc {
case val.Uint8Enc:
b.PutUint8(idx, uint8(value))
case val.Uint16Enc:
b.PutUint16(idx, uint16(value))
case val.Uint32Enc:
b.PutUint32(idx, uint32(value))
case val.Uint64Enc:
b.PutUint64(idx, uint64(value))
case val.EnumEnc:
b.PutEnum(idx, uint16(value))
case val.SetEnc:
b.PutSet(idx, uint64(value))
default:
panic(fmt.Sprintf("unexpected encoding for uint (%d)", typ.Enc))
}
}
func translateIntField(value types.Int, idx int, b *val.TupleBuilder) {
typ := b.Desc.Types[idx]
switch typ.Enc {
case val.Int8Enc:
b.PutInt8(idx, int8(value))
case val.Int16Enc:
b.PutInt16(idx, int16(value))
case val.Int32Enc:
b.PutInt32(idx, int32(value))
case val.Int64Enc:
b.PutInt64(idx, int64(value))
case val.YearEnc:
b.PutInt16(idx, int16(value))
case val.TimeEnc:
b.PutInt64(idx, int64(value))
default:
panic(fmt.Sprintf("unexpected encoding for int (%d)", typ.Enc))
}
}
func translateFloatField(value types.Float, idx int, b *val.TupleBuilder) {
typ := b.Desc.Types[idx]
switch typ.Enc {
case val.Float32Enc:
b.PutFloat32(idx, float32(value))
case val.Float64Enc:
b.PutFloat64(idx, float64(value))
default:
panic(fmt.Sprintf("unexpected encoding for float (%d)", typ.Enc))
}
}
func translateTimestampField(value types.Timestamp, idx int, b *val.TupleBuilder) {
typ := b.Desc.Types[idx]
switch typ.Enc {
case val.DateEnc:
b.PutDate(idx, time.Time(value))
case val.DatetimeEnc:
b.PutDatetime(idx, time.Time(value))
default:
panic(fmt.Sprintf("unexpected encoding for timestamp (%d)", typ.Enc))
}
}
func translateGeometryField(value types.Value, ids int, b *val.TupleBuilder) {
panic("unimplemeted")
}
func translateJSONField(value types.JSON, idx int, b *val.TupleBuilder) {
panic("unimplemeted")
}
func translateBlobField(value types.Blob, idx int, b *val.TupleBuilder) {
panic("unimplemeted")
}
func isEven(n uint64) bool {
return n%2 == 0
}
@@ -217,3 +217,9 @@ func hashRow(sctx *sql.Context, r sql.Row) (uint64, error) {
}
return sql.HashOf(r)
}
func assertTrue(b bool) {
if !b {
panic("expected true")
}
}
+1 -1
View File
@@ -5,7 +5,7 @@ setup() {
skip_nbf_dolt_1
skip_nbf_dolt_dev
TARGET_NBF="__DOLT_DEV__"
TARGET_NBF="__DOLT_1__"
setup_common
}