[server][pubsub] Allow configuring buffer size for each listen

Abhishek Shroff
2025-07-21 02:23:45 +05:30
parent 594ea3b554
commit 326089c009
4 changed files with 24 additions and 9 deletions

View File

@@ -16,7 +16,7 @@ const waitTimeout = 30 * time.Second
 const opTimeout = 10 * time.Second
 
 type Notifier interface {
-	Listen(channel string) Subscription
+	Listen(channel string, bufferSize int, warnOnDrop bool) Subscription
 }
type notifier struct {
@@ -82,7 +82,7 @@ func Get() Notifier {
 	return n
 }
 
-func (n *notifier) Listen(channel string) Subscription {
+func (n *notifier) Listen(channel string, bufferSize int, warnOnDrop bool) Subscription {
 	n.mu.Lock()
 	defer n.mu.Unlock()
@@ -90,8 +90,9 @@ func (n *notifier) Listen(channel string) Subscription {
 	sub := &subscription{
 		channelName: channel,
-		listenChan:  make(chan string, 2),
+		listenChan:  make(chan string, bufferSize),
 		notifier:    n,
+		warnOnDrop:  warnOnDrop,
 	}
 	n.subscriptions[channel] = append(existingSubs, sub)
@@ -167,7 +168,9 @@ func (n *notifier) notifyListeners(notification *pgconn.Notification) {
 		select {
 		case sub.listenChan <- notification.Payload:
 		default:
-			n.log.Warn().Str("channel", notification.Channel).Msg("dropped notification")
+			if sub.warnOnDrop {
+				n.log.Warn().Str("channel", notification.Channel).Msg("dropped notification")
+			}
 		}
 	}
 }
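Taken together, the notifier changes let each listener pick its own trade-off between buffering and drop tolerance. A minimal caller-side sketch, assuming only the Subscription surface visible in this commit (Listen, EstablishedC, NotificationC, Cancel); the channel name, buffer choice, and payload handling are illustrative, not part of the diff:

	func watch(ctx context.Context) {
		// Buffer up to 10 payloads and log a warning whenever one is dropped.
		sub := pubsub.Get().Listen("example_channel", 10, true)
		defer sub.Cancel()

		// Wait until the underlying LISTEN is active (assumes establishedChan
		// is closed once the listener is set up).
		<-sub.EstablishedC()

		for {
			select {
			case payload := <-sub.NotificationC():
				_ = payload // handle the notification payload here
			case <-ctx.Done():
				return
			}
		}
	}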

View File

@@ -14,6 +14,7 @@ type subscription struct {
 	channelName     string
 	listenChan      chan string
 	notifier        *notifier
+	warnOnDrop      bool
 	establishedChan chan struct{}
 	unlistenOnce    sync.Once

View File

@@ -100,6 +100,7 @@ func (c *Client) Run(ctx context.Context) error {
 	runCtx, c.cancelRun = context.WithCancel(ctx)
 	workCtx, c.cancelWork = context.WithCancel(ctx)
 	go c.produceJobs(listenCtx, pubsub.Get())
+	go c.readyJobs(listenCtx)
 	c.consumeJobs(runCtx, workCtx)
 	return ErrShutdown
 }
@@ -216,14 +217,12 @@ func (c *Client) Shutdown(ctx context.Context) error {
 func (c *Client) produceJobs(ctx context.Context, notifier pubsub.Notifier) {
 	// Wait until the listener is established
-	subJobInserted := notifier.Listen(channelName)
+	subJobInserted := notifier.Listen(channelName, 1, false)
 	select {
 	case <-subJobInserted.EstablishedC():
 	default:
 	}
-
-	t := time.NewTicker(5 * time.Second)
 loop:
 	for {
 		select {
@@ -234,6 +233,19 @@ loop:
 			case c.jobAvailable <- struct{}{}:
 			default:
 			}
+		}
+	}
+
+	subJobInserted.Cancel()
+}
+
+func (c *Client) readyJobs(ctx context.Context) {
+	t := time.NewTicker(5 * time.Second)
+loop:
+	for {
+		select {
+		case <-ctx.Done():
+			break loop
 		case <-t.C:
 			if n, err := readyScheduledJobs(c.db); err != nil {
 				c.logger.Warn().Err(err).Msg("Error scheduling ready jobs")
@@ -255,7 +267,6 @@ loop:
 			}
 		}
 	}
-	subJobInserted.Cancel()
 }
 
 func (c *Client) consumeJobs(runCtx context.Context, workCtx context.Context) {
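produceJobs now listens with a buffer of 1 and warnOnDrop disabled: its notifications are pure wake-up signals, so once one is queued, further ones add no information and can be dropped silently; the non-blocking send to c.jobAvailable above applies the same idea. A self-contained sketch of this coalescing-signal pattern (all names illustrative):

	package main

	import "fmt"

	func main() {
		// Capacity-1 signal channel: senders never block, and redundant
		// signals are dropped because one pending wake-up is enough.
		wake := make(chan struct{}, 1)

		notify := func() {
			select {
			case wake <- struct{}{}:
			default: // a wake-up is already queued; coalesce
			}
		}

		notify()
		notify() // coalesced into the pending signal
		notify() // coalesced as well

		<-wake
		fmt.Println("woke once for three notifications")
	}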

View File

@@ -140,7 +140,7 @@ func restoreBackends(db db.Handler) (map[string]Backend, error) {
 }
 
 func processBackendUpdates(logger zerolog.Logger) {
-	sub := pubsub.Get().Listen("backend_updates")
+	sub := pubsub.Get().Listen("backend_updates", 5, true)
 	for {
 		p := <-sub.NotificationC()
 		var c BackendConfig
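By contrast, processBackendUpdates asks for a buffer of 5 with drop warnings enabled, since each payload carries a distinct configuration update rather than a redundant wake-up. A sketch of the consuming side, under the assumption (not shown in this diff) that the payload is a JSON-encoded BackendConfig:

	func consumeBackendUpdates(logger zerolog.Logger) {
		// Every payload matters here, so buffer a few and surface any drops.
		sub := pubsub.Get().Listen("backend_updates", 5, true)
		for {
			p := <-sub.NotificationC()
			var c BackendConfig
			if err := json.Unmarshal([]byte(p), &c); err != nil {
				logger.Warn().Err(err).Msg("undecodable backend update")
				continue
			}
			// Apply the updated config c here.
		}
	}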