[server] Overhaul db initialization

This commit is contained in:
Abhishek Shroff
2025-07-08 15:04:57 +05:30
parent 8069afb06b
commit 0ebdcdf874
25 changed files with 207 additions and 254 deletions

View File

@@ -7,9 +7,13 @@ import (
"strconv"
"codeberg.org/shroff/phylum/server/internal/db"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
)
var DBConfig db.Config
var Logger zerolog.Logger
func SetupCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "schema",
@@ -27,14 +31,16 @@ func setupResetCommand() *cobra.Command {
Use: "reset",
Short: "Reset Schema",
Run: func(cmd *cobra.Command, args []string) {
db.Cfg.NoMigrate = true
if err := db.DeleteSchema(context.Background()); err != nil {
fmt.Println("unable to delete database schema: " + err.Error())
os.Exit(1)
}
if err := db.Migrate(context.Background(), -1); err != nil {
fmt.Println("unable to migrate database schema: " + err.Error())
ctx := context.Background()
if pool, err := db.Connect(ctx, DBConfig, Logger); err != nil {
fmt.Println(err.Error())
os.Exit(1)
} else {
if err := db.ResetSchema(context.Background(), pool); err != nil {
fmt.Println("unable to delete database schema: " + err.Error())
os.Exit(1)
}
fmt.Println("Database schema successfully reset")
}
},
}
@@ -44,20 +50,36 @@ func setupMigrateCommand() *cobra.Command {
return &cobra.Command{
Use: "migrate",
Short: "Migrate Schema",
Args: cobra.ExactArgs(1),
Args: cobra.MaximumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
ctx := context.Background()
if args[0] == "auto" || args[0] == "latest" {
db.Cfg.NoMigrate = false
db.Get(ctx)
if pool, err := db.Connect(ctx, DBConfig, Logger); err != nil {
fmt.Println(err.Error())
os.Exit(1)
} else if s, err := db.GetSchemaInfo(ctx, pool); err != nil {
fmt.Println(err.Error())
os.Exit(1)
} else {
db.Cfg.NoMigrate = true
if v, err := strconv.Atoi(args[0]); err != nil {
fmt.Println(err.Error())
os.Exit(3)
} else if err := db.Migrate(ctx, v); err != nil {
fmt.Println(err.Error())
os.Exit(4)
v := s.LatestVersion
if len(args) == 1 {
version, err := strconv.Atoi(args[0])
if err != nil {
fmt.Println("failed to parse target version: " + err.Error())
os.Exit(1)
}
v = int32(version)
}
if s.CurrentVersion == v {
if len(args) == 1 {
fmt.Println("Database schema is already at the requested version")
} else {
fmt.Println("Database schema is up-to-date")
}
} else if err := s.MigrateTo(ctx, v); err != nil {
fmt.Println("failed to perform migration: " + err.Error())
os.Exit(1)
} else {
fmt.Printf("Database schema successfully migrated to version %d\n", v)
}
}
},

View File

@@ -10,6 +10,7 @@ import (
"codeberg.org/shroff/phylum/server/internal/auth"
"codeberg.org/shroff/phylum/server/internal/command/admin"
"codeberg.org/shroff/phylum/server/internal/command/admin/schema"
"codeberg.org/shroff/phylum/server/internal/command/fs"
"codeberg.org/shroff/phylum/server/internal/command/serve"
"codeberg.org/shroff/phylum/server/internal/command/user"
@@ -17,6 +18,7 @@ import (
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/jobs"
"codeberg.org/shroff/phylum/server/internal/mail"
"codeberg.org/shroff/phylum/server/internal/pubsub"
"codeberg.org/shroff/phylum/server/internal/steve"
"codeberg.org/shroff/phylum/server/internal/storage"
"github.com/google/uuid"
@@ -41,7 +43,8 @@ func SetupCommand() {
Version: "0.4.0",
}
flags := cmd.PersistentFlags()
log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.999"})
logger := log.Output(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: "15:04:05.999"})
log.Logger = logger
// Flags only. Not part of config file
flags.StringP("workdir", "W", "", "Working Directory")
@@ -116,19 +119,29 @@ func SetupCommand() {
cfg.Server.Debug = true
}
db.Cfg = cfg.DB
if isCmd(cmd, "schema") {
schema.DBConfig = cfg.DB
// No need to do further setup
return
}
ctx := context.Background()
if pool, err := db.Initialize(ctx, cfg.DB, logger); err != nil {
log.Fatal().Err(err).Msg("failed to initialize db pool")
} else {
pubsub.Initialize(ctx, pool, logger)
client := steve.Initialize(db.Get(context.Background()), cfg.Steve, logger)
steve.RegisterWorker(client, &jobs.MigrateWorker{})
steve.RegisterWorker(client, &jobs.CopyContentsWorker{})
steve.RegisterWorker(client, &jobs.DeleteContentsWorker{})
}
storage.Cfg = cfg.Storage
serve.Cfg = cfg.Server
mail.Cfg = cfg.Mail
core.Cfg = cfg.Core
steve.Cfg = cfg.Steve
auth.Init(cfg.Auth, log.Logger)
if !isCmd(cmd, "schema") {
// This will initialize the db, which we don't want yet.
initializeSteve()
}
auth.Init(cfg.Auth, logger)
if err := storage.Initialize(db.Get(context.Background())); err != nil {
log.Fatal().Err(err).Msg("failed to initialize storage")
@@ -153,18 +166,9 @@ func SetupCommand() {
func isCmd(cmd *cobra.Command, s string) bool {
for c := cmd; c != nil; c = c.Parent() {
if cmd.Name() == s {
if c.Name() == s {
return true
}
}
return false
}
func initializeSteve() {
ctx := context.Background()
steve.Initialize(db.Get(ctx), log.Logger)
client := steve.Get()
steve.RegisterWorker(client, &jobs.MigrateWorker{})
steve.RegisterWorker(client, &jobs.CopyContentsWorker{})
steve.RegisterWorker(client, &jobs.DeleteContentsWorker{})
}

View File

@@ -6,26 +6,41 @@ import (
"strconv"
"strings"
"codeberg.org/shroff/phylum/server/internal/pubsub"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog"
)
var Cfg Config
var pool *pgxpool.Pool
var notifier pubsub.Notifier
func Get(ctx context.Context) Handler {
if pool == nil {
if err := initPool(context.Background()); err != nil {
log.Fatal().Err(err).Msg("failed to initialize db pool")
} else if n, err := pubsub.Start(pool, context.Background()); err != nil {
log.Fatal().Err(err).Msg("failed to initialize pubsub client")
func Initialize(ctx context.Context, cfg Config, log zerolog.Logger) (*pgxpool.Pool, error) {
if pool != nil {
return nil, errors.New("db pool already initialized")
}
if p, err := Connect(ctx, cfg, log); err != nil {
return nil, errors.New("failed to initialize db pool: " + err.Error())
} else {
pool = p
}
// Migrate schema
if s, err := GetSchemaInfo(ctx, pool); err != nil {
return nil, errors.New("failed to get schema info: " + err.Error())
} else if s.CurrentVersion != s.LatestVersion {
if cfg.NoMigrate {
log.Warn().Int32("current", s.CurrentVersion).Int32("latest", s.LatestVersion).Msg("Database schema is not at current version")
} else {
notifier = n
log.Info().Int32("current", s.CurrentVersion).Int32("latest", s.LatestVersion).Msg("Performing schema migration")
if err := s.MigrateTo(ctx, int32(s.LatestVersion)); err != nil {
return nil, errors.New("failed to perform schema migration")
}
}
}
return pool, nil
}
func Get(ctx context.Context) Handler {
return handler{
ctx: ctx,
db: pool,
@@ -36,16 +51,6 @@ func RunInTx(ctx context.Context, fn func(TxHandler) error) error {
return Get(ctx).RunInTx(fn)
}
// Pool returns the pgx pool
// Only to be used for jobs initialization
func Pool() *pgxpool.Pool {
return pool
}
func Notifier() pubsub.Notifier {
return notifier
}
func Close() {
if pool != nil {
pool.Close()
@@ -53,37 +58,36 @@ func Close() {
}
}
func initPool(ctx context.Context) error {
func Connect(ctx context.Context, cfg Config, log zerolog.Logger) (*pgxpool.Pool, error) {
var dsn strings.Builder
dsn.WriteString("host=" + Cfg.Host)
dsn.WriteString(" port=" + strconv.Itoa(Cfg.Port))
dsn.WriteString(" dbname=" + Cfg.Name)
dsn.WriteString(" user=" + Cfg.User)
if Cfg.Password != "" {
dsn.WriteString(" password=" + Cfg.Password)
dsn.WriteString("host=" + cfg.Host)
dsn.WriteString(" port=" + strconv.Itoa(cfg.Port))
dsn.WriteString(" dbname=" + cfg.Name)
dsn.WriteString(" user=" + cfg.User)
if cfg.Password != "" {
dsn.WriteString(" password=" + cfg.Password)
}
config, err := pgxpool.ParseConfig(dsn.String())
if err != nil {
return errors.New("Unable to parse DSN: " + err.Error())
return nil, errors.New("failed to parse DSN: " + err.Error())
}
if Cfg.Trace {
config.ConnConfig.Tracer = tracer{}
if cfg.Trace {
config.ConnConfig.Tracer = tracer{
log: log.With().Str("c", "db.trace").Logger(),
}
}
pool, err = pgxpool.NewWithConfig(ctx, config)
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return err
return nil, errors.New("failed to connect to database: " + err.Error())
}
log.Debug().Str("host", config.ConnConfig.Host+":"+strconv.Itoa(int(config.ConnConfig.Port))).
log.Debug().
Str("host", config.ConnConfig.Host+":"+strconv.Itoa(int(config.ConnConfig.Port))).
Str("db", config.ConnConfig.Database).
Msg("Connected to database")
if err := checkVersion(ctx, Cfg.NoMigrate); err != nil {
return err
}
return nil
return pool, nil
}

View File

@@ -1,70 +0,0 @@
package migrations
import (
"context"
"embed"
"io/fs"
"github.com/jackc/pgx/v5"
"github.com/jackc/tern/v2/migrate"
)
const versionTable = "db_version"
type Migrator struct {
migrator *migrate.Migrator
}
//go:embed data/*.sql
var migrationFiles embed.FS
func NewMigrator(ctx context.Context, conn *pgx.Conn) (Migrator, error) {
migrator, err := migrate.NewMigratorEx(
ctx, conn, versionTable,
&migrate.MigratorOptions{
DisableTx: false,
})
if err != nil {
return Migrator{}, err
}
migrationRoot, _ := fs.Sub(migrationFiles, "data")
err = migrator.LoadMigrations(migrationRoot)
if err != nil {
return Migrator{}, err
}
return Migrator{
migrator: migrator,
}, nil
}
func (m Migrator) GetCurrentVersion(ctx context.Context) (int, error) {
version, err := m.migrator.GetCurrentVersion(ctx)
if err != nil {
return 0, err
}
return int(version), nil
}
func (m Migrator) GetLatestVersion() int {
return len(m.migrator.Migrations)
}
func (m Migrator) Migrations() []*migrate.Migration {
return m.migrator.Migrations
}
// Migrate migrates the DB to the most recent version of the schema.
func (m Migrator) Migrate(ctx context.Context) error {
err := m.migrator.Migrate(ctx)
return err
}
// MigrateTo migrates to a specific version of the schema. Use '0' to undo all migrations.
func (m Migrator) MigrateTo(ctx context.Context, ver int32) error {
err := m.migrator.MigrateTo(ctx, ver)
return err
}

View File

@@ -2,112 +2,94 @@ package db
import (
"context"
"errors"
"fmt"
"embed"
"io/fs"
"codeberg.org/shroff/phylum/server/internal/db/migrations"
"github.com/sirupsen/logrus"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/tern/v2/migrate"
)
var (
ErrMigrationTargetTooLow = errors.New("schema migration target cannot be less than 0")
ErrMigrationTargetTooHigh = errors.New("schema migration target cannot be greater than latest")
ErrMigrationNoAutoDowngrade = errors.New("will not auto-downgrade schema to prevent data loss")
)
const versionTable = "db_version"
func checkVersion(ctx context.Context, skipMigration bool) error {
//go:embed migrations/*.sql
var migrationFiles embed.FS
type SchemaInfo struct {
pool *pgxpool.Pool
CurrentVersion int32
LatestVersion int32
}
func GetSchemaInfo(ctx context.Context, pool *pgxpool.Pool) (SchemaInfo, error) {
conn, err := pool.Acquire(ctx)
if err != nil {
return SchemaInfo{}, err
}
defer conn.Release()
migrator, err := createMigrator(ctx, conn.Conn())
if err != nil {
return SchemaInfo{}, err
}
currentVersion, err := migrator.GetCurrentVersion(ctx)
if err != nil {
return SchemaInfo{}, err
}
return SchemaInfo{
pool: pool,
CurrentVersion: currentVersion,
LatestVersion: int32(len(migrator.Migrations)),
}, nil
}
// MigrateTo migrates to a specific version of the schema. Use '0' to undo all migrations.
func (s SchemaInfo) MigrateTo(ctx context.Context, ver int32) error {
conn, err := s.pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
migrator, err := migrations.NewMigrator(ctx, conn.Conn())
if err != nil {
if migrator, err := createMigrator(ctx, conn.Conn()); err != nil {
return err
} else {
return migrator.MigrateTo(ctx, ver)
}
currentSchemaVersion, err := migrator.GetCurrentVersion(ctx)
if err != nil {
return err
}
latestSchemaVersion := migrator.GetLatestVersion()
// Nothing to do
if currentSchemaVersion == latestSchemaVersion {
return nil
}
if skipMigration {
logrus.Warn(fmt.Sprintf("Database schema is not at current version: %d/%d", currentSchemaVersion, latestSchemaVersion))
return nil
}
if currentSchemaVersion > latestSchemaVersion {
logrus.Warn(fmt.Sprintf("Not automatically downgrading schema from %d to %d", currentSchemaVersion, latestSchemaVersion))
return nil
}
logrus.Info(fmt.Sprintf("Migrating database from version %d to %d", currentSchemaVersion, latestSchemaVersion))
err = migrator.MigrateTo(ctx, int32(latestSchemaVersion))
if err != nil {
return err
}
return nil
}
func Migrate(ctx context.Context, version int) error {
Get(ctx) // Initialize the pool, just in case
conn, err := pool.Acquire(ctx)
func createMigrator(ctx context.Context, conn *pgx.Conn) (*migrate.Migrator, error) {
m, err := migrate.NewMigratorEx(ctx, conn, versionTable, &migrate.MigratorOptions{DisableTx: false})
if err != nil {
return err
}
defer conn.Release()
migrator, err := migrations.NewMigrator(ctx, conn.Conn())
if err != nil {
return err
}
currentSchemaVersion, err := migrator.GetCurrentVersion(ctx)
if err != nil {
return err
}
latestSchemaVersion := migrator.GetLatestVersion()
if version < 0 {
version = latestSchemaVersion
} else if version == 0 {
logrus.Info("Deleting database schema")
return DeleteSchema(ctx)
} else if version > latestSchemaVersion {
return ErrMigrationTargetTooHigh
return nil, err
}
logrus.Info(fmt.Sprintf("Migrating database from version %d to %d", currentSchemaVersion, version))
err = migrator.MigrateTo(ctx, int32(version))
if err != nil {
return err
migrationRoot, _ := fs.Sub(migrationFiles, "migrations")
if err := m.LoadMigrations(migrationRoot); err != nil {
return nil, err
}
if version == latestSchemaVersion {
return nil
}
return nil
return m, nil
}
func DeleteSchema(ctx context.Context) error {
h := Get(ctx)
func ResetSchema(ctx context.Context, pool *pgxpool.Pool) error {
user := pool.Config().ConnConfig.User
return h.RunInTx(func(tx TxHandler) (err error) {
if _, err = tx.Exec("DROP SCHEMA public CASCADE"); err != nil {
return
return pgx.BeginFunc(ctx, pool, func(tx pgx.Tx) error {
if _, err := tx.Exec(ctx, "DROP SCHEMA public CASCADE"); err != nil {
return err
}
if _, err = tx.Exec("CREATE SCHEMA public"); err != nil {
return
if _, err := tx.Exec(ctx, "CREATE SCHEMA public"); err != nil {
return err
}
if _, err = tx.Exec("GRANT ALL ON SCHEMA public TO " + user); err != nil {
return
if _, err := tx.Exec(ctx, "GRANT ALL ON SCHEMA public TO "+user); err != nil {
return err
}
if _, err = tx.Exec("GRANT ALL ON SCHEMA public TO public"); err != nil {
return
if _, err := tx.Exec(ctx, "GRANT ALL ON SCHEMA public TO public"); err != nil {
return err
}
_, err = tx.Exec("COMMENT ON SCHEMA public IS 'standard public schema'")
return
if _, err := tx.Exec(ctx, "COMMENT ON SCHEMA public IS 'standard public schema'"); err != nil {
return err
}
return nil
})
}

View File

@@ -4,17 +4,18 @@ import (
"context"
"github.com/jackc/pgx/v5"
"github.com/sirupsen/logrus"
"github.com/rs/zerolog"
)
type tracer struct {
log zerolog.Logger
}
func (t tracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
logrus.Trace(data)
t.log.Trace().Any("data", data).Msg("query start")
return ctx
}
func (c tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
logrus.Trace(data)
func (t tracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
t.log.Trace().Any("data", data).Msg("query end")
}

View File

@@ -16,7 +16,7 @@ type listener struct {
mu sync.Mutex
}
func newListener(pool *pgxpool.Pool, ctx context.Context) (*listener, error) {
func newListener(ctx context.Context, pool *pgxpool.Pool) (*listener, error) {
l := listener{
pool: pool,
mu: sync.Mutex{},

View File

@@ -3,14 +3,13 @@ package pubsub
import (
"context"
"errors"
"os"
"slices"
"sync"
"time"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sirupsen/logrus"
"github.com/rs/zerolog"
)
const waitTimeout = 30 * time.Second
@@ -22,6 +21,7 @@ type Notifier interface {
type notifier struct {
mu sync.RWMutex
log zerolog.Logger
listener *listener
subscriptions map[string][]*subscription
pendingOps []pendingOp
@@ -41,14 +41,17 @@ const (
pendingOpTypeUnlisten = iota
)
func Start(pool *pgxpool.Pool, ctx context.Context) (Notifier, error) {
listener, err := newListener(pool, ctx)
var n Notifier
func Initialize(ctx context.Context, pool *pgxpool.Pool, log zerolog.Logger) error {
listener, err := newListener(ctx, pool)
if err != nil {
return nil, err
return err
}
notifier := notifier{
mu: sync.RWMutex{},
log: log.With().Str("c", "notifier").Logger(),
listener: listener,
subscriptions: make(map[string][]*subscription),
pendingOps: []pendingOp{},
@@ -61,10 +64,9 @@ func Start(pool *pgxpool.Pool, ctx context.Context) (Notifier, error) {
notification, err := notifier.waitOnce(ctx)
if err != nil {
if err.Error() == "conn closed" {
logrus.Error("PGX Connection Closed. Exiting")
os.Exit(1)
notifier.log.Fatal().Msg("connection closed. Exiting")
}
logrus.Error(err)
notifier.log.Warn().Err(err).Msg("error in notifier.waitOnce")
}
if notification != nil {
notifier.notifyListeners(notification)
@@ -72,7 +74,12 @@ func Start(pool *pgxpool.Pool, ctx context.Context) (Notifier, error) {
}
}()
return &notifier, nil
n = &notifier
return nil
}
func Get() Notifier {
return n
}
func (n *notifier) Listen(channel string) Subscription {
@@ -122,7 +129,7 @@ func (n *notifier) processPendingOps(ctx context.Context) {
}
if err != nil {
logrus.Warnf("error completing op %v on channel %s: %s", u.op, u.channelName, err.Error())
n.log.Warn().Err(err).Str("channel", u.channelName).Uint8("op", u.op).Msg("error completing op")
}
u.callback()
}()
@@ -160,7 +167,7 @@ func (n *notifier) notifyListeners(notification *pgconn.Notification) {
select {
case sub.listenChan <- notification.Payload:
default:
logrus.Warn("dropped notification due to full buffer", "payload", notification.Payload)
n.log.Warn().Str("channel", notification.Channel).Msg("dropped notification")
}
}
}

View File

@@ -20,11 +20,11 @@ const maxAttempts = 6
var ErrShutdown = errors.New("shutdown")
var errWaiting = errors.New("waiting for jobs")
var Cfg Config
var instance *Client
type Client struct {
db db.Handler
cfg Config
logger zerolog.Logger
// Config
defaultTimeout time.Duration
@@ -42,14 +42,16 @@ type Client struct {
cancelWork context.CancelFunc
}
func Initialize(db db.Handler, logger zerolog.Logger) {
func Initialize(db db.Handler, cfg Config, logger zerolog.Logger) *Client {
if instance == nil {
instance = &Client{
db: db,
cfg: cfg,
logger: logger.With().Str("mod", "steve").Logger(),
factories: make(map[string]workUnitFactory),
}
}
return instance
}
func Get() *Client {
@@ -73,7 +75,7 @@ func RegisterWorker[T JobArgs](client *Client, worker Worker[T]) {
}
func (c *Client) Run(ctx context.Context) error {
if Cfg.Workers <= 0 {
if c.cfg.Workers <= 0 {
return errors.New("workers must be a positive integer")
}
if err := resetRunningJobs(c.db); err != nil {
@@ -86,18 +88,18 @@ func (c *Client) Run(ctx context.Context) error {
return err
}
c.availableWorkers = make(chan struct{}, Cfg.Workers)
c.availableWorkers = make(chan struct{}, c.cfg.Workers)
c.jobAvailable = make(chan struct{})
c.defaultTimeout = time.Duration(Cfg.Timeout) * time.Second
c.defaultTimeout = time.Duration(c.cfg.Timeout) * time.Second
c.logger.Info().Int("workers", Cfg.Workers).Msg("Starting")
c.logger.Info().Int("workers", c.cfg.Workers).Msg("Starting")
c.done = make(chan struct{})
var listenCtx, runCtx, workCtx context.Context
listenCtx, c.cancelListen = context.WithCancel(ctx)
runCtx, c.cancelRun = context.WithCancel(ctx)
workCtx, c.cancelWork = context.WithCancel(ctx)
go c.produceJobs(listenCtx, db.Notifier())
go c.produceJobs(listenCtx, pubsub.Get())
c.consumeJobs(runCtx, workCtx)
return ErrShutdown
}

View File

@@ -8,6 +8,7 @@ import (
"strings"
"codeberg.org/shroff/phylum/server/internal/db"
"codeberg.org/shroff/phylum/server/internal/pubsub"
"github.com/jackc/pgx/v5"
"github.com/rs/zerolog/log"
)
@@ -141,7 +142,7 @@ func restoreBackends(db db.Handler) (map[string]Backend, error) {
}
func processBackendUpdates() {
sub := db.Notifier().Listen("backend_updates")
sub := pubsub.Get().Listen("backend_updates")
for {
p := <-sub.NotificationC()
var c BackendConfig