diff --git a/server/internal/steve/job.go b/server/internal/steve/job.go index 464fb746..c7330f41 100644 --- a/server/internal/steve/job.go +++ b/server/internal/steve/job.go @@ -31,9 +31,8 @@ type JobRow struct { } type AttemptError struct { - At time.Time `json:"at"` - Attempt int `json:"attempt"` - Error string `json:"error"` + At time.Time `json:"at"` + Error string `json:"error"` } type JobStatus uint8 diff --git a/server/internal/steve/steve.go b/server/internal/steve/steve.go index 5ff3f224..5a4d410a 100644 --- a/server/internal/steve/steve.go +++ b/server/internal/steve/steve.go @@ -17,19 +17,24 @@ const channelName = "job_inserted" const maxAttempts = 6 type Client struct { - config Config - db db.Handler - logger zerolog.Logger + db db.Handler + logger zerolog.Logger + // Config + config Config + defaultTimeout time.Duration + // Running factories map[string]workUnitFactory availableWorkers chan struct{} + // Job availability + subJobInserted pubsub.Subscription muJobAvailable *sync.Mutex condJobAvailable *sync.Cond - sub pubsub.Subscription - cancelListen context.CancelFunc - cancelRun context.CancelFunc - cancelWork context.CancelFunc - // Config - defaultTimeout time.Duration + // Shutdown + wg sync.WaitGroup + done chan struct{} + cancelListen context.CancelFunc + cancelRun context.CancelFunc + cancelWork context.CancelFunc } func NewClient(db db.Handler, logger zerolog.Logger, config Config) *Client { @@ -58,23 +63,27 @@ func RegisterWorker[T JobArgs](client *Client, worker Worker[T]) { } func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error { - if c.sub != nil { + if c.subJobInserted != nil { return errors.New("already started") } if c.config.Workers <= 0 { return errors.New("workers must be a positive integer") } + if err := resetRunningJobs(c.db); err != nil { + return err + } + if _, err := enqueueScheduledJobs(c.db); err != nil { + return err + } + c.availableWorkers = make(chan struct{}, c.config.Workers) c.muJobAvailable = &sync.Mutex{} c.condJobAvailable = &sync.Cond{L: c.muJobAvailable} c.defaultTimeout = time.Duration(c.config.Timeout) * time.Second - c.logger.Info().Int("workers", c.config.Workers).Msg("Starting Job Processing") - c.sub = notifier.Listen(channelName) - - if err := resetRunningJobs(c.db); err != nil { - return err - } + c.logger.Info().Int("workers", c.config.Workers).Msg("Starting") + c.subJobInserted = notifier.Listen(channelName) + c.done = make(chan struct{}) var listenCtx, runCtx, workCtx context.Context listenCtx, c.cancelListen = context.WithCancel(ctx) @@ -85,27 +94,42 @@ func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error { // Wait until the listener is established select { - case <-c.sub.EstablishedC(): + case <-c.subJobInserted.EstablishedC(): default: } return nil } +func (c *Client) Stop(ctx context.Context, hard bool) bool { + c.logger.Info().Bool("hard", hard).Msg("Stop") + c.cancelListen() + c.cancelRun() + if hard { + c.cancelWork() + } + + select { + case <-ctx.Done(): // stop context cancelled + c.logger.Warn().Msg("Stop timeout reached. Some jobs still running") + return false + case <-c.done: + return true + } +} + func (c *Client) produceJobs(ctx context.Context) { t := time.NewTicker(5 * time.Second) for { select { case <-ctx.Done(): return - case <-c.sub.NotificationC(): - c.logger.Debug().Msg("Job available") + case <-c.subJobInserted.NotificationC(): c.condJobAvailable.Signal() case <-t.C: if n, err := enqueueScheduledJobs(c.db); err != nil { c.logger.Warn().Err(err).Msg("Error enqueueing scheduled jobs") } else if n > 0 { - c.logger.Debug().Int("count", n).Msg("Enqueued scheduled jobs") c.condJobAvailable.Signal() } } @@ -122,12 +146,10 @@ loop: case c.availableWorkers <- struct{}{}: if info, err := claimNextJob(c.db); err != nil { if errors.Is(err, errNoJobs) { - c.logger.Debug().Msg("No jobs available. Sleeping until notified") // No available jobs. Wait for signal c.muJobAvailable.Lock() c.condJobAvailable.Wait() c.muJobAvailable.Unlock() - c.logger.Debug().Msg("New job!!") // Make sure to mark the worker as available <-c.availableWorkers } else { @@ -136,11 +158,13 @@ loop: c.logger.Fatal().Err(err).Msg("Failed to claim job") } } else { - c.logger.Debug().Int32("id", info.ID).Str("kind", info.Kind).Bytes("args", info.EncodedArgs).Msg("Executing job") + c.logger.Trace().Int32("id", info.ID).Str("kind", info.Kind).Bytes("args", info.EncodedArgs).Msg("Executing job") c.execute(workCtx, info) } } } + c.wg.Wait() + close(c.done) } func (c *Client) execute(ctx context.Context, info *JobInfo) { @@ -163,8 +187,10 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) { defer cancel() } + c.wg.Add(1) go func() { defer func() { + c.wg.Done() <-c.availableWorkers }() if err := w.Work(ctx); err != nil { @@ -176,15 +202,19 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) { } func (c *Client) recordError(info *JobInfo, err error) error { - scheduleAt := computeBackoff(info.Attempt + 1) - c.logger.Warn().Err(err).Int32("id", info.ID).Str("kind", info.Kind).Bytes("args", info.EncodedArgs).Time("schedule_at", scheduleAt.Time).Msg("Error executing jobs") + scheduleAt := computeBackoff(info.Attempt) + c.logger. + Warn(). + Err(err). + Int32("id", info.ID). + Int("attempt", info.Attempt+1). + Msg("Error executing job") return updateJobStatusError( c.db, info.ID, AttemptError{ - At: time.Now(), - Attempt: info.Attempt + 1, - Error: err.Error(), + At: time.Now(), + Error: err.Error(), }, scheduleAt, )