Fixed a bug in batch mode: re-use the same tables for every query rather than creating them each time. Ported part of the sql batcher tests to use the new batch mechanism.

Signed-off-by: Zach Musgrave <zach@liquidata.co>
This commit is contained in:
Zach Musgrave
2019-12-04 15:03:22 -08:00
parent b0c2551dd4
commit c7bed47c4f
2 changed files with 149 additions and 108 deletions

View File

@@ -37,21 +37,21 @@ const (
// Database implements sql.Database for a dolt DB.
type Database struct {
name string
root *doltdb.RootValue
dEnv *env.DoltEnv
batchMode batchMode
outstandingTables map[string]*DoltTable
name string
root *doltdb.RootValue
dEnv *env.DoltEnv
batchMode batchMode
tables map[string]*DoltTable
}
// NewDatabase returns a new dolt database to use in queries.
func NewDatabase(name string, root *doltdb.RootValue, dEnv *env.DoltEnv) *Database {
return &Database{
name: name,
root: root,
dEnv: dEnv,
batchMode: single,
outstandingTables: make(map[string]*DoltTable),
name: name,
root: root,
dEnv: dEnv,
batchMode: single,
tables: make(map[string]*DoltTable),
}
}
@@ -59,11 +59,11 @@ func NewDatabase(name string, root *doltdb.RootValue, dEnv *env.DoltEnv) *Databa
// commit any outstanding edits.
func NewBatchedDatabase(name string, root *doltdb.RootValue, dEnv *env.DoltEnv) *Database {
return &Database{
name: name,
root: root,
dEnv: dEnv,
batchMode: batched,
outstandingTables: make(map[string]*DoltTable),
name: name,
root: root,
dEnv: dEnv,
batchMode: batched,
tables: make(map[string]*DoltTable),
}
}
@@ -101,6 +101,10 @@ func (db *Database) GetTableInsensitive(ctx context.Context, tblName string) (sq
return nil, false, nil
}
if db.tables[exactName] != nil {
return db.tables[exactName], true, nil
}
tbl, ok, err := db.root.GetTable(ctx, exactName)
if err != nil {
@@ -115,12 +119,8 @@ func (db *Database) GetTableInsensitive(ctx context.Context, tblName string) (sq
return nil, false, err
}
table := &DoltTable{name: tblName, table: tbl, sch: sch, db: db}
if db.batchMode == batched {
db.outstandingTables[tblName] = table
}
return table, true, nil
db.tables[exactName] = &DoltTable{name: exactName, table: tbl, sch: sch, db: db}
return db.tables[exactName], true, nil
}
func (db *Database) GetTableNames(ctx context.Context) ([]string, error) {
@@ -205,7 +205,7 @@ func (db *Database) CreateTable(ctx *sql.Context, tableName string, schema sql.S
// Flushes the current batch of outstanding changes and returns any errors.
func (db *Database) Flush(ctx context.Context) error {
for _, table := range db.outstandingTables {
for _, table := range db.tables {
if err := table.flushBatchedEdits(ctx); err != nil {
return err
}

View File

@@ -12,18 +12,22 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package sql
package sqle
import (
"context"
"fmt"
"github.com/google/go-cmp/cmp"
sqle "github.com/src-d/go-mysql-server"
"github.com/src-d/go-mysql-server/sql"
"io"
"math/rand"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/sqlparser"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/dtestutils"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/row"
@@ -63,17 +67,14 @@ func TestSqlBatchInserts(t *testing.T) {
CreateTestDatabase(dEnv, t)
root, _ := dEnv.WorkingRoot(ctx)
batcher := NewSqlBatcher(dEnv.DoltDB, root)
engine := sqle.NewDefault()
db := NewBatchedDatabase("dolt", root, dEnv)
engine.AddDatabase(db)
for _, stmt := range insertStatements {
statement, err := sqlparser.Parse(stmt)
_, rowIter, err := engine.Query(sql.NewEmptyContext(), stmt)
require.NoError(t, err)
insertStmt, ok := statement.(*sqlparser.Insert)
require.True(t, ok)
result, err := ExecuteBatchInsert(context.Background(), root, insertStmt, batcher)
require.NoError(t, err)
assert.True(t, result.NumRowsInserted > 0)
assert.Equal(t, 0, result.NumRowsUpdated)
assert.Equal(t, 0, result.NumErrorsIgnored)
drainIter(rowIter)
}
// Before committing the batch, the database should be unchanged from its original state
@@ -89,7 +90,7 @@ func TestSqlBatchInserts(t *testing.T) {
assert.ElementsMatch(t, AllAppsRows, allAppearanceRows)
// Now commit the batch and check for new rows
root, err = batcher.Commit(ctx)
err = db.Flush(ctx)
require.NoError(t, err)
var expectedPeople, expectedEpisodes, expectedAppearances []row.Row
@@ -124,6 +125,7 @@ func TestSqlBatchInserts(t *testing.T) {
newAppsRow(11, 9),
)
root = db.Root()
allPeopleRows, err = GetAllRows(root, PeopleTableName)
require.NoError(t, err)
allEpsRows, err = GetAllRows(root, EpisodesTableName)
@@ -136,83 +138,93 @@ func TestSqlBatchInserts(t *testing.T) {
assertRowSetsEqual(t, expectedAppearances, allAppearanceRows)
}
func TestSqlBatchInsertIgnoreReplace(t *testing.T) {
insertStatements := []string{
`replace into people (id, first, last, is_married, age, rating, uuid, num_episodes) values
(0, "Maggie", "Simpson", false, 1, 5.1, '00000000-0000-0000-0000-000000000007', 677)`,
`insert ignore into people values
(2, "Milhouse", "VanHouten", false, 1, 5.1, '00000000-0000-0000-0000-000000000008', 677)`,
}
numRowsUpdated := []int{1, 0}
numErrorsIgnored := []int{0, 1}
dEnv := dtestutils.CreateTestEnv()
ctx := context.Background()
CreateTestDatabase(dEnv, t)
root, _ := dEnv.WorkingRoot(ctx)
batcher := NewSqlBatcher(dEnv.DoltDB, root)
for i := range insertStatements {
stmt := insertStatements[i]
statement, err := sqlparser.Parse(stmt)
require.NoError(t, err)
insertStmt, ok := statement.(*sqlparser.Insert)
require.True(t, ok)
result, err := ExecuteBatchInsert(context.Background(), root, insertStmt, batcher)
require.NoError(t, err)
assert.Equal(t, 0, result.NumRowsInserted)
assert.Equal(t, numRowsUpdated[i], result.NumRowsUpdated)
assert.Equal(t, numErrorsIgnored[i], result.NumErrorsIgnored)
}
// Before committing the batch, the database should be unchanged from its original state
allPeopleRows, err := GetAllRows(root, PeopleTableName)
assert.NoError(t, err)
assert.ElementsMatch(t, AllPeopleRows, allPeopleRows)
// Now commit the batch and check for new rows
root, err = batcher.Commit(ctx)
require.NoError(t, err)
var expectedPeople []row.Row
expectedPeople = append(expectedPeople, AllPeopleRows[1:]...) // skip homer
expectedPeople = append(expectedPeople,
NewPeopleRowWithOptionalFields(0, "Maggie", "Simpson", false, 1, 5.1, uuid.MustParse("00000000-0000-0000-0000-000000000007"), 677),
)
allPeopleRows, err = GetAllRows(root, PeopleTableName)
assert.NoError(t, err)
assertRowSetsEqual(t, expectedPeople, allPeopleRows)
}
func TestSqlBatchInsertErrors(t *testing.T) {
insertStatements := []string{
`insert into people (id, first, last, is_married, age, rating, uuid, num_episodes) values
(0, "Maggie", "Simpson", false, 1, 5.1, '00000000-0000-0000-0000-000000000007', 677)`,
`insert into people values
(2, "Milhouse", "VanHouten", false, 1, 5.1, true, 677)`,
}
dEnv := dtestutils.CreateTestEnv()
ctx := context.Background()
CreateTestDatabase(dEnv, t)
root, _ := dEnv.WorkingRoot(ctx)
batcher := NewSqlBatcher(dEnv.DoltDB, root)
for i := range insertStatements {
stmt := insertStatements[i]
statement, err := sqlparser.Parse(stmt)
require.NoError(t, err)
insertStmt, ok := statement.(*sqlparser.Insert)
require.True(t, ok)
_, err = ExecuteBatchInsert(context.Background(), root, insertStmt, batcher)
require.Error(t, err)
func drainIter(iter sql.RowIter) {
for {
_, err := iter.Next()
if err == io.EOF {
return
}
}
}
// func TestSqlBatchInsertIgnoreReplace(t *testing.T) {
// insertStatements := []string{
// `replace into people (id, first, last, is_married, age, rating, uuid, num_episodes) values
// (0, "Maggie", "Simpson", false, 1, 5.1, '00000000-0000-0000-0000-000000000007', 677)`,
// `insert ignore into people values
// (2, "Milhouse", "VanHouten", false, 1, 5.1, '00000000-0000-0000-0000-000000000008', 677)`,
// }
// numRowsUpdated := []int{1, 0}
// numErrorsIgnored := []int{0, 1}
//
// dEnv := dtestutils.CreateTestEnv()
// ctx := context.Background()
//
// CreateTestDatabase(dEnv, t)
// root, _ := dEnv.WorkingRoot(ctx)
//
// batcher := NewSqlBatcher(dEnv.DoltDB, root)
// for i := range insertStatements {
// stmt := insertStatements[i]
// statement, err := sqlparser.Parse(stmt)
// require.NoError(t, err)
// insertStmt, ok := statement.(*sqlparser.Insert)
// require.True(t, ok)
// result, err := ExecuteBatchInsert(context.Background(), root, insertStmt, batcher)
// require.NoError(t, err)
// assert.Equal(t, 0, result.NumRowsInserted)
// assert.Equal(t, numRowsUpdated[i], result.NumRowsUpdated)
// assert.Equal(t, numErrorsIgnored[i], result.NumErrorsIgnored)
// }
//
// // Before committing the batch, the database should be unchanged from its original state
// allPeopleRows, err := GetAllRows(root, PeopleTableName)
// assert.NoError(t, err)
// assert.ElementsMatch(t, AllPeopleRows, allPeopleRows)
//
// // Now commit the batch and check for new rows
// root, err = batcher.Commit(ctx)
// require.NoError(t, err)
//
// var expectedPeople []row.Row
//
// expectedPeople = append(expectedPeople, AllPeopleRows[1:]...) // skip homer
// expectedPeople = append(expectedPeople,
// NewPeopleRowWithOptionalFields(0, "Maggie", "Simpson", false, 1, 5.1, uuid.MustParse("00000000-0000-0000-0000-000000000007"), 677),
// )
//
// allPeopleRows, err = GetAllRows(root, PeopleTableName)
// assert.NoError(t, err)
// assertRowSetsEqual(t, expectedPeople, allPeopleRows)
// }
//
// func TestSqlBatchInsertErrors(t *testing.T) {
// insertStatements := []string{
// `insert into people (id, first, last, is_married, age, rating, uuid, num_episodes) values
// (0, "Maggie", "Simpson", false, 1, 5.1, '00000000-0000-0000-0000-000000000007', 677)`,
// `insert into people values
// (2, "Milhouse", "VanHouten", false, 1, 5.1, true, 677)`,
// }
//
// dEnv := dtestutils.CreateTestEnv()
// ctx := context.Background()
//
// CreateTestDatabase(dEnv, t)
// root, _ := dEnv.WorkingRoot(ctx)
//
// batcher := NewSqlBatcher(dEnv.DoltDB, root)
// for i := range insertStatements {
// stmt := insertStatements[i]
// statement, err := sqlparser.Parse(stmt)
// require.NoError(t, err)
// insertStmt, ok := statement.(*sqlparser.Insert)
// require.True(t, ok)
// _, err = ExecuteBatchInsert(context.Background(), root, insertStmt, batcher)
// require.Error(t, err)
// }
// }
//
func assertRowSetsEqual(t *testing.T, expected, actual []row.Row) {
equal, diff := rowSetsEqual(expected, actual)
assert.True(t, equal, diff)
@@ -244,6 +256,35 @@ func containsRow(rs []row.Row, r row.Row) bool {
return false
}
func rowsEqual(expected, actual row.Row) (bool, string) {
er, ar := make(map[uint64]types.Value), make(map[uint64]types.Value)
_, err := expected.IterCols(func(t uint64, v types.Value) (bool, error) {
er[t] = v
return false, nil
})
if err != nil {
panic(err)
}
_, err = actual.IterCols(func(t uint64, v types.Value) (bool, error) {
ar[t] = v
return false, nil
})
if err != nil {
panic(err)
}
opts := cmp.Options{cmp.AllowUnexported(), dtestutils.FloatComparer}
eq := cmp.Equal(er, ar, opts)
var diff string
if !eq {
diff = cmp.Diff(er, ar, opts)
}
return eq, diff
}
func newPeopleRow(id int, first, last string) row.Row {
vals := row.TaggedValues{
IdTag: types.Int(id),