mirror of
https://github.com/dolthub/dolt.git
synced 2026-02-08 18:58:47 -06:00
go: statspro/worker: PR feedback: Move openSessionCmds handling to execWithOptionalRateLimit.
This commit is contained in:
@@ -301,11 +301,7 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
|
||||
firstNodeHash := nodes[0].HashOf()
|
||||
lowerBound, ok := sc.GetBound(firstNodeHash, idxLen)
|
||||
if !ok {
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
|
||||
lowerBound, err = firstRowForIndex(ctx, idxLen, prollyMap, keyBuilder)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get histogram bucket for node; %w", err)
|
||||
@@ -324,11 +320,7 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
|
||||
var writes int
|
||||
var offset uint64
|
||||
for i := 0; i < len(nodes); {
|
||||
err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
|
||||
newWrites := 0
|
||||
for i < len(nodes) && newWrites < collectBatchSize {
|
||||
n := nodes[i]
|
||||
@@ -390,18 +382,20 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
|
||||
}
|
||||
}
|
||||
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
var buckets []*stats.Bucket
|
||||
for _, n := range nodes {
|
||||
newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder)
|
||||
if err != nil || !ok {
|
||||
sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err)
|
||||
return nil, nil, 0, err
|
||||
err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() (err error) {
|
||||
for _, n := range nodes {
|
||||
newBucket, ok, err := sc.GetBucket(ctx, n.HashOf(), keyBuilder)
|
||||
if err != nil || !ok {
|
||||
sc.descError(fmt.Sprintf("missing histogram bucket for node %s", n.HashOf().String()[:5]), err)
|
||||
return err
|
||||
}
|
||||
buckets = append(buckets, newBucket)
|
||||
}
|
||||
buckets = append(buckets, newBucket)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, 0, err
|
||||
}
|
||||
|
||||
return buckets, lowerBound, writes, nil
|
||||
@@ -409,7 +403,11 @@ func (sc *StatsController) collectIndexNodes(ctx *sql.Context, prollyMap prolly.
|
||||
|
||||
// execWithOptionalRateLimit executes the given function either directly or through the rate limiter
|
||||
// depending on the bypassRateLimit flag
|
||||
func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit bool, f func() error) error {
|
||||
func (sc *StatsController) execWithOptionalRateLimit(ctx *sql.Context, bypassRateLimit, openSessionCmds bool, f func() error) error {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if bypassRateLimit {
|
||||
return f()
|
||||
}
|
||||
@@ -420,11 +418,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
|
||||
var err error
|
||||
var sqlTable *sqle.DoltTable
|
||||
var dTab *doltdb.Table
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
|
||||
sqlTable, dTab, err = GetLatestTable(ctx, tableName, sqlDb)
|
||||
return err
|
||||
}); err != nil {
|
||||
@@ -454,11 +448,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
|
||||
}
|
||||
|
||||
var indexes []sql.Index
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
|
||||
indexes, err = sqlTable.GetIndexes(ctx)
|
||||
return err
|
||||
}); err != nil {
|
||||
@@ -473,11 +463,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
|
||||
var idx durable.Index
|
||||
var prollyMap prolly.Map
|
||||
var template stats.Statistic
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
|
||||
if strings.EqualFold(sqlIdx.ID(), "PRIMARY") {
|
||||
idx, err = dTab.GetRowData(ctx)
|
||||
} else {
|
||||
@@ -508,11 +494,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
|
||||
idxLen := len(sqlIdx.Expressions())
|
||||
|
||||
var levelNodes []tree.Node
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, func() (err error) {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if err := sc.execWithOptionalRateLimit(ctx, bypassRateLimit, openSessionCmds, func() (err error) {
|
||||
levelNodes, err = tree.GetHistogramLevel(ctx, prollyMap.Tuples(), bucketLowCnt)
|
||||
if err != nil {
|
||||
sc.descError("get level", err)
|
||||
@@ -540,11 +522,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
|
||||
if !gcKv.GcMark(sc.kv, levelNodes, buckets, idxLen, keyBuilder) {
|
||||
return fmt.Errorf("GC interrupted updated")
|
||||
}
|
||||
if err := func() error {
|
||||
if openSessionCmds {
|
||||
sql.SessionCommandBegin(ctx.Session)
|
||||
defer sql.SessionCommandEnd(ctx.Session)
|
||||
}
|
||||
if err := sc.execWithOptionalRateLimit(ctx, true /* no need to rate limit here */, openSessionCmds, func() error {
|
||||
schHash, _, err := sqlTable.IndexCacheKey(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -554,7 +532,7 @@ func (sc *StatsController) updateTable(ctx *sql.Context, newStats *rootStats, ta
|
||||
gcKv.PutTemplate(key, t)
|
||||
}
|
||||
return nil
|
||||
}(); err != nil {
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user