From 8df274314bdcf078c8e8a0a7cba39e46d40a3f29 Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Fri, 27 Jun 2025 14:29:52 -0700 Subject: [PATCH 1/3] go: sqle/statspro: Rework the way stats manages its session in order to make it safe to continue collecting stats during a GC run. Previously GC, even with the session-aware safepoint controller, such as is used with Auto GC, would cancel any ongoing stats work and would restart it at the end of the run. Because Auto GC can run quite frequently and because stats can take a while to run to completion, this meant that on some workloads stats would never successfully populate. This PR changes stats so that it more correctly integrates with the session-safe GC safepoint controller. That allows GC to ensure that the stuff stats is currently working on gets carried over to the collected view of database, and stats itself has a chance to run to completion regardless of what GC work is going on. This PR leaves the cancel-stats-on-collect behavior enabled for the kill-connections safepoint controller, where that behavior is still the correct one. --- .../doltcore/sqle/dprocedures/dolt_gc.go | 51 +++---- .../doltcore/sqle/statspro/controller.go | 15 +- go/libraries/doltcore/sqle/statspro/worker.go | 142 +++++++++++------- .../doltcore/sqle/statspro/worker_test.go | 1 + .../go-sql-server-driver/auto_gc_test.go | 2 +- 5 files changed, 115 insertions(+), 96 deletions(-) diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 5b753eed41..1f44b2bb3c 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -172,7 +172,6 @@ type sessionAwareSafepointController struct { callSession *dsess.DoltSession origEpoch int doltDB *doltdb.DoltDB - statsDoneCh chan struct{} waiter *gcctx.GCSafepointWaiter keeper func(hash.Hash) bool @@ -194,13 +193,6 @@ func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper f } func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error { - if sc.statsDoneCh != nil { - select { - case <-sc.statsDoneCh: - case <-ctx.Done(): - return context.Cause(ctx) - } - } return sc.waiter.Wait(ctx) } @@ -280,27 +272,6 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun var sc types.GCSafepointController var statsDoneCh chan struct{} statsPro := dSess.StatsProvider() - if afp, ok := statsPro.(ExtendedStatsProvider); ok { - statsDoneCh = afp.Stop() - } - if statsDoneCh != nil { - // If statsDoneCh is nil, we are assuming we are - // running in a context where stats was never - // started. Restart() here will fail, because the - // SerialQueue will not be ready to process the - // restart. - defer func() { - // We receive here as well as in the PreFinalize of the - // safepoint controller, since we want to safely - // restart stats even when GC itself does not complete - // successfully. 
- <-statsDoneCh - _, err := statsRestart(ctx) - if err != nil { - ctx.GetLogger().Infof("gc stats restart failed: %s", err.Error()) - } - }() - } if UseSessionAwareSafepointController { gcSafepointController := dSess.GCSafepointController() sc = &sessionAwareSafepointController{ @@ -308,10 +279,30 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun dbname: dbname, controller: gcSafepointController, doltDB: ddb, - statsDoneCh: statsDoneCh, } } else { + if afp, ok := statsPro.(ExtendedStatsProvider); ok { + statsDoneCh = afp.Stop() + } + if statsDoneCh != nil { + // If statsDoneCh is nil, we are assuming we are + // running in a context where stats was never + // started. Restart() here will fail, because the + // SerialQueue will not be ready to process the + // restart. + defer func() { + // We receive here as well as in the PreFinalize of the + // safepoint controller, since we want to safely + // restart stats even when GC itself does not complete + // successfully. + <-statsDoneCh + _, err := statsRestart(ctx) + if err != nil { + ctx.GetLogger().Infof("gc stats restart failed: %s", err.Error()) + } + }() + } // Legacy safepoint controller behavior was to not // allow GC on a standby server. GC on a standby server // with killConnections safepoints should be safe now, diff --git a/go/libraries/doltcore/sqle/statspro/controller.go b/go/libraries/doltcore/sqle/statspro/controller.go index 04d1c2cd55..574399a243 100644 --- a/go/libraries/doltcore/sqle/statspro/controller.go +++ b/go/libraries/doltcore/sqle/statspro/controller.go @@ -305,20 +305,7 @@ func (sc *StatsController) AnalyzeTable(ctx *sql.Context, table sql.Table, dbNam newStats := newRootStats() - // XXX: Use a new context for this operation. |updateTable| does GC - // lifecycle callbacks on the context. |ctx| already has lifecycle - // callbacks registered because we are part of a SQL handler. 
- newCtx, err := sc.ctxGen(ctx.Context) - if err != nil { - return err - } - - defer sql.SessionEnd(newCtx.Session) - sql.SessionCommandBegin(newCtx.Session) - defer sql.SessionCommandEnd(newCtx.Session) - - newCtx.SetCurrentDatabase(ctx.GetCurrentDatabase()) - err = sc.updateTable(newCtx, newStats, table.Name(), sqlDb, nil, true) + err = sc.updateTable(ctx, newStats, table.Name(), sqlDb, nil, true, false) if err != nil { return err } diff --git a/go/libraries/doltcore/sqle/statspro/worker.go b/go/libraries/doltcore/sqle/statspro/worker.go index 37d9cd1f2e..e3fd7ce21c 100644 --- a/go/libraries/doltcore/sqle/statspro/worker.go +++ b/go/libraries/doltcore/sqle/statspro/worker.go @@ -29,7 +29,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" - "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" @@ -182,24 +181,28 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta if err != nil { return nil, err } - defer sql.SessionEnd(ctx.Session) - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - dSess := dsess.DSessFromSess(ctx.Session) - dbs := dSess.Provider().AllDatabases(ctx) newStats = newRootStats() + dSess := dsess.DSessFromSess(ctx.Session) digest := xxhash.New() - for _, db := range dbs { - sqlDb, ok := db.(sqle.Database) - if !ok { - continue - } - var branches []ref.DoltRef - if err := sc.rateLimiter.execute(ctx, func() (err error) { - root, err := sqlDb.GetRoot(ctx) + // In a single SessionCommand we load up the schemas for all branches of all the databases we will be inspecting. + // This gets each branch root into the DoltSession dbStates, which will be retained by VisitGCRoots. 
+ type toCollect struct { + schs []sql.DatabaseSchema + } + var toVisit []toCollect + if err := sc.rateLimiter.execute(ctx, func() error { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + dbs := dSess.Provider().AllDatabases(ctx) + for _, db := range dbs { + sessDb, ok := db.(dsess.SqlDatabase) + if !ok { + continue + } + root, err := sessDb.GetRoot(ctx) if err != nil { return err } @@ -212,50 +215,55 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta if !ok { return fmt.Errorf("get dolt db dolt database not found %s", db.Name()) } - branches, err = ddb.GetBranches(ctx) - return err - }); err != nil { - return nil, err - } - - for _, br := range branches { - // this call avoids the chunkstore - sqlDb, err := sqle.RevisionDbForBranch(ctx, db.(dsess.SqlDatabase), br.GetPath(), br.GetPath()+"/"+sqlDb.AliasedName()) + branches, err := ddb.GetBranches(ctx) if err != nil { - sc.descError("revisionForBranch", err) - continue + return err } - var schDbs []sql.DatabaseSchema + for _, branch := range branches { + revDb, err := sqle.RevisionDbForBranch(ctx, sessDb, branch.GetPath(), branch.GetPath()+"/"+sessDb.AliasedName()) + if err != nil { + sc.descError("revisionForBranch", err) + continue + } + revSchemas, err := revDb.AllSchemas(ctx) + if err != nil { + sc.descError("getDatabaseSchemas", err) + continue + } + toVisit = append(toVisit, toCollect{ + schs: revSchemas, + }) + } + } + return nil + }); err != nil { + return nil, err + } + + for _, collect := range toVisit { + for _, sqlDb := range collect.schs { + switch sqlDb.SchemaName() { + case "dolt", sql.InformationSchemaDatabaseName, "pg_catalog": + continue + } + var tableNames []string if err := sc.rateLimiter.execute(ctx, func() (err error) { - schDbs, err = sqlDb.AllSchemas(ctx) + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + tableNames, err = sqlDb.GetTableNames(ctx) return err }); err != nil { - sc.descError("getDatabaseSchemas", err) + sc.descError("getTableNames", err) continue } - for _, sqlDb := range schDbs { - switch sqlDb.SchemaName() { - case "dolt", sql.InformationSchemaDatabaseName, "pg_catalog": - continue - } - var tableNames []string - if err := sc.rateLimiter.execute(ctx, func() (err error) { - tableNames, err = sqlDb.GetTableNames(ctx) - return err - }); err != nil { - sc.descError("getTableNames", err) - continue - } + newStats.DbCnt++ - newStats.DbCnt++ - - for _, tableName := range tableNames { - err = sc.updateTable(ctx, newStats, tableName, sqlDb.(dsess.SqlDatabase), gcKv, false) - if err != nil { - return nil, err - } + for _, tableName := range tableNames { + err = sc.updateTable(ctx, newStats, tableName, sqlDb.(dsess.SqlDatabase), gcKv, false, true) + if err != nil { + return nil, err } } } @@ -286,7 +294,7 @@ func (sc *StatsController) finalizeHistogram(template stats.Statistic, buckets [ return &template } -func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.Map, idxLen int, nodes []tree.Node, bypassRateLimit bool) ([]*stats.Bucket, sql.Row, int, error) { +func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.Map, idxLen int, nodes []tree.Node, bypassRateLimit bool, openSessionCmds bool) ([]*stats.Bucket, sql.Row, int, error) { updater := newBucketBuilder(sql.StatQualifier{}, idxLen, prollyMap.KeyDesc()) keyBuilder := val.NewTupleBuilder(prollyMap.KeyDesc().PrefixDesc(idxLen), prollyMap.NodeStore()) @@ -294,6 +302,10 @@ func (sc *StatsController) 
collectIndexNodes(ctx *sql.Context, prollyMap prolly. lowerBound, ok := sc.GetBound(firstNodeHash, idxLen) if !ok { if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } lowerBound, err = firstRowForIndex(ctx, idxLen, prollyMap, keyBuilder) if err != nil { return fmt.Errorf("get histogram bucket for node; %w", err) @@ -313,6 +325,10 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. var offset uint64 for i := 0; i < len(nodes); { err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } newWrites := 0 for i < len(nodes) && newWrites < collectBatchSize { n := nodes[i] @@ -374,6 +390,10 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. } } + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } var buckets []*stats.Bucket for _, n := range nodes { newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder) @@ -396,11 +416,15 @@ func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRat return sc.rateLimiter.execute(ctx, f) } -func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, tableName string, sqlDb dsess.SqlDatabase, gcKv *memStats, bypassRateLimit bool) error { +func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, tableName string, sqlDb dsess.SqlDatabase, gcKv *memStats, bypassRateLimit bool, openSessionCmds bool) error { var err error var sqlTable *sqle.DoltTable var dTab *doltdb.Table if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } sqlTable, dTab, err = GetLatestTable(ctx, tableName, sqlDb) return err }); err != nil { @@ -431,6 +455,10 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var indexes []sql.Index if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } indexes, err = sqlTable.GetIndexes(ctx) return err }); err != nil { @@ -446,6 +474,10 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var prollyMap prolly.Map var template stats.Statistic if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } if strings.EqualFold(sqlIdx.ID(), "PRIMARY") { idx, err = dTab.GetRowData(ctx) } else { @@ -477,6 +509,10 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var levelNodes []tree.Node if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } levelNodes, err = tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt) if err != nil { sc.descError("get level", err) @@ -489,7 +525,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var firstBound sql.Row if len(levelNodes) > 0 { var writes int - buckets, firstBound, writes, err = sc.collectIndexNodes(ctx, 
prollyMap, idxLen, levelNodes, bypassRateLimit) + buckets, firstBound, writes, err = sc.collectIndexNodes(ctx, prollyMap, idxLen, levelNodes, bypassRateLimit, openSessionCmds) if err != nil { sc.descError("", err) continue @@ -505,6 +541,10 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta return fmt.Errorf("GC interrupted updated") } if err := func() error { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } schHash, _, err := sqlTable.IndexCacheKey(ctx) if err != nil { return err diff --git a/go/libraries/doltcore/sqle/statspro/worker_test.go b/go/libraries/doltcore/sqle/statspro/worker_test.go index 54be934765..d81a6e75e5 100644 --- a/go/libraries/doltcore/sqle/statspro/worker_test.go +++ b/go/libraries/doltcore/sqle/statspro/worker_test.go @@ -73,6 +73,7 @@ func TestScheduleLoop(t *testing.T) { require.Equal(t, 4, len(kv.templates)) require.Equal(t, 2, len(sc.Stats.stats)) stat := sc.Stats.stats[tableIndexesKey{"mydb", "main", "ab", ""}] + require.Equal(t, 2, len(stat)) require.Equal(t, 7, len(stat[0].Hist)) require.Equal(t, 7, len(stat[1].Hist)) } diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 8938c527e6..1bf699430b 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -36,7 +36,7 @@ func TestAutoGC(t *testing.T) { var enabled_16, final_16, disabled, final_disabled RepoSize numStatements, numCommits := 512, 16 if testing.Short() || os.Getenv("CI") != "" { - numStatements = 64 + numStatements = 96 } t.Run("Enable", func(t *testing.T) { t.Parallel() From 752040140f145d58f57ef99a281385b1c50a39d0 Mon Sep 17 00:00:00 2001 From: reltuk Date: Fri, 27 Jun 2025 21:47:18 +0000 Subject: [PATCH 2/3] [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/update.sh --- go/libraries/doltcore/sqle/statspro/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/libraries/doltcore/sqle/statspro/worker.go b/go/libraries/doltcore/sqle/statspro/worker.go index e3fd7ce21c..b80912ac97 100644 --- a/go/libraries/doltcore/sqle/statspro/worker.go +++ b/go/libraries/doltcore/sqle/statspro/worker.go @@ -190,7 +190,7 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta // In a single SessionCommand we load up the schemas for all branches of all the databases we will be inspecting. // This gets each branch root into the DoltSession dbStates, which will be retained by VisitGCRoots. type toCollect struct { - schs []sql.DatabaseSchema + schs []sql.DatabaseSchema } var toVisit []toCollect if err := sc.rateLimiter.execute(ctx, func() error { From e4aa285d39d9ea3add605ea0d1da3eee894121be Mon Sep 17 00:00:00 2001 From: Aaron Son Date: Tue, 1 Jul 2025 12:25:26 -0700 Subject: [PATCH 3/3] go: statspro/worker: PR feedback: Move openSessionCmds handling to execWithOptionalRateLimit. --- go/libraries/doltcore/sqle/statspro/worker.go | 72 +++++++------------ 1 file changed, 25 insertions(+), 47 deletions(-) diff --git a/go/libraries/doltcore/sqle/statspro/worker.go b/go/libraries/doltcore/sqle/statspro/worker.go index b80912ac97..8b5d062ecb 100644 --- a/go/libraries/doltcore/sqle/statspro/worker.go +++ b/go/libraries/doltcore/sqle/statspro/worker.go @@ -301,11 +301,7 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. 
firstNodeHash := nodes[0].HashOf() lowerBound, ok := sc.GetBound(firstNodeHash, idxLen) if !ok { - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { lowerBound, err = firstRowForIndex(ctx, idxLen, prollyMap, keyBuilder) if err != nil { return fmt.Errorf("get histogram bucket for node; %w", err) @@ -324,11 +320,7 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. var writes int var offset uint64 for i := 0; i < len(nodes); { - err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { newWrites := 0 for i < len(nodes) && newWrites < collectBatchSize { n := nodes[i] @@ -390,18 +382,20 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. } } - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } var buckets []*stats.Bucket - for _, n := range nodes { - newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder) - if err != nil || !ok { - sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err) - return nil, nil, 0, err + err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() (err error) { + for _, n := range nodes { + newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder) + if err != nil || !ok { + sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err) + return err + } + buckets = append(buckets, newBucket) } - buckets = append(buckets, newBucket) + return nil + }) + if err != nil { + return nil, nil, 0, err } return buckets, lowerBound, writes, nil @@ -409,7 +403,11 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. 
// execWithOptionalRateLimit executes the given function either directly or through the rate limiter // depending on the bypassRateLimit flag -func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit bool, f func() error) error { +func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit, openSessionCmds bool, f func() error) error { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } if bypassRateLimit { return f() } @@ -420,11 +418,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var err error var sqlTable *sqle.DoltTable var dTab *doltdb.Table - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { sqlTable, dTab, err = GetLatestTable(ctx, tableName, sqlDb) return err }); err != nil { @@ -454,11 +448,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta } var indexes []sql.Index - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { indexes, err = sqlTable.GetIndexes(ctx) return err }); err != nil { @@ -473,11 +463,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var idx durable.Index var prollyMap prolly.Map var template stats.Statistic - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { if strings.EqualFold(sqlIdx.ID(), "PRIMARY") { idx, err = dTab.GetRowData(ctx) } else { @@ -508,11 +494,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta idxLen := len(sqlIdx.Expressions()) var levelNodes []tree.Node - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { levelNodes, err = tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt) if err != nil { sc.descError("get level", err) @@ -540,11 +522,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta if !gcKv.GcMark(sc.kv, levelNodes, buckets, idxLen, keyBuilder) { return fmt.Errorf("GC interrupted updated") } - if err := func() error { - if openSessionCmds { - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - } + if err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() error { schHash, _, err := sqlTable.IndexCacheKey(ctx) if err != nil { return err @@ -554,7 +532,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta gcKv.PutTemplate(key, t) } return nil - }(); err != nil { + }); err != nil { return err } }
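Note for readers who don't have the surrounding Dolt code in front of them: the sketch below is a minimal, self-contained illustration of the pattern this series moves stats onto, namely bracketing each small unit of background work in a session-command begin/end pair so that a GC run can wait for a quiescent safepoint instead of cancelling the worker and restarting it afterwards. Every name in it (safepointWaiter, commandBegin, commandEnd, wait) is a hypothetical stand-in for Dolt's GCSafepointWaiter and sql.SessionCommandBegin/sql.SessionCommandEnd; the real controller also scopes its wait to the sessions that existed when GC began and hands the worker's in-flight roots to the keeper, which this sketch omits.

// Hypothetical sketch, not the Dolt API: a GC run waits for every tracked
// session to be outside a command; a background worker brackets each small
// batch of work with commandBegin/commandEnd so GC can reach its safepoint
// without the worker being cancelled.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// safepointWaiter tracks how many sessions are currently inside a command.
type safepointWaiter struct {
	mu      sync.Mutex
	active  int
	waiters []chan struct{}
}

// commandBegin marks a session as inside a command.
func (w *safepointWaiter) commandBegin() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.active++
}

// commandEnd marks a session as outside a command and releases any GC run
// that is waiting for the safepoint once nothing is in flight.
func (w *safepointWaiter) commandEnd() {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.active--
	if w.active == 0 {
		for _, ch := range w.waiters {
			close(ch)
		}
		w.waiters = nil
	}
}

// wait blocks until no session is inside a command, or until ctx is done.
func (w *safepointWaiter) wait(ctx context.Context) error {
	w.mu.Lock()
	if w.active == 0 {
		w.mu.Unlock()
		return nil
	}
	ch := make(chan struct{})
	w.waiters = append(w.waiters, ch)
	w.mu.Unlock()
	select {
	case <-ch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	w := &safepointWaiter{}

	// Background "stats" worker: each batch is bracketed by
	// commandBegin/commandEnd, so a concurrent GC run only ever waits for
	// the batch currently in flight.
	go func() {
		for batch := 0; batch < 5; batch++ {
			w.commandBegin()
			time.Sleep(10 * time.Millisecond) // stand-in for collecting one batch of stats
			w.commandEnd()
		}
	}()

	// "GC" reaches its pre-finalize safepoint by waiting for all sessions
	// to be between commands, without cancelling the worker. The worker's
	// teardown is elided; main exits once the safepoint is reached.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := w.wait(ctx); err != nil {
		fmt.Println("safepoint not reached:", err)
		return
	}
	fmt.Println("safepoint reached; GC can finalize")
}

The point of this shape is that GC only ever waits for the unit of work that is currently in flight, while the worker keeps making progress across the GC run; that is why, with these patches, stats can run to completion even on workloads where Auto GC fires frequently.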