Finished killing pipeline package

This commit is contained in:
Zach Musgrave
2022-09-12 15:50:05 -07:00
parent a5e9cb06cb
commit 0b0d352483
7 changed files with 32 additions and 247 deletions
+18 -31
View File
@@ -42,7 +42,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/libraries/utils/argparser"
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/libraries/utils/funcitr"
@@ -400,24 +399,12 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string,
skipped, err := move(ctx, rd, wr, mvOpts)
if err != nil {
if pipeline.IsTransformFailure(err) {
bdr := errhand.BuildDError("\nA bad row was encountered while moving data.")
r := pipeline.GetTransFailureSqlRow(err)
bdr := errhand.BuildDError("\nAn error occurred while moving data")
bdr.AddCause(err)
if r != nil {
bdr.AddDetails("Bad Row: " + sql.FormatRow(r))
}
bdr.AddDetails("Errors during import can be ignored using '--continue'")
details := pipeline.GetTransFailureDetails(err)
bdr.AddDetails(details)
bdr.AddDetails("These can be ignored using '--continue'")
return commands.HandleVErrAndExitCode(bdr.Build(), usage)
}
verr = errhand.BuildDError("An error occurred moving data:\n").AddCause(err).Build()
return commands.HandleVErrAndExitCode(verr, usage)
return commands.HandleVErrAndExitCode(bdr.Build(), usage)
}
cli.PrintErrln()
@@ -513,24 +500,29 @@ func newImportSqlEngineMover(ctx context.Context, dEnv *env.DoltEnv, rdSchema sc
return mv, nil
}
type badRowFn func(trf *pipeline.TransformRowFailure) (quit bool)
type badRowFn func(row sql.Row, err error) (quit bool)
func move(ctx context.Context, rd table.SqlRowReader, wr *mvdata.SqlEngineTableWriter, options *importOptions) (int64, error) {
g, ctx := errgroup.WithContext(ctx)
// Setup the necessary data points for the import job
// Set up the necessary data points for the import job
parsedRowChan := make(chan sql.Row)
var rowErr error
var printStarted bool
var badCount int64
badRowCB := func(trf *pipeline.TransformRowFailure) (quit bool) {
if !options.contOnErr {
rowErr = trf
return true
badRowCB := func(row sql.Row, err error) (quit bool) {
// record the first error encountered
if row != nil && rowErr == nil {
rowErr = fmt.Errorf("A bad row was encountered: %s: %w", sql.FormatRow(row), err)
}
atomic.AddInt64(&badCount, 1)
if !options.contOnErr {
return true
}
// Don't log the skipped rows when the ignore-skipped-rows param is specified.
if options.ignoreSkippedRows {
return false
@@ -541,11 +533,7 @@ func move(ctx context.Context, rd table.SqlRowReader, wr *mvdata.SqlEngineTableW
printStarted = true
}
r := pipeline.GetTransFailureSqlRow(trf)
if r != nil {
cli.PrintErr(sql.FormatRow(r), "\n")
}
cli.PrintErrln(sql.FormatRow(row))
return false
}
@@ -604,10 +592,9 @@ func moveRows(
if err != nil {
if table.IsBadRow(err) {
trf := &pipeline.TransformRowFailure{Row: nil, SqlRow: sqlRow, TransformName: "reader", Details: err.Error()}
quit := badRowCb(trf)
quit := badRowCb(sqlRow, err)
if quit {
return trf
return err
}
} else {
return err
+10 -11
View File
@@ -22,7 +22,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/rowconv"
"github.com/dolthub/dolt/go/libraries/doltcore/schema"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/store/types"
)
@@ -69,17 +68,17 @@ func (rdRd *RowDiffSource) GetSchema() schema.Schema {
// NextDiff reads a row from a table. If there is a bad row the returned error will be non nil, and calling IsBadRow(err)
// will be return true. This is a potentially non-fatal error and callers can decide if they want to continue on a bad row, or fail.
func (rdRd *RowDiffSource) NextDiff() (row.Row, pipeline.ImmutableProperties, error) {
func (rdRd *RowDiffSource) NextDiff() (row.Row, error) {
diffs, hasMore, err := rdRd.ad.GetDiffs(1, time.Second)
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
return nil, err
}
if len(diffs) == 0 {
if !hasMore {
return nil, pipeline.NoProps, io.EOF
return nil, io.EOF
}
return nil, pipeline.NoProps, errors.New("timeout")
return nil, errors.New("timeout")
}
if len(diffs) != 1 {
@@ -97,12 +96,12 @@ func (rdRd *RowDiffSource) NextDiff() (row.Row, pipeline.ImmutableProperties, er
oldRow, err := row.FromNoms(sch, d.KeyValue.(types.Tuple), d.OldValue.(types.Tuple))
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
return nil, err
}
rows[From], err = rdRd.oldRowConv.ConvertWithWarnings(oldRow, rdRd.warnFn)
if err != nil {
return nil, pipeline.NoProps, err
return nil, err
}
}
@@ -115,22 +114,22 @@ func (rdRd *RowDiffSource) NextDiff() (row.Row, pipeline.ImmutableProperties, er
newRow, err := row.FromNoms(sch, d.KeyValue.(types.Tuple), d.NewValue.(types.Tuple))
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
return nil, err
}
rows[To], err = rdRd.newRowConv.ConvertWithWarnings(newRow, rdRd.warnFn)
if err != nil {
return nil, pipeline.NoProps, err
return nil, err
}
}
joinedRow, err := rdRd.joiner.Join(rows)
if err != nil {
return nil, pipeline.ImmutableProperties{}, err
return nil, err
}
return joinedRow, pipeline.ImmutableProperties{}, nil
return joinedRow, nil
}
// Close should release resources being held
@@ -34,7 +34,6 @@ import (
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/sqlutil"
"github.com/dolthub/dolt/go/libraries/doltcore/table/pipeline"
"github.com/dolthub/dolt/go/libraries/doltcore/table/typed/noms"
"github.com/dolthub/dolt/go/store/types"
)
@@ -184,7 +183,7 @@ func NewSqlEngineTableWriterWithEngine(ctx *sql.Context, eng *sqle.Engine, db ds
}, nil
}
func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan sql.Row, badRowCb func(*pipeline.TransformRowFailure) bool) (err error) {
func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan sql.Row, badRowCb func(row sql.Row, err error) bool) (err error) {
err = s.forceDropTableIfNeeded()
if err != nil {
return err
@@ -275,10 +274,9 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
offendingRow = n.OffendingRow
}
trf := &pipeline.TransformRowFailure{Row: nil, SqlRow: offendingRow, TransformName: "write", Details: err.Error()}
quit := badRowCb(trf)
quit := badRowCb(offendingRow, err)
if quit {
return trf
return err
}
}
}
@@ -122,7 +122,7 @@ func newNomsDiffIter(ctx *sql.Context, ddb *doltdb.DoltDB, joiner *rowconv.Joine
// Next returns the next row
func (itr *diffRowItr) Next(ctx *sql.Context) (sql.Row, error) {
r, _, err := itr.diffSrc.NextDiff()
r, err := itr.diffSrc.NextDiff()
if err != nil {
return nil, err
@@ -1,87 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
)
// TransformRowFailure is an error implementation that stores the row that failed to transform, the transform that
// failed and some details of the error
type TransformRowFailure struct {
Row row.Row
SqlRow sql.Row
TransformName string
Details string
}
// Error returns a string containing details of the error that occurred
func (trf *TransformRowFailure) Error() string {
return trf.TransformName + " failed processing due to: " + trf.Details
}
// IsTransformFailure will return true if the error is an instance of a TransformRowFailure
func IsTransformFailure(err error) bool {
_, ok := err.(*TransformRowFailure)
return ok
}
// GetTransFailureTransName extracts the name of the transform that failed from an error that is an instance of a
// TransformRowFailure
func GetTransFailureTransName(err error) string {
trf, ok := err.(*TransformRowFailure)
if !ok {
panic("Verify error using IsTransformFailure before calling this.")
}
return trf.TransformName
}
// GetTransFailureRow extracts the row that failed from an error that is an instance of a TransformRowFailure
func GetTransFailureRow(err error) row.Row {
trf, ok := err.(*TransformRowFailure)
if !ok {
panic("Verify error using IsTransformFailure before calling this.")
}
return trf.Row
}
// GetTransFailureRow extracts the row that failed from an error that is an instance of a TransformRowFailure
func GetTransFailureSqlRow(err error) sql.Row {
trf, ok := err.(*TransformRowFailure)
if !ok {
panic("Verify error using IsTransformFailure before calling this.")
}
return trf.SqlRow
}
// GetTransFailureDetails extracts the details string from an error that is an instance of a TransformRowFailure
func GetTransFailureDetails(err error) string {
trf, ok := err.(*TransformRowFailure)
if !ok {
panic("Verify error using IsTransformFailure before calling this.")
}
return trf.Details
}
@@ -1,55 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/dolthub/dolt/go/libraries/doltcore/row"
"github.com/dolthub/dolt/go/libraries/doltcore/table/untyped"
"github.com/dolthub/dolt/go/store/types"
)
func TestTransformRowFailure(t *testing.T) {
_, sch := untyped.NewUntypedSchema("a", "b", "c")
r, err := untyped.NewRowFromStrings(types.Format_Default, sch, []string{"1", "2", "3"})
assert.NoError(t, err)
err = &TransformRowFailure{r, nil, "transform_name", "details"}
if !IsTransformFailure(err) {
t.Error("should be transform failure")
}
tn := GetTransFailureTransName(err)
if tn != "transform_name" {
t.Error("Unexpected transform name:" + tn)
}
fr := GetTransFailureRow(err)
if !row.AreEqual(r, fr, sch) {
t.Error("unexpected row")
}
dets := GetTransFailureDetails(err)
if dets != "details" {
t.Error("unexpected details:" + dets)
}
}
@@ -1,57 +0,0 @@
// Copyright 2019 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pipeline
// ReadableMap is an interface that provides read only access to map properties
type ReadableMap interface {
// Get retrieves an element from the map, and a bool which says if there was a property that exists with that
// name at all
Get(propName string) (interface{}, bool)
}
// NoProps is an empty ImmutableProperties struct
var NoProps = ImmutableProperties{}
// ImmutableProperties is a map of properties which can't be edited after creation
type ImmutableProperties struct {
props map[string]interface{}
}
// Get retrieves an element from the map, and a bool which says if there was a property that exists with that name at all
func (ip ImmutableProperties) Get(propName string) (interface{}, bool) {
if ip.props == nil {
return nil, false
}
val, ok := ip.props[propName]
return val, ok
}
// Set will create a new ImmutableProperties struct whose values are the original properties combined with the provided
// updates
func (ip ImmutableProperties) Set(updates map[string]interface{}) ImmutableProperties {
numProps := len(updates) + len(ip.props)
allProps := make(map[string]interface{}, numProps)
for k, v := range ip.props {
allProps[k] = v
}
for k, v := range updates {
allProps[k] = v
}
return ImmutableProperties{allProps}
}