diff --git a/server/internal/command/admin/schema/cmd.go b/server/internal/command/admin/schema/cmd.go index 15eac4a0..e4c973ab 100644 --- a/server/internal/command/admin/schema/cmd.go +++ b/server/internal/command/admin/schema/cmd.go @@ -7,9 +7,13 @@ import ( "strconv" "codeberg.org/shroff/phylum/server/internal/db" + "github.com/rs/zerolog" "github.com/spf13/cobra" ) +var DBConfig db.Config +var Logger zerolog.Logger + func SetupCommand() *cobra.Command { cmd := &cobra.Command{ Use: "schema", @@ -27,14 +31,16 @@ func setupResetCommand() *cobra.Command { Use: "reset", Short: "Reset Schema", Run: func(cmd *cobra.Command, args []string) { - db.Cfg.NoMigrate = true - if err := db.DeleteSchema(context.Background()); err != nil { - fmt.Println("unable to delete database schema: " + err.Error()) - os.Exit(1) - } - if err := db.Migrate(context.Background(), -1); err != nil { - fmt.Println("unable to migrate database schema: " + err.Error()) + ctx := context.Background() + if pool, err := db.Connect(ctx, DBConfig, Logger); err != nil { + fmt.Println(err.Error()) os.Exit(1) + } else { + if err := db.ResetSchema(context.Background(), pool); err != nil { + fmt.Println("unable to delete database schema: " + err.Error()) + os.Exit(1) + } + fmt.Println("Database schema successfully reset") } }, } @@ -44,20 +50,36 @@ func setupMigrateCommand() *cobra.Command { return &cobra.Command{ Use: "migrate", Short: "Migrate Schema", - Args: cobra.ExactArgs(1), + Args: cobra.MaximumNArgs(1), Run: func(cmd *cobra.Command, args []string) { ctx := context.Background() - if args[0] == "auto" || args[0] == "latest" { - db.Cfg.NoMigrate = false - db.Get(ctx) + if pool, err := db.Connect(ctx, DBConfig, Logger); err != nil { + fmt.Println(err.Error()) + os.Exit(1) + } else if s, err := db.GetSchemaInfo(ctx, pool); err != nil { + fmt.Println(err.Error()) + os.Exit(1) } else { - db.Cfg.NoMigrate = true - if v, err := strconv.Atoi(args[0]); err != nil { - fmt.Println(err.Error()) - os.Exit(3) - } else if err := db.Migrate(ctx, v); err != nil { - fmt.Println(err.Error()) - os.Exit(4) + v := s.LatestVersion + if len(args) == 1 { + version, err := strconv.Atoi(args[0]) + if err != nil { + fmt.Println("failed to parse target version: " + err.Error()) + os.Exit(1) + } + v = int32(version) + } + if s.CurrentVersion == v { + if len(args) == 1 { + fmt.Println("Database schema is already at the requested version") + } else { + fmt.Println("Database schema is up-to-date") + } + } else if err := s.MigrateTo(ctx, v); err != nil { + fmt.Println("failed to perform migration: " + err.Error()) + os.Exit(1) + } else { + fmt.Printf("Database schema successfully migrated to version %d\n", v) } } }, diff --git a/server/internal/command/command.go b/server/internal/command/command.go index 87c48da8..f2ddbf2d 100644 --- a/server/internal/command/command.go +++ b/server/internal/command/command.go @@ -10,6 +10,7 @@ import ( "codeberg.org/shroff/phylum/server/internal/auth" "codeberg.org/shroff/phylum/server/internal/command/admin" + "codeberg.org/shroff/phylum/server/internal/command/admin/schema" "codeberg.org/shroff/phylum/server/internal/command/fs" "codeberg.org/shroff/phylum/server/internal/command/serve" "codeberg.org/shroff/phylum/server/internal/command/user" @@ -17,6 +18,7 @@ import ( "codeberg.org/shroff/phylum/server/internal/db" "codeberg.org/shroff/phylum/server/internal/jobs" "codeberg.org/shroff/phylum/server/internal/mail" + "codeberg.org/shroff/phylum/server/internal/pubsub" "codeberg.org/shroff/phylum/server/internal/steve" "codeberg.org/shroff/phylum/server/internal/storage" "github.com/google/uuid" @@ -41,7 +43,8 @@ func SetupCommand() { Version: "0.4.0", } flags := cmd.PersistentFlags() - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.999"}) + logger := log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.999"}) + log.Logger = logger // Flags only. Not part of config file flags.StringP("workdir", "W", "", "Working Directory") @@ -116,19 +119,29 @@ func SetupCommand() { cfg.Server.Debug = true } - db.Cfg = cfg.DB + if isCmd(cmd, "schema") { + schema.DBConfig = cfg.DB + // No need to do further setup + return + } + + ctx := context.Background() + if pool, err := db.Initialize(ctx, cfg.DB, logger); err != nil { + log.Fatal().Err(err).Msg("failed to initialize db pool") + } else { + pubsub.Initialize(ctx, pool, logger) + + client := steve.Initialize(db.Get(context.Background()), cfg.Steve, logger) + steve.RegisterWorker(client, &jobs.MigrateWorker{}) + steve.RegisterWorker(client, &jobs.CopyContentsWorker{}) + steve.RegisterWorker(client, &jobs.DeleteContentsWorker{}) + } storage.Cfg = cfg.Storage serve.Cfg = cfg.Server mail.Cfg = cfg.Mail core.Cfg = cfg.Core - steve.Cfg = cfg.Steve - auth.Init(cfg.Auth, log.Logger) - - if !isCmd(cmd, "schema") { - // This will initialize the db, which we don't want yet. - initializeSteve() - } + auth.Init(cfg.Auth, logger) if err := storage.Initialize(db.Get(context.Background())); err != nil { log.Fatal().Err(err).Msg("failed to initialize storage") @@ -153,18 +166,9 @@ func SetupCommand() { func isCmd(cmd *cobra.Command, s string) bool { for c := cmd; c != nil; c = c.Parent() { - if cmd.Name() == s { + if c.Name() == s { return true } } return false } - -func initializeSteve() { - ctx := context.Background() - steve.Initialize(db.Get(ctx), log.Logger) - client := steve.Get() - steve.RegisterWorker(client, &jobs.MigrateWorker{}) - steve.RegisterWorker(client, &jobs.CopyContentsWorker{}) - steve.RegisterWorker(client, &jobs.DeleteContentsWorker{}) -} diff --git a/server/internal/db/db.go b/server/internal/db/db.go index 0e3200de..8f12cefc 100644 --- a/server/internal/db/db.go +++ b/server/internal/db/db.go @@ -6,26 +6,41 @@ import ( "strconv" "strings" - "codeberg.org/shroff/phylum/server/internal/pubsub" "github.com/jackc/pgx/v5/pgxpool" - "github.com/rs/zerolog/log" + "github.com/rs/zerolog" ) -var Cfg Config - var pool *pgxpool.Pool -var notifier pubsub.Notifier -func Get(ctx context.Context) Handler { - if pool == nil { - if err := initPool(context.Background()); err != nil { - log.Fatal().Err(err).Msg("failed to initialize db pool") - } else if n, err := pubsub.Start(pool, context.Background()); err != nil { - log.Fatal().Err(err).Msg("failed to initialize pubsub client") +func Initialize(ctx context.Context, cfg Config, log zerolog.Logger) (*pgxpool.Pool, error) { + if pool != nil { + return nil, errors.New("db pool already initialized") + } + + if p, err := Connect(ctx, cfg, log); err != nil { + return nil, errors.New("failed to initialize db pool: " + err.Error()) + } else { + pool = p + } + + // Migrate schema + if s, err := GetSchemaInfo(ctx, pool); err != nil { + return nil, errors.New("failed to get schema info: " + err.Error()) + } else if s.CurrentVersion != s.LatestVersion { + if cfg.NoMigrate { + log.Warn().Int32("current", s.CurrentVersion).Int32("latest", s.LatestVersion).Msg("Database schema is not at current version") } else { - notifier = n + log.Info().Int32("current", s.CurrentVersion).Int32("latest", s.LatestVersion).Msg("Performing schema migration") + if err := s.MigrateTo(ctx, int32(s.LatestVersion)); err != nil { + return nil, errors.New("failed to perform schema migration") + } } } + + return pool, nil +} + +func Get(ctx context.Context) Handler { return handler{ ctx: ctx, db: pool, @@ -36,16 +51,6 @@ func RunInTx(ctx context.Context, fn func(TxHandler) error) error { return Get(ctx).RunInTx(fn) } -// Pool returns the pgx pool -// Only to be used for jobs initialization -func Pool() *pgxpool.Pool { - return pool -} - -func Notifier() pubsub.Notifier { - return notifier -} - func Close() { if pool != nil { pool.Close() @@ -53,37 +58,36 @@ func Close() { } } -func initPool(ctx context.Context) error { +func Connect(ctx context.Context, cfg Config, log zerolog.Logger) (*pgxpool.Pool, error) { var dsn strings.Builder - dsn.WriteString("host=" + Cfg.Host) - dsn.WriteString(" port=" + strconv.Itoa(Cfg.Port)) - dsn.WriteString(" dbname=" + Cfg.Name) - dsn.WriteString(" user=" + Cfg.User) - if Cfg.Password != "" { - dsn.WriteString(" password=" + Cfg.Password) + dsn.WriteString("host=" + cfg.Host) + dsn.WriteString(" port=" + strconv.Itoa(cfg.Port)) + dsn.WriteString(" dbname=" + cfg.Name) + dsn.WriteString(" user=" + cfg.User) + if cfg.Password != "" { + dsn.WriteString(" password=" + cfg.Password) } config, err := pgxpool.ParseConfig(dsn.String()) if err != nil { - return errors.New("Unable to parse DSN: " + err.Error()) + return nil, errors.New("failed to parse DSN: " + err.Error()) } - if Cfg.Trace { - config.ConnConfig.Tracer = tracer{} + if cfg.Trace { + config.ConnConfig.Tracer = tracer{ + log: log.With().Str("c", "db.trace").Logger(), + } } - pool, err = pgxpool.NewWithConfig(ctx, config) + pool, err := pgxpool.NewWithConfig(ctx, config) if err != nil { - return err + return nil, errors.New("failed to connect to database: " + err.Error()) } - log.Debug().Str("host", config.ConnConfig.Host+":"+strconv.Itoa(int(config.ConnConfig.Port))). + log.Debug(). + Str("host", config.ConnConfig.Host+":"+strconv.Itoa(int(config.ConnConfig.Port))). Str("db", config.ConnConfig.Database). Msg("Connected to database") - if err := checkVersion(ctx, Cfg.NoMigrate); err != nil { - return err - } - - return nil + return pool, nil } diff --git a/server/internal/db/migrations/data/001_resources.sql b/server/internal/db/migrations/001_resources.sql similarity index 100% rename from server/internal/db/migrations/data/001_resources.sql rename to server/internal/db/migrations/001_resources.sql diff --git a/server/internal/db/migrations/data/002_users.sql b/server/internal/db/migrations/002_users.sql similarity index 100% rename from server/internal/db/migrations/data/002_users.sql rename to server/internal/db/migrations/002_users.sql diff --git a/server/internal/db/migrations/data/003_storage_backends.sql b/server/internal/db/migrations/003_storage_backends.sql similarity index 100% rename from server/internal/db/migrations/data/003_storage_backends.sql rename to server/internal/db/migrations/003_storage_backends.sql diff --git a/server/internal/db/migrations/data/004_api_keys.sql b/server/internal/db/migrations/004_api_keys.sql similarity index 100% rename from server/internal/db/migrations/data/004_api_keys.sql rename to server/internal/db/migrations/004_api_keys.sql diff --git a/server/internal/db/migrations/data/005_publink.sql b/server/internal/db/migrations/005_publink.sql similarity index 100% rename from server/internal/db/migrations/data/005_publink.sql rename to server/internal/db/migrations/005_publink.sql diff --git a/server/internal/db/migrations/data/006_bookmarks.sql b/server/internal/db/migrations/006_bookmarks.sql similarity index 100% rename from server/internal/db/migrations/data/006_bookmarks.sql rename to server/internal/db/migrations/006_bookmarks.sql diff --git a/server/internal/db/migrations/data/007_search.sql b/server/internal/db/migrations/007_search.sql similarity index 100% rename from server/internal/db/migrations/data/007_search.sql rename to server/internal/db/migrations/007_search.sql diff --git a/server/internal/db/migrations/data/008_trash.sql b/server/internal/db/migrations/008_trash.sql similarity index 100% rename from server/internal/db/migrations/data/008_trash.sql rename to server/internal/db/migrations/008_trash.sql diff --git a/server/internal/db/migrations/data/009_ignore_case.sql b/server/internal/db/migrations/009_ignore_case.sql similarity index 100% rename from server/internal/db/migrations/data/009_ignore_case.sql rename to server/internal/db/migrations/009_ignore_case.sql diff --git a/server/internal/db/migrations/data/010_reset_tokens.sql b/server/internal/db/migrations/010_reset_tokens.sql similarity index 100% rename from server/internal/db/migrations/data/010_reset_tokens.sql rename to server/internal/db/migrations/010_reset_tokens.sql diff --git a/server/internal/db/migrations/data/011_version_storage.sql b/server/internal/db/migrations/011_version_storage.sql similarity index 100% rename from server/internal/db/migrations/data/011_version_storage.sql rename to server/internal/db/migrations/011_version_storage.sql diff --git a/server/internal/db/migrations/data/012_storage_backend_triggers.sql b/server/internal/db/migrations/012_storage_backend_triggers.sql similarity index 100% rename from server/internal/db/migrations/data/012_storage_backend_triggers.sql rename to server/internal/db/migrations/012_storage_backend_triggers.sql diff --git a/server/internal/db/migrations/data/013_storage_mounts.sql b/server/internal/db/migrations/013_storage_mounts.sql similarity index 100% rename from server/internal/db/migrations/data/013_storage_mounts.sql rename to server/internal/db/migrations/013_storage_mounts.sql diff --git a/server/internal/db/migrations/data/014_jobs.sql b/server/internal/db/migrations/014_jobs.sql similarity index 100% rename from server/internal/db/migrations/data/014_jobs.sql rename to server/internal/db/migrations/014_jobs.sql diff --git a/server/internal/db/migrations/data/015_openid_sessions.sql b/server/internal/db/migrations/015_openid_sessions.sql similarity index 100% rename from server/internal/db/migrations/data/015_openid_sessions.sql rename to server/internal/db/migrations/015_openid_sessions.sql diff --git a/server/internal/db/migrations/migrations.go b/server/internal/db/migrations/migrations.go deleted file mode 100644 index a9874725..00000000 --- a/server/internal/db/migrations/migrations.go +++ /dev/null @@ -1,70 +0,0 @@ -package migrations - -import ( - "context" - "embed" - "io/fs" - - "github.com/jackc/pgx/v5" - "github.com/jackc/tern/v2/migrate" -) - -const versionTable = "db_version" - -type Migrator struct { - migrator *migrate.Migrator -} - -//go:embed data/*.sql -var migrationFiles embed.FS - -func NewMigrator(ctx context.Context, conn *pgx.Conn) (Migrator, error) { - migrator, err := migrate.NewMigratorEx( - ctx, conn, versionTable, - &migrate.MigratorOptions{ - DisableTx: false, - }) - if err != nil { - return Migrator{}, err - } - - migrationRoot, _ := fs.Sub(migrationFiles, "data") - - err = migrator.LoadMigrations(migrationRoot) - if err != nil { - return Migrator{}, err - } - - return Migrator{ - migrator: migrator, - }, nil -} - -func (m Migrator) GetCurrentVersion(ctx context.Context) (int, error) { - version, err := m.migrator.GetCurrentVersion(ctx) - if err != nil { - return 0, err - } - - return int(version), nil -} - -func (m Migrator) GetLatestVersion() int { - return len(m.migrator.Migrations) -} - -func (m Migrator) Migrations() []*migrate.Migration { - return m.migrator.Migrations -} - -// Migrate migrates the DB to the most recent version of the schema. -func (m Migrator) Migrate(ctx context.Context) error { - err := m.migrator.Migrate(ctx) - return err -} - -// MigrateTo migrates to a specific version of the schema. Use '0' to undo all migrations. -func (m Migrator) MigrateTo(ctx context.Context, ver int32) error { - err := m.migrator.MigrateTo(ctx, ver) - return err -} diff --git a/server/internal/db/schema.go b/server/internal/db/schema.go index 34243b38..e1a08607 100644 --- a/server/internal/db/schema.go +++ b/server/internal/db/schema.go @@ -2,112 +2,94 @@ package db import ( "context" - "errors" - "fmt" + "embed" + "io/fs" - "codeberg.org/shroff/phylum/server/internal/db/migrations" - "github.com/sirupsen/logrus" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/tern/v2/migrate" ) -var ( - ErrMigrationTargetTooLow = errors.New("schema migration target cannot be less than 0") - ErrMigrationTargetTooHigh = errors.New("schema migration target cannot be greater than latest") - ErrMigrationNoAutoDowngrade = errors.New("will not auto-downgrade schema to prevent data loss") -) +const versionTable = "db_version" -func checkVersion(ctx context.Context, skipMigration bool) error { +//go:embed migrations/*.sql +var migrationFiles embed.FS + +type SchemaInfo struct { + pool *pgxpool.Pool + CurrentVersion int32 + LatestVersion int32 +} + +func GetSchemaInfo(ctx context.Context, pool *pgxpool.Pool) (SchemaInfo, error) { conn, err := pool.Acquire(ctx) + if err != nil { + return SchemaInfo{}, err + } + defer conn.Release() + + migrator, err := createMigrator(ctx, conn.Conn()) + if err != nil { + return SchemaInfo{}, err + } + + currentVersion, err := migrator.GetCurrentVersion(ctx) + if err != nil { + return SchemaInfo{}, err + } + + return SchemaInfo{ + pool: pool, + CurrentVersion: currentVersion, + LatestVersion: int32(len(migrator.Migrations)), + }, nil +} + +// MigrateTo migrates to a specific version of the schema. Use '0' to undo all migrations. +func (s SchemaInfo) MigrateTo(ctx context.Context, ver int32) error { + conn, err := s.pool.Acquire(ctx) if err != nil { return err } defer conn.Release() - - migrator, err := migrations.NewMigrator(ctx, conn.Conn()) - if err != nil { + if migrator, err := createMigrator(ctx, conn.Conn()); err != nil { return err + } else { + return migrator.MigrateTo(ctx, ver) } - currentSchemaVersion, err := migrator.GetCurrentVersion(ctx) - if err != nil { - return err - } - latestSchemaVersion := migrator.GetLatestVersion() - - // Nothing to do - if currentSchemaVersion == latestSchemaVersion { - return nil - } - - if skipMigration { - logrus.Warn(fmt.Sprintf("Database schema is not at current version: %d/%d", currentSchemaVersion, latestSchemaVersion)) - return nil - } - if currentSchemaVersion > latestSchemaVersion { - logrus.Warn(fmt.Sprintf("Not automatically downgrading schema from %d to %d", currentSchemaVersion, latestSchemaVersion)) - return nil - } - - logrus.Info(fmt.Sprintf("Migrating database from version %d to %d", currentSchemaVersion, latestSchemaVersion)) - err = migrator.MigrateTo(ctx, int32(latestSchemaVersion)) - if err != nil { - return err - } - return nil } -func Migrate(ctx context.Context, version int) error { - Get(ctx) // Initialize the pool, just in case - conn, err := pool.Acquire(ctx) +func createMigrator(ctx context.Context, conn *pgx.Conn) (*migrate.Migrator, error) { + m, err := migrate.NewMigratorEx(ctx, conn, versionTable, &migrate.MigratorOptions{DisableTx: false}) if err != nil { - return err - } - defer conn.Release() - - migrator, err := migrations.NewMigrator(ctx, conn.Conn()) - if err != nil { - return err - } - currentSchemaVersion, err := migrator.GetCurrentVersion(ctx) - if err != nil { - return err - } - latestSchemaVersion := migrator.GetLatestVersion() - if version < 0 { - version = latestSchemaVersion - } else if version == 0 { - logrus.Info("Deleting database schema") - return DeleteSchema(ctx) - } else if version > latestSchemaVersion { - return ErrMigrationTargetTooHigh + return nil, err } - logrus.Info(fmt.Sprintf("Migrating database from version %d to %d", currentSchemaVersion, version)) - err = migrator.MigrateTo(ctx, int32(version)) - if err != nil { - return err + migrationRoot, _ := fs.Sub(migrationFiles, "migrations") + if err := m.LoadMigrations(migrationRoot); err != nil { + return nil, err } - if version == latestSchemaVersion { - return nil - } - return nil + return m, nil } -func DeleteSchema(ctx context.Context) error { - h := Get(ctx) +func ResetSchema(ctx context.Context, pool *pgxpool.Pool) error { user := pool.Config().ConnConfig.User - return h.RunInTx(func(tx TxHandler) (err error) { - if _, err = tx.Exec("DROP SCHEMA public CASCADE"); err != nil { - return + return pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error { + if _, err := tx.Exec(ctx, "DROP SCHEMA public CASCADE"); err != nil { + return err } - if _, err = tx.Exec("CREATE SCHEMA public"); err != nil { - return + if _, err := tx.Exec(ctx, "CREATE SCHEMA public"); err != nil { + return err } - if _, err = tx.Exec("GRANT ALL ON SCHEMA public TO " + user); err != nil { - return + if _, err := tx.Exec(ctx, "GRANT ALL ON SCHEMA public TO "+user); err != nil { + return err } - if _, err = tx.Exec("GRANT ALL ON SCHEMA public TO public"); err != nil { - return + if _, err := tx.Exec(ctx, "GRANT ALL ON SCHEMA public TO public"); err != nil { + return err } - _, err = tx.Exec("COMMENT ON SCHEMA public IS 'standard public schema'") - return + if _, err := tx.Exec(ctx, "COMMENT ON SCHEMA public IS 'standard public schema'"); err != nil { + return err + } + return nil }) } diff --git a/server/internal/db/tracer.go b/server/internal/db/tracer.go index 880f7528..b86157ba 100644 --- a/server/internal/db/tracer.go +++ b/server/internal/db/tracer.go @@ -4,17 +4,18 @@ import ( "context" "github.com/jackc/pgx/v5" - "github.com/sirupsen/logrus" + "github.com/rs/zerolog" ) type tracer struct { + log zerolog.Logger } func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context { - logrus.Trace(data) + t.log.Trace().Any("data", data).Msg("query start") return ctx } -func (c tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) { - logrus.Trace(data) +func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) { + t.log.Trace().Any("data", data).Msg("query end") } diff --git a/server/internal/pubsub/listener.go b/server/internal/pubsub/listener.go index bbb88066..21a83378 100644 --- a/server/internal/pubsub/listener.go +++ b/server/internal/pubsub/listener.go @@ -16,7 +16,7 @@ type listener struct { mu sync.Mutex } -func newListener(pool *pgxpool.Pool, ctx context.Context) (*listener, error) { +func newListener(ctx context.Context, pool *pgxpool.Pool) (*listener, error) { l := listener{ pool: pool, mu: sync.Mutex{}, diff --git a/server/internal/pubsub/notifier.go b/server/internal/pubsub/notifier.go index 71950d9b..49a7475f 100644 --- a/server/internal/pubsub/notifier.go +++ b/server/internal/pubsub/notifier.go @@ -3,14 +3,13 @@ package pubsub import ( "context" "errors" - "os" "slices" "sync" "time" "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" - "github.com/sirupsen/logrus" + "github.com/rs/zerolog" ) const waitTimeout = 30 * time.Second @@ -22,6 +21,7 @@ type Notifier interface { type notifier struct { mu sync.RWMutex + log zerolog.Logger listener *listener subscriptions map[string][]*subscription pendingOps []pendingOp @@ -41,14 +41,17 @@ const ( pendingOpTypeUnlisten = iota ) -func Start(pool *pgxpool.Pool, ctx context.Context) (Notifier, error) { - listener, err := newListener(pool, ctx) +var n Notifier + +func Initialize(ctx context.Context, pool *pgxpool.Pool, log zerolog.Logger) error { + listener, err := newListener(ctx, pool) if err != nil { - return nil, err + return err } notifier := notifier{ mu: sync.RWMutex{}, + log: log.With().Str("c", "notifier").Logger(), listener: listener, subscriptions: make(map[string][]*subscription), pendingOps: []pendingOp{}, @@ -61,10 +64,9 @@ func Start(pool *pgxpool.Pool, ctx context.Context) (Notifier, error) { notification, err := notifier.waitOnce(ctx) if err != nil { if err.Error() == "conn closed" { - logrus.Error("PGX Connection Closed. Exiting") - os.Exit(1) + notifier.log.Fatal().Msg("connection closed. Exiting") } - logrus.Error(err) + notifier.log.Warn().Err(err).Msg("error in notifier.waitOnce") } if notification != nil { notifier.notifyListeners(notification) @@ -72,7 +74,12 @@ func Start(pool *pgxpool.Pool, ctx context.Context) (Notifier, error) { } }() - return ¬ifier, nil + n = ¬ifier + return nil +} + +func Get() Notifier { + return n } func (n *notifier) Listen(channel string) Subscription { @@ -122,7 +129,7 @@ func (n *notifier) processPendingOps(ctx context.Context) { } if err != nil { - logrus.Warnf("error completing op %v on channel %s: %s", u.op, u.channelName, err.Error()) + n.log.Warn().Err(err).Str("channel", u.channelName).Uint8("op", u.op).Msg("error completing op") } u.callback() }() @@ -160,7 +167,7 @@ func (n *notifier) notifyListeners(notification *pgconn.Notification) { select { case sub.listenChan <- notification.Payload: default: - logrus.Warn("dropped notification due to full buffer", "payload", notification.Payload) + n.log.Warn().Str("channel", notification.Channel).Msg("dropped notification") } } } diff --git a/server/internal/steve/steve.go b/server/internal/steve/steve.go index 7387975e..6d799f05 100644 --- a/server/internal/steve/steve.go +++ b/server/internal/steve/steve.go @@ -20,11 +20,11 @@ const maxAttempts = 6 var ErrShutdown = errors.New("shutdown") var errWaiting = errors.New("waiting for jobs") -var Cfg Config var instance *Client type Client struct { db db.Handler + cfg Config logger zerolog.Logger // Config defaultTimeout time.Duration @@ -42,14 +42,16 @@ type Client struct { cancelWork context.CancelFunc } -func Initialize(db db.Handler, logger zerolog.Logger) { +func Initialize(db db.Handler, cfg Config, logger zerolog.Logger) *Client { if instance == nil { instance = &Client{ db: db, + cfg: cfg, logger: logger.With().Str("mod", "steve").Logger(), factories: make(map[string]workUnitFactory), } } + return instance } func Get() *Client { @@ -73,7 +75,7 @@ func RegisterWorker[T JobArgs](client *Client, worker Worker[T]) { } func (c *Client) Run(ctx context.Context) error { - if Cfg.Workers <= 0 { + if c.cfg.Workers <= 0 { return errors.New("workers must be a positive integer") } if err := resetRunningJobs(c.db); err != nil { @@ -86,18 +88,18 @@ func (c *Client) Run(ctx context.Context) error { return err } - c.availableWorkers = make(chan struct{}, Cfg.Workers) + c.availableWorkers = make(chan struct{}, c.cfg.Workers) c.jobAvailable = make(chan struct{}) - c.defaultTimeout = time.Duration(Cfg.Timeout) * time.Second + c.defaultTimeout = time.Duration(c.cfg.Timeout) * time.Second - c.logger.Info().Int("workers", Cfg.Workers).Msg("Starting") + c.logger.Info().Int("workers", c.cfg.Workers).Msg("Starting") c.done = make(chan struct{}) var listenCtx, runCtx, workCtx context.Context listenCtx, c.cancelListen = context.WithCancel(ctx) runCtx, c.cancelRun = context.WithCancel(ctx) workCtx, c.cancelWork = context.WithCancel(ctx) - go c.produceJobs(listenCtx, db.Notifier()) + go c.produceJobs(listenCtx, pubsub.Get()) c.consumeJobs(runCtx, workCtx) return ErrShutdown } diff --git a/server/internal/storage/storage.go b/server/internal/storage/storage.go index bdf6165a..77e0ed3f 100644 --- a/server/internal/storage/storage.go +++ b/server/internal/storage/storage.go @@ -8,6 +8,7 @@ import ( "strings" "codeberg.org/shroff/phylum/server/internal/db" + "codeberg.org/shroff/phylum/server/internal/pubsub" "github.com/jackc/pgx/v5" "github.com/rs/zerolog/log" ) @@ -141,7 +142,7 @@ func restoreBackends(db db.Handler) (map[string]Backend, error) { } func processBackendUpdates() { - sub := db.Notifier().Listen("backend_updates") + sub := pubsub.Get().Listen("backend_updates") for { p := <-sub.NotificationC() var c BackendConfig