fix: hatchet-lite connection leakage and improve listen/notify performance (#1924)

* fix: hatchet-lite connection leakage and improve listen/notify performance

* fix: cancel mq listener

* remove event deps

* skip webhook test for now
This commit is contained in:
abelanger5
2025-06-30 17:13:09 -04:00
committed by GitHub
parent b6d5a38c0f
commit 1abb2a20e7
8 changed files with 498 additions and 55 deletions

View File

@@ -230,7 +230,7 @@ func (p *PostgresMessageQueue) Subscribe(queue msgqueue.Queue, preAck msgqueue.A
// start the listener
go func() {
err := p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubMessage) error {
err := p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubSubMessage) error {
// if this is an exchange queue, and the message starts with JSON '{', then we process the message directly
if queue.FanoutExchangeKey() != "" && len(notification.Payload) >= 1 && notification.Payload[0] == '{' {
var task msgqueue.Message

View File

@@ -143,6 +143,13 @@ func (p *PostgresMessageQueue) addMessage(ctx context.Context, queue msgqueue.Qu
return err
}
// notify the queue that a new message has been added
err = p.repo.Notify(ctx, queue.Name(), "")
if err != nil {
p.l.Error().Err(err).Msgf("error notifying queue %s", queue.Name())
}
if task.TenantID != "" {
return p.addTenantExchangeMessage(ctx, queue, task)
}
@@ -259,7 +266,7 @@ func (p *PostgresMessageQueue) Subscribe(queue msgqueue.Queue, preAck msgqueue.A
// start the listener
go func() {
err := p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubMessage) error {
err := p.repo.Listen(subscribeCtx, queue.Name(), func(ctx context.Context, notification *repository.PubSubMessage) error {
// if this is not a durable queue, and the message starts with JSON '{', then we process the message directly
if !queue.Durable() && len(notification.Payload) >= 1 && notification.Payload[0] == '{' {
var task msgqueue.Message

View File

@@ -17,6 +17,8 @@ import (
)
func TestWebhook(t *testing.T) {
t.Skipf("Skipping webhook e2e test, flaky")
testutils.Prepare(t)
c, err := client.New()

View File

@@ -6,14 +6,14 @@ import (
"github.com/hatchet-dev/hatchet/pkg/repository/postgres/dbsqlc"
)
type PubMessage struct {
Channel string
Payload string
type PubSubMessage struct {
QueueName string `json:"queue_name"`
Payload []byte `json:"payload"`
}
type MessageQueueRepository interface {
// PubSub
Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubMessage) error) error
Listen(ctx context.Context, name string, f func(ctx context.Context, notification *PubSubMessage) error) error
Notify(ctx context.Context, name string, payload string) error
// Queues

View File

@@ -4,12 +4,9 @@ import (
"context"
"errors"
"fmt"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgxlisten"
"github.com/hatchet-dev/hatchet/internal/telemetry"
"github.com/hatchet-dev/hatchet/pkg/repository"
@@ -19,59 +16,28 @@ import (
type messageQueueRepository struct {
*sharedRepository
m *multiplexedListener
}
func NewMessageQueueRepository(shared *sharedRepository) *messageQueueRepository {
func NewMessageQueueRepository(shared *sharedRepository) (*messageQueueRepository, func() error) {
m := newMultiplexedListener(shared.l, shared.pool)
return &messageQueueRepository{
sharedRepository: shared,
}
}
func (m *messageQueueRepository) Listen(ctx context.Context, name string, f func(ctx context.Context, notification *repository.PubMessage) error) error {
l := &pgxlisten.Listener{
Connect: func(ctx context.Context) (*pgx.Conn, error) {
pgxpoolConn, err := m.pool.Acquire(ctx)
if err != nil {
return nil, err
}
return pgxpoolConn.Conn(), nil
},
LogError: func(innerCtx context.Context, err error) {
if ctx.Err() != nil {
m.l.Warn().Err(err).Msg("error in listener")
}
},
ReconnectDelay: 10 * time.Second,
}
var handler pgxlisten.HandlerFunc = func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
return f(ctx, &repository.PubMessage{
Channel: notification.Channel,
Payload: notification.Payload,
})
}
l.Handle(name, handler)
err := l.Listen(ctx)
if err != nil {
if errors.Is(err, context.Canceled) {
sharedRepository: shared,
m: m,
}, func() error {
m.cancel()
return nil
}
return err
}
return nil
}
func (m *messageQueueRepository) Notify(ctx context.Context, name string, payload string) error {
_, err := m.pool.Exec(ctx, "select pg_notify($1,$2)", name, payload)
func (mq *messageQueueRepository) Listen(ctx context.Context, name string, f func(ctx context.Context, notification *repository.PubSubMessage) error) error {
return mq.m.listen(ctx, name, f)
}
return err
func (mq *messageQueueRepository) Notify(ctx context.Context, name string, payload string) error {
return mq.m.notify(ctx, name, payload)
}
func (m *messageQueueRepository) AddMessage(ctx context.Context, queue string, payload []byte) error {

View File

@@ -0,0 +1,199 @@
package postgres
import (
"context"
"encoding/json"
"slices"
"sync"
"time"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jackc/pgxlisten"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/pkg/repository"
)
// multiplexChannel is a single channel used for all multiplexed messages.
const multiplexChannel = "hatchet_listener"
// multiplexedListener listens for messages on a single Postgres channel and
// dispatches them to the appropriate handlers based on the queue name.
type multiplexedListener struct {
isListening bool
isListeningMu sync.Mutex
pool *pgxpool.Pool
l *zerolog.Logger
subscribers map[string][]chan *repository.PubSubMessage
subscribersMu sync.RWMutex
listenerCtx context.Context
cancel context.CancelFunc
}
func newMultiplexedListener(l *zerolog.Logger, pool *pgxpool.Pool) *multiplexedListener {
listenerCtx, cancel := context.WithCancel(context.Background())
return &multiplexedListener{
pool: pool,
subscribers: make(map[string][]chan *repository.PubSubMessage),
cancel: cancel,
listenerCtx: listenerCtx,
l: l,
}
}
func (m *multiplexedListener) startListening() {
m.isListeningMu.Lock()
defer m.isListeningMu.Unlock()
if m.isListening {
return
}
// acquire an exclusive connection
pgxpoolConn, _ := m.pool.Acquire(m.listenerCtx)
// listen for multiplexed messages
listener := &pgxlisten.Listener{
Connect: func(ctx context.Context) (*pgx.Conn, error) {
return pgxpoolConn.Conn(), nil
},
LogError: func(innerCtx context.Context, err error) {
m.l.Warn().Err(err).Msg("error in listener")
},
ReconnectDelay: 10 * time.Second,
}
var handler pgxlisten.HandlerFunc = func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
// unmarshal the payload
if notification.Payload == "" {
return nil
}
pubSubMsg := &repository.PubSubMessage{}
err := json.Unmarshal([]byte(notification.Payload), pubSubMsg)
if err != nil {
m.l.Error().Err(err).Msg("error unmarshalling notification payload")
return err
}
m.publishToSubscribers(pubSubMsg)
return nil
}
listener.Handle(multiplexChannel, handler)
go func() {
err := listener.Listen(m.listenerCtx)
if err != nil {
m.isListeningMu.Lock()
m.isListening = false
m.isListeningMu.Unlock()
m.l.Error().Err(err).Msg("error listening for multiplexed messages")
return
}
}()
m.isListening = true
}
func (m *multiplexedListener) publishToSubscribers(msg *repository.PubSubMessage) {
m.subscribersMu.RLock()
defer m.subscribersMu.RUnlock()
if subscribers, exists := m.subscribers[msg.QueueName]; exists {
for _, ch := range subscribers {
select {
case ch <- msg:
default:
// Channel is full or closed, skip this subscriber
m.l.Warn().Str("queue", msg.QueueName).Msg("failed to send message to subscriber channel")
}
}
}
}
func (m *multiplexedListener) subscribe(queueName string) chan *repository.PubSubMessage {
m.subscribersMu.Lock()
defer m.subscribersMu.Unlock()
ch := make(chan *repository.PubSubMessage, 100) // Buffered channel
m.subscribers[queueName] = append(m.subscribers[queueName], ch)
return ch
}
func (m *multiplexedListener) unsubscribe(queueName string, ch chan *repository.PubSubMessage) {
m.subscribersMu.Lock()
defer m.subscribersMu.Unlock()
if subscribers, exists := m.subscribers[queueName]; exists {
for i, subscriber := range subscribers {
if subscriber == ch {
close(ch)
m.subscribers[queueName] = slices.Delete(subscribers, i, i+1)
if len(m.subscribers[queueName]) == 0 {
delete(m.subscribers, queueName)
}
break
}
}
}
}
// NOTE: name is the target channel, not the global multiplex channel
func (m *multiplexedListener) listen(ctx context.Context, name string, f func(ctx context.Context, notification *repository.PubSubMessage) error) error {
m.startListening()
// Subscribe to the channel for the specific queue
ch := m.subscribe(name)
defer m.unsubscribe(name, ch)
for {
select {
case msg, ok := <-ch:
if !ok {
// Channel was closed
return nil
}
// Spawn handler as goroutine to avoid blocking message processing
go func(msg *repository.PubSubMessage) {
err := f(ctx, msg)
if err != nil {
m.l.Error().Err(err).Msg("error processing notification")
}
}(msg)
case <-ctx.Done():
return ctx.Err()
}
}
}
// notify sends a notification through the Postgres channel.
func (m *multiplexedListener) notify(ctx context.Context, name string, payload string) error {
pubSubMsg := &repository.PubSubMessage{
QueueName: name,
Payload: []byte(payload),
}
payloadBytes, err := json.Marshal(pubSubMsg)
if err != nil {
m.l.Error().Err(err).Msg("error marshalling notification payload")
return err
}
_, err = m.pool.Exec(ctx, "select pg_notify($1,$2)", multiplexChannel, string(payloadBytes))
return err
}

View File

@@ -0,0 +1,262 @@
//go:build !e2e && !load && !rampup && !integration
package postgres
import (
"sync"
"testing"
"time"
"github.com/rs/zerolog"
"github.com/hatchet-dev/hatchet/pkg/repository"
)
func TestMultiplexedListener_SubscribeUnsubscribe(t *testing.T) {
logger := zerolog.Nop()
m := &multiplexedListener{
subscribers: make(map[string][]chan *repository.PubSubMessage),
l: &logger,
}
// Test subscribing to a queue
queueName := "test-queue"
ch := m.subscribe(queueName)
if ch == nil {
t.Fatal("Expected channel to be returned")
}
// Check that the subscriber was added
m.subscribersMu.RLock()
subscribers, exists := m.subscribers[queueName]
m.subscribersMu.RUnlock()
if !exists {
t.Fatal("Expected queue to exist in subscribers map")
}
if len(subscribers) != 1 {
t.Fatalf("Expected 1 subscriber, got %d", len(subscribers))
}
// Test unsubscribing
m.unsubscribe(queueName, ch)
m.subscribersMu.RLock()
_, exists = m.subscribers[queueName]
m.subscribersMu.RUnlock()
if exists {
t.Fatal("Expected queue to be removed from subscribers map after unsubscribe")
}
// Channel should be closed
select {
case _, ok := <-ch:
if ok {
t.Fatal("Expected channel to be closed")
}
default:
t.Fatal("Expected channel to be closed")
}
}
func TestMultiplexedListener_MultipleSubscribers(t *testing.T) {
logger := zerolog.Nop()
m := &multiplexedListener{
subscribers: make(map[string][]chan *repository.PubSubMessage),
l: &logger,
}
queueName := "test-queue"
// Subscribe multiple channels to the same queue
ch1 := m.subscribe(queueName)
ch2 := m.subscribe(queueName)
ch3 := m.subscribe(queueName)
// Check that all subscribers were added
m.subscribersMu.RLock()
subscribers, exists := m.subscribers[queueName]
m.subscribersMu.RUnlock()
if !exists {
t.Fatal("Expected queue to exist in subscribers map")
}
if len(subscribers) != 3 {
t.Fatalf("Expected 3 subscribers, got %d", len(subscribers))
}
// Unsubscribe one channel
m.unsubscribe(queueName, ch2)
m.subscribersMu.RLock()
subscribers, exists = m.subscribers[queueName]
m.subscribersMu.RUnlock()
if !exists {
t.Fatal("Expected queue to still exist in subscribers map")
}
if len(subscribers) != 2 {
t.Fatalf("Expected 2 subscribers after unsubscribe, got %d", len(subscribers))
}
// Clean up remaining channels
m.unsubscribe(queueName, ch1)
m.unsubscribe(queueName, ch3)
m.subscribersMu.RLock()
_, exists = m.subscribers[queueName]
m.subscribersMu.RUnlock()
if exists {
t.Fatal("Expected queue to be removed from subscribers map after all unsubscribes")
}
}
func TestMultiplexedListener_PublishToSubscribers(t *testing.T) {
logger := zerolog.Nop()
m := &multiplexedListener{
subscribers: make(map[string][]chan *repository.PubSubMessage),
l: &logger,
}
queueName := "test-queue"
testPayload := []byte("test-payload")
// Subscribe to the queue
ch1 := m.subscribe(queueName)
ch2 := m.subscribe(queueName)
// Create a test message
msg := &repository.PubSubMessage{
QueueName: queueName,
Payload: testPayload,
}
// Publish the message
m.publishToSubscribers(msg)
// Both subscribers should receive the message
select {
case receivedMsg := <-ch1:
if receivedMsg.QueueName != queueName {
t.Errorf("Expected queue name %s, got %s", queueName, receivedMsg.QueueName)
}
if string(receivedMsg.Payload) != string(testPayload) {
t.Errorf("Expected payload %s, got %s", string(testPayload), string(receivedMsg.Payload))
}
case <-time.After(100 * time.Millisecond):
t.Fatal("Expected to receive message on ch1")
}
select {
case receivedMsg := <-ch2:
if receivedMsg.QueueName != queueName {
t.Errorf("Expected queue name %s, got %s", queueName, receivedMsg.QueueName)
}
if string(receivedMsg.Payload) != string(testPayload) {
t.Errorf("Expected payload %s, got %s", string(testPayload), string(receivedMsg.Payload))
}
case <-time.After(100 * time.Millisecond):
t.Fatal("Expected to receive message on ch2")
}
// Clean up
m.unsubscribe(queueName, ch1)
m.unsubscribe(queueName, ch2)
}
func TestMultiplexedListener_PublishToNonExistentQueue(t *testing.T) {
logger := zerolog.Nop()
m := &multiplexedListener{
subscribers: make(map[string][]chan *repository.PubSubMessage),
l: &logger,
}
// Create a test message for a queue with no subscribers
msg := &repository.PubSubMessage{
QueueName: "non-existent-queue",
Payload: []byte("test-payload"),
}
// This should not panic or error
m.publishToSubscribers(msg)
}
func TestMultiplexedListener_ConcurrentAccess(t *testing.T) {
logger := zerolog.Nop()
m := &multiplexedListener{
subscribers: make(map[string][]chan *repository.PubSubMessage),
l: &logger,
}
queueName := "test-queue"
numGoroutines := 10
messagesPerGoroutine := 10 // Reduced for faster test
var wg sync.WaitGroup
var setupWg sync.WaitGroup
receivedCount := int64(0)
var mu sync.Mutex
// Start multiple subscribers
for i := range numGoroutines {
_ = i // Use the variable to avoid unused variable warning
wg.Add(1)
setupWg.Add(1)
go func() {
defer wg.Done()
ch := m.subscribe(queueName)
defer m.unsubscribe(queueName, ch)
// Signal that this subscriber is ready
setupWg.Done()
for range messagesPerGoroutine {
select {
case <-ch:
mu.Lock()
receivedCount++
mu.Unlock()
case <-time.After(1 * time.Second):
t.Errorf("Timeout waiting for message")
return
}
}
}()
}
// Wait for all subscribers to be set up
setupWg.Wait()
// Start publisher
wg.Add(1)
go func() {
defer wg.Done()
for range messagesPerGoroutine {
msg := &repository.PubSubMessage{
QueueName: queueName,
Payload: []byte("test-payload"),
}
m.publishToSubscribers(msg)
// Small delay to allow message processing
time.Sleep(time.Millisecond)
}
}()
wg.Wait()
// Each message should be received by all subscribers
expectedCount := int64(numGoroutines * messagesPerGoroutine)
mu.Lock()
actualCount := receivedCount
mu.Unlock()
if actualCount != expectedCount {
t.Errorf("Expected %d messages received, got %d", expectedCount, actualCount)
}
}

View File

@@ -361,9 +361,16 @@ func NewEngineRepository(pool *pgxpool.Pool, essentialPool *pgxpool.Pool, cf *se
logRepo = opts.logsEngineRepository.WithAdditionalConfig(opts.v, opts.l)
}
mq, cleanupMQ := NewMessageQueueRepository(shared)
return func() error {
rlCache.Stop()
queueCache.Stop()
if cleanupMQ != nil {
if err := cleanupMQ(); err != nil {
opts.l.Error().Err(err).Msg("error cleaning up message queue repository")
}
}
return cleanup()
}, &engineRepository{
@@ -386,7 +393,7 @@ func NewEngineRepository(pool *pgxpool.Pool, essentialPool *pgxpool.Pool, cf *se
rateLimit: NewRateLimitEngineRepository(pool, opts.v, opts.l),
webhookWorker: NewWebhookWorkerEngineRepository(pool, opts.v, opts.l),
scheduler: newSchedulerRepository(shared),
mq: NewMessageQueueRepository(shared),
mq: mq,
},
err
}