mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-20 18:19:15 -06:00
go/store/types/edits: Changes to ensure AsyncSortedEdits always returned edits in stable order based on how they were added.
The way the SQL layer issues UPDATE statements to the storage layer, combined with the change which introduced table_edit_accumulator meant that a Map Delete followed by a Map Insert at certain batch sizes were being sent to an edit accumulator and then fetched back out of it. When those come back from the edit accumulator, they need to come back in stable order, so that the update doesn't turn into a set followed by a delete instead. AsyncSortedEdits did not currently make attempts to keep these edits in stable order. In some places it's explicitly backwards (sorted_edit_itr for example). This fixes it so that it is always stable. Potentially regresses a memory utilization and sort size smoothing optimization based around pairing small buffers with large buffers early in the process. Also includes a fix to table_edit_accumulator to correctly order its currently-in-progress commited edit provider when it is doing its stable merge.
This commit is contained in:
@@ -332,10 +332,10 @@ func (tea *tableEditAccumulatorImpl) MaterializeEdits(ctx context.Context, nbf *
|
||||
}
|
||||
|
||||
eps := make([]types.EditProvider, 0, len(flushedEPs)+1)
|
||||
eps = append(eps, committedEP)
|
||||
for i := 0; i < len(flushedEPs); i++ {
|
||||
eps = append(eps, flushedEPs[i].Edits)
|
||||
}
|
||||
eps = append(eps, committedEP)
|
||||
|
||||
defer func() {
|
||||
for _, ep := range eps {
|
||||
|
||||
@@ -16,7 +16,6 @@ package edits
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sort"
|
||||
|
||||
"golang.org/x/sync/errgroup"
|
||||
"golang.org/x/sync/semaphore"
|
||||
@@ -223,23 +222,21 @@ func (ase *AsyncSortedEdits) mergeCollections() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// we pair collections so that as you perform many merges you end up with collections of edits that are similarly sized
|
||||
func pairCollections(colls []*KVPCollection) [][2]*KVPCollection {
|
||||
numColls := len(colls)
|
||||
pairs := make([][2]*KVPCollection, 0, numColls/2+1)
|
||||
sort.Slice(colls, func(i, j int) bool {
|
||||
return colls[i].Size() < colls[j].Size()
|
||||
})
|
||||
|
||||
// These pairs need to come back in order because our sort is stable.
|
||||
// If there is an odd number of collections, put the first element as a
|
||||
// single non-merge collection pair at the front of the list.
|
||||
if numColls%2 == 1 {
|
||||
pairs = append(pairs, [2]*KVPCollection{colls[numColls-1], nil})
|
||||
|
||||
colls = colls[:numColls-1]
|
||||
pairs = append(pairs, [2]*KVPCollection{colls[0], nil})
|
||||
colls = colls[1:]
|
||||
numColls -= 1
|
||||
}
|
||||
|
||||
for i, j := 0, numColls-1; i < numColls/2; i, j = i+1, j-1 {
|
||||
pairs = append(pairs, [2]*KVPCollection{colls[i], colls[j]})
|
||||
for i := 0; i < numColls; i+=2 {
|
||||
pairs = append(pairs, [2]*KVPCollection{colls[i], colls[i+1]})
|
||||
}
|
||||
|
||||
return pairs
|
||||
|
||||
@@ -67,15 +67,15 @@ func (left *KVPCollection) DestructiveMerge(right *KVPCollection) (*KVPCollectio
|
||||
var otherItr *KVPCollItr
|
||||
|
||||
for !done {
|
||||
currItr, otherItr = rItr, lItr
|
||||
isLess, err := lItr.Less(rItr)
|
||||
currItr, otherItr = lItr, rItr
|
||||
isLess, err := rItr.Less(lItr)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isLess {
|
||||
currItr, otherItr = lItr, rItr
|
||||
currItr, otherItr = rItr, lItr
|
||||
}
|
||||
|
||||
kvp, exhaustedBuff, done = currItr.nextForDestructiveMerge()
|
||||
|
||||
@@ -45,15 +45,15 @@ func (itr *SortedEditItr) Next() (*types.KVP, error) {
|
||||
return nil, io.EOF
|
||||
}
|
||||
|
||||
lesser := itr.rightItr
|
||||
isLess, err := itr.leftItr.Less(itr.rightItr)
|
||||
lesser := itr.leftItr
|
||||
isLess, err := itr.rightItr.Less(itr.leftItr)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isLess {
|
||||
lesser = itr.leftItr
|
||||
lesser = itr.rightItr
|
||||
}
|
||||
|
||||
kvp, err := lesser.Next()
|
||||
@@ -86,15 +86,15 @@ func (itr *SortedEditItr) Peek() (*types.KVP, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
lesser := itr.rightItr
|
||||
isLess, err := itr.leftItr.Less(itr.rightItr)
|
||||
lesser := itr.leftItr
|
||||
isLess, err := itr.rightItr.Less(itr.leftItr)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isLess {
|
||||
lesser = itr.leftItr
|
||||
lesser = itr.rightItr
|
||||
}
|
||||
|
||||
return lesser.Peek(), nil
|
||||
|
||||
Reference in New Issue
Block a user