mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-06 11:20:30 -05:00
shuffling
This commit is contained in:
@@ -16,6 +16,7 @@ package tblcmds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table"
|
||||
|
||||
"github.com/fatih/color"
|
||||
|
||||
@@ -158,31 +159,9 @@ func (cmd CpCmd) Exec(ctx context.Context, commandStr string, args []string, dEn
|
||||
return commands.HandleVErrAndExitCode(verr, usage)
|
||||
}
|
||||
|
||||
//TODO: change this to not use the executeImport function, and instead the SQL code path
|
||||
newWorking, err := dEnv.WorkingRoot(ctx)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to load the working set to build the indexes.").AddCause(err).Build(), nil)
|
||||
}
|
||||
updatedTable, ok, err := newWorking.GetTable(ctx, newTbl)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to load the table to build the indexes.").AddCause(err).Build(), nil)
|
||||
} else if !ok {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to find the table to build the indexes.").Build(), nil)
|
||||
}
|
||||
updatedTable, err = updatedTable.RebuildIndexData(ctx)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to build the indexes.").AddCause(err).Build(), nil)
|
||||
}
|
||||
newWorking, err = newWorking.PutTable(ctx, newTbl, updatedTable)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to write the indexes to the working set.").AddCause(err).Build(), nil)
|
||||
}
|
||||
err = dEnv.UpdateWorkingRoot(ctx, newWorking)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to update the working set containing the indexes.").AddCause(err).Build(), nil)
|
||||
}
|
||||
verr = buildNewIndexes(ctx, dEnv, newTbl)
|
||||
|
||||
return 0
|
||||
return commands.HandleVErrAndExitCode(verr, usage)
|
||||
}
|
||||
|
||||
func executeCopy(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOptions) errhand.VerboseError {
|
||||
@@ -201,7 +180,7 @@ func executeCopy(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOpti
|
||||
}
|
||||
}
|
||||
|
||||
mover, nDMErr := mvdata.NewTableCopyDataMover(ctx, root, dEnv.FS, mvOpts, importStatsCB)
|
||||
mover, nDMErr := NewTableCopyDataMover(ctx, root, dEnv.FS, mvOpts, importStatsCB)
|
||||
|
||||
if nDMErr != nil {
|
||||
return newDataMoverErrToVerr(mvOpts, nDMErr)
|
||||
@@ -264,3 +243,73 @@ func executeCopy(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOpti
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTableCopyDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, mvOpts *mvdata.MoveOptions, statsCB noms.StatsCB) (*mvdata.DataMover, *mvdata.DataMoverCreationError) {
|
||||
var rd table.TableReadCloser
|
||||
var err error
|
||||
|
||||
rd, srcIsSorted, err := mvOpts.Src.NewReader(ctx, root, fs, mvOpts.SrcOptions)
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.CreateReaderErr, err}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if rd != nil {
|
||||
rd.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
inSch := rd.GetSchema()
|
||||
cc, err := root.GenerateTagsForNewColColl(ctx, mvOpts.TableName, inSch.GetAllCols())
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.SchemaErr, err}
|
||||
}
|
||||
|
||||
outSch := schema.SchemaFromCols(cc)
|
||||
|
||||
transforms, dmce := mvdata.MaybeMapFields(inSch, outSch, fs, mvOpts)
|
||||
|
||||
if dmce != nil {
|
||||
return nil, dmce
|
||||
}
|
||||
|
||||
wr, err := mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.CreateWriterErr, err}
|
||||
}
|
||||
|
||||
imp := &mvdata.DataMover{rd, transforms, wr, mvOpts.ContOnErr}
|
||||
rd = nil
|
||||
|
||||
return imp, nil
|
||||
}
|
||||
|
||||
func buildNewIndexes(ctx context.Context, dEnv *env.DoltEnv, newTblName string) errhand.VerboseError {
|
||||
//TODO: change this to not use the executeImport function, and instead the SQL code path
|
||||
newWorking, err := dEnv.WorkingRoot(ctx)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to load the working set to build the indexes.").AddCause(err).Build()
|
||||
}
|
||||
updatedTable, ok, err := newWorking.GetTable(ctx, newTblName)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to load the table to build the indexes.").AddCause(err).Build()
|
||||
} else if !ok {
|
||||
return errhand.BuildDError("Unable to find the table to build the indexes.").Build()
|
||||
}
|
||||
updatedTable, err = updatedTable.RebuildIndexData(ctx)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to build the indexes.").AddCause(err).Build()
|
||||
}
|
||||
newWorking, err = newWorking.PutTable(ctx, newTblName, updatedTable)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to write the indexes to the working set.").AddCause(err).Build()
|
||||
}
|
||||
err = dEnv.UpdateWorkingRoot(ctx, newWorking)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to update the working set containing the indexes.").AddCause(err).Build()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -16,6 +16,8 @@ package tblcmds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
"os"
|
||||
|
||||
"github.com/fatih/color"
|
||||
@@ -28,9 +30,7 @@ import (
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/env"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/mvdata"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/pipeline"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/utils/argparser"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/utils/iohelp"
|
||||
@@ -177,6 +177,7 @@ func (cmd ExportCmd) Exec(ctx context.Context, commandStr string, args []string,
|
||||
return 0
|
||||
}
|
||||
|
||||
// todo: remerge these and pass data_mover
|
||||
func executeExport(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOptions) errhand.VerboseError {
|
||||
root, err := dEnv.WorkingRoot(ctx)
|
||||
|
||||
@@ -193,7 +194,7 @@ func executeExport(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOp
|
||||
}
|
||||
}
|
||||
|
||||
mover, nDMErr := mvdata.NewExportDataMover(ctx, root, dEnv.FS, mvOpts, importStatsCB)
|
||||
mover, nDMErr := NewExportDataMover(ctx, root, dEnv.FS, mvOpts, importStatsCB)
|
||||
|
||||
if nDMErr != nil {
|
||||
return newDataMoverErrToVerr(mvOpts, nDMErr)
|
||||
@@ -226,33 +227,44 @@ func executeExport(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOp
|
||||
return errhand.BuildDError("An error occurred moving data:\n").AddCause(err).Build()
|
||||
}
|
||||
|
||||
if nomsWr, ok := mover.Wr.(noms.NomsMapWriteCloser); ok {
|
||||
var indexes []schema.Index
|
||||
|
||||
if tableLoc, ok := mvOpts.Src.(mvdata.TableDataLocation); ok {
|
||||
originalTable, ok, err := root.GetTable(ctx, tableLoc.Name)
|
||||
if err != nil || !ok {
|
||||
return errhand.BuildDError(color.RedString("Source table does not exist.")).Build()
|
||||
}
|
||||
originalSchema, err := originalTable.GetSchema(ctx)
|
||||
if err != nil || !ok {
|
||||
return errhand.BuildDError(color.RedString("Failed to read source table's schema.")).Build()
|
||||
}
|
||||
indexes = originalSchema.Indexes().AllIndexes()
|
||||
}
|
||||
|
||||
tableDest := mvOpts.Dest.(mvdata.TableDataLocation)
|
||||
sch := nomsWr.GetSchema()
|
||||
sch.Indexes().Merge(indexes...)
|
||||
err = dEnv.PutTableToWorking(ctx, *nomsWr.GetMap(), sch, tableDest.Name)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Failed to update the working value.").AddCause(err).Build()
|
||||
}
|
||||
}
|
||||
|
||||
if badCount > 0 {
|
||||
cli.PrintErrln(color.YellowString("Lines skipped: %d", badCount))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewExportDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, mvOpts *mvdata.MoveOptions, statsCB noms.StatsCB) (*mvdata.DataMover, *mvdata.DataMoverCreationError) {
|
||||
var rd table.TableReadCloser
|
||||
var err error
|
||||
|
||||
rd, srcIsSorted, err := mvOpts.Src.NewReader(ctx, root, fs, mvOpts.SrcOptions)
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.CreateReaderErr, err}
|
||||
}
|
||||
|
||||
// close on err exit
|
||||
defer func() {
|
||||
if rd != nil {
|
||||
rd.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
inSch := rd.GetSchema()
|
||||
outSch := inSch
|
||||
|
||||
wr, err := mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.CreateWriterErr, err}
|
||||
}
|
||||
|
||||
emptyTransColl := pipeline.NewTransformCollection()
|
||||
|
||||
imp := &mvdata.DataMover{rd, emptyTransColl, wr, mvOpts.ContOnErr}
|
||||
rd = nil
|
||||
|
||||
return imp, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -16,7 +16,10 @@ package tblcmds
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table"
|
||||
"os"
|
||||
|
||||
"github.com/fatih/color"
|
||||
@@ -295,34 +298,13 @@ func (cmd ImportCmd) Exec(ctx context.Context, commandStr string, args []string,
|
||||
return commands.HandleVErrAndExitCode(verr, usage)
|
||||
}
|
||||
|
||||
cli.PrintErrln(color.CyanString("Import completed successfully."))
|
||||
verr = buildNewIndexes(ctx, dEnv, mvOpts.TableName)
|
||||
|
||||
//TODO: change this to not use the executeImport function, and instead the SQL code path, so that we don't rebuild indexes on every import
|
||||
newWorking, err := dEnv.WorkingRoot(ctx)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to load the working set to build the indexes.").AddCause(err).Build(), nil)
|
||||
}
|
||||
updatedTable, ok, err := newWorking.GetTable(ctx, mvOpts.TableName)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to load the table to build the indexes.").AddCause(err).Build(), nil)
|
||||
} else if !ok {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to find the table to build the indexes.").Build(), nil)
|
||||
}
|
||||
updatedTable, err = updatedTable.RebuildIndexData(ctx)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to build the indexes.").AddCause(err).Build(), nil)
|
||||
}
|
||||
newWorking, err = newWorking.PutTable(ctx, mvOpts.TableName, updatedTable)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to write the indexes to the working set.").AddCause(err).Build(), nil)
|
||||
}
|
||||
err = dEnv.UpdateWorkingRoot(ctx, newWorking)
|
||||
if err != nil {
|
||||
return commands.HandleVErrAndExitCode(errhand.BuildDError("Unable to update the working set containing the indexes.").AddCause(err).Build(), nil)
|
||||
if verr == nil {
|
||||
cli.PrintErrln(color.CyanString("Import completed successfully."))
|
||||
}
|
||||
|
||||
cli.PrintErrln(color.CyanString("Import completed successfully."))
|
||||
return 0
|
||||
return commands.HandleVErrAndExitCode(verr, usage)
|
||||
}
|
||||
|
||||
func createArgParser() *argparser.ArgParser {
|
||||
@@ -351,6 +333,58 @@ func importStatsCB(stats types.AppliedEditStats) {
|
||||
displayStrLen = cli.DeleteAndPrint(displayStrLen, displayStr)
|
||||
}
|
||||
|
||||
|
||||
func newImportDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, mvOpts *mvdata.MoveOptions, statsCB noms.StatsCB) (*mvdata.DataMover, *mvdata.DataMoverCreationError) {
|
||||
var rd table.TableReadCloser
|
||||
var err error
|
||||
|
||||
rd, srcIsSorted, err := mvOpts.Src.NewReader(ctx, root, fs, mvOpts.SrcOptions)
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.CreateReaderErr, err}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if rd != nil {
|
||||
rd.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
inSch := rd.GetSchema()
|
||||
outSch, err := mvdata.OutSchemaFromInSchema(ctx, inSch, root, fs, mvOpts)
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.SchemaErr, err}
|
||||
}
|
||||
|
||||
transforms, dmce := mvdata.MaybeMapFields(inSch, outSch, fs, mvOpts)
|
||||
|
||||
if dmce != nil {
|
||||
return nil, dmce
|
||||
}
|
||||
|
||||
var wr table.TableWriteCloser
|
||||
switch mvOpts.Operation {
|
||||
case mvdata.OverwriteOp:
|
||||
wr, err = mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case mvdata.ReplaceOp:
|
||||
wr, err = mvOpts.Dest.NewReplacingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case mvdata.UpdateOp:
|
||||
wr, err = mvOpts.Dest.NewUpdatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
default:
|
||||
err = errors.New("invalid move operation")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, &mvdata.DataMoverCreationError{mvdata.CreateWriterErr, err}
|
||||
}
|
||||
|
||||
imp := &mvdata.DataMover{rd, transforms, wr, mvOpts.ContOnErr}
|
||||
rd = nil
|
||||
|
||||
return imp, nil
|
||||
}
|
||||
|
||||
func executeImport(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOptions) errhand.VerboseError {
|
||||
root, err := dEnv.WorkingRoot(ctx)
|
||||
|
||||
@@ -367,7 +401,7 @@ func executeImport(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOp
|
||||
}
|
||||
}
|
||||
|
||||
mover, nDMErr := mvdata.NewDataMover(ctx, root, dEnv.FS, mvOpts, importStatsCB)
|
||||
mover, nDMErr := newImportDataMover(ctx, root, dEnv.FS, mvOpts, importStatsCB)
|
||||
|
||||
if nDMErr != nil {
|
||||
return newDataMoverErrToVerr(mvOpts, nDMErr)
|
||||
@@ -431,6 +465,33 @@ func executeImport(ctx context.Context, dEnv *env.DoltEnv, mvOpts *mvdata.MoveOp
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildIndexes(ctx context.Context, dEnv *env.DoltEnv, newTblName string) errhand.VerboseError {
|
||||
//TODO: change this to not use the executeImport function, and instead the SQL code path, so that we don't rebuild indexes on every import
|
||||
newWorking, err := dEnv.WorkingRoot(ctx)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to load the working set to build the indexes.").AddCause(err).Build()
|
||||
}
|
||||
updatedTable, ok, err := newWorking.GetTable(ctx, newTblName)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to load the table to build the indexes.").AddCause(err).Build()
|
||||
} else if !ok {
|
||||
return errhand.BuildDError("Unable to find the table to build the indexes.").Build()
|
||||
}
|
||||
updatedTable, err = updatedTable.RebuildIndexData(ctx)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to build the indexes.").AddCause(err).Build()
|
||||
}
|
||||
newWorking, err = newWorking.PutTable(ctx, newTblName, updatedTable)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to write the indexes to the working set.").AddCause(err).Build()
|
||||
}
|
||||
err = dEnv.UpdateWorkingRoot(ctx, newWorking)
|
||||
if err != nil {
|
||||
return errhand.BuildDError("Unable to update the working set containing the indexes.").AddCause(err).Build()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func newDataMoverErrToVerr(mvOpts *mvdata.MoveOptions, err *mvdata.DataMoverCreationError) errhand.VerboseError {
|
||||
switch err.ErrType {
|
||||
case mvdata.CreateReaderErr:
|
||||
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/sqle"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/pipeline"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/typed/noms"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/utils/funcitr"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/utils/set"
|
||||
@@ -37,7 +36,7 @@ import (
|
||||
type MoveOperation string
|
||||
|
||||
const (
|
||||
OverwriteOp MoveOperation = "overwrite"
|
||||
OverwriteOp MoveOperation = "overwrite" // todo: make CreateOp?
|
||||
ReplaceOp MoveOperation = "replace"
|
||||
UpdateOp MoveOperation = "update"
|
||||
InvalidOp MoveOperation = "invalid"
|
||||
@@ -123,159 +122,6 @@ func (dmce *DataMoverCreationError) String() string {
|
||||
return string(dmce.ErrType) + ": " + dmce.Cause.Error()
|
||||
}
|
||||
|
||||
func NewDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, mvOpts *MoveOptions, statsCB noms.StatsCB) (*DataMover, *DataMoverCreationError) {
|
||||
var rd table.TableReadCloser
|
||||
var err error
|
||||
|
||||
rd, srcIsSorted, err := mvOpts.Src.NewReader(ctx, root, fs, mvOpts.SrcOptions)
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{CreateReaderErr, err}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if rd != nil {
|
||||
rd.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
inSch := rd.GetSchema()
|
||||
outSch, err := outSchemaFromInSchema(ctx, inSch, root, fs, mvOpts)
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{SchemaErr, err}
|
||||
}
|
||||
|
||||
transforms, dmce := maybeMapFields(inSch, outSch, fs, mvOpts)
|
||||
|
||||
if dmce != nil {
|
||||
return nil, dmce
|
||||
}
|
||||
|
||||
var wr table.TableWriteCloser
|
||||
switch mvOpts.Operation {
|
||||
case OverwriteOp:
|
||||
wr, err = mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case ReplaceOp:
|
||||
wr, err = mvOpts.Dest.NewReplacingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case UpdateOp:
|
||||
wr, err = mvOpts.Dest.NewUpdatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
default:
|
||||
err = errors.New("invalid move operation")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{CreateWriterErr, err}
|
||||
}
|
||||
|
||||
imp := &DataMover{rd, transforms, wr, mvOpts.ContOnErr}
|
||||
rd = nil
|
||||
|
||||
return imp, nil
|
||||
}
|
||||
|
||||
func NewExportDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, mvOpts *MoveOptions, statsCB noms.StatsCB) (*DataMover, *DataMoverCreationError) {
|
||||
var rd table.TableReadCloser
|
||||
var err error
|
||||
|
||||
rd, srcIsSorted, err := mvOpts.Src.NewReader(ctx, root, fs, mvOpts.SrcOptions)
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{CreateReaderErr, err}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if rd != nil {
|
||||
rd.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
inSch := rd.GetSchema()
|
||||
outSch, err := outSchemaFromInSchema(ctx, inSch, root, fs, mvOpts)
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{SchemaErr, err}
|
||||
}
|
||||
|
||||
transforms, dmce := maybeMapFields(inSch, outSch, fs, mvOpts)
|
||||
|
||||
if dmce != nil {
|
||||
return nil, dmce
|
||||
}
|
||||
|
||||
var wr table.TableWriteCloser
|
||||
switch mvOpts.Operation {
|
||||
case OverwriteOp:
|
||||
wr, err = mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case ReplaceOp:
|
||||
wr, err = mvOpts.Dest.NewReplacingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case UpdateOp:
|
||||
wr, err = mvOpts.Dest.NewUpdatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
default:
|
||||
err = errors.New("invalid move operation")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{CreateWriterErr, err}
|
||||
}
|
||||
|
||||
imp := &DataMover{rd, transforms, wr, mvOpts.ContOnErr}
|
||||
rd = nil
|
||||
|
||||
return imp, nil
|
||||
}
|
||||
|
||||
func NewTableCopyDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesys, mvOpts *MoveOptions, statsCB noms.StatsCB) (*DataMover, *DataMoverCreationError) {
|
||||
var rd table.TableReadCloser
|
||||
var err error
|
||||
|
||||
rd, srcIsSorted, err := mvOpts.Src.NewReader(ctx, root, fs, mvOpts.SrcOptions)
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{CreateReaderErr, err}
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if rd != nil {
|
||||
rd.Close(ctx)
|
||||
}
|
||||
}()
|
||||
|
||||
inSch := rd.GetSchema()
|
||||
outSch, err := outSchemaFromInSchema(ctx, inSch, root, fs, mvOpts)
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{SchemaErr, err}
|
||||
}
|
||||
|
||||
transforms, dmce := maybeMapFields(inSch, outSch, fs, mvOpts)
|
||||
|
||||
if dmce != nil {
|
||||
return nil, dmce
|
||||
}
|
||||
|
||||
var wr table.TableWriteCloser
|
||||
switch mvOpts.Operation {
|
||||
case OverwriteOp:
|
||||
wr, err = mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case ReplaceOp:
|
||||
wr, err = mvOpts.Dest.NewReplacingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
case UpdateOp:
|
||||
wr, err = mvOpts.Dest.NewUpdatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
|
||||
default:
|
||||
err = errors.New("invalid move operation")
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, &DataMoverCreationError{CreateWriterErr, err}
|
||||
}
|
||||
|
||||
imp := &DataMover{rd, transforms, wr, mvOpts.ContOnErr}
|
||||
rd = nil
|
||||
|
||||
return imp, nil
|
||||
}
|
||||
|
||||
// Move is the method that executes the pipeline which will move data from the pipeline's source DataLocation to it's
|
||||
// dest DataLocation. It returns the number of bad rows encountered during import, and an error.
|
||||
func (imp *DataMover) Move(ctx context.Context) (badRowCount int64, err error) {
|
||||
@@ -314,7 +160,8 @@ func (imp *DataMover) Move(ctx context.Context) (badRowCount int64, err error) {
|
||||
return badCount, nil
|
||||
}
|
||||
|
||||
func maybeMapFields(inSch schema.Schema, outSch schema.Schema, fs filesys.Filesys, mvOpts *MoveOptions) (*pipeline.TransformCollection, *DataMoverCreationError) {
|
||||
// todo: break this into seperate paths for import and copy
|
||||
func MaybeMapFields(inSch schema.Schema, outSch schema.Schema, fs filesys.Filesys, mvOpts *MoveOptions) (*pipeline.TransformCollection, *DataMoverCreationError) {
|
||||
var mapping *rowconv.FieldMapping
|
||||
var err error
|
||||
if mvOpts.MappingFile != "" {
|
||||
@@ -349,7 +196,7 @@ func maybeMapFields(inSch schema.Schema, outSch schema.Schema, fs filesys.Filesy
|
||||
return transforms, nil
|
||||
}
|
||||
|
||||
func outSchemaFromInSchema(ctx context.Context, inSch schema.Schema, root *doltdb.RootValue, fs filesys.ReadableFS, mvOpts *MoveOptions) (schema.Schema, error) {
|
||||
func OutSchemaFromInSchema(ctx context.Context, inSch schema.Schema, root *doltdb.RootValue, fs filesys.ReadableFS, mvOpts *MoveOptions) (schema.Schema, error) {
|
||||
var err error
|
||||
outSch := inSch
|
||||
|
||||
@@ -368,7 +215,7 @@ func outSchemaFromInSchema(ctx context.Context, inSch schema.Schema, root *doltd
|
||||
|
||||
if mvOpts.SchFile != "" {
|
||||
var tn string
|
||||
tn, outSch, err = schAndTableNameFromFile(ctx, mvOpts.SchFile, fs, root)
|
||||
tn, outSch, err = SchAndTableNameFromFile(ctx, mvOpts.SchFile, fs, root)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -389,7 +236,7 @@ func outSchemaFromInSchema(ctx context.Context, inSch schema.Schema, root *doltd
|
||||
return outSch, nil
|
||||
}
|
||||
|
||||
func schAndTableNameFromFile(ctx context.Context, path string, fs filesys.ReadableFS, root *doltdb.RootValue) (string, schema.Schema, error) {
|
||||
func SchAndTableNameFromFile(ctx context.Context, path string, fs filesys.ReadableFS, root *doltdb.RootValue) (string, schema.Schema, error) {
|
||||
if path != "" {
|
||||
data, err := fs.ReadFile(path)
|
||||
|
||||
|
||||
@@ -14,146 +14,135 @@
|
||||
|
||||
package mvdata
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema/encoding"
|
||||
"github.com/liquidata-inc/dolt/go/libraries/doltcore/table"
|
||||
)
|
||||
|
||||
const (
|
||||
schemaFile = "schema.json"
|
||||
mappingFile = "mapping.json"
|
||||
)
|
||||
|
||||
func TestDataMover(t *testing.T) {
|
||||
// todo: add expected schema
|
||||
tests := []struct {
|
||||
sqlSchema string
|
||||
mappingJSON string
|
||||
mvOpts *MoveOptions
|
||||
}{
|
||||
{
|
||||
"",
|
||||
"",
|
||||
&MoveOptions{
|
||||
Operation: OverwriteOp,
|
||||
TableName: "testable",
|
||||
ContOnErr: false,
|
||||
SchFile: "",
|
||||
MappingFile: "",
|
||||
PrimaryKey: "",
|
||||
Src: NewDataLocation("data.csv", ""),
|
||||
Dest: NewDataLocation("data.psv", "psv")},
|
||||
},
|
||||
/*{
|
||||
"",
|
||||
"",
|
||||
&MoveOptions{
|
||||
Operation: OverwriteOp,
|
||||
ContOnErr: false,
|
||||
SchFile: "",
|
||||
MappingFile: "",
|
||||
PrimaryKey: "a",
|
||||
Src: NewDataLocation("data.csv", ""),
|
||||
Dest: NewDataLocation("data.nbf", "")},
|
||||
},
|
||||
{
|
||||
"",
|
||||
"",
|
||||
&MoveOptions{
|
||||
Operation: OverwriteOp,
|
||||
ContOnErr: false,
|
||||
SchFile: "",
|
||||
MappingFile: "",
|
||||
PrimaryKey: "",
|
||||
Src: NewDataLocation("data.nbf", "nbf"),
|
||||
Dest: NewDataLocation("table-name", "")},
|
||||
},*/
|
||||
{
|
||||
"",
|
||||
"",
|
||||
&MoveOptions{
|
||||
Operation: OverwriteOp,
|
||||
TableName: "table-name",
|
||||
ContOnErr: false,
|
||||
SchFile: "",
|
||||
MappingFile: "",
|
||||
PrimaryKey: "a",
|
||||
Src: NewDataLocation("data.csv", ""),
|
||||
Dest: NewDataLocation("table-name", "")},
|
||||
},
|
||||
{
|
||||
`CREATE TABLE table_name (
|
||||
pk VARCHAR(120) COMMENT 'tag:0',
|
||||
value INT COMMENT 'tag:1',
|
||||
PRIMARY KEY (pk)
|
||||
);`,
|
||||
`{"a":"pk","b":"value"}`,
|
||||
&MoveOptions{
|
||||
Operation: OverwriteOp,
|
||||
TableName: "table_name",
|
||||
ContOnErr: false,
|
||||
SchFile: "",
|
||||
MappingFile: "",
|
||||
PrimaryKey: "",
|
||||
Src: NewDataLocation("data.csv", ""),
|
||||
Dest: NewDataLocation("table_name", "")},
|
||||
},
|
||||
}
|
||||
|
||||
for idx, test := range tests {
|
||||
fmt.Println(idx)
|
||||
|
||||
var err error
|
||||
_, root, fs := createRootAndFS()
|
||||
|
||||
if test.sqlSchema != "" {
|
||||
test.mvOpts.SchFile = schemaFile
|
||||
err = fs.WriteFile(schemaFile, []byte(test.sqlSchema))
|
||||
}
|
||||
|
||||
if test.mappingJSON != "" {
|
||||
test.mvOpts.MappingFile = mappingFile
|
||||
err = fs.WriteFile(mappingFile, []byte(test.mappingJSON))
|
||||
}
|
||||
|
||||
src := test.mvOpts.Src
|
||||
|
||||
seedWr, err := src.NewCreatingWriter(context.Background(), test.mvOpts, root, fs, true, fakeSchema, nil)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
imtRd := table.NewInMemTableReader(imt)
|
||||
|
||||
_, _, err = table.PipeRows(context.Background(), imtRd, seedWr, false)
|
||||
seedWr.Close(context.Background())
|
||||
imtRd.Close(context.Background())
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
encoding.UnmarshalJson(test.sqlSchema)
|
||||
|
||||
dm, crDMErr := NewDataMover(context.Background(), root, fs, test.mvOpts, nil)
|
||||
|
||||
if crDMErr != nil {
|
||||
t.Fatal(crDMErr.String())
|
||||
}
|
||||
|
||||
var badCount int64
|
||||
badCount, err = dm.Move(context.Background())
|
||||
assert.Equal(t, int64(0), badCount)
|
||||
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
//
|
||||
//func TestDataMover(t *testing.T) {
|
||||
// // todo: add expected schema
|
||||
// tests := []struct {
|
||||
// sqlSchema string
|
||||
// mappingJSON string
|
||||
// mvOpts *MoveOptions
|
||||
// }{
|
||||
// {
|
||||
// "",
|
||||
// "",
|
||||
// &MoveOptions{
|
||||
// Operation: OverwriteOp,
|
||||
// TableName: "testable",
|
||||
// ContOnErr: false,
|
||||
// SchFile: "",
|
||||
// MappingFile: "",
|
||||
// PrimaryKey: "",
|
||||
// Src: NewDataLocation("data.csv", ""),
|
||||
// Dest: NewDataLocation("data.psv", "psv")},
|
||||
// },
|
||||
// /*{
|
||||
// "",
|
||||
// "",
|
||||
// &MoveOptions{
|
||||
// Operation: OverwriteOp,
|
||||
// ContOnErr: false,
|
||||
// SchFile: "",
|
||||
// MappingFile: "",
|
||||
// PrimaryKey: "a",
|
||||
// Src: NewDataLocation("data.csv", ""),
|
||||
// Dest: NewDataLocation("data.nbf", "")},
|
||||
// },
|
||||
// {
|
||||
// "",
|
||||
// "",
|
||||
// &MoveOptions{
|
||||
// Operation: OverwriteOp,
|
||||
// ContOnErr: false,
|
||||
// SchFile: "",
|
||||
// MappingFile: "",
|
||||
// PrimaryKey: "",
|
||||
// Src: NewDataLocation("data.nbf", "nbf"),
|
||||
// Dest: NewDataLocation("table-name", "")},
|
||||
// },*/
|
||||
// {
|
||||
// "",
|
||||
// "",
|
||||
// &MoveOptions{
|
||||
// Operation: OverwriteOp,
|
||||
// TableName: "table-name",
|
||||
// ContOnErr: false,
|
||||
// SchFile: "",
|
||||
// MappingFile: "",
|
||||
// PrimaryKey: "a",
|
||||
// Src: NewDataLocation("data.csv", ""),
|
||||
// Dest: NewDataLocation("table-name", "")},
|
||||
// },
|
||||
// {
|
||||
// `CREATE TABLE table_name (
|
||||
//pk VARCHAR(120) COMMENT 'tag:0',
|
||||
//value INT COMMENT 'tag:1',
|
||||
//PRIMARY KEY (pk)
|
||||
//);`,
|
||||
// `{"a":"pk","b":"value"}`,
|
||||
// &MoveOptions{
|
||||
// Operation: OverwriteOp,
|
||||
// TableName: "table_name",
|
||||
// ContOnErr: false,
|
||||
// SchFile: "",
|
||||
// MappingFile: "",
|
||||
// PrimaryKey: "",
|
||||
// Src: NewDataLocation("data.csv", ""),
|
||||
// Dest: NewDataLocation("table_name", "")},
|
||||
// },
|
||||
// }
|
||||
//
|
||||
// for idx, test := range tests {
|
||||
// fmt.Println(idx)
|
||||
//
|
||||
// var err error
|
||||
// _, root, fs := createRootAndFS()
|
||||
//
|
||||
// if test.sqlSchema != "" {
|
||||
// test.mvOpts.SchFile = schemaFile
|
||||
// err = fs.WriteFile(schemaFile, []byte(test.sqlSchema))
|
||||
// }
|
||||
//
|
||||
// if test.mappingJSON != "" {
|
||||
// test.mvOpts.MappingFile = mappingFile
|
||||
// err = fs.WriteFile(mappingFile, []byte(test.mappingJSON))
|
||||
// }
|
||||
//
|
||||
// src := test.mvOpts.Src
|
||||
//
|
||||
// seedWr, err := src.NewCreatingWriter(context.Background(), test.mvOpts, root, fs, true, fakeSchema, nil)
|
||||
//
|
||||
// if err != nil {
|
||||
// t.Fatal(err.Error())
|
||||
// }
|
||||
//
|
||||
// imtRd := table.NewInMemTableReader(imt)
|
||||
//
|
||||
// _, _, err = table.PipeRows(context.Background(), imtRd, seedWr, false)
|
||||
// seedWr.Close(context.Background())
|
||||
// imtRd.Close(context.Background())
|
||||
//
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
//
|
||||
// encoding.UnmarshalJson(test.sqlSchema)
|
||||
//
|
||||
// dm, crDMErr := tblcmds.newImportDataMover(context.Background(), root, fs, test.mvOpts, nil)
|
||||
//
|
||||
// if crDMErr != nil {
|
||||
// t.Fatal(crDMErr.String())
|
||||
// }
|
||||
//
|
||||
// var badCount int64
|
||||
// badCount, err = dm.Move(context.Background())
|
||||
// assert.Equal(t, int64(0), badCount)
|
||||
//
|
||||
// if err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
|
||||
@@ -109,7 +109,7 @@ func (dl FileDataLocation) NewReader(ctx context.Context, root *doltdb.RootValue
|
||||
var sch schema.Schema
|
||||
jsonOpts, _ := opts.(JSONOptions)
|
||||
if jsonOpts.SchFile != "" {
|
||||
tn, s, err := schAndTableNameFromFile(ctx, jsonOpts.SchFile, fs, root)
|
||||
tn, s, err := SchAndTableNameFromFile(ctx, jsonOpts.SchFile, fs, root)
|
||||
if err != nil {
|
||||
return nil, false, err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user