mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-04-20 00:11:13 -05:00
* fix: enhance SQL query name extraction in otel tracer and fallback * feat: propagate context to logger * feat: correlation ids * feat: add tenant ids * feat: more richness for partitionable things * fix: tests * feat: capture http errors on spans * Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
294 lines
6.8 KiB
Go
294 lines
6.8 KiB
Go
package ticker
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/go-co-op/gocron/v2"
	"github.com/google/uuid"
	"github.com/rs/zerolog"

	"github.com/hatchet-dev/hatchet/internal/datautils"
	"github.com/hatchet-dev/hatchet/internal/integrations/alerting"
	"github.com/hatchet-dev/hatchet/internal/msgqueue"
	"github.com/hatchet-dev/hatchet/internal/syncx"
	"github.com/hatchet-dev/hatchet/pkg/logger"
	v1 "github.com/hatchet-dev/hatchet/pkg/repository"
)
|
|
|
|
// Ticker is the interface for a ticker service that can be started.
//
// NOTE(review): TickerImpl.Start takes no context and returns
// (func() error, error), so *TickerImpl does not satisfy this interface as
// declared. Confirm whether this interface is still used, or align the two.
type Ticker interface {
	Start(ctx context.Context) error
}
|
|
|
|
// TickerImpl is the ticker service. Start registers it in the repository
// under tickerId and schedules its periodic jobs on s: heartbeat, cron
// schedule refresh, schedule polling, and tenant alerting.
// Construct it with New.
type TickerImpl struct {
	// mqv1 is the message queue; Start subscribes to
	// msgqueue.TICKER_UPDATE_QUEUE on it.
	mqv1 msgqueue.MessageQueue

	// l is the service logger (New tags it with service=ticker).
	l *zerolog.Logger

	// repov1 is the repository used to register, heartbeat and deactivate
	// this ticker.
	repov1 v1.Repository

	// s is the scheduler for the ticker's own periodic jobs (created in New,
	// populated and started in Start).
	s gocron.Scheduler

	// ta is the tenant alert manager; New requires it.
	ta *alerting.TenantAlertManager

	// NOTE(review): not used in this section; presumably maps a scheduled
	// workflow key to the cancel func of its pending trigger — confirm.
	scheduledWorkflows syncx.Map[string, context.CancelFunc]

	// dv is the data decoder/validator (defaults to
	// datautils.NewDataDecoderValidator()).
	dv datautils.DataDecoderValidator

	// tickerId identifies this ticker instance; Start registers it via
	// CreateNewTicker and Start's cleanup function deactivates it.
	tickerId uuid.UUID

	// userCronScheduler is a second scheduler, separate from s, created in
	// Start. Presumably it runs user-defined cron triggers — confirm against
	// refreshCronSchedules.
	userCronScheduler gocron.Scheduler

	// NOTE(review): not used in this section; presumably guards
	// userCronScheduler.
	userCronSchedulerLock sync.Mutex

	// NOTE(review): not used in this section; presumably serializes
	// refreshCronSchedules runs.
	userCronRefreshLock sync.Mutex

	// maps a unique key for the cron schedule to a UUID, because the gocron library depends on UUIDs
	// as unique identifiers for scheduled jobs
	userCronSchedulesToIds map[string]string
}
|
|
|
|
// TickerOpt is a functional option that mutates TickerOpts; pass them to New.
type TickerOpt func(*TickerOpts)
|
|
|
|
// TickerOpts holds the configuration New collects from its TickerOpt
// arguments. defaultTickerOpts seeds l, tickerId and dv. mqv1, repov1 and ta
// have no defaults, and New returns an error if any of them is nil.
type TickerOpts struct {
	// mqv1 is set by WithMessageQueueV1 (required).
	mqv1 msgqueue.MessageQueue

	// l is set by WithLogger; defaults to a "ticker" logger.
	l *zerolog.Logger

	// repov1 is set by WithRepositoryV1 (required).
	repov1 v1.Repository

	// tickerId defaults to a freshly generated UUID.
	tickerId uuid.UUID

	// ta is set by WithTenantAlerter (required).
	ta *alerting.TenantAlertManager

	// dv defaults to datautils.NewDataDecoderValidator().
	dv datautils.DataDecoderValidator
}
|
|
|
|
func defaultTickerOpts() *TickerOpts {
|
|
logger := logger.NewDefaultLogger("ticker")
|
|
return &TickerOpts{
|
|
l: &logger,
|
|
tickerId: uuid.New(),
|
|
dv: datautils.NewDataDecoderValidator(),
|
|
}
|
|
}
|
|
|
|
func WithMessageQueueV1(mq msgqueue.MessageQueue) TickerOpt {
|
|
return func(opts *TickerOpts) {
|
|
opts.mqv1 = mq
|
|
}
|
|
}
|
|
|
|
func WithRepositoryV1(r v1.Repository) TickerOpt {
|
|
return func(opts *TickerOpts) {
|
|
opts.repov1 = r
|
|
}
|
|
}
|
|
|
|
func WithLogger(l *zerolog.Logger) TickerOpt {
|
|
return func(opts *TickerOpts) {
|
|
opts.l = l
|
|
}
|
|
}
|
|
|
|
func WithTenantAlerter(ta *alerting.TenantAlertManager) TickerOpt {
|
|
return func(opts *TickerOpts) {
|
|
opts.ta = ta
|
|
}
|
|
}
|
|
|
|
func New(fs ...TickerOpt) (*TickerImpl, error) {
|
|
opts := defaultTickerOpts()
|
|
|
|
for _, f := range fs {
|
|
f(opts)
|
|
}
|
|
|
|
if opts.mqv1 == nil {
|
|
return nil, fmt.Errorf("task queue v1 is required. use WithMessageQueueV1")
|
|
}
|
|
|
|
if opts.repov1 == nil {
|
|
return nil, fmt.Errorf("repository v1 is required. use WithRepositoryV1")
|
|
}
|
|
|
|
if opts.ta == nil {
|
|
return nil, fmt.Errorf("tenant alerter is required. use WithTenantAlerter")
|
|
}
|
|
|
|
newLogger := opts.l.With().Str("service", "ticker").Logger()
|
|
opts.l = &newLogger
|
|
|
|
s, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("could not create scheduler: %w", err)
|
|
}
|
|
|
|
return &TickerImpl{
|
|
mqv1: opts.mqv1,
|
|
l: opts.l,
|
|
repov1: opts.repov1,
|
|
s: s,
|
|
dv: opts.dv,
|
|
tickerId: opts.tickerId,
|
|
ta: opts.ta,
|
|
userCronSchedulesToIds: make(map[string]string),
|
|
}, nil
|
|
}
|
|
|
|
func (t *TickerImpl) Start() (func() error, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
t.l.Debug().Ctx(ctx).Msgf("starting ticker %s", t.tickerId)
|
|
|
|
// initialize the cron schedules, so no need to wait for 15 seconds or
|
|
// a cron to be created
|
|
t.refreshCronSchedules(ctx)()
|
|
|
|
// add a handler to update the cron schedule on-demand when crons are created
|
|
cronUpdateHandler := func(task *msgqueue.Message) error {
|
|
t.refreshCronSchedules(ctx)()
|
|
return nil
|
|
}
|
|
queueCleanupFunc, err := t.mqv1.Subscribe(msgqueue.TICKER_UPDATE_QUEUE, cronUpdateHandler, msgqueue.NoOpHook)
|
|
if err != nil {
|
|
t.l.Err(err).Ctx(ctx).Msg("Could not subscribe to cron trigger update queue")
|
|
}
|
|
|
|
// register the ticker
|
|
_, err = t.repov1.Ticker().CreateNewTicker(ctx, &v1.CreateTickerOpts{
|
|
ID: t.tickerId,
|
|
})
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, err
|
|
}
|
|
|
|
_, err = t.s.NewJob(
|
|
gocron.DurationJob(time.Second*5),
|
|
gocron.NewTask(
|
|
t.runUpdateHeartbeat(ctx),
|
|
),
|
|
)
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("could not create update heartbeat job: %w", err)
|
|
}
|
|
|
|
_, err = t.s.NewJob(
|
|
// crons only have a resolution of 1 minute, so only poll every 15 seconds
|
|
gocron.DurationJob(time.Second*15),
|
|
gocron.NewTask(
|
|
t.refreshCronSchedules(ctx),
|
|
),
|
|
gocron.WithSingletonMode(gocron.LimitModeReschedule),
|
|
)
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("could not create poll cron schedules job: %w", err)
|
|
}
|
|
|
|
_, err = t.s.NewJob(
|
|
// we look ahead every 5 seconds
|
|
// FIXME: add another queue similar to cron jobs so that tasks scheduled less than 5 seconds
|
|
// into the future do not take 5 seconds to be triggered
|
|
gocron.DurationJob(time.Second*5),
|
|
gocron.NewTask(
|
|
t.runPollSchedules(ctx),
|
|
),
|
|
gocron.WithSingletonMode(gocron.LimitModeReschedule),
|
|
)
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("could not create poll cron schedules job: %w", err)
|
|
}
|
|
|
|
// poll for expiring tokens every 15 minutes
|
|
_, err = t.s.NewJob(
|
|
gocron.DurationJob(time.Minute*15),
|
|
gocron.NewTask(
|
|
t.runExpiringTokenAlerts(ctx),
|
|
),
|
|
)
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("could not schedule tenant alert polling: %w", err)
|
|
}
|
|
|
|
// poll for tenant resource limit alerts every 15 minutes
|
|
_, err = t.s.NewJob(
|
|
gocron.DurationJob(time.Minute*15),
|
|
gocron.NewTask(
|
|
t.runTenantResourceLimitAlerts(ctx),
|
|
),
|
|
)
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("could not schedule tenant resource limit alert polling: %w", err)
|
|
}
|
|
|
|
userCronScheduler, err := gocron.NewScheduler(gocron.WithLocation(time.UTC))
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return nil, fmt.Errorf("could not create user cron scheduler: %w", err)
|
|
}
|
|
|
|
t.userCronScheduler = userCronScheduler
|
|
|
|
t.s.Start()
|
|
t.userCronScheduler.Start()
|
|
|
|
cleanup := func() error {
|
|
t.l.Debug().Ctx(ctx).Msg("removing ticker")
|
|
|
|
cancel()
|
|
|
|
if err = queueCleanupFunc(); err != nil {
|
|
t.l.Err(err).Ctx(ctx).Msg("Could not cleanup cron trigger update queue")
|
|
}
|
|
|
|
if err := t.s.Shutdown(); err != nil {
|
|
return fmt.Errorf("could not shutdown scheduler: %w", err)
|
|
}
|
|
|
|
if err := t.userCronScheduler.Shutdown(); err != nil {
|
|
return fmt.Errorf("could not shutdown user cron scheduler: %w", err)
|
|
}
|
|
|
|
deleteCtx, deleteCancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
defer deleteCancel()
|
|
|
|
// delete the ticker
|
|
err = t.repov1.Ticker().DeactivateTicker(deleteCtx, t.tickerId)
|
|
|
|
if err != nil {
|
|
t.l.Err(err).Ctx(ctx).Msg("could not delete ticker")
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
return cleanup, nil
|
|
}
|
|
|
|
func (t *TickerImpl) runUpdateHeartbeat(ctx context.Context) func() {
|
|
return func() {
|
|
t.l.Debug().Ctx(ctx).Msgf("ticker: updating heartbeat")
|
|
|
|
now := time.Now().UTC()
|
|
|
|
// update the heartbeat
|
|
_, err := t.repov1.Ticker().UpdateTicker(ctx, t.tickerId, &v1.UpdateTickerOpts{
|
|
LastHeartbeatAt: &now,
|
|
})
|
|
|
|
if err != nil {
|
|
t.l.Err(err).Ctx(ctx).Msg("could not update heartbeat")
|
|
}
|
|
}
|
|
}
|