From 90483f5d72ecb9719d06428091cf269933aacc2d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 17 Jan 2025 12:03:26 -0800 Subject: [PATCH 1/2] go/store/types: Move to a safepoint controller which will allow a caller better control over when to take actions while the GC is running. --- go/libraries/doltcore/doltdb/doltdb.go | 4 +- .../doltcore/sqle/dprocedures/dolt_gc.go | 131 +++++++++++------- go/store/datas/database.go | 2 +- go/store/datas/database_common.go | 4 +- go/store/types/value_store.go | 58 +++++++- 5 files changed, 138 insertions(+), 61 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index be4cca5304..9221f6200f 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1728,7 +1728,7 @@ func (ddb *DoltDB) Rebase(ctx context.Context) error { // until no possibly-stale ChunkStore state is retained in memory, or failing // certain in-progress operations which cannot be finalized in a timely manner, // etc. -func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() error) error { +func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointController types.GCSafepointController) error { collector, ok := ddb.db.Database.(datas.GarbageCollector) if !ok { return fmt.Errorf("this database does not support garbage collection") @@ -1772,7 +1772,7 @@ func (ddb *DoltDB) GC(ctx context.Context, mode types.GCMode, safepointF func() return err } - return collector.GC(ctx, mode, oldGen, newGen, safepointF) + return collector.GC(ctx, mode, oldGen, newGen, safepointController) } func (ddb *DoltDB) ShallowGC(ctx context.Context) error { diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index d4308e18bf..7f796dc85b 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -15,6 +15,7 @@ package dprocedures import ( + "context" "errors" "fmt" "os" @@ -27,6 +28,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" ) @@ -57,6 +59,29 @@ func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) { var ErrServerPerformedGC = errors.New("this connection was established when this server performed an online garbage collection. this connection can no longer be used. please reconnect.") +type safepointController struct { + begin func(context.Context, func(hash.Hash) bool) error + preFinalize func(context.Context) error + postFinalize func(context.Context) error + cancel func() +} + +func (sc safepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { + return sc.begin(ctx, keeper) +} + +func (sc safepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error { + return sc.preFinalize(ctx) +} + +func (sc safepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error { + return sc.postFinalize(ctx) +} + +func (sc safepointController) CancelSafepoint() { + sc.cancel() +} + func doDoltGC(ctx *sql.Context, args []string) (int, error) { dbName := ctx.GetCurrentDatabase() @@ -116,66 +141,72 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { mode = types.GCModeFull } - // TODO: If we got a callback at the beginning and an - // (allowed-to-block) callback at the end, we could more - // gracefully tear things down. - err = ddb.GC(ctx, mode, func() error { - if origepoch != -1 { - // Here we need to sanity check role and epoch. - if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { - if role.(string) != "primary" { - return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string)) + // TODO: Implement safepointController so that begin can capture inflight sessions + // and preFinalize can ensure they're all in a good place before returning. + sc := safepointController{ + begin: func(context.Context, func(hash.Hash) bool) error { return nil }, + preFinalize: func(context.Context) error { return nil }, + postFinalize: func(context.Context) error { + if origepoch != -1 { + // Here we need to sanity check role and epoch. + if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { + if role.(string) != "primary" { + return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but now our role is %s", role.(string)) + } + _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable) + if !ok { + return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.") + } + if origepoch != epoch.(int) { + return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int)) + } + } else { + return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.") } - _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable) - if !ok { - return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role epoch.") - } - if origepoch != epoch.(int) { - return fmt.Errorf("dolt_gc failed: when we began we were primary in the cluster at epoch %d, but now we are at epoch %d. for gc to safely finalize, our role and epoch must not change throughout the gc.", origepoch, epoch.(int)) - } - } else { - return fmt.Errorf("dolt_gc failed: when we began we were a primary in a cluster, but we can no longer read the cluster role.") } - } - killed := make(map[uint32]struct{}) - processes := ctx.ProcessList.Processes() - for _, p := range processes { - if p.Connection != ctx.Session.ID() { - // Kill any inflight query. - ctx.ProcessList.Kill(p.Connection) - // Tear down the connection itself. - ctx.KillConnection(p.Connection) - killed[p.Connection] = struct{}{} - } - } - - // Look in processes until the connections are actually gone. - params := backoff.NewExponentialBackOff() - params.InitialInterval = 1 * time.Millisecond - params.MaxInterval = 25 * time.Millisecond - params.MaxElapsedTime = 3 * time.Second - err := backoff.Retry(func() error { + killed := make(map[uint32]struct{}) processes := ctx.ProcessList.Processes() - allgood := true for _, p := range processes { - if _, ok := killed[p.Connection]; ok { - allgood = false + if p.Connection != ctx.Session.ID() { + // Kill any inflight query. ctx.ProcessList.Kill(p.Connection) + // Tear down the connection itself. + ctx.KillConnection(p.Connection) + killed[p.Connection] = struct{}{} } } - if !allgood { - return errors.New("unable to establish safepoint.") + + // Look in processes until the connections are actually gone. + params := backoff.NewExponentialBackOff() + params.InitialInterval = 1 * time.Millisecond + params.MaxInterval = 25 * time.Millisecond + params.MaxElapsedTime = 3 * time.Second + err := backoff.Retry(func() error { + processes := ctx.ProcessList.Processes() + allgood := true + for _, p := range processes { + if _, ok := killed[p.Connection]; ok { + allgood = false + ctx.ProcessList.Kill(p.Connection) + } + } + if !allgood { + return errors.New("unable to establish safepoint.") + } + return nil + }, params) + if err != nil { + return err } + ctx.Session.SetTransaction(nil) + dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC) return nil - }, params) - if err != nil { - return err - } - ctx.Session.SetTransaction(nil) - dsess.DSessFromSess(ctx.Session).SetValidateErr(ErrServerPerformedGC) - return nil - }) + }, + cancel: func() {}, + } + + err = ddb.GC(ctx, mode, sc) if err != nil { return cmdFailure, err } diff --git a/go/store/datas/database.go b/go/store/datas/database.go index fff64fbe73..9d94cad9a5 100644 --- a/go/store/datas/database.go +++ b/go/store/datas/database.go @@ -198,7 +198,7 @@ type GarbageCollector interface { // GC traverses the database starting at the Root and removes // all unreferenced data from persistent storage. - GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error + GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error } // CanUsePuller returns true if a datas.Puller can be used to pull data from one Database into another. Not all diff --git a/go/store/datas/database_common.go b/go/store/datas/database_common.go index 29dc88f4be..8019ce5943 100644 --- a/go/store/datas/database_common.go +++ b/go/store/datas/database_common.go @@ -1168,8 +1168,8 @@ func (db *database) doDelete(ctx context.Context, datasetIDstr string, workingse } // GC traverses the database starting at the Root and removes all unreferenced data from persistent storage. -func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error { - return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointF) +func (db *database) GC(ctx context.Context, mode types.GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointController types.GCSafepointController) error { + return db.ValueStore.GC(ctx, mode, oldGenRefs, newGenRefs, safepointController) } func (db *database) tryCommitChunks(ctx context.Context, newRootHash hash.Hash, currentRootHash hash.Hash) error { diff --git a/go/store/types/value_store.go b/go/store/types/value_store.go index 7d0984831d..b5d95b0a28 100644 --- a/go/store/types/value_store.go +++ b/go/store/types/value_store.go @@ -565,8 +565,15 @@ const ( GCModeFull ) +type GCSafepointController interface { + BeginGC(ctx context.Context, keeper func(h hash.Hash) bool) error + EstablishPreFinalizeSafepoint(context.Context) error + EstablishPostFinalizeSafepoint(context.Context) error + CancelSafepoint() +} + // GC traverses the ValueStore from the root and removes unreferenced chunks from the ChunkStore -func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepointF func() error) error { +func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRefs hash.HashSet, safepoint GCSafepointController) error { lvs.versOnce.Do(lvs.expectVersion) lvs.transitionToOldGenGC() @@ -600,6 +607,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe } defer collector.EndGC() + var callCancelSafepoint bool + if safepoint != nil { + err = safepoint.BeginGC(ctx, lvs.gcAddChunk) + if err != nil { + return err + } + callCancelSafepoint = true + defer func() { + if callCancelSafepoint { + safepoint.CancelSafepoint() + } + }() + } + root, err := lvs.Root(ctx) if err != nil { return err @@ -634,10 +655,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe oldGenHasMany = newFileHasMany } - newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepointF, lvs.transitionToFinalizingGC) + newGenFinalizer, err = lvs.gc(ctx, newGenRefs, oldGenHasMany, chksMode, collector, newGen, safepoint, lvs.transitionToFinalizingGC) if err != nil { return err } + callCancelSafepoint = false err = newGenFinalizer.SwapChunksInStore(ctx) if err != nil { @@ -669,6 +691,20 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe } defer collector.EndGC() + var callCancelSafepoint bool + if safepoint != nil { + err = safepoint.BeginGC(ctx, lvs.gcAddChunk) + if err != nil { + return err + } + callCancelSafepoint = true + defer func() { + if callCancelSafepoint { + safepoint.CancelSafepoint() + } + }() + } + root, err := lvs.Root(ctx) if err != nil { return err @@ -682,10 +718,11 @@ func (lvs *ValueStore) GC(ctx context.Context, mode GCMode, oldGenRefs, newGenRe newGenRefs.Insert(root) var finalizer chunks.GCFinalizer - finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepointF, lvs.transitionToFinalizingGC) + finalizer, err = lvs.gc(ctx, newGenRefs, unfilteredHashFunc, chunks.GCMode_Full, collector, collector, safepoint, lvs.transitionToFinalizingGC) if err != nil { return err } + callCancelSafepoint = false err = finalizer.SwapChunksInStore(ctx) if err != nil { @@ -718,7 +755,7 @@ func (lvs *ValueStore) gc(ctx context.Context, hashFilter chunks.HasManyFunc, chksMode chunks.GCMode, src, dest chunks.ChunkStoreGarbageCollector, - safepointF func() error, + safepointController GCSafepointController, finalize func() hash.HashSet) (chunks.GCFinalizer, error) { sweeper, err := src.MarkAndSweepChunks(ctx, lvs.getAddrs, hashFilter, dest, chksMode) if err != nil { @@ -732,6 +769,14 @@ func (lvs *ValueStore) gc(ctx context.Context, } toVisit = nil + if safepointController != nil { + err = safepointController.EstablishPreFinalizeSafepoint(ctx) + if err != nil { + _, cErr := sweeper.Close(ctx) + return nil, errors.Join(err, cErr) + } + } + // Before we call finalize(), we can process the current set of // NewGenToVisit. NewGen -> Finalize is going to block writes until // we are done, so its best to keep it as small as possible. @@ -750,13 +795,14 @@ func (lvs *ValueStore) gc(ctx context.Context, return nil, errors.Join(err, cErr) } - if safepointF != nil { - err = safepointF() + if safepointController != nil { + err = safepointController.EstablishPostFinalizeSafepoint(ctx) if err != nil { _, cErr := sweeper.Close(ctx) return nil, errors.Join(err, cErr) } } + return sweeper.Close(ctx) } From a6ff95c25257909b9708925590d85ff4ea0d874b Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 23 Jan 2025 16:23:56 -0800 Subject: [PATCH 2/2] repofmt.sh --- go/libraries/doltcore/sqle/enginetest/dolt_queries.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/libraries/doltcore/sqle/enginetest/dolt_queries.go b/go/libraries/doltcore/sqle/enginetest/dolt_queries.go index 18a5c870d6..5aa6efa7fc 100644 --- a/go/libraries/doltcore/sqle/enginetest/dolt_queries.go +++ b/go/libraries/doltcore/sqle/enginetest/dolt_queries.go @@ -7671,7 +7671,7 @@ var DoltTempTableScripts = []queries.ScriptTest{ }, }, { - Name: "drop temporary table behavior", + Name: "drop temporary table behavior", Dialect: "mysql", SetUpScript: []string{ "create table t (i int);", @@ -7723,7 +7723,7 @@ var DoltTempTableScripts = []queries.ScriptTest{ }, }, { - Query: "drop temporary table t;", + Query: "drop temporary table t;", ExpectedErr: sql.ErrUnknownTable, }, },