Added batch awareness to table editor / database, needs testing. Reverted change to table editor to disallow changing primary keys, since that is required for some updates statements. Have a bug to fix in the table editor as a result.

Signed-off-by: Zach Musgrave <zach@liquidata.co>
This commit is contained in:
Zach Musgrave
2019-12-04 10:24:40 -08:00
parent 000657175f
commit f3fe71ec31
4 changed files with 57 additions and 22 deletions
+8 -1
View File
@@ -29,15 +29,22 @@ import (
var _ sql.Database = (*Database)(nil)
type batchMode bool
const (
batched batchMode = true
single batchMode = false
)
// Database implements sql.Database for a dolt DB.
type Database struct {
sql.Database
name string
root *doltdb.RootValue
dEnv *env.DoltEnv
batchMode batchMode
}
// NewDatabase returns a new dolt databae to use in queries.
// NewDatabase returns a new dolt database to use in queries.
func NewDatabase(name string, root *doltdb.RootValue, dEnv *env.DoltEnv) *Database {
return &Database{
name: name,
+20 -11
View File
@@ -29,8 +29,8 @@ import (
type tableEditor struct {
t *DoltTable
ed *types.MapEditor
addedKeys map[hash.Hash]types.LesserValuable
removedKeys map[hash.Hash]types.LesserValuable
addedKeys map[hash.Hash]types.Value
removedKeys map[hash.Hash]types.Value
}
var _ sql.RowReplacer = (*tableEditor)(nil)
@@ -41,8 +41,8 @@ var _ sql.RowDeleter = (*tableEditor)(nil)
func newTableEditor(t *DoltTable) *tableEditor {
return &tableEditor{
t: t,
addedKeys: make(map[hash.Hash]types.LesserValuable),
removedKeys: make(map[hash.Hash]types.LesserValuable),
addedKeys: make(map[hash.Hash]types.Value),
removedKeys: make(map[hash.Hash]types.Value),
}
}
@@ -150,15 +150,15 @@ func (te *tableEditor) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row)
return err
}
if _, ok := te.addedKeys[newHash]; ok {
return errors.New("Cannot update key before flushing current batch")
}
if _, ok := te.addedKeys[oldHash]; ok {
return errors.New("Cannot update key before flushing current batch")
}
// if _, ok := te.addedKeys[newHash]; ok {
// return errors.New("Cannot update key before flushing current batch")
// }
// if _, ok := te.addedKeys[oldHash]; ok {
// return errors.New("Cannot update key before flushing current batch")
// }
te.addedKeys[newHash] = dNewKeyVal
te.removedKeys[oldHash] = dOldKey
te.removedKeys[oldHash] = dOldKeyVal
}
if te.ed == nil {
@@ -172,7 +172,16 @@ func (te *tableEditor) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row)
return nil
}
// Close implements Closer
func (te *tableEditor) Close(ctx *sql.Context) error {
// If we're running in batched mode, don't flush the edits until explicitly told to do so by the parent table.
if te.t.db.batchMode == batched {
return nil
}
return te.flush(ctx)
}
func (te *tableEditor) flush(ctx context.Context) error {
// For all added keys, check for and report a collision
for hash, addedKey := range te.addedKeys {
if _, ok := te.removedKeys[hash]; !ok {
@@ -140,7 +140,10 @@ func TestTableEditor(t *testing.T) {
expectedErr = ed.Update(ctx, r(edna, PeopleTestSchema), r(MutateRow(edna, IdTag, 30), PeopleTestSchema))
},
selectQuery: "select * from people where id >= 10",
expectedErr: "Cannot update key before flushing current batch",
expectedRows: CompressRows(PeopleTestSchema,
krusty,
MutateRow(edna, IdTag, 30),
),
},
}
+25 -9
View File
@@ -25,7 +25,6 @@ import (
"github.com/liquidata-inc/dolt/go/cmd/dolt/errhand"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/doltdb"
"github.com/liquidata-inc/dolt/go/libraries/doltcore/schema"
"github.com/liquidata-inc/dolt/go/store/hash"
"github.com/liquidata-inc/dolt/go/store/types"
)
@@ -35,6 +34,7 @@ type DoltTable struct {
table *doltdb.Table
sch schema.Schema
db *Database
ed *tableEditor
}
var _ sql.Table = (*DoltTable)(nil)
@@ -108,26 +108,42 @@ func (t *DoltTable) PartitionRows(ctx *sql.Context, _ sql.Partition) (sql.RowIte
// Inserter implements sql.InsertableTable
func (t *DoltTable) Inserter(ctx *sql.Context) sql.RowInserter {
return t.getTableEditor()
}
func (t *DoltTable) getTableEditor() *tableEditor {
if t.db.batchMode == batched {
if t.ed != nil {
return t.ed
}
t.ed = newTableEditor(t)
return t.ed
}
return newTableEditor(t)
}
func (t *DoltTable) flushBatchedEdits(ctx context.Context) error {
if t.ed != nil {
err := t.ed.flush(ctx)
t.ed = nil
return err
}
return nil
}
// Deleter implements sql.DeletableTable
func (t *DoltTable) Deleter(*sql.Context) sql.RowDeleter {
return newTableEditor(t)
return t.getTableEditor()
}
// Replacer implements sql.ReplaceableTable
func (t *DoltTable) Replacer(ctx *sql.Context) sql.RowReplacer {
return newTableEditor(t)
return t.getTableEditor()
}
// Updater implements sql.UpdatableTable
func (t *DoltTable) Updater(ctx *sql.Context) sql.RowUpdater {
return &tableEditor{
t: t,
addedKeys: make(map[hash.Hash]types.LesserValuable),
removedKeys: make(map[hash.Hash]types.LesserValuable),
}
return t.getTableEditor()
}
// doltTablePartitionIter, an object that knows how to return the single partition exactly once.
@@ -164,7 +180,7 @@ func (p doltTablePartition) Key() []byte {
return []byte(partitionName)
}
func (t *DoltTable) updateTable(ctx *sql.Context, mapEditor *types.MapEditor) error {
func (t *DoltTable) updateTable(ctx context.Context, mapEditor *types.MapEditor) error {
updated, err := mapEditor.Map(ctx)
if err != nil {
return errhand.BuildDError("failed to modify table").AddCause(err).Build()