From 36a921aca7742d672faa4e8488104b283dc351e1 Mon Sep 17 00:00:00 2001
From: Vinai Rachakonda
Date: Fri, 17 Jun 2022 16:27:34 -0400
Subject: [PATCH] Add bulk export parameters to dump (#3620)

---
 go/cmd/dolt/commands/dump.go                  |  77 ++++++++++----
 go/cmd/dolt/commands/tblcmds/export.go        |   4 +
 go/libraries/doltcore/mvdata/data_loc_test.go |   4 +
 go/libraries/doltcore/mvdata/data_mover.go    |   1 +
 go/libraries/doltcore/mvdata/file_data_loc.go |   4 +-
 .../untyped/sqlexport/batch_sqlwriter.go      | 100 ++++++++++++------
 .../table/untyped/sqlexport/sqlwriter.go      |  98 +++++++++++------
 integration-tests/bats/dump.bats              |  82 ++++++++++++++
 .../import-mysqldump.bats                     |  28 +++++
 9 files changed, 315 insertions(+), 83 deletions(-)

diff --git a/go/cmd/dolt/commands/dump.go b/go/cmd/dolt/commands/dump.go
index 7876046dd3..483ae75d2a 100644
--- a/go/cmd/dolt/commands/dump.go
+++ b/go/cmd/dolt/commands/dump.go
@@ -38,10 +38,11 @@ import (
 )

 const (
-	forceParam    = "force"
-	directoryFlag = "directory"
-	filenameFlag  = "file-name"
-	batchFlag     = "batch"
+	forceParam       = "force"
+	directoryFlag    = "directory"
+	filenameFlag     = "file-name"
+	batchFlag        = "batch"
+	noAutocommitFlag = "no-autocommit"

 	sqlFileExt = "sql"
 	csvFileExt = "csv"
@@ -55,11 +56,13 @@ var dumpDocs = cli.CommandDocumentationContent{
 	ShortDesc: `Export all tables.`,
 	LongDesc: `{{.EmphasisLeft}}dolt dump{{.EmphasisRight}} dumps all tables in the working set.
 If a dump file already exists then the operation will fail, unless the {{.EmphasisLeft}}--force | -f{{.EmphasisRight}} flag
-is provided. The force flag forces the existing dump file to be overwritten.
+is provided. The force flag forces the existing dump file to be overwritten. The {{.EmphasisLeft}}-r{{.EmphasisRight}} flag
+selects the file format of the dump. For formats other than .sql, each table is written to a separate
+csv, json, or parquet file.
 `,

 	Synopsis: []string{
-		"[-f] [-r {{.LessThan}}result-format{{.GreaterThan}}] ",
+		"[-f] [-r {{.LessThan}}result-format{{.GreaterThan}}] [-fn {{.LessThan}}file_name{{.GreaterThan}}] [-d {{.LessThan}}directory{{.GreaterThan}}] [--batch] [--no-autocommit] ",
 	},
 }

@@ -83,12 +86,12 @@ func (cmd DumpCmd) Docs() *cli.CommandDocumentation {

 func (cmd DumpCmd) ArgParser() *argparser.ArgParser {
 	ap := argparser.NewArgParser()
+	ap.SupportsString(FormatFlag, "r", "result_file_type", "Define the type of the output file. Defaults to sql. Valid values are sql, csv, json and parquet.")
+	ap.SupportsString(filenameFlag, "fn", "file_name", "Define file name for dump file. Defaults to `doltdump.sql`.")
+	ap.SupportsString(directoryFlag, "d", "directory_name", "Define directory name to dump the files in. Defaults to `doltdump/`.")
 	ap.SupportsFlag(forceParam, "f", "If data already exists in the destination, the force flag will allow the target to be overwritten.")
 	ap.SupportsFlag(batchFlag, "", "Returns batch insert statements wherever possible.")
-	ap.SupportsString(FormatFlag, "r", "result_file_type", "Define the type of the output file. Defaults to sql. Valid values are sql, csv, json and parquet.")
-	ap.SupportsString(filenameFlag, "", "file_name", "Define file name for dump file. Defaults to `doltdump.sql`.")
-	ap.SupportsString(directoryFlag, "", "directory_name", "Define directory name to dump the files in. Defaults to `doltdump/`.")
-
+	ap.SupportsFlag(noAutocommitFlag, "na", "Turns off autocommit for each dumped table. Used to speed up loading of the output SQL file.")
 	return ap
 }

@@ -159,11 +162,16 @@ func (cmd DumpCmd) Exec(ctx context.Context, commandStr string, args []string, d
 	dumpOpts := getDumpOptions(name, resFormat)
 	fPath, err := checkAndCreateOpenDestFile(ctx, root, dEnv, force, dumpOpts, name)
 	if err != nil {
-		return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage)
+		return HandleVErrAndExitCode(err, usage)
+	}
+
+	err2 := addBulkLoadingParadigms(dEnv, fPath)
+	if err2 != nil {
+		return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err2), usage)
 	}

 	for _, tbl := range tblNames {
-		tblOpts := newTableArgs(tbl, dumpOpts.dest, apr.Contains(batchFlag))
+		tblOpts := newTableArgs(tbl, dumpOpts.dest, apr.Contains(batchFlag), apr.Contains(noAutocommitFlag))
 		err = dumpTable(ctx, dEnv, tblOpts, fPath)
 		if err != nil {
 			return HandleVErrAndExitCode(err, usage)
@@ -199,15 +207,20 @@ type dumpOptions struct {
 }

 type tableOptions struct {
-	tableName string
-	dest      mvdata.DataLocation
-	batched   bool
+	tableName     string
+	dest          mvdata.DataLocation
+	batched       bool
+	autocommitOff bool
 }

 func (m tableOptions) IsBatched() bool {
 	return m.batched
 }

+func (m tableOptions) IsAutocommitOff() bool {
+	return m.autocommitOff
+}
+
 func (m tableOptions) WritesToTable() bool {
 	return false
 }
@@ -386,11 +399,12 @@ func getDumpOptions(fileName string, rf string) *dumpOptions {

 // newTableArgs returns tableOptions containing the table name, the src table location, and the dest file location
 // corresponding to the input parameters
-func newTableArgs(tblName string, destination mvdata.DataLocation, batched bool) *tableOptions {
+func newTableArgs(tblName string, destination mvdata.DataLocation, batched bool, autocommitOff bool) *tableOptions {
 	return &tableOptions{
-		tableName: tblName,
-		dest:      destination,
-		batched:   batched,
+		tableName:     tblName,
+		dest:          destination,
+		batched:       batched,
+		autocommitOff: autocommitOff,
 	}
 }

@@ -415,7 +429,7 @@ func dumpTables(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv,
 		return err
 	}

-	tblOpts := newTableArgs(tbl, dumpOpts.dest, batched)
+	tblOpts := newTableArgs(tbl, dumpOpts.dest, batched, false)

 	err = dumpTable(ctx, dEnv, tblOpts, fPath)
 	if err != nil {
@@ -424,3 +438,26 @@
 	}
 	return nil
 }
+
+// addBulkLoadingParadigms adds statements that are used to expedite dump file ingestion.
+// See https://dev.mysql.com/doc/refman/8.0/en/optimizing-innodb-bulk-data-loading.html
+// This includes turning FOREIGN_KEY_CHECKS and UNIQUE_CHECKS off at the beginning of the file.
+// Note that the standard mysqldump program turns these variables off.
+func addBulkLoadingParadigms(dEnv *env.DoltEnv, fPath string) error {
+	writer, err := dEnv.FS.OpenForWriteAppend(fPath, os.ModePerm)
+	if err != nil {
+		return err
+	}
+
+	_, err = writer.Write([]byte("SET FOREIGN_KEY_CHECKS=0;\n"))
+	if err != nil {
+		return err
+	}
+
+	_, err = writer.Write([]byte("SET UNIQUE_CHECKS=0;\n"))
+	if err != nil {
+		return err
+	}
+
+	return writer.Close()
+}
diff --git a/go/cmd/dolt/commands/tblcmds/export.go b/go/cmd/dolt/commands/tblcmds/export.go
index 18788076b9..6366d3a1a7 100644
--- a/go/cmd/dolt/commands/tblcmds/export.go
+++ b/go/cmd/dolt/commands/tblcmds/export.go
@@ -68,6 +68,10 @@ func (m exportOptions) IsBatched() bool {
 	return false
 }

+func (m exportOptions) IsAutocommitOff() bool {
+	return false
+}
+
 func (m exportOptions) WritesToTable() bool {
 	return false
 }
diff --git a/go/libraries/doltcore/mvdata/data_loc_test.go b/go/libraries/doltcore/mvdata/data_loc_test.go
index 846fcbcd84..770b8024a0 100644
--- a/go/libraries/doltcore/mvdata/data_loc_test.go
+++ b/go/libraries/doltcore/mvdata/data_loc_test.go
@@ -164,6 +164,10 @@ func (t testDataMoverOptions) IsBatched() bool {
 	return false
 }

+func (t testDataMoverOptions) IsAutocommitOff() bool {
+	return false
+}
+
 func (t testDataMoverOptions) WritesToTable() bool {
 	return true
 }
diff --git a/go/libraries/doltcore/mvdata/data_mover.go b/go/libraries/doltcore/mvdata/data_mover.go
index 03e06ccd78..7ed4fba915 100644
--- a/go/libraries/doltcore/mvdata/data_mover.go
+++ b/go/libraries/doltcore/mvdata/data_mover.go
@@ -57,6 +57,7 @@ type MoverOptions struct {
 }

 type DataMoverOptions interface {
+	IsAutocommitOff() bool
 	IsBatched() bool
 	WritesToTable() bool
 	SrcName() string
diff --git a/go/libraries/doltcore/mvdata/file_data_loc.go b/go/libraries/doltcore/mvdata/file_data_loc.go
index 750d4fda2e..6d9576855e 100644
--- a/go/libraries/doltcore/mvdata/file_data_loc.go
+++ b/go/libraries/doltcore/mvdata/file_data_loc.go
@@ -190,9 +190,9 @@ func (dl FileDataLocation) NewCreatingWriter(ctx context.Context, mvOpts DataMov
 		return json.NewJSONWriter(wr, outSch)
 	case SqlFile:
 		if mvOpts.IsBatched() {
-			return sqlexport.OpenBatchedSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), outSch, opts)
+			return sqlexport.OpenBatchedSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), mvOpts.IsAutocommitOff(), outSch, opts)
 		} else {
-			return sqlexport.OpenSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), outSch, opts)
+			return sqlexport.OpenSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), mvOpts.IsAutocommitOff(), outSch, opts)
 		}
 	case ParquetFile:
 		return parquet.NewParquetWriter(outSch, mvOpts.DestName())
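Editor's note: the two SQL export writers changed below share a lazily-written prelude — nothing is emitted until the first row arrives, at which point the DROP/CREATE statements and, when autocommit is disabled, a single SET AUTOCOMMIT=0; are written exactly once. A minimal, self-contained sketch of that write-once guard follows; the type and method names are illustrative, not Dolt's actual types:

package main

import (
	"fmt"
	"io"
	"os"
)

// preludeWriter defers its prelude until the first row is written,
// mirroring the writtenFirstRow/writtenAutocommitOff guards in the patch.
type preludeWriter struct {
	w             io.Writer
	autocommitOff bool
	wrotePrelude  bool
}

func (p *preludeWriter) writeRow(insertStmt string) error {
	if !p.wrotePrelude {
		if p.autocommitOff {
			if _, err := fmt.Fprintln(p.w, "SET AUTOCOMMIT=0;"); err != nil {
				return err
			}
		}
		p.wrotePrelude = true
	}
	_, err := fmt.Fprintln(p.w, insertStmt)
	return err
}

func main() {
	pw := &preludeWriter{w: os.Stdout, autocommitOff: true}
	_ = pw.writeRow("INSERT INTO t VALUES (1);") // prelude written here, once
	_ = pw.writeRow("INSERT INTO t VALUES (2);") // no prelude this time
}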
diff --git a/go/libraries/doltcore/table/untyped/sqlexport/batch_sqlwriter.go b/go/libraries/doltcore/table/untyped/sqlexport/batch_sqlwriter.go
index 988cb63fb7..3697cb15e7 100644
--- a/go/libraries/doltcore/table/untyped/sqlexport/batch_sqlwriter.go
+++ b/go/libraries/doltcore/table/untyped/sqlexport/batch_sqlwriter.go
@@ -36,19 +36,21 @@ const batchSize = 10000

 // BatchSqlExportWriter is a TableWriter that writes SQL drop, create and insert statements to re-create a dolt table in a
 // SQL database.
 type BatchSqlExportWriter struct {
-	tableName       string
-	sch             schema.Schema
-	parentSchs      map[string]schema.Schema
-	foreignKeys     []doltdb.ForeignKey
-	wr              io.WriteCloser
-	root            *doltdb.RootValue
-	writtenFirstRow bool
-	numInserts      int
-	editOpts        editor.Options
+	tableName            string
+	sch                  schema.Schema
+	parentSchs           map[string]schema.Schema
+	foreignKeys          []doltdb.ForeignKey
+	wr                   io.WriteCloser
+	root                 *doltdb.RootValue
+	writtenFirstRow      bool
+	writtenAutocommitOff bool
+	numInserts           int
+	editOpts             editor.Options
+	autocommitOff        bool
 }

 // OpenBatchedSQLExportWriter returns a new SqlWriter for the table with the writer given.
-func OpenBatchedSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, sch schema.Schema, editOpts editor.Options) (*BatchSqlExportWriter, error) {
+func OpenBatchedSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, autocommitOff bool, sch schema.Schema, editOpts editor.Options) (*BatchSqlExportWriter, error) {
 	allSchemas, err := root.GetAllSchemas(ctx)
 	if err != nil {
@@ -63,13 +65,14 @@ func OpenBatchedSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *do
 	foreignKeys, _ := fkc.KeysForTable(tableName)

 	return &BatchSqlExportWriter{
-		tableName:   tableName,
-		sch:         sch,
-		parentSchs:  allSchemas,
-		foreignKeys: foreignKeys,
-		root:        root,
-		wr:          wr,
-		editOpts:    editOpts,
+		tableName:     tableName,
+		sch:           sch,
+		parentSchs:    allSchemas,
+		foreignKeys:   foreignKeys,
+		root:          root,
+		wr:            wr,
+		editOpts:      editOpts,
+		autocommitOff: autocommitOff,
 	}, nil
 }

@@ -141,6 +144,10 @@ func (w *BatchSqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error
 		return err
 	}

+	if err := w.maybeWriteAutocommitOff(); err != nil {
+		return err
+	}
+
 	// Reached max number of inserts on one line
 	if w.numInserts == batchSize {
 		// Reset count
@@ -189,21 +196,42 @@ func (w *BatchSqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error
 }

 func (w *BatchSqlExportWriter) maybeWriteDropCreate(ctx context.Context) error {
-	if !w.writtenFirstRow {
-		var b strings.Builder
-		b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
-		b.WriteRune('\n')
-		sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
-		createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
-		if err != nil {
-			return err
-		}
-		b.WriteString(createTableStmt)
-		if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
-			return err
-		}
-		w.writtenFirstRow = true
+	if w.writtenFirstRow {
+		return nil
 	}
+
+	var b strings.Builder
+	b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
+	b.WriteRune('\n')
+	sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
+	createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
+	if err != nil {
+		return err
+	}
+	b.WriteString(createTableStmt)
+	if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
+		return err
+	}
+
+	w.writtenFirstRow = true
+
+	return nil
+}
+
+func (w *BatchSqlExportWriter) maybeWriteAutocommitOff() error {
+	if w.writtenAutocommitOff || !w.autocommitOff {
+		return nil
+	}
+
+	var b strings.Builder
+	b.WriteString("SET AUTOCOMMIT=0;")
+
+	if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
+		return err
+	}
+
+	w.writtenAutocommitOff = true
+
 	return nil
 }

@@ -222,6 +250,16 @@
 		}
 	}

+	// We have to commit this table's inserts by adding a COMMIT statement.
+	if w.autocommitOff {
+		var b strings.Builder
+		b.WriteString("COMMIT;")
+		b.WriteRune('\n')
+		if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
+			return err
+		}
+	}
+
 	if w.wr != nil {
 		return w.wr.Close()
 	}
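Editor's note: between the prelude and Close, the batch writer above folds rows into multi-row INSERT statements, starting a new statement every batchSize (10,000) rows. A simplified, self-contained sketch of that accumulation strategy — the string-typed rows and function names here are illustrative, not Dolt's actual row representation:

package main

import (
	"fmt"
	"strings"
)

// batchInserts folds value tuples into multi-row INSERT statements,
// flushing a statement every batchSize rows, mirroring the batch
// writer's strategy above.
func batchInserts(table string, rows []string, batchSize int) []string {
	var stmts []string
	for start := 0; start < len(rows); start += batchSize {
		end := start + batchSize
		if end > len(rows) {
			end = len(rows)
		}
		stmts = append(stmts, fmt.Sprintf("INSERT INTO %s VALUES %s;",
			table, strings.Join(rows[start:end], ", ")))
	}
	return stmts
}

func main() {
	rows := []string{"(1)", "(2)", "(3)"}
	for _, s := range batchInserts("t", rows, 2) {
		fmt.Println(s)
	}
	// Output:
	// INSERT INTO t VALUES (1), (2);
	// INSERT INTO t VALUES (3);
}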
diff --git a/go/libraries/doltcore/table/untyped/sqlexport/sqlwriter.go b/go/libraries/doltcore/table/untyped/sqlexport/sqlwriter.go
index 4501f73b55..59a9f100c4 100644
--- a/go/libraries/doltcore/table/untyped/sqlexport/sqlwriter.go
+++ b/go/libraries/doltcore/table/untyped/sqlexport/sqlwriter.go
@@ -34,18 +34,20 @@ import (
 // SqlExportWriter is a TableWriter that writes SQL drop, create and insert statements to re-create a dolt table in a
 // SQL database.
 type SqlExportWriter struct {
-	tableName       string
-	sch             schema.Schema
-	parentSchs      map[string]schema.Schema
-	foreignKeys     []doltdb.ForeignKey
-	wr              io.WriteCloser
-	root            *doltdb.RootValue
-	writtenFirstRow bool
-	editOpts        editor.Options
+	tableName            string
+	sch                  schema.Schema
+	parentSchs           map[string]schema.Schema
+	foreignKeys          []doltdb.ForeignKey
+	wr                   io.WriteCloser
+	root                 *doltdb.RootValue
+	writtenFirstRow      bool
+	writtenAutocommitOff bool
+	editOpts             editor.Options
+	autocommitOff        bool
 }

 // OpenSQLExportWriter returns a new SqlWriter for the table with the writer given.
-func OpenSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, sch schema.Schema, editOpts editor.Options) (*SqlExportWriter, error) {
+func OpenSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, autocommitOff bool, sch schema.Schema, editOpts editor.Options) (*SqlExportWriter, error) {
 	allSchemas, err := root.GetAllSchemas(ctx)
 	if err != nil {
@@ -60,13 +62,14 @@ func OpenSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.Ro
 	foreignKeys, _ := fkc.KeysForTable(tableName)

 	return &SqlExportWriter{
-		tableName:   tableName,
-		sch:         sch,
-		parentSchs:  allSchemas,
-		foreignKeys: foreignKeys,
-		root:        root,
-		wr:          wr,
-		editOpts:    editOpts,
+		tableName:     tableName,
+		sch:           sch,
+		parentSchs:    allSchemas,
+		foreignKeys:   foreignKeys,
+		root:          root,
+		wr:            wr,
+		editOpts:      editOpts,
+		autocommitOff: autocommitOff,
 	}, nil
 }

@@ -117,6 +120,10 @@ func (w *SqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
 		return err
 	}

+	if err := w.maybeWriteAutocommitOff(); err != nil {
+		return err
+	}
+
 	stmt, err := sqlfmt.SqlRowAsInsertStmt(ctx, r, w.tableName, w.sch)
 	if err != nil {
 		return err
@@ -130,21 +137,42 @@ func (w *SqlExportWriter) maybeWriteDropCreate(ctx context.Context) error {
 	if w.tableName == doltdb.SchemasTableName || w.tableName == doltdb.ProceduresTableName {
 		return nil
 	}
-	if !w.writtenFirstRow {
-		var b strings.Builder
-		b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
-		b.WriteRune('\n')
-		sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
-		createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
-		if err != nil {
-			return err
-		}
-		b.WriteString(createTableStmt)
-		if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
-			return err
-		}
-		w.writtenFirstRow = true
+
+	if w.writtenFirstRow {
+		return nil
 	}
+
+	var b strings.Builder
+	b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
+	b.WriteRune('\n')
+	sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
+	createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
+	if err != nil {
+		return err
+	}
+	b.WriteString(createTableStmt)
+	if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
+		return err
+	}
+	w.writtenFirstRow = true
+
+	return nil
+}
+
+func (w *SqlExportWriter) maybeWriteAutocommitOff() error {
+	if w.writtenAutocommitOff || !w.autocommitOff {
+		return nil
+	}
+
+	var b strings.Builder
+	b.WriteString("SET AUTOCOMMIT=0;")
+
+	if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
+		return err
+	}
+
+	w.writtenAutocommitOff = true
+
 	return nil
 }

@@ -155,6 +183,16 @@ func (w *SqlExportWriter) Close(ctx context.Context) error {
 		return err
 	}

+	// We have to commit this table's inserts by adding a COMMIT statement.
+	if w.autocommitOff {
+		var b strings.Builder
+		b.WriteString("COMMIT;")
+		b.WriteRune('\n')
+		if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
+			return err
+		}
+	}
+
 	if w.wr != nil {
 		return w.wr.Close()
 	}
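Editor's note: putting the pieces together, a single-table dump produced with --no-autocommit should follow the statement ordering sketched below; the bats tests that follow grep for exactly these markers. The table and row are made up for illustration — the real CREATE TABLE text comes from GetCreateTableStmt:

package main

import "fmt"

// expectedShape shows the statement ordering of a --no-autocommit dump:
// bulk-loading prelude, per-table DROP/CREATE, SET AUTOCOMMIT=0 before the
// first row, the inserts, and a final COMMIT written on Close.
const expectedShape = `SET FOREIGN_KEY_CHECKS=0;
SET UNIQUE_CHECKS=0;
DROP TABLE IF EXISTS t;
CREATE TABLE t (pk int PRIMARY KEY);
SET AUTOCOMMIT=0;
INSERT INTO t (pk) VALUES (1);
COMMIT;`

func main() { fmt.Println(expectedShape) }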
diff --git a/integration-tests/bats/dump.bats b/integration-tests/bats/dump.bats
index 36020ab707..45451bdacd 100644
--- a/integration-tests/bats/dump.bats
+++ b/integration-tests/bats/dump.bats
@@ -37,6 +37,18 @@ teardown() {
     [ "$status" -eq 0 ]
     [ "${#lines[@]}" -eq 3 ]

+    run grep FOREIGN_KEY_CHECKS=0 doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 1 ]
+
+    run grep UNIQUE_CHECKS=0 doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 1 ]
+
+    # Sanity check
+    run grep AUTOCOMMIT doltdump.sql
+    [ "$status" -eq 1 ]
+
     run dolt dump
     [ "$status" -ne 0 ]
     [[ "$output" =~ "doltdump.sql already exists" ]] || false
@@ -646,6 +658,76 @@ teardown() {
     [[ "$output" = "" ]] || false
 }

+@test "dump: -na flag works correctly" {
+    dolt sql -q "CREATE TABLE new_table(pk int primary key);"
+    dolt sql -q "INSERT INTO new_table VALUES (1)"
+
+    run dolt dump -na
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "Successfully exported data." ]] || false
+    [ -f doltdump.sql ]
+
+    run head -n 3 doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 3 ]
+    [[ "$output" =~ "SET FOREIGN_KEY_CHECKS=0;" ]] || false
+    [[ "$output" =~ "SET UNIQUE_CHECKS=0;" ]] || false
+
+    run grep AUTOCOMMIT doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 1 ]
+
+    run tail -n 2 doltdump.sql
+    [[ "$output" =~ "COMMIT;" ]] || false
+
+    dolt sql < doltdump.sql
+
+    run dolt sql -r csv -q "select * from new_table"
+    [ "$status" -eq 0 ]
+    [[ "${lines[0]}" = "pk" ]] || false
+    [[ "${lines[1]}" = "1" ]] || false
+
+    # try with a csv output and ensure that there are no problems
+    run dolt dump -r csv -na
+    [ "$status" -eq 0 ]
+    [ -f doltdump/new_table.csv ]
+}
+
+@test "dump: --no-autocommit flag works with multiple tables" {
+    dolt sql -q "CREATE TABLE table1(pk int primary key);"
+    dolt sql -q "CREATE TABLE table2(pk int primary key);"
+
+    dolt sql -q "INSERT INTO table1 VALUES (1)"
+    dolt sql -q "INSERT INTO table2 VALUES (1)"
+
+    run dolt dump --no-autocommit
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "Successfully exported data." ]] || false
+    [ -f doltdump.sql ]
+
+    run grep AUTOCOMMIT doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 2 ]
+
+    run grep "COMMIT;" doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 2 ]
+
+    # test with batch mode
+    run dolt dump --batch -f --no-autocommit
+    [ "$status" -eq 0 ]
+    [[ "$output" =~ "Successfully exported data." ]] || false
+    [ -f doltdump.sql ]
+
+    run grep AUTOCOMMIT doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 2 ]
+
+    run grep "COMMIT;" doltdump.sql
+    [ "$status" -eq 0 ]
+    [ "${#lines[@]}" -eq 2 ]
+}
+
 function create_tables() {
     dolt sql -q "CREATE TABLE new_table(pk int primary key);"
     dolt sql -q "CREATE TABLE warehouse(warehouse_id int primary key, warehouse_name varchar(100));"
diff --git a/integration-tests/data-dump-loading-tests/import-mysqldump.bats b/integration-tests/data-dump-loading-tests/import-mysqldump.bats
index 232628a3e4..fd4fb43121 100644
--- a/integration-tests/data-dump-loading-tests/import-mysqldump.bats
+++ b/integration-tests/data-dump-loading-tests/import-mysqldump.bats
@@ -481,3 +481,31 @@ SQL
     [ "$status" -eq 0 ]
     [[ "$output" =~ "\`location\` geometry NOT NULL SRID 0," ]] || false
 }
+
+@test "import mysqldump: dolt dump --no-autocommit can be loaded back into mysql" {
+    service mysql start
+    dolt sql -q "CREATE TABLE IF NOT EXISTS mytable (pk int NOT NULL PRIMARY KEY, c1 varchar(25) DEFAULT NULL)"
+    dolt sql -q "INSERT IGNORE INTO mytable VALUES (0, 'one'), (1, 'two')"
+
+    # Set up the database we are loading data into
+    mysql <