From f86a11de6b3f25214094ee3f617413a816f16151 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 21 Oct 2021 14:25:30 -0700 Subject: [PATCH] go/store/types/edits: Changes to ensure AsyncSortedEdits always returns edits in stable order based on how they were added. The way the SQL layer issues UPDATE statements to the storage layer, combined with the change which introduced table_edit_accumulator, meant that a Map Delete followed by a Map Insert at certain batch sizes were being sent to an edit accumulator and then fetched back out of it. When those come back from the edit accumulator, they need to come back in stable order, so that the update doesn't turn into a set followed by a delete instead. AsyncSortedEdits did not previously attempt to keep these edits in stable order. In some places it was explicitly backwards (sorted_edit_itr for example). This fixes it so that it is always stable. Potentially regresses a memory utilization and sort size smoothing optimization based around pairing small buffers with large buffers early in the process. Also includes a fix to table_edit_accumulator to correctly order its currently-in-progress committed edit provider when it is doing its stable merge. 
--- .../table/editor/table_edit_accumulator.go | 2 +- go/store/types/edits/async_sorted_edits.go | 17 +++++++---------- go/store/types/edits/kvp_collection.go | 6 +++--- go/store/types/edits/sorted_edit_itr.go | 12 ++++++------ 4 files changed, 17 insertions(+), 20 deletions(-) diff --git a/go/libraries/doltcore/table/editor/table_edit_accumulator.go b/go/libraries/doltcore/table/editor/table_edit_accumulator.go index 4df77d9e38..a7dff864dd 100644 --- a/go/libraries/doltcore/table/editor/table_edit_accumulator.go +++ b/go/libraries/doltcore/table/editor/table_edit_accumulator.go @@ -332,10 +332,10 @@ func (tea *tableEditAccumulatorImpl) MaterializeEdits(ctx context.Context, nbf * } eps := make([]types.EditProvider, 0, len(flushedEPs)+1) - eps = append(eps, committedEP) for i := 0; i < len(flushedEPs); i++ { eps = append(eps, flushedEPs[i].Edits) } + eps = append(eps, committedEP) defer func() { for _, ep := range eps { diff --git a/go/store/types/edits/async_sorted_edits.go b/go/store/types/edits/async_sorted_edits.go index 42d92ce920..6025d51610 100644 --- a/go/store/types/edits/async_sorted_edits.go +++ b/go/store/types/edits/async_sorted_edits.go @@ -16,7 +16,6 @@ package edits import ( "context" - "sort" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" @@ -223,23 +222,21 @@ func (ase *AsyncSortedEdits) mergeCollections() error { return nil } -// we pair collections so that as you perform many merges you end up with collections of edits that are similarly sized func pairCollections(colls []*KVPCollection) [][2]*KVPCollection { numColls := len(colls) pairs := make([][2]*KVPCollection, 0, numColls/2+1) - sort.Slice(colls, func(i, j int) bool { - return colls[i].Size() < colls[j].Size() - }) + // These pairs need to come back in order because our sort is stable. + // If there is an odd number of collections, put the first element as a + // single non-merge collection pair at the front of the list. 
if numColls%2 == 1 { - pairs = append(pairs, [2]*KVPCollection{colls[numColls-1], nil}) - - colls = colls[:numColls-1] + pairs = append(pairs, [2]*KVPCollection{colls[0], nil}) + colls = colls[1:] numColls -= 1 } - for i, j := 0, numColls-1; i < numColls/2; i, j = i+1, j-1 { - pairs = append(pairs, [2]*KVPCollection{colls[i], colls[j]}) + for i := 0; i < numColls; i+=2 { + pairs = append(pairs, [2]*KVPCollection{colls[i], colls[i+1]}) } return pairs diff --git a/go/store/types/edits/kvp_collection.go b/go/store/types/edits/kvp_collection.go index b2bdda0ead..fb8026bf74 100644 --- a/go/store/types/edits/kvp_collection.go +++ b/go/store/types/edits/kvp_collection.go @@ -67,15 +67,15 @@ func (left *KVPCollection) DestructiveMerge(right *KVPCollection) (*KVPCollectio var otherItr *KVPCollItr for !done { - currItr, otherItr = rItr, lItr - isLess, err := lItr.Less(rItr) + currItr, otherItr = lItr, rItr + isLess, err := rItr.Less(lItr) if err != nil { return nil, err } if isLess { - currItr, otherItr = lItr, rItr + currItr, otherItr = rItr, lItr } kvp, exhaustedBuff, done = currItr.nextForDestructiveMerge() diff --git a/go/store/types/edits/sorted_edit_itr.go b/go/store/types/edits/sorted_edit_itr.go index 25e41ab2df..01e26bd804 100644 --- a/go/store/types/edits/sorted_edit_itr.go +++ b/go/store/types/edits/sorted_edit_itr.go @@ -45,15 +45,15 @@ func (itr *SortedEditItr) Next() (*types.KVP, error) { return nil, io.EOF } - lesser := itr.rightItr - isLess, err := itr.leftItr.Less(itr.rightItr) + lesser := itr.leftItr + isLess, err := itr.rightItr.Less(itr.leftItr) if err != nil { return nil, err } if isLess { - lesser = itr.leftItr + lesser = itr.rightItr } kvp, err := lesser.Next() @@ -86,15 +86,15 @@ func (itr *SortedEditItr) Peek() (*types.KVP, error) { return nil, nil } - lesser := itr.rightItr - isLess, err := itr.leftItr.Less(itr.rightItr) + lesser := itr.leftItr + isLess, err := itr.rightItr.Less(itr.leftItr) if err != nil { return nil, err } if isLess { - lesser = 
itr.leftItr + lesser = itr.rightItr } return lesser.Peek(), nil