Merge pull request #86 from liquidata-inc/taylor/table-import-replace

dolt table import -r
Taylor Bantle
2019-09-20 17:23:55 -07:00
committed by GitHub
23 changed files with 460 additions and 12 deletions
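
For orientation, a minimal walkthrough of the new flag. The file names below are the fixtures used by this PR's bats tests (the tests resolve them with batshelper), and the output lines are the ones the tests assert on:

    dolt table create -s employees-sch.json employees
    dolt table import -r employees employees-tbl.json
    # Rows Processed: 3, Additions: 3, Modifications: 0, Had No Effect: 0
    # Import completed successfully.

-r keeps the table's existing schema and replaces all of its rows with the contents of the file.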

View File

@@ -0,0 +1,20 @@
{
"rows": [
{
"pk": 0,
"c1": 1,
"c2": 2,
"c3": 3,
"c4": 4,
"c5": 5
},
{
"pk": 1,
"c1": 1,
"c2": 2,
"c3": 3,
"c4": 4,
"c5": 5
}
]
}

View File

@@ -0,0 +1,50 @@
{
"columns": [
{
"name": "id",
"kind": "string",
"tag": 0,
"is_part_of_pk": true,
"col_constraints": [
{
"constraint_type": "not_null"
}
]
},
{
"name": "first",
"kind": "string",
"tag": 1,
"is_part_of_pk": false,
"col_constraints": []
},
{
"name": "last",
"kind": "string",
"tag": 2,
"is_part_of_pk": false,
"col_constraints": []
},
{
"name": "title",
"kind": "string",
"tag": 3,
"is_part_of_pk": false,
"col_constraints": []
},
{
"name": "start date",
"kind": "string",
"tag": 4,
"is_part_of_pk": false,
"col_constraints": []
},
{
"name": "end date",
"kind": "string",
"tag": 5,
"is_part_of_pk": false,
"col_constraints": []
}
]
}

View File

@@ -0,0 +1,36 @@
{
"rows": [
{
"id": 0,
"first name": "tim",
"last name": "sehn",
"title": "ceo",
"start date": "",
"end date": ""
},
{
"id": 1,
"first name": "aaron",
"last name": "son",
"title": "founder",
"start date": "",
"end date": ""
},
{
"id": 2,
"first name": "brian",
"last name": "hendricks",
"title": "founder",
"start date": "",
"end date": ""
},
{
"id": 3,
"first name": "matt",
"last name": "jesuele",
"title": "software engineer",
"start date": "",
"end date": ""
}
]
}

View File

@@ -0,0 +1,3 @@
id, first name, last name, position
0, "tim", "sehn", "ceo"
1, "aaron", "son", "founder"

View File

@@ -0,0 +1,28 @@
{
"rows": [
{
"id": "0",
"first name": "tim",
"last name": "sehn",
"position": "ceo"
},
{
"id": "1",
"first name": "aaron",
"last name": "son",
"position": "founder"
},
{
"id": "2",
"first name": "brian",
"last name": "hendricks",
"position": "founder"
},
{
"id": "3",
"first name": "matt",
"last name": "jesuele",
"position": "software engineer"
}
]
}

bats/replace-tables.bats (new file, 158 lines)
View File

@@ -0,0 +1,158 @@
#!/usr/bin/env bats
setup() {
load $BATS_TEST_DIRNAME/helper/common.bash
export PATH=$PATH:~/go/bin
export NOMS_VERSION_NEXT=1
cd $BATS_TMPDIR
mkdir "dolt-repo-$$"
cd "dolt-repo-$$"
dolt init
}
teardown() {
rm -rf "$BATS_TMPDIR/dolt-repo-$$"
}
@test "replace table using csv" {
run dolt table create -s `batshelper 1pk5col-ints.schema` test
[ "$status" -eq 0 ]
run dolt table import -r test `batshelper 1pk5col-ints.csv`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 2, Additions: 2, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "replace table using csv with wrong schema" {
run dolt table create -s `batshelper 1pk5col-ints.schema` test
[ "$status" -eq 0 ]
run dolt table import -r test `batshelper 2pk5col-ints.csv`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error replacing table" ]] || false
[[ "$output" =~ "cause: Schema from file does not match schema from existing table." ]] || false
}
@test "replace table using psv" {
run dolt table create -s `batshelper 1pk5col-ints.schema` test
[ "$status" -eq 0 ]
run dolt table import -r test `batshelper 1pk5col-ints.psv`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 2, Additions: 2, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "replace table using psv with wrong schema" {
run dolt table create -s `batshelper 2pk5col-ints.schema` test
[ "$status" -eq 0 ]
run dolt table import -r test `batshelper 1pk5col-ints.psv`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error replacing table" ]] || false
[[ "$output" =~ "cause: Schema from file does not match schema from existing table." ]] || false
}
@test "replace table using schema with csv" {
run dolt table create -s `batshelper 1pk5col-ints.schema` test
[ "$status" -eq 0 ]
run dolt table import -r -s `batshelper 1pk5col-ints.schema` test `batshelper 1pk5col-ints.csv`
[ "$status" -eq 1 ]
[[ "$output" =~ "schema is not supported for update or replace operations" ]] || false
}
@test "replace table using json" {
run dolt table create -s `batshelper employees-sch.json` employees
[ "$status" -eq 0 ]
run dolt table import -r employees `batshelper employees-tbl.json`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 3, Additions: 3, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "replace table using json with wrong schema" {
run dolt table create -s `batshelper employees-sch-wrong.json` employees
[ "$status" -eq 0 ]
run dolt table import -r employees `batshelper employees-tbl.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error replacing table" ]] || false
[[ "$output" =~ "cause: Schema from file does not match schema from existing table." ]] || false
}
@test "replace table using schema with json" {
run dolt table create -s `batshelper employees-sch-wrong.json` employees
[ "$status" -eq 0 ]
run dolt table import -r -s `batshelper employees-sch.json` employees `batshelper employees-tbl.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "fatal: schema is not supported for update or replace operations" ]] || false
}
@test "replace table with json when table does not exist" {
run dolt table import -r employees `batshelper employees-tbl.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "The following table could not be found:" ]] || false
}
@test "replace table with existing data using json" {
run dolt table import -c -s `batshelper employees-sch.json` employees `batshelper employees-tbl.json`
[ "$status" -eq 0 ]
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt table import -r employees `batshelper employees-tbl-new.json`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 4, Additions: 4, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "replace table with existing data with different schema" {
run dolt table import -c -s `batshelper employees-sch.json` employees `batshelper employees-tbl.json`
[ "$status" -eq 0 ]
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt table import -r employees `batshelper employees-tbl-schema-wrong.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error replacing table" ]] || false
[[ "$output" =~ "cause: Schema from file does not match schema from existing table." ]] || false
}
@test "replace table with bad json" {
run dolt table create -s `batshelper employees-sch.json` employees
[ "$status" -eq 0 ]
run dolt table import -r employees `batshelper employees-tbl-bad.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error creating reader" ]] || false
[[ "$output" =~ "employees-tbl-bad.json to" ]] || false
}
@test "replace table using xlsx file" {
run dolt table create -s `batshelper employees-sch-2.json` employees
[ "$status" -eq 0 ]
run dolt table import -r employees `batshelper employees.xlsx`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 3, Additions: 3, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}
@test "replace table using xlsx file with wrong schema" {
run dolt table create -s `batshelper employees-sch.json` employees
[ "$status" -eq 0 ]
run dolt table import -r employees `batshelper employees.xlsx`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error replacing table" ]] || false
[[ "$output" =~ "cause: Schema from file does not match schema from existing table." ]] || false
}
@test "replace table with 2 primary keys with a csv with one primary key" {
run dolt table import -c --pk=pk1,pk2 test `batshelper 2pk5col-ints.csv`
[ "$status" -eq 0 ]
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt table import -r test `batshelper 1pk5col-ints.csv`
[ "$status" -eq 1 ]
[[ "$output" =~ "Error replacing table" ]] || false
[[ "$output" =~ "cause: Schema from file does not match schema from existing table." ]] || false
}
@test "replace table with 2 primary keys with a csv with 2 primary keys" {
run dolt table import -c --pk=pk1,pk2 test `batshelper 2pk5col-ints.csv`
[ "$status" -eq 0 ]
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt table import -r test `batshelper 2pk5col-ints.csv`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 4, Additions: 4, Modifications: 0, Had No Effect: 0" ]] || false
[[ "$output" =~ "Import completed successfully." ]] || false
}

View File

@@ -28,7 +28,7 @@ teardown() {
[ "$status" -eq 0 ]
run dolt table import -u -s `batshelper 1pk5col-ints.schema` test `batshelper 1pk5col-ints.csv`
[ "$status" -eq 1 ]
[[ "$output" =~ "schema is not supported for update operations" ]] || false
[[ "$output" =~ "fatal: schema is not supported for update or replace operations" ]] || false
}
@test "update table using csv with newlines" {
@@ -61,7 +61,16 @@ teardown() {
[ "$status" -eq 0 ]
run dolt table import -u -s `batshelper employees-sch.json` employees `batshelper employees-tbl.json`
[ "$status" -eq 1 ]
[[ "$output" =~ "schema is not supported for update operations" ]] || false
[[ "$output" =~ "fatal: schema is not supported for update or replace operations" ]] || false
}
@test "update table with existing imported data with different schema" {
run dolt table import -c -s `batshelper employees-sch.json` employees `batshelper employees-tbl.json`
[ "$status" -eq 0 ]
[[ "$output" =~ "Import completed successfully." ]] || false
run dolt table import -u employees `batshelper employees-tbl-schema-wrong.json`
[ "$status" -eq 0 ]
[[ "$output" =~ "Rows Processed: 0, Additions: 0, Modifications: 0, Had No Effect: 0" ]] || false
}
@test "update table with json when table does not exist" {

View File

@@ -37,6 +37,7 @@ import (
const (
createParam = "create-table"
updateParam = "update-table"
replaceParam = "replace-table"
tableParam = "table"
fileParam = "file"
outSchemaParam = "schema"
@@ -93,12 +94,20 @@ schema will be used, and field names will be used to match file fields with tabl
During import, if there is an error importing any row, the import will be aborted by default. Use the <b>--continue</b>
flag to continue importing when an error is encountered.
If <b>--replace-table | -r</b> is given the operation will replace <table> with the contents of the file. The table's
existing schema will be used, and field names will be used to match file fields with table fields unless a mapping file is
specified.
If the schema for the existing table does not match the schema for the new file, the import will be aborted by default. To
overwrite both the table and the schema, use <b>-c -f</b>.
A mapping file can be used to map fields between the file being imported and the table being written to. This can
be used when creating a new table, or updating an existing table.
be used when creating a new table, or updating or replacing an existing table.
` + mappingFileHelp +
`
In both create and update scenarios the file's extension is used to infer the type of the file. If a file does not
In create, update, and replace scenarios the file's extension is used to infer the type of the file. If a file does not
have the expected extension then the <b>--file-type</b> parameter should be used to explicitly define the format of
the file in one of the supported formats (csv, psv, nbf, json, xlsx). For files separated by a delimiter other than a
',' (type csv) or a '|' (type psv), the --delim parameter can be used to specify a delimiter`
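
A sketch of the two paths this help text describes, reusing fixtures from the replace-tables tests (output abbreviated; the test table here was created from 1pk5col-ints.schema):

    # -r aborts when the file's schema disagrees with the table's
    dolt table import -r test 2pk5col-ints.csv
    # Error replacing table
    # cause: Schema from file does not match schema from existing table.

    # to replace the schema along with the data, recreate the table
    dolt table import -c -f --pk=pk1,pk2 test 2pk5col-ints.csv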
@@ -106,6 +115,7 @@ the file in one of the supported formats (csv, psv, nbf, json, xlsx). For files
var importSynopsis = []string{
"-c [-f] [--pk <field>] [--schema <file>] [--map <file>] [--continue] [--file-type <type>] <table> <file>",
"-u [--map <file>] [--continue] [--file-type <type>] <table> <file>",
"-r [--map <file>] [--file-type <type>] <table> <file>",
}
func validateImportArgs(apr *argparser.ArgParseResults, usage cli.UsagePrinter) (mvdata.MoveOperation, mvdata.TableDataLocation, mvdata.DataLocation, interface{}) {
@@ -116,15 +126,19 @@ func validateImportArgs(apr *argparser.ArgParseResults, usage cli.UsagePrinter)
var mvOp mvdata.MoveOperation
var srcOpts interface{}
if !apr.Contains(createParam) && !apr.Contains(updateParam) {
cli.PrintErrln("Must include '-c' for initial table import or -u to update existing table.")
if !apr.Contains(createParam) && !apr.Contains(updateParam) && !apr.Contains(replaceParam) {
cli.PrintErrln("Must include '-c' for initial table import or -u to update existing table or -r to replace existing table.")
return mvdata.InvalidOp, mvdata.TableDataLocation{}, nil, nil
} else if apr.Contains(createParam) {
mvOp = mvdata.OverwriteOp
} else {
mvOp = mvdata.UpdateOp
if apr.Contains(replaceParam) {
mvOp = mvdata.ReplaceOp
} else {
mvOp = mvdata.UpdateOp
}
if apr.Contains(outSchemaParam) {
cli.PrintErrln("fatal:", outSchemaParam+" is not supported for update operations")
cli.PrintErrln("fatal:", outSchemaParam+" is not supported for update or replace operations")
usage()
return mvdata.InvalidOp, mvdata.TableDataLocation{}, nil, nil
}
@@ -250,6 +264,7 @@ func createArgParser() *argparser.ArgParser {
ap.SupportsFlag(createParam, "c", "Create a new table, or overwrite an existing table (with the -f flag) from the imported data.")
ap.SupportsFlag(updateParam, "u", "Update an existing table with the imported data.")
ap.SupportsFlag(forceParam, "f", "If a create operation is being executed, data already exists in the destination, the Force flag will allow the target to be overwritten.")
ap.SupportsFlag(replaceParam, "r", "Replace existing table with imported data while preserving the original schema.")
ap.SupportsFlag(contOnErrParam, "", "Continue importing when row import errors are encountered.")
ap.SupportsString(outSchemaParam, "s", "schema_file", "The schema for the output data.")
ap.SupportsString(mappingFileParam, "m", "mapping_file", "A file that lays out how fields should be mapped from input data to output data.")
@@ -371,6 +386,11 @@ func newDataMoverErrToVerr(mvOpts *mvdata.MoveOptions, err *mvdata.DataMoverCrea
bdr.AddDetails(`Mapping File: "%s"`, mvOpts.MappingFile)
return bdr.AddCause(err.Cause).Build()
case mvdata.ReplacingErr:
bdr := errhand.BuildDError("Error replacing table")
bdr.AddDetails("When attempting to replace data with %s, could not validate schema.", mvOpts.Src.String())
return bdr.AddCause(err.Cause).Build()
case mvdata.CreateMapperErr:
bdr := errhand.BuildDError("Error creating input to output mapper.")
details := fmt.Sprintf("When attempting to move data from %s to %s, could not create a mapper.", mvOpts.Src.String(), mvOpts.Dest.String())

View File

@@ -20,7 +20,7 @@ import (
)
var Commands = cli.GenSubCommandHandler([]*cli.Command{
{Name: "import", Desc: "Creates, overwrites, or updates a table from the data in a file.", Func: Import, ReqRepo: true, EventType: eventsapi.ClientEventType_TABLE_IMPORT},
{Name: "import", Desc: "Creates, overwrites, replaces, or updates a table from the data in a file.", Func: Import, ReqRepo: true, EventType: eventsapi.ClientEventType_TABLE_IMPORT},
{Name: "export", Desc: "Export a table to a file.", Func: Export, ReqRepo: true, EventType: eventsapi.ClientEventType_TABLE_EXPORT},
{Name: "create", Desc: "Creates or overwrite an existing table with an empty table.", Func: Create, ReqRepo: true, EventType: eventsapi.ClientEventType_TABLE_CREATE},
{Name: "rm", Desc: "Deletes a table", Func: Rm, ReqRepo: true, EventType: eventsapi.ClientEventType_TABLE_RM},

View File

@@ -194,12 +194,15 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.1 h1:sJZmqHoEaY7f+NPP8pgLB/WxulyR3fewgCM2qaSlBb4=
github.com/lib/pq v1.1.1/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/liquidata-inc/dolt v0.9.8 h1:uup9UA59oSvYsqYd9A3zF0CFXUnF1qI8odE+Cxtn+pg=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190821121850-0e0249cf7bc0 h1:PfmOkSsLcIete4/6vhuJg020i4BEHLaHnEhvclbFxS8=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190821121850-0e0249cf7bc0/go.mod h1:DdWE0ku/mNfuLsRJIrHeHpDtB7am+6oopxEsQKmVkx8=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190911213247-6dfc00bbb116 h1:1sjwK2ofL06EFX8f0wbA/RW2Y2C3su42YBg/O4Rn9bg=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190911213247-6dfc00bbb116/go.mod h1:DdWE0ku/mNfuLsRJIrHeHpDtB7am+6oopxEsQKmVkx8=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190912171848-afe92c9129ca h1:m+IjqmcLBpmNI/58CcE+GOjsD5gxwfK1etDdcVAUx/k=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190912171848-afe92c9129ca/go.mod h1:DdWE0ku/mNfuLsRJIrHeHpDtB7am+6oopxEsQKmVkx8=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190916182737-521be194cc83 h1:6DUdp1E9YtjukV2VHIj7iHQx4XQQVqqFjmHGlArjsMI=
github.com/liquidata-inc/go-mysql-server v0.4.1-0.20190916182737-521be194cc83/go.mod h1:DdWE0ku/mNfuLsRJIrHeHpDtB7am+6oopxEsQKmVkx8=
github.com/liquidata-inc/ishell v0.0.0-20190514193646-693241f1f2a0 h1:phMgajKClMUiIr+hF2LGt8KRuUa2Vd2GI1sNgHgSXoU=
github.com/liquidata-inc/ishell v0.0.0-20190514193646-693241f1f2a0/go.mod h1:YC1rI9k5gx8D02ljlbxDfZe80s/iq8bGvaaQsvR+qxs=
github.com/liquidata-inc/mmap-go v1.0.3 h1:2LndAeAtup9rpvUmu4wZSYCsjCQ0Zpc+NqE+6+PnT7g=

View File

@@ -100,6 +100,9 @@ type DataLocation interface {
// NewUpdatingWriter will create a TableWriteCloser for a DataLocation that will update and append rows based on
// their primary key.
NewUpdatingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error)
// NewReplacingWriter will create a TableWriteCloser for a DataLocation that will overwrite an existing table if it has the same schema.
NewReplacingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error)
}
// NewDataLocation creates a DataLocation object from a path and a format string. If the path is the name of a table

View File

@@ -36,6 +36,7 @@ type MoveOperation string
const (
OverwriteOp MoveOperation = "overwrite"
ReplaceOp MoveOperation = "replace"
UpdateOp MoveOperation = "update"
InvalidOp MoveOperation = "invalid"
)
@@ -78,6 +79,7 @@ const (
NomsKindSchemaErr DataMoverCreationErrType = "Invalid schema error"
SchemaErr DataMoverCreationErrType = "Schema error"
MappingErr DataMoverCreationErrType = "Mapping error"
ReplacingErr DataMoverCreationErrType = "Replacing error"
CreateMapperErr DataMoverCreationErrType = "Mapper creation error"
CreateWriterErr DataMoverCreationErrType = "Create writer error"
CreateSorterErr DataMoverCreationErrType = "Create sorter error"
@@ -118,6 +120,17 @@ func NewDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesy
return nil, &DataMoverCreationError{SchemaErr, err}
}
if mvOpts.Operation == ReplaceOp && mvOpts.MappingFile == "" {
fileMatchesSchema, err := rd.VerifySchema(outSch)
if err != nil {
return nil, &DataMoverCreationError{ReplacingErr, err}
}
if !fileMatchesSchema {
err := errors.New("Schema from file does not match schema from existing table.")
return nil, &DataMoverCreationError{ReplacingErr, err}
}
}
var mapping *rowconv.FieldMapping
if mvOpts.MappingFile != "" {
mapping, err = rowconv.MappingFromFile(mvOpts.MappingFile, fs, rd.GetSchema(), outSch)
@@ -140,6 +153,8 @@ func NewDataMover(ctx context.Context, root *doltdb.RootValue, fs filesys.Filesy
var wr table.TableWriteCloser
if mvOpts.Operation == OverwriteOp {
wr, err = mvOpts.Dest.NewCreatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
} else if mvOpts.Operation == ReplaceOp {
wr, err = mvOpts.Dest.NewReplacingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
} else {
wr, err = mvOpts.Dest.NewUpdatingWriter(ctx, mvOpts, root, fs, srcIsSorted, outSch, statsCB)
}
@@ -200,7 +215,7 @@ func maybeMapFields(transforms *pipeline.TransformCollection, mapping *rowconv.F
}
func getOutSchema(ctx context.Context, inSch schema.Schema, root *doltdb.RootValue, fs filesys.ReadableFS, mvOpts *MoveOptions) (schema.Schema, error) {
if mvOpts.Operation == UpdateOp {
if mvOpts.Operation == UpdateOp || mvOpts.Operation == ReplaceOp {
// Get schema from target
rd, _, err := mvOpts.Dest.NewReader(ctx, root, fs, mvOpts.SchFile, mvOpts.SrcOptions)
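
Since replace, like update, takes its output schema from the target table, supplying a schema file together with -r is rejected before any data moves, as the tests above assert:

    dolt table import -r -s employees-sch.json employees employees-tbl.json
    # fatal: schema is not supported for update or replace operations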

View File

@@ -155,3 +155,9 @@ func (dl FileDataLocation) NewCreatingWriter(ctx context.Context, mvOpts *MoveOp
func (dl FileDataLocation) NewUpdatingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error) {
panic("Updating of files is not supported")
}
// NewReplacingWriter will create a TableWriteCloser for a DataLocation that will overwrite an existing table while
// preserving schema
func (dl FileDataLocation) NewReplacingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error) {
panic("Replacing files is not supported")
}

View File

@@ -91,3 +91,9 @@ func (dl StreamDataLocation) NewCreatingWriter(ctx context.Context, mvOpts *Move
func (dl StreamDataLocation) NewUpdatingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error) {
panic("Updating is not supported for stdout")
}
// NewReplacingWriter will create a TableWriteCloser for a DataLocation that will overwrite an existing table while
// preserving schema
func (dl StreamDataLocation) NewReplacingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error) {
panic("Replacing is not supported for stdout")
}

View File

@@ -119,3 +119,25 @@ func (dl TableDataLocation) NewUpdatingWriter(ctx context.Context, mvOpts *MoveO
return noms.NewNomsMapUpdater(ctx, root.VRW(), m, outSch, statsCB), nil
}
// NewReplacingWriter will create a TableWriteCloser for a DataLocation that will overwrite an existing table while
// preserving schema
func (dl TableDataLocation) NewReplacingWriter(ctx context.Context, mvOpts *MoveOptions, root *doltdb.RootValue, fs filesys.WritableFS, srcIsSorted bool, outSch schema.Schema, statsCB noms.StatsCB) (table.TableWriteCloser, error) {
_, ok, err := root.GetTable(ctx, dl.Name)
if err != nil {
return nil, err
}
if !ok {
return nil, errors.New("Could not find table " + dl.Name)
}
m, err := types.NewMap(ctx, root.VRW())
if err != nil {
return nil, err
}
return noms.NewNomsMapUpdater(ctx, root.VRW(), m, outSch, statsCB), nil
}
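
Note that the replacing writer only swaps a fresh row map in under an existing table's schema; it will not create the table. That is the failure mode the "table does not exist" test exercises:

    dolt table import -r employees employees-tbl.json
    # fails if 'employees' was never created:
    # The following table could not be found: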

View File

@@ -84,6 +84,35 @@ func SchemasAreEqual(sch1, sch2 Schema) (bool, error) {
return areEqual, nil
}
// VerifyInSchema tests that the incoming schema matches the schema from the original table.
// The test for column equality is more flexible than SchemasAreEqual.
func VerifyInSchema(inSch, outSch Schema) (bool, error) {
inSchCols := inSch.GetAllCols()
outSchCols := outSch.GetAllCols()
if inSchCols.Size() != outSchCols.Size() {
return false, nil
}
match := true
err := outSchCols.Iter(func(tag uint64, outCol Column) (stop bool, err error) {
inCol, ok := inSchCols.GetByTag(tag)
if !ok || inCol.Name != outCol.Name || inCol.Tag != outCol.Tag {
match = false
return true, nil
}
return false, nil
})
if err != nil {
return false, err
}
return match, nil
}
var randGen = rand.New(rand.NewSource(time.Now().UnixNano()))
func AutoGenerateTag(sch Schema) uint64 {
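
Judging by the implementation above, VerifyInSchema compares only column count, names, and tags; kinds and constraints are not re-checked, which is the looser notion of equality the comment promises. The CSV, XLSX, noms map, and in-memory table readers in this PR all delegate their VerifySchema to it, so a column-shape mismatch is enough to abort a replace:

    # 'employees' was created from employees-sch.json; employees.xlsx has different columns
    dolt table import -r employees employees.xlsx
    # Error replacing table
    # cause: Schema from file does not match schema from existing table.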

View File

@@ -144,6 +144,11 @@ func (rd *InMemTableReader) GetSchema() schema.Schema {
return rd.tt.sch
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (rd *InMemTableReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(rd.tt.sch, outSch)
}
// InMemTableWriter is an implementation of a TableWriter for an InMemTable
type InMemTableWriter struct {
tt *InMemTable

View File

@@ -31,6 +31,9 @@ type TableReader interface {
// ReadRow reads a row from a table. If there is a bad row the returned error will be non-nil, and calling IsBadRow(err)
// will return true. This is a potentially non-fatal error and callers can decide if they want to continue on a bad row, or fail.
ReadRow(ctx context.Context) (row.Row, error)
// VerifySchema checks that the incoming schema matches the schema from the existing table
VerifySchema(outSch schema.Schema) (bool, error)
}
// TableWriteCloser is an interface for writing rows to a table

View File

@@ -17,14 +17,20 @@ package json
import "github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
type JSONFileInfo struct {
Rows []row.Row
Rows []row.Row
AbleToDecode bool
}
func NewJSONInfo() *JSONFileInfo {
return &JSONFileInfo{nil}
return &JSONFileInfo{nil, true}
}
func (info *JSONFileInfo) SetRows(rows []row.Row) *JSONFileInfo {
info.Rows = rows
return info
}
func (info *JSONFileInfo) SetAbleToDecode(able bool) *JSONFileInfo {
info.AbleToDecode = able
return info
}

View File

@@ -77,8 +77,13 @@ func NewJSONReader(nbf *types.NomsBinFormat, r io.ReadCloser, info *JSONFileInfo
return nil, err
}
ableToDecode := true
decodedRows, err := jsonRows.decodeJSONRows(nbf, sch)
if err != nil {
ableToDecode = false
}
info.SetRows(decodedRows)
info.SetAbleToDecode(ableToDecode)
return &JSONReader{r, br, info, sch, 0}, nil
}
@@ -95,9 +100,14 @@ func (jsonr *JSONReader) Close(ctx context.Context) error {
}
// GetSchema gets the schema of the rows that this reader will return
func (jsonr *JSONReader) GetSchema() schema.Schema {
return jsonr.sch
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (jsonr *JSONReader) VerifySchema(sch schema.Schema) (bool, error) {
return jsonr.info.AbleToDecode, nil
}
func (jsonr *JSONReader) ReadRow(ctx context.Context) (row.Row, error) {
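
The JSON path validates differently: NewJSONReader eagerly decodes every row against the target schema when the reader is constructed, and VerifySchema just reports whether that decode succeeded. A field the schema does not know about (the fixture above that says "position" where the employees schema says "title" is presumably employees-tbl-schema-wrong.json) therefore surfaces as the same mismatch error:

    dolt table import -r employees employees-tbl-schema-wrong.json
    # Error replacing table
    # cause: Schema from file does not match schema from existing table.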

View File

@@ -65,3 +65,8 @@ func (nmr *NomsMapReader) Close(ctx context.Context) error {
nmr.itr = nil
return nil
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (nmr *NomsMapReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(nmr.sch, outSch)
}

View File

@@ -132,6 +132,11 @@ func (csvr *CSVReader) GetSchema() schema.Schema {
return csvr.sch
}
// VerifySchema checks that the incoming schema matches the original schema
func (csvr *CSVReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(csvr.sch, outSch)
}
// Close should release resources being held
func (csvr *CSVReader) Close(ctx context.Context) error {
if csvr.closer != nil {

View File

@@ -75,10 +75,16 @@ func getColHeaders(path string, sheetName string) ([]string, error) {
return colHeaders, nil
}
// GetSchema gets the schema of the rows that this reader will return
func (xlsxr *XLSXReader) GetSchema() schema.Schema {
return xlsxr.sch
}
// VerifySchema checks that the incoming schema matches the schema from the existing table
func (xlsxr *XLSXReader) VerifySchema(outSch schema.Schema) (bool, error) {
return schema.VerifyInSchema(xlsxr.sch, outSch)
}
// Close should release resources being held
func (xlsxr *XLSXReader) Close(ctx context.Context) error {
if xlsxr.closer != nil {