Merge pull request #8780 from dolthub/aaron/gc-safepoint-controller

go/store/types: Move to a safepoint controller which will allow a caller better control over when to take actions while the GC is running.
This commit is contained in:
Aaron Son
2025-01-23 17:03:23 -08:00
committed by GitHub
6 changed files with 139 additions and 63 deletions

View File

@@ -1728,7 +1728,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error {
// until no possibly-stale ChunkStore state is retained in memory, or failing
// certain in-progress operations which cannot be finalized in a timely manner,
// etc.
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error {
func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointController types.GCSafepointController) error {
collector, ok := ddb.db.Database.(datas.GarbageCollector)
if !ok {
return fmt.Errorf("this database does not support garbage collection")
@@ -1772,7 +1772,7 @@ func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func()
return err
}
return collector.GC(ctx, mode, oldGen, newGen, safepointF)
return collector.GC(ctx, mode, oldGen, newGen, safepointController)
}
func (ddb *DoltDB) ShallowGC(ctx context.Context) error {

View File

@@ -15,6 +15,7 @@
package dprocedures
import (
"context"
"errors"
"fmt"
"os"
@@ -27,6 +28,7 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/branch_control"
"github.com/dolthub/dolt/go/libraries/doltcore/dconfig"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/types"
)
@@ -57,6 +59,29 @@ func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
var ErrServerPerformedGC = errors.New("this connection was established when this server performed an online garbage collection. this connection can no longer be used. please reconnect.")
type safepointController struct {
begin func(context.Context, func(hash.Hash) bool) error
preFinalize func(context.Context) error
postFinalize func(context.Context) error
cancel func()
}
func (sc safepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
return sc.begin(ctx, keeper)
}
func (sc safepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error {
return sc.preFinalize(ctx)
}
func (sc safepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error {
return sc.postFinalize(ctx)
}
func (sc safepointController) CancelSafepoint() {
sc.cancel()
}
func doDoltGC(ctx *sql.Context, args []string) (int, error) {
dbName := ctx.GetCurrentDatabase()
@@ -116,66 +141,72 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
mode = types.GCModeFull
}
// TODO: If we got a callback at the beginning and an
// (allowed-to-block) callback at the end, we could more
// gracefully tear things down.
err = ddb.GC(ctx, mode, func() error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
if role.(string) != "primary" {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
// TODO: Implement safepointController so that begin can capture inflight sessions
// and preFinalize can ensure they're all in a good place before returning.
sc := safepointController{
begin: func(context.Context, func(hash.Hash) bool) error { return nil },
preFinalize: func(context.Context) error { return nil },
postFinalize: func(context.Context) error {
if origepoch != -1 {
// Here we need to sanity check role and epoch.
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
if role.(string) != "primary" {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
}
if origepoch != epoch.(int) {
return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
}
} else {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.")
}
if origepoch != epoch.(int) {
return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int))
}
} else {
return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.")
}
}
killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
for _, p := range processes {
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}
// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
killed := make(map[uint32]struct{})
processes := ctx.ProcessList.Processes()
allgood := true
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
allgood = false
if p.Connection != ctx.Session.ID() {
// Kill any inflight query.
ctx.ProcessList.Kill(p.Connection)
// Tear down the connection itself.
ctx.KillConnection(p.Connection)
killed[p.Connection] = struct{}{}
}
}
if !allgood {
return errors.New("unable to establish safepoint.")
// Look in processes until the connections are actually gone.
params := backoff.NewExponentialBackOff()
params.InitialInterval = 1 * time.Millisecond
params.MaxInterval = 25 * time.Millisecond
params.MaxElapsedTime = 3 * time.Second
err := backoff.Retry(func() error {
processes := ctx.ProcessList.Processes()
allgood := true
for _, p := range processes {
if _, ok := killed[p.Connection]; ok {
allgood = false
ctx.ProcessList.Kill(p.Connection)
}
}
if !allgood {
return errors.New("unable to establish safepoint.")
}
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
}, params)
if err != nil {
return err
}
ctx.Session.SetTransaction(nil)
dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC)
return nil
})
},
cancel: func() {},
}
err = ddb.GC(ctx, mode, sc)
if err != nil {
return cmdFailure, err
}

