Merge pull request #8849 from dolthub/aaron/autogc

sql-server: Add behavior: auto_gc_behavior: enable.
Aaron Son
2025-02-24 15:45:00 -08:00
committed by GitHub
19 changed files with 1055 additions and 80 deletions
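For reference, the feature is controlled through the server's YAML config. A minimal sketch of a config fragment that turns it on, using the field names added in this PR (the default is enable: false):

behavior:
  auto_gc_behavior:
    enable: true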

View File

@@ -41,6 +41,7 @@ import (
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler"
@@ -81,6 +82,7 @@ type SqlEngineConfig struct {
JwksConfig []servercfg.JwksConfig
SystemVariables SystemVariables
ClusterController *cluster.Controller
AutoGCController *dsqle.AutoGCController
BinlogReplicaController binlogreplication.BinlogReplicaController
EventSchedulerStatus eventscheduler.SchedulerStatus
}
@@ -115,7 +117,15 @@ func NewSqlEngine(
return nil, err
}
all := dbs[:]
// Make a copy of the databases. |all| is going to be provided
// as the set of all initial databases to dsqle
// DatabaseProvider. |dbs| is only the databases that came
// from MultiRepoEnv, and they are all real databases based on
// DoltDB instances. |all| may also include informational
// extension databases like |dolt_cluster|, depending on config.
all := make([]dsess.SqlDatabase, len(dbs))
copy(all, dbs)
// this is overwritten only for server sessions
for _, db := range dbs {
@@ -194,6 +204,18 @@ func NewSqlEngine(
statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider()))
engine.Analyzer.Catalog.StatsProvider = statsPro
if config.AutoGCController != nil {
err = config.AutoGCController.RunBackgroundThread(bThreads, sqlEngine.NewDefaultContext)
if err != nil {
return nil, err
}
config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...)
pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook())
pro.DropDatabaseHooks = append(pro.DropDatabaseHooks, config.AutoGCController.DropDatabaseHook())
// XXX: We force the session-aware safepoint controller if auto GC is on.
dprocedures.UseSessionAwareSafepointController = true
}
engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{})
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit)
sqlEngine.provider = pro

View File

@@ -489,6 +489,10 @@ func (cfg *commandLineServerConfig) ValueSet(value string) bool {
return ok
}
func (cfg *commandLineServerConfig) AutoGCBehavior() servercfg.AutoGCBehavior {
return stubAutoGCBehavior{}
}
// DoltServerConfigReader is the default implementation of ServerConfigReader suitable for parsing Dolt config files
// and command line options.
type DoltServerConfigReader struct{}
@@ -510,3 +514,10 @@ func (d DoltServerConfigReader) ReadConfigFile(cwdFS filesys.Filesys, file strin
func (d DoltServerConfigReader) ReadConfigArgs(args *argparser.ArgParseResults, dataDirOverride string) (servercfg.ServerConfig, error) {
return NewCommandLineConfig(nil, args, dataDirOverride)
}
type stubAutoGCBehavior struct {
}
func (stubAutoGCBehavior) Enable() bool {
return false
}

View File

@@ -257,6 +257,17 @@ func ConfigureServices(
}
controller.Register(InitEventSchedulerStatus)
InitAutoGCController := &svcs.AnonService{
InitF: func(context.Context) error {
if serverConfig.AutoGCBehavior() != nil &&
serverConfig.AutoGCBehavior().Enable() {
config.AutoGCController = sqle.NewAutoGCController(lgr)
}
return nil
},
}
controller.Register(InitAutoGCController)
var sqlEngine *engine.SqlEngine
InitSqlEngine := &svcs.AnonService{
InitF: func(ctx context.Context) (err error) {

View File

@@ -1917,23 +1917,81 @@ func (ddb *DoltDB) IsTableFileStore() bool {
// ChunkJournal returns the ChunkJournal for this DoltDB, if one is in use.
func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal {
tableFileStore, ok := datas.ChunkStoreFromDatabase(ddb.db).(chunks.TableFileStore)
if !ok {
return nil
cs := datas.ChunkStoreFromDatabase(ddb.db)
if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok {
cs = generationalNBS.NewGen()
}
generationalNbs, ok := tableFileStore.(*nbs.GenerationalNBS)
if !ok {
if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok {
return nbsStore.ChunkJournal()
} else {
return nil
}
}
newGen := generationalNbs.NewGen()
nbs, ok := newGen.(*nbs.NomsBlockStore)
if !ok {
return nil
// StoreSizes is an approximate representation of how large the on-disk storage is for a DoltDB.
type StoreSizes struct {
// For ChunkJournal stores, this will be size of the journal file. A size
// of zero does not mean the store is not journaled. The store could be
// journaled, and the journal could be empty.
JournalBytes uint64
// For generational stores, this will be the size of the new gen. It will
// include any JournalBytes. A size of zero does not mean the store is not
// generational, since it could be the case that the store is generational
// but everything in it is in the old gen. In practice, given how we build
// oldgen references today, this will never be the case--there is always
// a little bit of data that only goes in the newgen.
NewGenBytes uint64
// This is the approximate total on-disk storage overhead of the store.
// It includes JournalBytes and NewGenBytes, if there are any.
TotalBytes uint64
}
func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) {
cs := datas.ChunkStoreFromDatabase(ddb.db)
if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok {
newgen := generationalNBS.NewGen()
newGenTFS, newGenTFSOk := newgen.(chunks.TableFileStore)
totalTFS, totalTFSOk := cs.(chunks.TableFileStore)
newGenNBS, newGenNBSOk := newgen.(*nbs.NomsBlockStore)
if !(newGenTFSOk && totalTFSOk && newGenNBSOk) {
return StoreSizes{}, fmt.Errorf("unexpected newgen or chunk store type for *nbs.GenerationalNBS instance; cannot take store sizes: cs: %T, newgen: %T", cs, newgen)
}
newgenSz, err := newGenTFS.Size(ctx)
if err != nil {
return StoreSizes{}, err
}
totalSz, err := totalTFS.Size(ctx)
if err != nil {
return StoreSizes{}, err
}
journal := newGenNBS.ChunkJournal()
if journal != nil {
return StoreSizes{
JournalBytes: uint64(journal.Size()),
NewGenBytes: newgenSz,
TotalBytes: totalSz,
}, nil
} else {
return StoreSizes{
NewGenBytes: newgenSz,
TotalBytes: totalSz,
}, nil
}
} else {
totalTFS, totalTFSOk := cs.(chunks.TableFileStore)
if !totalTFSOk {
return StoreSizes{}, fmt.Errorf("unexpected chunk store type for non-*nbs.GenerationalNBS ddb.db instance; cannot take store sizes: cs: %T", cs)
}
totalSz, err := totalTFS.Size(ctx)
if err != nil {
return StoreSizes{}, err
}
return StoreSizes{
TotalBytes: totalSz,
}, nil
}
return nbs.ChunkJournal()
}
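The new StoreSizes accessor is what the auto-GC machinery added later in this PR polls to decide when to collect. A minimal sketch of how the sizes feed that decision, assuming a hypothetical helper shouldAutoGC and the same 128 MiB threshold used in auto_gc.go below:

// shouldAutoGC sketches the growth heuristic from the auto-GC commit hook.
// lastSz is the size observed after the previous GC, or nil if none yet.
func shouldAutoGC(ctx context.Context, ddb *doltdb.DoltDB, lastSz *doltdb.StoreSizes) (bool, error) {
	szs, err := ddb.StoreSizes(ctx)
	if err != nil {
		return false, err
	}
	const threshold = 1 << 27 // 128 MiB, matches defaultCheckSizeThreshold in auto_gc.go
	if szs.JournalBytes > threshold {
		return true, nil
	}
	if lastSz != nil && szs.TotalBytes > lastSz.TotalBytes && szs.TotalBytes-lastSz.TotalBytes > threshold {
		return true, nil
	}
	return false, nil
}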
func (ddb *DoltDB) TableFileStoreHasJournal(ctx context.Context) (bool, error) {

View File

@@ -36,6 +36,7 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/types"
)
@@ -97,7 +98,7 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -155,7 +156,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -233,7 +234,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
"num_requested": numHashes,
"num_urls": numUrls,
"num_ranges": numRanges,
}).Info("finished")
}).Trace("finished")
}()
logger := ologger
@@ -387,7 +388,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
_, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -445,7 +446,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
_, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -462,7 +463,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -485,7 +486,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -500,7 +501,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version()))
if err != nil {
logger.WithError(err).Error("error calling AddTableFilesToManifest")
return nil, status.Errorf(codes.Internal, "manifest update error: %v", err)
code := codes.Internal
if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
code = codes.FailedPrecondition
}
return nil, status.Errorf(code, "manifest update error: %v", err)
}
currHash := hash.New(req.Current)
@@ -513,7 +518,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
"last_hash": lastHash.String(),
"curr_hash": currHash.String(),
}).Error("error calling Commit")
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
code := codes.Internal
if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
code = codes.FailedPrecondition
}
return nil, status.Errorf(code, "failed to commit: %v", err)
}
logger.Tracef("Commit success; moved from %s -> %s", lastHash.String(), currHash.String())
@@ -528,7 +537,7 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getOrCreateStore(ctx, logger, repoPath, req.ClientRepoFormat.NbfVersion)
if err != nil {
@@ -556,7 +565,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -634,7 +643,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -649,7 +658,11 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version()))
if err != nil {
logger.WithError(err).Error("error occurred updating the manifest")
return nil, status.Error(codes.Internal, "manifest update error")
code := codes.Internal
if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
code = codes.FailedPrecondition
}
return nil, status.Error(code, "manifest update error")
}
logger = logger.WithFields(logrus.Fields{
@@ -707,7 +720,7 @@ func getReqLogger(lgr *logrus.Entry, method string) *logrus.Entry {
"method": method,
"request_num": strconv.Itoa(incReqId()),
})
lgr.Info("starting request")
lgr.Trace("starting request")
return lgr
}
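The errors.Is mapping above is repeated in the Commit and AddTableFiles handlers; a small helper could express the policy once. A sketch (the helper name is illustrative, not part of this diff), based on the idea that a push racing a GC can reference table files that no longer exist, which is a client-retryable condition rather than a server fault:

func manifestUpdateCode(err error) codes.Code {
	// Missing or collected table files mean the client's view of the store is
	// stale; surface FailedPrecondition instead of Internal so callers can retry.
	if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
		return codes.FailedPrecondition
	}
	return codes.Internal
}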

View File

@@ -66,7 +66,7 @@ func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, read
func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
logger := getReqLogger(fh.lgr, req.Method+"_"+req.RequestURI)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()
var err error
req.URL, err = fh.sealer.Unseal(req.URL)

View File

@@ -48,6 +48,7 @@ const (
DefaultReadOnly = false
DefaultLogLevel = LogLevel_Info
DefaultAutoCommit = true
DefaultAutoGCBehaviorEnable = false
DefaultDoltTransactionCommit = false
DefaultMaxConnections = 100
DefaultDataDir = "."
@@ -198,6 +199,8 @@ type ServerConfig interface {
EventSchedulerStatus() string
// ValueSet returns whether the value string provided was explicitly set in the config
ValueSet(value string) bool
// AutoGCBehavior defines parameters around how auto-GC works for the running server.
AutoGCBehavior() AutoGCBehavior
}
// DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values.
@@ -214,6 +217,9 @@ func defaultServerConfigYAML() *YAMLConfig {
ReadOnly: ptr(DefaultReadOnly),
AutoCommit: ptr(DefaultAutoCommit),
DoltTransactionCommit: ptr(DefaultDoltTransactionCommit),
AutoGCBehavior: &AutoGCBehaviorYAMLConfig{
Enable_: ptr(DefaultAutoGCBehaviorEnable),
},
},
UserConfig: UserYAMLConfig{
Name: ptr(""),
@@ -445,3 +451,7 @@ func CheckForUnixSocket(config ServerConfig) (string, bool, error) {
return "", false, nil
}
type AutoGCBehavior interface {
Enable() bool
}

View File

@@ -65,6 +65,8 @@ type BehaviorYAMLConfig struct {
DoltTransactionCommit *bool `yaml:"dolt_transaction_commit,omitempty"`
EventSchedulerStatus *string `yaml:"event_scheduler,omitempty" minver:"1.17.0"`
AutoGCBehavior *AutoGCBehaviorYAMLConfig `yaml:"auto_gc_behavior,omitempty" minver:"TBD"`
}
// UserYAMLConfig contains server configuration regarding the user account clients must use to connect
@@ -176,6 +178,7 @@ func YamlConfigFromFile(fs filesys.Filesys, path string) (ServerConfig, error) {
func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig {
systemVars := cfg.SystemVars()
autoGCBehavior := toAutoGCBehaviorYAML(cfg.AutoGCBehavior())
return &YAMLConfig{
LogLevelStr: ptr(string(cfg.LogLevel())),
MaxQueryLenInLogs: nillableIntPtr(cfg.MaxLoggedQueryLen()),
@@ -186,6 +189,7 @@ func ServerConfigAsYAMLConfig(cfg ServerConfig) *YAMLConfig {
DisableClientMultiStatements: ptr(cfg.DisableClientMultiStatements()),
DoltTransactionCommit: ptr(cfg.DoltTransactionCommit()),
EventSchedulerStatus: ptr(cfg.EventSchedulerStatus()),
AutoGCBehavior: autoGCBehavior,
},
ListenerConfig: ListenerYAMLConfig{
HostStr: ptr(cfg.Host()),
@@ -817,6 +821,13 @@ func (cfg YAMLConfig) ClusterConfig() ClusterConfig {
return cfg.ClusterCfg
}
func (cfg YAMLConfig) AutoGCBehavior() AutoGCBehavior {
if cfg.BehaviorConfig.AutoGCBehavior == nil {
return nil
}
return cfg.BehaviorConfig.AutoGCBehavior
}
func (cfg YAMLConfig) EventSchedulerStatus() string {
if cfg.BehaviorConfig.EventSchedulerStatus == nil {
return "ON"
@@ -922,3 +933,20 @@ func (cfg YAMLConfig) ValueSet(value string) bool {
}
return false
}
type AutoGCBehaviorYAMLConfig struct {
Enable_ *bool `yaml:"enable,omitempty" minver:"TBD"`
}
func (a *AutoGCBehaviorYAMLConfig) Enable() bool {
if a.Enable_ == nil {
return false
}
return *a.Enable_
}
func toAutoGCBehaviorYAML(a AutoGCBehavior) *AutoGCBehaviorYAMLConfig {
return &AutoGCBehaviorYAMLConfig{
Enable_: ptr(a.Enable()),
}
}

View File

@@ -34,6 +34,8 @@ behavior:
dolt_transaction_commit: true
disable_client_multi_statements: false
event_scheduler: ON
auto_gc_behavior:
enable: false
listener:
host: localhost

View File

@@ -0,0 +1,348 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"context"
"errors"
"io"
"sync"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/env"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/datas"
"github.com/dolthub/dolt/go/store/types"
)
// Auto GC is the ability of a running SQL server engine to perform
// dolt_gc() behaviors periodically. If enabled, it currently works as
// follows:
//
// An AutoGCController is created for a running SQL Engine. The
// controller runs a background thread which is only ever running one
// GC at a time. Post Commit Hooks are installed on every database in
// the DoltDatabaseProvider for the SQL Engine. Those hooks check if
// it is time to perform a GC for that particular database. If it is,
// they forward a request to the background thread to register the
// database as wanting a GC.
type AutoGCController struct {
workCh chan autoGCWork
lgr *logrus.Logger
mu sync.Mutex
hooks map[string]*autoGCCommitHook
ctxF func(context.Context) (*sql.Context, error)
threads *sql.BackgroundThreads
}
func NewAutoGCController(lgr *logrus.Logger) *AutoGCController {
return &AutoGCController{
workCh: make(chan autoGCWork),
lgr: lgr,
hooks: make(map[string]*autoGCCommitHook),
}
}
// Passed by a commit hook to the auto-GC thread, requesting the
// thread to dolt_gc |db|. When the GC is finished, |done| will be
// closed. Signalling completion allows the commit hook to only
// submit one dolt_gc request at a time.
type autoGCWork struct {
db *doltdb.DoltDB
done chan struct{}
name string // only for logging.
}
// During engine initialization, this should be called to ensure the
// background worker threads responsible for performing the GC are
// running.
func (c *AutoGCController) RunBackgroundThread(threads *sql.BackgroundThreads, ctxF func(context.Context) (*sql.Context, error)) error {
c.threads = threads
c.ctxF = ctxF
err := threads.Add("auto_gc_thread", c.gcBgThread)
if err != nil {
return err
}
for _, hook := range c.hooks {
err = hook.run(threads)
if err != nil {
return err
}
}
return nil
}
func (c *AutoGCController) gcBgThread(ctx context.Context) {
var wg sync.WaitGroup
runCh := make(chan autoGCWork)
wg.Add(1)
go func() {
defer wg.Done()
dbs := make([]autoGCWork, 0)
// Accumulate GC requests; only one will come in per database at a time.
// Send the oldest one out to the worker when it is ready.
for {
var toSendCh chan autoGCWork
var toSend autoGCWork
if len(dbs) > 0 {
toSend = dbs[0]
toSendCh = runCh
}
select {
case <-ctx.Done():
// sql.BackgroundThreads is shutting down.
// No need to drain or anything; just
// return.
return
case newDB := <-c.workCh:
dbs = append(dbs, newDB)
case toSendCh <- toSend:
// We just sent the front of the slice.
// Delete it from our set of pending GCs.
copy(dbs[:], dbs[1:])
dbs = dbs[:len(dbs)-1]
}
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case work := <-runCh:
c.doWork(ctx, work, c.ctxF)
}
}
}()
wg.Wait()
}
func (c *AutoGCController) doWork(ctx context.Context, work autoGCWork, ctxF func(context.Context) (*sql.Context, error)) {
defer close(work.done)
sqlCtx, err := ctxF(ctx)
if err != nil {
c.lgr.Warnf("sqle/auto_gc: Could not create session to GC %s: %v", work.name, err)
return
}
c.lgr.Tracef("sqle/auto_gc: Beginning auto GC of database %s", work.name)
start := time.Now()
defer sql.SessionEnd(sqlCtx.Session)
sql.SessionCommandBegin(sqlCtx.Session)
defer sql.SessionCommandEnd(sqlCtx.Session)
err = dprocedures.RunDoltGC(sqlCtx, work.db, types.GCModeDefault, work.name)
if err != nil {
if !errors.Is(err, chunks.ErrNothingToCollect) {
c.lgr.Warnf("sqle/auto_gc: Attempt to auto GC database %s failed with error: %v", work.name, err)
}
return
}
c.lgr.Infof("sqle/auto_gc: Successfully completed auto GC of database %s in %v", work.name, time.Since(start))
}
func (c *AutoGCController) newCommitHook(name string, db *doltdb.DoltDB) *autoGCCommitHook {
c.mu.Lock()
defer c.mu.Unlock()
closed := make(chan struct{})
close(closed)
ret := &autoGCCommitHook{
c: c,
name: name,
done: closed,
next: make(chan struct{}),
db: db,
tickCh: make(chan struct{}),
stopCh: make(chan struct{}),
}
c.hooks[name] = ret
if c.threads != nil {
// If this errors, sql.BackgroundThreads is already closed.
// Things are hopefully shutting down...
_ = ret.run(c.threads)
}
return ret
}
// The doltdb.CommitHook which watches for database changes and
// requests dolt_gcs.
type autoGCCommitHook struct {
c *AutoGCController
name string
// When |done| is closed, there is no GC currently running or
// pending for this database. If it is open, then there is a
// pending request for GC or a GC is currently running. Once
// |done| is closed, we can check for auto GC conditions on
// the database to see if we should request a new GC.
done chan struct{}
// It simplifies the logic and efficiency of the
// implementation a bit to have an already allocated channel
// we can try to send on when we request a GC. It will become our
// new |done| channel once we send it successfully.
next chan struct{}
// lastSz is set the first time we observe StoreSizes after a
// GC or after the server comes up. It is used in some simple
// growth heuristics to figure out if we want to run a GC. We
// set it back to |nil| when we successfully submit a request
// to GC, so that we observe and store the new size after the
// GC is finished.
lastSz *doltdb.StoreSizes
db *doltdb.DoltDB
// Closed when the thread should shutdown because the database
// is being removed.
stopCh chan struct{}
// An optimistic send on this channel notifies the background
// thread that the sizes may have changed and it can check for
// the GC condition.
tickCh chan struct{}
wg sync.WaitGroup
}
// During engine initialization, called on the original set of
// databases to configure them for auto-GC.
func (c *AutoGCController) ApplyCommitHooks(ctx context.Context, mrEnv *env.MultiRepoEnv, dbs ...dsess.SqlDatabase) error {
for _, db := range dbs {
denv := mrEnv.GetEnv(db.Name())
if denv == nil {
continue
}
ddb := denv.DoltDB(ctx)
ddb.PrependCommitHooks(ctx, c.newCommitHook(db.Name(), ddb))
}
return nil
}
func (c *AutoGCController) DropDatabaseHook() DropDatabaseHook {
return func(_ *sql.Context, name string) {
c.mu.Lock()
defer c.mu.Unlock()
hook := c.hooks[name]
if hook != nil {
hook.stop()
delete(c.hooks, name)
}
}
}
func (c *AutoGCController) InitDatabaseHook() InitDatabaseHook {
return func(ctx *sql.Context, _ *DoltDatabaseProvider, name string, env *env.DoltEnv, _ dsess.SqlDatabase) error {
ddb := env.DoltDB(ctx)
ddb.PrependCommitHooks(ctx, c.newCommitHook(name, ddb))
return nil
}
}
func (h *autoGCCommitHook) Execute(ctx context.Context, _ datas.Dataset, _ *doltdb.DoltDB) (func(context.Context) error, error) {
select {
case h.tickCh <- struct{}{}:
return nil, nil
case <-ctx.Done():
return nil, context.Cause(ctx)
}
}
func (h *autoGCCommitHook) requestGC(ctx context.Context) error {
select {
case h.c.workCh <- autoGCWork{h.db, h.next, h.name}:
h.done = h.next
h.next = make(chan struct{})
h.lastSz = nil
return nil
case <-ctx.Done():
return context.Cause(ctx)
}
}
func (h *autoGCCommitHook) HandleError(ctx context.Context, err error) error {
return nil
}
func (h *autoGCCommitHook) SetLogger(ctx context.Context, wr io.Writer) error {
return nil
}
func (h *autoGCCommitHook) ExecuteForWorkingSets() bool {
return true
}
const checkInterval = 1 * time.Second
const size_128mb = (1 << 27)
const defaultCheckSizeThreshold = size_128mb
func (h *autoGCCommitHook) checkForGC(ctx context.Context) error {
select {
case <-h.done:
sz, err := h.db.StoreSizes(ctx)
if err != nil {
// Something is probably quite wrong. Regardless, can't determine if we should GC.
return err
}
if h.lastSz == nil {
h.lastSz = &sz
}
if sz.JournalBytes > defaultCheckSizeThreshold {
// Our first heuristic is simply whether the journal is larger than a fixed size...
return h.requestGC(ctx)
} else if sz.TotalBytes > h.lastSz.TotalBytes && sz.TotalBytes-h.lastSz.TotalBytes > defaultCheckSizeThreshold {
// Or whether the store has grown by a fixed amount since our last GC (or since we started watching it)...
return h.requestGC(ctx)
}
default:
// A GC is already running or pending. No need to check.
}
return nil
}
func (h *autoGCCommitHook) thread(ctx context.Context) {
defer h.wg.Done()
timer := time.NewTimer(checkInterval)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return
case <-h.stopCh:
return
case <-h.tickCh:
// We ignore an error here, which just means we didn't kick
// off a GC when we might have wanted to.
_ = h.checkForGC(ctx)
case <-timer.C:
_ = h.checkForGC(ctx)
timer.Reset(checkInterval)
}
}
}
func (h *autoGCCommitHook) stop() {
close(h.stopCh)
h.wg.Wait()
}
func (h *autoGCCommitHook) run(threads *sql.BackgroundThreads) error {
h.wg.Add(1)
return threads.Add("auto_gc_thread["+h.name+"]", h.thread)
}
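One detail of gcBgThread worth calling out: the accumulator goroutine disables its send case by leaving toSendCh nil while no work is queued, since a select never chooses a send on a nil channel. A standalone sketch of that idiom, with illustrative names that are not part of this diff:

func pump(ctx context.Context, in <-chan string, out chan<- string) {
	var queue []string
	for {
		// While the queue is empty, sendCh stays nil and the send case below
		// can never be selected; only ctx.Done() or a new item can fire.
		var sendCh chan<- string
		var head string
		if len(queue) > 0 {
			head = queue[0]
			sendCh = out
		}
		select {
		case <-ctx.Done():
			return
		case item := <-in:
			queue = append(queue, item)
		case sendCh <- head:
			queue = queue[1:]
		}
	}
}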

View File

@@ -0,0 +1,117 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sqle
import (
"bytes"
"context"
"sync"
"testing"
"time"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/require"
"github.com/dolthub/dolt/go/store/datas"
)
func TestAutoGCController(t *testing.T) {
NewLogger := func() *logrus.Logger {
res := logrus.New()
res.SetOutput(new(bytes.Buffer))
return res
}
CtxFactory := func(ctx context.Context) (*sql.Context, error) {
return sql.NewContext(ctx, sql.WithSession(sql.NewBaseSession())), nil
}
t.Run("Hook", func(t *testing.T) {
t.Run("NeverStarted", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
hook := controller.newCommitHook("some_database", nil)
hook.stop()
})
t.Run("StartedBeforeNewHook", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
bg := sql.NewBackgroundThreads()
defer bg.Shutdown()
err := controller.RunBackgroundThread(bg, CtxFactory)
require.NoError(t, err)
ctx := context.Background()
dEnv := CreateTestEnvWithName("some_database")
hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx))
hook.Execute(ctx, datas.Dataset{}, nil)
hook.stop()
})
t.Run("StartedAfterNewHook", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
bg := sql.NewBackgroundThreads()
defer bg.Shutdown()
ctx := context.Background()
dEnv := CreateTestEnvWithName("some_database")
hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx))
err := controller.RunBackgroundThread(bg, CtxFactory)
require.NoError(t, err)
hook.Execute(ctx, datas.Dataset{}, nil)
hook.stop()
})
t.Run("ExecuteOnCanceledCtx", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
ctx, cancel := context.WithCancel(context.Background())
cancel()
dEnv := CreateTestEnvWithName("some_database")
hook := controller.newCommitHook("some_database", dEnv.DoltDB(ctx))
_, err := hook.Execute(ctx, datas.Dataset{}, nil)
require.ErrorIs(t, err, context.Canceled)
})
})
t.Run("gcBgThread", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
ctx, cancel := context.WithCancel(context.Background())
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
controller.gcBgThread(ctx)
}()
time.Sleep(50 * time.Millisecond)
cancel()
wg.Wait()
})
t.Run("DatabaseProviderHooks", func(t *testing.T) {
t.Run("Unstarted", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
ctx, err := CtxFactory(context.Background())
require.NoError(t, err)
dEnv := CreateTestEnvWithName("some_database")
err = controller.InitDatabaseHook()(ctx, nil, "some_database", dEnv, nil)
require.NoError(t, err)
controller.DropDatabaseHook()(nil, "some_database")
})
t.Run("Started", func(t *testing.T) {
controller := NewAutoGCController(NewLogger())
bg := sql.NewBackgroundThreads()
defer bg.Shutdown()
err := controller.RunBackgroundThread(bg, CtxFactory)
require.NoError(t, err)
ctx, err := CtxFactory(context.Background())
require.NoError(t, err)
dEnv := CreateTestEnvWithName("some_database")
err = controller.InitDatabaseHook()(ctx, nil, "some_database", dEnv, nil)
require.NoError(t, err)
controller.DropDatabaseHook()(nil, "some_database")
})
})
}

View File

@@ -317,7 +317,7 @@ func (c *Controller) applyCommitHooks(ctx context.Context, name string, denv *en
}
}
commitHook := newCommitHook(c.lgr, r.Name(), remote.Url, name, c.role, func(ctx context.Context) (*doltdb.DoltDB, error) {
return remote.GetRemoteDB(ctx, types.Format_Default, dialprovider)
return remote.GetRemoteDBWithoutCaching(ctx, types.Format_Default, dialprovider)
}, denv.DoltDB(ctx), ttfdir)
denv.DoltDB(ctx).PrependCommitHooks(ctx, commitHook)
hooks = append(hooks, commitHook)

View File

@@ -64,7 +64,7 @@ func NewInitDatabaseHook(controller *Controller, bt *sql.BackgroundThreads) sqle
}
remoteDBs = append(remoteDBs, func(ctx context.Context) (*doltdb.DoltDB, error) {
return er.GetRemoteDB(ctx, types.Format_Default, dialprovider)
return er.GetRemoteDBWithoutCaching(ctx, types.Format_Default, dialprovider)
})
remoteUrls = append(remoteUrls, remoteUrl)
}

View File

@@ -38,15 +38,13 @@ const (
cmdSuccess = 0
)
var useSessionAwareSafepointController bool
func init() {
if os.Getenv(dconfig.EnvDisableGcProcedure) != "" {
DoltGCFeatureFlag = false
}
if choice := os.Getenv(dconfig.EnvGCSafepointControllerChoice); choice != "" {
if choice == "session_aware" {
useSessionAwareSafepointController = true
UseSessionAwareSafepointController = true
} else if choice != "kill_connections" {
panic("Invalid value for " + dconfig.EnvGCSafepointControllerChoice + ". must be session_aware or kill_connections")
}
@@ -54,6 +52,7 @@ func init() {
}
var DoltGCFeatureFlag = true
var UseSessionAwareSafepointController = false
// doltGC is the stored procedure to run online garbage collection on a database.
func doltGC(ctx *sql.Context, args ...string) (sql.RowIter, error) {
@@ -158,28 +157,28 @@ func (sc killConnectionsSafepointController) CancelSafepoint() {
}
type sessionAwareSafepointController struct {
controller *dsess.GCSafepointController
callCtx *sql.Context
origEpoch int
doltDB *doltdb.DoltDB
controller *dsess.GCSafepointController
dbname string
callSession *dsess.DoltSession
origEpoch int
doltDB *doltdb.DoltDB
waiter *dsess.GCSafepointWaiter
keeper func(hash.Hash) bool
}
func (sc *sessionAwareSafepointController) visit(ctx context.Context, sess *dsess.DoltSession) error {
return sess.VisitGCRoots(ctx, sc.callCtx.GetCurrentDatabase(), sc.keeper)
return sess.VisitGCRoots(ctx, sc.dbname, sc.keeper)
}
func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper func(hash.Hash) bool) error {
sc.doltDB.PurgeCaches()
sc.keeper = keeper
thisSess := dsess.DSessFromSess(sc.callCtx.Session)
err := sc.visit(ctx, thisSess)
err := sc.visit(ctx, sc.callSession)
if err != nil {
return err
}
sc.waiter = sc.controller.Waiter(ctx, thisSess, sc.visit)
sc.waiter = sc.controller.Waiter(ctx, sc.callSession, sc.visit)
return nil
}
@@ -188,7 +187,7 @@ func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx con
}
func (sc *sessionAwareSafepointController) EstablishPostFinalizeSafepoint(ctx context.Context) error {
return checkEpochSame(sc.origEpoch)
return nil
}
func (sc *sessionAwareSafepointController) CancelSafepoint() {
@@ -232,46 +231,12 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
return cmdFailure, err
}
} else {
// Currently, if this server is involved in cluster
// replication, a full GC is only safe to run on the primary.
// We assert that we are the primary here before we begin, and
// we assert again that we are the primary at the same epoch as
// we establish the safepoint.
origepoch := -1
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
// TODO: magic constant...
if role.(string) != "primary" {
return cmdFailure, fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return cmdFailure, fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable)
}
origepoch = epoch.(int)
}
var mode types.GCMode = types.GCModeDefault
if apr.Contains(cli.FullFlag) {
mode = types.GCModeFull
}
var sc types.GCSafepointController
if useSessionAwareSafepointController {
gcSafepointController := dSess.GCSafepointController()
sc = &sessionAwareSafepointController{
origEpoch: origepoch,
callCtx: ctx,
controller: gcSafepointController,
doltDB: ddb,
}
} else {
sc = killConnectionsSafepointController{
origEpoch: origepoch,
callCtx: ctx,
doltDB: ddb,
}
}
err = ddb.GC(ctx, mode, sc)
err := RunDoltGC(ctx, ddb, mode, ctx.GetCurrentDatabase())
if err != nil {
return cmdFailure, err
}
@@ -279,3 +244,40 @@ func doDoltGC(ctx *sql.Context, args []string) (int, error) {
return cmdSuccess, nil
}
func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, dbname string) error {
var sc types.GCSafepointController
if UseSessionAwareSafepointController {
dSess := dsess.DSessFromSess(ctx.Session)
gcSafepointController := dSess.GCSafepointController()
sc = &sessionAwareSafepointController{
callSession: dSess,
dbname: dbname,
controller: gcSafepointController,
doltDB: ddb,
}
} else {
// Legacy safepoint controller behavior was to not
// allow GC on a standby server. GC on a standby server
// with killConnections safepoints should be safe now,
// but we retain this legacy behavior for now.
origepoch := -1
if _, role, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleVariable); ok {
// TODO: magic constant...
if role.(string) != "primary" {
return fmt.Errorf("cannot run a full dolt_gc() while cluster replication is enabled and role is %s; must be the primary", role.(string))
}
_, epoch, ok := sql.SystemVariables.GetGlobal(dsess.DoltClusterRoleEpochVariable)
if !ok {
return fmt.Errorf("internal error: cannot run a full dolt_gc(); cluster replication is enabled but could not read %s", dsess.DoltClusterRoleEpochVariable)
}
origepoch = epoch.(int)
}
sc = killConnectionsSafepointController{
origEpoch: origepoch,
callCtx: ctx,
doltDB: ddb,
}
}
return ddb.GC(ctx, mode, sc)
}

View File

@@ -33,6 +33,8 @@ import (
"github.com/dolthub/dolt/go/store/hash"
)
var ErrTableFileNotFound = errors.New("table file not found")
type fileTableReader struct {
tableReader
h hash.Hash
@@ -81,7 +83,7 @@ func newFileTableReader(ctx context.Context, dir string, h hash.Hash, chunkCount
} else if afExists {
return newArchiveChunkSource(ctx, dir, h, chunkCount, q)
}
return nil, errors.New(fmt.Sprintf("table file %s/%s not found", dir, h.String()))
return nil, fmt.Errorf("error opening table file: %w: %s/%s", ErrTableFileNotFound, dir, h.String())
}
func nomsFileTableReader(ctx context.Context, path string, h hash.Hash, chunkCount uint32, q MemoryQuotaProvider) (cs chunkSource, err error) {

View File

@@ -498,6 +498,14 @@ func (j *ChunkJournal) AccessMode() chunks.ExclusiveAccessMode {
return chunks.ExclusiveAccessMode_Exclusive
}
func (j *ChunkJournal) Size() int64 {
if j.wr != nil {
return j.wr.size()
} else {
return 0
}
}
type journalConjoiner struct {
child conjoinStrategy
}

View File

@@ -472,6 +472,12 @@ func (wr *journalWriter) commitRootHash(ctx context.Context, root hash.Hash) err
return wr.commitRootHashUnlocked(ctx, root)
}
func (wr *journalWriter) size() int64 {
wr.lock.Lock()
defer wr.lock.Unlock()
return wr.off
}
func (wr *journalWriter) commitRootHashUnlocked(ctx context.Context, root hash.Hash) error {
defer trace.StartRegion(ctx, "commit-root").End()

View File

@@ -955,10 +955,10 @@ func (nbs *NomsBlockStore) getManyWithFunc(
nbs.stats.ChunksPerGet.Sample(uint64(len(hashes)))
}()
reqs := toGetRecords(hashes)
const ioParallelism = 16
for {
reqs := toGetRecords(hashes)
nbs.mu.Lock()
keeper := nbs.keeperFunc
if gcDepMode == gcDependencyMode_NoDependency {
@@ -1735,7 +1735,7 @@ func refCheckAllSources(ctx context.Context, nbs *NomsBlockStore, getAddrs chunk
checkErr = err
}
if len(remaining) > 0 {
checkErr = fmt.Errorf("%w, missing: %v", errors.New("cannot add table files; referenced chunk not found in store."), remaining)
checkErr = fmt.Errorf("cannot add table files: %w, missing: %v", ErrTableFileNotFound, remaining)
}
}
for _, source := range sources {

View File

@@ -0,0 +1,337 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"context"
"database/sql"
"fmt"
"math/rand/v2"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
driver "github.com/dolthub/dolt/go/libraries/doltcore/dtestutils/sql_server_driver"
)
func TestAutoGC(t *testing.T) {
var enabled_16, final_16, disabled, final_disabled RepoSize
t.Run("Enable", func(t *testing.T) {
t.Run("CommitEvery16", func(t *testing.T) {
var s AutoGCTest
s.Enable = true
enabled_16, final_16 = runAutoGCTest(t, &s, 64, 16)
assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC")
t.Logf("repo size before final gc: %v", enabled_16)
t.Logf("repo size after final gc: %v", final_16)
})
t.Run("ClusterReplication", func(t *testing.T) {
var s AutoGCTest
s.Enable = true
s.Replicate = true
enabled_16, final_16 = runAutoGCTest(t, &s, 64, 16)
assert.Contains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC")
assert.Contains(t, string(s.StandbyServer.Output.Bytes()), "Successfully completed auto GC")
t.Logf("repo size before final gc: %v", enabled_16)
t.Logf("repo size after final gc: %v", final_16)
rs, err := GetRepoSize(s.StandbyDir)
require.NoError(t, err)
t.Logf("standby size: %v", rs)
})
})
t.Run("Disabled", func(t *testing.T) {
var s AutoGCTest
disabled, final_disabled = runAutoGCTest(t, &s, 64, 128)
assert.NotContains(t, string(s.PrimaryServer.Output.Bytes()), "Successfully completed auto GC")
t.Logf("repo size before final gc: %v", disabled)
t.Logf("repo size after final gc: %v", final_disabled)
})
if enabled_16.NewGen > 0 && disabled.NewGen > 0 {
assert.Greater(t, enabled_16.OldGen, disabled.OldGen)
assert.Greater(t, enabled_16.OldGenC, disabled.OldGenC)
assert.Less(t, disabled.NewGen-disabled.Journal, disabled.Journal)
}
}
type AutoGCTest struct {
Enable bool
PrimaryDir string
PrimaryServer *driver.SqlServer
PrimaryDB *sql.DB
Replicate bool
StandbyDir string
StandbyServer *driver.SqlServer
StandbyDB *sql.DB
}
func (s *AutoGCTest) Setup(ctx context.Context, t *testing.T) {
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
u.Cleanup()
})
s.CreatePrimaryServer(ctx, t, u)
if s.Replicate {
u, err := driver.NewDoltUser()
require.NoError(t, err)
t.Cleanup(func() {
u.Cleanup()
})
s.CreateStandbyServer(ctx, t, u)
}
s.CreatePrimaryDatabase(ctx, t)
}
func (s *AutoGCTest) CreatePrimaryServer(ctx context.Context, t *testing.T, u driver.DoltUser) {
rs, err := u.MakeRepoStore()
require.NoError(t, err)
repo, err := rs.MakeRepo("auto_gc_test")
require.NoError(t, err)
behaviorFragment := fmt.Sprintf(`
behavior:
auto_gc_behavior:
enable: %v
`, s.Enable)
var clusterFragment string
if s.Replicate {
clusterFragment = `
cluster:
standby_remotes:
- name: standby
remote_url_template: http://localhost:3852/{database}
bootstrap_role: primary
bootstrap_epoch: 1
remotesapi:
port: 3851
`
}
err = driver.WithFile{
Name: "server.yaml",
Contents: behaviorFragment + clusterFragment,
}.WriteAtDir(repo.Dir)
require.NoError(t, err)
server := MakeServer(t, repo, &driver.Server{
Args: []string{"--config", "server.yaml"},
})
server.DBName = "auto_gc_test"
db, err := server.DB(driver.Connection{User: "root"})
require.NoError(t, err)
t.Cleanup(func() {
db.Close()
})
s.PrimaryDir = repo.Dir
s.PrimaryDB = db
s.PrimaryServer = server
}
func (s *AutoGCTest) CreateStandbyServer(ctx context.Context, t *testing.T, u driver.DoltUser) {
rs, err := u.MakeRepoStore()
require.NoError(t, err)
repo, err := rs.MakeRepo("auto_gc_test")
require.NoError(t, err)
behaviorFragment := fmt.Sprintf(`
listener:
host: 0.0.0.0
port: 3308
behavior:
auto_gc_behavior:
enable: %v
`, s.Enable)
var clusterFragment string
if s.Replicate {
clusterFragment = `
cluster:
standby_remotes:
- name: primary
remote_url_template: http://localhost:3851/{database}
bootstrap_role: standby
bootstrap_epoch: 1
remotesapi:
port: 3852
`
}
err = driver.WithFile{
Name: "server.yaml",
Contents: behaviorFragment + clusterFragment,
}.WriteAtDir(repo.Dir)
require.NoError(t, err)
server := MakeServer(t, repo, &driver.Server{
Args: []string{"--config", "server.yaml"},
Port: 3308,
})
server.DBName = "auto_gc_test"
db, err := server.DB(driver.Connection{User: "root"})
require.NoError(t, err)
t.Cleanup(func() {
db.Close()
})
s.StandbyDir = repo.Dir
s.StandbyDB = db
s.StandbyServer = server
}
func (s *AutoGCTest) CreatePrimaryDatabase(ctx context.Context, t *testing.T) {
// Create the database...
conn, err := s.PrimaryDB.Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, `
create table vals (
id bigint primary key,
v1 bigint,
v2 bigint,
v3 bigint,
v4 bigint,
index (v1),
index (v2),
index (v3),
index (v4),
index (v1,v2),
index (v1,v3),
index (v1,v4),
index (v2,v3),
index (v2,v4),
index (v2,v1),
index (v3,v1),
index (v3,v2),
index (v3,v4),
index (v4,v1),
index (v4,v2),
index (v4,v3)
)
`)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "call dolt_commit('-Am', 'create vals table')")
require.NoError(t, err)
require.NoError(t, conn.Close())
}
func autoGCInsertStatement(i int) string {
var vals []string
for j := i * 1024; j < (i+1)*1024; j++ {
var vs [4]string
vs[0] = strconv.Itoa(rand.Int())
vs[1] = strconv.Itoa(rand.Int())
vs[2] = strconv.Itoa(rand.Int())
vs[3] = strconv.Itoa(rand.Int())
val := "(" + strconv.Itoa(j) + "," + strings.Join(vs[:], ",") + ")"
vals = append(vals, val)
}
return "insert into vals values " + strings.Join(vals, ",")
}
func runAutoGCTest(t *testing.T, s *AutoGCTest, numStatements int, commitEvery int) (RepoSize, RepoSize) {
// A simple auto-GC test, where we run
// operations on an auto GC server and
// ensure that the database is getting
// collected.
ctx := context.Background()
s.Setup(ctx, t)
for i := 0; i < numStatements; i++ {
stmt := autoGCInsertStatement(i)
conn, err := s.PrimaryDB.Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, stmt)
require.NoError(t, err)
if i%commitEvery == 0 {
_, err = conn.ExecContext(ctx, "call dolt_commit('-am', 'insert from "+strconv.Itoa(i*1024)+"')")
require.NoError(t, err)
}
require.NoError(t, conn.Close())
}
before, err := GetRepoSize(s.PrimaryDir)
require.NoError(t, err)
conn, err := s.PrimaryDB.Conn(ctx)
require.NoError(t, err)
_, err = conn.ExecContext(ctx, "call dolt_gc('--full')")
require.NoError(t, err)
require.NoError(t, conn.Close())
after, err := GetRepoSize(s.PrimaryDir)
require.NoError(t, err)
return before, after
}
type RepoSize struct {
Journal int64
NewGen int64
NewGenC int
OldGen int64
OldGenC int
}
func GetRepoSize(dir string) (RepoSize, error) {
var ret RepoSize
entries, err := os.ReadDir(filepath.Join(dir, ".dolt/noms"))
if err != nil {
return ret, err
}
for _, e := range entries {
stat, err := e.Info()
if err != nil {
return ret, err
}
if stat.IsDir() {
continue
}
ret.NewGen += stat.Size()
ret.NewGenC += 1
if e.Name() == "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv" {
ret.Journal += stat.Size()
}
}
entries, err = os.ReadDir(filepath.Join(dir, ".dolt/noms/oldgen"))
if err != nil {
return ret, err
}
for _, e := range entries {
stat, err := e.Info()
if err != nil {
return ret, err
}
if stat.IsDir() {
continue
}
ret.OldGen += stat.Size()
ret.OldGenC += 1
}
return ret, nil
}
func (rs RepoSize) String() string {
return fmt.Sprintf("journal: %v, new gen: %v (%v files), old gen: %v (%v files)", rs.Journal, rs.NewGen, rs.NewGenC, rs.OldGen, rs.OldGenC)
}