diff --git a/server/internal/pubsub/notifier.go b/server/internal/pubsub/notifier.go index 66d55699..93d23376 100644 --- a/server/internal/pubsub/notifier.go +++ b/server/internal/pubsub/notifier.go @@ -16,7 +16,7 @@ const waitTimeout = 30 * time.Second const opTimeout = 10 * time.Second type Notifier interface { - Listen(channel string) Subscription + Listen(channel string, bufferSize int, warnOnDrop bool) Subscription } type notifier struct { @@ -82,7 +82,7 @@ func Get() Notifier { return n } -func (n *notifier) Listen(channel string) Subscription { +func (n *notifier) Listen(channel string, bufferSize int, warnOnDrop bool) Subscription { n.mu.Lock() defer n.mu.Unlock() @@ -90,8 +90,9 @@ func (n *notifier) Listen(channel string) Subscription { sub := &subscription{ channelName: channel, - listenChan: make(chan string, 2), + listenChan: make(chan string, bufferSize), notifier: n, + warnOnDrop: warnOnDrop, } n.subscriptions[channel] = append(existingSubs, sub) @@ -167,7 +168,9 @@ func (n *notifier) notifyListeners(notification *pgconn.Notification) { select { case sub.listenChan <- notification.Payload: default: - n.log.Warn().Str("channel", notification.Channel).Msg("dropped notification") + if sub.warnOnDrop { + n.log.Warn().Str("channel", notification.Channel).Msg("dropped notification") + } } } } diff --git a/server/internal/pubsub/subscription.go b/server/internal/pubsub/subscription.go index 1d80d945..b79bb116 100644 --- a/server/internal/pubsub/subscription.go +++ b/server/internal/pubsub/subscription.go @@ -14,6 +14,7 @@ type subscription struct { channelName string listenChan chan string notifier *notifier + warnOnDrop bool establishedChan chan struct{} unlistenOnce sync.Once diff --git a/server/internal/steve/steve.go b/server/internal/steve/steve.go index fb8ce559..5d3fc59a 100644 --- a/server/internal/steve/steve.go +++ b/server/internal/steve/steve.go @@ -100,6 +100,7 @@ func (c *Client) Run(ctx context.Context) error { runCtx, c.cancelRun = context.WithCancel(ctx) workCtx, c.cancelWork = context.WithCancel(ctx) go c.produceJobs(listenCtx, pubsub.Get()) + go c.readyJobs(listenCtx) c.consumeJobs(runCtx, workCtx) return ErrShutdown } @@ -216,14 +217,12 @@ func (c *Client) Shutdown(ctx context.Context) error { func (c *Client) produceJobs(ctx context.Context, notifier pubsub.Notifier) { // Wait until the listener is established - subJobInserted := notifier.Listen(channelName) + subJobInserted := notifier.Listen(channelName, 1, false) select { case <-subJobInserted.EstablishedC(): default: } - t := time.NewTicker(5 * time.Second) - loop: for { select { @@ -234,6 +233,19 @@ loop: case c.jobAvailable <- struct{}{}: default: } + } + } + subJobInserted.Cancel() +} + +func (c *Client) readyJobs(ctx context.Context) { + t := time.NewTicker(5 * time.Second) + +loop: + for { + select { + case <-ctx.Done(): + break loop case <-t.C: if n, err := readyScheduledJobs(c.db); err != nil { c.logger.Warn().Err(err).Msg("Error scheduling ready jobs") @@ -255,7 +267,6 @@ loop: } } } - subJobInserted.Cancel() } func (c *Client) consumeJobs(runCtx context.Context, workCtx context.Context) { diff --git a/server/internal/storage/storage.go b/server/internal/storage/storage.go index b72a44da..460cc879 100644 --- a/server/internal/storage/storage.go +++ b/server/internal/storage/storage.go @@ -140,7 +140,7 @@ func restoreBackends(db db.Handler) (map[string]Backend, error) { } func processBackendUpdates(logger zerolog.Logger) { - sub := pubsub.Get().Listen("backend_updates") + sub := pubsub.Get().Listen("backend_updates", 5, true) for { p := <-sub.NotificationC() var c BackendConfig