mirror of
https://github.com/dolthub/dolt.git
synced 2026-04-22 19:43:51 -05:00
delete batches (#1524)
This commit is contained in:
+69
-34
@@ -57,6 +57,16 @@ import (
|
||||
"github.com/dolthub/dolt/go/libraries/utils/tracing"
|
||||
)
|
||||
|
||||
type batchMode int64
|
||||
|
||||
const (
|
||||
invalidBatchMode batchMode = iota
|
||||
insertBatchMode
|
||||
deleteBatchMode
|
||||
|
||||
currentBatchModeKey = "batch_mode"
|
||||
)
|
||||
|
||||
var sqlDocs = cli.CommandDocumentationContent{
|
||||
ShortDesc: "Runs a SQL query",
|
||||
LongDesc: `Runs a SQL query you specify. With no arguments, begins an interactive shell to run queries and view the results. With the {{.EmphasisLeft}}-q{{.EmphasisRight}} option, runs the given query and prints any results, then exits. If a commit is specified then only read queries are supported, and will run against the data at the specified commit.
|
||||
@@ -579,7 +589,7 @@ func runBatchMode(ctx *sql.Context, se *sqlEngine, input io.Reader) error {
|
||||
query = ""
|
||||
}
|
||||
|
||||
updateBatchInsertOutput()
|
||||
updateBatchEditOutput()
|
||||
cli.Println() // need a newline after all updates are executed
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
@@ -991,37 +1001,47 @@ func processBatchQuery(ctx *sql.Context, query string, se *sqlEngine) error {
|
||||
return fmt.Errorf("Error parsing SQL: %v.", err.Error())
|
||||
}
|
||||
|
||||
canBatch, err := canProcessAsBatchInsert(ctx, sqlStatement, se, query)
|
||||
currentBatchMode := invalidBatchMode
|
||||
if t, v := ctx.Get(currentBatchModeKey); t == sql.Int64 {
|
||||
currentBatchMode = batchMode(v.(int64))
|
||||
}
|
||||
|
||||
newBatchMode, err := canProcessAsBatchEdit(ctx, sqlStatement, se, query)
|
||||
if err != nil {
|
||||
cli.PrintErrln(err)
|
||||
return err
|
||||
}
|
||||
if canBatch {
|
||||
err = processBatchInsert(ctx, se, query, sqlStatement)
|
||||
|
||||
if currentBatchMode != invalidBatchMode && currentBatchMode != newBatchMode {
|
||||
// We need to commit whatever batch edits we've accumulated so far before executing the query
|
||||
err := flushBatchedEdits(ctx, se)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
ctx.Set(ctx, currentBatchModeKey, sql.Int64, int64(newBatchMode))
|
||||
|
||||
if newBatchMode != invalidBatchMode {
|
||||
err = processBatchableEditQuery(ctx, se, query, sqlStatement)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
err := processNonInsertBatchQuery(ctx, se, query, sqlStatement)
|
||||
err := processNonBatchableQuery(ctx, se, query, sqlStatement)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if batchEditStats.shouldUpdateBatchModeOutput() {
|
||||
updateBatchInsertOutput()
|
||||
updateBatchEditOutput()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func processNonInsertBatchQuery(ctx *sql.Context, se *sqlEngine, query string, sqlStatement sqlparser.Statement) (returnErr error) {
|
||||
// We need to commit whatever batch edits we've accumulated so far before executing the query
|
||||
err := flushBatchedEdits(ctx, se)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
func processNonBatchableQuery(ctx *sql.Context, se *sqlEngine, query string, sqlStatement sqlparser.Statement) (returnErr error) {
|
||||
foundDoltSQLFunc, err := checkForDoltSQLFunction(sqlStatement)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -1070,7 +1090,7 @@ func processNonInsertBatchQuery(ctx *sql.Context, se *sqlEngine, query string, s
|
||||
return flushBatchedEdits(ctx, se)
|
||||
}
|
||||
|
||||
func processBatchInsert(ctx *sql.Context, se *sqlEngine, query string, sqlStatement sqlparser.Statement) (returnErr error) {
|
||||
func processBatchableEditQuery(ctx *sql.Context, se *sqlEngine, query string, sqlStatement sqlparser.Statement) (returnErr error) {
|
||||
_, rowIter, err := se.query(ctx, query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error inserting rows: %v", err.Error())
|
||||
@@ -1096,43 +1116,58 @@ func processBatchInsert(ctx *sql.Context, se *sqlEngine, query string, sqlStatem
|
||||
return nil
|
||||
}
|
||||
|
||||
// canProcessBatchInsert returns whether the given statement can be processed as a batch insert. Only simple inserts
|
||||
// canProcessBatchEdit returns whether the given statement can be processed as a batch insert. Only simple inserts
|
||||
// (inserting a list of values) can be processed in this way. Other kinds of insert (notably INSERT INTO SELECT AS and
|
||||
// AUTO_INCREMENT) need a flushed root and can't benefit from batch optimizations.
|
||||
func canProcessAsBatchInsert(ctx *sql.Context, sqlStatement sqlparser.Statement, se *sqlEngine, query string) (bool, error) {
|
||||
func canProcessAsBatchEdit(ctx *sql.Context, sqlStatement sqlparser.Statement, se *sqlEngine, query string) (batchMode, error) {
|
||||
switch s := sqlStatement.(type) {
|
||||
case *sqlparser.Insert:
|
||||
if _, ok := s.Rows.(sqlparser.Values); !ok {
|
||||
return false, nil
|
||||
}
|
||||
foundSubquery, err := checkForInsertSubqueries(query)
|
||||
case *sqlparser.Delete:
|
||||
foundSubquery, err := checkForSubqueries(query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return invalidBatchMode, err
|
||||
}
|
||||
if foundSubquery {
|
||||
return false, nil
|
||||
return invalidBatchMode, nil
|
||||
}
|
||||
|
||||
return deleteBatchMode, nil
|
||||
|
||||
case *sqlparser.Insert:
|
||||
if _, ok := s.Rows.(sqlparser.Values); !ok {
|
||||
return invalidBatchMode, nil
|
||||
}
|
||||
foundSubquery, err := checkForSubqueries(query)
|
||||
if err != nil {
|
||||
return invalidBatchMode, err
|
||||
}
|
||||
|
||||
if foundSubquery {
|
||||
return invalidBatchMode, nil
|
||||
}
|
||||
|
||||
// TODO: This check coming first seems to cause problems with ctx.Session. Perhaps in the analyzer.
|
||||
hasAutoInc, err := insertsIntoAutoIncrementCol(ctx, se, query)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if hasAutoInc {
|
||||
return false, nil
|
||||
return invalidBatchMode, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
if hasAutoInc {
|
||||
return invalidBatchMode, nil
|
||||
}
|
||||
|
||||
return insertBatchMode, nil
|
||||
|
||||
case *sqlparser.Load:
|
||||
return true, nil
|
||||
return insertBatchMode, nil
|
||||
|
||||
default:
|
||||
return false, nil
|
||||
return invalidBatchMode, nil
|
||||
}
|
||||
}
|
||||
|
||||
// checkForInsertSubqueries parses the insert query to check for a subquery.
|
||||
func checkForInsertSubqueries(insertQuery string) (bool, error) {
|
||||
p, err := sqlparser.Parse(insertQuery)
|
||||
// checkForSubqueries parses the insert query to check for a subquery.
|
||||
func checkForSubqueries(query string) (bool, error) {
|
||||
p, err := sqlparser.Parse(query)
|
||||
|
||||
if err != nil {
|
||||
return false, nil
|
||||
@@ -1206,7 +1241,7 @@ func insertsIntoAutoIncrementCol(ctx *sql.Context, se *sqlEngine, query string)
|
||||
return isAutoInc, nil
|
||||
}
|
||||
|
||||
func updateBatchInsertOutput() {
|
||||
func updateBatchEditOutput() {
|
||||
displayStr := fmt.Sprintf("Rows inserted: %d Rows updated: %d Rows deleted: %d",
|
||||
batchEditStats.rowsInserted, batchEditStats.rowsUpdated, batchEditStats.rowsDeleted)
|
||||
displayStrLen = cli.DeleteAndPrint(displayStrLen, displayStr)
|
||||
|
||||
@@ -149,4 +149,30 @@ SQL
|
||||
[[ "$output" =~ "pk,c1" ]] || false
|
||||
[[ "$output" =~ "1,1" ]] || false
|
||||
[[ "$output" =~ "2,2" ]] || false
|
||||
}
|
||||
|
||||
@test "sql-batch: delete and insert batching" {
|
||||
run dolt sql -r csv << SQL
|
||||
INSERT INTO TEST VALUES (1,1,1,1,1,1);
|
||||
INSERT INTO TEST VALUES (2,1,1,1,1,1);
|
||||
INSERT INTO TEST VALUES (3,1,1,1,1,1);
|
||||
SELECT * FROM TEST;
|
||||
DELETE FROM TEST WHERE pk = 1;
|
||||
DELETE FROM TEST WHERE pk = 2;
|
||||
SELECT * FROM TEST;
|
||||
INSERT INTO TEST VALUES (4,1,1,1,1,1);
|
||||
INSERT INTO TEST VALUES (5,1,1,1,1,1);
|
||||
INSERT INTO TEST VALUES (6,1,1,1,1,1);
|
||||
DELETE FROM TEST WHERE pk=3;
|
||||
DELETE FROM TEST WHERE pk=4;
|
||||
INSERT INTO TEST VALUES (7,1,1,1,1,1);
|
||||
INSERT INTO TEST VALUES (8,1,1,1,1,1);
|
||||
INSERT INTO TEST VALUES (9,1,1,1,1,1);
|
||||
SQL
|
||||
[ "$status" -eq 0 ]
|
||||
|
||||
EXPECTED=$(echo -e "pk\n5\n6\n7\n8\n9")
|
||||
run dolt sql -r csv -q 'SELECT pk FROM TEST ORDER BY pk;'
|
||||
[ "$status" -eq 0 ]
|
||||
[[ "$output" =~ "$EXPECTED" ]] || false
|
||||
}
|
||||
Reference in New Issue
Block a user