Prevent panic when iea capacity is reached and undo operations are attempted

This commit is contained in:
Daylon Wilkins
2021-10-07 14:09:56 -07:00
committed by Daylon Wilkins
parent fe2fdd8ddd
commit 87b7ed61d1
4 changed files with 110 additions and 27 deletions

View File

@@ -177,7 +177,7 @@ type BulkImportIEA struct {
}
// Delete adds a row to be deleted when these edits are eventually applied.
func (iea *BulkImportIEA) Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, partialKey, value types.Tuple) error {
func (iea *BulkImportIEA) Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error {
// key is stored in iea.ea, keyHash is stored in iea.deletes. Capacity is just an estimate and can become inaccurate if a key is added and/or deleted more than once.
if iea.capMon.capacityExceeded(key.Size()) {
return errors.New("capacity exceeded")
@@ -194,7 +194,7 @@ func (iea *BulkImportIEA) Delete(ctx context.Context, keyHash, partialKeyHash ha
}
// Insert adds a row to be inserted when these edits are eventually applied.
func (iea *BulkImportIEA) Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, partialKey types.Tuple, val types.Tuple) error {
func (iea *BulkImportIEA) Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, val types.Tuple) error {
// key and val are stored in the iea.ea, keyHash is stored in iea.adds, and iea.partialAdds. partialKeyHash is stored in iea.partialAdds[keyHash].
// Capacity is just an estimate and can become inaccurate if a key is added and/or deleted more than once.
size := key.Size() + val.Size() + (3 * hash.ByteLen)

View File

@@ -34,10 +34,10 @@ const (
type IndexEditAccumulator interface {
// Delete adds a row to be deleted when these edits are eventually applied.
Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, partialKey, value types.Tuple) error
Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error
// Insert adds a row to be inserted when these edits are eventually applied.
Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, partialKey types.Tuple, value types.Tuple) error
Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error
// Has returns true if the current TableEditAccumulator contains the given key, or it exists in the row data.
Has(ctx context.Context, keyHash hash.Hash, key types.Tuple) (bool, error)
@@ -167,6 +167,8 @@ type indexEditAccumulatorImpl struct {
uncommittedEAId uint64
}
// Compile-time assertion that *indexEditAccumulatorImpl implements IndexEditAccumulator.
var _ IndexEditAccumulator = (*indexEditAccumulatorImpl)(nil)
func (iea *indexEditAccumulatorImpl) flushUncommitted() {
// if we are not already actively writing edits to the uncommittedEA then change the state and push all in mem edits
// to a types.EditAccumulator
@@ -208,7 +210,7 @@ func (iea *indexEditAccumulatorImpl) flushUncommitted() {
}
// Insert adds a row to be inserted when these edits are eventually applied.
func (iea *indexEditAccumulatorImpl) Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, partialKey types.Tuple, value types.Tuple) error {
func (iea *indexEditAccumulatorImpl) Insert(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error {
if _, ok := iea.uncommitted.deletes[keyHash]; ok {
delete(iea.uncommitted.deletes, keyHash)
} else {
@@ -234,7 +236,7 @@ func (iea *indexEditAccumulatorImpl) Insert(ctx context.Context, keyHash, partia
}
// Delete adds a row to be deleted when these edits are eventually applied.
func (iea *indexEditAccumulatorImpl) Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, partialKey, value types.Tuple) error {
func (iea *indexEditAccumulatorImpl) Delete(ctx context.Context, keyHash, partialKeyHash hash.Hash, key, value types.Tuple) error {
if _, ok := iea.uncommitted.adds[keyHash]; ok {
delete(iea.uncommitted.adds, keyHash)
delete(iea.uncommitted.partialAdds[partialKeyHash], keyHash)

View File

@@ -68,11 +68,12 @@ func (u *uniqueKeyErr) Error() string {
type IndexEditor struct {
nbf *types.NomsBinFormat
idxSch schema.Schema
tblSch schema.Schema
idx schema.Index
iea IndexEditAccumulator
stack indexOperationStack
idxSch schema.Schema
tblSch schema.Schema
idx schema.Index
iea IndexEditAccumulator
stack indexOperationStack
permanentErr error // If this is set then we should always return this error as the IndexEditor is no longer usable
// This mutex blocks on each operation, so that map reads and updates are serialized
writeMutex *sync.Mutex
@@ -81,12 +82,13 @@ type IndexEditor struct {
// NewIndexEditor creates a new index editor for the given index. The editor's accumulator is obtained from
// opts.Deaf.NewIndexEA using indexData, and its binary format is taken from indexData. The returned editor starts
// with no permanent error and with its own write mutex.
func NewIndexEditor(ctx context.Context, index schema.Index, indexData types.Map, tableSch schema.Schema, opts Options) *IndexEditor {
ie := &IndexEditor{
idxSch: index.Schema(),
tblSch: tableSch,
idx: index,
iea: opts.Deaf.NewIndexEA(ctx, indexData),
nbf: indexData.Format(),
writeMutex: &sync.Mutex{},
idxSch: index.Schema(),
tblSch: tableSch,
idx: index,
iea: opts.Deaf.NewIndexEA(ctx, indexData),
nbf: indexData.Format(),
permanentErr: nil,
writeMutex: &sync.Mutex{},
}
return ie
}
@@ -106,6 +108,10 @@ func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tupl
ie.writeMutex.Lock()
defer ie.writeMutex.Unlock()
if ie.permanentErr != nil {
return ie.permanentErr
}
if ie.idx.IsUnique() {
if matches, err := ie.iea.HasPartial(ctx, ie.idxSch, partialKeyHash, partialKey); err != nil {
return err
@@ -126,7 +132,7 @@ func (ie *IndexEditor) InsertRow(ctx context.Context, key, partialKey types.Tupl
}
}
err = ie.iea.Insert(ctx, keyHash, partialKeyHash, key, partialKey, value)
err = ie.iea.Insert(ctx, keyHash, partialKeyHash, key, value)
if err != nil {
return err
}
@@ -149,7 +155,11 @@ func (ie *IndexEditor) DeleteRow(ctx context.Context, key, partialKey, value typ
ie.writeMutex.Lock()
defer ie.writeMutex.Unlock()
err = ie.iea.Delete(ctx, keyHash, partialKeyHash, key, partialKey, value)
if ie.permanentErr != nil {
return ie.permanentErr
}
err = ie.iea.Delete(ctx, keyHash, partialKeyHash, key, value)
if err != nil {
return err
}
@@ -168,6 +178,10 @@ func (ie *IndexEditor) HasPartial(ctx context.Context, partialKey types.Tuple) (
ie.writeMutex.Lock()
defer ie.writeMutex.Unlock()
if ie.permanentErr != nil {
return false, ie.permanentErr
}
tpls, err := ie.iea.HasPartial(ctx, ie.idxSch, partialKeyHash, partialKey)
if err != nil {
return false, err
@@ -180,9 +194,16 @@ func (ie *IndexEditor) HasPartial(ctx context.Context, partialKey types.Tuple) (
// Insert on a key, it will use Delete on that same key. The stack size is very small, therefore too many consecutive
// calls will cause the stack to empty. This should only be called in the event that an operation was performed that
// has failed for other reasons, such as an INSERT on the parent table failing on a separate index editor. In the event
// that Undo is called and there are no operations to undo OR the reverse operation fails (it never should), this panics
// rather than errors, as the index editor is in an invalid state that cannot be corrected.
// that Undo is called and there are no operations to undo, this still panics. If the reverse operation fails (such as
// when memory capacity is reached), then we set a permanent error, as the index editor is in an invalid state that
// cannot be corrected.
//
// We don't return an error here as Undo will only be called when there's an error in a different editor. We allow the
// user to handle that initial error, as ALL further calls to this IndexEditor will return the error set here.
func (ie *IndexEditor) Undo(ctx context.Context) {
if ie.permanentErr != nil {
return
}
indexOp, ok := ie.stack.Pop()
if !ok {
panic(fmt.Sprintf("attempted to undo the last operation on index '%s' but failed due to an empty stack", ie.idx.Name()))
@@ -195,16 +216,18 @@ func (ie *IndexEditor) Undo(ctx context.Context) {
if indexOp.isInsert {
err := ie.DeleteRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value)
if err != nil {
panic(fmt.Sprintf("index '%s' is in an invalid and unrecoverable state: "+
ie.permanentErr = fmt.Errorf("index '%s' is in an invalid and unrecoverable state: "+
"attempted to undo previous insertion but encountered the following error: %v",
ie.idx.Name(), err))
ie.idx.Name(), err)
return
}
} else {
err := ie.InsertRow(ctx, indexOp.fullKey, indexOp.partialKey, indexOp.value)
if err != nil {
panic(fmt.Sprintf("index '%s' is in an invalid and unrecoverable state: "+
ie.permanentErr = fmt.Errorf("index '%s' is in an invalid and unrecoverable state: "+
"attempted to undo previous deletion but encountered the following error: %v",
ie.idx.Name(), err))
ie.idx.Name(), err)
return
}
}
}
@@ -214,6 +237,10 @@ func (ie *IndexEditor) Map(ctx context.Context) (types.Map, error) {
ie.writeMutex.Lock()
defer ie.writeMutex.Unlock()
if ie.permanentErr != nil {
return types.EmptyMap, ie.permanentErr
}
return ie.iea.MaterializeEdits(ctx, ie.nbf)
}
@@ -231,7 +258,9 @@ func (ie *IndexEditor) StatementFinished(ctx context.Context, errored bool) erro
ie.writeMutex.Lock()
defer ie.writeMutex.Unlock()
if errored {
if ie.permanentErr != nil {
return ie.permanentErr
} else if errored {
return ie.iea.Rollback(ctx)
}
@@ -240,7 +269,7 @@ func (ie *IndexEditor) StatementFinished(ctx context.Context, errored bool) erro
// Close performs no cleanup for an IndexEditor. It reports the permanent error stored on the editor (nil if none has
// been set), so that callers learn when the editor has become unusable.
func (ie *IndexEditor) Close() error {
return nil
return ie.permanentErr
}
func RebuildIndex(ctx context.Context, tbl *doltdb.Table, indexName string, opts Options) (types.Map, error) {

View File

@@ -735,6 +735,58 @@ func TestIndexRebuildingUniqueFailTwoCol(t *testing.T) {
require.Error(t, err)
}
// TestIndexEditorCapacityExceeded verifies that when Undo fails because the index edit accumulator has reached its
// capacity, every subsequent call on the IndexEditor fails, and keeps failing, with the "unrecoverable state" error.
func TestIndexEditorCapacityExceeded(t *testing.T) {
	ctx := context.Background()
	format := types.Format_Default
	db, err := dbfactory.MemFactory{}.CreateDB(ctx, format, nil, nil)
	require.NoError(t, err)
	colColl := schema.NewColCollection(
		schema.NewColumn("pk", 0, types.IntKind, true),
		schema.NewColumn("v1", 1, types.IntKind, false))
	tableSch, err := schema.SchemaFromCols(colColl)
	require.NoError(t, err)
	index, err := tableSch.Indexes().AddIndexByColNames("idx_cap", []string{"v1"}, schema.IndexProperties{IsUnique: false, Comment: ""})
	require.NoError(t, err)
	indexSch := index.Schema()
	emptyMap, err := types.NewMap(ctx, db)
	require.NoError(t, err)

	// The capacity is deliberately tiny: the three inserts below succeed and the fourth is expected to exceed it.
	opts := Options{Deaf: NewInMemDeafWithMaxCapacity(format, 224)}
	indexEditor := NewIndexEditor(ctx, index, emptyMap, tableSch, opts)
	for i := 0; i < 3; i++ {
		dRow, err := row.New(format, indexSch, row.TaggedValues{
			0: types.Int(i),
			1: types.Int(i),
		})
		require.NoError(t, err)
		fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index, nil)
		require.NoError(t, err)
		require.NoError(t, indexEditor.InsertRow(ctx, fullKey, partialKey, value))
	}

	dRow, err := row.New(format, indexSch, row.TaggedValues{
		0: types.Int(4),
		1: types.Int(4),
	})
	require.NoError(t, err)
	fullKey, partialKey, value, err := dRow.ReduceToIndexKeys(index, nil)
	require.NoError(t, err)
	err = indexEditor.InsertRow(ctx, fullKey, partialKey, value)
	require.Error(t, err)
	require.Equal(t, "capacity exceeded", err.Error())

	indexEditor.Undo(ctx) // This sets the unrecoverable state error, but does not return an error itself

	// requireUnrecoverable asserts the error is non-nil before inspecting its message, so that an unexpected nil
	// produces a clean test failure rather than a nil-pointer panic on err.Error().
	requireUnrecoverable := func(err error) {
		t.Helper()
		require.Error(t, err)
		require.Contains(t, err.Error(), "unrecoverable state")
	}

	requireUnrecoverable(indexEditor.InsertRow(ctx, fullKey, partialKey, value))
	requireUnrecoverable(indexEditor.DeleteRow(ctx, fullKey, partialKey, value))
	requireUnrecoverable(indexEditor.StatementFinished(ctx, false))
	requireUnrecoverable(indexEditor.Close())
	_, err = indexEditor.HasPartial(ctx, partialKey)
	requireUnrecoverable(err)
	_, err = indexEditor.Map(ctx)
	requireUnrecoverable(err)
}
func createTestRowData(t *testing.T, vrw types.ValueReadWriter, sch schema.Schema) (types.Map, []row.Row) {
return createTestRowDataFromTaggedValues(t, vrw, sch,
row.TaggedValues{