Factored table editor functions into their own file

Signed-off-by: Zach Musgrave <zach@liquidata.co>
This commit is contained in:
Zach Musgrave
2019-12-03 12:22:57 -08:00
parent 3de3580714
commit 7f1d268dac
3 changed files with 203 additions and 174 deletions
+192
View File
@@ -0,0 +1,192 @@
// Copyright 2019 Liquidata, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"context"
"errors"
"fmt"
"github.com/liquidata-inc/dolt/go/cmd/dolt/errhand"
"github.com/liquidata-inc/dolt/go/store/hash"
"github.com/liquidata-inc/dolt/go/store/types"
"github.com/src-d/go-mysql-server/sql"
)
// tableEditor supports making multiple row edits (inserts, updates, deletes) to a table. It does error checking for key
// collision etc. in the Close() method, as well as during Insert / Update.
type tableEditor struct {
t *DoltTable
ed *types.MapEditor
addedKeys map[hash.Hash]types.LesserValuable
removedKeys map[hash.Hash]types.LesserValuable
}
var _ sql.RowReplacer = (*tableEditor)(nil)
var _ sql.RowUpdater = (*tableEditor)(nil)
var _ sql.RowInserter = (*tableEditor)(nil)
var _ sql.RowDeleter = (*tableEditor)(nil)
func newTableEditor(t *DoltTable) *tableEditor {
return &tableEditor{
t: t,
addedKeys: make(map[hash.Hash]types.LesserValuable),
removedKeys: make(map[hash.Hash]types.LesserValuable),
}
}
func (te *tableEditor) Insert(ctx *sql.Context, sqlRow sql.Row) error {
dRow, err := SqlRowToDoltRow(te.t.table.Format(), sqlRow, te.t.sch)
if err != nil {
return err
}
key, err := dRow.NomsMapKey(te.t.sch).Value(ctx)
if err != nil {
return errhand.BuildDError("failed to get row key").AddCause(err).Build()
}
hash, err := key.Hash(dRow.Format())
if err != nil {
return err
}
// If we've already inserted this key as part of this insert operation, that's an error. Inserting a row that already
// exists in the table will be handled in Close().
if _, ok := te.addedKeys[hash]; ok {
return errors.New("duplicate primary key given")
}
te.addedKeys[hash] = key
if te.ed == nil {
te.ed, err = te.t.newMapEditor(ctx)
if err != nil {
return err
}
}
te.ed = te.ed.Set(key, dRow.NomsMapValue(te.t.sch))
return nil
}
func (te *tableEditor) Delete(ctx *sql.Context, sqlRow sql.Row) error {
dRow, err := SqlRowToDoltRow(te.t.table.Format(), sqlRow, te.t.sch)
if err != nil {
return err
}
key, err := dRow.NomsMapKey(te.t.sch).Value(ctx)
if err != nil {
return errhand.BuildDError("failed to get row key").AddCause(err).Build()
}
hash, err := key.Hash(dRow.Format())
if err != nil {
return err
}
delete(te.addedKeys, hash)
te.removedKeys[hash] = key
if te.ed == nil {
te.ed, err = te.t.newMapEditor(ctx)
if err != nil {
return err
}
}
te.ed = te.ed.Remove(key)
return nil
}
func (t *DoltTable) newMapEditor(ctx context.Context) (*types.MapEditor, error) {
typesMap, err := t.table.GetRowData(ctx)
if err != nil {
return nil, errhand.BuildDError("failed to get row data.").AddCause(err).Build()
}
return typesMap.Edit(), nil
}
func (tu *tableEditor) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) error {
dOldRow, err := SqlRowToDoltRow(tu.t.table.Format(), oldRow, tu.t.sch)
if err != nil {
return err
}
dNewRow, err := SqlRowToDoltRow(tu.t.table.Format(), newRow, tu.t.sch)
if err != nil {
return err
}
// If the PK is changed then we have to delete the old row first
dOldKey := dOldRow.NomsMapKey(tu.t.sch)
dOldKeyVal, err := dOldKey.Value(ctx)
if err != nil {
return err
}
dNewKey := dNewRow.NomsMapKey(tu.t.sch)
dNewKeyVal, err := dNewKey.Value(ctx)
if err != nil {
return err
}
if tu.ed == nil {
tu.ed, err = tu.t.newMapEditor(ctx)
if err != nil {
return err
}
}
if !dOldKeyVal.Equals(dNewKeyVal) {
oldHash, err := dOldKeyVal.Hash(dOldRow.Format())
if err != nil {
return err
}
newHash, err := dNewKeyVal.Hash(dNewRow.Format())
if err != nil {
return err
}
tu.addedKeys[newHash] = dNewKeyVal
tu.removedKeys[oldHash] = dOldKey
}
tu.ed.Set(dNewKey, dNewRow.NomsMapValue(tu.t.sch))
return nil
}
func (tu *tableEditor) Close(ctx *sql.Context) error {
// For all added keys, check for and report a collision
for hash, addedKey := range tu.addedKeys {
if _, ok := tu.removedKeys[hash]; !ok {
_, rowExists, err := tu.t.table.GetRow(ctx, addedKey.(types.Tuple), tu.t.sch)
if err != nil {
return errhand.BuildDError("failed to read table").AddCause(err).Build()
}
if rowExists {
return fmt.Errorf("primary key collision: (%v)", addedKey)
}
}
}
// For all removed keys, remove the map entries that weren't added elsewhere by other updates
for hash, removedKey := range tu.removedKeys {
if _, ok := tu.addedKeys[hash]; !ok {
tu.ed.Remove(removedKey)
}
}
if tu.ed != nil {
return tu.t.updateTable(ctx, tu.ed)
}
return nil
}
+11 -174
View File
@@ -29,12 +29,6 @@ import (
"github.com/liquidata-inc/dolt/go/store/types"
)
var _ sql.Table = (*DoltTable)(nil)
var _ sql.UpdatableTable = (*DoltTable)(nil)
var _ sql.DeletableTable = (*DoltTable)(nil)
var _ sql.InsertableTable = (*DoltTable)(nil)
var _ sql.ReplaceableTable = (*DoltTable)(nil)
// DoltTable implements the sql.Table interface and gives access to dolt table rows and schema.
type DoltTable struct {
name string
@@ -43,6 +37,12 @@ type DoltTable struct {
db *Database
}
var _ sql.Table = (*DoltTable)(nil)
var _ sql.UpdatableTable = (*DoltTable)(nil)
var _ sql.DeletableTable = (*DoltTable)(nil)
var _ sql.InsertableTable = (*DoltTable)(nil)
var _ sql.ReplaceableTable = (*DoltTable)(nil)
// Implements sql.IndexableTable
func (t *DoltTable) WithIndexLookup(lookup sql.IndexLookup) sql.Table {
dil, ok := lookup.(*doltIndexLookup)
@@ -106,185 +106,22 @@ func (t *DoltTable) PartitionRows(ctx *sql.Context, _ sql.Partition) (sql.RowIte
return newRowIterator(t, ctx)
}
func newTableEditor(t *DoltTable) *tableEditor {
return &tableEditor{
t: t,
addedKeys: make(map[hash.Hash]types.LesserValuable),
removedKeys: make(map[hash.Hash]types.LesserValuable),
}
}
var _ sql.RowReplacer = (*tableEditor)(nil)
var _ sql.RowUpdater = (*tableEditor)(nil)
var _ sql.RowInserter = (*tableEditor)(nil)
var _ sql.RowDeleter = (*tableEditor)(nil)
func (te *tableEditor) Insert(ctx *sql.Context, sqlRow sql.Row) error {
dRow, err := SqlRowToDoltRow(te.t.table.Format(), sqlRow, te.t.sch)
if err != nil {
return err
}
key, err := dRow.NomsMapKey(te.t.sch).Value(ctx)
if err != nil {
return errhand.BuildDError("failed to get row key").AddCause(err).Build()
}
hash, err := key.Hash(dRow.Format())
if err != nil {
return err
}
// If we've already inserted this key as part of this insert operation, that's an error. Inserting a row that already
// exists in the table will be handled in Close().
if _, ok := te.addedKeys[hash]; ok {
return errors.New("duplicate primary key given")
}
te.addedKeys[hash] = key
if te.ed == nil {
te.ed, err = te.t.newMapEditor(ctx)
if err != nil {
return err
}
}
te.ed = te.ed.Set(key, dRow.NomsMapValue(te.t.sch))
return nil
}
func (t *DoltTable) newMapEditor(ctx context.Context) (*types.MapEditor, error) {
typesMap, err := t.table.GetRowData(ctx)
if err != nil {
return nil, errhand.BuildDError("failed to get row data.").AddCause(err).Build()
}
return typesMap.Edit(), nil
}
func (te *tableEditor) Delete(ctx *sql.Context, sqlRow sql.Row) error {
dRow, err := SqlRowToDoltRow(te.t.table.Format(), sqlRow, te.t.sch)
if err != nil {
return err
}
key, err := dRow.NomsMapKey(te.t.sch).Value(ctx)
if err != nil {
return errhand.BuildDError("failed to get row key").AddCause(err).Build()
}
hash, err := key.Hash(dRow.Format())
if err != nil {
return err
}
delete(te.addedKeys, hash)
te.removedKeys[hash] = key
if te.ed == nil {
te.ed, err = te.t.newMapEditor(ctx)
if err != nil {
return err
}
}
te.ed = te.ed.Remove(key)
return nil
}
// tableEditor supports making multiple row edits (inserts, updates, deletes) to a table. It does error checking for key
// collision etc. in the Close() method.
type tableEditor struct {
t *DoltTable
ed *types.MapEditor
addedKeys map[hash.Hash]types.LesserValuable
removedKeys map[hash.Hash]types.LesserValuable
}
func (tu *tableEditor) Close(ctx *sql.Context) error {
// For all added keys, check for and report a collision
for hash, addedKey := range tu.addedKeys {
if _, ok := tu.removedKeys[hash]; !ok {
_, rowExists, err := tu.t.table.GetRow(ctx, addedKey.(types.Tuple), tu.t.sch)
if err != nil {
return errhand.BuildDError("failed to read table").AddCause(err).Build()
}
if rowExists {
return fmt.Errorf("primary key collision: (%v)", addedKey)
}
}
}
// For all removed keys, remove the map entries that weren't added elsewhere by other updates
for hash, removedKey := range tu.removedKeys {
if _, ok := tu.addedKeys[hash]; !ok {
tu.ed.Remove(removedKey)
}
}
if tu.ed != nil {
return tu.t.updateTable(ctx, tu.ed)
}
return nil
}
func (tu *tableEditor) Update(ctx *sql.Context, oldRow sql.Row, newRow sql.Row) error {
dOldRow, err := SqlRowToDoltRow(tu.t.table.Format(), oldRow, tu.t.sch)
if err != nil {
return err
}
dNewRow, err := SqlRowToDoltRow(tu.t.table.Format(), newRow, tu.t.sch)
if err != nil {
return err
}
// If the PK is changed then we have to delete the old row first
dOldKey := dOldRow.NomsMapKey(tu.t.sch)
dOldKeyVal, err := dOldKey.Value(ctx)
if err != nil {
return err
}
dNewKey := dNewRow.NomsMapKey(tu.t.sch)
dNewKeyVal, err := dNewKey.Value(ctx)
if err != nil {
return err
}
if tu.ed == nil {
tu.ed, err = tu.t.newMapEditor(ctx)
if err != nil {
return err
}
}
if !dOldKeyVal.Equals(dNewKeyVal) {
oldHash, err := dOldKeyVal.Hash(dOldRow.Format())
if err != nil {
return err
}
newHash, err := dNewKeyVal.Hash(dNewRow.Format())
if err != nil {
return err
}
tu.addedKeys[newHash] = dNewKeyVal
tu.removedKeys[oldHash] = dOldKey
}
tu.ed.Set(dNewKey, dNewRow.NomsMapValue(tu.t.sch))
return nil
}
// Inserter implements sql.InsertableTable
func (t *DoltTable) Inserter(ctx *sql.Context) sql.RowInserter {
return newTableEditor(t)
}
// Deleter implements sql.DeletableTable
func (t *DoltTable) Deleter(*sql.Context) sql.RowDeleter {
return newTableEditor(t)
}
// Replacer implements sql.ReplaceableTable
func (t *DoltTable) Replacer(ctx *sql.Context) sql.RowReplacer {
return newTableEditor(t)
}
// Updater implements sql.UpdatableTable
func (t *DoltTable) Updater(ctx *sql.Context) sql.RowUpdater {
return &tableEditor{
t: t,
@@ -346,4 +183,4 @@ func (t *DoltTable) updateTable(ctx *sql.Context, mapEditor *types.MapEditor) er
t.table = newTable
t.db.root = newRoot
return nil
}
}