implement implicit commits and fix import behavior (#8767)

This commit is contained in:
James Cor
2025-01-22 10:27:41 -08:00
committed by GitHub
parent cb47ee1c04
commit 758627e735
6 changed files with 77 additions and 15 deletions
+2
View File
@@ -583,6 +583,7 @@ func move(ctx context.Context, rd table.SqlRowReader, wr *mvdata.SqlEngineTableW
// only log info for the --continue option
if !options.contOnErr {
_ = wr.DropCreatedTable()
return true
}
@@ -620,6 +621,7 @@ func move(ctx context.Context, rd table.SqlRowReader, wr *mvdata.SqlEngineTableW
err := g.Wait()
if err != nil && err != io.EOF {
_ = wr.DropCreatedTable()
// don't lose the rowErr if there is one
if rowErr != nil {
return badCount, fmt.Errorf("%w\n%s", err, rowErr.Error())
+1 -1
View File
@@ -56,7 +56,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0
github.com/creasty/defaults v1.6.0
github.com/dolthub/flatbuffers/v23 v23.3.3-dh.2
github.com/dolthub/go-mysql-server v0.19.1-0.20250117232918-d5bf206dd036
github.com/dolthub/go-mysql-server v0.19.1-0.20250122100341-c5d0e527c855
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63
github.com/dolthub/swiss v0.1.0
github.com/esote/minmaxheap v1.0.0
+2 -2
View File
@@ -179,8 +179,8 @@ github.com/dolthub/fslock v0.0.3 h1:iLMpUIvJKMKm92+N1fmHVdxJP5NdyDK5bK7z7Ba2s2U=
github.com/dolthub/fslock v0.0.3/go.mod h1:QWql+P17oAAMLnL4HGB5tiovtDuAjdDTPbuqx7bYfa0=
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90 h1:Sni8jrP0sy/w9ZYXoff4g/ixe+7bFCZlfCqXKJSU+zM=
github.com/dolthub/go-icu-regex v0.0.0-20241215010122-db690dd53c90/go.mod h1:ylU4XjUpsMcvl/BKeRRMXSH7e7WBrPXdSLvnRJYrxEA=
github.com/dolthub/go-mysql-server v0.19.1-0.20250117232918-d5bf206dd036 h1:9rqJs6j4Vgjl4EgSV0ruvcpzZvdlCQa7TLm+Wh2cxVw=
github.com/dolthub/go-mysql-server v0.19.1-0.20250117232918-d5bf206dd036/go.mod h1:5HtKnb+IAiv+27bo50KGANbUB4HAzGEF9rlFF2ZBLZg=
github.com/dolthub/go-mysql-server v0.19.1-0.20250122100341-c5d0e527c855 h1:TpoMmjRofO1JOTxp14t/pDyr0RLi29ybLAMhV3ZZ+R0=
github.com/dolthub/go-mysql-server v0.19.1-0.20250122100341-c5d0e527c855/go.mod h1:5HtKnb+IAiv+27bo50KGANbUB4HAzGEF9rlFF2ZBLZg=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63 h1:OAsXLAPL4du6tfbBgK0xXHZkOlos63RdKYS3Sgw/dfI=
github.com/dolthub/gozstd v0.0.0-20240423170813-23a2903bca63/go.mod h1:lV7lUeuDhH5thVGDCKXbatwKy2KW80L4rMT46n+Y2/Q=
github.com/dolthub/ishell v0.0.0-20240701202509-2b217167d718 h1:lT7hE5k+0nkBdj/1UOSFwjWpNxf+LCApbRHgnCA17XE=
@@ -187,8 +187,24 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
}
}()
line := 1
// If there were create table statements, they are automatically committed, so we need to start a new transaction
if s.importOption == CreateOp {
_, iter, _, err := s.se.Query(s.sqlCtx, "START TRANSACTION")
if err != nil {
return err
}
for {
_, err = iter.Next(s.sqlCtx)
if err == io.EOF {
break
}
if err != nil {
return err
}
}
}
line := 1
for {
if s.statsCB != nil && atomic.LoadInt32(&s.statOps) >= tableWriterStatUpdateRate {
atomic.StoreInt32(&s.statOps, 0)
@@ -228,8 +244,20 @@ func (s *SqlEngineTableWriter) WriteRows(ctx context.Context, inputChannel chan
}
func (s *SqlEngineTableWriter) Commit(ctx context.Context) error {
_, _, _, err := s.se.Query(s.sqlCtx, "COMMIT")
return err
_, iter, _, err := s.se.Query(s.sqlCtx, "COMMIT")
if err != nil {
return err
}
for {
_, err = iter.Next(s.sqlCtx)
if err == io.EOF {
break
}
if err != nil {
return err
}
}
return nil
}
func (s *SqlEngineTableWriter) RowOperationSchema() sql.PrimaryKeySchema {
@@ -240,6 +268,42 @@ func (s *SqlEngineTableWriter) TableSchema() sql.PrimaryKeySchema {
return s.tableSchema
}
func (s *SqlEngineTableWriter) DropCreatedTable() error {
// quitting import that created table, should drop table
if s.importOption == CreateOp {
var err error
var iter sql.RowIter
_, iter, _, err = s.se.Query(s.sqlCtx, fmt.Sprintf("DROP TABLE IF EXISTS `%s`", s.tableName))
if err != nil {
return err
}
for {
_, err = iter.Next(s.sqlCtx)
if err == io.EOF {
break
}
if err != nil {
return err
}
}
_, iter, _, err = s.se.Query(s.sqlCtx, "COMMIT")
if err != nil {
return err
}
for {
_, err = iter.Next(s.sqlCtx)
if err == io.EOF {
break
}
if err != nil {
return err
}
}
}
return nil
}
// forceDropTableIfNeeded drop the given table in case the -f parameter is passed.
func (s *SqlEngineTableWriter) forceDropTableIfNeeded() error {
if s.force {
@@ -448,6 +448,11 @@ func (d *DoltSession) CommitTransaction(ctx *sql.Context, tx sql.Transaction) (e
return nil
}
// There is no transaction to commit
if tx == nil {
return nil
}
dirties := d.dirtyWorkingSets()
if len(dirties) == 0 {
return nil
@@ -100,15 +100,6 @@ tests:
queries:
- exec: 'set autocommit=0'
- exec: 'create table t(c0 int)'
- on: repo1
queries:
- query: 'show tables'
result:
columns: ["Tables_in_repo1"]
rows: []
- on: repo1
queries:
- exec: 'create table t(c0 int)'
- on: repo1
queries:
- query: 'show tables'