Performance tuning for batch SQL insert for the SQL import use case. Now as fast as CSV for single core, but doesn't multiplex so is ~4x worse on my 4-core machine for large data sets. Added profiling command line flags to dolt command (instead of enabling profiling via code changes).

Signed-off-by: Zach Musgrave <zach@liquidata.co>
This commit is contained in:
Zach Musgrave
2019-08-01 13:32:01 -07:00
parent 17ae881db1
commit b913614c87
4 changed files with 97 additions and 26 deletions

View File

@@ -15,6 +15,8 @@
package commands
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
@@ -114,8 +116,13 @@ func Sql(commandStr string, args []string, dEnv *env.DoltEnv) int {
}
}
// start an interactive shell
root = runShell(dEnv, root)
// Run in either batch mode for piped input, or shell mode for interactive
fi, _ := os.Stdin.Stat()
if (fi.Mode() & os.ModeCharDevice) == 0 {
root = runBatchMode(dEnv, root)
} else {
root = runShell(dEnv, root)
}
// If the SQL session wrote a new root value, update the working set with it
if root != nil {
@@ -125,6 +132,51 @@ func Sql(commandStr string, args []string, dEnv *env.DoltEnv) int {
return 0
}
// scanStatements is a bufio.SplitFunc that returns each SQL statement in the input as a token, splitting on
// semicolons. It doesn't work for statements containing semicolons inside string literals; supporting that
// requires implementing a state machine.
func scanStatements(data []byte, atEOF bool) (advance int, token []byte, err error) {
	if atEOF && len(data) == 0 {
		return 0, nil, nil
	}
	if i := bytes.IndexByte(data, ';'); i >= 0 {
		// We have a full ;-terminated statement. Consume the terminator but exclude it from the token.
		return i + 1, data[0:i], nil
	}
	// If we're at EOF, we have a final, non-terminated statement. Return it.
	if atEOF {
		return len(data), data, nil
	}
	// No terminator yet and more input may arrive: ask the Scanner for more data.
	return 0, nil, nil
}
// runBatchMode processes SQL statements read from stdin until EOF, accumulating inserts in a batcher for
// performance, then commits the batch. Returns the resulting root value, which is the root passed in if no
// statement changed it.
func runBatchMode(dEnv *env.DoltEnv, root *doltdb.RootValue) *doltdb.RootValue {
	scanner := bufio.NewScanner(os.Stdin)
	scanner.Split(scanStatements)

	batcher := dsql.NewSqlBatcher(dEnv.DoltDB, root)

	for scanner.Scan() {
		query := scanner.Text()
		newRoot, err := processBatchQuery(query, dEnv, root, batcher)
		// Check the error before the root: a query can return both a partial result and an error, and the
		// error must not be silently dropped. Keep processing the rest of the batch after reporting it.
		if err != nil {
			cli.Println(fmt.Sprintf("Error processing query '%s': %s", query, err.Error()))
		}
		if newRoot != nil {
			root = newRoot
		}
	}

	if err := scanner.Err(); err != nil {
		cli.Println(err.Error())
	}

	// Flush any rows still buffered in the batcher. A Commit failure must be surfaced to the user, since it
	// means buffered inserts were not written.
	newRoot, err := batcher.Commit(context.Background())
	if err != nil {
		cli.Println(fmt.Sprintf("Error committing batch: %s", err.Error()))
	}
	if newRoot != nil {
		root = newRoot
	}

	return root
}
// runShell starts a SQL shell. Returns when the user exits the shell with the root value resulting from any queries.
func runShell(dEnv *env.DoltEnv, root *doltdb.RootValue) *doltdb.RootValue {
_ = iohelp.WriteLine(cli.CliOut, welcomeMsg)
@@ -298,8 +350,6 @@ func prepend(s string, ss []string) []string {
// Processes a single query and returns the new root value of the DB, or an error encountered.
func processQuery(query string, dEnv *env.DoltEnv, root *doltdb.RootValue) (*doltdb.RootValue, error) {
cli.Print("Processing " + query)
sqlStatement, err := sqlparser.Parse(query)
if err != nil {
return nil, fmt.Errorf("Error parsing SQL: %v.", err.Error())

View File

@@ -66,25 +66,43 @@ var doltCommand = cli.GenSubCommandHandler([]*cli.Command{
{Name: "conflicts", Desc: "Commands for viewing and resolving merge conflicts.", Func: cnfcmds.Commands, ReqRepo: false},
})
var cpuProf = false
var memProf = false
const profFlag = "--prof"
const cpuProf = "cpu"
const memProf = "mem"
const blockingProf = "blocking"
const traceProf = "trace"
func main() {
os.Exit(runMain())
}
func runMain() int {
if cpuProf {
fmt.Println("cpu profiling enabled.")
defer profile.Start(profile.CPUProfile).Stop()
}
if memProf {
fmt.Println("mem profiling enabled.")
defer profile.Start(profile.MemProfile).Stop()
}
args := os.Args[1:]
if len(args) > 0 && args[0] == profFlag {
if len(os.Args) <= 2 {
panic("Expected a profile arg after " + profFlag)
}
prof := args[1]
switch prof {
case cpuProf:
fmt.Println("cpu profiling enabled.")
defer profile.Start(profile.CPUProfile).Stop()
case memProf:
fmt.Println("mem profiling enabled.")
defer profile.Start(profile.MemProfile).Stop()
case blockingProf:
fmt.Println("block profiling enabled")
defer profile.Start(profile.BlockProfile).Stop()
case traceProf:
fmt.Println("trace profiling enabled")
defer profile.Start(profile.TraceProfile).Stop()
default:
panic("Unexpected prof flag: " + prof)
}
args = args[2:]
}
// Currently goland doesn't support running with a different working directory when using go modules.
// This is a hack that allows a different working directory to be set after the application starts using
// chdir=<DIR>. The syntax is not flexible and must match exactly this.

View File

@@ -92,7 +92,7 @@ type BatchInsertResult struct {
}
func (b *SqlBatcher) Insert(ctx context.Context, tableName string, r row.Row, opt InsertOptions) (*BatchInsertResult, error) {
sch, err := b.getSchema(ctx, tableName)
sch, err := b.GetSchema(ctx, tableName)
if err != nil {
return nil, err
}
@@ -125,7 +125,9 @@ func (b *SqlBatcher) Insert(ctx context.Context, tableName string, r row.Row, op
return &BatchInsertResult{RowInserted: !rowExists, RowUpdated: rowExists || rowAlreadyTouched}, nil
}
func (b *SqlBatcher) getTable(ctx context.Context, tableName string) (*doltdb.Table, error) {
// GetTable returns the table with the name given. This method is offered because reading the table from the root value
// is relatively expensive, and SqlBatcher caches Tables to avoid the overhead.
func (b *SqlBatcher) GetTable(ctx context.Context, tableName string) (*doltdb.Table, error) {
if table, ok := b.tables[tableName]; ok {
return table, nil
}
@@ -139,12 +141,14 @@ func (b *SqlBatcher) getTable(ctx context.Context, tableName string) (*doltdb.Ta
return table, nil
}
func (b *SqlBatcher) getSchema(ctx context.Context, tableName string) (schema.Schema, error) {
// GetSchema returns the schema for the table name given. This method is offered because reading the schema from disk
// is actually relatively expensive -- SqlBatcher caches the schema values per table to avoid the overhead.
func (b *SqlBatcher) GetSchema(ctx context.Context, tableName string) (schema.Schema, error) {
if schema, ok := b.schemas[tableName]; ok {
return schema, nil
}
table, err := b.getTable(ctx, tableName)
table, err := b.GetTable(ctx, tableName)
if err != nil {
return nil, err
}
@@ -174,7 +178,7 @@ func (b *SqlBatcher) getRowData(ctx context.Context, tableName string) (types.Ma
return rowData, nil
}
table, err := b.getTable(ctx, tableName)
table, err := b.GetTable(ctx, tableName)
if err != nil {
return types.EmptyMap, err
}

View File

@@ -39,7 +39,7 @@ var ErrMissingPrimaryKeys = errors.New("One or more primary key columns missing
var ConstraintFailedFmt = "Constraint failed for column '%v': %v"
// ExecuteInsertBatch executes the given insert statement in batch mode and returns the result. The table is not changed
// until the batch is Commited. The InsertResult returned similarly doesn't have a Root set, since the root isn't
// until the batch is Committed. The InsertResult returned similarly doesn't have a Root set, since the root isn't
// modified by this function.
func ExecuteBatchInsert(
ctx context.Context,
@@ -49,11 +49,10 @@ func ExecuteBatchInsert(
) (*InsertResult, error) {
tableName := s.Table.Name.String()
if !root.HasTable(ctx, tableName) {
return nil, fmt.Errorf("Unknown table %v", tableName)
tableSch, err := batcher.GetSchema(ctx, tableName)
if err != nil {
return nil, err
}
table, _ := root.GetTable(ctx, tableName)
tableSch := table.GetSchema(ctx)
// Parser supports overwrite on insert with both the replace keyword (from MySQL) as well as the ignore keyword
replace := s.Action == sqlparser.ReplaceStr