Merge pull request #2538 from dolthub/aaron/index-edit-accumulator-dropped-edit-fix

doltcore/table/editor: index_edit_accumulator: Correctly order the most recently accumulated edits after all flushed edits.
This commit is contained in:
Aaron Son
2021-12-28 15:05:32 -08:00
committed by GitHub
2 changed files with 49 additions and 6 deletions

View File

@@ -28,9 +28,8 @@ import (
"github.com/dolthub/dolt/go/store/types/edits"
)
const (
indexFlushThreshold = 256 * 1024
)
// var for testing
var indexFlushThreshold int64 = 256 * 1024
type IndexEditAccumulator interface {
// Delete adds a row to be deleted when these edits are eventually applied.
@@ -226,7 +225,7 @@ func (iea *indexEditAccumulatorImpl) Insert(ctx context.Context, keyHash, partia
if iea.flushingUncommitted {
iea.uncommittedEA.AddEdit(key, value)
if iea.uncommitted.ops-iea.lastFlush > flushThreshold {
if iea.uncommitted.ops-iea.lastFlush > indexFlushThreshold {
iea.flushUncommitted()
}
} else if iea.uncommitted.ops > indexFlushThreshold {
@@ -248,7 +247,7 @@ func (iea *indexEditAccumulatorImpl) Delete(ctx context.Context, keyHash, partia
if iea.flushingUncommitted {
iea.uncommittedEA.AddEdit(key, nil)
if iea.uncommitted.ops-iea.lastFlush > flushThreshold {
if iea.uncommitted.ops-iea.lastFlush > indexFlushThreshold {
iea.flushUncommitted()
}
} else if iea.uncommitted.ops > indexFlushThreshold {
@@ -411,10 +410,10 @@ func (iea *indexEditAccumulatorImpl) MaterializeEdits(ctx context.Context, nbf *
}
eps := make([]types.EditProvider, 0, len(flushedEPs)+1)
eps = append(eps, committedEP)
for i := 0; i < len(flushedEPs); i++ {
eps = append(eps, flushedEPs[i].Edits)
}
eps = append(eps, committedEP)
defer func() {
for _, ep := range eps {

View File

@@ -65,6 +65,50 @@ func requireGet(ctx context.Context, t *testing.T, tea TableEditAccumulator, key
require.Equal(t, expected, has)
}
func TestIndexEditAccumulatorStableOrder(t *testing.T) {
origFlushThreshold := flushThreshold
defer func() {
indexFlushThreshold = origFlushThreshold
}()
indexFlushThreshold = 1
ctx := context.Background()
nbf := types.Format_Default
teaf := newTestTEAF()
m, err := types.NewMap(ctx, teaf.vrw)
require.NoError(t, err)
iea := teaf.NewIndexEA(ctx, m).(*indexEditAccumulatorImpl)
h := func(k types.Tuple) hash.Hash {
h, err := k.Hash(nbf)
require.NoError(t, err)
return h
}
k1 := newTuple(t, types.Int(0))
k2 := newTuple(t, types.Int(1))
err = iea.Insert(ctx, h(k1), h(k1), k1, emptyTpl)
require.NoError(t, err)
err = iea.Insert(ctx, h(k2), h(k1), k2, emptyTpl)
require.NoError(t, err)
err = iea.Delete(ctx, h(k1), h(k1), k1, k1)
require.NoError(t, err)
err = iea.Delete(ctx, h(k2), h(k2), k2, k2)
require.NoError(t, err)
err = iea.Insert(ctx, h(k1), h(k1), k1, k1)
require.NoError(t, err)
err = iea.Commit(ctx, nbf)
require.NoError(t, err)
m, err = iea.MaterializeEdits(ctx, nbf)
require.NoError(t, err)
require.Equal(t, uint64(1), m.Len())
}
func TestTableEditAccumulatorStableOrder(t *testing.T) {
origFlushThreshold := flushThreshold
defer func() {