Andy Arthur
2020-05-18 08:58:13 -05:00
parent 0b0a2e941c
commit fa5fa2a441
8 changed files with 14 additions and 183 deletions

View File

@@ -57,7 +57,7 @@ const (
delimParam = "delim"
)
var mappingFileHelp = "A mapping file is json in the format:" + `
var MappingFileHelp = "A mapping file is json in the format:" + `
{
"source_field_name":"dest_field_name"
@@ -77,7 +77,7 @@ If {{.EmphasisLeft}}--replace | -r{{.EmphasisRight}} is given the operation will
A mapping file can be used to map fields between the file being imported and the table's schema being inferred. This can be used when creating a new table, or updating or replacing an existing table.
` + mappingFileHelp + `
` + MappingFileHelp + `
In create, update, and replace scenarios the file's extension is used to infer the type of the file. If a file does not have the expected extension then the {{.EmphasisLeft}}--file-type{{.EmphasisRight}} parameter should be used to explicitly define the format of the file in one of the supported formats (currently only csv is supported). For files separated by a delimiter other than a ',', the --delim parameter can be used to specify a delimiter.
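For reference, the mapping file described above is just a flat JSON object of source-to-destination column names. A minimal sketch of consuming one in Go, assuming it decodes into a plain map[string]string; the field names and file contents here are hypothetical:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical mapping file contents: file columns on the left,
	// table columns on the right.
	mappingJSON := `{
  "customer_id": "id",
  "customer_name": "name"
}`

	var mapping map[string]string
	if err := json.Unmarshal([]byte(mappingJSON), &mapping); err != nil {
		panic(err)
	}

	// Look up the table column for a file column, falling back to the
	// original name when it is not mapped.
	fileField := "customer_id"
	tableField, ok := mapping[fileField]
	if !ok {
		tableField = fileField
	}
	fmt.Println(fileField, "->", tableField)
}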

View File

@@ -59,39 +59,12 @@ const (
delimParam = "delim"
)
var SchemaFileHelp = "Schema definition files are json files in the format:" + `
{
"fields": [
{"name":"FIELD_NAME", "kind":"KIND", "Required":[true|false]},
...
],
"constraints": [
{"constraint_type":"primary_key", "field_indices":[INTEGER_FIELD_INDEX]}
]
}
where "fields" is the array of columns in each row of the table "constraints" is a list of table constraints. Only primary_key constraint types are supported currently. FIELD_NAME is the name of a column in a row and can be any valid string KIND must be a supported noms kind (bool, string, uuid, uint, int, float) INTEGER_FIELD_INDEX must be the 0 based index of the primary key in the "fields" array
`
var MappingFileHelp = "A mapping file is json in the format:" + `
{
"source_field_name":"dest_field_name"
...
}
where source_field_name is the name of a field in the file being imported and dest_field_name is the name of a field in the table being imported to.
`
var importDocs = cli.CommandDocumentationContent{
ShortDesc: `Imports data into a dolt table`,
LongDesc: `If {{.EmphasisLeft}}--create-table | -c{{.EmphasisRight}} is given the operation will create {{.LessThan}}table{{.GreaterThan}} and import the contents of file into it. If a table already exists at this location then the operation will fail, unless the {{.EmphasisLeft}}--force | -f{{.EmphasisRight}} flag is provided. The force flag forces the existing table to be overwritten.
The schema for the new table can be specified explicitly by providing a schema definition file, or will be inferred from the imported file. All schemas, inferred or explicitly defined must define a primary key. If the file format being imported does not support defining a primary key, then the {{.EmphasisLeft}}--pk{{.EmphasisRight}} parameter must supply the name of the field that should be used as the primary key.
The schema for the new table can be specified explicitly by providing a SQL schema definition file, or will be inferred from the imported file. All schemas, inferred or explicitly defined must define a primary key. If the file format being imported does not support defining a primary key, then the {{.EmphasisLeft}}--pk{{.EmphasisRight}} parameter must supply the name of the field that should be used as the primary key.
` + SchemaFileHelp +
`
If {{.EmphasisLeft}}--update-table | -u{{.EmphasisRight}} is given the operation will update {{.LessThan}}table{{.GreaterThan}} with the contents of file. The table's existing schema will be used, and field names will be used to match file fields with table fields unless a mapping file is specified.
During import, if there is an error importing any row, the import will be aborted by default. Use the {{.EmphasisLeft}}--continue{{.EmphasisRight}} flag to continue importing when an error is encountered.
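The SQL schema definition file referred to above has the shape shown in the test this commit removes further down. A hedged sketch of writing one out before running an import; the COMMENT 'tag:N' annotations follow that test's convention, and whether they are required here is an assumption:

package main

import (
	"io/ioutil"
	"log"
)

func main() {
	// CREATE TABLE statement reused from the removed mvdata test below.
	schemaSQL := `CREATE TABLE table_name (
  pk VARCHAR(120) COMMENT 'tag:0',
  value INT COMMENT 'tag:1',
  PRIMARY KEY (pk)
);`

	// Write it where the import command can pick it up; the file name is hypothetical.
	if err := ioutil.WriteFile("schema.sql", []byte(schemaSQL), 0644); err != nil {
		log.Fatal(err)
	}
}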
@@ -102,7 +75,7 @@ If the schema for the existing table does not match the schema for the new file,
A mapping file can be used to map fields between the file being imported and the table being written to. This can be used when creating a new table, or updating or replacing an existing table.
` + MappingFileHelp +
` + schcmds.MappingFileHelp +
`
In create, update, and replace scenarios the file's extension is used to infer the type of the file. If a file does not have the expected extension then the {{.EmphasisLeft}}--file-type{{.EmphasisRight}} parameter should be used to explicitly define the format of the file in one of the supported formats (csv, psv, json, xlsx). For files separated by a delimiter other than a ',' (type csv) or a '|' (type psv), the --delim parameter can be used to specify a delimiter`,
@@ -117,7 +90,7 @@ In create, update, and replace scenarios the file's extension is used to infer t
type tableImportOp string
const (
CreateOp tableImportOp = "overwrite" // todo: make CreateOp?
CreateOp tableImportOp = "overwrite"
ReplaceOp tableImportOp = "replace"
UpdateOp tableImportOp = "update"
InvalidOp tableImportOp = "invalid"

View File

@@ -157,6 +157,7 @@ func MoveData(ctx context.Context, dEnv *env.DoltEnv, mover *DataMover, mvOpts D
return badCount, nil
}
// NameMapTransform creates a pipeline transform that converts rows from inSch to outSch based on a name mapping.
func NameMapTransform(inSch schema.Schema, outSch schema.Schema, mapper rowconv.NameMapper) (*pipeline.TransformCollection, error) {
mapping, err := rowconv.NameMapping(inSch, outSch, mapper)
@@ -179,6 +180,7 @@ func NameMapTransform(inSch schema.Schema, outSch schema.Schema, mapper rowconv.
return transforms, nil
}
// SchAndTableNameFromFile reads a SQL schema file and creates a Dolt schema from it.
func SchAndTableNameFromFile(ctx context.Context, path string, fs filesys.ReadableFS, root *doltdb.RootValue) (string, schema.Schema, error) {
if path != "" {
data, err := fs.ReadFile(path)
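A hedged sketch of how the two helpers documented in this file might be wired together with a name mapper during an import. The package locations (mvdata, rowconv, schema, filesys, doltdb, pipeline) and import paths are assumptions based on this diff, and the file names passed in are hypothetical:

package importsketch

import (
	"context"

	"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
	"github.com/liquidata-inc/dolt/go/libraries/doltcore/mvdata"
	"github.com/liquidata-inc/dolt/go/libraries/doltcore/rowconv"
	"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
	"github.com/liquidata-inc/dolt/go/libraries/doltcore/table/pipeline"
	"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
)

// buildImportTransforms reads the target schema from a SQL schema file, loads an
// optional name mapping, and builds the transforms that convert incoming rows.
func buildImportTransforms(ctx context.Context, fs filesys.ReadableFS, root *doltdb.RootValue, inSch schema.Schema) (string, *pipeline.TransformCollection, error) {
	// Read the table name and target schema from the SQL schema definition file.
	name, outSch, err := mvdata.SchAndTableNameFromFile(ctx, "schema.sql", fs, root)
	if err != nil {
		return "", nil, err
	}

	// Load the optional name mapping; the file name is hypothetical.
	mapper, err := rowconv.NameMapperFromFile("mapping.json", fs)
	if err != nil {
		return "", nil, err
	}

	// Build the pipeline transform that converts rows from inSch to outSch.
	transforms, err := mvdata.NameMapTransform(inSch, outSch, mapper)
	if err != nil {
		return "", nil, err
	}

	return name, transforms, nil
}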

View File

@@ -1,149 +0,0 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package mvdata
const (
schemaFile = "schema.json"
mappingFile = "mapping.json"
)
//
//func TestDataMover(t *testing.T) {
// // todo: add expected schema
// tests := []struct {
// sqlSchema string
// mappingJSON string
// mvOpts *MoveOptions
// }{
// {
// "",
// "",
// &MoveOptions{
// operation: OverwriteOp,
// tableName: "testable",
// contOnErr: false,
// schFile: "",
// nameMapper: "",
// PrimaryKey: "",
// src: NewDataLocation("data.csv", ""),
// dest: NewDataLocation("data.psv", "psv")},
// },
// /*{
// "",
// "",
// &MoveOptions{
// operation: OverwriteOp,
// contOnErr: false,
// schFile: "",
// nameMapper: "",
// PrimaryKey: "a",
// src: NewDataLocation("data.csv", ""),
// dest: NewDataLocation("data.nbf", "")},
// },
// {
// "",
// "",
// &MoveOptions{
// operation: OverwriteOp,
// contOnErr: false,
// schFile: "",
// nameMapper: "",
// PrimaryKey: "",
// src: NewDataLocation("data.nbf", "nbf"),
// dest: NewDataLocation("table-name", "")},
// },*/
// {
// "",
// "",
// &MoveOptions{
// operation: OverwriteOp,
// tableName: "table-name",
// contOnErr: false,
// schFile: "",
// nameMapper: "",
// PrimaryKey: "a",
// src: NewDataLocation("data.csv", ""),
// dest: NewDataLocation("table-name", "")},
// },
// {
// `CREATE TABLE table_name (
//pk VARCHAR(120) COMMENT 'tag:0',
//value INT COMMENT 'tag:1',
//PRIMARY KEY (pk)
//);`,
// `{"a":"pk","b":"value"}`,
// &MoveOptions{
// operation: OverwriteOp,
// tableName: "table_name",
// contOnErr: false,
// schFile: "",
// nameMapper: "",
// PrimaryKey: "",
// src: NewDataLocation("data.csv", ""),
// dest: NewDataLocation("table_name", "")},
// },
// }
//
// for idx, test := range tests {
// fmt.Println(idx)
//
// var err error
// _, root, fs := createRootAndFS()
//
// if test.sqlSchema != "" {
// test.mvOpts.schFile = schemaFile
// err = fs.WriteFile(schemaFile, []byte(test.sqlSchema))
// }
//
// if test.mappingJSON != "" {
// test.mvOpts.nameMapper = mappingFile
// err = fs.WriteFile(mappingFile, []byte(test.mappingJSON))
// }
//
// src := test.mvOpts.src
//
// seedWr, err := src.NewCreatingWriter(context.Background(), test.mvOpts, root, fs, true, fakeSchema, nil)
//
// if err != nil {
// t.Fatal(err.Error())
// }
//
// imtRd := table.NewInMemTableReader(imt)
//
// _, _, err = table.PipeRows(context.Background(), imtRd, seedWr, false)
// seedWr.Close(context.Background())
// imtRd.Close(context.Background())
//
// if err != nil {
// t.Fatal(err)
// }
//
// encoding.UnmarshalJson(test.sqlSchema)
//
// dm, crDMErr := tblcmds.newImportDataMover(context.Background(), root, fs, test.mvOpts, nil)
//
// if crDMErr != nil {
// t.Fatal(crDMErr.String())
// }
//
// var badCount int64
// badCount, err = dm.Move(context.Background())
// assert.Equal(t, int64(0), badCount)
//
// if err != nil {
// t.Fatal(err)
// }
// }
//}

View File

@@ -204,6 +204,7 @@ func NameMapping(srcSch, destSch schema.Schema, nameMapper NameMapper) (*FieldMa
return NewFieldMapping(srcSch, destSch, srcToDest)
}
// NameMapperFromFile reads a JSON file containing a name mapping and returns a NameMapper.
func NameMapperFromFile(mappingFile string, FS filesys.ReadableFS) (NameMapper, error) {
var nm NameMapper
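The mapper returned here can feed NameMapping from the hunk above when both schemas are already in hand. A short hedged sketch, assuming these functions live in the rowconv package; the mapping file path is hypothetical:

package rowconvsketch

import (
	"github.com/liquidata-inc/dolt/go/libraries/doltcore/rowconv"
	"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
	"github.com/liquidata-inc/dolt/go/libraries/utils/filesys"
)

// fieldMappingFromFile loads a NameMapper from disk, then derives the
// FieldMapping between a source and destination schema.
func fieldMappingFromFile(path string, fs filesys.ReadableFS, srcSch, destSch schema.Schema) (*rowconv.FieldMapping, error) {
	nm, err := rowconv.NameMapperFromFile(path, fs)
	if err != nil {
		return nil, err
	}
	return rowconv.NameMapping(srcSch, destSch, nm)
}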

View File

@@ -218,6 +218,7 @@ func (cc *ColCollection) Size() int {
return len(cc.cols)
}
// ColCollsAreEqual determines whether two ColCollections are equal.
func ColCollsAreEqual(cc1, cc2 *ColCollection) bool {
if cc1.Size() != cc2.Size() {
return false
@@ -238,6 +239,7 @@ func ColCollsAreEqual(cc1, cc2 *ColCollection) bool {
return areEqual
}
// MapColCollection applies a function to each column in a ColCollection and creates a new ColCollection from the results.
func MapColCollection(cc *ColCollection, cb func(col Column) (Column, error)) (*ColCollection, error) {
mapped := make([]Column, cc.Size())
for i, c := range cc.cols {
@@ -250,6 +252,8 @@ func MapColCollection(cc *ColCollection, cb func(col Column) (Column, error)) (*
return NewColCollection(mapped...)
}
// FilterColCollection applies a boolean function to each column in a ColCollection and creates a new ColCollection
// from the set of columns for which the function returned true.
func FilterColCollection(cc *ColCollection, cb func(col Column) (bool, error)) (*ColCollection, error) {
filtered := make([]Column, 0, cc.Size())
for _, c := range cc.cols {
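A hedged sketch of the two collection helpers documented above; it assumes ColCollection, Column, and these functions live in the schema package and that Column exposes Name and IsPartOfPK fields:

package schemasketch

import (
	"strings"

	"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
)

// lowercaseNonKeyCols drops primary-key columns, then lowercases the names of
// the columns that remain, producing a new collection at each step.
func lowercaseNonKeyCols(cc *schema.ColCollection) (*schema.ColCollection, error) {
	// Keep only the columns that are not part of the primary key.
	nonKeys, err := schema.FilterColCollection(cc, func(col schema.Column) (bool, error) {
		return !col.IsPartOfPK, nil
	})
	if err != nil {
		return nil, err
	}

	// Lowercase every remaining column name.
	return schema.MapColCollection(nonKeys, func(col schema.Column) (schema.Column, error) {
		col.Name = strings.ToLower(col.Name)
		return col, nil
	})
}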

View File

@@ -34,7 +34,7 @@ type DSImpl struct {
// Schema defines the structure of the Dataset
Schema *SeedSchema
// tableName is the name of the test dataset
// TableName is the name of the test dataset
TableName string
// w is the writer where the test dataset will be written

View File

@@ -43,7 +43,7 @@ type RSImpl struct {
// Results are results of benchmarking
Results []result
// tableName is the name of the results table
// TableName is the name of the results table
TableName string
// w is the writer where the results will be written