mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-26 03:30:09 -05:00
go/doltcore/rowconv: cleaup pkg rowconv
This commit is contained in:
@@ -20,13 +20,9 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/set"
|
||||
@@ -677,130 +673,6 @@ func (refOp ForeignKeyReferentialAction) ReducedString() string {
|
||||
}
|
||||
}
|
||||
|
||||
// ValidateData ensures that the foreign key is valid by comparing the index data from the given table
|
||||
// against the index data from the referenced table. Returns an error for each violation.
|
||||
func (fk ForeignKey) ValidateData(
|
||||
ctx context.Context,
|
||||
childSch schema.Schema,
|
||||
childRowData, childIdxData, parentIdxData types.Map,
|
||||
childDef, parentDef schema.Index,
|
||||
vrw types.ValueReadWriter,
|
||||
) error {
|
||||
// Any unresolved foreign key does not yet reference any indexes, therefore there's no need to error here.
|
||||
// Data will be validated whenever the foreign key is resolved.
|
||||
if !fk.IsResolved() {
|
||||
return nil
|
||||
}
|
||||
if fk.ReferencedTableIndex != parentDef.Name() {
|
||||
return fmt.Errorf("cannot validate data as wrong referenced index was given: expected `%s` but received `%s`",
|
||||
fk.ReferencedTableIndex, parentDef.Name())
|
||||
}
|
||||
|
||||
tagMap := make(map[uint64]uint64, len(fk.TableColumns))
|
||||
for i, childTag := range fk.TableColumns {
|
||||
tagMap[childTag] = fk.ReferencedTableColumns[i]
|
||||
}
|
||||
|
||||
// FieldMappings ignore columns not in the tagMap
|
||||
fm, err := rowconv.NewFieldMapping(childDef.Schema(), parentDef.Schema(), tagMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rc, err := rowconv.NewRowConverter(ctx, vrw, fm)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
rdr, err := noms.NewNomsMapReader(ctx, childIdxData, childDef.Schema())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var violatingRows []row.Row
|
||||
for {
|
||||
childIdxRow, err := rdr.ReadRow(ctx)
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Check if there are any NULL values, as they should be skipped
|
||||
hasNulls := false
|
||||
_, err = childIdxRow.IterSchema(childDef.Schema(), func(tag uint64, val types.Value) (stop bool, err error) {
|
||||
if types.IsNull(val) {
|
||||
hasNulls = true
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if hasNulls {
|
||||
continue
|
||||
}
|
||||
|
||||
parentIdxRow, err := rc.Convert(childIdxRow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if row.IsEmpty(parentIdxRow) {
|
||||
continue
|
||||
}
|
||||
|
||||
partial, err := row.ReduceToIndexPartialKey(parentDef, parentIdxRow)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
indexIter := noms.NewNomsRangeReader(parentDef.Schema(), parentIdxData,
|
||||
[]*noms.ReadRange{{Start: partial, Inclusive: true, Reverse: false, Check: noms.InRangeCheckPartial(partial)}},
|
||||
)
|
||||
|
||||
switch _, err = indexIter.ReadRow(ctx); err {
|
||||
case nil:
|
||||
continue // parent table contains child key
|
||||
case io.EOF:
|
||||
childFullKey, err := childIdxRow.NomsMapKey(childDef.Schema()).Value(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
childKey, err := childDef.ToTableTuple(ctx, childFullKey.(types.Tuple), childIdxRow.Format())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
childVal, ok, err := childRowData.MaybeGetTuple(ctx, childKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !ok {
|
||||
childKeyStr, _ := types.EncodedValue(ctx, childKey)
|
||||
return fmt.Errorf("could not find row value for key %s on table `%s`", childKeyStr, fk.TableName)
|
||||
}
|
||||
childRow, err := row.FromNoms(childSch, childKey, childVal)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
violatingRows = append(violatingRows, childRow)
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(violatingRows) == 0 {
|
||||
return nil
|
||||
} else {
|
||||
return &ForeignKeyViolationError{
|
||||
ForeignKey: fk,
|
||||
Schema: childSch,
|
||||
ViolationRows: violatingRows,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ColumnHasFkRelationship returns a foreign key that uses this tag. Returns n
|
||||
func (fkc *ForeignKeyCollection) ColumnHasFkRelationship(tag uint64) (ForeignKey, bool) {
|
||||
for _, key := range fkc.AllKeys() {
|
||||
|
||||
@@ -21,9 +21,7 @@ import (
|
||||
|
||||
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/filesys"
|
||||
"github.com/dolthub/dolt/go/libraries/utils/set"
|
||||
)
|
||||
|
||||
// ErrMappingFileRead is an error returned when a mapping file cannot be read
|
||||
@@ -43,12 +41,6 @@ func (err *BadMappingErr) Error() string {
|
||||
return fmt.Sprintf("Mapping file attempted to map %s to %s, but one or both of those fields are unknown.", err.srcField, err.destField)
|
||||
}
|
||||
|
||||
// IsBadMappingErr returns true if the error is a BadMappingErr
|
||||
func IsBadMappingErr(err error) bool {
|
||||
_, ok := err.(*BadMappingErr)
|
||||
return ok
|
||||
}
|
||||
|
||||
// NameMapper is a simple interface for mapping a string to another string
|
||||
type NameMapper map[string]string
|
||||
|
||||
@@ -83,34 +75,6 @@ type FieldMapping struct {
|
||||
SrcToDest map[uint64]uint64
|
||||
}
|
||||
|
||||
// MapsAllDestPKs checks that each PK column in DestSch has a corresponding column in SrcSch
|
||||
func (fm *FieldMapping) MapsAllDestPKs() bool {
|
||||
ds := set.NewUint64Set(nil)
|
||||
for _, v := range fm.SrcToDest {
|
||||
ds.Add(v)
|
||||
}
|
||||
for _, tag := range fm.DestSch.GetPKCols().Tags {
|
||||
if !ds.Contains(tag) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func InvertMapping(fm *FieldMapping) *FieldMapping {
|
||||
invertedMap := make(map[uint64]uint64)
|
||||
|
||||
for k, v := range fm.SrcToDest {
|
||||
invertedMap[v] = k
|
||||
}
|
||||
|
||||
return &FieldMapping{
|
||||
SrcSch: fm.DestSch,
|
||||
DestSch: fm.SrcSch,
|
||||
SrcToDest: invertedMap,
|
||||
}
|
||||
}
|
||||
|
||||
// NewFieldMapping creates a FieldMapping from a source schema, a destination schema, and a map from tags in the source
|
||||
// schema to tags in the dest schema.
|
||||
func NewFieldMapping(srcSch, destSch schema.Schema, srcTagToDestTag map[uint64]uint64) (*FieldMapping, error) {
|
||||
@@ -131,15 +95,6 @@ func NewFieldMapping(srcSch, destSch schema.Schema, srcTagToDestTag map[uint64]u
|
||||
return &FieldMapping{srcSch, destSch, srcTagToDestTag}, nil
|
||||
}
|
||||
|
||||
// Returns the identity mapping for the schema given.
|
||||
func IdentityMapping(sch schema.Schema) *FieldMapping {
|
||||
fieldMapping, err := TagMapping(sch, sch)
|
||||
if err != nil {
|
||||
panic("Error creating identity mapping")
|
||||
}
|
||||
return fieldMapping
|
||||
}
|
||||
|
||||
// TagMapping takes a source schema and a destination schema and maps all columns which have a matching tag in the
|
||||
// source and destination schemas.
|
||||
func TagMapping(srcSch, destSch schema.Schema) (*FieldMapping, error) {
|
||||
@@ -223,41 +178,6 @@ func NameMapperFromFile(mappingFile string, FS filesys.ReadableFS) (NameMapper,
|
||||
return nm, nil
|
||||
}
|
||||
|
||||
// TagMappingWithNameFallback takes a source schema and a destination schema and maps columns
|
||||
// by matching tags first, then attempts to match by column name for any columns that didn't
|
||||
// match with an exact tag.
|
||||
func TagMappingWithNameFallback(srcSch, destSch schema.Schema) (*FieldMapping, error) {
|
||||
successes := 0
|
||||
srcCols := srcSch.GetAllCols()
|
||||
destCols := destSch.GetAllCols()
|
||||
|
||||
srcToDest := make(map[uint64]uint64, destCols.Size())
|
||||
err := destCols.Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
|
||||
srcCol, ok := srcCols.GetByTag(tag)
|
||||
if !ok {
|
||||
srcCol, ok = srcCols.GetByName(col.Name)
|
||||
|
||||
if !ok {
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
srcToDest[srcCol.Tag] = col.Tag
|
||||
successes++
|
||||
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if successes == 0 {
|
||||
return nil, ErrEmptyMapping
|
||||
}
|
||||
|
||||
return NewFieldMapping(srcSch, destSch, srcToDest)
|
||||
}
|
||||
|
||||
// TagMappingByTagAndName takes a source schema and a destination schema and maps
|
||||
// pks by tag and non-pks by name.
|
||||
func TagMappingByTagAndName(srcSch, destSch schema.Schema) (*FieldMapping, error) {
|
||||
@@ -294,29 +214,3 @@ func TagMappingByTagAndName(srcSch, destSch schema.Schema) (*FieldMapping, error
|
||||
|
||||
return NewFieldMapping(srcSch, destSch, srcToDest)
|
||||
}
|
||||
|
||||
// TypedToUntypedMapping takes a schema and creates a mapping to an untyped schema with all the same columns.
|
||||
func TypedToUntypedMapping(sch schema.Schema) (*FieldMapping, error) {
|
||||
untypedSch, err := untyped.UntypeSchema(sch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
identityMap := make(map[uint64]uint64)
|
||||
err = sch.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
|
||||
identityMap[tag] = tag
|
||||
return false, nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mapping, err := NewFieldMapping(sch, untypedSch, identityMap)
|
||||
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return mapping, nil
|
||||
}
|
||||
|
||||
@@ -105,10 +105,6 @@ func TestFieldMapping(t *testing.T) {
|
||||
if !reflect.DeepEqual(mapping.SrcToDest, test.expected) {
|
||||
t.Error("Mapping does not match expected. Expected:", test.expected, "Actual:", mapping.SrcToDest)
|
||||
}
|
||||
|
||||
//if test.identity != mapping.IsIdentityMapping() {
|
||||
// t.Error("identity expected", test.identity, "actual:", !test.identity)
|
||||
//}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,12 +100,6 @@ func (rc *RowConverter) ConvertWithWarnings(inRow row.Row, warnFn WarnFunction)
|
||||
return rc.convert(inRow, warnFn)
|
||||
}
|
||||
|
||||
// Convert takes an input row, maps its columns to destination columns, and performs any type conversion needed to
|
||||
// create a row of the expected destination schema.
|
||||
func (rc *RowConverter) Convert(inRow row.Row) (row.Row, error) {
|
||||
return rc.convert(inRow, nil)
|
||||
}
|
||||
|
||||
// convert takes a row and maps its columns to their destination columns, automatically performing any type conversion
|
||||
// needed, and using the optional WarnFunction to let callers log warnings on any type conversion errors.
|
||||
func (rc *RowConverter) convert(inRow row.Row, warnFn WarnFunction) (row.Row, error) {
|
||||
|
||||
@@ -17,12 +17,7 @@ package rowconv
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/row"
|
||||
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/dolthub/dolt/go/store/types"
|
||||
)
|
||||
@@ -39,52 +34,6 @@ var srcCols = schema.NewColCollection(
|
||||
|
||||
var srcSch = schema.MustSchemaFromCols(srcCols)
|
||||
|
||||
func TestRowConverter(t *testing.T) {
|
||||
mapping, err := TypedToUntypedMapping(srcSch)
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
vrw := types.NewMemoryValueStore()
|
||||
rConv, err := NewRowConverter(context.Background(), vrw, mapping)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("Error creating row converter")
|
||||
}
|
||||
|
||||
id, _ := uuid.NewRandom()
|
||||
tt := types.Timestamp(time.Now())
|
||||
inRow, err := row.New(vrw.Format(), srcSch, row.TaggedValues{
|
||||
0: types.UUID(id),
|
||||
1: types.Float(1.25),
|
||||
2: types.Uint(12345678),
|
||||
3: types.Bool(true),
|
||||
4: types.Int(-1234),
|
||||
5: types.String("string string string"),
|
||||
6: tt,
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
outData, err := rConv.Convert(inRow)
|
||||
require.NoError(t, err)
|
||||
|
||||
destSch := mapping.DestSch
|
||||
expected, err := row.New(vrw.Format(), destSch, row.TaggedValues{
|
||||
0: types.String(id.String()),
|
||||
1: types.String("1.25"),
|
||||
2: types.String("12345678"),
|
||||
3: types.String("1"),
|
||||
4: types.String("-1234"),
|
||||
5: types.String("string string string"),
|
||||
6: types.String(tt.String()),
|
||||
})
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
if !row.AreEqual(outData, expected, destSch) {
|
||||
t.Error("\n", row.Fmt(context.Background(), expected, destSch), "!=\n", row.Fmt(context.Background(), outData, destSch))
|
||||
}
|
||||
}
|
||||
|
||||
func TestUnneccessaryConversion(t *testing.T) {
|
||||
mapping, err := TagMapping(srcSch, srcSch)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user