View File

@@ -7671,7 +7671,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
},
},
{
Name: "drop temporary table behavior",
Name: "drop temporary table behavior",
Dialect: "mysql",
SetUpScript: []string{
"create table t (i int);",
@@ -7723,7 +7723,7 @@ var DoltTempTableScripts = []queries.ScriptTest{
},
},
{
Query: "drop temporary table t;",
Query: "drop temporary table t;",
ExpectedErr: sql.ErrUnknownTable,
},
},

View File

@@ -198,7 +198,7 @@ type GarbageCollector interface {
// GC traverses the database starting at the Root and removes
// all unreferenced data from persistent storage.
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error
GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error
}
// CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all

View File

@@ -1168,8 +1168,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse
}
// GC traverses the database starting at the Root and removes all unreferenced data from persistent storage.
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF)
func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error {
return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointController)
}
func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error {

View File

@@ -565,8 +565,15 @@ const (
GCModeFull
)
type GCSafepointController interface {
BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error
EstablishPreFinalizeSafepoint(context.Context) error
EstablishPostFinalizeSafepoint(context.Context) error
CancelSafepoint()
}
// GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error {
func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error {
lvs.versOnce.Do(lvs.expectVersion)
lvs.transitionToOldGenGC()
@@ -600,6 +607,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}
defer collector.EndGC()
var callCancelSafepoint bool
if safepoint != nil {
err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
if err != nil {
return err
}
callCancelSafepoint = true
defer func() {
if callCancelSafepoint {
safepoint.CancelSafepoint()
}
}()
}
root, err := lvs.Root(ctx)
if err != nil {
return err
@@ -634,10 +655,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
oldGenHasMany = newFileHasMany
}
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC)
newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepoint, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
callCancelSafepoint = false
err = newGenFinalizer.SwapChunksInStore(ctx)
if err != nil {
@@ -669,6 +691,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
}
defer collector.EndGC()
var callCancelSafepoint bool
if safepoint != nil {
err = safepoint.BeginGC(ctx, lvs.gcAddChunk)
if err != nil {
return err
}
callCancelSafepoint = true
defer func() {
if callCancelSafepoint {
safepoint.CancelSafepoint()
}
}()
}
root, err := lvs.Root(ctx)
if err != nil {
return err
@@ -682,10 +718,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe
newGenRefs.Insert(root)
var finalizer chunks.GCFinalizer
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC)
finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepoint, lvs.transitionToFinalizingGC)
if err != nil {
return err
}
callCancelSafepoint = false
err = finalizer.SwapChunksInStore(ctx)
if err != nil {
@@ -718,7 +755,7 @@ func (lvs *ValueStore) gc(ctx context.Context,
hashFilter chunks.HasManyFunc,
chksMode chunks.GCMode,
src, dest chunks.ChunkStoreGarbageCollector,
safepointF func() error,
safepointController GCSafepointController,
finalize func() hash.HashSet) (chunks.GCFinalizer, error) {
sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode)
if err != nil {
@@ -732,6 +769,14 @@ func (lvs *ValueStore) gc(ctx context.Context,
}
toVisit = nil
if safepointController != nil {
err = safepointController.EstablishPreFinalizeSafepoint(ctx)
if err != nil {
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)
}
}
// Before we call finalize(), we can process the current set of
// NewGenToVisit. NewGen -> Finalize is going to block writes until
// we are done, so its best to keep it as small as possible.
@@ -750,8 +795,8 @@ func (lvs *ValueStore) gc(ctx context.Context,
return nil, errors.Join(err, cErr)
}
if safepointF != nil {
err = safepointF()
if safepointController != nil {
err = safepointController.EstablishPostFinalizeSafepoint(ctx)
if err != nil {
cErr := sweeper.Close(ctx)
return nil, errors.Join(err, cErr)