mirror of
https://github.com/dolthub/dolt.git
synced 2026-05-14 11:29:06 -05:00
go/store/prolly: VisitGCRoots: Ensure we visit the fully flushed root of stashed edits if we need to visit them during a GC.
This commit is contained in:
@@ -214,7 +214,6 @@ func (writer prollyKeylessSecondaryWriter) Map(ctx context.Context) (prolly.MapI
|
||||
}
|
||||
|
||||
func (writer prollyKeylessSecondaryWriter) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
// TODO: writer.primary?
|
||||
return writer.mut.VisitGCRoots(ctx, roots)
|
||||
}
|
||||
|
||||
|
||||
@@ -145,7 +145,7 @@ func testRevertAfterFlush(t *testing.T, scale int) {
|
||||
// flush post-checkpoint edits halfway through
|
||||
// this creates a stashed tree in |mut|
|
||||
if i == len(post)/2 {
|
||||
err = mut.flushPending(ctx)
|
||||
err = mut.flushPending(ctx, false)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,6 +65,14 @@ func (mut *GenericMutableMap[M, T]) Map(ctx context.Context) (M, error) {
|
||||
}
|
||||
|
||||
func (mut *GenericMutableMap[M, T]) VisitGCRoots(ctx context.Context, roots func(hash.Hash) bool) error {
|
||||
// flushPending in order to ensure that our tuples.Static root
|
||||
// is a chunk which includes all of the reachable chunks for
|
||||
// our edits, including chunked internal values like text,
|
||||
// json and geometry.
|
||||
err := mut.flushPending(ctx, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
roots(mut.tuples.Static.GetRoot().HashOf())
|
||||
if mut.stash != nil {
|
||||
roots(mut.stash.Static.GetRoot().HashOf())
|
||||
@@ -117,7 +125,7 @@ func (mut *GenericMutableMap[M, T]) Put(ctx context.Context, key, value val.Tupl
|
||||
return err
|
||||
}
|
||||
if mut.tuples.Edits.Count() > mut.maxPending {
|
||||
return mut.flushPending(ctx)
|
||||
return mut.flushPending(ctx, false)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -167,14 +175,43 @@ func (mut *GenericMutableMap[M, T]) Revert(ctx context.Context) {
|
||||
mut.tuples.Edits.Revert(ctx)
|
||||
}
|
||||
|
||||
func (mut *GenericMutableMap[M, T]) flushPending(ctx context.Context) error {
|
||||
func (mut *GenericMutableMap[M, T]) flushPending(ctx context.Context, deep bool) error {
|
||||
stash := mut.stash
|
||||
// if our in-memory edit set contains a checkpoint, we
|
||||
// must stash a copy of |mut.tuples| we can revert to.
|
||||
if mut.tuples.Edits.HasCheckpoint() {
|
||||
cp := mut.tuples.Copy()
|
||||
cp.Edits.Revert(ctx)
|
||||
stash = &cp
|
||||
if deep {
|
||||
// deep is used by online GC. In order to
|
||||
// ensure that all internal chunk pointers in
|
||||
// all reachable cached edits get visited by
|
||||
// GC, we flush the checkpointed edits to
|
||||
// storage as well.
|
||||
//
|
||||
// TODO: Because of the way apply mutations is
|
||||
// currently coupled with GenericMutableMap,
|
||||
// we construct this temporary generic mutable
|
||||
// map to make the call we want to. This could
|
||||
// be better if we could flush the map with a
|
||||
// more straightforward call which passed
|
||||
// along the dependencies.
|
||||
tmpGMM := &GenericMutableMap[M, T]{
|
||||
keyDesc: mut.keyDesc,
|
||||
valDesc: mut.valDesc,
|
||||
flusher: mut.flusher,
|
||||
tuples: cp,
|
||||
stash: nil,
|
||||
maxPending: mut.maxPending,
|
||||
}
|
||||
err := tmpGMM.flushPending(ctx, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stash = &tmpGMM.tuples
|
||||
} else {
|
||||
stash = &cp
|
||||
}
|
||||
}
|
||||
serializer := mut.flusher.GetDefaultSerializer(ctx, mut)
|
||||
sm, err := mut.flusher.ApplyMutationsWithSerializer(ctx, serializer, mut)
|
||||
|
||||
Reference in New Issue
Block a user