diff --git a/server/internal/db/migrations/data/014_jobs.sql b/server/internal/db/migrations/data/014_jobs.sql index d4b1c332..1ff14d9d 100644 --- a/server/internal/db/migrations/data/014_jobs.sql +++ b/server/internal/db/migrations/data/014_jobs.sql @@ -12,10 +12,25 @@ CREATE TABLE jobs( errors JSONB[] ); -CREATE INDEX next_available_job ON jobs(id) WHERE status = 0; +CREATE TABLE job_sequences( + job_id INTEGER REFERENCES jobs(id) ON DELETE CASCADE, + sequence TEXT NOT NULL, + PRIMARY KEY(job_id, sequence) +); + +CREATE TABLE job_dependencies( + job_id INTEGER REFERENCES jobs(id) ON DELETE CASCADE, + dependency INTEGER REFERENCES jobs(id) ON DELETE CASCADE, + PRIMARY KEY(job_id, dependency) +); + + +CREATE INDEX next_available_job ON jobs(id) WHERE (status = 0 OR status = 2); CREATE INDEX scheduled_jobs ON jobs(scheduled_at); +CREATE INDEX job_sequences_by_sequence ON job_sequences(sequence); + CREATE FUNCTION notify_job_inserted() RETURNS trigger AS $$ BEGIN PERFORM pg_notify('job_inserted', row_to_json(NEW)::TEXT); @@ -31,12 +46,18 @@ CREATE TRIGGER job_inserted ---- create above / drop below ---- -DROP INDEX next_available_job; - -DROP INDEX scheduled_jobs; - DROP TRIGGER job_inserted ON jobs; DROP FUNCTION notify_job_inserted(); -DROP TABLE jobs +DROP INDEX job_sequences_by_sequence; + +DROP INDEX next_available_job; + +DROP INDEX scheduled_jobs; + +DROP TABLE sequence_jobs; + +DROP TABLE job_dependencies; + +DROP TABLE jobs; diff --git a/server/internal/steve/db.go b/server/internal/steve/db.go index a3472e65..b5eb65f0 100644 --- a/server/internal/steve/db.go +++ b/server/internal/steve/db.go @@ -47,14 +47,32 @@ WHERE id = $1::INT` } } -func resetDB(db db.Handler) error { - const qResetStatus = "UPDATE jobs SET status = 0 WHERE status = 1 OR status = 6" +func resetRunningStatus(db db.Handler) error { + const qResetStatus = "UPDATE jobs SET status = 0 WHERE status = 3" _, err := db.Exec(qResetStatus) return err } -func enqueueScheduledJobs(db db.Handler) (int, error) { - const q = "UPDATE jobs SET status = 0 WHERE status = 3 AND scheduled_at < NOW()" +func scheduleReadyJobs(db db.Handler) (int, error) { + const q = "UPDATE jobs SET status = 2 WHERE status = 5 AND scheduled_at < NOW()" + if tag, err := db.Exec(q); err != nil { + return 0, err + } else { + return int(tag.RowsAffected()), err + } +} + +func unblockReadyJobs(db db.Handler) (int, error) { + const q = `-- unblockJobs +WITH cte (id, count) AS ( + SELECT id, count(dependency) FROM jobs + LEFT JOIN job_dependencies + ON id = job_id + WHERE status = 1 + GROUP BY id +) UPDATE jobs SET + status = 2 +WHERE id IN (SELECT id FROM cte WHERE count = 0)` if tag, err := db.Exec(q); err != nil { return 0, err } else { @@ -64,17 +82,18 @@ func enqueueScheduledJobs(db db.Handler) (int, error) { func claimNextJob(db db.Handler) (*JobInfo, error) { const q = `-- ClaimNextJob -WITH cte (id) AS ( - SELECT id FROM jobs - WHERE status = 0 +WITH cte (id, status) AS ( + SELECT id, status FROM jobs + WHERE status = 0 OR status = 2 ORDER BY id FOR UPDATE SKIP LOCKED LIMIT 1 ) UPDATE jobs SET - status = 1, + status = 3, attempted_at = now() -WHERE id = (SELECT id FROM cte) -RETURNING id, status, attempt, kind, args` +FROM cte + WHERE jobs.id = (SELECT id FROM cte) + RETURNING jobs.id, cte.status, jobs.attempt, jobs.kind, jobs.args` if rows, err := db.Query(q); err != nil { return nil, err @@ -86,6 +105,42 @@ RETURNING id, status, attempt, kind, args` return info, err } } + +func insertDependencies(db db.Handler, jobID int32, sequences []string) (int, error) { + const q = `-- InsertDependencies +INSERT INTO job_dependencies(job_id, dependency) + (SELECT $1::INTEGER, job_id from job_sequences WHERE sequence = ANY ($2::TEXT[]))` + if tag, err := db.Exec(q, jobID, sequences); err != nil { + return 0, err + } else { + return int(tag.RowsAffected()), err + } +} + +func insertSequences(db db.Handler, jobID int32, sequences []string) (int, error) { + const q = `-- InsertSequences +INSERT INTO job_sequences(job_id, sequence) + (SELECT $1::INTEGER, unnest($2::TEXT[]))` + + if tag, err := db.Exec(q, jobID, sequences); err != nil { + return 0, err + } else { + return int(tag.RowsAffected()), err + } +} + +func deleteDependencies(db db.Handler, jobID int32) error { + const q = `DELETE FROM job_dependencies WHERE dependency = $1::INTEGER` + _, err := db.Exec(q, jobID) + return err +} + +func deleteSequences(db db.Handler, jobID int32) error { + const q = ` DELETE FROM job_sequences WHERE job_id = $1::INTEGER` + _, err := db.Exec(q, jobID) + return err +} + func scanJobInfo(row pgx.CollectableRow) (*JobInfo, error) { var job JobInfo err := row.Scan( diff --git a/server/internal/steve/job.go b/server/internal/steve/job.go index 35136ddf..f2e2631a 100644 --- a/server/internal/steve/job.go +++ b/server/internal/steve/job.go @@ -40,10 +40,10 @@ type JobStatus uint8 const ( JobStatusQueued JobStatus = 0 - JobStatusRunning JobStatus = 1 - JobStatusCompleted JobStatus = 2 - JobStatusScheduled JobStatus = 3 - JobStatusFailed JobStatus = 4 - JobStatusCancelled JobStatus = 5 - JobStatusWaiting JobStatus = 6 + JobStatusBlocked JobStatus = 1 + JobStatusReady JobStatus = 2 + JobStatusRunning JobStatus = 3 + JobStatusCompleted JobStatus = 4 + JobStatusScheduled JobStatus = 5 + JobStatusFailed JobStatus = 6 ) diff --git a/server/internal/steve/steve.go b/server/internal/steve/steve.go index 55e4c286..39de33f1 100644 --- a/server/internal/steve/steve.go +++ b/server/internal/steve/steve.go @@ -5,8 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "slices" - "strings" "sync" "time" @@ -18,13 +16,7 @@ import ( const channelName = "job_inserted" const maxAttempts = 6 -type errWaiting struct { - sequences []string -} - -func (e errWaiting) Error() string { - return "Waiting for " + strings.Join(e.sequences, ",") -} +var errWaiting = errors.New("waiting for jobs") type Client struct { db db.Handler @@ -36,13 +28,8 @@ type Client struct { factories map[string]workUnitFactory availableWorkers chan struct{} // Job availability - subJobInserted pubsub.Subscription - muJobAvailable *sync.Mutex - condJobAvailable *sync.Cond - // Dependencies - muSeq *sync.Mutex - sequences map[string][]int32 - waitingJobs map[int32][]string + subJobInserted pubsub.Subscription + jobAvailable chan struct{} // Shutdown wg sync.WaitGroup @@ -84,22 +71,20 @@ func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error { if c.config.Workers <= 0 { return errors.New("workers must be a positive integer") } - if err := resetDB(c.db); err != nil { + if err := resetRunningStatus(c.db); err != nil { return err } - if _, err := enqueueScheduledJobs(c.db); err != nil { + if _, err := scheduleReadyJobs(c.db); err != nil { + return err + } + if _, err := unblockReadyJobs(c.db); err != nil { return err } c.availableWorkers = make(chan struct{}, c.config.Workers) - c.muJobAvailable = &sync.Mutex{} - c.condJobAvailable = &sync.Cond{L: c.muJobAvailable} + c.jobAvailable = make(chan struct{}) c.defaultTimeout = time.Duration(c.config.Timeout) * time.Second - c.muSeq = &sync.Mutex{} - c.sequences = make(map[string][]int32) - c.waitingJobs = make(map[int32][]string) - c.logger.Info().Int("workers", c.config.Workers).Msg("Starting") c.subJobInserted = notifier.Listen(channelName) c.done = make(chan struct{}) @@ -123,7 +108,7 @@ func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error { func (c *Client) Stop(ctx context.Context, waitTimeout time.Duration) bool { c.cancelListen() c.cancelRun() - c.condJobAvailable.Signal() // Make sure the run loop isn't waiting + close(c.jobAvailable) select { case <-c.done: @@ -160,12 +145,28 @@ func (c *Client) produceJobs(ctx context.Context) { case <-ctx.Done(): return case <-c.subJobInserted.NotificationC(): - c.condJobAvailable.Signal() + select { + case c.jobAvailable <- struct{}{}: + default: + } case <-t.C: - if n, err := enqueueScheduledJobs(c.db); err != nil { - c.logger.Warn().Err(err).Msg("Error enqueueing scheduled jobs") + if n, err := scheduleReadyJobs(c.db); err != nil { + c.logger.Warn().Err(err).Msg("Error scheduling ready jobs") } else if n > 0 { - c.condJobAvailable.Signal() + c.logger.Trace().Int("n", n).Msg("Scheduled ready jobs") + select { + case c.jobAvailable <- struct{}{}: + default: + } + } + if n, err := unblockReadyJobs(c.db); err != nil { + c.logger.Warn().Err(err).Msg("Error unblocking ready jobs") + } else if n > 0 { + c.logger.Trace().Int("n", n).Msg("Unblocked ready jobs") + select { + case c.jobAvailable <- struct{}{}: + default: + } } } } @@ -182,9 +183,11 @@ loop: if info, err := claimNextJob(c.db); err != nil { if errors.Is(err, errNoJobs) { // No available jobs. Wait for signal - c.muJobAvailable.Lock() - c.condJobAvailable.Wait() - c.muJobAvailable.Unlock() + select { + case <-runCtx.Done(): + break loop + case <-c.jobAvailable: + } // Make sure to mark the worker as available <-c.availableWorkers } else { @@ -194,13 +197,12 @@ loop: } } else { if err := c.execute(workCtx, info); err != nil { - if e, ok := err.(*errWaiting); ok { + if errors.Is(err, errWaiting) { c.logger. Trace(). Int32("id", info.ID). Str("kind", info.Kind). Str("args", string(info.EncodedArgs)). - Str("waiting", strings.Join(e.sequences, ",")). Msg("Job Waiting") } else { c.recordError(info, err) @@ -234,27 +236,28 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) error { return err } - c.muSeq.Lock() - defer c.muSeq.Unlock() - - var waiting []string - for _, s := range w.Sequences() { - j, ok := c.sequences[s] - if ok { - // This job is already at the head of the queue for this sequence - // i.e. we've been through here before - if j[0] == info.ID { - continue + // This is the first time we're seeing this job. + // Check dependencies and insert sequences + if info.Status == JobStatusQueued { + err := c.db.RunInTx(func(db db.TxHandler) error { + // Needs to happen before inserting sequences + if n, err := insertDependencies(db, info.ID, w.Sequences()); err != nil { + return err + } else if _, err := insertSequences(db, info.ID, w.Sequences()); err != nil { + return err + } else { + if n > 0 { + if err := updateJobStatus(c.db, info.ID, JobStatusBlocked); err != nil { + return err + } + return errWaiting + } } - waiting = append(waiting, s) - } - c.sequences[s] = append(j, info.ID) - } - if waiting != nil { - updateJobStatus(c.db, info.ID, JobStatusWaiting) - c.waitingJobs[info.ID] = waiting - return &errWaiting{ - sequences: waiting, + + return nil + }) + if err != nil { + return err } } @@ -274,38 +277,34 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) error { if err := w.Work(ctx); err != nil { c.recordError(info, err) } else { - updateJobStatus(c.db, info.ID, JobStatusCompleted) - c.logger. - Trace(). - Int("attempt", info.Attempt). - Int32("id", info.ID). - Msg("Job Completed") - - c.muSeq.Lock() - defer c.muSeq.Unlock() - for _, s := range w.Sequences() { - j := c.sequences[s] - if len(j) == 1 { - delete(c.sequences, s) - } else { - c.sequences[s] = j[1:] - nextJobID := j[1] - if waiting := c.waitingJobs[nextJobID]; len(waiting) <= 1 { - c.logger. - Trace(). - Int32("id", nextJobID). - Msg("Job Available") - delete(c.waitingJobs, nextJobID) - updateJobStatus(c.db, nextJobID, JobStatusQueued) - c.condJobAvailable.Signal() - } else { - // TODO: #untested - c.waitingJobs[nextJobID] = slices.DeleteFunc(waiting, func(seq string) bool { - return seq == s - }) - } + err := c.db.RunInTx(func(db db.TxHandler) error { + if err := deleteDependencies(c.db, info.ID); err != nil { + return err } + if err := deleteSequences(c.db, info.ID); err != nil { + return err + } + + if err := updateJobStatus(c.db, info.ID, JobStatusCompleted); err != nil { + return err + } + return nil + }) + if err != nil { + c.logger. + Warn(). + Err(err). + Int("attempt", info.Attempt). + Int32("id", info.ID). + Msg("Error marking job as completed") + } else { + c.logger. + Trace(). + Int("attempt", info.Attempt). + Int32("id", info.ID). + Msg("Job Completed") } + } }() return nil