From 0c8b91f513ae9355ff19fe3ec358d507a97a4e5c Mon Sep 17 00:00:00 2001 From: Abhishek Shroff Date: Sun, 22 Jun 2025 14:06:33 +0530 Subject: [PATCH] [server] Move from river to in-house job queue --- server/internal/command/command.go | 32 ++++++++++ server/internal/command/fs/import.go | 5 -- server/internal/command/serve/cmd.go | 55 ++++++++++++---- server/internal/core/resource_copy_move.go | 2 +- server/internal/core/resource_create.go | 5 +- server/internal/core/trash.go | 3 +- .../internal/db/migrations/data/014_jobs.sql | 2 +- server/internal/jobs/copy_contents.go | 23 +++---- server/internal/jobs/delete_contents.go | 24 +++---- server/internal/jobs/jobs.go | 62 ------------------- server/internal/jobs/migrate.go | 14 ++--- server/internal/steve/steve.go | 13 ++-- 12 files changed, 113 insertions(+), 127 deletions(-) delete mode 100644 server/internal/jobs/jobs.go diff --git a/server/internal/command/command.go b/server/internal/command/command.go index af6a3d46..4619a45f 100644 --- a/server/internal/command/command.go +++ b/server/internal/command/command.go @@ -16,7 +16,9 @@ import ( "codeberg.org/shroff/phylum/server/internal/command/user" "codeberg.org/shroff/phylum/server/internal/core" "codeberg.org/shroff/phylum/server/internal/db" + "codeberg.org/shroff/phylum/server/internal/jobs" "codeberg.org/shroff/phylum/server/internal/mail" + "codeberg.org/shroff/phylum/server/internal/steve" "codeberg.org/shroff/phylum/server/internal/storage" "github.com/google/uuid" "github.com/knadh/koanf/parsers/yaml" @@ -25,6 +27,8 @@ import ( "github.com/knadh/koanf/providers/posflag" "github.com/knadh/koanf/providers/rawbytes" "github.com/knadh/koanf/v2" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -62,11 +66,15 @@ func SetupCommand() { } } + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.999"}) + uuid.EnableRandPool() cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) { if cmd.Name() == "serve" { + zerolog.SetGlobalLevel(zerolog.InfoLevel) logrus.SetLevel(logrus.InfoLevel) } else { + zerolog.SetGlobalLevel(zerolog.WarnLevel) logrus.SetLevel(logrus.WarnLevel) } @@ -108,6 +116,7 @@ func SetupCommand() { k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{Tag: "koanf"}) if cfg.Debug { + zerolog.SetGlobalLevel(zerolog.TraceLevel) logrus.SetLevel(logrus.TraceLevel) logrus.Debug("Running in debug mode") cfg.Server.Debug = true @@ -121,6 +130,11 @@ func SetupCommand() { auth.Cfg = cfg.Auth crypt.Cfg = cfg.Auth.Password.Crypt + if !isCmd(cmd, "schema") { + // This will initialize the db, which we don't want yet. + initializeSteve() + } + if err := storage.Initialize(db.Get(context.Background())); err != nil { logrus.Fatal("Failed to initialize storage: " + err.Error()) } @@ -142,3 +156,21 @@ func SetupCommand() { cmd.SetCompletionCommandGroupID("misc") cmd.Execute() } + +func isCmd(cmd *cobra.Command, s string) bool { + for c := cmd; c != nil; c = c.Parent() { + if cmd.Name() == s { + return true + } + } + return false +} + +func initializeSteve() { + ctx := context.Background() + steve.Initialize(db.Get(ctx), log.Logger, steve.Config{Workers: 3}) + client := steve.Get() + steve.RegisterWorker(client, &jobs.MigrateWorker{}) + steve.RegisterWorker(client, &jobs.CopyContentsWorker{}) + steve.RegisterWorker(client, &jobs.DeleteContentsWorker{}) +} diff --git a/server/internal/command/fs/import.go b/server/internal/command/fs/import.go index 2a39602b..a1d1d98b 100644 --- a/server/internal/command/fs/import.go +++ b/server/internal/command/fs/import.go @@ -1,7 +1,6 @@ package fs import ( - "context" "errors" "fmt" "io" @@ -11,8 +10,6 @@ import ( "codeberg.org/shroff/phylum/server/internal/command/common" "codeberg.org/shroff/phylum/server/internal/core" - "codeberg.org/shroff/phylum/server/internal/db" - "codeberg.org/shroff/phylum/server/internal/jobs" "github.com/google/uuid" "github.com/spf13/cobra" ) @@ -63,7 +60,6 @@ func setupImportCommand() *cobra.Command { } if stat.IsDir() { - jobs.Initialize(context.Background(), db.Pool()) dirFS := os.DirFS(importPath) err = iofs.WalkDir(dirFS, ".", func(p string, d iofs.DirEntry, err error) error { @@ -86,7 +82,6 @@ func setupImportCommand() *cobra.Command { }) //TODO #jobs. We don't need to run or wait for all jobs fmt.Println("Waiting for jobs to finish") - jobs.Shutdown(context.Background()) } else { err = copyContents(f, importPath, res) } diff --git a/server/internal/command/serve/cmd.go b/server/internal/command/serve/cmd.go index e66f3c76..fa0d77d3 100644 --- a/server/internal/command/serve/cmd.go +++ b/server/internal/command/serve/cmd.go @@ -2,19 +2,22 @@ package serve import ( "context" + "errors" "net/http" + "os" + "os/signal" "path" "strconv" "strings" + "sync" + "syscall" "time" "codeberg.org/shroff/phylum/server/internal/api/publink" apiv1 "codeberg.org/shroff/phylum/server/internal/api/v1" "codeberg.org/shroff/phylum/server/internal/api/webdav" "codeberg.org/shroff/phylum/server/internal/core" - "codeberg.org/shroff/phylum/server/internal/db" - "codeberg.org/shroff/phylum/server/internal/jobs" - "github.com/fvbock/endless" + "codeberg.org/shroff/phylum/server/internal/steve" "github.com/gin-contrib/static" "github.com/gin-gonic/gin" "github.com/sirupsen/logrus" @@ -59,21 +62,47 @@ func SetupCommand() *cobra.Command { setupWebApp(engine, "web") - err := jobs.Initialize(context.Background(), db.Pool()) - if err != nil { - logrus.Fatal("Failed to initialize jobs: " + err.Error()) - } setupTrashCompactor() listen := Cfg.Host + ":" + strconv.Itoa(Cfg.Port) - server := endless.NewServer(listen, engine) - server.BeforeBegin = func(addr string) { - logrus.Info("Listening on " + addr) + server := http.Server{ + Addr: listen, + Handler: engine, + ReadHeaderTimeout: 10 * time.Second, } - if err := server.ListenAndServe(); err != nil { - logrus.Fatal(err.Error()) - } + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT) + closeWG := &sync.WaitGroup{} + + closeWG.Add(1) + go func() { + defer closeWG.Done() + if err := server.ListenAndServe(); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + logrus.Fatal("Failed to start server: " + err.Error()) + } + } + }() + + closeWG.Add(1) + go func() { + defer closeWG.Done() + if err := steve.Get().Run(context.Background()); err != nil { + if !errors.Is(err, http.ErrServerClosed) { + logrus.Fatal("Failed to start jobs client: " + err.Error()) + } + } + }() + + <-sigChan + logrus.Info("Shutting Down") + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go server.Shutdown(ctx) + go steve.Get().Shutdown(ctx) + + closeWG.Wait() } return cmd diff --git a/server/internal/core/resource_copy_move.go b/server/internal/core/resource_copy_move.go index e74dc6d7..980fe37b 100644 --- a/server/internal/core/resource_copy_move.go +++ b/server/internal/core/resource_copy_move.go @@ -252,7 +252,7 @@ func (f txFileSystem) Copy(src Resource, target string, requestedID uuid.UUID, r } } - if err := jobs.CopyContents(copyContents); err != nil { + if err := jobs.CopyContents(f.db, copyContents); err != nil { return Resource{}, false, err } diff --git a/server/internal/core/resource_create.go b/server/internal/core/resource_create.go index c51cf375..4458596c 100644 --- a/server/internal/core/resource_create.go +++ b/server/internal/core/resource_create.go @@ -82,8 +82,7 @@ func (f *FileSystem) CreateFileByPath(path string, requestedID, versionID uuid.U return err } - // TODO: #jobs #tx pass in transaction - return jobs.MigrateVersionContents(versionID) + return jobs.MigrateVersionContents(f.db, versionID) }) if err != nil { storage.DefaultBackend().Delete(versionID.String()) @@ -118,7 +117,7 @@ func (f *FileSystem) CreateFileVersion(r Resource, versionID uuid.UUID) (io.Writ } // TODO: #jobs #tx pass in transaction - return jobs.MigrateVersionContents(versionID) + return jobs.MigrateVersionContents(db, versionID) }) if err != nil { storage.DefaultBackend().Delete(versionID.String()) diff --git a/server/internal/core/trash.go b/server/internal/core/trash.go index 73b87f5a..8e66ca15 100644 --- a/server/internal/core/trash.go +++ b/server/internal/core/trash.go @@ -176,8 +176,7 @@ func hardDeleteAllVersions(db db.TxHandler, q *goqu.SelectDataset, n interface { if _, err := db.Exec(query, args...); err != nil { return err } else { - // TODO: #jobs #tx - jobs.DeleteContents(versions) + jobs.DeleteContents(db, versions) } return nil } diff --git a/server/internal/db/migrations/data/014_jobs.sql b/server/internal/db/migrations/data/014_jobs.sql index 1ff14d9d..ba50d599 100644 --- a/server/internal/db/migrations/data/014_jobs.sql +++ b/server/internal/db/migrations/data/014_jobs.sql @@ -56,7 +56,7 @@ DROP INDEX next_available_job; DROP INDEX scheduled_jobs; -DROP TABLE sequence_jobs; +DROP TABLE job_sequences; DROP TABLE job_dependencies; diff --git a/server/internal/jobs/copy_contents.go b/server/internal/jobs/copy_contents.go index 6c091639..3bd345ca 100644 --- a/server/internal/jobs/copy_contents.go +++ b/server/internal/jobs/copy_contents.go @@ -5,9 +5,9 @@ import ( "errors" "codeberg.org/shroff/phylum/server/internal/db" + "codeberg.org/shroff/phylum/server/internal/steve" "codeberg.org/shroff/phylum/server/internal/storage" "github.com/google/uuid" - "github.com/riverqueue/river" ) type CopyContentsArgs struct { @@ -15,13 +15,14 @@ type CopyContentsArgs struct { Dest uuid.UUID `json:"dest"` } -func (CopyContentsArgs) Kind() string { return "copy_contents" } +func (a CopyContentsArgs) Kind() string { return "copy_contents" } +func (a CopyContentsArgs) Sequences() []string { return []string{a.Src.String(), a.Dest.String()} } type CopyContentsWorker struct { - river.WorkerDefaults[CopyContentsArgs] + steve.WorkerDefaults[CopyContentsArgs] } -func (w *CopyContentsWorker) Work(ctx context.Context, job *river.Job[CopyContentsArgs]) error { +func (w *CopyContentsWorker) Work(ctx context.Context, job *steve.Job[CopyContentsArgs]) error { return copyContents(ctx, job.Args.Src, job.Args.Dest) } @@ -35,16 +36,10 @@ func copyContents(ctx context.Context, src, dest uuid.UUID) error { } } -func CopyContents(args []CopyContentsArgs) error { - if len(args) == 0 { - return nil - } - params := make([]river.InsertManyParams, len(args)) +func CopyContents(db db.Handler, args []CopyContentsArgs) error { + params := make([]steve.JobArgs, len(args)) for i, a := range args { - params[i] = river.InsertManyParams{ - Args: a, - } + params[i] = a } - _, err := client.InsertMany(context.Background(), params) - return err + return steve.Get().InsertJobs(db, params) } diff --git a/server/internal/jobs/delete_contents.go b/server/internal/jobs/delete_contents.go index 920753b6..240bf4c2 100644 --- a/server/internal/jobs/delete_contents.go +++ b/server/internal/jobs/delete_contents.go @@ -3,8 +3,9 @@ package jobs import ( "context" + "codeberg.org/shroff/phylum/server/internal/db" + "codeberg.org/shroff/phylum/server/internal/steve" "codeberg.org/shroff/phylum/server/internal/storage" - "github.com/riverqueue/river" ) type DeleteContentsArgs struct { @@ -12,13 +13,14 @@ type DeleteContentsArgs struct { Storage string `json:"storage"` } -func (DeleteContentsArgs) Kind() string { return "delete_contents" } +func (a DeleteContentsArgs) Kind() string { return "delete_contents" } +func (a DeleteContentsArgs) Sequences() []string { return []string{a.Name} } type DeleteContentsWorker struct { - river.WorkerDefaults[DeleteContentsArgs] + steve.WorkerDefaults[DeleteContentsArgs] } -func (w *DeleteContentsWorker) Work(ctx context.Context, job *river.Job[DeleteContentsArgs]) error { +func (w *DeleteContentsWorker) Work(ctx context.Context, job *steve.Job[DeleteContentsArgs]) error { if b, err := storage.GetBackend(job.Args.Storage); err != nil { return err } else { @@ -26,16 +28,10 @@ func (w *DeleteContentsWorker) Work(ctx context.Context, job *river.Job[DeleteCo } } -func DeleteContents(args []DeleteContentsArgs) error { - if len(args) == 0 { - return nil - } - params := make([]river.InsertManyParams, len(args)) +func DeleteContents(db db.Handler, args []DeleteContentsArgs) error { + params := make([]steve.JobArgs, len(args)) for i, a := range args { - params[i] = river.InsertManyParams{ - Args: a, - } + params[i] = a } - _, err := client.InsertMany(context.Background(), params) - return err + return steve.Get().InsertJobs(db, params) } diff --git a/server/internal/jobs/jobs.go b/server/internal/jobs/jobs.go deleted file mode 100644 index af8ca86a..00000000 --- a/server/internal/jobs/jobs.go +++ /dev/null @@ -1,62 +0,0 @@ -package jobs - -import ( - "context" - "errors" - "time" - - "github.com/jackc/pgx/v5" - "github.com/jackc/pgx/v5/pgxpool" - "github.com/riverqueue/river" - "github.com/riverqueue/river/riverdriver/riverpgxv5" - "github.com/riverqueue/river/rivermigrate" - "github.com/sirupsen/logrus" -) - -// var client river.Client - -var client *river.Client[pgx.Tx] - -func Initialize(ctx context.Context, pool *pgxpool.Pool) error { - logrus.Info("Initializing Jobs") - if client != nil { - return errors.New("jobs already initialized") - } - - if m, err := rivermigrate.New(riverpgxv5.New(pool), nil); err != nil { - return err - } else if _, err := m.Migrate(ctx, rivermigrate.DirectionUp, nil); err != nil { - return err - } - - workers := river.NewWorkers() - river.AddWorker(workers, &MigrateWorker{}) - river.AddWorker(workers, &CopyContentsWorker{}) - river.AddWorker(workers, &DeleteContentsWorker{}) - - if c, err := river.NewClient(riverpgxv5.New(pool), &river.Config{ - Queues: map[string]river.QueueConfig{ - river.QueueDefault: {MaxWorkers: 5}, - }, - Workers: workers, - }); err != nil { - return err - } else { - client = c - } - return client.Start(ctx) -} - -func Shutdown(ctx context.Context) { - logrus.Info("Stopping Jobs") - gracefulCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - if err := client.Stop(gracefulCtx); err != nil { - logrus.Info("No more Mr. Nice Guy") - hardStopCtx, cancel := context.WithTimeout(ctx, 2*time.Second) - defer cancel() - if err := client.StopAndCancel(hardStopCtx); err != nil { - logrus.Warn("Could not stop and cancel jobs: " + err.Error()) - } - } -} diff --git a/server/internal/jobs/migrate.go b/server/internal/jobs/migrate.go index 451b9b2d..8613dc84 100644 --- a/server/internal/jobs/migrate.go +++ b/server/internal/jobs/migrate.go @@ -5,9 +5,9 @@ import ( "errors" "codeberg.org/shroff/phylum/server/internal/db" + "codeberg.org/shroff/phylum/server/internal/steve" "codeberg.org/shroff/phylum/server/internal/storage" "github.com/google/uuid" - "github.com/riverqueue/river" "github.com/sirupsen/logrus" ) @@ -15,19 +15,19 @@ type MigrateArgs struct { VersionID uuid.UUID `json:"version_id"` } -func (MigrateArgs) Kind() string { return "migrate" } +func (a MigrateArgs) Kind() string { return "migrate" } +func (a MigrateArgs) Sequences() []string { return []string{a.VersionID.String()} } type MigrateWorker struct { - river.WorkerDefaults[MigrateArgs] + steve.WorkerDefaults[MigrateArgs] } -func (w *MigrateWorker) Work(ctx context.Context, job *river.Job[MigrateArgs]) error { +func (w *MigrateWorker) Work(ctx context.Context, job *steve.Job[MigrateArgs]) error { return migrateVersionContents(ctx, job.Args.VersionID) } -func MigrateVersionContents(versionID uuid.UUID) error { - _, err := client.Insert(context.Background(), MigrateArgs{VersionID: versionID}, &river.InsertOpts{}) - return err +func MigrateVersionContents(db db.Handler, versionID uuid.UUID) error { + return steve.Get().InsertJob(db, MigrateArgs{VersionID: versionID}) } func migrateVersionContents(ctx context.Context, versionID uuid.UUID) error { diff --git a/server/internal/steve/steve.go b/server/internal/steve/steve.go index df97b956..5b4c6328 100644 --- a/server/internal/steve/steve.go +++ b/server/internal/steve/steve.go @@ -72,7 +72,7 @@ func RegisterWorker[T JobArgs](client *Client, worker Worker[T]) { client.factories[kind] = &typedWorkUnitFactory[T]{worker: worker} } -func (c *Client) Run(ctx context.Context, notifier pubsub.Notifier) error { +func (c *Client) Run(ctx context.Context) error { if c.config.Workers <= 0 { return errors.New("workers must be a positive integer") } @@ -97,8 +97,9 @@ func (c *Client) Run(ctx context.Context, notifier pubsub.Notifier) error { listenCtx, c.cancelListen = context.WithCancel(ctx) runCtx, c.cancelRun = context.WithCancel(ctx) workCtx, c.cancelWork = context.WithCancel(ctx) - go c.produceJobs(listenCtx, notifier) - return c.consumeJobs(runCtx, workCtx) + go c.produceJobs(listenCtx, db.Notifier()) + c.consumeJobs(runCtx, workCtx) + return ErrShutdown } func (c *Client) Shutdown(ctx context.Context) error { @@ -165,7 +166,7 @@ loop: subJobInserted.Cancel() } -func (c *Client) consumeJobs(runCtx context.Context, workCtx context.Context) error { +func (c *Client) consumeJobs(runCtx context.Context, workCtx context.Context) { loop: for { select { @@ -214,7 +215,6 @@ loop: } c.wg.Wait() close(c.done) - return ErrShutdown } func (c *Client) execute(ctx context.Context, info *JobInfo) error { @@ -329,6 +329,9 @@ func (c *Client) InsertJob(db db.Handler, args JobArgs) error { } func (c *Client) InsertJobs(db db.Handler, args []JobArgs) error { + if len(args) == 0 { + return nil + } params := make([]insertJobParams, len(args)) for i, a := range args { encodedArgs, err := json.Marshal(a)