Add bulk export parameters to dump (#3620)

Vinai Rachakonda
2022-06-17 16:27:34 -04:00
committed by GitHub
parent 80e809541f
commit 36a921aca7
9 changed files with 315 additions and 83 deletions

View File

@@ -38,10 +38,11 @@ import (
)
const (
forceParam = "force"
directoryFlag = "directory"
filenameFlag = "file-name"
batchFlag = "batch"
forceParam = "force"
directoryFlag = "directory"
filenameFlag = "file-name"
batchFlag = "batch"
noAutocommitFlag = "no-autocommit"
sqlFileExt = "sql"
csvFileExt = "csv"
@@ -55,11 +56,13 @@ var dumpDocs = cli.CommandDocumentationContent{
ShortDesc: `Export all tables.`,
LongDesc: `{{.EmphasisLeft}}dolt dump{{.EmphasisRight}} dumps all tables in the working set.
If a dump file already exists then the operation will fail, unless the {{.EmphasisLeft}}--force | -f{{.EmphasisRight}} flag
is provided. The force flag forces the existing dump file to be overwritten.
is provided. The force flag forces the existing dump file to be overwritten. The {{.EmphasisLeft}}-r{{.EmphasisRight}} flag
is used to select the file format of the dump. For non-sql formats, each table is written to a separate
csv, json or parquet file.
`,
Synopsis: []string{
"[-f] [-r {{.LessThan}}result-format{{.GreaterThan}}] ",
"[-f] [-r {{.LessThan}}result-format{{.GreaterThan}}] [-fn {{.LessThan}}file_name{{.GreaterThan}}] [-d {{.LessThan}}directory{{.GreaterThan}}] [--batch] [--no-autocommit] ",
},
}
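For orientation, a hedged sketch of how the new options compose on the command line; the invocations mirror the bats tests added later in this diff, and the comments describe the expected effect rather than guaranteed behavior.

# Sketch of dump invocations with the new flags (spellings taken from the parser below).
dolt dump                             # doltdump.sql, autocommit left on
dolt dump -f --no-autocommit          # overwrite; frame each table's inserts with SET AUTOCOMMIT=0 ... COMMIT;
dolt dump -na                         # short form of --no-autocommit
dolt dump --batch -f --no-autocommit  # batched multi-row INSERTs, still one COMMIT per table
dolt dump -r csv -na                  # non-sql formats: one file per table under doltdump/, SQL framing not applicable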
@@ -83,12 +86,12 @@ func (cmd DumpCmd) Docs() *cli.CommandDocumentation {
func (cmd DumpCmd) ArgParser() *argparser.ArgParser {
ap := argparser.NewArgParser()
ap.SupportsString(FormatFlag, "r", "result_file_type", "Define the type of the output file. Defaults to sql. Valid values are sql, csv, json and parquet.")
ap.SupportsString(filenameFlag, "fn", "file_name", "Define file name for dump file. Defaults to `doltdump.sql`.")
ap.SupportsString(directoryFlag, "d", "directory_name", "Define directory name to dump the files in. Defaults to `doltdump/`.")
ap.SupportsFlag(forceParam, "f", "If data already exists in the destination, the force flag will allow the target to be overwritten.")
ap.SupportsFlag(batchFlag, "", "Returns batch insert statements wherever possible.")
ap.SupportsString(FormatFlag, "r", "result_file_type", "Define the type of the output file. Defaults to sql. Valid values are sql, csv, json and parquet.")
ap.SupportsString(filenameFlag, "", "file_name", "Define file name for dump file. Defaults to `doltdump.sql`.")
ap.SupportsString(directoryFlag, "", "directory_name", "Define directory name to dump the files in. Defaults to `doltdump/`.")
ap.SupportsFlag(noAutocommitFlag, "na", "Turns off autocommit for each dumped table. Used to speed up loading of the output SQL file.")
return ap
}
@@ -159,11 +162,16 @@ func (cmd DumpCmd) Exec(ctx context.Context, commandStr string, args []string, d
dumpOpts := getDumpOptions(name, resFormat)
fPath, err := checkAndCreateOpenDestFile(ctx, root, dEnv, force, dumpOpts, name)
if err != nil {
return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err), usage)
return HandleVErrAndExitCode(err, usage)
}
err2 := addBulkLoadingParadigms(dEnv, fPath)
if err2 != nil {
return HandleVErrAndExitCode(errhand.VerboseErrorFromError(err2), usage)
}
for _, tbl := range tblNames {
tblOpts := newTableArgs(tbl, dumpOpts.dest, apr.Contains(batchFlag))
tblOpts := newTableArgs(tbl, dumpOpts.dest, apr.Contains(batchFlag), apr.Contains(noAutocommitFlag))
err = dumpTable(ctx, dEnv, tblOpts, fPath)
if err != nil {
return HandleVErrAndExitCode(err, usage)
@@ -199,15 +207,20 @@ type dumpOptions struct {
}
type tableOptions struct {
tableName string
dest mvdata.DataLocation
batched bool
tableName string
dest mvdata.DataLocation
batched bool
autocommitOff bool
}
func (m tableOptions) IsBatched() bool {
return m.batched
}
func (m tableOptions) IsAutocommitOff() bool {
return m.autocommitOff
}
func (m tableOptions) WritesToTable() bool {
return false
}
@@ -386,11 +399,12 @@ func getDumpOptions(fileName string, rf string) *dumpOptions {
// newTableArgs returns tableOptions of table name and src table location and dest file location
// corresponding to the input parameters
func newTableArgs(tblName string, destination mvdata.DataLocation, batched bool) *tableOptions {
func newTableArgs(tblName string, destination mvdata.DataLocation, batched bool, autocommitOff bool) *tableOptions {
return &tableOptions{
tableName: tblName,
dest: destination,
batched: batched,
tableName: tblName,
dest: destination,
batched: batched,
autocommitOff: autocommitOff,
}
}
@@ -415,7 +429,7 @@ func dumpTables(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv,
return err
}
tblOpts := newTableArgs(tbl, dumpOpts.dest, batched)
tblOpts := newTableArgs(tbl, dumpOpts.dest, batched, false)
err = dumpTable(ctx, dEnv, tblOpts, fPath)
if err != nil {
@@ -424,3 +438,26 @@ func dumpTables(ctx context.Context, root *doltdb.RootValue, dEnv *env.DoltEnv,
}
return nil
}
// addBulkLoadingParadigms adds statements that are used to expedite dump file ingestion.
// cf. https://dev.mysql.com/doc/refman/8.0/en/optimizing-innodb-bulk-data-loading.html
// This includes turning FOREIGN_KEY_CHECKS and UNIQUE_CHECKS off at the beginning of the file.
// Note that the standard mysqldump program turns these variables off.
func addBulkLoadingParadigms(dEnv *env.DoltEnv, fPath string) error {
writer, err := dEnv.FS.OpenForWriteAppend(fPath, os.ModePerm)
if err != nil {
return err
}
_, err = writer.Write([]byte("SET FOREIGN_KEY_CHECKS=0;\n"))
if err != nil {
return err
}
_, err = writer.Write([]byte("SET UNIQUE_CHECKS=0;\n"))
if err != nil {
return err
}
return writer.Close()
}
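Since addBulkLoadingParadigms appends its statements right after the destination file is created, the SQL dump should now always begin with the two SET statements. A quick check, mirroring the bats assertions further down:

# Assumed check: the bulk-load prelude sits at the top of the dump file.
head -n 2 doltdump.sql
#   SET FOREIGN_KEY_CHECKS=0;
#   SET UNIQUE_CHECKS=0;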

View File

@@ -68,6 +68,10 @@ func (m exportOptions) IsBatched() bool {
return false
}
func (m exportOptions) IsAutocommitOff() bool {
return false
}
func (m exportOptions) WritesToTable() bool {
return false
}

View File

@@ -164,6 +164,10 @@ func (t testDataMoverOptions) IsBatched() bool {
return false
}
func (t testDataMoverOptions) IsAutocommitOff() bool {
return false
}
func (t testDataMoverOptions) WritesToTable() bool {
return true
}

View File

@@ -57,6 +57,7 @@ type MoverOptions struct {
}
type DataMoverOptions interface {
IsAutocommitOff() bool
IsBatched() bool
WritesToTable() bool
SrcName() string

View File

@@ -190,9 +190,9 @@ func (dl FileDataLocation) NewCreatingWriter(ctx context.Context, mvOpts DataMov
return json.NewJSONWriter(wr, outSch)
case SqlFile:
if mvOpts.IsBatched() {
return sqlexport.OpenBatchedSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), outSch, opts)
return sqlexport.OpenBatchedSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), mvOpts.IsAutocommitOff(), outSch, opts)
} else {
return sqlexport.OpenSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), outSch, opts)
return sqlexport.OpenSQLExportWriter(ctx, wr, root, mvOpts.SrcName(), mvOpts.IsAutocommitOff(), outSch, opts)
}
case ParquetFile:
return parquet.NewParquetWriter(outSch, mvOpts.DestName())

View File

@@ -36,19 +36,21 @@ const batchSize = 10000
// BatchSqlExportWriter is a TableWriter that writes SQL drop, create and batched insert statements to re-create a dolt table in a
// SQL database.
type BatchSqlExportWriter struct {
tableName string
sch schema.Schema
parentSchs map[string]schema.Schema
foreignKeys []doltdb.ForeignKey
wr io.WriteCloser
root *doltdb.RootValue
writtenFirstRow bool
numInserts int
editOpts editor.Options
tableName string
sch schema.Schema
parentSchs map[string]schema.Schema
foreignKeys []doltdb.ForeignKey
wr io.WriteCloser
root *doltdb.RootValue
writtenFirstRow bool
writtenAutocommitOff bool
numInserts int
editOpts editor.Options
autocommitOff bool
}
// OpenBatchedSQLExportWriter returns a new BatchSqlExportWriter for the table with the writer given.
func OpenBatchedSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, sch schema.Schema, editOpts editor.Options) (*BatchSqlExportWriter, error) {
func OpenBatchedSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, autocommitOff bool, sch schema.Schema, editOpts editor.Options) (*BatchSqlExportWriter, error) {
allSchemas, err := root.GetAllSchemas(ctx)
if err != nil {
@@ -63,13 +65,14 @@ func OpenBatchedSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *do
foreignKeys, _ := fkc.KeysForTable(tableName)
return &BatchSqlExportWriter{
tableName: tableName,
sch: sch,
parentSchs: allSchemas,
foreignKeys: foreignKeys,
root: root,
wr: wr,
editOpts: editOpts,
tableName: tableName,
sch: sch,
parentSchs: allSchemas,
foreignKeys: foreignKeys,
root: root,
wr: wr,
editOpts: editOpts,
autocommitOff: autocommitOff,
}, nil
}
@@ -141,6 +144,10 @@ func (w *BatchSqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error
return err
}
if err := w.maybeWriteAutocommitOff(); err != nil {
return err
}
// Reached max number of inserts on one line
if w.numInserts == batchSize {
// Reset count
@@ -189,21 +196,42 @@ func (w *BatchSqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error
}
func (w *BatchSqlExportWriter) maybeWriteDropCreate(ctx context.Context) error {
if !w.writtenFirstRow {
var b strings.Builder
b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
b.WriteRune('\n')
sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
if err != nil {
return err
}
b.WriteString(createTableStmt)
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
w.writtenFirstRow = true
if w.writtenFirstRow {
return nil
}
var b strings.Builder
b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
b.WriteRune('\n')
sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
if err != nil {
return err
}
b.WriteString(createTableStmt)
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
w.writtenFirstRow = true
return nil
}
func (w *BatchSqlExportWriter) maybeWriteAutocommitOff() error {
if w.writtenAutocommitOff || !w.autocommitOff {
return nil
}
var b strings.Builder
b.WriteString("SET AUTOCOMMIT=0;")
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
w.writtenAutocommitOff = true
return nil
}
@@ -222,6 +250,16 @@ func (w *BatchSqlExportWriter) Close(ctx context.Context) error {
}
}
// We have to commit this table's inserts by adding a COMMIT statement.
if w.autocommitOff {
var b strings.Builder
b.WriteString("COMMIT;")
b.WriteRune('\n')
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
}
if w.wr != nil {
return w.wr.Close()
}
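With --batch the same autocommit framing applies, but rows are folded into multi-row INSERT statements capped at the batchSize constant (10000) shown above. A rough sketch of one table's section, with drop/create statements omitted and names and values purely illustrative:

# Expected batched shape per table when --no-autocommit is set (illustrative):
#   SET AUTOCOMMIT=0;
#   INSERT INTO `t` VALUES (1),(2), ... ;
#   COMMIT;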

View File

@@ -34,18 +34,20 @@ import (
// SqlExportWriter is a TableWriter that writes SQL drop, create and insert statements to re-create a dolt table in a
// SQL database.
type SqlExportWriter struct {
tableName string
sch schema.Schema
parentSchs map[string]schema.Schema
foreignKeys []doltdb.ForeignKey
wr io.WriteCloser
root *doltdb.RootValue
writtenFirstRow bool
editOpts editor.Options
tableName string
sch schema.Schema
parentSchs map[string]schema.Schema
foreignKeys []doltdb.ForeignKey
wr io.WriteCloser
root *doltdb.RootValue
writtenFirstRow bool
writtenAutocommitOff bool
editOpts editor.Options
autocommitOff bool
}
// OpenSQLExportWriter returns a new SqlExportWriter for the table with the writer given.
func OpenSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, sch schema.Schema, editOpts editor.Options) (*SqlExportWriter, error) {
func OpenSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.RootValue, tableName string, autocommitOff bool, sch schema.Schema, editOpts editor.Options) (*SqlExportWriter, error) {
allSchemas, err := root.GetAllSchemas(ctx)
if err != nil {
@@ -60,13 +62,14 @@ func OpenSQLExportWriter(ctx context.Context, wr io.WriteCloser, root *doltdb.Ro
foreignKeys, _ := fkc.KeysForTable(tableName)
return &SqlExportWriter{
tableName: tableName,
sch: sch,
parentSchs: allSchemas,
foreignKeys: foreignKeys,
root: root,
wr: wr,
editOpts: editOpts,
tableName: tableName,
sch: sch,
parentSchs: allSchemas,
foreignKeys: foreignKeys,
root: root,
wr: wr,
editOpts: editOpts,
autocommitOff: autocommitOff,
}, nil
}
@@ -117,6 +120,10 @@ func (w *SqlExportWriter) WriteSqlRow(ctx context.Context, r sql.Row) error {
return err
}
if err := w.maybeWriteAutocommitOff(); err != nil {
return err
}
stmt, err := sqlfmt.SqlRowAsInsertStmt(ctx, r, w.tableName, w.sch)
if err != nil {
return err
@@ -130,21 +137,42 @@ func (w *SqlExportWriter) maybeWriteDropCreate(ctx context.Context) error {
if w.tableName == doltdb.SchemasTableName || w.tableName == doltdb.ProceduresTableName {
return nil
}
if !w.writtenFirstRow {
var b strings.Builder
b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
b.WriteRune('\n')
sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
if err != nil {
return err
}
b.WriteString(createTableStmt)
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
w.writtenFirstRow = true
if w.writtenFirstRow {
return nil
}
var b strings.Builder
b.WriteString(sqlfmt.DropTableIfExistsStmt(w.tableName))
b.WriteRune('\n')
sqlCtx, engine, _ := dsqle.PrepareCreateTableStmt(ctx, dsqle.NewUserSpaceDatabase(w.root, w.editOpts))
createTableStmt, err := dsqle.GetCreateTableStmt(sqlCtx, engine, w.tableName)
if err != nil {
return err
}
b.WriteString(createTableStmt)
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
w.writtenFirstRow = true
return nil
}
func (w *SqlExportWriter) maybeWriteAutocommitOff() error {
if w.writtenAutocommitOff || !w.autocommitOff {
return nil
}
var b strings.Builder
b.WriteString("SET AUTOCOMMIT=0;")
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
w.writtenAutocommitOff = true
return nil
}
@@ -155,6 +183,16 @@ func (w *SqlExportWriter) Close(ctx context.Context) error {
return err
}
// We have to commit this table's inserts by adding a COMMIT statement.
if w.autocommitOff {
var b strings.Builder
b.WriteString("COMMIT;")
b.WriteRune('\n')
if err := iohelp.WriteLine(w.wr, b.String()); err != nil {
return err
}
}
if w.wr != nil {
return w.wr.Close()
}
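Putting the pieces together, a single-table dump with --no-autocommit should come out roughly as below; the ordering follows maybeWriteDropCreate, maybeWriteAutocommitOff and Close above, and the table name, columns and values are illustrative only.

# Rough end-to-end shape of doltdump.sql for one table with --no-autocommit:
#   SET FOREIGN_KEY_CHECKS=0;      <- file prelude from addBulkLoadingParadigms
#   SET UNIQUE_CHECKS=0;
#   DROP TABLE IF EXISTS `t`;      <- written before the first row of each table
#   CREATE TABLE `t` ( ... );
#   SET AUTOCOMMIT=0;              <- written once per table when --no-autocommit is set
#   INSERT INTO `t` VALUES (1);
#   COMMIT;                        <- appended in Close for each table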

View File

@@ -37,6 +37,18 @@ teardown() {
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 3 ]
run grep FOREIGN_KEY_CHECKS=0 doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
run grep UNIQUE_CHECKS=0 doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
# Sanity check
run grep AUTOCOMMIT doltdump.sql
[ "$status" -eq 1 ]
run dolt dump
[ "$status" -ne 0 ]
[[ "$output" =~ "doltdump.sql already exists" ]] || false
@@ -646,6 +658,76 @@ teardown() {
[[ "$output" = "" ]] || false
}
@test "dump: -na flag works correctly" {
dolt sql -q "CREATE TABLE new_table(pk int primary key);"
dolt sql -q "INSERT INTO new_Table values (1)"
run dolt dump -na
[ "$status" -eq 0 ]
[[ "$output" =~ "Successfully exported data." ]] || false
[ -f doltdump.sql ]
run head -n 3 doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 3 ]
[[ "$output" =~ "SET FOREIGN_KEY_CHECKS=0;" ]] || false
[[ "$output" =~ "SET UNIQUE_CHECKS=0;" ]] || false
run grep AUTOCOMMIT doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 1 ]
run tail -n 2 doltdump.sql
[[ "$output" =~ "COMMIT;" ]] || false
dolt sql < doltdump.sql
run dolt sql -r csv -q "select * from new_table"
[ "$status" -eq 0 ]
[[ "${lines[0]}" = "pk" ]] || false
[[ "${lines[1]}" = "1" ]] || false
# try with a csv output and ensure that there are no problems
run dolt dump -r csv -na
[ "$status" -eq 0 ]
[ -f doltdump/new_table.csv ]
}
@test "dump: --no-autocommit flag works with multiple tables" {
dolt sql -q "CREATE TABLE table1(pk int primary key);"
dolt sql -q "CREATE TABLE table2(pk int primary key);"
dolt sql -q "INSERT INTO table1 VALUES (1)"
dolt sql -q "INSERT INTO table2 VALUES (1)"
run dolt dump --no-autocommit
[ "$status" -eq 0 ]
[[ "$output" =~ "Successfully exported data." ]] || false
[ -f doltdump.sql ]
run grep AUTOCOMMIT doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 2 ]
run grep "COMMIT;" doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 2 ]
# test with batch mode
run dolt dump --batch -f --no-autocommit
[ "$status" -eq 0 ]
[[ "$output" =~ "Successfully exported data." ]] || false
[ -f doltdump.sql ]
run grep AUTOCOMMIT doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 2 ]
run grep "COMMIT;" doltdump.sql
[ "$status" -eq 0 ]
[ "${#lines[@]}" -eq 2 ]
}
function create_tables() {
dolt sql -q "CREATE TABLE new_table(pk int primary key);"
dolt sql -q "CREATE TABLE warehouse(warehouse_id int primary key, warehouse_name varchar(100));"

View File

@@ -481,3 +481,31 @@ SQL
[ "$status" -eq 0 ]
[[ "$output" =~ "\`location\` geometry NOT NULL SRID 0," ]] || false
}
@test "import mysqldump: dolt dump --no-autocommit can be loaded back into mysql" {
service mysql start
dolt sql -q "CREATE TABLE IF NOT EXISTS mytable (pk int NOT NULL PRIMARY KEY, c1 varchar(25) DEFAULT NULL)"
dolt sql -q "INSERT IGNORE INTO mytable VALUES (0, 'one'), (1, 'two')"
# Setup the database we are loading data into
mysql <<SQL
CREATE DATABASE IF NOT EXISTS testdb;
SQL
run dolt dump --no-autocommit
[ -f doltdump.sql ]
# remove the utf8mb4_0900_bin collation which is not supported in this installation of mysql
sed -i 's/COLLATE=utf8mb4_0900_bin//' doltdump.sql
mysql testdb < doltdump.sql
run mysql <<SQL
SELECT count(*) from testdb.mytable
SQL
[ "$status" -eq 0 ]
[[ "$output" =~ "2" ]] || false
# Give the server a chance to drop the database
sleep 1
service mysql stop
}