improve ttl cache on pgmq (#1438)

* improve ttl cache on pgmq

* fix: panic
This commit is contained in:
abelanger5
2025-03-28 09:27:12 -07:00
committed by GitHub
parent f2a6867068
commit b03a8d2666
3 changed files with 45 additions and 60 deletions

View File

@@ -3,7 +3,6 @@ package postgres
import (
"context"
"encoding/json"
"sync"
"time"
"github.com/google/uuid"
@@ -11,6 +10,7 @@ import (
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/cache"
"github.com/hatchet-dev/hatchet/internal/msgqueue"
"github.com/hatchet-dev/hatchet/internal/queueutils"
"github.com/hatchet-dev/hatchet/internal/telemetry"
@@ -24,9 +24,9 @@ type PostgresMessageQueue struct {
l *zerolog.Logger
qos int
upsertedQueues map[string]bool
upsertedQueuesMu sync.RWMutex
configFs []MessageQueueImplOpt
ttlCache *cache.TTLCache[string, bool]
configFs []MessageQueueImplOpt
}
type MessageQueueImplOpt func(*MessageQueueImplOpts)
@@ -57,30 +57,29 @@ func WithQos(qos int) MessageQueueImplOpt {
}
}
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) *PostgresMessageQueue {
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue) {
opts := defaultMessageQueueImplOpts()
for _, f := range fs {
f(opts)
}
return &PostgresMessageQueue{
repo: repo,
l: opts.l,
qos: opts.qos,
upsertedQueues: make(map[string]bool),
configFs: fs,
}
}
c := cache.NewTTL[string, bool]()
func (p *PostgresMessageQueue) cleanup() error {
return nil
return func() error {
c.Stop()
return nil
}, &PostgresMessageQueue{
repo: repo,
l: opts.l,
qos: opts.qos,
ttlCache: c,
configFs: fs,
}
}
func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) {
pCp := NewPostgresMQ(p.repo, p.configFs...)
return pCp.cleanup, pCp
return NewPostgresMQ(p.repo, p.configFs...)
}
func (p *PostgresMessageQueue) SetQOS(prefetchCount int) {
@@ -286,18 +285,10 @@ func (p *PostgresMessageQueue) IsReady() bool {
}
func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Queue) error {
// place a lock on the upserted queues
p.upsertedQueuesMu.RLock()
// check if the queue has been upserted
if _, exists := p.upsertedQueues[queue.Name()]; exists {
p.upsertedQueuesMu.RUnlock()
if valid, exists := p.ttlCache.Get(queue.Name()); valid && exists {
return nil
}
// otherwise, lock for writing
p.upsertedQueuesMu.RUnlock()
exclusive := queue.Exclusive()
// If the queue is a fanout exchange, then it is not exclusive. This is different from the RabbitMQ
@@ -322,12 +313,7 @@ func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Q
return err
}
// place a lock on the upserted queues
p.upsertedQueuesMu.Lock()
defer p.upsertedQueuesMu.Unlock()
// add the queue to the upserted queues
p.upsertedQueues[queue.Name()] = true
p.ttlCache.Set(queue.Name(), true, time.Second*15)
return nil
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"
"github.com/hatchet-dev/hatchet/internal/cache"
msgqueue "github.com/hatchet-dev/hatchet/internal/msgqueue/v1"
"github.com/hatchet-dev/hatchet/internal/queueutils"
"github.com/hatchet-dev/hatchet/internal/telemetry"
@@ -26,6 +27,8 @@ type PostgresMessageQueue struct {
upsertedQueues map[string]bool
upsertedQueuesMu sync.RWMutex
ttlCache *cache.TTLCache[string, bool]
configFs []MessageQueueImplOpt
}
@@ -57,7 +60,7 @@ func WithQos(qos int) MessageQueueImplOpt {
}
}
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) *PostgresMessageQueue {
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue) {
opts := defaultMessageQueueImplOpts()
for _, f := range fs {
@@ -66,12 +69,15 @@ func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImp
opts.l.Info().Msg("Creating new Postgres message queue")
c := cache.NewTTL[string, bool]()
p := &PostgresMessageQueue{
repo: repo,
l: opts.l,
qos: opts.qos,
upsertedQueues: make(map[string]bool),
configFs: fs,
ttlCache: c,
}
err := p.upsertQueue(context.Background(), msgqueue.TASK_PROCESSING_QUEUE)
@@ -86,17 +92,14 @@ func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImp
p.l.Fatal().Msgf("error upserting queue %s", msgqueue.OLAP_QUEUE.Name())
}
return p
}
func (p *PostgresMessageQueue) cleanup() error {
return nil
return func() error {
c.Stop()
return nil
}, p
}
func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) {
pCp := NewPostgresMQ(p.repo, p.configFs...)
return pCp.cleanup, pCp
return NewPostgresMQ(p.repo, p.configFs...)
}
func (p *PostgresMessageQueue) SetQOS(prefetchCount int) {
@@ -315,18 +318,10 @@ func (p *PostgresMessageQueue) IsReady() bool {
}
func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Queue) error {
// place a lock on the upserted queues
p.upsertedQueuesMu.RLock()
// check if the queue has been upserted
if _, exists := p.upsertedQueues[queue.Name()]; exists {
p.upsertedQueuesMu.RUnlock()
if valid, exists := p.ttlCache.Get(queue.Name()); valid && exists {
return nil
}
// otherwise, lock for writing
p.upsertedQueuesMu.RUnlock()
exclusive := queue.Exclusive()
// If the queue is a fanout exchange, then it is not exclusive. This is different from the RabbitMQ
@@ -351,12 +346,7 @@ func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Q
return err
}
// place a lock on the upserted queues
p.upsertedQueuesMu.Lock()
defer p.upsertedQueuesMu.Unlock()
// add the queue to the upserted queues
p.upsertedQueues[queue.Name()] = true
p.ttlCache.Set(queue.Name(), true, time.Second*15)
return nil
}

View File

@@ -311,19 +311,28 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
if cf.MessageQueue.Enabled {
switch strings.ToLower(cf.MessageQueue.Kind) {
case "postgres":
l.Warn().Msg("Using a Postgres-backed message queue. This feature is still in beta.")
var cleanupv0 func() error
var cleanupv1 func() error
mq = postgres.NewPostgresMQ(
cleanupv0, mq = postgres.NewPostgresMQ(
dc.EngineRepository.MessageQueue(),
postgres.WithLogger(&l),
postgres.WithQos(cf.MessageQueue.Postgres.Qos),
)
mqv1 = pgmqv1.NewPostgresMQ(
cleanupv1, mqv1 = pgmqv1.NewPostgresMQ(
dc.EngineRepository.MessageQueue(),
pgmqv1.WithLogger(&l),
pgmqv1.WithQos(cf.MessageQueue.Postgres.Qos),
)
cleanup1 = func() error {
if err := cleanupv0(); err != nil {
return err
}
return cleanupv1()
}
case "rabbitmq":
var cleanupv0 func() error
var cleanupv1 func() error