Vinai/log bad rows (#1475)

This PR enables logging of bad rows during table import. When --continue is passed, each skipped row is printed to stderr in CSV form under a one-time "The following rows were skipped:" header.
This commit is contained in:
Vinai Rachakonda
2021-03-26 19:59:17 -04:00
committed by GitHub
parent 5d132a4450
commit b784e1fb35
9 changed files with 250 additions and 141 deletions
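
For reference, here is a minimal, self-contained sketch of the pattern this change introduces in DataMover.Move: a bad-row callback that, when --continue is set, prints a one-time header and each skipped row as CSV to stderr while counting the skips. The badRow type, makeBadRowLogger, and the sample values are illustrative stand-ins, not the Dolt API; in the actual diff the callback receives a *pipeline.TransformRowFailure, converts the row with sqlutil.DoltRowToSqlRow, and serializes it with csv.WriteCSVRow before writing through cli.PrintErr.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"sync/atomic"
)

// badRow is a stand-in for a row that failed to import (hypothetical type,
// not part of Dolt); the real callback works with Dolt rows and schemas.
type badRow struct {
	vals []string
}

// makeBadRowLogger returns a callback mirroring the behavior added in this PR:
// when continueOnErr is set, each skipped row is written to stderr as CSV
// under a one-time header and a counter is incremented; otherwise the first
// failure stops the import.
func makeBadRowLogger(continueOnErr bool, badCount *int64) func(badRow) (quit bool) {
	printStarted := false
	w := csv.NewWriter(os.Stderr)
	return func(r badRow) bool {
		if !continueOnErr {
			return true // stop on the first bad row
		}
		if !printStarted {
			fmt.Fprintln(os.Stderr, "The following rows were skipped:")
			printStarted = true
		}
		if err := w.Write(r.vals); err != nil {
			return true
		}
		w.Flush()
		atomic.AddInt64(badCount, 1)
		return false
	}
}

func main() {
	var skipped int64
	logBad := makeBadRowLogger(true, &skipped)
	logBad(badRow{vals: []string{"1", "1", "2", "3", "4", "7"}})
	logBad(badRow{vals: []string{"1", "1", "2", "3", "4", "8"}})
	fmt.Printf("Lines skipped: %d\n", skipped)
}
```
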

View File

@@ -16,10 +16,10 @@ package cli
import (
"fmt"
"io"
"os"
"path/filepath"
"github.com/dolthub/dolt/go/libraries/doltcore/mvdata"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/fatih/color"
@@ -31,6 +31,14 @@ var CliErr = color.Error
var ExecuteWithStdioRestored func(userFunc func())
var InStream io.ReadCloser = os.Stdin
var OutStream io.WriteCloser = os.Stdout
func SetIOStreams(inStream io.ReadCloser, outStream io.WriteCloser) {
InStream = inStream
OutStream = outStream
}
func InitIO() (restoreIO func()) {
stdOut, stdErr := os.Stdout, os.Stderr
@@ -40,7 +48,7 @@ func InitIO() (restoreIO func()) {
if err == nil {
os.Stdout = f
os.Stderr = f
mvdata.SetIOStreams(os.Stdin, iohelp.NopWrCloser(CliOut))
SetIOStreams(os.Stdin, iohelp.NopWrCloser(CliOut))
}
restoreIO = func() {
@@ -50,7 +58,7 @@ func InitIO() (restoreIO func()) {
os.Stdout = stdOut
os.Stderr = stdErr
mvdata.SetIOStreams(os.Stdin, os.Stdout)
SetIOStreams(os.Stdin, os.Stdout)
}
ExecuteWithStdioRestored = func(userFunc func()) {
@@ -58,14 +66,14 @@ func InitIO() (restoreIO func()) {
color.NoColor = true
os.Stdout = stdOut
os.Stderr = stdErr
mvdata.SetIOStreams(os.Stdin, os.Stdout)
SetIOStreams(os.Stdin, os.Stdout)
userFunc()
os.Stdout = f
os.Stderr = f
color.NoColor = initialNoColor
mvdata.SetIOStreams(os.Stdin, iohelp.NopWrCloser(CliOut))
SetIOStreams(os.Stdin, iohelp.NopWrCloser(CliOut))
}
return restoreIO

View File

@@ -54,7 +54,6 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/iohelp"
"github.com/dolthub/dolt/go/libraries/utils/osutil"
"github.com/dolthub/dolt/go/libraries/utils/pipeline"
"github.com/dolthub/dolt/go/libraries/utils/tracing"
)
@@ -1242,15 +1241,6 @@ func mergeResultIntoStats(statement sqlparser.Statement, rowIter sql.RowIter, s
}
}
type resultFormat byte
const (
FormatTabular resultFormat = iota
FormatCsv
FormatJson
FormatNull // used for profiling
)
type sqlEngine struct {
dbs map[string]dsqle.Database
mrEnv env.MultiRepoEnv
@@ -1366,63 +1356,6 @@ func (se *sqlEngine) query(ctx *sql.Context, query string) (sql.Schema, sql.RowI
return se.engine.Query(ctx, query)
}
func PrettyPrintResults(ctx *sql.Context, resultFormat resultFormat, sqlSch sql.Schema, rowIter sql.RowIter) (rerr error) {
defer func() {
closeErr := rowIter.Close(ctx)
if rerr == nil && closeErr != nil {
rerr = closeErr
}
}()
if isOkResult(sqlSch) {
return printOKResult(rowIter)
}
// For some output formats, we want to convert everything to strings to be processed by the pipeline. For others,
// we want to leave types alone and let the writer figure out how to format it for output.
var p *pipeline.Pipeline
switch resultFormat {
case FormatCsv:
p = createCSVPipeline(ctx, sqlSch, rowIter)
case FormatJson:
p = createJSONPipeline(ctx, sqlSch, rowIter)
case FormatTabular:
p = createTabularPipeline(ctx, sqlSch, rowIter)
case FormatNull:
p = createNullPipeline(ctx, sqlSch, rowIter)
}
p.Start(ctx)
rerr = p.Wait()
return rerr
}
func printOKResult(iter sql.RowIter) (returnErr error) {
row, err := iter.Next()
if err != nil {
return err
}
if okResult, ok := row[0].(sql.OkResult); ok {
rowNoun := "row"
if okResult.RowsAffected != 1 {
rowNoun = "rows"
}
cli.Printf("Query OK, %d %s affected\n", okResult.RowsAffected, rowNoun)
if okResult.Info != nil {
cli.Printf("%s\n", okResult.Info)
}
}
return nil
}
func isOkResult(sch sql.Schema) bool {
return sch.Equals(sql.OkResultSchema)
}
func (se *sqlEngine) dbddl(ctx *sql.Context, dbddl *sqlparser.DBDDL, query string) (sql.Schema, sql.RowIter, error) {
action := strings.ToLower(dbddl.Action)
var rowIter sql.RowIter = nil

View File

@@ -20,24 +20,90 @@ import (
"context"
"fmt"
"io"
"strconv"
"strings"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/sqltypes"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/fwt"
"github.com/dolthub/dolt/go/libraries/utils/pipeline"
)
type resultFormat byte
const (
FormatTabular resultFormat = iota
FormatCsv
FormatJson
FormatNull // used for profiling
)
const (
readBatchSize = 10
writeBatchSize = 1
)
func PrettyPrintResults(ctx *sql.Context, resultFormat resultFormat, sqlSch sql.Schema, rowIter sql.RowIter) (rerr error) {
defer func() {
closeErr := rowIter.Close(ctx)
if rerr == nil && closeErr != nil {
rerr = closeErr
}
}()
if isOkResult(sqlSch) {
return printOKResult(rowIter)
}
// For some output formats, we want to convert everything to strings to be processed by the pipeline. For others,
// we want to leave types alone and let the writer figure out how to format it for output.
var p *pipeline.Pipeline
switch resultFormat {
case FormatCsv:
p = createCSVPipeline(ctx, sqlSch, rowIter)
case FormatJson:
p = createJSONPipeline(ctx, sqlSch, rowIter)
case FormatTabular:
p = createTabularPipeline(ctx, sqlSch, rowIter)
case FormatNull:
p = createNullPipeline(ctx, sqlSch, rowIter)
}
p.Start(ctx)
rerr = p.Wait()
return rerr
}
func printOKResult(iter sql.RowIter) (returnErr error) {
row, err := iter.Next()
if err != nil {
return err
}
if okResult, ok := row[0].(sql.OkResult); ok {
rowNoun := "row"
if okResult.RowsAffected != 1 {
rowNoun = "rows"
}
cli.Printf("Query OK, %d %s affected\n", okResult.RowsAffected, rowNoun)
if okResult.Info != nil {
cli.Printf("%s\n", okResult.Info)
}
}
return nil
}
func isOkResult(sch sql.Schema) bool {
return sch.Equals(sql.OkResultSchema)
}
// noParallelizationInitFunc only exists to validate the routine wasn't parallelized
func noParallelizationInitFunc(ctx context.Context, index int) error {
if index != 0 {
@@ -47,50 +113,6 @@ func noParallelizationInitFunc(ctx context.Context, index int) error {
return nil
}
// sqlColToStr is a utility function for converting a sql column of type interface{} to a string
func sqlColToStr(col interface{}) string {
if col != nil {
switch typedCol := col.(type) {
case int:
return strconv.FormatInt(int64(typedCol), 10)
case int32:
return strconv.FormatInt(int64(typedCol), 10)
case int64:
return strconv.FormatInt(int64(typedCol), 10)
case int16:
return strconv.FormatInt(int64(typedCol), 10)
case int8:
return strconv.FormatInt(int64(typedCol), 10)
case uint:
return strconv.FormatUint(uint64(typedCol), 10)
case uint32:
return strconv.FormatUint(uint64(typedCol), 10)
case uint64:
return strconv.FormatUint(uint64(typedCol), 10)
case uint16:
return strconv.FormatUint(uint64(typedCol), 10)
case uint8:
return strconv.FormatUint(uint64(typedCol), 10)
case float64:
return strconv.FormatFloat(float64(typedCol), 'g', -1, 64)
case float32:
return strconv.FormatFloat(float64(typedCol), 'g', -1, 32)
case string:
return typedCol
case bool:
if typedCol {
return "true"
} else {
return "false"
}
case time.Time:
return typedCol.Format("2006-01-02 15:04:05.999999 -0700 MST")
}
}
return ""
}
// getReadStageFunc is a general purpose stage func used by multiple pipelines to read the rows into batches
func getReadStageFunc(iter sql.RowIter, batchSize int) pipeline.StageFunc {
isDone := false
@@ -188,7 +210,7 @@ func csvProcessStageFunc(ctx context.Context, items []pipeline.ItemWithProps) ([
for colNum, col := range r {
if col != nil {
str := sqlColToStr(col)
str := sqlutil.SqlColToStr(col)
colValStrs[colNum] = &str
} else {
colValStrs[colNum] = nil
@@ -209,7 +231,6 @@ func csvProcessStageFunc(ctx context.Context, items []pipeline.ItemWithProps) ([
}
// JSON pipeline creation and stage functions
func createJSONPipeline(_ context.Context, sch sql.Schema, iter sql.RowIter) *pipeline.Pipeline {
p := pipeline.NewPipeline(
pipeline.NewStage("read", noParallelizationInitFunc, getReadStageFunc(iter, readBatchSize), 0, 0, 0),
@@ -255,7 +276,7 @@ func getJSONProcessFunc(sch sql.Schema) pipeline.StageFunc {
}
validCols++
colStr := sqlColToStr(col)
colStr := sqlutil.SqlColToStr(col)
colStr = strings.Replace(colStr, "\"", "\\\"", -1)
str := fmt.Sprintf(formats[colNum], colStr)
sb.WriteString(str)
@@ -302,7 +323,6 @@ func writeJSONToCliOutStageFunc(ctx context.Context, items []pipeline.ItemWithPr
}
// tabular pipeline creation and pipeline functions
func createTabularPipeline(_ context.Context, sch sql.Schema, iter sql.RowIter) *pipeline.Pipeline {
const samplesForAutoSizing = 10000
tps := &tabularPipelineStages{}
@@ -350,7 +370,7 @@ func rowsToStringSlices(_ context.Context, items []pipeline.ItemWithProps) ([]pi
}
if !isNull {
cols[colNum] = sqlColToStr(col)
cols[colNum] = sqlutil.SqlColToStr(col)
} else {
cols[colNum] = "NULL"
}

View File

@@ -146,6 +146,7 @@ github.com/denisbrodbeck/machineid v1.0.1 h1:geKr9qtkB876mXguW2X6TU4ZynleN6ezuMS
github.com/denisbrodbeck/machineid v1.0.1/go.mod h1:dJUwb7PTidGDeYyUBmXZ2GphQBbjJCrnectwCyxcUSI=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no=
github.com/dolthub/dolt v1.0.20 h1:Y9wbWxj2eD5XhnEWDsFGCB9onV7AIltmSneuyZK2Ep0=
github.com/dolthub/fslock v0.0.2 h1:8vUh47iKovgrtXNrXVIzsIoWLlspoXg+3nslhUzgKSw=
github.com/dolthub/fslock v0.0.2/go.mod h1:0i7bsNkK+XHwFL3dIsSWeXSV7sykVzzVr6+jq8oeEo0=
github.com/dolthub/go-mysql-server v0.8.1-0.20210319025306-7fcc1fc72450 h1:N32fDj4eLZ+KKEIF2NtWd/t/qt+YWg8Wkk0XXezOqdo=

View File

@@ -17,28 +17,18 @@ package mvdata
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
)
var InStream io.ReadCloser = os.Stdin
var OutStream io.WriteCloser = os.Stdout
func SetIOStreams(inStream io.ReadCloser, outStream io.WriteCloser) {
InStream = inStream
OutStream = outStream
}
// DataFormat is an enumeration of the valid data formats
type DataFormat string
@@ -116,7 +106,7 @@ func NewDataLocation(path, fileFmtStr string) DataLocation {
dataFmt := DFFromString(fileFmtStr)
if len(path) == 0 {
return StreamDataLocation{Format: dataFmt, Reader: InStream, Writer: OutStream}
return StreamDataLocation{Format: dataFmt, Reader: cli.InStream, Writer: cli.OutStream}
} else if fileFmtStr == "" {
if doltdb.IsValidTableName(path) {
return TableDataLocation{path}

View File

@@ -15,11 +15,16 @@
package mvdata
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"sync/atomic"
"github.com/dolthub/dolt/go/cmd/dolt/cli"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped/csv"
"github.com/dolthub/dolt/go/cmd/dolt/errhand"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
@@ -97,7 +102,7 @@ type GCTableWriteCloser interface {
// Move is the method that executes the pipeline which will move data from the pipeline's source DataLocation to its
// dest DataLocation. It returns the number of bad rows encountered during import, and an error.
func (imp *DataMover) Move(ctx context.Context) (badRowCount int64, err error) {
func (imp *DataMover) Move(ctx context.Context, sch schema.Schema) (badRowCount int64, err error) {
defer imp.Rd.Close(ctx)
defer func() {
closeErr := imp.Wr.Close(ctx)
@@ -114,12 +119,28 @@ func (imp *DataMover) Move(ctx context.Context) (badRowCount int64, err error) {
var badCount int64
var rowErr error
var printStarted bool
var b bytes.Buffer
badRowCB := func(trf *pipeline.TransformRowFailure) (quit bool) {
if !imp.ContOnErr {
rowErr = trf
return true
}
if !printStarted {
cli.PrintErrln("The following rows were skipped:")
printStarted = true
}
r := pipeline.GetTransFailureRow(trf)
if r != nil {
err = writeBadRowToCli(r, sch, &b)
if err != nil {
return true
}
}
atomic.AddInt64(&badCount, 1)
return false
}
@@ -132,7 +153,6 @@ func (imp *DataMover) Move(ctx context.Context) (badRowCount int64, err error) {
p.Start()
err = p.Wait()
if err != nil {
return 0, err
}
@@ -144,12 +164,48 @@ func (imp *DataMover) Move(ctx context.Context) (badRowCount int64, err error) {
return badCount, nil
}
// writeBadRowToCli prints a bad row in CSV form to STDERR.
func writeBadRowToCli(r row.Row, sch schema.Schema, b *bytes.Buffer) error {
sqlRow, err := sqlutil.DoltRowToSqlRow(r, sch)
if err != nil {
return err
}
wr := bufio.NewWriter(b)
colValStrs := make([]*string, len(sqlRow))
for colNum, col := range sqlRow {
if col != nil {
str := sqlutil.SqlColToStr(col)
colValStrs[colNum] = &str
} else {
colValStrs[colNum] = nil
}
}
err = csv.WriteCSVRow(wr, colValStrs, ",", false)
if err != nil {
return err
}
err = wr.Flush()
if err != nil {
return err
}
str := b.String()
cli.PrintErr(str)
return nil
}
func MoveDataToRoot(ctx context.Context, mover *DataMover, mvOpts DataMoverOptions, root *doltdb.RootValue, updateRoot func(c context.Context, r *doltdb.RootValue) error) (*doltdb.RootValue, int64, errhand.VerboseError) {
var badCount int64
var err error
newRoot := &doltdb.RootValue{}
badCount, err = mover.Move(ctx)
badCount, err = mover.Move(ctx, mover.Wr.GetSchema())
if err != nil {
if pipeline.IsTransformFailure(err) {
@@ -157,7 +213,7 @@ func MoveDataToRoot(ctx context.Context, mover *DataMover, mvOpts DataMoverOptio
r := pipeline.GetTransFailureRow(err)
if r != nil {
bdr.AddDetails("Bad Row:" + row.Fmt(ctx, r, mover.Rd.GetSchema()))
bdr.AddDetails("Bad Row: " + row.Fmt(ctx, r, mover.Wr.GetSchema()))
}
details := pipeline.GetTransFailureDetails(err)

View File

@@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"strconv"
"time"
"github.com/dolthub/go-mysql-server/sql"
@@ -173,3 +175,49 @@ func keylessDoltRowFromSqlRow(ctx context.Context, vrw types.ValueReadWriter, sq
return row.KeylessRow(vrw.Format(), vals[:j]...)
}
// SqlColToStr is a utility function for converting a sql column of type interface{} to a string
func SqlColToStr(col interface{}) string {
if col != nil {
switch typedCol := col.(type) {
case int:
return strconv.FormatInt(int64(typedCol), 10)
case int32:
return strconv.FormatInt(int64(typedCol), 10)
case int64:
return strconv.FormatInt(int64(typedCol), 10)
case int16:
return strconv.FormatInt(int64(typedCol), 10)
case int8:
return strconv.FormatInt(int64(typedCol), 10)
case uint:
return strconv.FormatUint(uint64(typedCol), 10)
case uint32:
return strconv.FormatUint(uint64(typedCol), 10)
case uint64:
return strconv.FormatUint(uint64(typedCol), 10)
case uint16:
return strconv.FormatUint(uint64(typedCol), 10)
case uint8:
return strconv.FormatUint(uint64(typedCol), 10)
case float64:
return strconv.FormatFloat(float64(typedCol), 'g', -1, 64)
case float32:
return strconv.FormatFloat(float64(typedCol), 'g', -1, 32)
case string:
return typedCol
case bool:
if typedCol {
return "true"
} else {
return "false"
}
case time.Time:
return typedCol.Format("2006-01-02 15:04:05.999999 -0700 MST")
default:
return fmt.Sprintf("%v", typedCol)
}
}
return ""
}

View File

@@ -529,3 +529,21 @@ DELIM
# less than 10% smaller
[ "$BEFORE" -lt $(($AFTER * 11 / 10)) ]
}
@test "import-create-tables: table import -c --continue logs bad rows" {
cat <<DELIM > 1pk5col-rpt-ints.csv
pk,c1,c2,c3,c4,c5
1,1,2,3,4,5
1,1,2,3,4,7
1,1,2,3,4,8
DELIM
run dolt table import -c --continue --pk=pk test 1pk5col-rpt-ints.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,7" ]] || false
[[ "$output" =~ "1,1,2,3,4,8" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Lines skipped: 2" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}

View File

@@ -47,6 +47,9 @@ teardown() {
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 2, Additions: 2, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
# Validate that a successful import with no bad rows does not print the skipped-rows message
! [[ "$output" =~ "The following rows were skipped:" ]] || false
}
@test "import-update-tables: update table using schema with csv" {
@@ -182,10 +185,42 @@ DELIM
dolt sql < 1pk5col-ints-sch.sql
run dolt table import -u test 1pk5col-rpt-ints.csv
[ "$status" -eq 1 ]
[[ "$output" =~ "A bad row was encountered while moving data" ]] || false
[[ "$output" =~ "Bad Row: c4:4 | pk:1 | c3:3 | c5:5 | c1:1 | c2:2" ]] || false
# Works with --continue
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0Lines skipped: 1" ]] || false
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,5" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Lines skipped: 1" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "import-update-tables: importing into new table renders bad rows" {
cat <<DELIM > 1pk5col-rpt-ints.csv
pk,c1,c2,c3,c4,c5
1,1,2,3,4,5
1,1,2,3,4,7
1,1,2,3,4,8
DELIM
dolt sql < 1pk5col-ints-sch.sql
run dolt table import -u --continue test 1pk5col-rpt-ints.csv
[ "$status" -eq 0 ]
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,7" ]] || false
[[ "$output" =~ "1,1,2,3,4,8" ]] || false
[[ "$output" =~ "Rows Processed: 1, Additions: 1, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Lines skipped: 2" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
# Redirect the skipped rows from stderr to a file
dolt sql -q "DELETE FROM test WHERE pk = 1"
dolt table import -u --continue test 1pk5col-rpt-ints.csv 2> skipped.csv
run cat skipped.csv
[[ "$output" =~ "The following rows were skipped:" ]] || false
[[ "$output" =~ "1,1,2,3,4,7" ]] || false
[[ "$output" =~ "1,1,2,3,4,8" ]] || false
}