Files
hatchet/internal/operation/interval.go
matt 058968c06b Refactor: Attempt II at removing pgtype.UUID everywhere + convert string UUIDs into uuid.UUID (#2894)
* fix: add type override in sqlc.yaml

* chore: gen sqlc

* chore: big find and replace

* chore: more

* fix: clean up bunch of outdated `.Valid` refs

* refactor: remove `sqlchelpers.uuidFromStr()` in favor of `uuid.MustParse()`

* refactor: remove uuidToStr

* fix: lint

* fix: use pointers for null uuids

* chore: clean up more null pointers

* chore: clean up a bunch more

* fix: couple more

* fix: some types on the api

* fix: incorrectly non-null param

* fix: more nullable params

* fix: more refs

* refactor: start replacing tenant id strings with uuids

* refactor: more tenant id uuid casting

* refactor: fix a bunch more

* refactor: more

* refactor: more

* refactor: is that all of them?!

* fix: panic

* fix: rm scans

* fix: unwind some broken things

* chore: tests

* fix: rebase issues

* fix: more tests

* fix: nil checks

* Refactor: Make all UUIDs into `uuid.UUID` (#2897)

* refactor: remove a bunch more string uuids

* refactor: pointers and lists

* refactor: fix all the refs

* refactor: fix a few more

* fix: config loader

* fix: revert some changes

* fix: tests

* fix: test

* chore: proto

* fix: durable listener

* fix: some more string types

* fix: python health worker sleep

* fix: remove a bunch of `MustParse`s from the various gRPC servers

* fix: rm more uuid.MustParse calls

* fix: rm mustparse from api

* fix: test

* fix: merge issues

* fix: handle a bunch more uses of `MustParse` everywhere

* fix: nil id for worker label

* fix: more casting in the oss

* fix: more id parsing

* fix: stringify jwt opt

* fix: couple more bugs in untyped calls

* fix: more types

* fix: broken test

* refactor: implement `GetKeyUuid`

* chore: regen sqlc

* chore: replace pgtype.UUID again

* fix: bunch more type errors

* fix: panic
2026-02-03 11:02:59 -05:00

184 lines
4.4 KiB
Go

package operation
import (
"context"
"fmt"
"math/rand"
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
v1 "github.com/hatchet-dev/hatchet/pkg/repository"
)
// gaugeInterval is how often RunInterval polls the configured IntervalGauge.
const gaugeInterval = 5 * time.Second
// IntervalGauge reports recent activity for a resource so that an Interval can
// adapt its period. It is invoked with the Interval's resourceId.
//
// A positive result resets the interval to its start value. Zero (or any
// non-positive value) counts as an idle reading; once incBackoffCount
// consecutive idle readings accumulate, the interval is doubled (capped at the
// maximum). A non-nil error is logged and leaves the interval unchanged.
type IntervalGauge func(ctx context.Context, resourceId string) (int, error)
// Interval produces ticks at an adaptive period for a single
// (operation, resource) pair. The period starts at startInterval, is doubled
// while an optional gauge reports no activity, is capped at maxInterval, and
// is persisted through repo (NewInterval reads it back on construction).
type Interval struct {
	// l receives error logs for gauge and persistence failures.
	l *zerolog.Logger

	// repo reads and writes the persisted current interval.
	repo v1.IntervalSettingsRepository

	// gauge is polled by RunInterval to adapt the interval; nil disables adaptation.
	gauge IntervalGauge

	// operationId and resourceId together key the persisted interval setting;
	// persistence calls parse resourceId as a UUID.
	operationId string
	resourceId  string // tenant ID, queue name, etc.

	// maxJitter bounds the random delay added on top of every trigger.
	maxJitter time.Duration

	// startInterval is the floor applied at construction and the value
	// restored whenever the gauge reports activity.
	startInterval time.Duration

	// currInterval is the period currently in effect; guarded by intervalMu.
	currInterval time.Duration

	// maxInterval caps currInterval.
	maxInterval time.Duration

	// noActivityCount counts consecutive idle gauge readings; guarded by intervalMu.
	noActivityCount int

	// incBackoffCount is how many idle readings trigger a doubling of currInterval.
	incBackoffCount int

	// intervalMu guards currInterval and noActivityCount once the Interval is running.
	intervalMu sync.RWMutex
}
// NewInterval builds an Interval for the given operation and resource.
//
// The initial current interval is loaded from repo (the read is bounded by a
// five-second timeout) and then clamped into [startInterval, maxInterval]. If
// resourceId is not a valid UUID, or the read fails, the error is logged and
// startInterval is used instead. A negative maxInterval is replaced by one
// minute.
//
// gauge may be nil, in which case RunInterval never adapts the interval.
// No goroutines are started here; call RunInterval to begin ticking.
func NewInterval(
	l *zerolog.Logger,
	repo v1.IntervalSettingsRepository,
	operationId, resourceId string,
	maxJitter, startInterval, maxInterval time.Duration,
	incBackoffCount int,
	gauge IntervalGauge,
) *Interval {
	if maxInterval < 0 {
		maxInterval = time.Minute
	}

	// Zero is raised to startInterval by the clamp below, which makes it the
	// fallback for every failure path.
	var currInterval time.Duration

	// resourceId may be something other than a UUID (e.g. a queue name), and
	// uuid.MustParse would panic on such input, so parse defensively.
	resourceUUID, err := uuid.Parse(resourceId)
	if err != nil {
		l.Error().Err(err).Msg(fmt.Sprintf("invalid resource id %s, defaulting to start interval", resourceId))
	} else {
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		currInterval, err = repo.ReadInterval(ctx, operationId, resourceUUID)
		cancel()

		if err != nil {
			l.Error().Err(err).Msg(fmt.Sprintf("error reading interval for resource %s, defaulting to start interval", resourceId))
			currInterval = 0
		}
	}

	if currInterval < startInterval {
		currInterval = startInterval
	}

	if currInterval > maxInterval {
		currInterval = maxInterval
	}

	return &Interval{
		l:               l,
		repo:            repo,
		operationId:     operationId,
		resourceId:      resourceId,
		maxJitter:       maxJitter,
		startInterval:   startInterval,
		currInterval:    currInterval,
		maxInterval:     maxInterval,
		noActivityCount: 0,
		incBackoffCount: incBackoffCount,
		gauge:           gauge,
	}
}
// RunInterval starts the interval loop and returns a channel that receives an
// empty struct each time the current interval, plus up to maxJitter of random
// delay, elapses.
//
// If a gauge is configured, a second goroutine calls it every gaugeInterval
// and feeds successful results to SetIntervalGauge; gauge errors are logged
// and otherwise ignored.
//
// Both goroutines exit when ctx is cancelled, including while a tick is
// waiting for a receiver. The returned channel is unbuffered and is never
// closed, so callers should also select on ctx.Done().
func (i *Interval) RunInterval(ctx context.Context) <-chan struct{} {
	res := make(chan struct{})

	// Periodically sample the gauge so the interval can adapt to activity.
	if i.gauge != nil {
		go func() {
			ticker := time.NewTicker(gaugeInterval)
			defer ticker.Stop()

			for {
				select {
				case <-ctx.Done():
					return
				case <-ticker.C:
					rowsModified, err := i.gauge(ctx, i.resourceId)
					if err != nil {
						i.l.Error().Err(err).Msg(fmt.Sprintf("error calling interval gauge for resource %s", i.resourceId))
						continue
					}

					i.SetIntervalGauge(rowsModified)
				}
			}
		}()
	}

	go func() {
		trigger := i.getNextTrigger()

		for {
			select {
			case <-ctx.Done():
				return
			case <-trigger:
				// The send must also watch ctx: a bare send on the unbuffered
				// channel blocks forever (leaking this goroutine) once the
				// consumer stops reading after cancellation.
				select {
				case res <- struct{}{}:
				case <-ctx.Done():
					return
				}

				trigger = i.getNextTrigger()
			}
		}
	}()

	return res
}
// getNextTrigger returns a channel that fires once, after the current interval
// plus a random jitter in [0, maxJitter). The interval is sampled under the
// read lock so it is consistent with concurrent SetIntervalGauge calls.
func (i *Interval) getNextTrigger() <-chan time.Time {
	i.intervalMu.RLock()
	wait := i.currInterval + safeRandomDuration(i.maxJitter)
	i.intervalMu.RUnlock()

	return time.After(wait)
}
// safeRandomDuration returns a pseudo-random duration in [0, maxJitter).
// Non-positive bounds yield zero, because rand.Int63n panics when n <= 0.
func safeRandomDuration(maxJitter time.Duration) time.Duration {
	if maxJitter > 0 {
		return time.Duration(rand.Int63n(int64(maxJitter))) // nolint: gosec
	}

	return 0
}
// SetIntervalGauge adjusts the current interval based on a gauge reading.
//
// A positive rowsModified resets the interval to startInterval and clears the
// idle counter. Otherwise the idle counter is incremented and, once it reaches
// incBackoffCount, the interval is doubled and the counter reset. The result
// is capped at maxInterval.
//
// When the interval changes it is persisted through repo.SetInterval (bounded
// by a five-second timeout), and the value returned by the repository becomes
// the current interval. Persistence failures, including a resourceId that is
// not a valid UUID, are logged and the locally computed interval is kept.
func (i *Interval) SetIntervalGauge(rowsModified int) {
	i.intervalMu.Lock()
	defer i.intervalMu.Unlock()

	previousInterval := i.currInterval

	if rowsModified > 0 {
		i.currInterval = i.startInterval
		i.noActivityCount = 0
	} else {
		i.noActivityCount++

		if i.noActivityCount >= i.incBackoffCount {
			// Cap before doubling: the result would be clamped to maxInterval
			// anyway, and this avoids int64 overflow for very large durations.
			if i.currInterval > i.maxInterval/2 {
				i.currInterval = i.maxInterval
			} else {
				i.currInterval *= 2
			}

			i.noActivityCount = 0
		}
	}

	if i.currInterval > i.maxInterval {
		i.currInterval = i.maxInterval
	}

	// Only touch the database when the interval actually changed.
	if i.currInterval == previousInterval {
		return
	}

	// resourceId may not be a UUID; uuid.MustParse would panic here while the
	// write lock is held.
	resourceUUID, err := uuid.Parse(i.resourceId)
	if err != nil {
		i.l.Error().Err(err).Msg(fmt.Sprintf("invalid resource id %s, not persisting interval", i.resourceId))
		return
	}

	// The write lock is held across this call and blocks getNextTrigger, so
	// bound it rather than letting a slow database stall the loop indefinitely.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	newInterval, err := i.repo.SetInterval(ctx, i.operationId, resourceUUID, i.currInterval)
	if err != nil {
		i.l.Error().Err(err).Msg(fmt.Sprintf("error setting interval for resource %s", i.resourceId))
		return
	}

	i.currInterval = newInterval
}