From dc4b94d68dc6b930957154b0c52b66efaa3b575d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 10 Feb 2025 11:06:17 -0800 Subject: [PATCH 01/15] sql-server: Add behavior: auto_gc_behavior: enable. When Auto GC is enabled, the running sql-server will periodically collect a Dolt database that is growing in size. This behavior is currently experimental. Tuning the behavior around how often to collect is ongoing work. --- go/cmd/dolt/commands/engine/sqlengine.go | 16 +- .../commands/sqlserver/command_line_config.go | 11 + go/cmd/dolt/commands/sqlserver/server.go | 10 + go/libraries/doltcore/doltdb/doltdb.go | 24 +- .../doltcore/servercfg/serverconfig.go | 10 + .../doltcore/servercfg/yaml_config.go | 28 +++ .../doltcore/servercfg/yaml_config_test.go | 2 + go/libraries/doltcore/sqle/auto_gc.go | 225 ++++++++++++++++++ .../doltcore/sqle/dprocedures/dolt_gc.go | 77 +++--- go/store/nbs/journal.go | 4 + go/store/nbs/journal_writer.go | 6 + .../go-sql-server-driver/auto_gc_test.go | 220 +++++++++++++++++ 12 files changed, 581 insertions(+), 52 deletions(-) create mode 100644 go/libraries/doltcore/sqle/auto_gc.go create mode 100644 integration-tests/go-sql-server-driver/auto_gc_test.go diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index d7f9ee10fc..e646da048a 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -40,6 +40,7 @@ import ( dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle" dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler" @@ -80,6 +81,7 @@ type SqlEngineConfig struct { JwksConfig []servercfg.JwksConfig SystemVariables SystemVariables ClusterController *cluster.Controller + AutoGCController *dsqle.AutoGCController BinlogReplicaController binlogreplication.BinlogReplicaController EventSchedulerStatus eventscheduler.SchedulerStatus } @@ -113,7 +115,8 @@ func NewSqlEngine( return nil, err } - all := dbs[:] + all := make([]dsess.SqlDatabase, len(dbs)) + copy(all, dbs) // this is overwritten only for server sessions for _, db := range dbs { @@ -192,6 +195,17 @@ func NewSqlEngine( statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider())) engine.Analyzer.Catalog.StatsProvider = statsPro + if config.AutoGCController != nil { + err = config.AutoGCController.RunBackgroundThread(bThreads, sqlEngine.NewDefaultContext) + if err != nil { + return nil, err + } + config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...) + pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook()) + // XXX: We force session aware safepoint controller if auto_gc is on. 
+ dprocedures.UseSessionAwareSafepointController = true + } + engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{}) sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit) sqlEngine.provider = pro diff --git a/go/cmd/dolt/commands/sqlserver/command_line_config.go b/go/cmd/dolt/commands/sqlserver/command_line_config.go index 0d9bf9afdd..5d0d8cde29 100755 --- a/go/cmd/dolt/commands/sqlserver/command_line_config.go +++ b/go/cmd/dolt/commands/sqlserver/command_line_config.go @@ -489,6 +489,10 @@ func (cfg *commandLineServerConfig) ValueSet(value string) bool { return ok } +func (cfg *commandLineServerConfig) AutoGCBehavior() servercfg.AutoGCBehavior { + return stubAutoGCBehavior{} +} + // DoltServerConfigReader is the default implementation of ServerConfigReader suitable for parsing Dolt config files // and command line options. type DoltServerConfigReader struct{} @@ -510,3 +514,10 @@ func (d DoltServerConfigReader) ReadConfigFile(cwdFS filesys.Filesys, file strin func (d DoltServerConfigReader) ReadConfigArgs(args *argparser.ArgParseResults, dataDirOverride string) (servercfg.ServerConfig, error) { return NewCommandLineConfig(nil, args, dataDirOverride) } + +type stubAutoGCBehavior struct { +} + +func (stubAutoGCBehavior) Enable() bool { + return false +} diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index 33d253a377..b3592cd265 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -257,6 +257,16 @@ func ConfigureServices( } controller.Register(InitEventSchedulerStatus) + InitAutoGCController := &svcs.AnonService{ + InitF: func(context.Context) error { + if serverConfig.AutoGCBehavior().Enable() { + config.AutoGCController = sqle.NewAutoGCController(lgr) + } + return nil + }, + } + controller.Register(InitAutoGCController) + var sqlEngine *engine.SqlEngine InitSqlEngine := &svcs.AnonService{ InitF: func(ctx context.Context) (err error) { diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index e27a397915..9982f3abc6 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1917,23 +1917,21 @@ func (ddb *DoltDB) IsTableFileStore() bool { // ChunkJournal returns the ChunkJournal for this DoltDB, if one is in use. 
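The accessor below is what the auto GC heuristic reads: the commit hook added in auto_gc.go later in this patch checks the journal size against a fixed threshold before requesting a collection. A rough sketch of that check, wrapped here in a hypothetical helper purely for illustration (the 128 MiB constant mirrors the one used in auto_gc.go):

    // Sketch only; db is a *doltdb.DoltDB. Mirrors the check made by the
    // auto GC commit hook added later in this patch.
    func wantsGC(db *doltdb.DoltDB) bool {
            journal := db.ChunkJournal()
            if journal == nil {
                    return false
            }
            const size_128mb = 1 << 27 // same threshold as auto_gc.go
            return journal.Size() > size_128mb
    }
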
func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal { - tableFileStore, ok := datas.ChunkStoreFromDatabase(ddb.db).(chunks.TableFileStore) - if !ok { - return nil + cs := datas.ChunkStoreFromDatabase(ddb.db) + + var store *nbs.NomsBlockStore + generationalNBS, ok := cs.(*nbs.GenerationalNBS) + if ok { + store = generationalNBS.NewGen().(*nbs.NomsBlockStore) + } else { + store = cs.(*nbs.NomsBlockStore) } - generationalNbs, ok := tableFileStore.(*nbs.GenerationalNBS) - if !ok { + if store != nil { + return store.ChunkJournal() + } else { return nil } - - newGen := generationalNbs.NewGen() - nbs, ok := newGen.(*nbs.NomsBlockStore) - if !ok { - return nil - } - - return nbs.ChunkJournal() } func (ddb *DoltDB) TableFileStoreHasJournal(ctx context.Context) (bool, error) { diff --git a/go/libraries/doltcore/servercfg/serverconfig.go b/go/libraries/doltcore/servercfg/serverconfig.go index e69ae23bab..79aa01c958 100644 --- a/go/libraries/doltcore/servercfg/serverconfig.go +++ b/go/libraries/doltcore/servercfg/serverconfig.go @@ -48,6 +48,7 @@ const ( DefaultReadOnly = false DefaultLogLevel = LogLevel_Info DefaultAutoCommit = true + DefaultAutoGCBehaviorEnable = false DefaultDoltTransactionCommit = false DefaultMaxConnections = 100 DefaultDataDir = "." @@ -198,6 +199,8 @@ type ServerConfig interface { EventSchedulerStatus() string // ValueSet returns whether the value string provided was explicitly set in the config ValueSet(value string) bool + // AutoGCBehavior defines parameters around how auto-GC works for the running server. + AutoGCBehavior() AutoGCBehavior } // DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values. @@ -214,6 +217,9 @@ func defaultServerConfigYAML() *YAMLConfig { ReadOnly: ptr(DefaultReadOnly), AutoCommit: ptr(DefaultAutoCommit), DoltTransactionCommit: ptr(DefaultDoltTransactionCommit), + AutoGCBehavior: &AutoGCBehaviorYAMLConfig{ + Enable_: ptr(DefaultAutoGCBehaviorEnable), + }, }, UserConfig: UserYAMLConfig{ Name: ptr(""), @@ -445,3 +451,7 @@ func CheckForUnixSocket(config ServerConfig) (string, bool, error) { return "", false, nil } + +type AutoGCBehavior interface { + Enable() bool +} diff --git a/go/libraries/doltcore/servercfg/yaml_config.go b/go/libraries/doltcore/servercfg/yaml_config.go index c5fc56b977..5f370f3350 100644 --- a/go/libraries/doltcore/servercfg/yaml_config.go +++ b/go/libraries/doltcore/servercfg/yaml_config.go @@ -65,6 +65,8 @@ type BehaviorYAMLConfig struct { DoltTransactionCommit *bool `yaml:"dolt_transaction_commit,omitempty"` EventSchedulerStatus *string `yaml:"event_scheduler,omitempty" minver:"1.17.0"` + + AutoGCBehavior *AutoGCBehaviorYAMLConfig `yaml:"auto_gc_behavior,omitempty" minver:"TBD"` } // UserYAMLConfig contains server configuration regarding the user account clients must use to connect @@ -176,6 +178,7 @@ func YamlConfigFromFile(fs filesys.Filesys, path string) (ServerConfig, error) { func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig { systemVars := cfg.SystemVars() + autoGCBehavior := toAutoGCBehaviorYAML(cfg.AutoGCBehavior()) return &YAMLConfig{ LogLevelStr: ptr(string(cfg.LogLevel())), MaxQueryLenInLogs: nillableIntPtr(cfg.MaxLoggedQueryLen()), @@ -186,6 +189,7 @@ func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig { DisableClientMultiStatements: ptr(cfg.DisableClientMultiStatements()), DoltTransactionCommit: ptr(cfg.DoltTransactionCommit()), EventSchedulerStatus: ptr(cfg.EventSchedulerStatus()), + AutoGCBehavior: autoGCBehavior, }, ListenerConfig: 
ListenerYAMLConfig{ HostStr: ptr(cfg.Host()), @@ -817,6 +821,13 @@ func (cfg YAMLConfig) ClusterConfig() ClusterConfig { return cfg.ClusterCfg } +func (cfg YAMLConfig) AutoGCBehavior() AutoGCBehavior { + if cfg.BehaviorConfig.AutoGCBehavior == nil { + return nil + } + return cfg.BehaviorConfig.AutoGCBehavior +} + func (cfg YAMLConfig) EventSchedulerStatus() string { if cfg.BehaviorConfig.EventSchedulerStatus == nil { return "ON" @@ -922,3 +933,20 @@ func (cfg YAMLConfig) ValueSet(value string) bool { } return false } + +type AutoGCBehaviorYAMLConfig struct { + Enable_ *bool `yaml:"enable,omitempty" minver:"TBD"` +} + +func (a *AutoGCBehaviorYAMLConfig) Enable() bool { + if a.Enable_ == nil { + return false + } + return *a.Enable_ +} + +func toAutoGCBehaviorYAML(a AutoGCBehavior) *AutoGCBehaviorYAMLConfig { + return &AutoGCBehaviorYAMLConfig{ + Enable_: ptr(a.Enable()), + } +} diff --git a/go/libraries/doltcore/servercfg/yaml_config_test.go b/go/libraries/doltcore/servercfg/yaml_config_test.go index 76dd3f21f3..5f6dd64c62 100644 --- a/go/libraries/doltcore/servercfg/yaml_config_test.go +++ b/go/libraries/doltcore/servercfg/yaml_config_test.go @@ -34,6 +34,8 @@ behavior: dolt_transaction_commit: true disable_client_multi_statements: false event_scheduler: ON + auto_gc_behavior: + enable: false listener: host: localhost diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go new file mode 100644 index 0000000000..23995a84f8 --- /dev/null +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -0,0 +1,225 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sqle + +import ( + "context" + "io" + "sync" + "time" + + "github.com/dolthub/go-mysql-server/sql" + "github.com/sirupsen/logrus" + + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" + "github.com/dolthub/dolt/go/libraries/doltcore/env" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures" + "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/dolt/go/store/types" +) + +// Auto GC is the ability of a running SQL server engine to perform +// dolt_gc() behaviors periodically. If enabled, it currently works as +// follows: +// +// An AutoGCController is created for a running SQL Engine. The +// controller runs a background thread which is only ever running one +// GC at a time. Post Commit Hooks are installed on every database in +// the DoltDatabaseProvider for the SQL Engine. Those hooks check if +// it is time to perform a GC for that particular database. If it is, +// they forward a request the background thread to register the +// database as wanting a GC. 
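The engine-side wiring for the pieces described above appears in the sqlengine.go hunk earlier in this patch; condensed, it looks roughly like the sketch below. Here config, bThreads, mrEnv, dbs and pro are the locals from NewSqlEngine, and the controller is only non-nil when auto GC is enabled in the server config.

    if config.AutoGCController != nil {
            // One background worker performs every auto GC, one database at a time.
            err = config.AutoGCController.RunBackgroundThread(bThreads, sqlEngine.NewDefaultContext)
            if err != nil {
                    return nil, err
            }
            // Hook the databases known at startup, and any database created later.
            config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...)
            pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook())
            // Auto GC relies on the session-aware GC safepoint controller.
            dprocedures.UseSessionAwareSafepointController = true
    }
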
+ +type AutoGCController struct { + workCh chan autoGCWork + lgr *logrus.Logger +} + +func NewAutoGCController(lgr *logrus.Logger) *AutoGCController { + return &AutoGCController{ + workCh: make(chan autoGCWork), + lgr: lgr, + } +} + +// Passed by a commit hook to the auto-GC thread, requesting the +// thread to dolt_gc |db|. When the GC is finished, |done| will be +// closed. Signalling completion allows the commit hook to only +// submit one dolt_gc request at a time. +type autoGCWork struct { + db *doltdb.DoltDB + done chan struct{} + name string // only for logging. +} + +// During engine initialization, this should be called to ensure the +// background worker threads responsible for performing the GC are +// running. +func (c *AutoGCController) RunBackgroundThread(threads *sql.BackgroundThreads, ctxF func(context.Context) (*sql.Context, error)) error { + return threads.Add("auto_gc_thread", func(ctx context.Context) { + var wg sync.WaitGroup + runCh := make(chan autoGCWork) + wg.Add(1) + go func() { + defer wg.Done() + dbs := make([]autoGCWork, 0) + // Accumulate GC requests, only one will come in per database at a time. + // Send the oldest one out to the worker when it is ready. + for { + var toSendCh chan autoGCWork + var toSend autoGCWork + if len(dbs) > 0 { + toSend = dbs[0] + toSendCh = runCh + } + select { + case <-ctx.Done(): + // sql.BackgroundThreads is shutting down. + // No need to drain or anything; just + // return. + return + case newDB := <-c.workCh: + dbs = append(dbs, newDB) + case toSendCh <- toSend: + // We just sent the front of the slice. + // Delete it from our set of pending GCs. + copy(dbs[:], dbs[1:]) + dbs = dbs[:len(dbs)-1] + } + + } + }() + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case work := <-runCh: + c.doWork(ctx, work, ctxF) + } + } + }() + wg.Wait() + }) +} + +func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF func(context.Context) (*sql.Context, error)) { + defer close(work.done) + sqlCtx, err := ctxF(ctx) + if err != nil { + c.lgr.Warnf("sqle/auto_gc: Could not create session to GC %s: %v", work.name, err) + return + } + c.lgr.Tracef("sqle/auto_gc: Beginning auto GC of database %s", work.name) + start := time.Now() + defer sql.SessionEnd(sqlCtx.Session) + sql.SessionCommandBegin(sqlCtx.Session) + defer sql.SessionCommandEnd(sqlCtx.Session) + err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault) + if err != nil { + c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err) + return + } + c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start)) +} + +func (c *AutoGCController) newCommitHook(name string) doltdb.CommitHook { + closed := make(chan struct{}) + close(closed) + return &autoGCCommitHook{ + c: c, + name: name, + done: closed, + next: make(chan struct{}), + } +} + +// The doltdb.CommitHook which watches for database changes and +// requests dolt_gcs. +type autoGCCommitHook struct { + c *AutoGCController + name string + // Always non-nil, if this channel delivers this channel + // delivers when no GC is currently running. + done chan struct{} + // It simplifies the logic and efficiency of the + // implementation a bit to have a + // we can send. It becomes our new |done| channel on a + // successful send. + next chan struct{} + // |done| and |next| are mutable and |Execute| can be called + // concurrently. We protect them with |mu|. 
+ mu sync.Mutex +} + +// During engine initialization, called on the original set of +// databases to configure them for auto-GC. +func (c *AutoGCController) ApplyCommitHooks(ctx context.Context, mrEnv *env.MultiRepoEnv, dbs ...dsess.SqlDatabase) error { + for _, db := range dbs { + denv := mrEnv.GetEnv(db.Name()) + if denv == nil { + continue + } + denv.DoltDB(ctx).PrependCommitHooks(ctx, c.newCommitHook(db.Name())) + } + return nil +} + +func (c *AutoGCController) InitDatabaseHook() InitDatabaseHook { + return func(ctx *sql.Context, pro *DoltDatabaseProvider, name string, env *env.DoltEnv, db dsess.SqlDatabase) error { + env.DoltDB(ctx).PrependCommitHooks(ctx, c.newCommitHook(name)) + return nil + } +} + +func (h *autoGCCommitHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { + h.mu.Lock() + defer h.mu.Unlock() + select { + case <-h.done: + journal := db.ChunkJournal() + if journal != nil { + const size_128mb = (1 << 27) + if journal.Size() > size_128mb { + // We want a GC... + select { + case h.c.workCh <- autoGCWork{db, h.next, h.name}: + h.done = h.next + h.next = make(chan struct{}) + case <-ctx.Done(): + return nil, context.Cause(ctx) + } + } + } + default: + // A GC is running or pending. No need to check. + } + return nil, nil +} + +func (h *autoGCCommitHook) HandleError(ctx context.Context, err error) error { + return nil +} + +func (h *autoGCCommitHook) SetLogger(ctx context.Context, wr io.Writer) error { + return nil +} + +func (h *autoGCCommitHook) ExecuteForWorkingSets() bool { + return true +} diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index c2acc6f5f9..e798ee1a59 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -27,6 +27,7 @@ import ( "github.com/dolthub/dolt/go/cmd/dolt/cli" "github.com/dolthub/dolt/go/libraries/doltcore/branch_control" "github.com/dolthub/dolt/go/libraries/doltcore/dconfig" + "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" "github.com/dolthub/dolt/go/store/types" @@ -37,15 +38,13 @@ const ( cmdSuccess = 0 ) -var useSessionAwareSafepointController bool - func init() { if os.Getenv(dconfig.EnvDisableGcProcedure) != "" { DoltGCFeatureFlag = false } if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" { if choice == "session_aware" { - useSessionAwareSafepointController = true + UseSessionAwareSafepointController = true } else if choice != "kill_connections" { panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections") } @@ -53,6 +52,7 @@ func init() { } var DoltGCFeatureFlag = true +var UseSessionAwareSafepointController = false // doltGC is the stored procedure to run online garbage collection on a database. 
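Client-side, the procedure is driven over an ordinary SQL connection; the integration test added later in this series does exactly this. A minimal sketch follows — the helper name and the database/sql plumbing are illustrative only, and '--full' corresponds to cli.FullFlag handled below:

    // Hypothetical helper; conn is a *sql.Conn from database/sql.
    func callDoltGC(ctx context.Context, conn *sql.Conn, full bool) error {
            stmt := "call dolt_gc()"
            if full {
                    stmt = "call dolt_gc('--full')"
            }
            _, err := conn.ExecContext(ctx, stmt)
            return err
    }
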
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) { @@ -156,7 +156,6 @@ func (sc killConnectionsSafepointController) CancelSafepoint() { type sessionAwareSafepointController struct { controller *dsess.GCSafepointController callCtx *sql.Context - origEpoch int waiter *dsess.GCSafepointWaiter keeper func(hash.Hash) bool @@ -182,7 +181,7 @@ func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx con } func (sc *sessionAwareSafepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error { - return checkEpochSame(sc.origEpoch) + return nil } func (sc *sessionAwareSafepointController) CancelSafepoint() { @@ -226,44 +225,12 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { return cmdFailure, err } } else { - // Currently, if this server is involved in cluster - // replication, a full GC is only safe to run on the primary. - // We assert that we are the primary here before we begin, and - // we assert again that we are the primary at the same epoch as - // we establish the safepoint. - origepoch := -1 - if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { - // TODO: magic constant... - if role.(string) != "primary" { - return cmdFailure, fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string)) - } - _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable) - if !ok { - return cmdFailure, fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable) - } - origepoch = epoch.(int) - } - var mode types.GCMode = types.GCModeDefault if apr.Contains(cli.FullFlag) { mode = types.GCModeFull } - var sc types.GCSafepointController - if useSessionAwareSafepointController { - gcSafepointController := dSess.GCSafepointController() - sc = &sessionAwareSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - controller: gcSafepointController, - } - } else { - sc = killConnectionsSafepointController{ - origEpoch: origepoch, - callCtx: ctx, - } - } - err = ddb.GC(ctx, mode, sc) + err := RunDoltGC(ctx, ddb, mode) if err != nil { return cmdFailure, err } @@ -271,3 +238,37 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { return cmdSuccess, nil } + +func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode) error { + var sc types.GCSafepointController + if UseSessionAwareSafepointController { + dSess := dsess.DSessFromSess(ctx.Session) + gcSafepointController := dSess.GCSafepointController() + sc = &sessionAwareSafepointController{ + callCtx: ctx, + controller: gcSafepointController, + } + } else { + // Legacy safepoint controller behavior was to not + // allow GC on a standby server. GC on a standby server + // with killConnections safepoints should be safe now, + // but we retain this legacy behavior for now. + origepoch := -1 + if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok { + // TODO: magic constant... 
+ if role.(string) != "primary" { + return fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string)) + } + _, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable) + if !ok { + return fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable) + } + origepoch = epoch.(int) + } + sc = killConnectionsSafepointController{ + origEpoch: origepoch, + callCtx: ctx, + } + } + return ddb.GC(ctx, mode, sc) +} diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index e33c8614ae..16310f15cd 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -490,6 +490,10 @@ func (j *ChunkJournal) AccessMode() chunks.ExclusiveAccessMode { return chunks.ExclusiveAccessMode_Exclusive } +func (j *ChunkJournal) Size() int64 { + return j.wr.size() +} + type journalConjoiner struct { child conjoinStrategy } diff --git a/go/store/nbs/journal_writer.go b/go/store/nbs/journal_writer.go index caea111b14..1aedf7f073 100644 --- a/go/store/nbs/journal_writer.go +++ b/go/store/nbs/journal_writer.go @@ -472,6 +472,12 @@ func (wr *journalWriter) commitRootHash(ctx context.Context, root hash.Hash) err return wr.commitRootHashUnlocked(ctx, root) } +func (wr *journalWriter) size() int64 { + wr.lock.Lock() + defer wr.lock.Unlock() + return wr.off +} + func (wr *journalWriter) commitRootHashUnlocked(ctx context.Context, root hash.Hash) error { defer trace.StartRegion(ctx, "commit-root").End() diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go new file mode 100644 index 0000000000..8f89f3b882 --- /dev/null +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -0,0 +1,220 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package main + +import ( + "context" + "database/sql" + "fmt" + "math/rand/v2" + "os" + "path/filepath" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver" +) + +func TestAutoGC(t *testing.T) { + var enabled_16, final_16, disabled, final_disabled RepoSize + t.Run("Enable", func(t *testing.T) { + t.Run("CommitEvery16", func(t *testing.T) { + enabled_16, final_16 = runAutoGCTest(t, true, 16) + t.Logf("repo size before final gc: %v", enabled_16) + t.Logf("repo size after final gc: %v", final_16) + }) + }) + t.Run("Disabled", func(t *testing.T) { + disabled, final_disabled = runAutoGCTest(t, false, 128) + t.Logf("repo size before final gc: %v", disabled) + t.Logf("repo size after final gc: %v", final_disabled) + }) + if enabled_16.NewGen > 0 && disabled.NewGen > 0 { + assert.Greater(t, enabled_16.OldGen, disabled.OldGen) + assert.Greater(t, enabled_16.OldGenC, disabled.OldGenC) + assert.Greater(t, enabled_16.NewGen-enabled_16.Journal, enabled_16.Journal) + assert.Less(t, disabled.NewGen-disabled.Journal, disabled.Journal) + } +} + +func setupAutoGCTest(ctx context.Context, t *testing.T, enable bool) (string, *sql.DB) { + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("auto_gc_test") + require.NoError(t, err) + + err = driver.WithFile{ + Name: "server.yaml", + Contents: fmt.Sprintf(` +behavior: + auto_gc_behavior: + enable: %v +`, enable), + }.WriteAtDir(repo.Dir) + require.NoError(t, err) + + server := MakeServer(t, repo, &driver.Server{ + Args: []string{"--config", "server.yaml"}, + }) + server.DBName = "auto_gc_test" + + db, err := server.DB(driver.Connection{User: "root"}) + require.NoError(t, err) + t.Cleanup(func() { + db.Close() + }) + + // Create the database... + conn, err := db.Conn(ctx) + require.NoError(t, err) + _, err = conn.ExecContext(ctx, ` +create table vals ( + id bigint primary key, + v1 bigint, + v2 bigint, + v3 bigint, + v4 bigint, + index (v1), + index (v2), + index (v3), + index (v4), + index (v1,v2), + index (v1,v3), + index (v1,v4), + index (v2,v3), + index (v2,v4), + index (v2,v1), + index (v3,v1), + index (v3,v2), + index (v3,v4), + index (v4,v1), + index (v4,v2), + index (v4,v3) +) +`) + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')") + require.NoError(t, err) + require.NoError(t, conn.Close()) + + return repo.Dir, db +} + +func autoGCInsertStatement(i int) string { + var vals []string + for j := i * 1024; j < (i+1)*1024; j++ { + var vs [4]string + vs[0] = strconv.Itoa(rand.Int()) + vs[1] = strconv.Itoa(rand.Int()) + vs[2] = strconv.Itoa(rand.Int()) + vs[3] = strconv.Itoa(rand.Int()) + val := "(" + strconv.Itoa(j) + "," + strings.Join(vs[:], ",") + ")" + vals = append(vals, val) + } + return "insert into vals values " + strings.Join(vals, ",") +} + +func runAutoGCTest(t *testing.T, enable bool, commitEvery int) (RepoSize, RepoSize) { + // A simple auto-GC test, where we run + // operations on an auto GC server and + // ensure that the database is getting + // collected. 
+ ctx := context.Background() + dir, db := setupAutoGCTest(ctx, t, enable) + + for i := 0; i < 64; i++ { + stmt := autoGCInsertStatement(i) + conn, err := db.Conn(ctx) + _, err = conn.ExecContext(ctx, stmt) + require.NoError(t, err) + if i%commitEvery == 0 { + _, err = conn.ExecContext(ctx, "call dolt_commit('-am', 'insert from "+strconv.Itoa(i*1024)+"')") + require.NoError(t, err) + } + require.NoError(t, conn.Close()) + } + + before, err := GetRepoSize(dir) + require.NoError(t, err) + conn, err := db.Conn(ctx) + require.NoError(t, err) + _, err = conn.ExecContext(ctx, "call dolt_gc('--full')") + require.NoError(t, err) + require.NoError(t, conn.Close()) + after, err := GetRepoSize(dir) + require.NoError(t, err) + return before, after +} + +type RepoSize struct { + Journal int64 + NewGen int64 + NewGenC int + OldGen int64 + OldGenC int +} + +func GetRepoSize(dir string) (RepoSize, error) { + var ret RepoSize + entries, err := os.ReadDir(filepath.Join(dir, ".dolt/noms")) + if err != nil { + return ret, err + } + for _, e := range entries { + stat, err := e.Info() + if err != nil { + return ret, err + } + if stat.IsDir() { + continue + } + ret.NewGen += stat.Size() + ret.NewGenC += 1 + if e.Name() == "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" { + ret.Journal += stat.Size() + } + } + entries, err = os.ReadDir(filepath.Join(dir, ".dolt/noms/oldgen")) + if err != nil { + return ret, err + } + for _, e := range entries { + stat, err := e.Info() + if err != nil { + return ret, err + } + if stat.IsDir() { + continue + } + ret.OldGen += stat.Size() + ret.OldGenC += 1 + } + return ret, nil +} + +func (rs RepoSize) String() string { + return fmt.Sprintf("journal: %v, new gen: %v (%v files), old gen: %v (%v files)", rs.Journal, rs.NewGen, rs.NewGenC, rs.OldGen, rs.OldGenC) +} From 9b53d20c7ce154b5e577bd8233208f533a4dd7d2 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 10 Feb 2025 20:55:21 -0800 Subject: [PATCH 02/15] go: cmd: sqlserver: Fix nil SIGSEGV on AutoGCBehavior read in init. --- go/cmd/dolt/commands/sqlserver/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/cmd/dolt/commands/sqlserver/server.go b/go/cmd/dolt/commands/sqlserver/server.go index b3592cd265..dd6dd83784 100644 --- a/go/cmd/dolt/commands/sqlserver/server.go +++ b/go/cmd/dolt/commands/sqlserver/server.go @@ -259,7 +259,8 @@ func ConfigureServices( InitAutoGCController := &svcs.AnonService{ InitF: func(context.Context) error { - if serverConfig.AutoGCBehavior().Enable() { + if serverConfig.AutoGCBehavior() != nil && + serverConfig.AutoGCBehavior().Enable() { config.AutoGCController = sqle.NewAutoGCController(lgr) } return nil From 62e50323dfd035af95ba570d5b8735153e53b464 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 10 Feb 2025 21:55:32 -0800 Subject: [PATCH 03/15] integration-tests/go-sql-server-driver: auto_gc_test.go: Tweak asserts to be more reliable. 
--- integration-tests/go-sql-server-driver/auto_gc_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 8f89f3b882..512070d863 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -48,7 +48,6 @@ func TestAutoGC(t *testing.T) { if enabled_16.NewGen > 0 && disabled.NewGen > 0 { assert.Greater(t, enabled_16.OldGen, disabled.OldGen) assert.Greater(t, enabled_16.OldGenC, disabled.OldGenC) - assert.Greater(t, enabled_16.NewGen-enabled_16.Journal, enabled_16.Journal) assert.Less(t, disabled.NewGen-disabled.Journal, disabled.Journal) } } From f509d98649402f162b05008ec9fa08b85834b4b8 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 12 Feb 2025 13:06:19 -0800 Subject: [PATCH 04/15] go: sqle: auto_gc.go: Make sure the safepoint controller works on the same database as the auto GC work. --- go/libraries/doltcore/sqle/auto_gc.go | 2 +- .../doltcore/sqle/dprocedures/dolt_gc.go | 21 ++++++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index 23995a84f8..5d35e94c2e 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -130,7 +130,7 @@ func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF fun defer sql.SessionEnd(sqlCtx.Session) sql.SessionCommandBegin(sqlCtx.Session) defer sql.SessionCommandEnd(sqlCtx.Session) - err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault) + err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault, work.name) if err != nil { c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err) return diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index e798ee1a59..4b0bfd74d1 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -154,25 +154,25 @@ func (sc killConnectionsSafepointController) CancelSafepoint() { } type sessionAwareSafepointController struct { - controller *dsess.GCSafepointController - callCtx *sql.Context + controller *dsess.GCSafepointController + dbname string + callSession *dsess.DoltSession waiter *dsess.GCSafepointWaiter keeper func(hash.Hash) bool } func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error { - return sess.VisitGCRoots(ctx, sc.callCtx.GetCurrentDatabase(), sc.keeper) + return sess.VisitGCRoots(ctx, sc.dbname, sc.keeper) } func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error { sc.keeper = keeper - thisSess := dsess.DSessFromSess(sc.callCtx.Session) - err := sc.visit(ctx, thisSess) + err := sc.visit(ctx, sc.callSession) if err != nil { return err } - sc.waiter = sc.controller.Waiter(ctx, thisSess, sc.visit) + sc.waiter = sc.controller.Waiter(ctx, sc.callSession, sc.visit) return nil } @@ -230,7 +230,7 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { mode = types.GCModeFull } - err := RunDoltGC(ctx, ddb, mode) + err := RunDoltGC(ctx, ddb, mode, ctx.GetCurrentDatabase() ) if err != nil { return cmdFailure, err } @@ -239,14 +239,15 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { return cmdSuccess, nil } -func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode 
types.GCMode) error { +func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, dbname string) error { var sc types.GCSafepointController if UseSessionAwareSafepointController { dSess := dsess.DSessFromSess(ctx.Session) gcSafepointController := dSess.GCSafepointController() sc = &sessionAwareSafepointController{ - callCtx: ctx, - controller: gcSafepointController, + callSession: dSess, + dbname: dbname, + controller: gcSafepointController, } } else { // Legacy safepoint controller behavior was to not From 10dfcec18f60010120f22b3c37f2dfa448313b7e Mon Sep 17 00:00:00 2001 From: reltuk Date: Wed, 12 Feb 2025 21:14:19 +0000 Subject: [PATCH 05/15] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/sqle/dprocedures/dolt_gc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 4b0bfd74d1..13f5b17b36 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -230,7 +230,7 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) { mode = types.GCModeFull } - err := RunDoltGC(ctx, ddb, mode, ctx.GetCurrentDatabase() ) + err := RunDoltGC(ctx, ddb, mode, ctx.GetCurrentDatabase()) if err != nil { return cmdFailure, err } From 2a47d402b22889b45a799f385ceeb75fd4abcc2b Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 13 Feb 2025 17:24:03 -0800 Subject: [PATCH 06/15] go: sqle: auto_gc: Responding to PR feedback. Comments and cleanup. --- go/cmd/dolt/commands/engine/sqlengine.go | 7 +++++++ go/libraries/doltcore/doltdb/doltdb.go | 12 ++++-------- go/libraries/doltcore/sqle/auto_gc.go | 14 +++++++++----- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index e646da048a..adc14af9a2 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -115,6 +115,13 @@ func NewSqlEngine( return nil, err } + // Make a copy of the databases. |all| is going to be provided + // as the set of all initial databases to dsqle + // DatabaseProvider. |dbs| is only the databases that came + // from MultiRepoEnv, and they are all real databases based on + // DoltDB instances. |all| is going to include some extension, + // informational databases like |dolt_cluster| sometimes, + // depending on config. 
all := make([]dsess.SqlDatabase, len(dbs)) copy(all, dbs) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 9982f3abc6..55d160e410 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1919,16 +1919,12 @@ func (ddb *DoltDB) IsTableFileStore() bool { func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal { cs := datas.ChunkStoreFromDatabase(ddb.db) - var store *nbs.NomsBlockStore - generationalNBS, ok := cs.(*nbs.GenerationalNBS) - if ok { - store = generationalNBS.NewGen().(*nbs.NomsBlockStore) - } else { - store = cs.(*nbs.NomsBlockStore) + if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok { + cs = generationalNBS.NewGen() } - if store != nil { - return store.ChunkJournal() + if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok { + return nbsStore.ChunkJournal() } else { return nil } diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index 5d35e94c2e..0ea5038b13 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -154,13 +154,17 @@ func (c *AutoGCController) newCommitHook(name string) doltdb.CommitHook { type autoGCCommitHook struct { c *AutoGCController name string - // Always non-nil, if this channel delivers this channel - // delivers when no GC is currently running. + // When |done| is closed, there is no GC currently running or + // pending for this database. If it is open, then there is a + // pending request for GC or a GC is currently running. Once + // |done| is closed, we can check for auto GC conditions on + // the database to see if we should request a new GC. done chan struct{} // It simplifies the logic and efficiency of the - // implementation a bit to have a - // we can send. It becomes our new |done| channel on a - // successful send. + // implementation a bit to have an already allocated // in the commit hook. +channel + // we can try to send, which will become our new |done| + // channel once we send it successfully. next chan struct{} // |done| and |next| are mutable and |Execute| can be called // concurrently. We protect them with |mu|. From 69630e100704bc754934a5698c4a452fa0a13884 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Wed, 19 Feb 2025 15:07:07 -0800 Subject: [PATCH 07/15] go: auto_gc: Add a heuristic which allows followers in cluster replication to also auto-gc. --- go/libraries/doltcore/doltdb/doltdb.go | 54 ++++++++++++++++++++++++++ go/libraries/doltcore/sqle/auto_gc.go | 54 ++++++++++++++++++-------- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 87b1cf1063..36c9ef3764 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1930,6 +1930,60 @@ func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal { } } +// An approximate representation of how large the on-disk storage is for a DoltDB. +type StoreSizes struct { + // For ChunkJournal stores, this will be size of the journal file. A size + // of zero does not mean the store is not journaled. The store could be + // journaled, and the journal could be empty. + JournalBytes uint64 + // For Generational storages this will be the size of the new gen. It will + // include any JournalBytes. A size of zero does not mean the store is not + // generational, since it could be the case that the store is generational + // but everything in it is in the old gen. 
In practice, given how we build + // oldgen references today, this will never be the case--there is always + // a little bit of data that only goes in the newgen. + NewGenBytes uint64 + // This is the approximate total on-disk storage overhead of the store. + // It includes Journal and NewGenBytes, if there are any. + TotalBytes uint64 +} + +func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { + cs := datas.ChunkStoreFromDatabase(ddb.db) + if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok { + newgen := generationalNBS.NewGen() + newgenSz, err := newgen.(chunks.TableFileStore).Size(ctx) + if err != nil { + return StoreSizes{}, err + } + totalSz, err := cs.(chunks.TableFileStore).Size(ctx) + if err != nil { + return StoreSizes{}, err + } + journal := newgen.(*nbs.NomsBlockStore).ChunkJournal() + if journal != nil { + return StoreSizes{ + JournalBytes: uint64(journal.Size()), + NewGenBytes: newgenSz, + TotalBytes: totalSz, + }, nil + } else { + return StoreSizes{ + NewGenBytes: newgenSz, + TotalBytes: totalSz, + }, nil + } + } else { + totalSz, err := cs.(chunks.TableFileStore).Size(ctx) + if err != nil { + return StoreSizes{}, err + } + return StoreSizes{ + TotalBytes: totalSz, + }, nil + } +} + func (ddb *DoltDB) TableFileStoreHasJournal(ctx context.Context) (bool, error) { tableFileStore, ok := datas.ChunkStoreFromDatabase(ddb.db).(chunks.TableFileStore) if !ok { diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index 70a2c76802..bcb5bab849 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -165,8 +165,15 @@ type autoGCCommitHook struct { // we can try to send when we request a GC. If will become our // new |done| channel once we send it successfully. next chan struct{} - // |done| and |next| are mutable and |Execute| can be called - // concurrently. We protect them with |mu|. + // lastSz is set the first time we observe StoreSizes after a + // GC or after the server comes up. It is used in some simple + // growth heuristics to figure out if we want to run a GC. We + // set it back to |nil| when we successfully submit a request + // to GC, so that we observe and store the new size after the + // GC is finished. + lastSz *doltdb.StoreSizes + // |done|, |next|, |lastSz| are mutable and |Execute| can be + // called concurrently. We protect them with |mu|. mu sync.Mutex } @@ -195,26 +202,41 @@ func (h *autoGCCommitHook) Execute(ctx context.Context, ds datas.Dataset, db *do defer h.mu.Unlock() select { case <-h.done: - journal := db.ChunkJournal() - if journal != nil { - const size_128mb = (1 << 27) - if journal.Size() > size_128mb { - // We want a GC... - select { - case h.c.workCh <- autoGCWork{db, h.next, h.name}: - h.done = h.next - h.next = make(chan struct{}) - case <-ctx.Done(): - return nil, context.Cause(ctx) - } - } + sz, err := db.StoreSizes(ctx) + if err != nil { + // Something is probably quite wrong. Regardless, can't determine if we should GC. + return nil, err + } + if h.lastSz == nil { + h.lastSz = &sz + } + const size_128mb = (1 << 27) + const size_256mb = (1 << 28) + if sz.JournalBytes > size_128mb { + // Our first heuristic is simply if journal is greater than a fixed size... + return nil, h.requestGC(ctx, db) + } else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes - h.lastSz.TotalBytes > size_256mb { + // Or if the store has grown by a fixed size since our last GC / we started watching it... 
+ return nil, h.requestGC(ctx, db) } default: - // A GC is running or pending. No need to check. + // A GC is already running or pending. No need to check. } return nil, nil } +func (h *autoGCCommitHook) requestGC(ctx context.Context, db *doltdb.DoltDB) error { + select { + case h.c.workCh <- autoGCWork{db, h.next, h.name}: + h.done = h.next + h.next = make(chan struct{}) + h.lastSz = nil + return nil + case <-ctx.Done(): + return context.Cause(ctx) + } +} + func (h *autoGCCommitHook) HandleError(ctx context.Context, err error) error { return nil } From 143893303b43a492bb7f387319c439bc7842eba1 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Thu, 20 Feb 2025 16:15:21 -0800 Subject: [PATCH 08/15] integration-tests: go-sql-server-driver: auto_gc_test: Add a skipped test for auto-gc occurring on a standby replica. --- .../go-sql-server-driver/auto_gc_test.go | 156 ++++++++++++++++-- 1 file changed, 139 insertions(+), 17 deletions(-) diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 512070d863..3a2f881b08 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -35,13 +35,35 @@ func TestAutoGC(t *testing.T) { var enabled_16, final_16, disabled, final_disabled RepoSize t.Run("Enable", func(t *testing.T) { t.Run("CommitEvery16", func(t *testing.T) { - enabled_16, final_16 = runAutoGCTest(t, true, 16) + var s AutoGCTest + s.Enable = true + enabled_16, final_16 = runAutoGCTest(t, &s, 64, 16) + assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") t.Logf("repo size before final gc: %v", enabled_16) t.Logf("repo size after final gc: %v", final_16) }) + t.Run("ClusterReplication", func(t *testing.T) { + // This test does not work yet, because remotsrv Commits + // do not go through the doltdb.hooksDatabase hooks + // machinery. 
+ t.Skip() + var s AutoGCTest + s.Enable = true + s.Replicate = true + enabled_16, final_16 = runAutoGCTest(t, &s, 256, 16) + assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") + assert.Contains(t, string(s.StandbyServer.Output.Bytes()), "Successfully completed auto GC") + t.Logf("repo size before final gc: %v", enabled_16) + t.Logf("repo size after final gc: %v", final_16) + rs, err := GetRepoSize(s.StandbyDir) + require.NoError(t, err) + t.Logf("standby size: %v", rs) + }) }) t.Run("Disabled", func(t *testing.T) { - disabled, final_disabled = runAutoGCTest(t, false, 128) + var s AutoGCTest + disabled, final_disabled = runAutoGCTest(t, &s, 64, 128) + assert.NotContains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") t.Logf("repo size before final gc: %v", disabled) t.Logf("repo size after final gc: %v", final_disabled) }) @@ -52,26 +74,69 @@ func TestAutoGC(t *testing.T) { } } -func setupAutoGCTest(ctx context.Context, t *testing.T, enable bool) (string, *sql.DB) { +type AutoGCTest struct { + Enable bool + PrimaryDir string + PrimaryServer *driver.SqlServer + PrimaryDB *sql.DB + + Replicate bool + StandbyDir string + StandbyServer *driver.SqlServer + StandbyDB *sql.DB +} + +func (s *AutoGCTest) Setup(ctx context.Context, t *testing.T) { u, err := driver.NewDoltUser() require.NoError(t, err) t.Cleanup(func() { u.Cleanup() }) + s.CreatePrimaryServer(ctx, t, u) + + if s.Replicate { + u, err := driver.NewDoltUser() + require.NoError(t, err) + t.Cleanup(func() { + u.Cleanup() + }) + s.CreateStandbyServer(ctx, t, u) + } + + s.CreatePrimaryDatabase(ctx, t) +} + +func (s *AutoGCTest) CreatePrimaryServer(ctx context.Context, t *testing.T, u driver.DoltUser) { rs, err := u.MakeRepoStore() require.NoError(t, err) repo, err := rs.MakeRepo("auto_gc_test") require.NoError(t, err) - err = driver.WithFile{ - Name: "server.yaml", - Contents: fmt.Sprintf(` + behaviorFragment := fmt.Sprintf(` behavior: auto_gc_behavior: enable: %v -`, enable), +`, s.Enable) + + var clusterFragment string + if s.Replicate { + clusterFragment = ` +cluster: + standby_remotes: + - name: standby + remote_url_template: http://localhost:3852/{database} + bootstrap_role: primary + bootstrap_epoch: 1 + remotesapi: + port: 3851 +` + } + + err = driver.WithFile{ + Name: "server.yaml", + Contents: behaviorFragment + clusterFragment, }.WriteAtDir(repo.Dir) require.NoError(t, err) @@ -86,8 +151,67 @@ behavior: db.Close() }) + s.PrimaryDir = repo.Dir + s.PrimaryDB = db + s.PrimaryServer = server +} + +func (s *AutoGCTest) CreateStandbyServer(ctx context.Context, t *testing.T, u driver.DoltUser) { + rs, err := u.MakeRepoStore() + require.NoError(t, err) + + repo, err := rs.MakeRepo("auto_gc_test") + require.NoError(t, err) + + behaviorFragment := fmt.Sprintf(` +listener: + host: 0.0.0.0 + port: 3308 +behavior: + auto_gc_behavior: + enable: %v +`, s.Enable) + + var clusterFragment string + if s.Replicate { + clusterFragment = ` +cluster: + standby_remotes: + - name: primary + remote_url_template: http://localhost:3851/{database} + bootstrap_role: standby + bootstrap_epoch: 1 + remotesapi: + port: 3852 +` + } + + err = driver.WithFile{ + Name: "server.yaml", + Contents: behaviorFragment + clusterFragment, + }.WriteAtDir(repo.Dir) + require.NoError(t, err) + + server := MakeServer(t, repo, &driver.Server{ + Args: []string{"--config", "server.yaml"}, + Port: 3308, + }) + server.DBName = "auto_gc_test" + + db, err := server.DB(driver.Connection{User: "root"}) + 
require.NoError(t, err) + t.Cleanup(func() { + db.Close() + }) + + s.StandbyDir = repo.Dir + s.StandbyDB = db + s.StandbyServer = server +} + +func (s *AutoGCTest) CreatePrimaryDatabase(ctx context.Context, t *testing.T) { // Create the database... - conn, err := db.Conn(ctx) + conn, err := s.PrimaryDB.Conn(ctx) require.NoError(t, err) _, err = conn.ExecContext(ctx, ` create table vals ( @@ -118,8 +242,6 @@ create table vals ( _, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')") require.NoError(t, err) require.NoError(t, conn.Close()) - - return repo.Dir, db } func autoGCInsertStatement(i int) string { @@ -136,17 +258,17 @@ func autoGCInsertStatement(i int) string { return "insert into vals values " + strings.Join(vals, ",") } -func runAutoGCTest(t *testing.T, enable bool, commitEvery int) (RepoSize, RepoSize) { +func runAutoGCTest(t *testing.T, s *AutoGCTest, numStatements int, commitEvery int) (RepoSize, RepoSize) { // A simple auto-GC test, where we run // operations on an auto GC server and // ensure that the database is getting // collected. ctx := context.Background() - dir, db := setupAutoGCTest(ctx, t, enable) + s.Setup(ctx, t) - for i := 0; i < 64; i++ { + for i := 0; i < numStatements; i++ { stmt := autoGCInsertStatement(i) - conn, err := db.Conn(ctx) + conn, err := s.PrimaryDB.Conn(ctx) _, err = conn.ExecContext(ctx, stmt) require.NoError(t, err) if i%commitEvery == 0 { @@ -156,14 +278,14 @@ func runAutoGCTest(t *testing.T, enable bool, commitEvery int) (RepoSize, RepoSi require.NoError(t, conn.Close()) } - before, err := GetRepoSize(dir) + before, err := GetRepoSize(s.PrimaryDir) require.NoError(t, err) - conn, err := db.Conn(ctx) + conn, err := s.PrimaryDB.Conn(ctx) require.NoError(t, err) _, err = conn.ExecContext(ctx, "call dolt_gc('--full')") require.NoError(t, err) require.NoError(t, conn.Close()) - after, err := GetRepoSize(dir) + after, err := GetRepoSize(s.PrimaryDir) require.NoError(t, err) return before, after } From efb5edca3039961531d86d6eaf55919578774f9d Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 21 Feb 2025 08:41:01 -0800 Subject: [PATCH 09/15] go: sqle: auto_gc: Move to a background thread which periodically checks, plus gets triggered by a commit hook. This helps handle the stanbdy replica case, where commits come in through remotesrv directly into the ChunkStore, and not through the datas.Database. --- go/libraries/doltcore/sqle/auto_gc.go | 251 ++++++++++++------ .../doltcore/sqle/cluster/controller.go | 2 +- .../doltcore/sqle/cluster/initdbhook.go | 2 +- go/store/nbs/journal.go | 6 +- .../go-sql-server-driver/auto_gc_test.go | 4 - 5 files changed, 177 insertions(+), 88 deletions(-) diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index bcb5bab849..ce87d794b9 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -46,12 +46,18 @@ import ( type AutoGCController struct { workCh chan autoGCWork lgr *logrus.Logger + + mu sync.Mutex + hooks map[string]*autoGCCommitHook + ctxF func(context.Context) (*sql.Context, error) + threads *sql.BackgroundThreads } func NewAutoGCController(lgr *logrus.Logger) *AutoGCController { return &AutoGCController{ workCh: make(chan autoGCWork), lgr: lgr, + hooks: make(map[string]*autoGCCommitHook), } } @@ -69,53 +75,62 @@ type autoGCWork struct { // background worker threads responsible for performing the GC are // running. 
func (c *AutoGCController) RunBackgroundThread(threads *sql.BackgroundThreads, ctxF func(context.Context) (*sql.Context, error)) error { - return threads.Add("auto_gc_thread", func(ctx context.Context) { - var wg sync.WaitGroup - runCh := make(chan autoGCWork) - wg.Add(1) - go func() { - defer wg.Done() - dbs := make([]autoGCWork, 0) - // Accumulate GC requests, only one will come in per database at a time. - // Send the oldest one out to the worker when it is ready. - for { - var toSendCh chan autoGCWork - var toSend autoGCWork - if len(dbs) > 0 { - toSend = dbs[0] - toSendCh = runCh - } - select { - case <-ctx.Done(): - // sql.BackgroundThreads is shutting down. - // No need to drain or anything; just - // return. - return - case newDB := <-c.workCh: - dbs = append(dbs, newDB) - case toSendCh <- toSend: - // We just sent the front of the slice. - // Delete it from our set of pending GCs. - copy(dbs[:], dbs[1:]) - dbs = dbs[:len(dbs)-1] - } + c.threads = threads + c.ctxF = ctxF + err := threads.Add("auto_gc_thread", c.gcBgThread) + if err != nil { + return err + } + // TODO: Start bg threads for all existing commit hooks. + return nil +} +func (c *AutoGCController) gcBgThread(ctx context.Context) { + var wg sync.WaitGroup + runCh := make(chan autoGCWork) + wg.Add(1) + go func() { + defer wg.Done() + dbs := make([]autoGCWork, 0) + // Accumulate GC requests, only one will come in per database at a time. + // Send the oldest one out to the worker when it is ready. + for { + var toSendCh chan autoGCWork + var toSend autoGCWork + if len(dbs) > 0 { + toSend = dbs[0] + toSendCh = runCh } - }() - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-ctx.Done(): - return - case work := <-runCh: - c.doWork(ctx, work, ctxF) - } + select { + case <-ctx.Done(): + // sql.BackgroundThreads is shutting down. + // No need to drain or anything; just + // return. + return + case newDB := <-c.workCh: + dbs = append(dbs, newDB) + case toSendCh <- toSend: + // We just sent the front of the slice. + // Delete it from our set of pending GCs. + copy(dbs[:], dbs[1:]) + dbs = dbs[:len(dbs)-1] } - }() - wg.Wait() - }) + + } + }() + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + case work := <-runCh: + c.doWork(ctx, work, c.ctxF) + } + } + }() + wg.Wait() } func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF func(context.Context) (*sql.Context, error)) { @@ -138,15 +153,27 @@ func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF fun c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start)) } -func (c *AutoGCController) newCommitHook(name string) doltdb.CommitHook { +func (c *AutoGCController) newCommitHook(name string, db *doltdb.DoltDB) doltdb.CommitHook { + c.mu.Lock() + defer c.mu.Unlock() closed := make(chan struct{}) close(closed) - return &autoGCCommitHook{ - c: c, - name: name, - done: closed, - next: make(chan struct{}), + ret := &autoGCCommitHook{ + c: c, + name: name, + done: closed, + next: make(chan struct{}), + db: db, + tickCh: make(chan struct{}), + stopCh: make(chan struct{}), } + c.hooks[name] = ret + if c.threads != nil { + // If this errors, sql.BackgroundThreads is already closed. + // Things are hopefully shutting down... 
+ _ = ret.run(c.threads) + } + return ret } // The doltdb.CommitHook which watches for database changes and @@ -172,9 +199,17 @@ type autoGCCommitHook struct { // to GC, so that we observe and store the new size after the // GC is finished. lastSz *doltdb.StoreSizes - // |done|, |next|, |lastSz| are mutable and |Execute| can be - // called concurrently. We protect them with |mu|. - mu sync.Mutex + + db *doltdb.DoltDB + + // Closed when the thread should shutdown because the database + // is being removed. + stopCh chan struct{} + // An optimistic send on this channel notifies the background + // thread that the sizes may have changed and it can check for + // the GC condition. + tickCh chan struct{} + wg sync.WaitGroup } // During engine initialization, called on the original set of @@ -185,49 +220,44 @@ func (c *AutoGCController) ApplyCommitHooks(ctx context.Context, mrEnv *env.Mult if denv == nil { continue } - denv.DoltDB(ctx).PrependCommitHooks(ctx, c.newCommitHook(db.Name())) + ddb := denv.DoltDB(ctx) + ddb.PrependCommitHooks(ctx, c.newCommitHook(db.Name(), ddb)) } return nil } +func (c *AutoGCController) DropDatabaseHook() DropDatabaseHook { + return func(ctx *sql.Context, name string) { + c.mu.Lock() + defer c.mu.Unlock() + hook := c.hooks[name] + if hook != nil { + hook.stop() + delete(c.hooks, name) + } + } +} + func (c *AutoGCController) InitDatabaseHook() InitDatabaseHook { return func(ctx *sql.Context, pro *DoltDatabaseProvider, name string, env *env.DoltEnv, db dsess.SqlDatabase) error { - env.DoltDB(ctx).PrependCommitHooks(ctx, c.newCommitHook(name)) + ddb := env.DoltDB(ctx) + ddb.PrependCommitHooks(ctx, c.newCommitHook(name, ddb)) return nil } } -func (h *autoGCCommitHook) Execute(ctx context.Context, ds datas.Dataset, db *doltdb.DoltDB) (func(context.Context) error, error) { - h.mu.Lock() - defer h.mu.Unlock() +func (h *autoGCCommitHook) Execute(ctx context.Context, _ datas.Dataset, _ *doltdb.DoltDB) (func(context.Context) error, error) { select { - case <-h.done: - sz, err := db.StoreSizes(ctx) - if err != nil { - // Something is probably quite wrong. Regardless, can't determine if we should GC. - return nil, err - } - if h.lastSz == nil { - h.lastSz = &sz - } - const size_128mb = (1 << 27) - const size_256mb = (1 << 28) - if sz.JournalBytes > size_128mb { - // Our first heuristic is simply if journal is greater than a fixed size... - return nil, h.requestGC(ctx, db) - } else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes - h.lastSz.TotalBytes > size_256mb { - // Or if the store has grown by a fixed size since our last GC / we started watching it... - return nil, h.requestGC(ctx, db) - } - default: - // A GC is already running or pending. No need to check. + case h.tickCh <- struct{}{}: + return nil, nil + case <-ctx.Done(): + return nil, context.Cause(ctx) } - return nil, nil } -func (h *autoGCCommitHook) requestGC(ctx context.Context, db *doltdb.DoltDB) error { +func (h *autoGCCommitHook) requestGC(ctx context.Context) error { select { - case h.c.workCh <- autoGCWork{db, h.next, h.name}: + case h.c.workCh <- autoGCWork{h.db, h.next, h.name}: h.done = h.next h.next = make(chan struct{}) h.lastSz = nil @@ -248,3 +278,62 @@ func (h *autoGCCommitHook) SetLogger(ctx context.Context, wr io.Writer) error { func (h *autoGCCommitHook) ExecuteForWorkingSets() bool { return true } + +func (h *autoGCCommitHook) checkForGC(ctx context.Context) error { + select { + case <-h.done: + sz, err := h.db.StoreSizes(ctx) + if err != nil { + // Something is probably quite wrong. 
Regardless, can't determine if we should GC. + return err + } + if h.lastSz == nil { + h.lastSz = &sz + } + const size_128mb = (1 << 27) + const size_256mb = (1 << 28) + if sz.JournalBytes > size_128mb { + // Our first heuristic is simply if journal is greater than a fixed size... + return h.requestGC(ctx) + } else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes-h.lastSz.TotalBytes > size_256mb { + // Or if the store has grown by a fixed size since our last GC / we started watching it... + return h.requestGC(ctx) + } + default: + // A GC is already running or pending. No need to check. + } + return nil +} + +const checkInterval = 100 * time.Millisecond + +func (h *autoGCCommitHook) thread(ctx context.Context) { + defer h.wg.Done() + timer := time.NewTimer(checkInterval) + defer timer.Stop() + for { + select { + case <-ctx.Done(): + return + case <-h.stopCh: + return + case <-h.tickCh: + // We ignore an error here, which just means we didn't kick + // off a GC when we might have wanted to. + _ = h.checkForGC(ctx) + case <-timer.C: + _ = h.checkForGC(ctx) + timer.Reset(checkInterval) + } + } +} + +func (h *autoGCCommitHook) stop() { + close(h.stopCh) + h.wg.Wait() +} + +func (h *autoGCCommitHook) run(threads *sql.BackgroundThreads) error { + h.wg.Add(1) + return threads.Add("auto_gc_thread["+h.name+"]", h.thread) +} diff --git a/go/libraries/doltcore/sqle/cluster/controller.go b/go/libraries/doltcore/sqle/cluster/controller.go index 487923dbcb..c6b07c3aed 100644 --- a/go/libraries/doltcore/sqle/cluster/controller.go +++ b/go/libraries/doltcore/sqle/cluster/controller.go @@ -317,7 +317,7 @@ func (c *Controller) applyCommitHooks(ctx context.Context, name string, denv *en } } commitHook := newCommitHook(c.lgr, r.Name(), remote.Url, name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) { - return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider) + return remote.GetRemoteDBWithoutCaching(ctx, types.Format_Default, dialprovider) }, denv.DoltDB(ctx), ttfdir) denv.DoltDB(ctx).PrependCommitHooks(ctx, commitHook) hooks = append(hooks, commitHook) diff --git a/go/libraries/doltcore/sqle/cluster/initdbhook.go b/go/libraries/doltcore/sqle/cluster/initdbhook.go index 1fbcd57872..bd5eafe5b3 100644 --- a/go/libraries/doltcore/sqle/cluster/initdbhook.go +++ b/go/libraries/doltcore/sqle/cluster/initdbhook.go @@ -64,7 +64,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads) sqle } remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) { - return er.GetRemoteDB(ctx, types.Format_Default, dialprovider) + return er.GetRemoteDBWithoutCaching(ctx, types.Format_Default, dialprovider) }) remoteUrls = append(remoteUrls, remoteUrl) } diff --git a/go/store/nbs/journal.go b/go/store/nbs/journal.go index 871d0214d8..7e9598a4b6 100644 --- a/go/store/nbs/journal.go +++ b/go/store/nbs/journal.go @@ -499,7 +499,11 @@ func (j *ChunkJournal) AccessMode() chunks.ExclusiveAccessMode { } func (j *ChunkJournal) Size() int64 { - return j.wr.size() + if j.wr != nil { + return j.wr.size() + } else { + return 0 + } } type journalConjoiner struct { diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 3a2f881b08..7683a4756a 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -43,10 +43,6 @@ func TestAutoGC(t *testing.T) { t.Logf("repo size after final gc: %v", final_16) }) t.Run("ClusterReplication", 
func(t *testing.T) {
-		// This test does not work yet, because remotsrv Commits
-		// do not go through the doltdb.hooksDatabase hooks
-		// machinery.
-		t.Skip()
 		var s AutoGCTest
 		s.Enable = true
 		s.Replicate = true

From e4a4b5dadbc2691713ea5bb394b40802996097a8 Mon Sep 17 00:00:00 2001
From: Aaron Son
Date: Fri, 21 Feb 2025 11:31:23 -0800
Subject: [PATCH 10/15] go: store,remotesrv: Improve logging.

Improve gRPC error statuses for certain errors that arise after a GC.
---
 go/libraries/doltcore/remotesrv/grpc.go | 41 ++++++++++++++++---------
 go/libraries/doltcore/remotesrv/http.go |  2 +-
 go/libraries/doltcore/sqle/auto_gc.go   |  6 +++-
 go/store/nbs/file_table_reader.go       |  4 ++-
 go/store/nbs/store.go                   |  2 +-
 5 files changed, 37 insertions(+), 18 deletions(-)

diff --git a/go/libraries/doltcore/remotesrv/grpc.go b/go/libraries/doltcore/remotesrv/grpc.go
index b3090e587c..40ae33fe60 100644
--- a/go/libraries/doltcore/remotesrv/grpc.go
+++ b/go/libraries/doltcore/remotesrv/grpc.go
@@ -36,6 +36,7 @@ import (
 	"github.com/dolthub/dolt/go/libraries/utils/filesys"
 	"github.com/dolthub/dolt/go/store/chunks"
 	"github.com/dolthub/dolt/go/store/hash"
+	"github.com/dolthub/dolt/go/store/nbs"
 	"github.com/dolthub/dolt/go/store/types"
 )

@@ -97,7 +98,7 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
 	}
 	repoPath := getRepoPath(req)
 	logger = logger.WithField(RepoPathField, repoPath)
-	defer func() { logger.Info("finished") }()
+	defer func() { logger.Trace("finished") }()

 	cs, err := rs.getStore(ctx, logger, repoPath)
 	if err != nil {
@@ -155,7 +156,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
 	}
 	repoPath := getRepoPath(req)
 	logger = logger.WithField(RepoPathField, repoPath)
-	defer func() { logger.Info("finished") }()
+	defer func() { logger.Trace("finished") }()

 	cs, err := rs.getStore(ctx, logger, repoPath)
 	if err != nil {
@@ -233,7 +234,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
 			"num_requested": numHashes,
 			"num_urls":      numUrls,
 			"num_ranges":    numRanges,
-		}).Info("finished")
+		}).Trace("finished")
 	}()

 	logger := ologger
@@ -387,7 +388,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
 	}
 	repoPath := getRepoPath(req)
 	logger = logger.WithField(RepoPathField, repoPath)
-	defer func() { logger.Info("finished") }()
+	defer func() { logger.Trace("finished") }()

 	_, err := rs.getStore(ctx, logger, repoPath)
 	if err != nil {
@@ -445,7 +446,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
 	}
 	repoPath := getRepoPath(req)
 	logger = logger.WithField(RepoPathField, repoPath)
-	defer func() { logger.Info("finished") }()
+	defer func() { logger.Trace("finished") }()

 	_, err := rs.getStore(ctx, logger, repoPath)
 	if err != nil {
@@ -462,7 +463,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
 	}
 	repoPath := getRepoPath(req)
 	logger = logger.WithField(RepoPathField, repoPath)
-	defer func() { logger.Info("finished") }()
+	defer func() { logger.Trace("finished") }()

 	cs, err := rs.getStore(ctx, logger, repoPath)
 	if err != nil {
@@ -485,7 +486,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
 	}
 	repoPath := getRepoPath(req)
 	logger = logger.WithField(RepoPathField, repoPath)
-	defer func() { logger.Info("finished") }()
+	defer func() { logger.Trace("finished") }()

 	cs, err := rs.getStore(ctx, logger, repoPath)
 	if err != nil {
@@ -500,7 +501,11 @@ func (rs *RemoteChunkStore) Commit(ctx 
context.Context, req *remotesapi.CommitRe err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version())) if err != nil { logger.WithError(err).Error("error calling AddTableFilesToManifest") - return nil, status.Errorf(codes.Internal, "manifest update error: %v", err) + code := codes.Internal + if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) { + code = codes.FailedPrecondition + } + return nil, status.Errorf(code, "manifest update error: %v", err) } currHash := hash.New(req.Current) @@ -513,7 +518,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe "last_hash": lastHash.String(), "curr_hash": currHash.String(), }).Error("error calling Commit") - return nil, status.Errorf(codes.Internal, "failed to commit: %v", err) + code := codes.Internal + if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) { + code = codes.FailedPrecondition + } + return nil, status.Errorf(code, "failed to commit: %v", err) } logger.Tracef("Commit success; moved from %s -> %s", lastHash.String(), currHash.String()) @@ -528,7 +537,7 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getOrCreateStore(ctx, logger, repoPath, req.ClientRepoFormat.NbfVersion) if err != nil { @@ -556,7 +565,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi. } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -634,7 +643,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A } repoPath := getRepoPath(req) logger = logger.WithField(RepoPathField, repoPath) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() cs, err := rs.getStore(ctx, logger, repoPath) if err != nil { @@ -649,7 +658,11 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version())) if err != nil { logger.WithError(err).Error("error occurred updating the manifest") - return nil, status.Error(codes.Internal, "manifest update error") + code := codes.Internal + if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) { + code = codes.FailedPrecondition + } + return nil, status.Error(code, "manifest update error") } logger = logger.WithFields(logrus.Fields{ @@ -707,7 +720,7 @@ func getReqLogger(lgr *logrus.Entry, method string) *logrus.Entry { "method": method, "request_num": strconv.Itoa(incReqId()), }) - lgr.Info("starting request") + lgr.Trace("starting request") return lgr } diff --git a/go/libraries/doltcore/remotesrv/http.go b/go/libraries/doltcore/remotesrv/http.go index 825f4ca0df..ba1ea046ee 100644 --- a/go/libraries/doltcore/remotesrv/http.go +++ b/go/libraries/doltcore/remotesrv/http.go @@ -66,7 +66,7 @@ func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, read func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) { logger := getReqLogger(fh.lgr, req.Method+"_"+req.RequestURI) - defer func() { logger.Info("finished") }() + defer func() { logger.Trace("finished") }() var err error req.URL, err = 
fh.sealer.Unseal(req.URL) diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index ce87d794b9..5ca099bd49 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -16,6 +16,7 @@ package sqle import ( "context" + "errors" "io" "sync" "time" @@ -27,6 +28,7 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/env" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" + "github.com/dolthub/dolt/go/store/chunks" "github.com/dolthub/dolt/go/store/datas" "github.com/dolthub/dolt/go/store/types" ) @@ -147,7 +149,9 @@ func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF fun defer sql.SessionCommandEnd(sqlCtx.Session) err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault, work.name) if err != nil { - c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err) + if !errors.Is(err, chunks.ErrNothingToCollect) { + c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err) + } return } c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start)) diff --git a/go/store/nbs/file_table_reader.go b/go/store/nbs/file_table_reader.go index d808b81563..48e38ffdce 100644 --- a/go/store/nbs/file_table_reader.go +++ b/go/store/nbs/file_table_reader.go @@ -33,6 +33,8 @@ import ( "github.com/dolthub/dolt/go/store/hash" ) +var ErrTableFileNotFound = errors.New("table file not found") + type fileTableReader struct { tableReader h hash.Hash @@ -81,7 +83,7 @@ func newFileTableReader(ctx context.Context, dir string, h hash.Hash, chunkCount } else if afExists { return newArchiveChunkSource(ctx, dir, h, chunkCount, q) } - return nil, errors.New(fmt.Sprintf("table file %s/%s not found", dir, h.String())) + return nil, fmt.Errorf("error opening table file: %w: %s/%s", ErrTableFileNotFound, dir, h.String()) } func nomsFileTableReader(ctx context.Context, path string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 78ab22843b..1ddf2900b8 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -1735,7 +1735,7 @@ func refCheckAllSources(ctx context.Context, nbs *NomsBlockStore, getAddrs chunk checkErr = err } if len(remaining) > 0 { - checkErr = fmt.Errorf("%w, missing: %v", errors.New("cannot add table files; referenced chunk not found in store."), remaining) + checkErr = fmt.Errorf("cannot add table files: %w, missing: %v", ErrTableFileNotFound, remaining) } } for _, source := range sources { From 4ce49e6a2411e5a6369784f52dac069ca0acd831 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 21 Feb 2025 17:14:29 -0800 Subject: [PATCH 11/15] go: store/nbs: store.go: Fix GetManyCompressed to not redeliver chunks on waitForGC retry. --- go/libraries/doltcore/doltdb/doltdb.go | 10 +++++----- go/store/nbs/store.go | 4 ++-- integration-tests/go-sql-server-driver/auto_gc_test.go | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index 36c9ef3764..feaba4fb14 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1942,10 +1942,10 @@ type StoreSizes struct { // but everything in it is in the old gen. 
In practice, given how we build // oldgen references today, this will never be the case--there is always // a little bit of data that only goes in the newgen. - NewGenBytes uint64 + NewGenBytes uint64 // This is the approximate total on-disk storage overhead of the store. // It includes Journal and NewGenBytes, if there are any. - TotalBytes uint64 + TotalBytes uint64 } func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { @@ -1964,13 +1964,13 @@ func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { if journal != nil { return StoreSizes{ JournalBytes: uint64(journal.Size()), - NewGenBytes: newgenSz, - TotalBytes: totalSz, + NewGenBytes: newgenSz, + TotalBytes: totalSz, }, nil } else { return StoreSizes{ NewGenBytes: newgenSz, - TotalBytes: totalSz, + TotalBytes: totalSz, }, nil } } else { diff --git a/go/store/nbs/store.go b/go/store/nbs/store.go index 1ddf2900b8..1e5da5a4c1 100644 --- a/go/store/nbs/store.go +++ b/go/store/nbs/store.go @@ -955,10 +955,10 @@ func (nbs *NomsBlockStore) getManyWithFunc( nbs.stats.ChunksPerGet.Sample(uint64(len(hashes))) }() + reqs := toGetRecords(hashes) + const ioParallelism = 16 for { - reqs := toGetRecords(hashes) - nbs.mu.Lock() keeper := nbs.keeperFunc if gcDepMode == gcDependencyMode_NoDependency { diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 7683a4756a..79a0f913bd 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -131,7 +131,7 @@ cluster: } err = driver.WithFile{ - Name: "server.yaml", + Name: "server.yaml", Contents: behaviorFragment + clusterFragment, }.WriteAtDir(repo.Dir) require.NoError(t, err) @@ -183,7 +183,7 @@ cluster: } err = driver.WithFile{ - Name: "server.yaml", + Name: "server.yaml", Contents: behaviorFragment + clusterFragment, }.WriteAtDir(repo.Dir) require.NoError(t, err) From 837ae43d3583bb97365aeb741edbb29fdd4016ba Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Sat, 22 Feb 2025 06:34:22 -0800 Subject: [PATCH 12/15] integration-tests/go-sql-server-driver: auto_gc_test.go: Bring down amount of garbage produced by replication test to improve speed. --- integration-tests/go-sql-server-driver/auto_gc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 79a0f913bd..de14ea2478 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -46,7 +46,7 @@ func TestAutoGC(t *testing.T) { var s AutoGCTest s.Enable = true s.Replicate = true - enabled_16, final_16 = runAutoGCTest(t, &s, 256, 16) + enabled_16, final_16 = runAutoGCTest(t, &s, 64, 16) assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC") assert.Contains(t, string(s.StandbyServer.Output.Bytes()), "Successfully completed auto GC") t.Logf("repo size before final gc: %v", enabled_16) From df991698ecfec07a5afb8009e96331bc197f910c Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Sun, 23 Feb 2025 11:46:12 -0800 Subject: [PATCH 13/15] go: sqle/auto_gc: Add some simple tests for controller and hook. 
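
The tests construct the controller and hooks directly, with no running
sql-server. Roughly, they exercise the lifecycle sketched below (a sketch
only: error handling is elided, and ctxFactory stands in for the session
context factory the server normally supplies):

	controller := NewAutoGCController(logrus.New())

	bg := sql.NewBackgroundThreads()
	defer bg.Shutdown()

	// Hooks registered before or after this call are both started,
	// since RunBackgroundThread now runs any already-registered hooks.
	_ = controller.RunBackgroundThread(bg, ctxFactory)

	ctx := context.Background()
	dEnv := CreateTestEnvWithName("some_database")
	hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx))

	// A commit tick notifies the hook; stop() shuts its thread down,
	// as the DropDatabaseHook does when a database is removed.
	_, _ = hook.Execute(ctx, datas.Dataset{}, nil)
	hook.stop()
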
--- go/cmd/dolt/commands/engine/sqlengine.go | 1 + go/libraries/doltcore/sqle/auto_gc.go | 13 ++- go/libraries/doltcore/sqle/auto_gc_test.go | 117 +++++++++++++++++++++ 3 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 go/libraries/doltcore/sqle/auto_gc_test.go diff --git a/go/cmd/dolt/commands/engine/sqlengine.go b/go/cmd/dolt/commands/engine/sqlengine.go index fe1769c0ca..9c43cab2b9 100644 --- a/go/cmd/dolt/commands/engine/sqlengine.go +++ b/go/cmd/dolt/commands/engine/sqlengine.go @@ -211,6 +211,7 @@ func NewSqlEngine( } config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...) pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook()) + pro.DropDatabaseHooks = append(pro.DropDatabaseHooks, config.AutoGCController.DropDatabaseHook()) // XXX: We force session aware safepoint controller if auto_gc is on. dprocedures.UseSessionAwareSafepointController = true } diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index 5ca099bd49..ac33a42a17 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -83,7 +83,12 @@ func (c *AutoGCController) RunBackgroundThread(threads *sql.BackgroundThreads, c if err != nil { return err } - // TODO: Start bg threads for all existing commit hooks. + for _, hook := range c.hooks { + err = hook.run(threads) + if err != nil { + return err + } + } return nil } @@ -157,7 +162,7 @@ func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF fun c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start)) } -func (c *AutoGCController) newCommitHook(name string, db *doltdb.DoltDB) doltdb.CommitHook { +func (c *AutoGCController) newCommitHook(name string, db *doltdb.DoltDB) *autoGCCommitHook { c.mu.Lock() defer c.mu.Unlock() closed := make(chan struct{}) @@ -231,7 +236,7 @@ func (c *AutoGCController) ApplyCommitHooks(ctx context.Context, mrEnv *env.Mult } func (c *AutoGCController) DropDatabaseHook() DropDatabaseHook { - return func(ctx *sql.Context, name string) { + return func(_ *sql.Context, name string) { c.mu.Lock() defer c.mu.Unlock() hook := c.hooks[name] @@ -243,7 +248,7 @@ func (c *AutoGCController) DropDatabaseHook() DropDatabaseHook { } func (c *AutoGCController) InitDatabaseHook() InitDatabaseHook { - return func(ctx *sql.Context, pro *DoltDatabaseProvider, name string, env *env.DoltEnv, db dsess.SqlDatabase) error { + return func(ctx *sql.Context, _ *DoltDatabaseProvider, name string, env *env.DoltEnv, _ dsess.SqlDatabase) error { ddb := env.DoltDB(ctx) ddb.PrependCommitHooks(ctx, c.newCommitHook(name, ddb)) return nil diff --git a/go/libraries/doltcore/sqle/auto_gc_test.go b/go/libraries/doltcore/sqle/auto_gc_test.go new file mode 100644 index 0000000000..80387f57b2 --- /dev/null +++ b/go/libraries/doltcore/sqle/auto_gc_test.go @@ -0,0 +1,117 @@ +// Copyright 2025 Dolthub, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package sqle + +import ( + "bytes" + "context" + "sync" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/dolthub/dolt/go/store/datas" + "github.com/dolthub/go-mysql-server/sql" +) + +func TestAutoGCController(t *testing.T) { + NewLogger := func() *logrus.Logger { + res := logrus.New() + res.SetOutput(new(bytes.Buffer)) + return res + } + CtxFactory := func(ctx context.Context) (*sql.Context, error) { + return sql.NewContext(ctx, sql.WithSession(sql.NewBaseSession())), nil + } + t.Run("Hook", func(t *testing.T) { + t.Run("NeverStarted", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + hook := controller.newCommitHook("some_database", nil) + hook.stop() + }) + t.Run("StartedBeforeNewHook", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + bg := sql.NewBackgroundThreads() + defer bg.Shutdown() + err := controller.RunBackgroundThread(bg, CtxFactory) + require.NoError(t, err) + ctx := context.Background() + dEnv := CreateTestEnvWithName("some_database") + hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx)) + hook.Execute(ctx, datas.Dataset{}, nil) + hook.stop() + }) + t.Run("StartedAfterNewHook", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + bg := sql.NewBackgroundThreads() + defer bg.Shutdown() + ctx := context.Background() + dEnv := CreateTestEnvWithName("some_database") + hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx)) + err := controller.RunBackgroundThread(bg, CtxFactory) + require.NoError(t, err) + hook.Execute(ctx, datas.Dataset{}, nil) + hook.stop() + }) + t.Run("ExecuteOnCanceledCtx", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + dEnv := CreateTestEnvWithName("some_database") + hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx)) + _, err := hook.Execute(ctx, datas.Dataset{}, nil) + require.ErrorIs(t, err, context.Canceled) + }) + }) + t.Run("gcBgThread", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + controller.gcBgThread(ctx) + }() + time.Sleep(50 * time.Millisecond) + cancel() + wg.Wait() + }) + t.Run("DatabaseProviderHooks", func(t *testing.T) { + t.Run("Unstarted", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + ctx, err := CtxFactory(context.Background()) + require.NoError(t, err) + dEnv := CreateTestEnvWithName("some_database") + err = controller.InitDatabaseHook()(ctx, nil, "some_database", dEnv, nil) + require.NoError(t, err) + controller.DropDatabaseHook()(nil, "some_database") + }) + t.Run("Started", func(t *testing.T) { + controller := NewAutoGCController(NewLogger()) + bg := sql.NewBackgroundThreads() + defer bg.Shutdown() + err := controller.RunBackgroundThread(bg, CtxFactory) + require.NoError(t, err) + ctx, err := CtxFactory(context.Background()) + require.NoError(t, err) + dEnv := CreateTestEnvWithName("some_database") + err = controller.InitDatabaseHook()(ctx, nil, "some_database", dEnv, nil) + require.NoError(t, err) + controller.DropDatabaseHook()(nil, "some_database") + }) + }) +} From 7f1e9dd3dde65bc84d5075d3efd9ea6a84dc597a Mon Sep 17 00:00:00 2001 From: reltuk Date: Sun, 23 Feb 2025 19:55:06 +0000 Subject: [PATCH 14/15] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- 
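
(The only change here is import ordering in the new auto_gc_test.go: the
format run appears to group third-party imports separately from the dolt
imports, so the non-stdlib portion of the block ends up roughly as:

	import (
		"github.com/dolthub/go-mysql-server/sql"
		"github.com/sirupsen/logrus"
		"github.com/stretchr/testify/require"

		"github.com/dolthub/dolt/go/store/datas"
	)

with the standard-library imports remaining at the top of the block.)
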
go/libraries/doltcore/sqle/auto_gc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/auto_gc_test.go b/go/libraries/doltcore/sqle/auto_gc_test.go index 80387f57b2..367178c9a9 100644 --- a/go/libraries/doltcore/sqle/auto_gc_test.go +++ b/go/libraries/doltcore/sqle/auto_gc_test.go @@ -21,11 +21,11 @@ import ( "testing" "time" + "github.com/dolthub/go-mysql-server/sql" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" "github.com/dolthub/dolt/go/store/datas" - "github.com/dolthub/go-mysql-server/sql" ) func TestAutoGCController(t *testing.T) { From 160b434dbdaafb39d06bae6672e95145cb650eee Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Mon, 24 Feb 2025 14:43:09 -0800 Subject: [PATCH 15/15] go: sqle/auto_gc: Some PR feedback. --- go/libraries/doltcore/doltdb/doltdb.go | 18 ++++++++++++++---- go/libraries/doltcore/sqle/auto_gc.go | 12 ++++++------ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go index feaba4fb14..d9e8597b9b 100644 --- a/go/libraries/doltcore/doltdb/doltdb.go +++ b/go/libraries/doltcore/doltdb/doltdb.go @@ -1952,15 +1952,21 @@ func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { cs := datas.ChunkStoreFromDatabase(ddb.db) if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok { newgen := generationalNBS.NewGen() - newgenSz, err := newgen.(chunks.TableFileStore).Size(ctx) + newGenTFS, newGenTFSOk := newgen.(chunks.TableFileStore) + totalTFS, totalTFSOk := cs.(chunks.TableFileStore) + newGenNBS, newGenNBSOk := newgen.(*nbs.NomsBlockStore) + if !(newGenTFSOk && totalTFSOk && newGenNBSOk) { + return StoreSizes{}, fmt.Errorf("unexpected newgen or chunk store type for *nbs.GenerationalNBS instance; cannot take store sizes: cs: %T, newgen: %T", cs, newgen) + } + newgenSz, err := newGenTFS.Size(ctx) if err != nil { return StoreSizes{}, err } - totalSz, err := cs.(chunks.TableFileStore).Size(ctx) + totalSz, err := totalTFS.Size(ctx) if err != nil { return StoreSizes{}, err } - journal := newgen.(*nbs.NomsBlockStore).ChunkJournal() + journal := newGenNBS.ChunkJournal() if journal != nil { return StoreSizes{ JournalBytes: uint64(journal.Size()), @@ -1974,7 +1980,11 @@ func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) { }, nil } } else { - totalSz, err := cs.(chunks.TableFileStore).Size(ctx) + totalTFS, totalTFSOk := cs.(chunks.TableFileStore) + if !totalTFSOk { + return StoreSizes{}, fmt.Errorf("unexpected chunk store type for non-*nbs.GenerationalNBS ddb.db instance; cannot take store sizes: cs: %T", cs) + } + totalSz, err := totalTFS.Size(ctx) if err != nil { return StoreSizes{}, err } diff --git a/go/libraries/doltcore/sqle/auto_gc.go b/go/libraries/doltcore/sqle/auto_gc.go index ac33a42a17..644a13cfe5 100644 --- a/go/libraries/doltcore/sqle/auto_gc.go +++ b/go/libraries/doltcore/sqle/auto_gc.go @@ -288,6 +288,10 @@ func (h *autoGCCommitHook) ExecuteForWorkingSets() bool { return true } +const checkInterval = 1 * time.Second +const size_128mb = (1 << 27) +const defaultCheckSizeThreshold = size_128mb + func (h *autoGCCommitHook) checkForGC(ctx context.Context) error { select { case <-h.done: @@ -299,12 +303,10 @@ func (h *autoGCCommitHook) checkForGC(ctx context.Context) error { if h.lastSz == nil { h.lastSz = &sz } - const size_128mb = (1 << 27) - const size_256mb = (1 << 28) - if sz.JournalBytes > size_128mb { + if sz.JournalBytes > defaultCheckSizeThreshold { // Our first heuristic is 
simply if journal is greater than a fixed size... return h.requestGC(ctx) - } else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes-h.lastSz.TotalBytes > size_256mb { + } else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes-h.lastSz.TotalBytes > defaultCheckSizeThreshold { // Or if the store has grown by a fixed size since our last GC / we started watching it... return h.requestGC(ctx) } @@ -314,8 +316,6 @@ func (h *autoGCCommitHook) checkForGC(ctx context.Context) error { return nil } -const checkInterval = 100 * time.Millisecond - func (h *autoGCCommitHook) thread(ctx context.Context) { defer h.wg.Done() timer := time.NewTimer(checkInterval)