[server] Dependencies and sequences stored in db

This commit is contained in:
Abhishek Shroff
2025-06-19 01:37:42 +05:30
parent fe1b1e7975
commit 9e9155201f
4 changed files with 181 additions and 106 deletions

View File

@@ -12,10 +12,25 @@ CREATE TABLE jobs(
errors JSONB[]
);
CREATE INDEX next_available_job ON jobs(id) WHERE status = 0;
CREATE TABLE job_sequences(
job_id INTEGER REFERENCES jobs(id) ON DELETE CASCADE,
sequence TEXT NOT NULL,
PRIMARY KEY(job_id, sequence)
);
CREATE TABLE job_dependencies(
job_id INTEGER REFERENCES jobs(id) ON DELETE CASCADE,
dependency INTEGER REFERENCES jobs(id) ON DELETE CASCADE,
PRIMARY KEY(job_id, dependency)
);
CREATE INDEX next_available_job ON jobs(id) WHERE (status = 0 OR status = 2);
CREATE INDEX scheduled_jobs ON jobs(scheduled_at);
CREATE INDEX job_sequences_by_sequence ON job_sequences(sequence);
CREATE FUNCTION notify_job_inserted() RETURNS trigger AS $$
BEGIN
PERFORM pg_notify('job_inserted', row_to_json(NEW)::TEXT);
@@ -31,12 +46,18 @@ CREATE TRIGGER job_inserted
---- create above / drop below ----
DROP INDEX next_available_job;
DROP INDEX scheduled_jobs;
DROP TRIGGER job_inserted ON jobs;
DROP FUNCTION notify_job_inserted();
DROP TABLE jobs
DROP INDEX job_sequences_by_sequence;
DROP INDEX next_available_job;
DROP INDEX scheduled_jobs;
DROP TABLE sequence_jobs;
DROP TABLE job_dependencies;
DROP TABLE jobs;

View File

