[server] Move from river to in-house job queue

This commit is contained in:
Abhishek Shroff
2025-06-22 14:06:33 +05:30
parent 8cd0ba210a
commit 0c8b91f513
12 changed files with 113 additions and 127 deletions

View File

@@ -16,7 +16,9 @@ import (
"codeberg.org/shroff/phylum/server/internal/command/user"
"codeberg.org/shroff/phylum/server/internal/core"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/jobs"
"codeberg.org/shroff/phylum/server/internal/mail"
"codeberg.org/shroff/phylum/server/internal/steve"
"codeberg.org/shroff/phylum/server/internal/storage"
"github.com/google/uuid"
"github.com/knadh/koanf/parsers/yaml"
@@ -25,6 +27,8 @@ import (
"github.com/knadh/koanf/providers/posflag"
"github.com/knadh/koanf/providers/rawbytes"
"github.com/knadh/koanf/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
@@ -62,11 +66,15 @@ func SetupCommand() {
}
}
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.999"})
uuid.EnableRandPool()
cmd.PersistentPreRun = func(cmd *cobra.Command, args []string) {
if cmd.Name() == "serve" {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
logrus.SetLevel(logrus.InfoLevel)
} else {
zerolog.SetGlobalLevel(zerolog.WarnLevel)
logrus.SetLevel(logrus.WarnLevel)
}
@@ -108,6 +116,7 @@ func SetupCommand() {
k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{Tag: "koanf"})
if cfg.Debug {
zerolog.SetGlobalLevel(zerolog.TraceLevel)
logrus.SetLevel(logrus.TraceLevel)
logrus.Debug("Running in debug mode")
cfg.Server.Debug = true
@@ -121,6 +130,11 @@ func SetupCommand() {
auth.Cfg = cfg.Auth
crypt.Cfg = cfg.Auth.Password.Crypt
if !isCmd(cmd, "schema") {
// This will initialize the db, which we don't want yet.
initializeSteve()
}
if err := storage.Initialize(db.Get(context.Background())); err != nil {
logrus.Fatal("Failed to initialize storage: " + err.Error())
}
@@ -142,3 +156,21 @@ func SetupCommand() {
cmd.SetCompletionCommandGroupID("misc")
cmd.Execute()
}
// isCmd reports whether cmd itself, or any command above it in the cobra
// command tree, is named s.
//
// The walk follows Parent() links until it runs off the root (nil). Each step
// must inspect the command currently being visited (c), not the starting
// command, otherwise ancestors are never actually checked.
func isCmd(cmd *cobra.Command, s string) bool {
	for c := cmd; c != nil; c = c.Parent() {
		if c.Name() == s {
			return true
		}
	}
	return false
}
// initializeSteve initializes the steve job client with the database handle,
// the global zerolog logger and a pool of 3 workers, then registers every job
// worker from the jobs package with that client.
func initializeSteve() {
	steve.Initialize(db.Get(context.Background()), log.Logger, steve.Config{Workers: 3})

	// All workers are registered on the single shared client.
	c := steve.Get()
	steve.RegisterWorker(c, &jobs.MigrateWorker{})
	steve.RegisterWorker(c, &jobs.CopyContentsWorker{})
	steve.RegisterWorker(c, &jobs.DeleteContentsWorker{})
}

View File

@@ -1,7 +1,6 @@
package fs
import (
"context"
"errors"
"fmt"
"io"
@@ -11,8 +10,6 @@ import (
"codeberg.org/shroff/phylum/server/internal/command/common"
"codeberg.org/shroff/phylum/server/internal/core"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/jobs"
"github.com/google/uuid"
"github.com/spf13/cobra"
)
@@ -63,7 +60,6 @@ func setupImportCommand() *cobra.Command {
}
if stat.IsDir() {
jobs.Initialize(context.Background(), db.Pool())
dirFS := os.DirFS(importPath)
err = iofs.WalkDir(dirFS, ".", func(p string, d iofs.DirEntry, err error) error {
@@ -86,7 +82,6 @@ func setupImportCommand() *cobra.Command {
})
//TODO #jobs. We don't need to run or wait for all jobs
fmt.Println("Waiting for jobs to finish")
jobs.Shutdown(context.Background())
} else {
err = copyContents(f, importPath, res)
}

View File

@@ -2,19 +2,22 @@ package serve
import (
"context"
"errors"
"net/http"
"os"
"os/signal"
"path"
"strconv"
"strings"
"sync"
"syscall"
"time"
"codeberg.org/shroff/phylum/server/internal/api/publink"
apiv1 "codeberg.org/shroff/phylum/server/internal/api/v1"
"codeberg.org/shroff/phylum/server/internal/api/webdav"
"codeberg.org/shroff/phylum/server/internal/core"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/jobs"
"github.com/fvbock/endless"
"codeberg.org/shroff/phylum/server/internal/steve"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/sirupsen/logrus"
@@ -59,21 +62,47 @@ func SetupCommand() *cobra.Command {
setupWebApp(engine, "web")
err := jobs.Initialize(context.Background(), db.Pool())
if err != nil {
logrus.Fatal("Failed to initialize jobs: " + err.Error())
}
setupTrashCompactor()
listen := Cfg.Host + ":" + strconv.Itoa(Cfg.Port)
server := endless.NewServer(listen, engine)
server.BeforeBegin = func(addr string) {
logrus.Info("Listening on " + addr)
server := http.Server{
Addr: listen,
Handler: engine,
ReadHeaderTimeout: 10 * time.Second,
}
if err := server.ListenAndServe(); err != nil {
logrus.Fatal(err.Error())
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT)
closeWG := &sync.WaitGroup{}
closeWG.Add(1)
go func() {
defer closeWG.Done()
if err := server.ListenAndServe(); err != nil {
if !errors.Is(err, http.ErrServerClosed) {
logrus.Fatal("Failed to start server: " + err.Error())
}
}
}()
closeWG.Add(1)
go func() {
	defer closeWG.Done()
	// Run blocks while the job client is consuming jobs. Once the client has
	// been shut down it returns steve.ErrShutdown, which is the expected
	// outcome of a graceful stop and must not be treated as fatal. Any other
	// error (for example an invalid worker count) is a real startup failure.
	if err := steve.Get().Run(context.Background()); err != nil && !errors.Is(err, steve.ErrShutdown) {
		logrus.Fatal("Failed to start jobs client: " + err.Error())
	}
}()
<-sigChan
logrus.Info("Shutting Down")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go server.Shutdown(ctx)
go steve.Get().Shutdown(ctx)
closeWG.Wait()
}
return cmd

View File

@@ -252,7 +252,7 @@ func (f txFileSystem) Copy(src Resource, target string, requestedID uuid.UUID, r
}
}
if err := jobs.CopyContents(copyContents); err != nil {
if err := jobs.CopyContents(f.db, copyContents); err != nil {
return Resource{}, false, err
}

View File

@@ -82,8 +82,7 @@ func (f *FileSystem) CreateFileByPath(path string, requestedID, versionID uuid.U
return err
}
// TODO: #jobs #tx pass in transaction
return jobs.MigrateVersionContents(versionID)
return jobs.MigrateVersionContents(f.db, versionID)
})
if err != nil {
storage.DefaultBackend().Delete(versionID.String())
@@ -118,7 +117,7 @@ func (f *FileSystem) CreateFileVersion(r Resource, versionID uuid.UUID) (io.Writ
}
// TODO: #jobs #tx pass in transaction
return jobs.MigrateVersionContents(versionID)
return jobs.MigrateVersionContents(db, versionID)
})
if err != nil {
storage.DefaultBackend().Delete(versionID.String())

View File

@@ -176,8 +176,7 @@ func hardDeleteAllVersions(db db.TxHandler, q *goqu.SelectDataset, n interface {
if _, err := db.Exec(query, args...); err != nil {
return err
} else {
// TODO: #jobs #tx
jobs.DeleteContents(versions)
jobs.DeleteContents(db, versions)
}
return nil
}

View File

@@ -56,7 +56,7 @@ DROP INDEX next_available_job;
DROP INDEX scheduled_jobs;
DROP TABLE sequence_jobs;
DROP TABLE job_sequences;
DROP TABLE job_dependencies;

View File

@@ -5,9 +5,9 @@ import (
"errors"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/steve"
"codeberg.org/shroff/phylum/server/internal/storage"
"github.com/google/uuid"
"github.com/riverqueue/river"
)
type CopyContentsArgs struct {
@@ -15,13 +15,14 @@ type CopyContentsArgs struct {
Dest uuid.UUID `json:"dest"`
}
func (CopyContentsArgs) Kind() string { return "copy_contents" }
func (a CopyContentsArgs) Kind() string { return "copy_contents" }
func (a CopyContentsArgs) Sequences() []string { return []string{a.Src.String(), a.Dest.String()} }
type CopyContentsWorker struct {
river.WorkerDefaults[CopyContentsArgs]
steve.WorkerDefaults[CopyContentsArgs]
}
func (w *CopyContentsWorker) Work(ctx context.Context, job *river.Job[CopyContentsArgs]) error {
func (w *CopyContentsWorker) Work(ctx context.Context, job *steve.Job[CopyContentsArgs]) error {
return copyContents(ctx, job.Args.Src, job.Args.Dest)
}
@@ -35,16 +36,10 @@ func copyContents(ctx context.Context, src, dest uuid.UUID) error {
}
}
func CopyContents(args []CopyContentsArgs) error {
if len(args) == 0 {
return nil
}
params := make([]river.InsertManyParams, len(args))
func CopyContents(db db.Handler, args []CopyContentsArgs) error {
params := make([]steve.JobArgs, len(args))
for i, a := range args {
params[i] = river.InsertManyParams{
Args: a,
}
params[i] = a
}
_, err := client.InsertMany(context.Background(), params)
return err
return steve.Get().InsertJobs(db, params)
}

View File

@@ -3,8 +3,9 @@ package jobs
import (
"context"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/steve"
"codeberg.org/shroff/phylum/server/internal/storage"
"github.com/riverqueue/river"
)
type DeleteContentsArgs struct {
@@ -12,13 +13,14 @@ type DeleteContentsArgs struct {
Storage string `json:"storage"`
}
func (DeleteContentsArgs) Kind() string { return "delete_contents" }
func (a DeleteContentsArgs) Kind() string { return "delete_contents" }
func (a DeleteContentsArgs) Sequences() []string { return []string{a.Name} }
type DeleteContentsWorker struct {
river.WorkerDefaults[DeleteContentsArgs]
steve.WorkerDefaults[DeleteContentsArgs]
}
func (w *DeleteContentsWorker) Work(ctx context.Context, job *river.Job[DeleteContentsArgs]) error {
func (w *DeleteContentsWorker) Work(ctx context.Context, job *steve.Job[DeleteContentsArgs]) error {
if b, err := storage.GetBackend(job.Args.Storage); err != nil {
return err
} else {
@@ -26,16 +28,10 @@ func (w *DeleteContentsWorker) Work(ctx context.Context, job *river.Job[DeleteCo
}
}
func DeleteContents(args []DeleteContentsArgs) error {
if len(args) == 0 {
return nil
}
params := make([]river.InsertManyParams, len(args))
func DeleteContents(db db.Handler, args []DeleteContentsArgs) error {
params := make([]steve.JobArgs, len(args))
for i, a := range args {
params[i] = river.InsertManyParams{
Args: a,
}
params[i] = a
}
_, err := client.InsertMany(context.Background(), params)
return err
return steve.Get().InsertJobs(db, params)
}

View File

@@ -1,62 +0,0 @@
package jobs
import (
"context"
"errors"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/rivermigrate"
"github.com/sirupsen/logrus"
)
// var client river.Client
var client *river.Client[pgx.Tx]
// Initialize prepares the package-level river client and starts it.
//
// It returns an error if a client has already been created. Otherwise it
// applies river's "up" migrations on pool, registers the migrate, copy-contents
// and delete-contents workers, builds a client whose default queue allows up
// to 5 workers, stores it in the package-level client variable and returns the
// result of starting it with ctx.
func Initialize(ctx context.Context, pool *pgxpool.Pool) error {
	logrus.Info("Initializing Jobs")
	if client != nil {
		return errors.New("jobs already initialized")
	}

	// Bring river's own schema up to date before creating a client.
	migrator, err := rivermigrate.New(riverpgxv5.New(pool), nil)
	if err != nil {
		return err
	}
	if _, err := migrator.Migrate(ctx, rivermigrate.DirectionUp, nil); err != nil {
		return err
	}

	registry := river.NewWorkers()
	river.AddWorker(registry, &MigrateWorker{})
	river.AddWorker(registry, &CopyContentsWorker{})
	river.AddWorker(registry, &DeleteContentsWorker{})

	cfg := &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 5},
		},
		Workers: registry,
	}
	created, err := river.NewClient(riverpgxv5.New(pool), cfg)
	if err != nil {
		return err
	}
	client = created

	return client.Start(ctx)
}
func Shutdown(ctx context.Context) {
logrus.Info("Stopping Jobs")
gracefulCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := client.Stop(gracefulCtx); err != nil {
logrus.Info("No more Mr. Nice Guy")
hardStopCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
if err := client.StopAndCancel(hardStopCtx); err != nil {
logrus.Warn("Could not stop and cancel jobs: " + err.Error())
}
}
}

View File

@@ -5,9 +5,9 @@ import (
"errors"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/steve"
"codeberg.org/shroff/phylum/server/internal/storage"
"github.com/google/uuid"
"github.com/riverqueue/river"
"github.com/sirupsen/logrus"
)
@@ -15,19 +15,19 @@ type MigrateArgs struct {
VersionID uuid.UUID `json:"version_id"`
}
func (MigrateArgs) Kind() string { return "migrate" }
func (a MigrateArgs) Kind() string { return "migrate" }
func (a MigrateArgs) Sequences() []string { return []string{a.VersionID.String()} }
type MigrateWorker struct {
river.WorkerDefaults[MigrateArgs]
steve.WorkerDefaults[MigrateArgs]
}
func (w *MigrateWorker) Work(ctx context.Context, job *river.Job[MigrateArgs]) error {
func (w *MigrateWorker) Work(ctx context.Context, job *steve.Job[MigrateArgs]) error {
return migrateVersionContents(ctx, job.Args.VersionID)
}
func MigrateVersionContents(versionID uuid.UUID) error {
_, err := client.Insert(context.Background(), MigrateArgs{VersionID: versionID}, &river.InsertOpts{})
return err
func MigrateVersionContents(db db.Handler, versionID uuid.UUID) error {
return steve.Get().InsertJob(db, MigrateArgs{VersionID: versionID})
}
func migrateVersionContents(ctx context.Context, versionID uuid.UUID) error {

View File

@@ -72,7 +72,7 @@ func RegisterWorker[T JobArgs](client *Client, worker Worker[T]) {
client.factories[kind] = &typedWorkUnitFactory[T]{worker: worker}
}
func (c *Client) Run(ctx context.Context, notifier pubsub.Notifier) error {
func (c *Client) Run(ctx context.Context) error {
if c.config.Workers <= 0 {
return errors.New("workers must be a positive integer")
}
@@ -97,8 +97,9 @@ func (c *Client) Run(ctx context.Context, notifier pubsub.Notifier) error {
listenCtx, c.cancelListen = context.WithCancel(ctx)
runCtx, c.cancelRun = context.WithCancel(ctx)
workCtx, c.cancelWork = context.WithCancel(ctx)
go c.produceJobs(listenCtx, notifier)
return c.consumeJobs(runCtx, workCtx)
go c.produceJobs(listenCtx, db.Notifier())
c.consumeJobs(runCtx, workCtx)
return ErrShutdown
}
func (c *Client) Shutdown(ctx context.Context) error {
@@ -165,7 +166,7 @@ loop:
subJobInserted.Cancel()
}
func (c *Client) consumeJobs(runCtx context.Context, workCtx context.Context) error {
func (c *Client) consumeJobs(runCtx context.Context, workCtx context.Context) {
loop:
for {
select {
@@ -214,7 +215,6 @@ loop:
}
c.wg.Wait()
close(c.done)
return ErrShutdown
}
func (c *Client) execute(ctx context.Context, info *JobInfo) error {
@@ -329,6 +329,9 @@ func (c *Client) InsertJob(db db.Handler, args JobArgs) error {
}
func (c *Client) InsertJobs(db db.Handler, args []JobArgs) error {
if len(args) == 0 {
return nil
}
params := make([]insertJobParams, len(args))
for i, a := range args {
encodedArgs, err := json.Marshal(a)