Move Import Write Path to Engine (#2426)

This commit is contained in:
Vinai Rachakonda
2021-11-26 15:42:26 -08:00
committed by GitHub
parent 63665d0db0
commit 791a128fa3
20 changed files with 869 additions and 262 deletions

View File

@@ -54,6 +54,7 @@ func NewSqlEngine(
format PrintResultFormat,
initialDb string,
autocommit bool) (*SqlEngine, error) {
au := new(auth.None)
parallelism := runtime.GOMAXPROCS(0)
@@ -68,8 +69,7 @@ func NewSqlEngine(
pro := dsqle.NewDoltDatabaseProvider(mrEnv.Config(), mrEnv.FileSystem(), all...)
engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(
parallelism).Build(), &gms.Config{Auth: au})
engine := gms.New(analyzer.NewBuilder(pro).WithParallelism(parallelism).Build(), &gms.Config{Auth: au})
if dbg, ok := os.LookupEnv("DOLT_SQL_DEBUG_LOG"); ok && strings.ToLower(dbg) == "true" {
engine.Analyzer.Debug = true

View File

@@ -16,13 +16,15 @@ package tblcmds
import (
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"sync/atomic"
"github.com/dolthub/go-mysql-server/sql"
"github.com/fatih/color"
"golang.org/x/sync/errgroup"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/cmd/dolt/commands"
@@ -32,11 +34,12 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/mvdata"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/editor"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/funcitr"
@@ -87,17 +90,8 @@ In create, update, and replace scenarios the file's extension is used to infer t
},
}
type tableImportOp string
const (
CreateOp tableImportOp = "overwrite"
ReplaceOp tableImportOp = "replace"
UpdateOp tableImportOp = "update"
InvalidOp tableImportOp = "invalid"
)
type importOptions struct {
operation tableImportOp
operation mvdata.TableImportOp
tableName string
contOnErr bool
force bool
@@ -136,7 +130,7 @@ func (m importOptions) FloatThreshold() float64 {
}
func (m importOptions) checkOverwrite(ctx context.Context, root *doltdb.RootValue, fs filesys.ReadableFS) (bool, error) {
if !m.force && m.operation == CreateOp {
if !m.force && m.operation == mvdata.CreateOp {
return m.dest.Exists(ctx, root, fs)
}
return false, nil
@@ -208,17 +202,17 @@ func getImportMoveOptions(ctx context.Context, apr *argparser.ArgParseResults, d
}
}
var moveOp tableImportOp
var moveOp mvdata.TableImportOp
switch {
case apr.Contains(createParam):
moveOp = CreateOp
moveOp = mvdata.CreateOp
case apr.Contains(replaceParam):
moveOp = ReplaceOp
moveOp = mvdata.ReplaceOp
default:
moveOp = UpdateOp
moveOp = mvdata.UpdateOp
}
if moveOp != CreateOp {
if moveOp != mvdata.CreateOp {
root, err := dEnv.WorkingRoot(ctx)
if err != nil {
return nil, errhand.VerboseErrorFromError(err)
@@ -383,26 +377,54 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string,
return commands.HandleVErrAndExitCode(verr, usage)
}
mover, nDMErr := newImportDataMover(ctx, root, dEnv, mvOpts, importStatsCB)
rd, nDMErr := newImportDataReader(ctx, root, dEnv, mvOpts)
if nDMErr != nil {
verr = newDataMoverErrToVerr(mvOpts, nDMErr)
return commands.HandleVErrAndExitCode(verr, usage)
}
skipped, verr := mvdata.MoveData(ctx, dEnv, mover, mvOpts)
wrSch, nDMErr := getWriterSchema(ctx, root, dEnv, rd.GetSchema(), mvOpts)
if nDMErr != nil {
verr = newDataMoverErrToVerr(mvOpts, nDMErr)
return commands.HandleVErrAndExitCode(verr, usage)
}
wr, nDMErr := newImportDataWriter(ctx, dEnv, wrSch, mvOpts)
if nDMErr != nil {
verr = newDataMoverErrToVerr(mvOpts, nDMErr)
return commands.HandleVErrAndExitCode(verr, usage)
}
skipped, err := move(ctx, rd, wr, mvOpts)
if err != nil {
if pipeline.IsTransformFailure(err) {
bdr := errhand.BuildDError("\nA bad row was encountered while moving data.")
r := pipeline.GetTransFailureSqlRow(err)
if r != nil {
bdr.AddDetails("Bad Row: " + sql.FormatRow(r))
}
details := pipeline.GetTransFailureDetails(err)
bdr.AddDetails(details)
bdr.AddDetails("These can be ignored using '--continue'")
return commands.HandleVErrAndExitCode(bdr.Build(), usage)
}
verr = errhand.BuildDError("An error occurred moving data:\n").AddCause(err).Build()
return commands.HandleVErrAndExitCode(verr, usage)
}
cli.PrintErrln()
if skipped > 0 {
cli.PrintErrln(color.YellowString("Lines skipped: %d", skipped))
}
if verr == nil {
cli.PrintErrln(color.CyanString("Import completed successfully."))
}
cli.PrintErrln(color.CyanString("Import completed successfully."))
return commands.HandleVErrAndExitCode(verr, usage)
return 0
}
var displayStrLen int
@@ -414,9 +436,10 @@ func importStatsCB(stats types.AppliedEditStats) {
displayStrLen = cli.DeleteAndPrint(displayStrLen, displayStr)
}
func newImportDataMover(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv, impOpts *importOptions, statsCB noms.StatsCB) (*mvdata.DataMover, *mvdata.DataMoverCreationError) {
func newImportDataReader(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv, impOpts *importOptions) (table.TableReadCloser, *mvdata.DataMoverCreationError) {
var err error
// Checks whether import destination table already exists. This can probably be simplified to not need a root value...
ow, err := impOpts.checkOverwrite(ctx, root, dEnv.FS)
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateReaderErr, Cause: err}
@@ -425,83 +448,156 @@ func newImportDataMover(ctx context.Context, root *doltdb.RootValue, dEnv *env.D
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateReaderErr, Cause: fmt.Errorf("%s already exists. Use -f to overwrite.", impOpts.DestName())}
}
wrSch, dmce := getImportSchema(ctx, root, dEnv.FS, impOpts)
if dmce != nil {
return nil, dmce
}
rd, srcIsSorted, err := impOpts.src.NewReader(ctx, root, dEnv.FS, impOpts.srcOptions)
rd, _, err := impOpts.src.NewReader(ctx, root, dEnv.FS, impOpts.srcOptions)
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateReaderErr, Cause: err}
}
defer func() {
if rd != nil {
rd.Close(ctx)
}
}()
return rd, nil
}
err = wrSch.GetPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
preImage := impOpts.nameMapper.PreImage(col.Name)
_, found := rd.GetSchema().GetAllCols().GetByName(preImage)
func getWriterSchema(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv, rdSchema schema.Schema, imOpts *importOptions) (schema.Schema, *mvdata.DataMoverCreationError) {
wrSch, dmce := getImportSchema(ctx, root, dEnv.FS, imOpts)
if dmce != nil {
return nil, dmce
}
err := wrSch.GetPKCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
preImage := imOpts.nameMapper.PreImage(col.Name)
_, found := rdSchema.GetAllCols().GetByName(preImage)
if !found {
err = fmt.Errorf("input primary keys do not match primary keys of existing table")
}
return err == nil, err
})
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.SchemaErr, Cause: err}
}
transforms, fieldMapping, err := mvdata.NameMapTransform(ctx, root.VRW(), rd.GetSchema(), wrSch, impOpts.nameMapper)
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateMapperErr, Cause: err}
}
// allow subsetting of the final write schema only if it is an update operation. Every other operation must
// match perfectly.
if wrSch.GetAllCols().Size() != rdSchema.GetAllCols().Size() && imOpts.operation == mvdata.UpdateOp {
ret := schema.NewColCollection()
// rdTags holds the tags of the columns that come from the imported data. Being able to distinguish columns coming
// from the import data allows us to merge it with existing rows
rdTags := make([]uint64, 0, len(fieldMapping.SrcToDest))
for _, tag := range fieldMapping.SrcToDest {
rdTags = append(rdTags, tag)
}
rdSchema.GetAllCols().Iter(func(tag uint64, col schema.Column) (stop bool, err error) {
wrColName := imOpts.nameMapper.Map(col.Name)
wrCol, ok := wrSch.GetAllCols().GetByName(wrColName)
if ok {
ret = ret.Append(wrCol)
}
bulkTeaf := editor.NewBulkImportTEAFactory(root.VRW().Format(), root.VRW(), dEnv.TempTableFilesDir())
opts := editor.Options{Deaf: bulkTeaf}
return false, nil
})
var wr table.TableWriteCloser
switch impOpts.operation {
case CreateOp:
filePath, err := dEnv.FS.Abs(impOpts.DestName())
newSch, err := schema.SchemaFromCols(ret)
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err}
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.SchemaErr, Cause: err}
}
writer, err := dEnv.FS.OpenForWrite(filePath, os.ModePerm)
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err}
}
wr, err = impOpts.dest.NewCreatingWriter(ctx, impOpts, root, srcIsSorted, wrSch, statsCB, opts, writer)
case ReplaceOp:
wr, err = impOpts.dest.NewReplacingWriter(ctx, impOpts, root, srcIsSorted, wrSch, statsCB, opts)
case UpdateOp:
wr, err = impOpts.dest.NewUpdatingWriter(ctx, impOpts, root, srcIsSorted, wrSch, statsCB, rdTags, opts)
default:
err = errors.New("invalid move operation")
return newSch, nil
}
return wrSch, nil
}
func newImportDataWriter(ctx context.Context, dEnv *env.DoltEnv, wrSchema schema.Schema, imOpts *importOptions) (mvdata.DataWriter, *mvdata.DataMoverCreationError) {
moveOps := &mvdata.MoverOptions{Force: imOpts.force, TableToWriteTo: imOpts.tableName, ContinueOnErr: imOpts.contOnErr, Operation: imOpts.operation}
mv, err := mvdata.NewSqlEngineMover(ctx, dEnv, wrSchema, moveOps, importStatsCB)
if err != nil {
return nil, &mvdata.DataMoverCreationError{ErrType: mvdata.CreateWriterErr, Cause: err}
}
imp := &mvdata.DataMover{
Rd: rd,
Transforms: transforms,
Wr: wr,
ContOnErr: impOpts.contOnErr,
}
rd = nil
return mv, nil
}
return imp, nil
func move(ctx context.Context, rd table.TableReadCloser, wr mvdata.DataWriter, options *importOptions) (int64, error) {
g, ctx := errgroup.WithContext(ctx)
// Set up the row channel, error state, and bad-row callback for the import job
parsedRowChan := make(chan sql.Row)
var rowErr error
var printStarted bool
var badCount int64
badRowCB := func(trf *pipeline.TransformRowFailure) (quit bool) {
if !options.contOnErr {
rowErr = trf
return true
}
if !printStarted {
cli.PrintErrln("The following rows were skipped:")
printStarted = true
}
r := pipeline.GetTransFailureSqlRow(trf)
if r != nil {
cli.PrintErr(sql.FormatRow(r))
}
atomic.AddInt64(&badCount, 1)
return false
}
// Start the group that reads rows from the reader
g.Go(func() error {
defer close(parsedRowChan)
for {
r, err := rd.ReadRow(ctx)
if err != nil {
if err == io.EOF {
return nil
} else if table.IsBadRow(err) {
dRow, _ := sqlutil.DoltRowToSqlRow(r, rd.GetSchema())
trf := &pipeline.TransformRowFailure{Row: nil, SqlRow: dRow, TransformName: "reader", Details: err.Error()}
quit := badRowCB(trf)
if quit {
return trf
}
} else {
return err
}
} else {
dRow, err := transformToDoltRow(r, rd.GetSchema(), wr.Schema(), options.nameMapper)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case parsedRowChan <- dRow:
}
}
}
})
// Start the group that writes rows
g.Go(func() error {
err := wr.WriteRows(ctx, parsedRowChan, badRowCB)
if err != nil {
return err
}
return nil
})
err := g.Wait()
if err != nil && err != io.EOF {
return badCount, err
}
if rowErr != nil {
return badCount, rowErr
}
err = wr.Commit(ctx)
if err != nil {
return badCount, err
}
return badCount, nil
}
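The move function above coordinates the import as two errgroup goroutines: a producer that reads and transforms rows into a channel, and a consumer that drains the channel through the DataWriter. A minimal, self-contained sketch of that shape (names and row values here are illustrative, not taken from the commit):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	g, ctx := errgroup.WithContext(context.Background())
	rows := make(chan []interface{})

	// Producer: stands in for rd.ReadRow + transformToDoltRow.
	g.Go(func() error {
		defer close(rows)
		for i := 0; i < 3; i++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case rows <- []interface{}{i, fmt.Sprintf("v%d", i)}:
			}
		}
		return nil
	})

	// Consumer: stands in for wr.WriteRows draining the channel.
	g.Go(func() error {
		for r := range rows {
			fmt.Println("write:", r)
		}
		return nil
	})

	if err := g.Wait(); err != nil {
		fmt.Println("import failed:", err)
		return
	}
	fmt.Println("commit") // wr.Commit(ctx) runs only after both sides finish cleanly
}
```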
func getImportSchema(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, impOpts *importOptions) (schema.Schema, *mvdata.DataMoverCreationError) {
@@ -519,7 +615,7 @@ func getImportSchema(ctx context.Context, root *doltdb.RootValue, fs filesys.Fil
return out, nil
}
if impOpts.operation == CreateOp {
if impOpts.operation == mvdata.CreateOp {
if impOpts.srcIsStream() {
// todo: capture stream data to file so we can use schema inference
return nil, nil
@@ -607,3 +703,87 @@ func newDataMoverErrToVerr(mvOpts *importOptions, err *mvdata.DataMoverCreationE
panic("Unhandled Error type")
}
// transformToDoltRow 1) converts the row to a sql.Row, 2) matches the read and write schemas with subsetting and name mapping,
// and 3) addresses any type inconsistencies.
func transformToDoltRow(row row.Row, rdSchema schema.Schema, wrSchema sql.Schema, nameMapper rowconv.NameMapper) (sql.Row, error) {
doltRow, err := sqlutil.DoltRowToSqlRow(row, rdSchema)
if err != nil {
return nil, err
}
for i, col := range wrSchema {
switch col.Type {
case sql.Boolean, sql.Int8, sql.MustCreateBitType(1): // TODO: noms bool wraps MustCreateBitType
val, ok := stringToBoolean(doltRow[i].(string))
if ok {
doltRow[i] = val
}
}
switch col.Type.(type) {
case sql.StringType:
default:
doltRow[i] = emptyStringToNil(doltRow[i])
}
}
rdSchemaAsDoltSchema, err := sqlutil.FromDoltSchema(wrSchema[0].Source, rdSchema)
if err != nil {
return nil, err
}
doltRow = matchReadSchemaToWriteSchema(doltRow, rdSchemaAsDoltSchema, wrSchema, nameMapper)
return doltRow, nil
}
func stringToBoolean(s string) (result bool, canConvert bool) {
lower := strings.ToLower(s)
switch lower {
case "true":
return true, true
case "false":
return false, true
case "0":
return false, true
case "1":
return true, true
default:
return false, false
}
}
func emptyStringToNil(val interface{}) interface{} {
if val == nil {
return val
}
if s, canConvert := val.(string); canConvert {
if s == "" {
return nil
}
}
return val
}
// matchReadSchemaToWriteSchema takes the read schema and accounts for subsetting and mapper (-m) differences.
func matchReadSchemaToWriteSchema(row sql.Row, rdSchema, wrSchema sql.Schema, nameMapper rowconv.NameMapper) sql.Row {
returnRow := sql.Row{}
for _, wCol := range wrSchema {
seen := false
for rIdx, rCol := range rdSchema {
if rCol.Name == nameMapper.PreImage(wCol.Name) {
returnRow = append(returnRow, row[rIdx])
seen = true
}
}
if !seen {
returnRow = append(returnRow, nil)
}
}
return returnRow
}
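matchReadSchemaToWriteSchema reorders the parsed values into the write schema's column order, pairing columns via the name mapper's pre-image, and fills nil for any write column the input does not supply. An illustrative, self-contained rendering of that matching with plain slices (identity name mapping assumed; not the commit's code):

```go
package main

import "fmt"

func main() {
	writeCols := []string{"pk", "v1", "v2"}      // columns of the destination table
	readCols := []string{"v2", "pk"}             // columns in the order the file provides them
	readRow := []interface{}{int64(5), int64(3)} // v2=5, pk=3

	out := make([]interface{}, 0, len(writeCols))
	for _, wc := range writeCols {
		var val interface{} // stays nil when the input has no matching column
		for i, rc := range readCols {
			if rc == wc { // the real code compares against nameMapper.PreImage(wc)
				val = readRow[i]
			}
		}
		out = append(out, val)
	}
	fmt.Println(out) // [3 <nil> 5]: values reordered to the write schema, v1 filled with nil
}
```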

View File

@@ -19,7 +19,7 @@ require (
github.com/denisbrodbeck/machineid v1.0.1
github.com/dolthub/dolt/go/gen/proto/dolt/services/eventsapi v0.0.0-20201005193433-3ee972b1d078
github.com/dolthub/fslock v0.0.3
github.com/dolthub/go-mysql-server v0.11.1-0.20211123231205-91cc0c1ae82f
github.com/dolthub/go-mysql-server v0.11.1-0.20211126215923-37e939b19247
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446
github.com/dolthub/mmap-go v1.0.4-0.20201107010347-f9f2a9588a66
github.com/dolthub/sqllogictest/go v0.0.0-20201107003712-816f3ae12d81

View File

@@ -173,8 +173,8 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-mysql-server v0.11.1-0.20211123231205-91cc0c1ae82f h1:WqEdr++dbFB1a4cAlzLAg+em1sc4Nigb8UxSNlBXSSw=
github.com/dolthub/go-mysql-server v0.11.1-0.20211123231205-91cc0c1ae82f/go.mod h1:q1U4zfAUIVcpAAASqlcmGJ6DqZ2V3LiFseAm97TqJTY=
github.com/dolthub/go-mysql-server v0.11.1-0.20211126215923-37e939b19247 h1:8NuMyxbitJPTXh3qam7LOwvc1eKguUKDsxb+aKVCdWY=
github.com/dolthub/go-mysql-server v0.11.1-0.20211126215923-37e939b19247/go.mod h1:q1U4zfAUIVcpAAASqlcmGJ6DqZ2V3LiFseAm97TqJTY=
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446 h1:0ol5pj+QlKUKAtqs1LiPM3ZJKs+rHPgLSsMXmhTrCAM=
github.com/dolthub/ishell v0.0.0-20210205014355-16a4ce758446/go.mod h1:dhGBqcCEfK5kuFmeO5+WOx3hqc1k3M29c1oS/R7N4ms=
github.com/dolthub/jsonpath v0.0.0-20210609232853-d49537a30474 h1:xTrR+l5l+1Lfq0NvhiEsctylXinUMFhhsqaEcl414p8=

View File

@@ -0,0 +1,98 @@
// Copyright 2020-2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvdata
import (
"fmt"
"io"
"github.com/dolthub/go-mysql-server/sql"
)
// ChannelRowSource is a sql.Node that wraps a channel as a sql.RowIter.
type ChannelRowSource struct {
schema sql.Schema
rowChannel chan sql.Row
}
// NewChannelRowSource returns a ChannelRowSource object.
func NewChannelRowSource(schema sql.Schema, rowChannel chan sql.Row) *ChannelRowSource {
return &ChannelRowSource{schema: schema, rowChannel: rowChannel}
}
var _ sql.Node = (*ChannelRowSource)(nil)
// Resolved implements the sql.Node interface.
func (c *ChannelRowSource) Resolved() bool {
return true
}
// String implements the sql.Node interface.
func (c *ChannelRowSource) String() string {
return fmt.Sprintf("ChannelRowSource()")
}
// Schema implements the sql.Node interface.
func (c *ChannelRowSource) Schema() sql.Schema {
return c.schema
}
// Children implements the sql.Node interface.
func (c *ChannelRowSource) Children() []sql.Node {
return nil
}
// RowIter implements the sql.Node interface.
func (c *ChannelRowSource) RowIter(ctx *sql.Context, row sql.Row) (sql.RowIter, error) {
return &channelRowIter{
rowChannel: c.rowChannel,
ctx: ctx,
}, nil
}
// WithChildren implements the sql.Node interface.
func (c *ChannelRowSource) WithChildren(children ...sql.Node) (sql.Node, error) {
if len(children) != 0 {
return nil, sql.ErrInvalidChildrenNumber.New(c, len(children), 0)
}
return c, nil
}
// channelRowIter wraps the channel under the sql.RowIter interface
type channelRowIter struct {
rowChannel chan sql.Row
ctx *sql.Context
}
var _ sql.RowIter = (*channelRowIter)(nil)
// Next implements the sql.RowIter interface.
func (c *channelRowIter) Next() (sql.Row, error) {
for r := range c.rowChannel {
select {
case <-c.ctx.Done():
return nil, io.EOF
default:
return r, nil
}
}
return nil, io.EOF
}
// Close implements the sql.RowIter interface.
func (c *channelRowIter) Close(context *sql.Context) error {
return nil
}
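ChannelRowSource is what lets the analyzed INSERT plan pull rows that the import command pushes onto a channel. A hedged sketch of consuming it directly through the sql.Node/RowIter interfaces — it assumes an empty *sql.Context is sufficient and the schema and values are illustrative:

```go
package main

import (
	"fmt"
	"io"

	"github.com/dolthub/go-mysql-server/sql"

	"github.com/dolthub/dolt/go/libraries/doltcore/mvdata"
)

func main() {
	sch := sql.Schema{
		{Name: "pk", Type: sql.Int64, Source: "test"},
		{Name: "v1", Type: sql.Int64, Source: "test"},
	}

	rows := make(chan sql.Row)
	go func() {
		// Producer: the import command's reader goroutine would push parsed rows here.
		defer close(rows)
		rows <- sql.NewRow(int64(1), int64(2))
		rows <- sql.NewRow(int64(2), int64(3))
	}()

	src := mvdata.NewChannelRowSource(sch, rows)
	iter, err := src.RowIter(sql.NewEmptyContext(), nil)
	if err != nil {
		panic(err)
	}
	for {
		r, err := iter.Next()
		if err == io.EOF {
			break // channel closed: the source is exhausted
		} else if err != nil {
			panic(err)
		}
		fmt.Println(r)
	}
}
```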

View File

@@ -22,22 +22,21 @@ import (
"fmt"
"sync/atomic"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/env/actions"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/set"
"github.com/dolthub/dolt/go/store/types"
)
type CsvOptions struct {
@@ -53,6 +52,13 @@ type JSONOptions struct {
SchFile string
}
type MoverOptions struct {
ContinueOnErr bool
Force bool
TableToWriteTo string
Operation TableImportOp
}
type DataMoverOptions interface {
WritesToTable() bool
SrcName() string
@@ -64,6 +70,12 @@ type DataMoverCloser interface {
Flush(context.Context) (*doltdb.RootValue, error)
}
type DataWriter interface {
WriteRows(ctx context.Context, inputChannel chan sql.Row, badRowCb func(*pipeline.TransformRowFailure) bool) error
Commit(ctx context.Context) error
Schema() sql.Schema
}
type DataMover struct {
Rd table.TableReadCloser
Transforms *pipeline.TransformCollection
@@ -262,29 +274,6 @@ func MoveData(ctx context.Context, dEnv *env.DoltEnv, mover *DataMover, mvOpts D
return badCount, nil
}
// NameMapTransform creates a pipeline transform that converts rows from inSch to outSch based on a name mapping.
func NameMapTransform(ctx context.Context, vrw types.ValueReadWriter, inSch schema.Schema, outSch schema.Schema, mapper rowconv.NameMapper) (*pipeline.TransformCollection, *rowconv.FieldMapping, error) {
mapping, err := rowconv.NameMapping(inSch, outSch, mapper)
if err != nil {
return nil, nil, err
}
rconv, err := rowconv.NewImportRowConverter(ctx, vrw, mapping)
if err != nil {
return nil, nil, err
}
transforms := pipeline.NewTransformCollection()
if !rconv.IdentityConverter {
nt := pipeline.NewNamedTransform("Mapping transform", pipeline.GetRowConvTransformFunc(rconv))
transforms.AppendTransforms(nt)
}
return transforms, mapping, nil
}
// SchAndTableNameFromFile reads a SQL schema file and creates a Dolt schema from it.
func SchAndTableNameFromFile(ctx context.Context, path string, fs filesys.ReadableFS, root *doltdb.RootValue) (string, schema.Schema, error) {
if path != "" {
@@ -352,3 +341,11 @@ func InferSchema(ctx context.Context, root *doltdb.RootValue, rd table.TableRead
return schema.SchemaFromCols(newCols)
}
type TableImportOp string
const (
CreateOp TableImportOp = "overwrite"
ReplaceOp TableImportOp = "replace"
UpdateOp TableImportOp = "update"
)
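Putting the new pieces together, a caller builds MoverOptions, constructs the engine-backed writer, streams rows through WriteRows, and finishes with Commit. A hedged sketch follows; it assumes a *env.DoltEnv, a target write schema, and a row channel the caller closes when input is exhausted, and the no-op stats callback and quit-on-first-failure callback are illustrative:

```go
package example

import (
	"context"
	"io"

	"github.com/dolthub/go-mysql-server/sql"

	"github.com/dolthub/dolt/go/libraries/doltcore/env"
	"github.com/dolthub/dolt/go/libraries/doltcore/mvdata"
	"github.com/dolthub/dolt/go/libraries/doltcore/schema"
	"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
	"github.com/dolthub/dolt/go/store/types"
)

// importRows updates tableName with the rows received on rowCh and commits the result.
func importRows(ctx context.Context, dEnv *env.DoltEnv, wrSch schema.Schema, tableName string, rowCh chan sql.Row) error {
	opts := &mvdata.MoverOptions{
		ContinueOnErr:  false,
		Force:          false,
		TableToWriteTo: tableName,
		Operation:      mvdata.UpdateOp,
	}

	// A real caller would report progress here, as importStatsCB does in the import command.
	statsCB := func(stats types.AppliedEditStats) {}

	wr, err := mvdata.NewSqlEngineMover(ctx, dEnv, wrSch, opts, statsCB)
	if err != nil {
		return err
	}

	// Returning true from the callback stops the import at the first bad row.
	badRowCb := func(trf *pipeline.TransformRowFailure) bool { return true }

	// WriteRows signals a clean end of input with io.EOF, as move() above expects.
	if err = wr.WriteRows(ctx, rowCh, badRowCb); err != nil && err != io.EOF {
		return err
	}
	return wr.Commit(ctx)
}
```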

View File

@@ -0,0 +1,314 @@
// Copyright 2021 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvdata
import (
"context"
"fmt"
"io"
"sync/atomic"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/go-mysql-server/sql/analyzer"
"github.com/dolthub/go-mysql-server/sql/expression"
"github.com/dolthub/go-mysql-server/sql/plan"
"github.com/dolthub/dolt/go/cmd/dolt/commands/engine"
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/types"
)
type sqlEngineMover struct {
se *engine.SqlEngine
sqlCtx *sql.Context
tableName string
database string
wrSch sql.Schema
contOnErr bool
force bool
statsCB noms.StatsCB
stats types.AppliedEditStats
statOps int32
importOption TableImportOp
}
func NewSqlEngineMover(ctx context.Context, dEnv *env.DoltEnv, writeSch schema.Schema, options *MoverOptions, statsCB noms.StatsCB) (*sqlEngineMover, error) {
mrEnv, err := env.DoltEnvAsMultiEnv(ctx, dEnv)
if err != nil {
return nil, err
}
// Choose the first DB as the current one. This will be the DB in the working dir if there was one there
var dbName string
mrEnv.Iter(func(name string, _ *env.DoltEnv) (stop bool, err error) {
dbName = name
return true, nil
})
se, err := engine.NewSqlEngine(ctx, mrEnv, engine.FormatCsv, dbName, false)
if err != nil {
return nil, err
}
sqlCtx, err := se.NewContext(ctx)
if err != nil {
return nil, err
}
dsess.DSessFromSess(sqlCtx.Session).EnableBatchedMode()
err = sqlCtx.Session.SetSessionVariable(sqlCtx, sql.AutoCommitSessionVar, false)
if err != nil {
return nil, errhand.VerboseErrorFromError(err)
}
doltSchema, err := sqlutil.FromDoltSchema(options.TableToWriteTo, writeSch)
if err != nil {
return nil, err
}
return &sqlEngineMover{
se: se,
contOnErr: options.ContinueOnErr,
force: options.Force,
database: dbName,
tableName: options.TableToWriteTo,
wrSch: doltSchema,
statsCB: statsCB,
importOption: options.Operation,
}, nil
}
// WriteRows implements the DataWriter interface.
func (s *sqlEngineMover) WriteRows(ctx context.Context, inputChannel chan sql.Row, badRowCb func(*pipeline.TransformRowFailure) bool) (err error) {
s.sqlCtx, err = s.se.NewContext(ctx)
if err != nil {
return err
}
err = s.forceDropTableIfNeeded()
if err != nil {
return err
}
_, _, err = s.se.Query(s.sqlCtx, fmt.Sprintf("START TRANSACTION"))
if err != nil {
return err
}
err = s.createOrEmptyTableIfNeeded()
if err != nil {
return err
}
updateStats := func(row sql.Row) {
if row == nil {
return
}
// If the length of the row does not match the schema then we have an update operation.
if len(row) != len(s.wrSch) {
oldRow := row[:len(row)/2]
newRow := row[len(row)/2:]
if ok, err := oldRow.Equals(newRow, s.wrSch); err == nil {
if ok {
s.stats.SameVal++
} else {
s.stats.Modifications++
}
}
} else {
s.stats.Additions++
}
}
insertOrUpdateOperation, err := s.getInsertNode(inputChannel)
if err != nil {
return err
}
iter, err := insertOrUpdateOperation.RowIter(s.sqlCtx, nil)
if err != nil {
return err
}
defer func() {
if err != nil {
iter.Close(s.sqlCtx) // keep the original error so it is the one propagated.
} else {
err = iter.Close(s.sqlCtx)
}
}()
for {
if s.statsCB != nil && atomic.LoadInt32(&s.statOps) >= tableWriterStatUpdateRate {
atomic.StoreInt32(&s.statOps, 0)
s.statsCB(s.stats)
}
row, err := iter.Next()
// Errors other than io.EOF are routed to the bad row callback below
if err == nil {
_ = atomic.AddInt32(&s.statOps, 1)
updateStats(row)
} else if err == io.EOF {
atomic.LoadInt32(&s.statOps)
atomic.StoreInt32(&s.statOps, 0)
s.statsCB(s.stats)
return err
} else {
var offendingRow sql.Row
switch n := err.(type) {
case sql.WrappedInsertError:
offendingRow = n.OffendingRow
case sql.ErrInsertIgnore:
offendingRow = n.OffendingRow
}
trf := &pipeline.TransformRowFailure{Row: nil, SqlRow: offendingRow, TransformName: "write", Details: err.Error()}
quit := badRowCb(trf)
if quit {
return trf
}
}
}
}
// Commit implements the DataWriter interface.
func (s *sqlEngineMover) Commit(ctx context.Context) error {
_, _, err := s.se.Query(s.sqlCtx, "COMMIT")
return err
}
// Schema implements the DataWriter interface.
func (s *sqlEngineMover) Schema() sql.Schema {
return s.wrSch
}
// forceDropTableIfNeeded drops the given table when the -f parameter is passed.
func (s *sqlEngineMover) forceDropTableIfNeeded() error {
if s.force {
_, _, err := s.se.Query(s.sqlCtx, fmt.Sprintf("DROP TABLE IF EXISTS %s", s.tableName))
return err
}
return nil
}
// createOrEmptyTableIfNeeded either creates or truncates the table given a -c or -r parameter.
func (s *sqlEngineMover) createOrEmptyTableIfNeeded() error {
switch s.importOption {
case CreateOp:
return s.createTable()
case ReplaceOp:
_, _, err := s.se.Query(s.sqlCtx, fmt.Sprintf("TRUNCATE TABLE %s", s.tableName))
return err
default:
return nil
}
}
// createTable creates a table.
func (s *sqlEngineMover) createTable() error {
cr := plan.NewCreateTable(sql.UnresolvedDatabase(s.database), s.tableName, false, false, &plan.TableSpec{Schema: s.wrSch})
analyzed, err := s.se.Analyze(s.sqlCtx, cr)
if err != nil {
return err
}
analyzedQueryProcess := analyzer.StripQueryProcess(analyzed.(*plan.QueryProcess))
ri, err := analyzedQueryProcess.RowIter(s.sqlCtx, nil)
if err != nil {
return err
}
for {
_, err = ri.Next()
if err != nil {
return ri.Close(s.sqlCtx)
}
}
}
// getInsertNode returns the sql.Node to be iterated on given the import option.
func (s *sqlEngineMover) getInsertNode(inputChannel chan sql.Row) (sql.Node, error) {
switch s.importOption {
case CreateOp, ReplaceOp:
return s.createInsertImportNode(inputChannel, s.contOnErr, false, nil) // contOnErr translates to IGNORE
case UpdateOp:
return s.createInsertImportNode(inputChannel, s.contOnErr, false, generateOnDuplicateKeyExpressions(s.wrSch)) // contOnErr translates to IGNORE
default:
return nil, fmt.Errorf("unsupported import type")
}
}
// createInsertImportNode creates the relevant/analyzed insert node given the import option. This insert node is wrapped
// with an error handler.
func (s *sqlEngineMover) createInsertImportNode(source chan sql.Row, ignore bool, replace bool, onDuplicateExpression []sql.Expression) (sql.Node, error) {
src := NewChannelRowSource(s.wrSch, source)
dest := plan.NewUnresolvedTable(s.tableName, s.database)
colNames := make([]string, 0)
for _, col := range s.wrSch {
colNames = append(colNames, col.Name)
}
insert := plan.NewInsertInto(sql.UnresolvedDatabase(s.database), dest, src, replace, colNames, onDuplicateExpression, ignore)
analyzed, err := s.se.Analyze(s.sqlCtx, insert)
if err != nil {
return nil, err
}
analyzed = analyzer.StripQueryProcess(analyzed)
// Get the first insert (wrapped with the error handler)
plan.Inspect(analyzed, func(node sql.Node) bool {
switch n := node.(type) {
case *plan.InsertInto:
analyzed = n
return false
default:
return true
}
})
return analyzed, nil
}
// generateOnDuplicateKeyExpressions generates the duplicate key expressions needed for the update import option.
func generateOnDuplicateKeyExpressions(sch sql.Schema) []sql.Expression {
ret := make([]sql.Expression, len(sch))
for i, col := range sch {
columnExpression := expression.NewUnresolvedColumn(col.Name)
functionExpression := expression.NewUnresolvedFunction("values", false, nil, expression.NewUnresolvedColumn(col.Name))
ret[i] = expression.NewSetField(columnExpression, functionExpression)
}
return ret
}
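For the -u path, generateOnDuplicateKeyExpressions sets every column to VALUES(col), so an update import behaves like an INSERT ... ON DUPLICATE KEY UPDATE over the full column list. An illustrative sketch (not the commit's code, which builds the plan nodes directly) that renders the equivalent SQL:

```go
package main

import (
	"fmt"
	"strings"
)

// updateImportSQL returns the statement an -u import is semantically
// equivalent to for the given table and column list.
func updateImportSQL(table string, cols []string) string {
	placeholders := strings.TrimSuffix(strings.Repeat("?,", len(cols)), ",")
	assignments := make([]string, len(cols))
	for i, c := range cols {
		assignments[i] = fmt.Sprintf("%s = VALUES(%s)", c, c)
	}
	return fmt.Sprintf(
		"INSERT INTO %s (%s) VALUES (%s) ON DUPLICATE KEY UPDATE %s",
		table, strings.Join(cols, ", "), placeholders, strings.Join(assignments, ", "),
	)
}

func main() {
	fmt.Println(updateImportSQL("test", []string{"pk", "c1", "c2"}))
	// INSERT INTO test (pk, c1, c2) VALUES (?,?,?) ON DUPLICATE KEY UPDATE pk = VALUES(pk), c1 = VALUES(c1), c2 = VALUES(c2)
}
```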

View File

@@ -82,58 +82,6 @@ func NewRowConverter(ctx context.Context, vrw types.ValueReadWriter, mapping *Fi
return &RowConverter{mapping, false, convFuncs}, nil
}
// NewImportRowConverter creates a row converter from a given FieldMapping specifically for importing.
func NewImportRowConverter(ctx context.Context, vrw types.ValueReadWriter, mapping *FieldMapping) (*RowConverter, error) {
if nec, err := isNecessary(mapping.SrcSch, mapping.DestSch, mapping.SrcToDest); err != nil {
return nil, err
} else if !nec {
return newIdentityConverter(mapping), nil
}
convFuncs := make(map[uint64]types.MarshalCallback, len(mapping.SrcToDest))
for srcTag, destTag := range mapping.SrcToDest {
destCol, destOk := mapping.DestSch.GetAllCols().GetByTag(destTag)
srcCol, srcOk := mapping.SrcSch.GetAllCols().GetByTag(srcTag)
if !destOk || !srcOk {
return nil, fmt.Errorf("Could not find column being mapped. src tag: %d, dest tag: %d", srcTag, destTag)
}
if srcCol.TypeInfo.Equals(destCol.TypeInfo) {
convFuncs[srcTag] = func(v types.Value) (types.Value, error) {
return v, nil
}
}
if typeinfo.IsStringType(destCol.TypeInfo) {
convFuncs[srcTag] = func(v types.Value) (types.Value, error) {
val, err := srcCol.TypeInfo.FormatValue(v)
if err != nil {
return nil, err
}
if val == nil {
return types.NullValue, nil
}
return types.String(*val), nil
}
} else if destCol.TypeInfo.Equals(typeinfo.PseudoBoolType) || destCol.TypeInfo.Equals(typeinfo.Int8Type) {
// BIT(1) and BOOLEAN (MySQL alias for TINYINT or Int8) are both logical stand-ins for a bool type
convFuncs[srcTag] = func(v types.Value) (types.Value, error) {
intermediateVal, err := typeinfo.Convert(ctx, vrw, v, srcCol.TypeInfo, typeinfo.BoolType)
if err != nil {
return nil, err
}
return typeinfo.Convert(ctx, vrw, intermediateVal, typeinfo.BoolType, destCol.TypeInfo)
}
} else {
convFuncs[srcTag] = func(v types.Value) (types.Value, error) {
return typeinfo.Convert(ctx, vrw, v, srcCol.TypeInfo, destCol.TypeInfo)
}
}
}
return &RowConverter{mapping, false, convFuncs}, nil
}
// Convert takes a row maps its columns to their destination columns, and performs any type conversion needed to create
// a row of the expected destination schema.
func (rc *RowConverter) Convert(inRow row.Row) (row.Row, error) {

View File

@@ -20,13 +20,10 @@ import (
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/schema/typeinfo"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped"
"github.com/dolthub/dolt/go/store/types"
)
@@ -103,42 +100,3 @@ func TestUnneccessaryConversion(t *testing.T) {
t.Fatal("expected identity converter")
}
}
func TestSpecialBoolHandling(t *testing.T) {
col1, err := schema.NewColumnWithTypeInfo("pk", 0, typeinfo.Int64Type, true, "", false, "")
require.NoError(t, err)
col2, err := schema.NewColumnWithTypeInfo("v", 1, typeinfo.PseudoBoolType, false, "", false, "")
require.NoError(t, err)
colColl := schema.NewColCollection(col1, col2)
sch, err := schema.SchemaFromCols(colColl)
require.NoError(t, err)
untypedSch, err := untyped.UntypeSchema(sch)
require.NoError(t, err)
mapping, err := TagMapping(untypedSch, sch)
require.NoError(t, err)
vrw := types.NewMemoryValueStore()
rconv, err := NewImportRowConverter(context.Background(), vrw, mapping)
require.NoError(t, err)
inRow, err := row.New(vrw.Format(), untypedSch, row.TaggedValues{
0: types.String("76"),
1: types.String("true"),
})
require.NoError(t, err)
outData, err := rconv.Convert(inRow)
require.NoError(t, err)
require.NotNil(t, outData)
expected, err := row.New(vrw.Format(), mapping.DestSch, row.TaggedValues{
0: types.Int(76),
1: types.Uint(1),
})
require.NoError(t, err)
assert.True(t, row.AreEqual(outData, expected, mapping.DestSch))
rconvNoHandle, err := NewRowConverter(context.Background(), vrw, mapping)
require.NoError(t, err)
results, errStr := rconvNoHandle.Convert(inRow)
assert.Nil(t, results)
assert.NotEmpty(t, errStr)
}

View File

@@ -1519,7 +1519,7 @@ INSERT INTO child_non_unq VALUES ('1', 1), ('2', NULL), ('3', 3), ('4', 3), ('5'
require.NoError(t, err)
_, err = ExecuteSql(t, dEnv, root, "INSERT INTO child_unq VALUES ('6', 5)")
if assert.Error(t, err) {
assert.True(t, sql.ErrUniqueKeyViolation.Is(err))
assert.True(t, sql.ErrUniqueKeyViolation.Is(err.(sql.WrappedInsertError).Cause))
}
root, err = ExecuteSql(t, dEnv, root, "INSERT INTO child_non_unq VALUES ('6', 5)")
require.NoError(t, err)

View File

@@ -66,6 +66,10 @@ func MapToSqlIter(ctx context.Context, sch schema.Schema, data types.Map) (sql.R
// DoltRowToSqlRow constructs a go-mysql-server sql.Row from a Dolt row.Row.
func DoltRowToSqlRow(doltRow row.Row, sch schema.Schema) (sql.Row, error) {
if doltRow == nil {
return nil, nil
}
colVals := make(sql.Row, sch.GetAllCols().Size())
i := 0

View File

@@ -15,6 +15,8 @@
package pipeline
import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
)
@@ -22,13 +24,14 @@ import (
// failed and some details of the error
type TransformRowFailure struct {
Row row.Row
SqlRow sql.Row
TransformName string
Details string
}
// Error returns a string containing details of the error that occurred
func (trf *TransformRowFailure) Error() string {
return trf.TransformName + " failed processing"
return trf.TransformName + " failed processing due to: " + trf.Details
}
// IsTransformFailure will return true if the error is an instance of a TransformRowFailure
@@ -61,6 +64,17 @@ func GetTransFailureRow(err error) row.Row {
}
// GetTransFailureSqlRow extracts the sql.Row that failed from an error that is an instance of a TransformRowFailure
func GetTransFailureSqlRow(err error) sql.Row {
trf, ok := err.(*TransformRowFailure)
if !ok {
panic("Verify error using IsTransformFailure before calling this.")
}
return trf.SqlRow
}
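With the new SqlRow field and its accessor, callers outside the legacy pipeline can report exactly which row failed. A short illustrative sketch of surfacing it, in the same way the import command's error path above does:

```go
package example

import (
	"fmt"

	"github.com/dolthub/go-mysql-server/sql"

	"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
)

// describeFailure turns a TransformRowFailure into a readable message,
// falling back to the plain error text for other error types.
func describeFailure(err error) string {
	if !pipeline.IsTransformFailure(err) {
		return err.Error()
	}
	msg := "bad row"
	if r := pipeline.GetTransFailureSqlRow(err); r != nil {
		msg += ": " + sql.FormatRow(r)
	}
	return fmt.Sprintf("%s (%s)", msg, pipeline.GetTransFailureDetails(err))
}
```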
// GetTransFailureDetails extracts the details string from an error that is an instance of a TransformRowFailure
func GetTransFailureDetails(err error) string {
trf, ok := err.(*TransformRowFailure)

View File

@@ -30,7 +30,7 @@ func TestTransformRowFailure(t *testing.T) {
assert.NoError(t, err)
err = &TransformRowFailure{r, "transform_name", "details"}
err = &TransformRowFailure{r, nil, "transform_name", "details"}
if !IsTransformFailure(err) {
t.Error("should be transform failure")

View File

@@ -54,7 +54,7 @@ func ProcFuncForSourceFunc(sourceFunc SourceFunc) InFunc {
return
}
} else if table.IsBadRow(err) {
badRowChan <- &TransformRowFailure{table.GetBadRowRow(err), "reader", err.Error()}
badRowChan <- &TransformRowFailure{table.GetBadRowRow(err), nil, "reader", err.Error()}
} else {
p.StopWithErr(err)
return
@@ -106,7 +106,7 @@ func ProcFuncForSinkFunc(sinkFunc SinkFunc) OutFunc {
sql.ErrPrimaryKeyViolation.Is(err) ||
sql.ErrUniqueKeyViolation.Is(err) ||
errors.Is(err, editor.ErrDuplicateKey) {
badRowChan <- &TransformRowFailure{r.Row, "writer", err.Error()}
badRowChan <- &TransformRowFailure{r.Row, nil, "writer", err.Error()}
} else {
p.StopWithErr(err)
return

View File

@@ -88,7 +88,7 @@ func newRowTransformer(name string, transRowFunc TransformRowFunc) TransformFunc
}
if badRowDetails != "" {
badRowChan <- &TransformRowFailure{r.Row, name, badRowDetails}
badRowChan <- &TransformRowFailure{r.Row, nil, name, badRowDetails}
}
} else {
return

View File

@@ -454,3 +454,36 @@ teardown() {
run dolt sql -q "ALTER TABLE test ADD COLUMN v1 BIGINT DEFAULT (pk) AFTER v3"
[ "$status" -eq "1" ]
}
@test "default-values: Import with default values" {
dolt sql -q "CREATE TABLE test(pk BIGINT PRIMARY KEY, v1 BIGINT DEFAULT 2 NOT NULL, v2 int)"
dolt sql -q "INSERT INTO test (pk, v2) VALUES (1, 3), (2, 4)"
run dolt sql -q "SELECT * FROM test" -r=csv
[ "$status" -eq "0" ]
[[ "$output" =~ "pk,v1" ]] || false
[[ "$output" =~ "1,2,3" ]] || false
[[ "$output" =~ "2,2,4" ]] || false
[[ "${#lines[@]}" = "3" ]] || false
echo -e 'pk,v2\n3,5\n4,6'| dolt table import -u test
run dolt sql -q "SELECT * FROM test" -r=csv
[ "$status" -eq "0" ]
[[ "$output" =~ "pk,v1" ]] || false
[[ "$output" =~ "1,2,3" ]] || false
[[ "$output" =~ "2,2,4" ]] || false
[[ "$output" =~ "3,2,5" ]] || false
[[ "$output" =~ "4,2,6" ]] || false
[[ "${#lines[@]}" = "5" ]] || false
# validate that nulls are not accepted for a not null import
cat <<DELIM > bad-update.csv
pk,v1,v2
5,,5
6,,6
DELIM
run dolt table import -u test bad-update.csv
[ "$status" -eq "1" ]
[[ "$output" =~ "Bad Row: [5,<nil>,5]" ]] || false
[[ "$output" =~ "column name 'v1' is non-nullable but attempted to set a value of null" ]] || false
}

View File

@@ -91,7 +91,8 @@ teardown() {
@test "import-create-tables: create a table with json import. bad schema." {
run dolt table import -c -s `batshelper employees-sch-bad.sql` employees `batshelper employees-tbl.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error determining the output schema" ]] || false
[[ "$output" =~ "Error creating reader for json file" ]] || false
[[ "$output" =~ "employees-tbl.json" ]] || false
[[ "$output" =~ "employees-sch-bad.sql" ]] || false
}
@@ -159,13 +160,9 @@ DELIM
run dolt table import -c --pk=id fktest 1pk5col-ints.csv
[ "$status" -eq 1 ]
[[ "$output" =~ "fktest already exists. Use -f to overwrite." ]] || false
run dolt table import -f -c --pk=pk fktest other.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt schema show fktest
skip "cannot overwrite a table with foreign key constraints"
[ "$status" -eq 0 ]
[[ ! "$output" =~ "FOREIGN KEY" ]] || false
run dolt table import -c --pk=pk test 1pk5col-ints.csv -f
[ "$status" -eq 1 ]
[[ "$output" =~ 'since it is referenced from table `fktest`' ]] || false
}
@test "import-create-tables: try to create a table with a bad csv" {

View File

@@ -28,6 +28,11 @@ SQL
pk,c1,c2,c3,c4,c5
0,1,2,3,4,5
1,1,2,3,4,5
DELIM
cat <<DELIM > 1pk5col-ints-updt.csv
pk,c1,c2,c3,c4,c5
0,1,2,3,4,6
DELIM
cat <<SQL > employees-sch.sql
@@ -42,6 +47,14 @@ CREATE TABLE employees (
);
SQL
cat <<SQL > check-constraint-sch.sql
CREATE TABLE persons (
ID int NOT NULL,
LastName varchar(255) NOT NULL,
FirstName varchar(255),
Age int CHECK (Age>=18)
);
SQL
}
teardown() {
@@ -58,6 +71,18 @@ teardown() {
# Validate that a successful import with no bad rows does not print the following
! [[ "$output" =~ "The following rows were skipped:" ]] || false
# Run again to get correct Had No Effect amount
run dolt table import -u test `batshelper 1pk5col-ints.csv`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 2, Additions: 0, Modifications: 0, Had No Effect: 2" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
# Run another update for the correct modification amount
run dolt table import -u test 1pk5col-ints-updt.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 1, Additions: 0, Modifications: 1, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "import-update-tables: update table using csv with null (blank) values" {
@@ -194,7 +219,7 @@ SQL
[[ "${lines[6]}" =~ "end date" ]] || false
}
@test "import-update-tables: update table with incorrect length char throws bad row error" {
@test "import-update-tables: updating table by inputting string longer than char column throws an error" {
cat <<DELIM > 1pk1col-rpt-chars.csv
pk,c
1,"123456"
@@ -204,11 +229,11 @@ DELIM
run dolt table import -u test 1pk1col-rpt-chars.csv
[ "$status" -eq 1 ]
[[ "$output" =~ "A bad row was encountered while moving data" ]] || false
[[ "$output" =~ "Bad Row:" ]] || false
[[ "$output" =~ '"123456" is not valid for column "c" (type "CHAR(5)")' ]] || false
[[ "$output" =~ "Bad Row: [1,123456]" ]] || false
[[ "$output" =~ 'string is too large for column' ]] || false
}
@test "import-update-tables: update table with repeat pk in csv throws error" {
@test "import-update-tables: update table with repeat pk in csv does not throw an error" {
cat <<DELIM > 1pk5col-rpt-ints.csv
pk,c1,c2,c3,c4,c5
1,1,2,3,4,5
@@ -217,48 +242,47 @@ DELIM
dolt sql < 1pk5col-ints-sch.sql
run dolt table import -u test 1pk5col-rpt-ints.csv
[ "$status" -eq 1 ]
[[ "$output" =~ "A bad row was encountered while moving data" ]] || false
[[ "$output" =~ "Bad Row: c4:4 | pk:1 | c3:3 | c5:5 | c1:1 | c2:2" ]] || false
# Works with --continue
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,5" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Lines skipped: 1" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt sql -r csv -q "select * from test"
[ "${#lines[@]}" -eq 2 ]
[[ "${lines[1]}" =~ "1,1,2,3,4,5" ]] || false
}
@test "import-update-tables: importing into new table renders bad rows" {
cat <<DELIM > 1pk5col-rpt-ints.csv
pk,c1,c2,c3,c4,c5
1,1,2,3,4,5
1,1,2,3,4,7
1,1,2,3,4,8
cat <<DELIM > persons.csv
ID,LastName,FirstName,Age
1,"jon","doe", 20
2,"little","doe", 10
3,"little","doe",4
4,"little","doe",1
DELIM
dolt sql < 1pk5col-ints-sch.sql
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
dolt sql < check-constraint-sch.sql
run dolt table import -u persons persons.csv
[ "$status" -eq 1 ]
[[ "$output" =~ "A bad row was encountered while moving data" ]] || false
[[ "$output" =~ "Bad Row:" ]] || false
[[ "$output" =~ "[2,little,doe,10]" ]] || false
run dolt table import -u --continue persons persons.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,7" ]] || false
[[ "$output" =~ "1,1,2,3,4,8" ]] || false
[[ "$output" =~ "[2,little,doe,10]" ]] || false
[[ "$output" =~ "[3,little,doe,4]" ]] || false
[[ "$output" =~ "[4,little,doe,1]" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Lines skipped: 2" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
# Output to a file from the error stderr
dolt sql -q "DELETE FROM test WHERE pk = 1"
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,7" ]] || false
[[ "$output" =~ "1,1,2,3,4,8" ]] || false
run dolt sql -r csv -q "select * from persons"
[ "${#lines[@]}" -eq 2 ]
[[ "$output" =~ "ID,LastName,FirstName,Age" ]] || false
[[ "$output" =~ "1,jon,doe,20" ]] || false
}
@test "import-update-tables: subsequent runs of same import with duplicate keys produces no modifications" {
@test "import-update-tables: subsequent runs of same import with duplicate keys produces no difference in final data" {
cat <<DELIM > 1pk5col-rpt-ints.csv
pk,c1,c2,c3,c4,c5
1,1,2,3,4,5
@@ -268,10 +292,15 @@ DELIM
dolt sql < 1pk5col-ints-sch.sql
dolt table import -u --continue test 1pk5col-rpt-ints.csv
dolt commit -am "cm1"
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
[ "$status" -eq 0 ]
skip "Running this file on repeat produces modifications"
! [[ "$output" =~ "Modifications: 2" ]] || falsa
[[ "$output" =~ "Modifications: 3" ]] || falsa
run dolt diff
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 0 ]
}
@test "import-update-tables: importing some columns does not overwrite columns not part of the import" {
@@ -285,3 +314,23 @@ DELIM
[ "$status" -eq 0 ]
[[ "$output" =~ "$EXPECTED" ]] || false
}
@test "import-update-tables: poorly written file correctly errors" {
cat <<DELIM > bad-updates.csv
pk,v1
5,5,
6,6,
DELIM
dolt sql -q "CREATE TABLE test(pk BIGINT PRIMARY KEY, v1 BIGINT DEFAULT 2 NOT NULL, v2 int)"
dolt sql -q "INSERT INTO test (pk, v1, v2) VALUES (1, 2, 3), (2, 3, 4)"
run dolt table import -u test bad-updates.csv
[ "$status" -eq 1 ]
[[ "$output" =~ "A bad row was encountered while moving data" ]] || false
[[ "$output" =~ "csv reader's schema expects 2 fields, but line only has 3 values" ]] || false
run dolt table import -u --continue test bad-updates.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "Lines skipped: 2" ]] || false
}

View File

@@ -2112,13 +2112,6 @@ SQL
[[ "$output" =~ "pk1" ]] || false
[[ "$output" =~ "5" ]] || false
[[ "${#lines[@]}" = "2" ]] || false
dolt sql <<SQL
DELETE FROM onepk;
INSERT INTO onepk VALUES (6, 11, 55);
SQL
run dolt table import -u onepk `batshelper index_onepk.csv`
[ "$status" -eq "1" ]
[[ "$output" =~ "duplicate key" ]] || false
}
@test "index: UNIQUE dolt table import -r" {
@@ -2150,7 +2143,7 @@ SQL
dolt sql -q "DELETE FROM onepk"
run dolt table import -r onepk `batshelper index_onepk_non_unique.csv`
[ "$status" -eq "1" ]
[[ "$output" =~ "duplicate key" ]] || false
[[ "$output" =~ "duplicate unique key" ]] || false
}
@test "index: Merge without conflicts" {

View File

@@ -93,6 +93,28 @@ SQL
[[ "${#lines[@]}" = "2" ]] || false
}
@test "triggers: import with triggers" {
dolt sql <<SQL
CREATE TABLE test(pk BIGINT PRIMARY KEY, v1 BIGINT);
CREATE TRIGGER trigger1 BEFORE INSERT ON test FOR EACH ROW SET new.v1 = new.v1 + 1;
INSERT INTO test VALUES (1, 1);
SQL
run dolt sql -q "SELECT * FROM test" -r=csv
[ "$status" -eq "0" ]
[[ "$output" =~ "pk,v1" ]] || false
[[ "$output" =~ "1,2" ]] || false
[[ "${#lines[@]}" = "2" ]] || false
echo -e 'pk,v1\n2,2\n3,3'|dolt table import -u test
run dolt sql -q "SELECT * FROM test" -r=csv
[[ "$output" =~ "pk,v1" ]] || false
[[ "$output" =~ "1,2" ]] || false
[[ "$output" =~ "2,3" ]] || false
[[ "$output" =~ "3,4" ]] || false
[[ "${#lines[@]}" = "4" ]] || false
}
@test "triggers: Writing directly into dolt_schemas" {
dolt sql -q "CREATE TABLE test(pk BIGINT PRIMARY KEY, v1 BIGINT);"
dolt sql -q "CREATE VIEW view1 AS SELECT v1 FROM test;"