From 791a128fa3d526827866820c84df8f84f2e3d2ca Mon Sep 17 00:00:00 2001 From: Vinai Rachakonda Date: Fri, 26 Nov 2021 15:42:26 -0800 Subject: [PATCH] Move Import Write Path to Engine (#2426) --- go/cmd/dolt/commands/engine/sqlengine.go | 4 +- go/cmd/dolt/commands/tblcmds/import.go | 340 +++++++++++++----- go/go.mod | 2 +- go/go.sum | 4 +- .../doltcore/mvdata/channel_row_source.go | 98 +++++ go/libraries/doltcore/mvdata/data_mover.go | 51 ++- go/libraries/doltcore/mvdata/engine_mover.go | 314 ++++++++++++++++ .../doltcore/rowconv/row_converter.go | 52 --- .../doltcore/rowconv/row_converter_test.go | 42 --- go/libraries/doltcore/sqle/sqlddl_test.go | 2 +- go/libraries/doltcore/sqle/sqlutil/sql_row.go | 4 + .../doltcore/table/pipeline/errors.go | 16 +- .../doltcore/table/pipeline/errors_test.go | 2 +- .../doltcore/table/pipeline/procfunc_help.go | 4 +- .../doltcore/table/pipeline/transform.go | 2 +- integration-tests/bats/default-values.bats | 33 ++ .../bats/import-create-tables.bats | 13 +- .../bats/import-update-tables.bats | 117 ++++-- integration-tests/bats/index.bats | 9 +- integration-tests/bats/triggers.bats | 22 ++ 20 files changed, 869 insertions(+), 262 deletions(-) create mode 100644 go/libraries/doltcore/mvdata/channel_row_source.go create mode 100644 go/libraries/doltcore/mvdata/engine_mover.go diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index df632ac20a..2414371b14 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -54,6 +54,7 @@ func NewSqlEngine( format PrintResultFormat, initialDb string, autocommit bool) (*SqlEngine, error) { + au := new(auth.None) parallelism := runtime.GOMAXPROCS(0) @@ -68,8 +69,7 @@ func NewSqlEngine( pro := dsqle.NewDoltDatabaseProvider(mrEnv.Config(), mrEnv.FileSystem(), all...) 
- engine := gms.New(analyzer.NewBuilder(pro).WithParallelism( - parallelism).Build(), &gms.Config{Auth: au}) + engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{Auth: au}) if dbg, ok := os.LookupEnv("DOLT_SQL_DEBUG_LOG"); ok && strings.ToLower(dbg) == "true" { engine.Analyzer.Debug = true diff --git a/go/cmd/dolt/commands/tblcmds/import.go b/go/cmd/dolt/commands/tblcmds/import.go index b6154f30a2..e2fdc9957b 100644 --- a/go/cmd/dolt/commands/tblcmds/import.go +++ b/go/cmd/dolt/commands/tblcmds/import.go @@ -16,13 +16,15 @@ package tblcmds import ( "context" - "errors" "fmt" "io" "os" "strings" + "sync/atomic" + "github.com/dolthub/go-mysql-server/sql" "github.com/fatih/color" + "golang.org/x/sync/errgroup" "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/cmd/dolt/commands" @@ -32,11 +34,12 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/mvdata" + "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/rowconv" "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" "github.com/dolthub/dolt/go/libraries/doltcore/table" - "github.com/dolthub/dolt/go/libraries/doltcore/table/editor" - "github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms" + "github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline" "github.com/dolthub/dolt/go/libraries/utils/argparser" "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/libraries/utils/funcitr" @@ -87,17 +90,8 @@ In create, update, and replace scenarios the file's extension is used to infer t }, } -type tableImportOp string - -const ( - CreateOp tableImportOp = "overwrite" - ReplaceOp tableImportOp = "replace" - UpdateOp tableImportOp = "update" - InvalidOp tableImportOp = "invalid" -) - type importOptions struct { - operation tableImportOp + operation mvdata.TableImportOp tableName string contOnErr bool force bool @@ -136,7 +130,7 @@ func (m importOptions) FloatThreshold() float64 { } func (m importOptions) checkOverwrite(ctx context.Context, root *doltdb.RootValue, fs filesys.ReadableFS) (bool, error) { - if !m.force && m.operation == CreateOp { + if !m.force && m.operation == mvdata.CreateOp { return m.dest.Exists(ctx, root, fs) } return false, nil @@ -208,17 +202,17 @@ func getImportMoveOptions(ctx context.Context, apr *argparser.ArgParseResults, d } } - var moveOp tableImportOp + var moveOp mvdata.TableImportOp switch { case apr.Contains(createParam): - moveOp = CreateOp + moveOp = mvdata.CreateOp case apr.Contains(replaceParam): - moveOp = ReplaceOp + moveOp = mvdata.ReplaceOp default: - moveOp = UpdateOp + moveOp = mvdata.UpdateOp } - if moveOp != CreateOp { + if moveOp != mvdata.CreateOp { root, err := dEnv.WorkingRoot(ctx) if err != nil { return nil, errhand.VerboseErrorFromError(err) @@ -383,26 +377,54 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string, return commands.HandleVErrAndExitCode(verr, usage) } - mover, nDMErr := newImportDataMover(ctx, root, dEnv, mvOpts, importStatsCB) - + rd, nDMErr := newImportDataReader(ctx, root, dEnv, mvOpts) if nDMErr != nil { - verr = newDataMoverErrToVerr(mvOpts, nDMErr) return commands.HandleVErrAndExitCode(verr, usage) } - skipped, verr := mvdata.MoveData(ctx, dEnv, mover, mvOpts) + wrSch, nDMErr := getWriterSchema(ctx, root, dEnv, rd.GetSchema(), mvOpts) 
+ if nDMErr != nil { + verr = newDataMoverErrToVerr(mvOpts, nDMErr) + return commands.HandleVErrAndExitCode(verr, usage) + } + + wr, nDMErr := newImportDataWriter(ctx, dEnv, wrSch, mvOpts) + if nDMErr != nil { + verr = newDataMoverErrToVerr(mvOpts, nDMErr) + return commands.HandleVErrAndExitCode(verr, usage) + } + + skipped, err := move(ctx, rd, wr, mvOpts) + if err != nil { + if pipeline.IsTransformFailure(err) { + bdr := errhand.BuildDError("\nA bad row was encountered while moving data.") + r := pipeline.GetTransFailureSqlRow(err) + + if r != nil { + bdr.AddDetails("Bad Row: " + sql.FormatRow(r)) + } + + details := pipeline.GetTransFailureDetails(err) + + bdr.AddDetails(details) + bdr.AddDetails("These can be ignored using '--continue'") + + return commands.HandleVErrAndExitCode(bdr.Build(), usage) + } + + verr = errhand.BuildDError("An error occurred moving data:\n").AddCause(err).Build() + return commands.HandleVErrAndExitCode(verr, usage) + } cli.PrintErrln() if skipped > 0 { cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped)) } - if verr == nil { - cli.PrintErrln(color.CyanString("Import completed successfully.")) - } + cli.PrintErrln(color.CyanString("Import completed successfully.")) - return commands.HandleVErrAndExitCode(verr, usage) + return 0 } var displayStrLen int @@ -414,9 +436,10 @@ func importStatsCB(stats types.AppliedEditStats) { displayStrLen = cli.DeleteAndPrint(displayStrLen, displayStr) } -func newImportDataMover(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv, impOpts *importOptions, statsCB noms.StatsCB) (*mvdata.DataMover, *mvdata.DataMoverCreationError) { +func newImportDataReader(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv, impOpts *importOptions) (table.TableReadCloser, *mvdata.DataMoverCreationError) { var err error + // Checks whether import destination table already exists. This can probably be simplified to not need a root value... ow, err := impOpts.checkOverwrite(ctx, root, dEnv.FS) if err != nil { return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateReaderErr, Cause: err} @@ -425,83 +448,156 @@ func newImportDataMover(ctx context.Context, root *doltdb.RootValue, dEnv *env.D return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateReaderErr, Cause: fmt.Errorf("%s already exists. 
Use -f to overwrite.", impOpts.DestName())} } - wrSch, dmce := getImportSchema(ctx, root, dEnv.FS, impOpts) - if dmce != nil { - return nil, dmce - } - - rd, srcIsSorted, err := impOpts.src.NewReader(ctx, root, dEnv.FS, impOpts.srcOptions) + rd, _, err := impOpts.src.NewReader(ctx, root, dEnv.FS, impOpts.srcOptions) if err != nil { return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateReaderErr, Cause: err} } - defer func() { - if rd != nil { - rd.Close(ctx) - } - }() + return rd, nil +} - err = wrSch.GetPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) { - preImage := impOpts.nameMapper.PreImage(col.Name) - _, found := rd.GetSchema().GetAllCols().GetByName(preImage) +func getWriterSchema(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv, rdSchema schema.Schema, imOpts *importOptions) (schema.Schema, *mvdata.DataMoverCreationError) { + wrSch, dmce := getImportSchema(ctx, root, dEnv.FS, imOpts) + if dmce != nil { + return nil, dmce + } + + err := wrSch.GetPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) { + preImage := imOpts.nameMapper.PreImage(col.Name) + _, found := rdSchema.GetAllCols().GetByName(preImage) if !found { err = fmt.Errorf("input primary keys do not match primary keys of existing table") } return err == nil, err }) - if err != nil { return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.SchemaErr, Cause: err} } - transforms, fieldMapping, err := mvdata.NameMapTransform(ctx, root.VRW(), rd.GetSchema(), wrSch, impOpts.nameMapper) - if err != nil { - return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateMapperErr, Cause: err} - } + // allow subsetting of the final write schema only if it is an update operation. Every other operation must + // match perfectly. + if wrSch.GetAllCols().Size() != rdSchema.GetAllCols().Size() && imOpts.operation == mvdata.UpdateOp { + ret := schema.NewColCollection() - // read tags will be the tags of read rows which come from the imported data. 
Being able to distinguish columns coming - // from the import data allows us to merge the data with existing rows - rdTags := make([]uint64, 0, len(fieldMapping.SrcToDest)) - for _, tag := range fieldMapping.SrcToDest { - rdTags = append(rdTags, tag) - } + rdSchema.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) { + wrColName := imOpts.nameMapper.Map(col.Name) + wrCol, ok := wrSch.GetAllCols().GetByName(wrColName) + if ok { + ret = ret.Append(wrCol) + } - bulkTeaf := editor.NewBulkImportTEAFactory(root.VRW().Format(), root.VRW(), dEnv.TempTableFilesDir()) - opts := editor.Options{Deaf: bulkTeaf} + return false, nil + }) - var wr table.TableWriteCloser - switch impOpts.operation { - case CreateOp: - filePath, err := dEnv.FS.Abs(impOpts.DestName()) + newSch, err := schema.SchemaFromCols(ret) if err != nil { - return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err} + return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.SchemaErr, Cause: err} } - writer, err := dEnv.FS.OpenForWrite(filePath, os.ModePerm) - if err != nil { - return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err} - } - wr, err = impOpts.dest.NewCreatingWriter(ctx, impOpts, root, srcIsSorted, wrSch, statsCB, opts, writer) - case ReplaceOp: - wr, err = impOpts.dest.NewReplacingWriter(ctx, impOpts, root, srcIsSorted, wrSch, statsCB, opts) - case UpdateOp: - wr, err = impOpts.dest.NewUpdatingWriter(ctx, impOpts, root, srcIsSorted, wrSch, statsCB, rdTags, opts) - default: - err = errors.New("invalid move operation") + + return newSch, nil } + return wrSch, nil +} + +func newImportDataWriter(ctx context.Context, dEnv *env.DoltEnv, wrSchema schema.Schema, imOpts *importOptions) (mvdata.DataWriter, *mvdata.DataMoverCreationError) { + moveOps := &mvdata.MoverOptions{Force: imOpts.force, TableToWriteTo: imOpts.tableName, ContinueOnErr: imOpts.contOnErr, Operation: imOpts.operation} + + mv, err := mvdata.NewSqlEngineMover(ctx, dEnv, wrSchema, moveOps, importStatsCB) if err != nil { return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err} } - imp := &mvdata.DataMover{ - Rd: rd, - Transforms: transforms, - Wr: wr, - ContOnErr: impOpts.contOnErr, - } - rd = nil + return mv, nil +} - return imp, nil +func move(ctx context.Context, rd table.TableReadCloser, wr mvdata.DataWriter, options *importOptions) (int64, error) { + g, ctx := errgroup.WithContext(ctx) + + // Setup the necessary data points for the import job + parsedRowChan := make(chan sql.Row) + var rowErr error + var printStarted bool + var badCount int64 + badRowCB := func(trf *pipeline.TransformRowFailure) (quit bool) { + if !options.contOnErr { + rowErr = trf + return true + } + + if !printStarted { + cli.PrintErrln("The following rows were skipped:") + printStarted = true + } + + r := pipeline.GetTransFailureSqlRow(trf) + + if r != nil { + cli.PrintErr(sql.FormatRow(r)) + } + + atomic.AddInt64(&badCount, 1) + return false + } + + // Start the group that reads rows from the reader + g.Go(func() error { + defer close(parsedRowChan) + + for { + r, err := rd.ReadRow(ctx) + if err != nil { + if err == io.EOF { + return nil + } else if table.IsBadRow(err) { + dRow, _ := sqlutil.DoltRowToSqlRow(r, rd.GetSchema()) + trf := &pipeline.TransformRowFailure{Row: nil, SqlRow: dRow, TransformName: "reader", Details: err.Error()} + quit := badRowCB(trf) + if quit { + return trf + } + } else { + return err + } + } else { + dRow, err := transformToDoltRow(r, 
rd.GetSchema(), wr.Schema(), options.nameMapper) + if err != nil { + return err + } + + select { + case <-ctx.Done(): + return ctx.Err() + case parsedRowChan <- dRow: + } + } + } + }) + + // Start the group that writes rows + g.Go(func() error { + err := wr.WriteRows(ctx, parsedRowChan, badRowCB) + if err != nil { + return err + } + + return nil + }) + + err := g.Wait() + if err != nil && err != io.EOF { + return badCount, err + } + if rowErr != nil { + return badCount, rowErr + } + + err = wr.Commit(ctx) + if err != nil { + return badCount, err + } + + return badCount, nil } func getImportSchema(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, impOpts *importOptions) (schema.Schema, *mvdata.DataMoverCreationError) { @@ -519,7 +615,7 @@ func getImportSchema(ctx context.Context, root *doltdb.RootValue, fs filesys.Fil return out, nil } - if impOpts.operation == CreateOp { + if impOpts.operation == mvdata.CreateOp { if impOpts.srcIsStream() { // todo: capture stream data to file so we can use schema inferrence return nil, nil @@ -607,3 +703,87 @@ func newDataMoverErrToVerr(mvOpts *importOptions, err *mvdata.DataMoverCreationE panic("Unhandled Error type") } + +// transformToDoltRow does 1) Convert to a sql.Row 2) Matches the read and write schema with subsetting and name matching. +// 3) Addresses any type inconsistencies. +func transformToDoltRow(row row.Row, rdSchema schema.Schema, wrSchema sql.Schema, nameMapper rowconv.NameMapper) (sql.Row, error) { + doltRow, err := sqlutil.DoltRowToSqlRow(row, rdSchema) + if err != nil { + return nil, err + } + + for i, col := range wrSchema { + switch col.Type { + case sql.Boolean, sql.Int8, sql.MustCreateBitType(1): // TODO: noms bool wraps MustCreateBitType + val, ok := stringToBoolean(doltRow[i].(string)) + if ok { + doltRow[i] = val + } + } + + switch col.Type.(type) { + case sql.StringType: + default: + doltRow[i] = emptyStringToNil(doltRow[i]) + } + } + + rdSchemaAsDoltSchema, err := sqlutil.FromDoltSchema(wrSchema[0].Source, rdSchema) + if err != nil { + return nil, err + } + + doltRow = matchReadSchemaToWriteSchema(doltRow, rdSchemaAsDoltSchema, wrSchema, nameMapper) + return doltRow, nil +} + +func stringToBoolean(s string) (result bool, canConvert bool) { + lower := strings.ToLower(s) + switch lower { + case "true": + return true, true + case "false": + return false, true + case "0": + return false, true + case "1": + return true, false + default: + return false, false + } +} + +func emptyStringToNil(val interface{}) interface{} { + if val == nil { + return val + } + + if s, canConvert := val.(string); canConvert { + if s == "" { + return nil + } + } + + return val +} + +// matchReadSchemaToWriteSchema takes the read schema and accounts for subsetting and mapper (-m) differences. 
+func matchReadSchemaToWriteSchema(row sql.Row, rdSchema, wrSchema sql.Schema, nameMapper rowconv.NameMapper) sql.Row { + returnRow := sql.Row{} + + for _, wCol := range wrSchema { + seen := false + for rIdx, rCol := range rdSchema { + if rCol.Name == nameMapper.PreImage(wCol.Name) { + returnRow = append(returnRow, row[rIdx]) + seen = true + } + } + + if !seen { + returnRow = append(returnRow, nil) + } + } + + return returnRow +} diff --git a/go/go.mod b/go/go.mod index 27ffcb8ad0..24521c6650 100644 --- a/go/go.mod +++ b/go/go.mod @@ -19,7 +19,7 @@ require ( github.com/denisbrodbeck/machineid v1.0.1 github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078 github.com/dolthub/fslock v0.0.3 - github.com/dolthub/go-mysql-server v0.11.1-0.20211123231205-91cc0c1ae82f + github.com/dolthub/go-mysql-server v0.11.1-0.20211126215923-37e939b19247 github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66 github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81 diff --git a/go/go.sum b/go/go.sum index a1d6306c5b..2ce548c6bb 100755 --- a/go/go.sum +++ b/go/go.sum @@ -173,8 +173,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U= github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0= -github.com/dolthub/go-mysql-server v0.11.1-0.20211123231205-91cc0c1ae82f h1:WqEdr++dbFB1a4cAlzLAg+em1sc4Nigb8UxSNlBXSSw= -github.com/dolthub/go-mysql-server v0.11.1-0.20211123231205-91cc0c1ae82f/go.mod h1:q1U4zfAUIVcpAAASqlcmGJ6DqZ2V3LiFseAm97TqJTY= +github.com/dolthub/go-mysql-server v0.11.1-0.20211126215923-37e939b19247 h1:8NuMyxbitJPTXh3qam7LOwvc1eKguUKDsxb+aKVCdWY= +github.com/dolthub/go-mysql-server v0.11.1-0.20211126215923-37e939b19247/go.mod h1:q1U4zfAUIVcpAAASqlcmGJ6DqZ2V3LiFseAm97TqJTY= github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 h1:0ol5pj+QlKUKAtqs1LiPM3ZJKs+rHPgLSsMXmhTrCAM= github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms= github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8= diff --git a/go/libraries/doltcore/mvdata/channel_row_source.go b/go/libraries/doltcore/mvdata/channel_row_source.go new file mode 100644 index 0000000000..924ecead7c --- /dev/null +++ b/go/libraries/doltcore/mvdata/channel_row_source.go @@ -0,0 +1,98 @@ +// Copyright 2020-2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvdata + +import ( + "fmt" + "io" + + "github.com/dolthub/go-mysql-server/sql" +) + +// ChannelRowSource is a sql.Node that wraps a channel as a sql.RowIter. 
+type ChannelRowSource struct { + schema sql.Schema + rowChannel chan sql.Row +} + +// NewChannelRowSource returns a ChannelRowSource object. +func NewChannelRowSource(schema sql.Schema, rowChannel chan sql.Row) *ChannelRowSource { + return &ChannelRowSource{schema: schema, rowChannel: rowChannel} +} + +var _ sql.Node = (*ChannelRowSource)(nil) + +// Resolved implements the sql.Node interface. +func (c *ChannelRowSource) Resolved() bool { + return true +} + +// String implements the sql.Node interface. +func (c *ChannelRowSource) String() string { + return fmt.Sprintf("ChannelRowSource()") +} + +// Schema implements the sql.Node interface. +func (c *ChannelRowSource) Schema() sql.Schema { + return c.schema +} + +// Children implements the sql.Node interface. +func (c *ChannelRowSource) Children() []sql.Node { + return nil +} + +// RowIter implements the sql.Node interface. +func (c *ChannelRowSource) RowIter(ctx *sql.Context, row sql.Row) (sql.RowIter, error) { + return &channelRowIter{ + rowChannel: c.rowChannel, + ctx: ctx, + }, nil +} + +// WithChildren implements the sql.Node interface. +func (c *ChannelRowSource) WithChildren(children ...sql.Node) (sql.Node, error) { + if len(children) != 0 { + return nil, sql.ErrInvalidChildrenNumber.New(c, len(children), 0) + } + + return c, nil +} + +// channelRowIter wraps the channel under the sql.RowIter interface +type channelRowIter struct { + rowChannel chan sql.Row + ctx *sql.Context +} + +var _ sql.RowIter = (*channelRowIter)(nil) + +// Next implements the sql.RowIter interface. +func (c *channelRowIter) Next() (sql.Row, error) { + for r := range c.rowChannel { + select { + case <-c.ctx.Done(): + return nil, io.EOF + default: + return r, nil + } + } + return nil, io.EOF +} + +// Close implements the sql.RowIter interface. 
+func (c *channelRowIter) Close(context *sql.Context) error { + return nil +} diff --git a/go/libraries/doltcore/mvdata/data_mover.go b/go/libraries/doltcore/mvdata/data_mover.go index 4d1f1480b3..ae23048aa3 100644 --- a/go/libraries/doltcore/mvdata/data_mover.go +++ b/go/libraries/doltcore/mvdata/data_mover.go @@ -22,22 +22,21 @@ import ( "fmt" "sync/atomic" - "github.com/dolthub/dolt/go/cmd/dolt/cli" - "github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv" + "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/cmd/dolt/errhand" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/env/actions" "github.com/dolthub/dolt/go/libraries/doltcore/row" - "github.com/dolthub/dolt/go/libraries/doltcore/rowconv" "github.com/dolthub/dolt/go/libraries/doltcore/schema" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" "github.com/dolthub/dolt/go/libraries/doltcore/table" "github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline" + "github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv" "github.com/dolthub/dolt/go/libraries/utils/filesys" "github.com/dolthub/dolt/go/libraries/utils/set" - "github.com/dolthub/dolt/go/store/types" ) type CsvOptions struct { @@ -53,6 +52,13 @@ type JSONOptions struct { SchFile string } +type MoverOptions struct { + ContinueOnErr bool + Force bool + TableToWriteTo string + Operation TableImportOp +} + type DataMoverOptions interface { WritesToTable() bool SrcName() string @@ -64,6 +70,12 @@ type DataMoverCloser interface { Flush(context.Context) (*doltdb.RootValue, error) } +type DataWriter interface { + WriteRows(ctx context.Context, inputChannel chan sql.Row, badRowCb func(*pipeline.TransformRowFailure) bool) error + Commit(ctx context.Context) error + Schema() sql.Schema +} + type DataMover struct { Rd table.TableReadCloser Transforms *pipeline.TransformCollection @@ -262,29 +274,6 @@ func MoveData(ctx context.Context, dEnv *env.DoltEnv, mover *DataMover, mvOpts D return badCount, nil } -// NameMapTransform creates a pipeline transform that converts rows from inSch to outSch based on a name mapping. -func NameMapTransform(ctx context.Context, vrw types.ValueReadWriter, inSch schema.Schema, outSch schema.Schema, mapper rowconv.NameMapper) (*pipeline.TransformCollection, *rowconv.FieldMapping, error) { - mapping, err := rowconv.NameMapping(inSch, outSch, mapper) - - if err != nil { - return nil, nil, err - } - - rconv, err := rowconv.NewImportRowConverter(ctx, vrw, mapping) - - if err != nil { - return nil, nil, err - } - - transforms := pipeline.NewTransformCollection() - if !rconv.IdentityConverter { - nt := pipeline.NewNamedTransform("Mapping transform", pipeline.GetRowConvTransformFunc(rconv)) - transforms.AppendTransforms(nt) - } - - return transforms, mapping, nil -} - // SchAndTableNameFromFile reads a SQL schema file and creates a Dolt schema from it. 
func SchAndTableNameFromFile(ctx context.Context, path string, fs filesys.ReadableFS, root *doltdb.RootValue) (string, schema.Schema, error) { if path != "" { @@ -352,3 +341,11 @@ func InferSchema(ctx context.Context, root *doltdb.RootValue, rd table.TableRead return schema.SchemaFromCols(newCols) } + +type TableImportOp string + +const ( + CreateOp TableImportOp = "overwrite" + ReplaceOp TableImportOp = "replace" + UpdateOp TableImportOp = "update" +) diff --git a/go/libraries/doltcore/mvdata/engine_mover.go b/go/libraries/doltcore/mvdata/engine_mover.go new file mode 100644 index 0000000000..48227b643a --- /dev/null +++ b/go/libraries/doltcore/mvdata/engine_mover.go @@ -0,0 +1,314 @@ +// Copyright 2021 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package mvdata + +import ( + "context" + "fmt" + "io" + "sync/atomic" + + "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/go-mysql-server/sql/analyzer" + "github.com/dolthub/go-mysql-server/sql/expression" + "github.com/dolthub/go-mysql-server/sql/plan" + + "github.com/dolthub/dolt/go/cmd/dolt/commands/engine" + "github.com/dolthub/dolt/go/cmd/dolt/errhand" + "github.com/dolthub/dolt/go/libraries/doltcore/env" + "github.com/dolthub/dolt/go/libraries/doltcore/schema" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil" + "github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline" + "github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms" + "github.com/dolthub/dolt/go/store/types" +) + +type sqlEngineMover struct { + se *engine.SqlEngine + sqlCtx *sql.Context + + tableName string + database string + wrSch sql.Schema + contOnErr bool + force bool + + statsCB noms.StatsCB + stats types.AppliedEditStats + statOps int32 + + importOption TableImportOp +} + +func NewSqlEngineMover(ctx context.Context, dEnv *env.DoltEnv, writeSch schema.Schema, options *MoverOptions, statsCB noms.StatsCB) (*sqlEngineMover, error) { + mrEnv, err := env.DoltEnvAsMultiEnv(ctx, dEnv) + if err != nil { + return nil, err + } + + // Choose the first DB as the current one. 
This will be the DB in the working dir if there was one there + var dbName string + mrEnv.Iter(func(name string, _ *env.DoltEnv) (stop bool, err error) { + dbName = name + return true, nil + }) + + se, err := engine.NewSqlEngine(ctx, mrEnv, engine.FormatCsv, dbName, false) + if err != nil { + return nil, err + } + + sqlCtx, err := se.NewContext(ctx) + if err != nil { + return nil, err + } + + dsess.DSessFromSess(sqlCtx.Session).EnableBatchedMode() + + err = sqlCtx.Session.SetSessionVariable(sqlCtx, sql.AutoCommitSessionVar, false) + if err != nil { + return nil, errhand.VerboseErrorFromError(err) + } + + doltSchema, err := sqlutil.FromDoltSchema(options.TableToWriteTo, writeSch) + if err != nil { + return nil, err + } + + return &sqlEngineMover{ + se: se, + contOnErr: options.ContinueOnErr, + force: options.Force, + + database: dbName, + tableName: options.TableToWriteTo, + wrSch: doltSchema, + + statsCB: statsCB, + importOption: options.Operation, + }, nil +} + +// StartWriting implements the DataWriter interface. +func (s *sqlEngineMover) WriteRows(ctx context.Context, inputChannel chan sql.Row, badRowCb func(*pipeline.TransformRowFailure) bool) (err error) { + s.sqlCtx, err = s.se.NewContext(ctx) + if err != nil { + return err + } + + err = s.forceDropTableIfNeeded() + if err != nil { + return err + } + + _, _, err = s.se.Query(s.sqlCtx, fmt.Sprintf("START TRANSACTION")) + if err != nil { + return err + } + + err = s.createOrEmptyTableIfNeeded() + if err != nil { + return err + } + + updateStats := func(row sql.Row) { + if row == nil { + return + } + + // If the length of the row does not match the schema then we have an update operation. + if len(row) != len(s.wrSch) { + oldRow := row[:len(row)/2] + newRow := row[len(row)/2:] + + if ok, err := oldRow.Equals(newRow, s.wrSch); err == nil { + if ok { + s.stats.SameVal++ + } else { + s.stats.Modifications++ + } + } + } else { + s.stats.Additions++ + } + } + + insertOrUpdateOperation, err := s.getInsertNode(inputChannel) + if err != nil { + return err + } + + iter, err := insertOrUpdateOperation.RowIter(s.sqlCtx, nil) + if err != nil { + return err + } + defer func() { + if err != nil { + iter.Close(s.sqlCtx) // save the error that should be propagated. + } else { + err = iter.Close(s.sqlCtx) + } + }() + + for { + if s.statsCB != nil && atomic.LoadInt32(&s.statOps) >= tableWriterStatUpdateRate { + atomic.StoreInt32(&s.statOps, 0) + s.statsCB(s.stats) + } + + row, err := iter.Next() + + // All other errors are handled by the errorHandler + if err == nil { + _ = atomic.AddInt32(&s.statOps, 1) + updateStats(row) + } else if err == io.EOF { + atomic.LoadInt32(&s.statOps) + atomic.StoreInt32(&s.statOps, 0) + s.statsCB(s.stats) + + return err + } else { + var offendingRow sql.Row + switch n := err.(type) { + case sql.WrappedInsertError: + offendingRow = n.OffendingRow + case sql.ErrInsertIgnore: + offendingRow = n.OffendingRow + } + + trf := &pipeline.TransformRowFailure{Row: nil, SqlRow: offendingRow, TransformName: "write", Details: err.Error()} + quit := badRowCb(trf) + if quit { + return trf + } + } + } +} + +// Commit implements the DataWriter interface. +func (s *sqlEngineMover) Commit(ctx context.Context) error { + _, _, err := s.se.Query(s.sqlCtx, "COMMIT") + return err +} + +// GetSchema implements the DataWriter interface. +func (s *sqlEngineMover) Schema() sql.Schema { + return s.wrSch +} + +// forceDropTableIfNeeded drop the given table in case the -f parameter is passed. 
+func (s *sqlEngineMover) forceDropTableIfNeeded() error { + if s.force { + _, _, err := s.se.Query(s.sqlCtx, fmt.Sprintf("DROP TABLE IF EXISTS %s", s.tableName)) + return err + } + + return nil +} + +// createOrEmptyTableIfNeeded either creates or truncates the table given a -c or -r parameter. +func (s *sqlEngineMover) createOrEmptyTableIfNeeded() error { + switch s.importOption { + case CreateOp: + return s.createTable() + case ReplaceOp: + _, _, err := s.se.Query(s.sqlCtx, fmt.Sprintf("TRUNCATE TABLE %s", s.tableName)) + return err + default: + return nil + } +} + +// createTable creates a table. +func (s *sqlEngineMover) createTable() error { + cr := plan.NewCreateTable(sql.UnresolvedDatabase(s.database), s.tableName, false, false, &plan.TableSpec{Schema: s.wrSch}) + analyzed, err := s.se.Analyze(s.sqlCtx, cr) + if err != nil { + return err + } + + analyzedQueryProcess := analyzer.StripQueryProcess(analyzed.(*plan.QueryProcess)) + + ri, err := analyzedQueryProcess.RowIter(s.sqlCtx, nil) + if err != nil { + return err + } + + for { + _, err = ri.Next() + if err != nil { + return ri.Close(s.sqlCtx) + } + } +} + +// getInsertNode returns the sql.Node to be iterated on given the import option. +func (s *sqlEngineMover) getInsertNode(inputChannel chan sql.Row) (sql.Node, error) { + switch s.importOption { + case CreateOp, ReplaceOp: + return s.createInsertImportNode(inputChannel, s.contOnErr, false, nil) // contonerr translates to ignore + case UpdateOp: + return s.createInsertImportNode(inputChannel, s.contOnErr, false, generateOnDuplicateKeyExpressions(s.wrSch)) // contonerr translates to ignore + default: + return nil, fmt.Errorf("unsupported import type") + } +} + +// createInsertImportNode creates the relevant/analyzed insert node given the import option. This insert node is wrapped +// with an error handler. +func (s *sqlEngineMover) createInsertImportNode(source chan sql.Row, ignore bool, replace bool, onDuplicateExpression []sql.Expression) (sql.Node, error) { + src := NewChannelRowSource(s.wrSch, source) + dest := plan.NewUnresolvedTable(s.tableName, s.database) + + colNames := make([]string, 0) + for _, col := range s.wrSch { + colNames = append(colNames, col.Name) + } + + insert := plan.NewInsertInto(sql.UnresolvedDatabase(s.database), dest, src, replace, colNames, onDuplicateExpression, ignore) + analyzed, err := s.se.Analyze(s.sqlCtx, insert) + if err != nil { + return nil, err + } + + analyzed = analyzer.StripQueryProcess(analyzed) + + // Get the first insert (wrapped with the error handler) + plan.Inspect(analyzed, func(node sql.Node) bool { + switch n := node.(type) { + case *plan.InsertInto: + analyzed = n + return false + default: + return true + } + }) + + return analyzed, nil +} + +// generateOnDuplicateKeyExpressions generates the duplicate key expressions needed for the update import option. 
+func generateOnDuplicateKeyExpressions(sch sql.Schema) []sql.Expression { + ret := make([]sql.Expression, len(sch)) + for i, col := range sch { + columnExpression := expression.NewUnresolvedColumn(col.Name) + functionExpression := expression.NewUnresolvedFunction("values", false, nil, expression.NewUnresolvedColumn(col.Name)) + ret[i] = expression.NewSetField(columnExpression, functionExpression) + } + + return ret +} diff --git a/go/libraries/doltcore/rowconv/row_converter.go b/go/libraries/doltcore/rowconv/row_converter.go index b5d63c1a7c..480213b3f7 100644 --- a/go/libraries/doltcore/rowconv/row_converter.go +++ b/go/libraries/doltcore/rowconv/row_converter.go @@ -82,58 +82,6 @@ func NewRowConverter(ctx context.Context, vrw types.ValueReadWriter, mapping *Fi return &RowConverter{mapping, false, convFuncs}, nil } -// NewImportRowConverter creates a row converter from a given FieldMapping specifically for importing. -func NewImportRowConverter(ctx context.Context, vrw types.ValueReadWriter, mapping *FieldMapping) (*RowConverter, error) { - if nec, err := isNecessary(mapping.SrcSch, mapping.DestSch, mapping.SrcToDest); err != nil { - return nil, err - } else if !nec { - return newIdentityConverter(mapping), nil - } - - convFuncs := make(map[uint64]types.MarshalCallback, len(mapping.SrcToDest)) - for srcTag, destTag := range mapping.SrcToDest { - destCol, destOk := mapping.DestSch.GetAllCols().GetByTag(destTag) - srcCol, srcOk := mapping.SrcSch.GetAllCols().GetByTag(srcTag) - - if !destOk || !srcOk { - return nil, fmt.Errorf("Could not find column being mapped. src tag: %d, dest tag: %d", srcTag, destTag) - } - - if srcCol.TypeInfo.Equals(destCol.TypeInfo) { - convFuncs[srcTag] = func(v types.Value) (types.Value, error) { - return v, nil - } - } - if typeinfo.IsStringType(destCol.TypeInfo) { - convFuncs[srcTag] = func(v types.Value) (types.Value, error) { - val, err := srcCol.TypeInfo.FormatValue(v) - if err != nil { - return nil, err - } - if val == nil { - return types.NullValue, nil - } - return types.String(*val), nil - } - } else if destCol.TypeInfo.Equals(typeinfo.PseudoBoolType) || destCol.TypeInfo.Equals(typeinfo.Int8Type) { - // BIT(1) and BOOLEAN (MySQL alias for TINYINT or Int8) are both logical stand-ins for a bool type - convFuncs[srcTag] = func(v types.Value) (types.Value, error) { - intermediateVal, err := typeinfo.Convert(ctx, vrw, v, srcCol.TypeInfo, typeinfo.BoolType) - if err != nil { - return nil, err - } - return typeinfo.Convert(ctx, vrw, intermediateVal, typeinfo.BoolType, destCol.TypeInfo) - } - } else { - convFuncs[srcTag] = func(v types.Value) (types.Value, error) { - return typeinfo.Convert(ctx, vrw, v, srcCol.TypeInfo, destCol.TypeInfo) - } - } - } - - return &RowConverter{mapping, false, convFuncs}, nil -} - // Convert takes a row maps its columns to their destination columns, and performs any type conversion needed to create // a row of the expected destination schema. 
func (rc *RowConverter) Convert(inRow row.Row) (row.Row, error) { diff --git a/go/libraries/doltcore/rowconv/row_converter_test.go b/go/libraries/doltcore/rowconv/row_converter_test.go index 10c7f46736..c8c6ff8692 100644 --- a/go/libraries/doltcore/rowconv/row_converter_test.go +++ b/go/libraries/doltcore/rowconv/row_converter_test.go @@ -20,13 +20,10 @@ import ( "time" "github.com/google/uuid" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/libraries/doltcore/row" "github.com/dolthub/dolt/go/libraries/doltcore/schema" - "github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo" - "github.com/dolthub/dolt/go/libraries/doltcore/table/untyped" "github.com/dolthub/dolt/go/store/types" ) @@ -103,42 +100,3 @@ func TestUnneccessaryConversion(t *testing.T) { t.Fatal("expected identity converter") } } - -func TestSpecialBoolHandling(t *testing.T) { - col1, err := schema.NewColumnWithTypeInfo("pk", 0, typeinfo.Int64Type, true, "", false, "") - require.NoError(t, err) - col2, err := schema.NewColumnWithTypeInfo("v", 1, typeinfo.PseudoBoolType, false, "", false, "") - require.NoError(t, err) - colColl := schema.NewColCollection(col1, col2) - sch, err := schema.SchemaFromCols(colColl) - require.NoError(t, err) - untypedSch, err := untyped.UntypeSchema(sch) - require.NoError(t, err) - - mapping, err := TagMapping(untypedSch, sch) - require.NoError(t, err) - vrw := types.NewMemoryValueStore() - rconv, err := NewImportRowConverter(context.Background(), vrw, mapping) - require.NoError(t, err) - inRow, err := row.New(vrw.Format(), untypedSch, row.TaggedValues{ - 0: types.String("76"), - 1: types.String("true"), - }) - require.NoError(t, err) - outData, err := rconv.Convert(inRow) - require.NoError(t, err) - require.NotNil(t, outData) - - expected, err := row.New(vrw.Format(), mapping.DestSch, row.TaggedValues{ - 0: types.Int(76), - 1: types.Uint(1), - }) - require.NoError(t, err) - assert.True(t, row.AreEqual(outData, expected, mapping.DestSch)) - - rconvNoHandle, err := NewRowConverter(context.Background(), vrw, mapping) - require.NoError(t, err) - results, errStr := rconvNoHandle.Convert(inRow) - assert.Nil(t, results) - assert.NotEmpty(t, errStr) -} diff --git a/go/libraries/doltcore/sqle/sqlddl_test.go b/go/libraries/doltcore/sqle/sqlddl_test.go index debcee17a1..125c102165 100644 --- a/go/libraries/doltcore/sqle/sqlddl_test.go +++ b/go/libraries/doltcore/sqle/sqlddl_test.go @@ -1519,7 +1519,7 @@ INSERT INTO child_non_unq VALUES ('1', 1), ('2', NULL), ('3', 3), ('4', 3), ('5' require.NoError(t, err) _, err = ExecuteSql(t, dEnv, root, "INSERT INTO child_unq VALUES ('6', 5)") if assert.Error(t, err) { - assert.True(t, sql.ErrUniqueKeyViolation.Is(err)) + assert.True(t, sql.ErrUniqueKeyViolation.Is(err.(sql.WrappedInsertError).Cause)) } root, err = ExecuteSql(t, dEnv, root, "INSERT INTO child_non_unq VALUES ('6', 5)") require.NoError(t, err) diff --git a/go/libraries/doltcore/sqle/sqlutil/sql_row.go b/go/libraries/doltcore/sqle/sqlutil/sql_row.go index c59eb508e0..df0a4a9149 100644 --- a/go/libraries/doltcore/sqle/sqlutil/sql_row.go +++ b/go/libraries/doltcore/sqle/sqlutil/sql_row.go @@ -66,6 +66,10 @@ func MapToSqlIter(ctx context.Context, sch schema.Schema, data types.Map) (sql.R // DoltRowToSqlRow constructs a go-mysql-server sql.Row from a Dolt row.Row. 
func DoltRowToSqlRow(doltRow row.Row, sch schema.Schema) (sql.Row, error) { + if doltRow == nil { + return nil, nil + } + colVals := make(sql.Row, sch.GetAllCols().Size()) i := 0 diff --git a/go/libraries/doltcore/table/pipeline/errors.go b/go/libraries/doltcore/table/pipeline/errors.go index 6bf495aa09..924921248c 100644 --- a/go/libraries/doltcore/table/pipeline/errors.go +++ b/go/libraries/doltcore/table/pipeline/errors.go @@ -15,6 +15,8 @@ package pipeline import ( + "github.com/dolthub/go-mysql-server/sql" + "github.com/dolthub/dolt/go/libraries/doltcore/row" ) @@ -22,13 +24,14 @@ import ( // failed and some details of the error type TransformRowFailure struct { Row row.Row + SqlRow sql.Row TransformName string Details string } // Error returns a string containing details of the error that occurred func (trf *TransformRowFailure) Error() string { - return trf.TransformName + " failed processing" + return trf.TransformName + " failed processing due to: " + trf.Details } // IsTransformFailure will return true if the error is an instance of a TransformRowFailure @@ -61,6 +64,17 @@ func GetTransFailureRow(err error) row.Row { } +// GetTransFailureRow extracts the row that failed from an error that is an instance of a TransformRowFailure +func GetTransFailureSqlRow(err error) sql.Row { + trf, ok := err.(*TransformRowFailure) + + if !ok { + panic("Verify error using IsTransformFailure before calling this.") + } + + return trf.SqlRow +} + // GetTransFailureDetails extracts the details string from an error that is an instance of a TransformRowFailure func GetTransFailureDetails(err error) string { trf, ok := err.(*TransformRowFailure) diff --git a/go/libraries/doltcore/table/pipeline/errors_test.go b/go/libraries/doltcore/table/pipeline/errors_test.go index 4d549195b4..5fd5ebcc34 100644 --- a/go/libraries/doltcore/table/pipeline/errors_test.go +++ b/go/libraries/doltcore/table/pipeline/errors_test.go @@ -30,7 +30,7 @@ func TestTransformRowFailure(t *testing.T) { assert.NoError(t, err) - err = &TransformRowFailure{r, "transform_name", "details"} + err = &TransformRowFailure{r, nil, "transform_name", "details"} if !IsTransformFailure(err) { t.Error("should be transform failure") diff --git a/go/libraries/doltcore/table/pipeline/procfunc_help.go b/go/libraries/doltcore/table/pipeline/procfunc_help.go index 88027d83a0..494e13193b 100644 --- a/go/libraries/doltcore/table/pipeline/procfunc_help.go +++ b/go/libraries/doltcore/table/pipeline/procfunc_help.go @@ -54,7 +54,7 @@ func ProcFuncForSourceFunc(sourceFunc SourceFunc) InFunc { return } } else if table.IsBadRow(err) { - badRowChan <- &TransformRowFailure{table.GetBadRowRow(err), "reader", err.Error()} + badRowChan <- &TransformRowFailure{table.GetBadRowRow(err), nil, "reader", err.Error()} } else { p.StopWithErr(err) return @@ -106,7 +106,7 @@ func ProcFuncForSinkFunc(sinkFunc SinkFunc) OutFunc { sql.ErrPrimaryKeyViolation.Is(err) || sql.ErrUniqueKeyViolation.Is(err) || errors.Is(err, editor.ErrDuplicateKey) { - badRowChan <- &TransformRowFailure{r.Row, "writer", err.Error()} + badRowChan <- &TransformRowFailure{r.Row, nil, "writer", err.Error()} } else { p.StopWithErr(err) return diff --git a/go/libraries/doltcore/table/pipeline/transform.go b/go/libraries/doltcore/table/pipeline/transform.go index 80304c4c99..36f967306f 100644 --- a/go/libraries/doltcore/table/pipeline/transform.go +++ b/go/libraries/doltcore/table/pipeline/transform.go @@ -88,7 +88,7 @@ func newRowTransformer(name string, transRowFunc TransformRowFunc) TransformFunc } if 
badRowDetails != "" {
-				badRowChan <- &TransformRowFailure{r.Row, name, badRowDetails}
+				badRowChan <- &TransformRowFailure{r.Row, nil, name, badRowDetails}
 			}
 		} else {
 			return
diff --git a/integration-tests/bats/default-values.bats b/integration-tests/bats/default-values.bats
index b6406ca111..ca06917516 100644
--- a/integration-tests/bats/default-values.bats
+++ b/integration-tests/bats/default-values.bats
@@ -454,3 +454,36 @@ teardown() {
     run dolt sql -q "ALTER TABLE test ADD COLUMN v1 BIGINT DEFAULT (pk) AFTER v3"
     [ "$status" -eq "1" ]
 }
+
+@test "default-values: Import with default values" {
+    dolt sql -q "CREATE TABLE test(pk BIGINT PRIMARY KEY, v1 BIGINT DEFAULT 2 NOT NULL, v2 int)"
+    dolt sql -q "INSERT INTO test (pk, v2) VALUES (1, 3), (2, 4)"
+    run dolt sql -q "SELECT * FROM test" -r=csv
+    [ "$status" -eq "0" ]
+    [[ "$output" =~ "pk,v1" ]] || false
+    [[ "$output" =~ "1,2,3" ]] || false
+    [[ "$output" =~ "2,2,4" ]] || false
+    [[ "${#lines[@]}" = "3" ]] || false
+
+    echo -e 'pk,v2\n3,5\n4,6'| dolt table import -u test
+
+    run dolt sql -q "SELECT * FROM test" -r=csv
+    [ "$status" -eq "0" ]
+    [[ "$output" =~ "pk,v1" ]] || false
+    [[ "$output" =~ "1,2,3" ]] || false
+    [[ "$output" =~ "2,2,4" ]] || false
+    [[ "$output" =~ "3,2,5" ]] || false
+    [[ "$output" =~ "4,2,6" ]] || false
+    [[ "${#lines[@]}" = "5" ]] || false
+
+    # validate that nulls are not accepted for a not null import
+    cat <<DELIM > bad-update.csv
+pk,v1,v2
+5,,5
+6,,6
+DELIM
+    run dolt table import -u test bad-update.csv
+    [ "$status" -eq "1" ]
+    [[ "$output" =~ "Bad Row: [5,,5]" ]] || false
+    [[ "$output" =~ "column name 'v1' is non-nullable but attempted to set a value of null" ]] || false
+}
diff --git a/integration-tests/bats/import-create-tables.bats b/integration-tests/bats/import-create-tables.bats
index 10fe47ad94..a3bbcc6971 100755
--- a/integration-tests/bats/import-create-tables.bats
+++ b/integration-tests/bats/import-create-tables.bats
@@ -91,7 +91,8 @@ teardown() {
 @test "import-create-tables: create a table with json import. bad schema." {
     run dolt table import -c -s `batshelper employees-sch-bad.sql` employees `batshelper employees-tbl.json`
     [ "$status" -eq 1 ]
-    [[ "$output" =~ "Error determining the output schema" ]] || false
+    [[ "$output" =~ "Error creating reader for json file" ]] || false
+    [[ "$output" =~ "employees-tbl.json" ]] || false
     [[ "$output" =~ "employees-sch-bad.sql" ]] || false
 }
 
@@ -159,13 +160,9 @@ DELIM
     run dolt table import -c --pk=id fktest 1pk5col-ints.csv
     [ "$status" -eq 1 ]
     [[ "$output" =~ "fktest already exists. Use -f to overwrite." ]] || false
-    run dolt table import -f -c --pk=pk fktest other.csv
-    [ "$status" -eq 0 ]
-    [[ "$output" =~ "Import completed successfully." ]] || false
-    run dolt schema show fktest
-    skip "cannot overwrite a table with foreign key constraints"
-    [ "$status" -eq 0 ]
-    [[ !
"$output" =~ "FOREIGN KEY" ]] || false + run dolt table import -c --pk=pk test 1pk5col-ints.csv -f + [ "$status" -eq 1 ] + [[ "$output" =~ 'since it is referenced from table `fktest`' ]] || false } @test "import-create-tables: try to create a table with a bad csv" { diff --git a/integration-tests/bats/import-update-tables.bats b/integration-tests/bats/import-update-tables.bats index b7791a7df0..c2a92f9c0f 100644 --- a/integration-tests/bats/import-update-tables.bats +++ b/integration-tests/bats/import-update-tables.bats @@ -28,6 +28,11 @@ SQL pk,c1,c2,c3,c4,c5 0,1,2,3,4,5 1,1,2,3,4,5 +DELIM + + cat < 1pk5col-ints-updt.csv +pk,c1,c2,c3,c4,c5 +0,1,2,3,4,6 DELIM cat < employees-sch.sql @@ -42,6 +47,14 @@ CREATE TABLE employees ( ); SQL + cat < check-constraint-sch.sql +CREATE TABLE persons ( + ID int NOT NULL, + LastName varchar(255) NOT NULL, + FirstName varchar(255), + Age int CHECK (Age>=18) +); +SQL } teardown() { @@ -58,6 +71,18 @@ teardown() { # Validate that a successful import with no bad rows does not print the following ! [[ "$output" =~ "The following rows were skipped:" ]] || false + + # Run again to get correct Had No Effect amount + run dolt table import -u test `batshelper 1pk5col-ints.csv` + [ "$status" -eq 0 ] + [[ "$output" =~ "Rows Processed: 2, Additions: 0, Modifications: 0, Had No Effect: 2" ]] || false + [[ "$output" =~ "Import completed successfully." ]] || false + + # Run another update for the correct modification amount + run dolt table import -u test 1pk5col-ints-updt.csv + [ "$status" -eq 0 ] + [[ "$output" =~ "Rows Processed: 1, Additions: 0, Modifications: 1, Had No Effect: 0" ]] || false + [[ "$output" =~ "Import completed successfully." ]] || false } @test "import-update-tables: update table using csv with null (blank) values" { @@ -194,7 +219,7 @@ SQL [[ "${lines[6]}" =~ "end date" ]] || false } -@test "import-update-tables: update table with incorrect length char throws bad row error" { +@test "import-update-tables: updating table by inputting string longer than char column throws an error" { cat < 1pk1col-rpt-chars.csv pk,c 1,"123456" @@ -204,11 +229,11 @@ DELIM run dolt table import -u test 1pk1col-rpt-chars.csv [ "$status" -eq 1 ] [[ "$output" =~ "A bad row was encountered while moving data" ]] || false - [[ "$output" =~ "Bad Row:" ]] || false - [[ "$output" =~ '"123456" is not valid for column "c" (type "CHAR(5)")' ]] || false + [[ "$output" =~ "Bad Row: [1,123456]" ]] || false + [[ "$output" =~ 'string is too large for column' ]] || false } -@test "import-update-tables: update table with repeat pk in csv throws error" { +@test "import-update-tables: update table with repeat pk in csv does not throw an error" { cat < 1pk5col-rpt-ints.csv pk,c1,c2,c3,c4,c5 1,1,2,3,4,5 @@ -217,48 +242,47 @@ DELIM dolt sql < 1pk5col-ints-sch.sql run dolt table import -u test 1pk5col-rpt-ints.csv - [ "$status" -eq 1 ] - [[ "$output" =~ "A bad row was encountered while moving data" ]] || false - [[ "$output" =~ "Bad Row: c4:4 | pk:1 | c3:3 | c5:5 | c1:1 | c2:2" ]] || false - - # Works with --continue - run dolt table import -u --continue test 1pk5col-rpt-ints.csv [ "$status" -eq 0 ] - [[ "$output" =~ "The following rows were skipped:" ]] || false - [[ "$output" =~ "1,1,2,3,4,5" ]] || false - [[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false - [[ "$output" =~ "Lines skipped: 1" ]] || false [[ "$output" =~ "Import completed successfully." 
]] || false + + run dolt sql -r csv -q "select * from test" + [ "${#lines[@]}" -eq 2 ] + [[ "${lines[1]}" =~ "1,1,2,3,4,5" ]] || false } @test "import-update-tables: importing into new table renders bad rows" { - cat < 1pk5col-rpt-ints.csv -pk,c1,c2,c3,c4,c5 -1,1,2,3,4,5 -1,1,2,3,4,7 -1,1,2,3,4,8 + cat < persons.csv +ID,LastName,FirstName,Age +1,"jon","doe", 20 +2,"little","doe", 10 +3,"little","doe",4 +4,"little","doe",1 DELIM - dolt sql < 1pk5col-ints-sch.sql - run dolt table import -u --continue test 1pk5col-rpt-ints.csv + dolt sql < check-constraint-sch.sql + run dolt table import -u persons persons.csv + [ "$status" -eq 1 ] + + [[ "$output" =~ "A bad row was encountered while moving data" ]] || false + [[ "$output" =~ "Bad Row:" ]] || false + [[ "$output" =~ "[2,little,doe,10]" ]] || false + + run dolt table import -u --continue persons persons.csv [ "$status" -eq 0 ] [[ "$output" =~ "The following rows were skipped:" ]] || false - [[ "$output" =~ "1,1,2,3,4,7" ]] || false - [[ "$output" =~ "1,1,2,3,4,8" ]] || false + [[ "$output" =~ "[2,little,doe,10]" ]] || false + [[ "$output" =~ "[3,little,doe,4]" ]] || false + [[ "$output" =~ "[4,little,doe,1]" ]] || false [[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false - [[ "$output" =~ "Lines skipped: 2" ]] || false [[ "$output" =~ "Import completed successfully." ]] || false - # Output to a file from the error stderr - dolt sql -q "DELETE FROM test WHERE pk = 1" - run dolt table import -u --continue test 1pk5col-rpt-ints.csv - [ "$status" -eq 0 ] - [[ "$output" =~ "The following rows were skipped:" ]] || false - [[ "$output" =~ "1,1,2,3,4,7" ]] || false - [[ "$output" =~ "1,1,2,3,4,8" ]] || false + run dolt sql -r csv -q "select * from persons" + [ "${#lines[@]}" -eq 2 ] + [[ "$output" =~ "ID,LastName,FirstName,Age" ]] || false + [[ "$output" =~ "1,jon,doe,20" ]] || false } -@test "import-update-tables: subsequent runs of same import with duplicate keys produces no modifications" { +@test "import-update-tables: subsequent runs of same import with duplicate keys produces no difference in final data" { cat < 1pk5col-rpt-ints.csv pk,c1,c2,c3,c4,c5 1,1,2,3,4,5 @@ -268,10 +292,15 @@ DELIM dolt sql < 1pk5col-ints-sch.sql dolt table import -u --continue test 1pk5col-rpt-ints.csv + dolt commit -am "cm1" + run dolt table import -u --continue test 1pk5col-rpt-ints.csv [ "$status" -eq 0 ] - skip "Running this file on repeat produces modifications" - ! 
[[ "$output" =~ "Modifications: 2" ]] || falsa
+    [[ "$output" =~ "Modifications: 3" ]] || false
+
+    run dolt diff
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 0 ]
 }
 
 @test "import-update-tables: importing some columns does not overwrite columns not part of the import" {
@@ -285,3 +314,23 @@ DELIM
     [ "$status" -eq 0 ]
     [[ "$output" =~ "$EXPECTED" ]] || false
 }
+
+@test "import-update-tables: poorly written file correctly errors" {
+    cat <<DELIM > bad-updates.csv
+pk,v1
+5,5,
+6,6,
+DELIM
+
+    dolt sql -q "CREATE TABLE test(pk BIGINT PRIMARY KEY, v1 BIGINT DEFAULT 2 NOT NULL, v2 int)"
+    dolt sql -q "INSERT INTO test (pk, v1, v2) VALUES (1, 2, 3), (2, 3, 4)"
+
+    run dolt table import -u test bad-updates.csv
+    [ "$status" -eq 1 ]
+    [[ "$output" =~ "A bad row was encountered while moving data" ]] || false
+    [[ "$output" =~ "csv reader's schema expects 2 fields, but line only has 3 values" ]] || false
+
+    run dolt table import -u --continue test bad-updates.csv
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "Lines skipped: 2" ]] || false
+}
\ No newline at end of file
diff --git a/integration-tests/bats/index.bats b/integration-tests/bats/index.bats
index 27977239d3..307efc7883 100644
--- a/integration-tests/bats/index.bats
+++ b/integration-tests/bats/index.bats
@@ -2112,13 +2112,6 @@ SQL
     [[ "$output" =~ "pk1" ]] || false
     [[ "$output" =~ "5" ]] || false
     [[ "${#lines[@]}" = "2" ]] || false
-    dolt sql <