@@ -47,14 +47,32 @@ WHERE id = $1::INT`
}
}
func resetDB(db db.Handler) error {
const qResetStatus = "UPDATE jobs SET status = 0 WHERE status = 1 OR status = 6"
func resetRunningStatus(db db.Handler) error {
const qResetStatus = "UPDATE jobs SET status = 0 WHERE status = 3"
_, err := db.Exec(qResetStatus)
return err
}
func enqueueScheduledJobs(db db.Handler) (int, error) {
const q = "UPDATE jobs SET status = 0 WHERE status = 3 AND scheduled_at < NOW()"
func scheduleReadyJobs(db db.Handler) (int, error) {
const q = "UPDATE jobs SET status = 2 WHERE status = 5 AND scheduled_at < NOW()"
if tag, err := db.Exec(q); err != nil {
return 0, err
} else {
return int(tag.RowsAffected()), err
}
}
func unblockReadyJobs(db db.Handler) (int, error) {
const q = `-- unblockJobs
WITH cte (id, count) AS (
SELECT id, count(dependency) FROM jobs
LEFT JOIN job_dependencies
ON id = job_id
WHERE status = 1
GROUP BY id
) UPDATE jobs SET
status = 2
WHERE id IN (SELECT id FROM cte WHERE count = 0)`
if tag, err := db.Exec(q); err != nil {
return 0, err
} else {
@@ -64,17 +82,18 @@ func enqueueScheduledJobs(db db.Handler) (int, error) {
func claimNextJob(db db.Handler) (*JobInfo, error) {
const q = `-- ClaimNextJob
WITH cte (id) AS (
SELECT id FROM jobs
WHERE status = 0
WITH cte (id, status) AS (
SELECT id, status FROM jobs
WHERE status = 0 OR status = 2
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 1
) UPDATE jobs SET
status = 1,
status = 3,
attempted_at = now()
WHERE id = (SELECT id FROM cte)
RETURNING id, status, attempt, kind, args`
FROM cte
WHERE jobs.id = (SELECT id FROM cte)
RETURNING jobs.id, cte.status, jobs.attempt, jobs.kind, jobs.args`
if rows, err := db.Query(q); err != nil {
return nil, err
@@ -86,6 +105,42 @@ RETURNING id, status, attempt, kind, args`
return info, err
}
}
func insertDependencies(db db.Handler, jobID int32, sequences []string) (int, error) {
const q = `-- InsertDependencies
INSERT INTO job_dependencies(job_id, dependency)
(SELECT $1::INTEGER, job_id from job_sequences WHERE sequence = ANY ($2::TEXT[]))`
if tag, err := db.Exec(q, jobID, sequences); err != nil {
return 0, err
} else {
return int(tag.RowsAffected()), err
}
}
func insertSequences(db db.Handler, jobID int32, sequences []string) (int, error) {
const q = `-- InsertSequences
INSERT INTO job_sequences(job_id, sequence)
(SELECT $1::INTEGER, unnest($2::TEXT[]))`
if tag, err := db.Exec(q, jobID, sequences); err != nil {
return 0, err
} else {
return int(tag.RowsAffected()), err
}
}
func deleteDependencies(db db.Handler, jobID int32) error {
const q = `DELETE FROM job_dependencies WHERE dependency = $1::INTEGER`
_, err := db.Exec(q, jobID)
return err
}
func deleteSequences(db db.Handler, jobID int32) error {
const q = ` DELETE FROM job_sequences WHERE job_id = $1::INTEGER`
_, err := db.Exec(q, jobID)
return err
}
func scanJobInfo(row pgx.CollectableRow) (*JobInfo, error) {
var job JobInfo
err := row.Scan(

View File

@@ -40,10 +40,10 @@ type JobStatus uint8
const (
JobStatusQueued JobStatus = 0
JobStatusRunning JobStatus = 1
JobStatusCompleted JobStatus = 2
JobStatusScheduled JobStatus = 3
JobStatusFailed JobStatus = 4
JobStatusCancelled JobStatus = 5
JobStatusWaiting JobStatus = 6
JobStatusBlocked JobStatus = 1
JobStatusReady JobStatus = 2
JobStatusRunning JobStatus = 3
JobStatusCompleted JobStatus = 4
JobStatusScheduled JobStatus = 5
JobStatusFailed JobStatus = 6
)

View File

@@ -5,8 +5,6 @@ import (
"encoding/json"
"errors"
"fmt"
"slices"
"strings"
"sync"
"time"
@@ -18,13 +16,7 @@ import (
const channelName = "job_inserted"
const maxAttempts = 6
type errWaiting struct {
sequences []string
}
func (e errWaiting) Error() string {
return "Waiting for " + strings.Join(e.sequences, ",")
}
var errWaiting = errors.New("waiting for jobs")
type Client struct {
db db.Handler
@@ -36,13 +28,8 @@ type Client struct {
factories map[string]workUnitFactory
availableWorkers chan struct{}
// Job availability
subJobInserted pubsub.Subscription
muJobAvailable *sync.Mutex
condJobAvailable *sync.Cond
// Dependencies
muSeq *sync.Mutex
sequences map[string][]int32
waitingJobs map[int32][]string
subJobInserted pubsub.Subscription
jobAvailable chan struct{}
// Shutdown
wg sync.WaitGroup
@@ -84,22 +71,20 @@ func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error {
if c.config.Workers <= 0 {
return errors.New("workers must be a positive integer")
}
if err := resetDB(c.db); err != nil {
if err := resetRunningStatus(c.db); err != nil {
return err
}
if _, err := enqueueScheduledJobs(c.db); err != nil {
if _, err := scheduleReadyJobs(c.db); err != nil {
return err
}
if _, err := unblockReadyJobs(c.db); err != nil {
return err
}
c.availableWorkers = make(chan struct{}, c.config.Workers)
c.muJobAvailable = &sync.Mutex{}
c.condJobAvailable = &sync.Cond{L: c.muJobAvailable}
c.jobAvailable = make(chan struct{})
c.defaultTimeout = time.Duration(c.config.Timeout) * time.Second
c.muSeq = &sync.Mutex{}
c.sequences = make(map[string][]int32)
c.waitingJobs = make(map[int32][]string)
c.logger.Info().Int("workers", c.config.Workers).Msg("Starting")
c.subJobInserted = notifier.Listen(channelName)
c.done = make(chan struct{})
@@ -123,7 +108,7 @@ func (c *Client) Start(ctx context.Context, notifier pubsub.Notifier) error {
func (c *Client) Stop(ctx context.Context, waitTimeout time.Duration) bool {
c.cancelListen()
c.cancelRun()
c.condJobAvailable.Signal() // Make sure the run loop isn't waiting
close(c.jobAvailable)
select {
case <-c.done:
@@ -160,12 +145,28 @@ func (c *Client) produceJobs(ctx context.Context) {
case <-ctx.Done():
return
case <-c.subJobInserted.NotificationC():
c.condJobAvailable.Signal()
select {
case c.jobAvailable <- struct{}{}:
default:
}
case <-t.C:
if n, err := enqueueScheduledJobs(c.db); err != nil {
c.logger.Warn().Err(err).Msg("Error enqueueing scheduled jobs")
if n, err := scheduleReadyJobs(c.db); err != nil {
c.logger.Warn().Err(err).Msg("Error scheduling ready jobs")
} else if n > 0 {
c.condJobAvailable.Signal()
c.logger.Trace().Int("n", n).Msg("Scheduled ready jobs")
select {
case c.jobAvailable <- struct{}{}:
default:
}
}
if n, err := unblockReadyJobs(c.db); err != nil {
c.logger.Warn().Err(err).Msg("Error unblocking ready jobs")
} else if n > 0 {
c.logger.Trace().Int("n", n).Msg("Unblocked ready jobs")
select {
case c.jobAvailable <- struct{}{}:
default:
}
}
}
}
@@ -182,9 +183,11 @@ loop:
if info, err := claimNextJob(c.db); err != nil {
if errors.Is(err, errNoJobs) {
// No available jobs. Wait for signal
c.muJobAvailable.Lock()
c.condJobAvailable.Wait()
c.muJobAvailable.Unlock()
select {
case <-runCtx.Done():
break loop
case <-c.jobAvailable:
}
// Make sure to mark the worker as available
<-c.availableWorkers
} else {
@@ -194,13 +197,12 @@ loop:
}
} else {
if err := c.execute(workCtx, info); err != nil {
if e, ok := err.(*errWaiting); ok {
if errors.Is(err, errWaiting) {
c.logger.
Trace().
Int32("id", info.ID).
Str("kind", info.Kind).
Str("args", string(info.EncodedArgs)).
Str("waiting", strings.Join(e.sequences, ",")).
Msg("Job Waiting")
} else {
c.recordError(info, err)
@@ -234,27 +236,28 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) error {
return err
}
c.muSeq.Lock()
defer c.muSeq.Unlock()
var waiting []string
for _, s := range w.Sequences() {
j, ok := c.sequences[s]
if ok {
// This job is already at the head of the queue for this sequence
// i.e. we've been through here before
if j[0] == info.ID {
continue
// This is the first time we're seeing this job.
// Check dependencies and insert sequences
if info.Status == JobStatusQueued {
err := c.db.RunInTx(func(db db.TxHandler) error {
// Needs to happen before inserting sequences
if n, err := insertDependencies(db, info.ID, w.Sequences()); err != nil {
return err
} else if _, err := insertSequences(db, info.ID, w.Sequences()); err != nil {
return err
} else {
if n > 0 {
if err := updateJobStatus(c.db, info.ID, JobStatusBlocked); err != nil {
return err
}
return errWaiting
}
}
waiting = append(waiting, s)
}
c.sequences[s] = append(j, info.ID)
}
if waiting != nil {
updateJobStatus(c.db, info.ID, JobStatusWaiting)
c.waitingJobs[info.ID] = waiting
return &errWaiting{
sequences: waiting,
return nil
})
if err != nil {
return err
}
}
@@ -274,38 +277,34 @@ func (c *Client) execute(ctx context.Context, info *JobInfo) error {
if err := w.Work(ctx); err != nil {
c.recordError(info, err)
} else {
updateJobStatus(c.db, info.ID, JobStatusCompleted)
c.logger.
Trace().
Int("attempt", info.Attempt).
Int32("id", info.ID).
Msg("Job Completed")
c.muSeq.Lock()
defer c.muSeq.Unlock()
for _, s := range w.Sequences() {
j := c.sequences[s]
if len(j) == 1 {
delete(c.sequences, s)
} else {
c.sequences[s] = j[1:]
nextJobID := j[1]
if waiting := c.waitingJobs[nextJobID]; len(waiting) <= 1 {
c.logger.
Trace().
Int32("id", nextJobID).
Msg("Job Available")
delete(c.waitingJobs, nextJobID)
updateJobStatus(c.db, nextJobID, JobStatusQueued)
c.condJobAvailable.Signal()
} else {
// TODO: #untested
c.waitingJobs[nextJobID] = slices.DeleteFunc(waiting, func(seq string) bool {
return seq == s
})
}
err := c.db.RunInTx(func(db db.TxHandler) error {
if err := deleteDependencies(c.db, info.ID); err != nil {
return err
}
if err := deleteSequences(c.db, info.ID); err != nil {
return err
}
if err := updateJobStatus(c.db, info.ID, JobStatusCompleted); err != nil {
return err
}
return nil
})
if err != nil {
c.logger.
Warn().
Err(err).
Int("attempt", info.Attempt).
Int32("id", info.ID).
Msg("Error marking job as completed")
} else {
c.logger.
Trace().
Int("attempt", info.Attempt).
Int32("id", info.ID).
Msg("Job Completed")
}
}
}()
return nil