mirror of
https://codeberg.org/shroff/phylum.git
synced 2026-01-01 09:09:34 -06:00
[server][steve] Graceful shutdown
This commit is contained in:
@@ -31,9 +31,8 @@ type JobRow struct {
|
||||
}
|
||||
|
||||
type AttemptError struct {
|
||||
At time.Time `json:"at"`
|
||||
Attempt int `json:"attempt"`
|
||||
Error string `json:"error"`
|
||||
At time.Time `json:"at"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
type JobStatus uint8
|
||||
|
||||
@@ -17,19 +17,24 @@ const channelName = "job_inserted"
|
||||
const maxAttempts = 6
|
||||
|
||||
type Client struct {
|
||||
config Config
|
||||
db db.Handler
|
||||
logger zerolog.Logger
|
||||
db db.Handler
|
||||
logger zerolog.Logger
|
||||
// Config
|
||||
config Config
|
||||
defaultTimeout time.Duration
|
||||
// Running
|
||||
factories map[string]workUnitFactory
|
||||
availableWorkers chan struct{}
|
||||
// Job availability
|
||||
subJobInserted pubsub.Subscription
|
||||
muJobAvailable *sync.Mutex
|
||||
condJobAvailable *sync.Cond
|
||||
sub pubsub.Subscription
|
||||
cancelListen context.CancelFunc
|
||||
cancelRun context.CancelFunc
|
||||
cancelWork context.CancelFunc
|
||||
// Config
|
||||
defaultTimeout time.Duration
|
||||
// Shutdown
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
cancelListen context.CancelFunc
|
||||
cancelRun context.CancelFunc
|
||||
cancelWork context.CancelFunc
|
||||
}
|
||||
|
||||
func NewClient(db db.Handler, logger zerolog.Logger, config Config) *Client {
|
||||
@@ -58,23 +63,27 @@ func RegisterWorker[T JobArgs](client *Client, worker Worker[T]) {
|
||||
}
|
||||
|
||||
func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error {
|
||||
if c.sub != nil {
|
||||
if c.subJobInserted != nil {
|
||||
return errors.New("already started")
|
||||
}
|
||||
if c.config.Workers <= 0 {
|
||||
return errors.New("workers must be a positive integer")
|
||||
}
|
||||
if err := resetRunningJobs(c.db); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := enqueueScheduledJobs(c.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.availableWorkers = make(chan struct{}, c.config.Workers)
|
||||
c.muJobAvailable = &sync.Mutex{}
|
||||
c.condJobAvailable = &sync.Cond{L: c.muJobAvailable}
|
||||
c.defaultTimeout = time.Duration(c.config.Timeout) * time.Second
|
||||
|
||||
c.logger.Info().Int("workers", c.config.Workers).Msg("Starting Job Processing")
|
||||
c.sub = notifier.Listen(channelName)
|
||||
|
||||
if err := resetRunningJobs(c.db); err != nil {
|
||||
return err
|
||||
}
|
||||
c.logger.Info().Int("workers", c.config.Workers).Msg("Starting")
|
||||
c.subJobInserted = notifier.Listen(channelName)
|
||||
c.done = make(chan struct{})
|
||||
|
||||
var listenCtx, runCtx, workCtx context.Context
|
||||
listenCtx, c.cancelListen = context.WithCancel(ctx)
|
||||
@@ -85,27 +94,42 @@ func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error {
|
||||
|
||||
// Wait until the listener is established
|
||||
select {
|
||||
case <-c.sub.EstablishedC():
|
||||
case <-c.subJobInserted.EstablishedC():
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) Stop(ctx context.Context, hard bool) bool {
|
||||
c.logger.Info().Bool("hard", hard).Msg("Stop")
|
||||
c.cancelListen()
|
||||
c.cancelRun()
|
||||
if hard {
|
||||
c.cancelWork()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done(): // stop context cancelled
|
||||
c.logger.Warn().Msg("Stop timeout reached. Some jobs still running")
|
||||
return false
|
||||
case <-c.done:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) produceJobs(ctx context.Context) {
|
||||
t := time.NewTicker(5 * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-c.sub.NotificationC():
|
||||
c.logger.Debug().Msg("Job available")
|
||||
case <-c.subJobInserted.NotificationC():
|
||||
c.condJobAvailable.Signal()
|
||||
case <-t.C:
|
||||
if n, err := enqueueScheduledJobs(c.db); err != nil {
|
||||
c.logger.Warn().Err(err).Msg("Error enqueueing scheduled jobs")
|
||||
} else if n > 0 {
|
||||
c.logger.Debug().Int("count", n).Msg("Enqueued scheduled jobs")
|
||||
c.condJobAvailable.Signal()
|
||||
}
|
||||
}
|
||||
@@ -122,12 +146,10 @@ loop:
|
||||
case c.availableWorkers <- struct{}{}:
|
||||
if info, err := claimNextJob(c.db); err != nil {
|
||||
if errors.Is(err, errNoJobs) {
|
||||
c.logger.Debug().Msg("No jobs available. Sleeping until notified")
|
||||
// No available jobs. Wait for signal
|
||||
c.muJobAvailable.Lock()
|
||||
c.condJobAvailable.Wait()
|
||||
c.muJobAvailable.Unlock()
|
||||
c.logger.Debug().Msg("New job!!")
|
||||
// Make sure to mark the worker as available
|
||||
<-c.availableWorkers
|
||||
} else {
|
||||
@@ -136,11 +158,13 @@ loop:
|
||||
c.logger.Fatal().Err(err).Msg("Failed to claim job")
|
||||
}
|
||||
} else {
|
||||
c.logger.Debug().Int32("id", info.ID).Str("kind", info.Kind).Bytes("args", info.EncodedArgs).Msg("Executing job")
|
||||
c.logger.Trace().Int32("id", info.ID).Str("kind", info.Kind).Bytes("args", info.EncodedArgs).Msg("Executing job")
|
||||
c.execute(workCtx, info)
|
||||
}
|
||||
}
|
||||
}
|
||||
c.wg.Wait()
|
||||
close(c.done)
|
||||
}
|
||||
|
||||
func (c *Client) execute(ctx context.Context, info *JobInfo) {
|
||||
@@ -163,8 +187,10 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) {
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
c.wg.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
c.wg.Done()
|
||||
<-c.availableWorkers
|
||||
}()
|
||||
if err := w.Work(ctx); err != nil {
|
||||
@@ -176,15 +202,19 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) {
|
||||
}
|
||||
|
||||
func (c *Client) recordError(info *JobInfo, err error) error {
|
||||
scheduleAt := computeBackoff(info.Attempt + 1)
|
||||
c.logger.Warn().Err(err).Int32("id", info.ID).Str("kind", info.Kind).Bytes("args", info.EncodedArgs).Time("schedule_at", scheduleAt.Time).Msg("Error executing jobs")
|
||||
scheduleAt := computeBackoff(info.Attempt)
|
||||
c.logger.
|
||||
Warn().
|
||||
Err(err).
|
||||
Int32("id", info.ID).
|
||||
Int("attempt", info.Attempt+1).
|
||||
Msg("Error executing job")
|
||||
return updateJobStatusError(
|
||||
c.db,
|
||||
info.ID,
|
||||
AttemptError{
|
||||
At: time.Now(),
|
||||
Attempt: info.Attempt + 1,
|
||||
Error: err.Error(),
|
||||
At: time.Now(),
|
||||
Error: err.Error(),
|
||||
},
|
||||
scheduleAt,
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user