Merge pull request #9411 from dolthub/aaron/statspro-do-not-cancel-for-autogc

go: sqle/statspro: Rework the way stats manages its session in order to make it safe to continue collecting stats during a GC run.
Aaron Son
2025-07-01 14:07:31 -07:00
committed by GitHub
5 changed files with 108 additions and 111 deletions
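The heart of this change: instead of GC stopping the stats worker outright and blocking on a dedicated statsDoneCh, stats now brackets each unit of work with session-command begin/end calls, so GC's safepoint waiter can simply wait for all in-flight session commands to drain. A toy sketch of that waiter shape follows; it illustrates the idea only and is not dolt's gcctx.GCSafepointWaiter API.

```go
package main

import (
	"fmt"
	"sync"
)

// waiter: sessions bracket their work with Begin/End, and GC's
// pre-finalize step waits for all in-flight commands to drain.
type waiter struct{ wg sync.WaitGroup }

func (w *waiter) Begin() { w.wg.Add(1) }
func (w *waiter) End()   { w.wg.Done() }
func (w *waiter) Wait()  { w.wg.Wait() }

func main() {
	var w waiter
	w.Begin()
	go func() {
		fmt.Println("stats batch")
		w.End()
	}()
	w.Wait() // safepoint established once the batch ends
	fmt.Println("safe to finalize GC")
}
```

With this shape, the stats worker needs no special-case plumbing in the safepoint controller, which is exactly what the first two hunks below delete.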


@@ -172,7 +172,6 @@ type sessionAwareSafepointController struct {
callSession *dsess.DoltSession
origEpoch int
doltDB *doltdb.DoltDB
statsDoneCh chan struct{}
waiter *gcctx.GCSafepointWaiter
keeper func(hash.Hash) bool
@@ -194,13 +193,6 @@ func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper f
}
func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error {
if sc.statsDoneCh != nil {
select {
case <-sc.statsDoneCh:
case <-ctx.Done():
return context.Cause(ctx)
}
}
return sc.waiter.Wait(ctx)
}
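For reference, the select this hunk deletes followed a common Go idiom: wait on a done channel, but bail out with the cancellation cause if the context dies first. A minimal, runnable sketch (waitDoneOrCtx is a hypothetical name):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitDoneOrCtx blocks until done closes or ctx is canceled, mirroring
// the select the removed EstablishPreFinalizeSafepoint code used to
// gate the safepoint on stats shutdown.
func waitDoneOrCtx(ctx context.Context, done <-chan struct{}) error {
	if done == nil {
		return nil // nothing to wait for; stats was never started
	}
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		// context.Cause surfaces the cancel cause, not just context.Canceled.
		return context.Cause(ctx)
	}
}

func main() {
	done := make(chan struct{})
	go func() { time.Sleep(10 * time.Millisecond); close(done) }()
	fmt.Println(waitDoneOrCtx(context.Background(), done)) // <nil>
}
```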
@@ -280,27 +272,6 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun
var sc types.GCSafepointController
var statsDoneCh chan struct{}
statsPro := dSess.StatsProvider()
if afp, ok := statsPro.(ExtendedStatsProvider); ok {
statsDoneCh = afp.Stop()
}
if statsDoneCh != nil {
// If statsDoneCh is nil, we are assuming we are
// running in a context where stats was never
// started. Restart() here will fail, because the
// SerialQueue will not be ready to process the
// restart.
defer func() {
// We receive here as well as in the PreFinalize of the
// safepoint controller, since we want to safely
// restart stats even when GC itself does not complete
// successfully.
<-statsDoneCh
_, err := statsRestart(ctx)
if err != nil {
ctx.GetLogger().Infof("gc stats restart failed: %s", err.Error())
}
}()
}
if UseSessionAwareSafepointController {
gcSafepointController := dSess.GCSafepointController()
sc = &sessionAwareSafepointController{
@@ -308,10 +279,30 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun
dbname: dbname,
controller: gcSafepointController,
doltDB: ddb,
statsDoneCh: statsDoneCh,
}
} else {
if afp, ok := statsPro.(ExtendedStatsProvider); ok {
statsDoneCh = afp.Stop()
}
if statsDoneCh != nil {
// If statsDoneCh is nil, we are assuming we are
// running in a context where stats was never
// started. Restart() here will fail, because the
// SerialQueue will not be ready to process the
// restart.
defer func() {
// We receive here as well as in the PreFinalize of the
// safepoint controller, since we want to safely
// restart stats even when GC itself does not complete
// successfully.
<-statsDoneCh
_, err := statsRestart(ctx)
if err != nil {
ctx.GetLogger().Infof("gc stats restart failed: %s", err.Error())
}
}()
}
// Legacy safepoint controller behavior was to not
// allow GC on a standby server. GC on a standby server
// with killConnections safepoints should be safe now,
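The stop-then-restart dance moves into this legacy-only branch. A self-contained sketch of its shape, with statsStopper as a hypothetical stand-in for the slice of ExtendedStatsProvider used here; note the restart lives in a defer so it runs even when GC fails partway:

```go
package main

import (
	"errors"
	"fmt"
)

// statsStopper stands in for the part of ExtendedStatsProvider the
// legacy branch relies on: Stop returns a channel that closes once
// in-flight stats collection has drained (nil if stats never started).
type statsStopper interface {
	Stop() chan struct{}
}

// runLegacyGC sketches the stop-then-deferred-restart shape shown in
// the hunk above.
func runLegacyGC(stats statsStopper, restart, gc func() error) error {
	doneCh := stats.Stop()
	if doneCh != nil {
		defer func() {
			<-doneCh // wait for the worker to drain before restarting
			if err := restart(); err != nil {
				fmt.Printf("gc stats restart failed: %s\n", err)
			}
		}()
	}
	return gc()
}

type toyStats struct{ done chan struct{} }

func (t toyStats) Stop() chan struct{} { close(t.done); return t.done }

func main() {
	s := toyStats{done: make(chan struct{})}
	err := runLegacyGC(s,
		func() error { return nil },
		func() error { return errors.New("gc failed") })
	fmt.Println(err) // the restart still ran via the defer
}
```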


@@ -305,20 +305,7 @@ func (sc *StatsController) AnalyzeTable(ctx *sql.Context, table sql.Table, dbNam
newStats := newRootStats()
// XXX: Use a new context for this operation. |updateTable| does GC
// lifecycle callbacks on the context. |ctx| already has lifecycle
// callbacks registered because we are part of a SQL handler.
newCtx, err := sc.ctxGen(ctx.Context)
if err != nil {
return err
}
defer sql.SessionEnd(newCtx.Session)
sql.SessionCommandBegin(newCtx.Session)
defer sql.SessionCommandEnd(newCtx.Session)
newCtx.SetCurrentDatabase(ctx.GetCurrentDatabase())
err = sc.updateTable(newCtx, newStats, table.Name(), sqlDb, nil, true)
err = sc.updateTable(ctx, newStats, table.Name(), sqlDb, nil, true, false)
if err != nil {
return err
}
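The lines removed here are the private-session pattern AnalyzeTable no longer needs, since the caller's SQL handler already brackets the command. For reference, the lifecycle bracketing they performed looks roughly like this (runWithOwnSession is a hypothetical helper; ctxGen mirrors StatsController.ctxGen):

```go
package statspro

import (
	"context"

	"github.com/dolthub/go-mysql-server/sql"
)

// runWithOwnSession shows the bracketing the removed code performed
// around its private session: end the session when done, and mark a
// command begin/end pair so GC's safepoint machinery sees the work as
// an in-flight session command.
func runWithOwnSession(ctxGen func(context.Context) (*sql.Context, error), base context.Context, f func(*sql.Context) error) error {
	ctx, err := ctxGen(base)
	if err != nil {
		return err
	}
	defer sql.SessionEnd(ctx.Session)
	sql.SessionCommandBegin(ctx.Session)
	defer sql.SessionCommandEnd(ctx.Session)
	return f(ctx)
}
```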


@@ -29,7 +29,6 @@ import (
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
"github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable"
"github.com/dolthub/dolt/go/libraries/doltcore/ref"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/store/hash"
@@ -182,24 +181,28 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta
if err != nil {
return nil, err
}
defer sql.SessionEnd(ctx.Session)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
dSess := dsess.DSessFromSess(ctx.Session)
dbs := dSess.Provider().AllDatabases(ctx)
newStats = newRootStats()
dSess := dsess.DSessFromSess(ctx.Session)
digest := xxhash.New()
for _, db := range dbs {
sqlDb, ok := db.(sqle.Database)
if !ok {
continue
}
var branches []ref.DoltRef
if err := sc.rateLimiter.execute(ctx, func() (err error) {
root, err := sqlDb.GetRoot(ctx)
// In a single SessionCommand we load up the schemas for all branches of all the databases we will be inspecting.
// This gets each branch root into the DoltSession dbStates, which will be retained by VisitGCRoots.
type toCollect struct {
schs []sql.DatabaseSchema
}
var toVisit []toCollect
if err := sc.rateLimiter.execute(ctx, func() error {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
dbs := dSess.Provider().AllDatabases(ctx)
for _, db := range dbs {
sessDb, ok := db.(dsess.SqlDatabase)
if !ok {
continue
}
root, err := sessDb.GetRoot(ctx)
if err != nil {
return err
}
@@ -212,50 +215,55 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta
if !ok {
return fmt.Errorf("get dolt db dolt database not found %s", db.Name())
}
branches, err = ddb.GetBranches(ctx)
return err
}); err != nil {
return nil, err
}
for _, br := range branches {
// this call avoids the chunkstore
sqlDb, err := sqle.RevisionDbForBranch(ctx, db.(dsess.SqlDatabase), br.GetPath(), br.GetPath()+"/"+sqlDb.AliasedName())
branches, err := ddb.GetBranches(ctx)
if err != nil {
sc.descError("revisionForBranch", err)
continue
return err
}
var schDbs []sql.DatabaseSchema
for _, branch := range branches {
revDb, err := sqle.RevisionDbForBranch(ctx, sessDb, branch.GetPath(), branch.GetPath()+"/"+sessDb.AliasedName())
if err != nil {
sc.descError("revisionForBranch", err)
continue
}
revSchemas, err := revDb.AllSchemas(ctx)
if err != nil {
sc.descError("getDatabaseSchemas", err)
continue
}
toVisit = append(toVisit, toCollect{
schs: revSchemas,
})
}
}
return nil
}); err != nil {
return nil, err
}
for _, collect := range toVisit {
for _, sqlDb := range collect.schs {
switch sqlDb.SchemaName() {
case "dolt", sql.InformationSchemaDatabaseName, "pg_catalog":
continue
}
var tableNames []string
if err := sc.rateLimiter.execute(ctx, func() (err error) {
schDbs, err = sqlDb.AllSchemas(ctx)
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
tableNames, err = sqlDb.GetTableNames(ctx)
return err
}); err != nil {
sc.descError("getDatabaseSchemas", err)
sc.descError("getTableNames", err)
continue
}
for _, sqlDb := range schDbs {
switch sqlDb.SchemaName() {
case "dolt", sql.InformationSchemaDatabaseName, "pg_catalog":
continue
}
var tableNames []string
if err := sc.rateLimiter.execute(ctx, func() (err error) {
tableNames, err = sqlDb.GetTableNames(ctx)
return err
}); err != nil {
sc.descError("getTableNames", err)
continue
}
newStats.DbCnt++
newStats.DbCnt++
for _, tableName := range tableNames {
err = sc.updateTable(ctx, newStats, tableName, sqlDb.(dsess.SqlDatabase), gcKv, false)
if err != nil {
return nil, err
}
for _, tableName := range tableNames {
err = sc.updateTable(ctx, newStats, tableName, sqlDb.(dsess.SqlDatabase), gcKv, false, true)
if err != nil {
return nil, err
}
}
}
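The restructure above splits newStatsForRoot into two phases: one session command that resolves every branch revision database and its schemas (pinning those roots in the DoltSession so VisitGCRoots retains them), then short per-schema commands for the slower table walk. A schematic of that shape, with illustrative helper signatures:

```go
package statspro

import "github.com/dolthub/go-mysql-server/sql"

// collectThenVisit sketches the two-phase shape of the reworked
// newStatsForRoot: gather everything that must be pinned in the
// session under a single command, then do the slower per-schema work
// under short, separate commands. Names and signatures are
// illustrative, not the actual controller API.
func collectThenVisit(
	ctx *sql.Context,
	gather func(*sql.Context) ([]sql.DatabaseSchema, error),
	visit func(*sql.Context, sql.DatabaseSchema) error,
) error {
	sql.SessionCommandBegin(ctx.Session)
	schemas, err := gather(ctx)
	sql.SessionCommandEnd(ctx.Session)
	if err != nil {
		return err
	}
	for _, sch := range schemas {
		sql.SessionCommandBegin(ctx.Session)
		err := visit(ctx, sch)
		sql.SessionCommandEnd(ctx.Session)
		if err != nil {
			return err
		}
	}
	return nil
}
```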
@@ -286,14 +294,14 @@ func (sc *StatsController) finalizeHistogram(template stats.Statistic, buckets [
return &template
}
func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.Map, idxLen int, nodes []tree.Node, bypassRateLimit bool) ([]*stats.Bucket, sql.Row, int, error) {
func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.Map, idxLen int, nodes []tree.Node, bypassRateLimit bool, openSessionCmds bool) ([]*stats.Bucket, sql.Row, int, error) {
updater := newBucketBuilder(sql.StatQualifier{}, idxLen, prollyMap.KeyDesc())
keyBuilder := val.NewTupleBuilder(prollyMap.KeyDesc().PrefixDesc(idxLen), prollyMap.NodeStore())
firstNodeHash := nodes[0].HashOf()
lowerBound, ok := sc.GetBound(firstNodeHash, idxLen)
if !ok {
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
lowerBound, err = firstRowForIndex(ctx, idxLen, prollyMap, keyBuilder)
if err != nil {
return fmt.Errorf("get histogram bucket for node; %w", err)
@@ -312,7 +320,7 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
var writes int
var offset uint64
for i := 0; i < len(nodes); {
err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
newWrites := 0
for i < len(nodes) && newWrites < collectBatchSize {
n := nodes[i]
@@ -375,13 +383,19 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
}
var buckets []*stats.Bucket
for _, n := range nodes {
newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder)
if err != nil || !ok {
sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err)
return nil, nil, 0, err
err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() (err error) {
for _, n := range nodes {
newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder)
if err != nil || !ok {
sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err)
return err
}
buckets = append(buckets, newBucket)
}
buckets = append(buckets, newBucket)
return nil
})
if err != nil {
return nil, nil, 0, err
}
return buckets, lowerBound, writes, nil
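collectIndexNodes drains nodes in collectBatchSize chunks, re-entering the limiter between batches, and the final bucket-read loop above now goes through execWithOptionalRateLimit too so it also runs inside a session command. The batch loop's shape, as a runnable sketch with a pass-through limiter standing in for sc.rateLimiter:

```go
package main

import "fmt"

// processBatched drains a slice in fixed-size batches, yielding back
// to the limiter between batches so other work can interleave.
func processBatched(items []string, batchSize int, limited func(func() error) error, handle func(string) error) error {
	for i := 0; i < len(items); {
		if err := limited(func() error {
			n := 0
			for i < len(items) && n < batchSize {
				if err := handle(items[i]); err != nil {
					return err
				}
				i++
				n++
			}
			return nil
		}); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	limiter := func(f func() error) error { return f() } // pass-through stand-in
	_ = processBatched([]string{"a", "b", "c"}, 2, limiter, func(s string) error {
		fmt.Println("node", s)
		return nil
	})
}
```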
@@ -389,18 +403,22 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
// execWithOptionalRateLimit executes the given function either directly or through the rate limiter
// depending on the bypassRateLimit flag
func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit bool, f func() error) error {
func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit, openSessionCmds bool, f func() error) error {
if openSessionCmds {
sql.SessionCommandBegin(ctx.Session)
defer sql.SessionCommandEnd(ctx.Session)
}
if bypassRateLimit {
return f()
}
return sc.rateLimiter.execute(ctx, f)
}
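The new openSessionCmds flag lets callers that already hold a session command (AnalyzeTable, driven by a SQL handler) skip the bracketing, while the background scheduler opens one per step. A self-contained analogue (names and hook signatures are illustrative):

```go
package main

import "fmt"

// execWith mirrors execWithOptionalRateLimit's two knobs: optionally
// bracket the call with begin/end hooks, and optionally bypass the
// limiter and run directly.
func execWith(bracket, bypassLimit bool, begin, end func(), limiter func(func() error) error, f func() error) error {
	if bracket {
		begin()
		defer end()
	}
	if bypassLimit {
		return f()
	}
	return limiter(f)
}

func main() {
	begin := func() { fmt.Println("begin command") }
	end := func() { fmt.Println("end command") }
	limiter := func(f func() error) error { return f() } // pass-through stand-in
	_ = execWith(true, false, begin, end, limiter, func() error {
		fmt.Println("work")
		return nil
	})
}
```

Threading both booleans through every call site, as the hunks below do, keeps each rate-limited step a complete session command from the GC safepoint's point of view.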
func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, tableName string, sqlDb dsess.SqlDatabase, gcKv *memStats, bypassRateLimit bool) error {
func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, tableName string, sqlDb dsess.SqlDatabase, gcKv *memStats, bypassRateLimit bool, openSessionCmds bool) error {
var err error
var sqlTable *sqle.DoltTable
var dTab *doltdb.Table
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
sqlTable, dTab, err = GetLatestTable(ctx, tableName, sqlDb)
return err
}); err != nil {
@@ -430,7 +448,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
}
var indexes []sql.Index
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
indexes, err = sqlTable.GetIndexes(ctx)
return err
}); err != nil {
@@ -445,7 +463,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
var idx durable.Index
var prollyMap prolly.Map
var template stats.Statistic
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
if strings.EqualFold(sqlIdx.ID(), "PRIMARY") {
idx, err = dTab.GetRowData(ctx)
} else {
@@ -476,7 +494,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
idxLen := len(sqlIdx.Expressions())
var levelNodes []tree.Node
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
levelNodes, err = tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
if err != nil {
sc.descError("get level", err)
@@ -489,7 +507,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
var firstBound sql.Row
if len(levelNodes) > 0 {
var writes int
buckets, firstBound, writes, err = sc.collectIndexNodes(ctx, prollyMap, idxLen, levelNodes, bypassRateLimit)
buckets, firstBound, writes, err = sc.collectIndexNodes(ctx, prollyMap, idxLen, levelNodes, bypassRateLimit, openSessionCmds)
if err != nil {
sc.descError("", err)
continue
@@ -504,7 +522,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
if !gcKv.GcMark(sc.kv, levelNodes, buckets, idxLen, keyBuilder) {
return fmt.Errorf("GC interrupted updated")
}
if err := func() error {
if err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() error {
schHash, _, err := sqlTable.IndexCacheKey(ctx)
if err != nil {
return err
@@ -514,7 +532,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
gcKv.PutTemplate(key, t)
}
return nil
}(); err != nil {
}); err != nil {
return err
}
}


@@ -73,6 +73,7 @@ func TestScheduleLoop(t *testing.T) {
require.Equal(t, 4, len(kv.templates))
require.Equal(t, 2, len(sc.Stats.stats))
stat := sc.Stats.stats[tableIndexesKey{"mydb", "main", "ab", ""}]
require.Equal(t, 2, len(stat))
require.Equal(t, 7, len(stat[0].Hist))
require.Equal(t, 7, len(stat[1].Hist))
}


@@ -36,7 +36,7 @@ func TestAutoGC(t *testing.T) {
var enabled_16, final_16, disabled, final_disabled RepoSize
numStatements, numCommits := 512, 16
if testing.Short() || os.Getenv("CI") != "" {
numStatements = 64
numStatements = 96
}
t.Run("Enable", func(t *testing.T) {
t.Parallel()