From b03a8d2666da35ede7fe78d8b6ebe16e99c3ef05 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Fri, 28 Mar 2025 09:27:12 -0700 Subject: [PATCH] improve ttl cache on pgmq (#1438) * improve ttl cache on pgmq * fix: panic --- internal/msgqueue/postgres/msgqueue.go | 52 +++++++++-------------- internal/msgqueue/v1/postgres/msgqueue.go | 38 ++++++----------- pkg/config/loader/loader.go | 15 +++++-- 3 files changed, 45 insertions(+), 60 deletions(-) diff --git a/internal/msgqueue/postgres/msgqueue.go b/internal/msgqueue/postgres/msgqueue.go index b26b5c7eb..c72423151 100644 --- a/internal/msgqueue/postgres/msgqueue.go +++ b/internal/msgqueue/postgres/msgqueue.go @@ -3,7 +3,6 @@ package postgres import ( "context" "encoding/json" - "sync" "time" "github.com/google/uuid" @@ -11,6 +10,7 @@ import ( "github.com/rs/zerolog" "golang.org/x/sync/errgroup" + "github.com/hatchet-dev/hatchet/internal/cache" "github.com/hatchet-dev/hatchet/internal/msgqueue" "github.com/hatchet-dev/hatchet/internal/queueutils" "github.com/hatchet-dev/hatchet/internal/telemetry" @@ -24,9 +24,9 @@ type PostgresMessageQueue struct { l *zerolog.Logger qos int - upsertedQueues map[string]bool - upsertedQueuesMu sync.RWMutex - configFs []MessageQueueImplOpt + ttlCache *cache.TTLCache[string, bool] + + configFs []MessageQueueImplOpt } type MessageQueueImplOpt func(*MessageQueueImplOpts) @@ -57,30 +57,29 @@ func WithQos(qos int) MessageQueueImplOpt { } } -func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) *PostgresMessageQueue { +func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue) { opts := defaultMessageQueueImplOpts() for _, f := range fs { f(opts) } - return &PostgresMessageQueue{ - repo: repo, - l: opts.l, - qos: opts.qos, - upsertedQueues: make(map[string]bool), - configFs: fs, - } -} + c := cache.NewTTL[string, bool]() -func (p *PostgresMessageQueue) cleanup() error { - return nil + return func() error { + c.Stop() + return nil + }, &PostgresMessageQueue{ + repo: repo, + l: opts.l, + qos: opts.qos, + ttlCache: c, + configFs: fs, + } } func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) { - pCp := NewPostgresMQ(p.repo, p.configFs...) - - return pCp.cleanup, pCp + return NewPostgresMQ(p.repo, p.configFs...) } func (p *PostgresMessageQueue) SetQOS(prefetchCount int) { @@ -286,18 +285,10 @@ func (p *PostgresMessageQueue) IsReady() bool { } func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Queue) error { - // place a lock on the upserted queues - p.upsertedQueuesMu.RLock() - - // check if the queue has been upserted - if _, exists := p.upsertedQueues[queue.Name()]; exists { - p.upsertedQueuesMu.RUnlock() + if valid, exists := p.ttlCache.Get(queue.Name()); valid && exists { return nil } - // otherwise, lock for writing - p.upsertedQueuesMu.RUnlock() - exclusive := queue.Exclusive() // If the queue is a fanout exchange, then it is not exclusive. This is different from the RabbitMQ @@ -322,12 +313,7 @@ func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Q return err } - // place a lock on the upserted queues - p.upsertedQueuesMu.Lock() - defer p.upsertedQueuesMu.Unlock() - - // add the queue to the upserted queues - p.upsertedQueues[queue.Name()] = true + p.ttlCache.Set(queue.Name(), true, time.Second*15) return nil } diff --git a/internal/msgqueue/v1/postgres/msgqueue.go b/internal/msgqueue/v1/postgres/msgqueue.go index 6641f6059..854304b73 100644 --- a/internal/msgqueue/v1/postgres/msgqueue.go +++ b/internal/msgqueue/v1/postgres/msgqueue.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog" "golang.org/x/sync/errgroup" + "github.com/hatchet-dev/hatchet/internal/cache" msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1" "github.com/hatchet-dev/hatchet/internal/queueutils" "github.com/hatchet-dev/hatchet/internal/telemetry" @@ -26,6 +27,8 @@ type PostgresMessageQueue struct { upsertedQueues map[string]bool upsertedQueuesMu sync.RWMutex + ttlCache *cache.TTLCache[string, bool] + configFs []MessageQueueImplOpt } @@ -57,7 +60,7 @@ func WithQos(qos int) MessageQueueImplOpt { } } -func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) *PostgresMessageQueue { +func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue) { opts := defaultMessageQueueImplOpts() for _, f := range fs { @@ -66,12 +69,15 @@ func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImp opts.l.Info().Msg("Creating new Postgres message queue") + c := cache.NewTTL[string, bool]() + p := &PostgresMessageQueue{ repo: repo, l: opts.l, qos: opts.qos, upsertedQueues: make(map[string]bool), configFs: fs, + ttlCache: c, } err := p.upsertQueue(context.Background(), msgqueue.TASK_PROCESSING_QUEUE) @@ -86,17 +92,14 @@ func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImp p.l.Fatal().Msgf("error upserting queue %s", msgqueue.OLAP_QUEUE.Name()) } - return p -} - -func (p *PostgresMessageQueue) cleanup() error { - return nil + return func() error { + c.Stop() + return nil + }, p } func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) { - pCp := NewPostgresMQ(p.repo, p.configFs...) - - return pCp.cleanup, pCp + return NewPostgresMQ(p.repo, p.configFs...) } func (p *PostgresMessageQueue) SetQOS(prefetchCount int) { @@ -315,18 +318,10 @@ func (p *PostgresMessageQueue) IsReady() bool { } func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Queue) error { - // place a lock on the upserted queues - p.upsertedQueuesMu.RLock() - - // check if the queue has been upserted - if _, exists := p.upsertedQueues[queue.Name()]; exists { - p.upsertedQueuesMu.RUnlock() + if valid, exists := p.ttlCache.Get(queue.Name()); valid && exists { return nil } - // otherwise, lock for writing - p.upsertedQueuesMu.RUnlock() - exclusive := queue.Exclusive() // If the queue is a fanout exchange, then it is not exclusive. This is different from the RabbitMQ @@ -351,12 +346,7 @@ func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Q return err } - // place a lock on the upserted queues - p.upsertedQueuesMu.Lock() - defer p.upsertedQueuesMu.Unlock() - - // add the queue to the upserted queues - p.upsertedQueues[queue.Name()] = true + p.ttlCache.Set(queue.Name(), true, time.Second*15) return nil } diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go index cfd589e1e..4ca79c455 100644 --- a/pkg/config/loader/loader.go +++ b/pkg/config/loader/loader.go @@ -311,19 +311,28 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers if cf.MessageQueue.Enabled { switch strings.ToLower(cf.MessageQueue.Kind) { case "postgres": - l.Warn().Msg("Using a Postgres-backed message queue. This feature is still in beta.") + var cleanupv0 func() error + var cleanupv1 func() error - mq = postgres.NewPostgresMQ( + cleanupv0, mq = postgres.NewPostgresMQ( dc.EngineRepository.MessageQueue(), postgres.WithLogger(&l), postgres.WithQos(cf.MessageQueue.Postgres.Qos), ) - mqv1 = pgmqv1.NewPostgresMQ( + cleanupv1, mqv1 = pgmqv1.NewPostgresMQ( dc.EngineRepository.MessageQueue(), pgmqv1.WithLogger(&l), pgmqv1.WithQos(cf.MessageQueue.Postgres.Qos), ) + + cleanup1 = func() error { + if err := cleanupv0(); err != nil { + return err + } + + return cleanupv1() + } case "rabbitmq": var cleanupv0 func() error var cleanupv1 func() error