diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go index 5b753eed41..1f44b2bb3c 100644 --- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go +++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go @@ -172,7 +172,6 @@ type sessionAwareSafepointController struct { callSession *dsess.DoltSession origEpoch int doltDB *doltdb.DoltDB - statsDoneCh chan struct{} waiter *gcctx.GCSafepointWaiter keeper func(hash.Hash) bool @@ -194,13 +193,6 @@ func (sc *sessionAwareSafepointController) BeginGC(ctx context.Context, keeper f } func (sc *sessionAwareSafepointController) EstablishPreFinalizeSafepoint(ctx context.Context) error { - if sc.statsDoneCh != nil { - select { - case <-sc.statsDoneCh: - case <-ctx.Done(): - return context.Cause(ctx) - } - } return sc.waiter.Wait(ctx) } @@ -280,27 +272,6 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun var sc types.GCSafepointController var statsDoneCh chan struct{} statsPro := dSess.StatsProvider() - if afp, ok := statsPro.(ExtendedStatsProvider); ok { - statsDoneCh = afp.Stop() - } - if statsDoneCh != nil { - // If statsDoneCh is nil, we are assuming we are - // running in a context where stats was never - // started. Restart() here will fail, because the - // SerialQueue will not be ready to process the - // restart. - defer func() { - // We receive here as well as in the PreFinalize of the - // safepoint controller, since we want to safely - // restart stats even when GC itself does not complete - // successfully. - <-statsDoneCh - _, err := statsRestart(ctx) - if err != nil { - ctx.GetLogger().Infof("gc stats restart failed: %s", err.Error()) - } - }() - } if UseSessionAwareSafepointController { gcSafepointController := dSess.GCSafepointController() sc = &sessionAwareSafepointController{ @@ -308,10 +279,30 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun dbname: dbname, controller: gcSafepointController, doltDB: ddb, - statsDoneCh: statsDoneCh, } } else { + if afp, ok := statsPro.(ExtendedStatsProvider); ok { + statsDoneCh = afp.Stop() + } + if statsDoneCh != nil { + // If statsDoneCh is nil, we are assuming we are + // running in a context where stats was never + // started. Restart() here will fail, because the + // SerialQueue will not be ready to process the + // restart. + defer func() { + // We receive here as well as in the PreFinalize of the + // safepoint controller, since we want to safely + // restart stats even when GC itself does not complete + // successfully. + <-statsDoneCh + _, err := statsRestart(ctx) + if err != nil { + ctx.GetLogger().Infof("gc stats restart failed: %s", err.Error()) + } + }() + } // Legacy safepoint controller behavior was to not // allow GC on a standby server. GC on a standby server // with killConnections safepoints should be safe now, diff --git a/go/libraries/doltcore/sqle/statspro/controller.go b/go/libraries/doltcore/sqle/statspro/controller.go index 04d1c2cd55..574399a243 100644 --- a/go/libraries/doltcore/sqle/statspro/controller.go +++ b/go/libraries/doltcore/sqle/statspro/controller.go @@ -305,20 +305,7 @@ func (sc *StatsController) AnalyzeTable(ctx *sql.Context, table sql.Table, dbNam newStats := newRootStats() - // XXX: Use a new context for this operation. |updateTable| does GC - // lifecycle callbacks on the context. |ctx| already has lifecycle - // callbacks registered because we are part of a SQL handler. - newCtx, err := sc.ctxGen(ctx.Context) - if err != nil { - return err - } - - defer sql.SessionEnd(newCtx.Session) - sql.SessionCommandBegin(newCtx.Session) - defer sql.SessionCommandEnd(newCtx.Session) - - newCtx.SetCurrentDatabase(ctx.GetCurrentDatabase()) - err = sc.updateTable(newCtx, newStats, table.Name(), sqlDb, nil, true) + err = sc.updateTable(ctx, newStats, table.Name(), sqlDb, nil, true, false) if err != nil { return err } diff --git a/go/libraries/doltcore/sqle/statspro/worker.go b/go/libraries/doltcore/sqle/statspro/worker.go index 37d9cd1f2e..8b5d062ecb 100644 --- a/go/libraries/doltcore/sqle/statspro/worker.go +++ b/go/libraries/doltcore/sqle/statspro/worker.go @@ -29,7 +29,6 @@ import ( "github.com/dolthub/dolt/go/libraries/doltcore/doltdb" "github.com/dolthub/dolt/go/libraries/doltcore/doltdb/durable" - "github.com/dolthub/dolt/go/libraries/doltcore/ref" "github.com/dolthub/dolt/go/libraries/doltcore/sqle" "github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess" "github.com/dolthub/dolt/go/store/hash" @@ -182,24 +181,28 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta if err != nil { return nil, err } - defer sql.SessionEnd(ctx.Session) - sql.SessionCommandBegin(ctx.Session) - defer sql.SessionCommandEnd(ctx.Session) - dSess := dsess.DSessFromSess(ctx.Session) - dbs := dSess.Provider().AllDatabases(ctx) newStats = newRootStats() + dSess := dsess.DSessFromSess(ctx.Session) digest := xxhash.New() - for _, db := range dbs { - sqlDb, ok := db.(sqle.Database) - if !ok { - continue - } - var branches []ref.DoltRef - if err := sc.rateLimiter.execute(ctx, func() (err error) { - root, err := sqlDb.GetRoot(ctx) + // In a single SessionCommand we load up the schemas for all branches of all the databases we will be inspecting. + // This gets each branch root into the DoltSession dbStates, which will be retained by VisitGCRoots. + type toCollect struct { + schs []sql.DatabaseSchema + } + var toVisit []toCollect + if err := sc.rateLimiter.execute(ctx, func() error { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + dbs := dSess.Provider().AllDatabases(ctx) + for _, db := range dbs { + sessDb, ok := db.(dsess.SqlDatabase) + if !ok { + continue + } + root, err := sessDb.GetRoot(ctx) if err != nil { return err } @@ -212,50 +215,55 @@ func (sc *StatsController) newStatsForRoot(baseCtx context.Context, gcKv *memSta if !ok { return fmt.Errorf("get dolt db dolt database not found %s", db.Name()) } - branches, err = ddb.GetBranches(ctx) - return err - }); err != nil { - return nil, err - } - - for _, br := range branches { - // this call avoids the chunkstore - sqlDb, err := sqle.RevisionDbForBranch(ctx, db.(dsess.SqlDatabase), br.GetPath(), br.GetPath()+"/"+sqlDb.AliasedName()) + branches, err := ddb.GetBranches(ctx) if err != nil { - sc.descError("revisionForBranch", err) - continue + return err } - var schDbs []sql.DatabaseSchema + for _, branch := range branches { + revDb, err := sqle.RevisionDbForBranch(ctx, sessDb, branch.GetPath(), branch.GetPath()+"/"+sessDb.AliasedName()) + if err != nil { + sc.descError("revisionForBranch", err) + continue + } + revSchemas, err := revDb.AllSchemas(ctx) + if err != nil { + sc.descError("getDatabaseSchemas", err) + continue + } + toVisit = append(toVisit, toCollect{ + schs: revSchemas, + }) + } + } + return nil + }); err != nil { + return nil, err + } + + for _, collect := range toVisit { + for _, sqlDb := range collect.schs { + switch sqlDb.SchemaName() { + case "dolt", sql.InformationSchemaDatabaseName, "pg_catalog": + continue + } + var tableNames []string if err := sc.rateLimiter.execute(ctx, func() (err error) { - schDbs, err = sqlDb.AllSchemas(ctx) + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + tableNames, err = sqlDb.GetTableNames(ctx) return err }); err != nil { - sc.descError("getDatabaseSchemas", err) + sc.descError("getTableNames", err) continue } - for _, sqlDb := range schDbs { - switch sqlDb.SchemaName() { - case "dolt", sql.InformationSchemaDatabaseName, "pg_catalog": - continue - } - var tableNames []string - if err := sc.rateLimiter.execute(ctx, func() (err error) { - tableNames, err = sqlDb.GetTableNames(ctx) - return err - }); err != nil { - sc.descError("getTableNames", err) - continue - } + newStats.DbCnt++ - newStats.DbCnt++ - - for _, tableName := range tableNames { - err = sc.updateTable(ctx, newStats, tableName, sqlDb.(dsess.SqlDatabase), gcKv, false) - if err != nil { - return nil, err - } + for _, tableName := range tableNames { + err = sc.updateTable(ctx, newStats, tableName, sqlDb.(dsess.SqlDatabase), gcKv, false, true) + if err != nil { + return nil, err } } } @@ -286,14 +294,14 @@ func (sc *StatsController) finalizeHistogram(template stats.Statistic, buckets [ return &template } -func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.Map, idxLen int, nodes []tree.Node, bypassRateLimit bool) ([]*stats.Bucket, sql.Row, int, error) { +func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.Map, idxLen int, nodes []tree.Node, bypassRateLimit bool, openSessionCmds bool) ([]*stats.Bucket, sql.Row, int, error) { updater := newBucketBuilder(sql.StatQualifier{}, idxLen, prollyMap.KeyDesc()) keyBuilder := val.NewTupleBuilder(prollyMap.KeyDesc().PrefixDesc(idxLen), prollyMap.NodeStore()) firstNodeHash := nodes[0].HashOf() lowerBound, ok := sc.GetBound(firstNodeHash, idxLen) if !ok { - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { lowerBound, err = firstRowForIndex(ctx, idxLen, prollyMap, keyBuilder) if err != nil { return fmt.Errorf("get histogram bucket for node; %w", err) @@ -312,7 +320,7 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. var writes int var offset uint64 for i := 0; i < len(nodes); { - err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { newWrites := 0 for i < len(nodes) && newWrites < collectBatchSize { n := nodes[i] @@ -375,13 +383,19 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. } var buckets []*stats.Bucket - for _, n := range nodes { - newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder) - if err != nil || !ok { - sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err) - return nil, nil, 0, err + err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() (err error) { + for _, n := range nodes { + newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder) + if err != nil || !ok { + sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err) + return err + } + buckets = append(buckets, newBucket) } - buckets = append(buckets, newBucket) + return nil + }) + if err != nil { + return nil, nil, 0, err } return buckets, lowerBound, writes, nil @@ -389,18 +403,22 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly. // execWithOptionalRateLimit executes the given function either directly or through the rate limiter // depending on the bypassRateLimit flag -func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit bool, f func() error) error { +func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit, openSessionCmds bool, f func() error) error { + if openSessionCmds { + sql.SessionCommandBegin(ctx.Session) + defer sql.SessionCommandEnd(ctx.Session) + } if bypassRateLimit { return f() } return sc.rateLimiter.execute(ctx, f) } -func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, tableName string, sqlDb dsess.SqlDatabase, gcKv *memStats, bypassRateLimit bool) error { +func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, tableName string, sqlDb dsess.SqlDatabase, gcKv *memStats, bypassRateLimit bool, openSessionCmds bool) error { var err error var sqlTable *sqle.DoltTable var dTab *doltdb.Table - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { sqlTable, dTab, err = GetLatestTable(ctx, tableName, sqlDb) return err }); err != nil { @@ -430,7 +448,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta } var indexes []sql.Index - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { indexes, err = sqlTable.GetIndexes(ctx) return err }); err != nil { @@ -445,7 +463,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var idx durable.Index var prollyMap prolly.Map var template stats.Statistic - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { if strings.EqualFold(sqlIdx.ID(), "PRIMARY") { idx, err = dTab.GetRowData(ctx) } else { @@ -476,7 +494,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta idxLen := len(sqlIdx.Expressions()) var levelNodes []tree.Node - if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) { + if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) { levelNodes, err = tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt) if err != nil { sc.descError("get level", err) @@ -489,7 +507,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta var firstBound sql.Row if len(levelNodes) > 0 { var writes int - buckets, firstBound, writes, err = sc.collectIndexNodes(ctx, prollyMap, idxLen, levelNodes, bypassRateLimit) + buckets, firstBound, writes, err = sc.collectIndexNodes(ctx, prollyMap, idxLen, levelNodes, bypassRateLimit, openSessionCmds) if err != nil { sc.descError("", err) continue @@ -504,7 +522,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta if !gcKv.GcMark(sc.kv, levelNodes, buckets, idxLen, keyBuilder) { return fmt.Errorf("GC interrupted updated") } - if err := func() error { + if err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() error { schHash, _, err := sqlTable.IndexCacheKey(ctx) if err != nil { return err @@ -514,7 +532,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta gcKv.PutTemplate(key, t) } return nil - }(); err != nil { + }); err != nil { return err } } diff --git a/go/libraries/doltcore/sqle/statspro/worker_test.go b/go/libraries/doltcore/sqle/statspro/worker_test.go index 54be934765..d81a6e75e5 100644 --- a/go/libraries/doltcore/sqle/statspro/worker_test.go +++ b/go/libraries/doltcore/sqle/statspro/worker_test.go @@ -73,6 +73,7 @@ func TestScheduleLoop(t *testing.T) { require.Equal(t, 4, len(kv.templates)) require.Equal(t, 2, len(sc.Stats.stats)) stat := sc.Stats.stats[tableIndexesKey{"mydb", "main", "ab", ""}] + require.Equal(t, 2, len(stat)) require.Equal(t, 7, len(stat[0].Hist)) require.Equal(t, 7, len(stat[1].Hist)) } diff --git a/integration-tests/go-sql-server-driver/auto_gc_test.go b/integration-tests/go-sql-server-driver/auto_gc_test.go index 8938c527e6..1bf699430b 100644 --- a/integration-tests/go-sql-server-driver/auto_gc_test.go +++ b/integration-tests/go-sql-server-driver/auto_gc_test.go @@ -36,7 +36,7 @@ func TestAutoGC(t *testing.T) { var enabled_16, final_16, disabled, final_disabled RepoSize numStatements, numCommits := 512, 16 if testing.Short() || os.Getenv("CI") != "" { - numStatements = 64 + numStatements = 96 } t.Run("Enable", func(t *testing.T) { t.Parallel()