feat: dlq for dispatcher queues (#2600)

* feat: dlq for dispatcher queues

* reduce dispatcher message ttl to 20 seconds

* rename dispatcher queue for clarity

* add error logs when dead lettering

* address comment
abelanger5
2025-12-04 14:19:01 -05:00
committed by GitHub
parent cf18b31218
commit 9dabe7d902
9 changed files with 333 additions and 45 deletions

View File

@@ -27,6 +27,7 @@ type Message struct {
OtelCarrier map[string]string `json:"otel_carrier"`
// Retries is the number of retries for the task.
// Deprecated: retries are set globally at the moment.
Retries int `json:"retries"`
// Compressed indicates whether the payloads are gzip compressed

View File

@@ -32,13 +32,22 @@ type Queue interface {
// IsDLQ returns true if the queue is a dead letter queue.
IsDLQ() bool
// IsAutoDLQ distinguishes between a static DLQ and an automatic DLQ. An automatic DLQ will automatically
// retry messages in a loop with a 5-second backoff, and each subscription to the regular Queue will include
// a subscription to the DLQ.
IsAutoDLQ() bool
// IsExpirable reports whether the queue itself is expirable, i.e. whether the queue and its messages are declared with a TTL
IsExpirable() bool
}
type staticQueue string
const (
TASK_PROCESSING_QUEUE staticQueue = "task_processing_queue_v2"
OLAP_QUEUE staticQueue = "olap_queue_v2"
DISPATCHER_DEAD_LETTER_QUEUE staticQueue = "dispatcher_dlq_v2"
)
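
For illustration only (not part of this commit), a minimal sketch of how the two DLQ flavors defined above behave, written as if it sat inside the same msgqueue package (which already imports fmt); the function name is mine and the printed values are derived from the code in this diff:

func sketchDLQFlavors() {
	// Automatic DLQ: derived from its parent queue's name, retried in a loop with
	// a backoff, and consumed by every subscriber of the parent queue.
	auto := TASK_PROCESSING_QUEUE.DLQ()
	fmt.Println(auto.Name(), auto.IsDLQ(), auto.IsAutoDLQ()) // task_processing_queue_v2_dlq true true

	// Static DLQ: a single shared queue that dispatcher queues dead-letter into.
	// It is a plain static queue from the broker's point of view, so it is consumed
	// via an explicit Subscribe call (the scheduler's, later in this diff).
	fmt.Println(DISPATCHER_DEAD_LETTER_QUEUE.Name(), DISPATCHER_DEAD_LETTER_QUEUE.IsDLQ()) // dispatcher_dlq_v2 false
}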
func (s staticQueue) Name() string {
@@ -61,28 +70,43 @@ func (s staticQueue) FanoutExchangeKey() string {
return ""
}
func (s staticQueue) DLQ() Queue {
name := fmt.Sprintf("%s_dlq", s)
return dlq{
staticQueue: staticQueue(name),
isAutoDLQ: true,
}
}
func (s staticQueue) IsDLQ() bool {
return false
}
func (s staticQueue) IsAutoDLQ() bool {
return false
}
func (s staticQueue) IsExpirable() bool {
return false
}
type dlq struct {
staticQueue
isAutoDLQ bool
}
func (d dlq) IsDLQ() bool {
return true
}
func (d dlq) IsAutoDLQ() bool {
return d.isAutoDLQ
}
func (d dlq) DLQ() Queue {
return nil
}
func NewRandomStaticQueue() staticQueue {
@@ -90,6 +114,55 @@ func NewRandomStaticQueue() staticQueue {
return staticQueue(fmt.Sprintf("random_static_queue_v2_%s", randBytes))
}
// dispatcherQueue is a type of queue which is durable and exclusive, but utilizes a per-queue TTL
// and per-message TTL + DLX to handle messages which are not consumed within a certain time period,
// for example if the dispatcher goes down.
type dispatcherQueue string
func (d dispatcherQueue) Name() string {
return string(d)
}
func (d dispatcherQueue) Durable() bool {
return true
}
func (d dispatcherQueue) AutoDeleted() bool {
return false
}
func (d dispatcherQueue) Exclusive() bool {
return true
}
func (d dispatcherQueue) FanoutExchangeKey() string {
return ""
}
func (d dispatcherQueue) DLQ() Queue {
return dlq{
staticQueue: DISPATCHER_DEAD_LETTER_QUEUE,
// we maintain a completely separate DLQ for dispatcher queues
isAutoDLQ: false,
}
}
func (d dispatcherQueue) IsDLQ() bool {
return false
}
func (d dispatcherQueue) IsAutoDLQ() bool {
return false
}
func (d dispatcherQueue) IsExpirable() bool {
return true
}
func QueueTypeFromDispatcherID(d string) dispatcherQueue {
return dispatcherQueue(d + "_dispatcher_v1")
}
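
As a hedged usage sketch of the type above (the dispatcher ID is a placeholder, and the snippet again assumes it sits in the msgqueue package):

func sketchDispatcherQueue() {
	q := QueueTypeFromDispatcherID("3f9c0d2e") // placeholder dispatcher ID

	fmt.Println(q.Name())            // 3f9c0d2e_dispatcher_v1
	fmt.Println(q.IsExpirable())     // true: declared with x-message-ttl / x-expires on RabbitMQ
	fmt.Println(q.DLQ().Name())      // dispatcher_dlq_v2 (the shared static DLQ)
	fmt.Println(q.DLQ().IsAutoDLQ()) // false: handled by the scheduler's explicit subscription
}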
type consumerQueue string
func (s consumerQueue) Name() string {
@@ -120,12 +193,13 @@ func (n consumerQueue) IsDLQ() bool {
return false
}
func (n consumerQueue) IsAutoDLQ() bool {
return false
}
func QueueTypeFromTickerID(t string) consumerQueue {
return consumerQueue(t + "_v1")
}
func (n consumerQueue) IsExpirable() bool {
// since exclusive and auto-deleted, it's not expirable
return false
}
const (
@@ -160,7 +234,7 @@ type AckHook func(task *Message) error
type MessageQueue interface {
// Clone copies the message queue with a new instance.
Clone() (func() error, MessageQueue)
Clone() (func() error, MessageQueue, error)
// SetQOS sets the quality of service for the message queue.
SetQOS(prefetchCount int)

View File

@@ -3,6 +3,7 @@ package postgres
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@@ -61,7 +62,7 @@ func WithQos(qos int) MessageQueueImplOpt {
}
}
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue) {
func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue, error) {
opts := defaultMessageQueueImplOpts()
for _, f := range fs {
@@ -84,22 +85,28 @@ func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImp
err := p.upsertQueue(context.Background(), msgqueue.TASK_PROCESSING_QUEUE)
if err != nil {
p.l.Fatal().Msgf("error upserting queue %s", msgqueue.TASK_PROCESSING_QUEUE.Name())
return nil, nil, fmt.Errorf("error upserting queue %s: %w", msgqueue.TASK_PROCESSING_QUEUE.Name(), err)
}
err = p.upsertQueue(context.Background(), msgqueue.OLAP_QUEUE)
if err != nil {
p.l.Fatal().Msgf("error upserting queue %s", msgqueue.OLAP_QUEUE.Name())
return nil, nil, fmt.Errorf("error upserting queue %s: %w", msgqueue.OLAP_QUEUE.Name(), err)
}
err = p.upsertQueue(context.Background(), msgqueue.DISPATCHER_DEAD_LETTER_QUEUE)
if err != nil {
return nil, nil, fmt.Errorf("error upserting queue %s: %w", msgqueue.DISPATCHER_DEAD_LETTER_QUEUE.Name(), err)
}
return func() error {
c.Stop()
return nil
}, p
}, p, nil
}
func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) {
func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue, error) {
return NewPostgresMQ(p.repo, p.configFs...)
}
@@ -361,8 +368,17 @@ func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Q
consumer = &str
}
autoDeleted := queue.AutoDeleted()
// FIXME: note that this differs from the RabbitMQ implementation, since we auto-delete Postgres MQs after
// 1 hour of inactivity instead of immediately. So if the queue is expirable, we set it to autoDeleted and
// will get effectively the same behavior.
if queue.IsExpirable() {
autoDeleted = true
}
// bind the queue
err := p.repo.BindQueue(ctx, queue.Name(), queue.Durable(), queue.AutoDeleted(), exclusive, consumer)
err := p.repo.BindQueue(ctx, queue.Name(), queue.Durable(), autoDeleted, exclusive, consumer)
if err != nil {
p.l.Error().Err(err).Msg("error binding queue")
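
The FIXME above boils down to a one-line mapping; a sketch (the helper name is mine, written as if it lived in this postgres package):

// There is no per-message TTL in the Postgres-backed queue, so "expirable"
// collapses into "auto-deleted after ~1 hour of inactivity" on this backend.
func effectiveAutoDeleted(q msgqueue.Queue) bool {
	return q.AutoDeleted() || q.IsExpirable()
}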

View File

@@ -152,7 +152,7 @@ func WithMessageRejection(enabled bool, maxDeathCount int) MessageQueueImplOpt {
}
// New creates a new MessageQueueImpl.
func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl, error) {
ctx, cancel := context.WithCancel(context.Background())
opts := defaultMessageQueueImplOpts()
@@ -174,7 +174,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
if err != nil {
cancel()
return nil, nil
return nil, nil, err
}
subMaxChans := opts.maxSubChannels
@@ -188,7 +188,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
if err != nil {
pubChannelPool.Close()
cancel()
return nil, nil
return nil, nil, err
}
t := &MessageQueueImpl{
@@ -217,7 +217,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
if err != nil {
t.l.Error().Msgf("[New] cannot acquire channel: %v", err)
cancel()
return nil, nil
return nil, nil, err
}
ch := poolCh.Value()
@@ -225,24 +225,27 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) {
defer poolCh.Release()
if _, err := t.initQueue(ch, msgqueue.TASK_PROCESSING_QUEUE); err != nil {
t.l.Debug().Msgf("error initializing queue: %v", err)
cancel()
return nil, nil
return nil, nil, fmt.Errorf("failed to initialize queue: %w", err)
}
if _, err := t.initQueue(ch, msgqueue.OLAP_QUEUE); err != nil {
t.l.Debug().Msgf("error initializing queue: %v", err)
cancel()
return nil, nil
return nil, nil, fmt.Errorf("failed to initialize queue: %w", err)
}
if _, err := t.initQueue(ch, msgqueue.DISPATCHER_DEAD_LETTER_QUEUE); err != nil {
cancel()
return nil, nil, fmt.Errorf("failed to initialize queue: %w", err)
}
return func() error {
cancel()
return nil
}, t
}, t, nil
}
func (t *MessageQueueImpl) Clone() (func() error, msgqueue.MessageQueue) {
func (t *MessageQueueImpl) Clone() (func() error, msgqueue.MessageQueue, error) {
return New(t.configFs...)
}
@@ -481,7 +484,8 @@ func (t *MessageQueueImpl) Subscribe(
return nil, err
}
if q.DLQ() != nil {
// only automatic DLQs get subscribed to, static DLQs require a separate subscription
if q.DLQ() != nil && q.DLQ().IsAutoDLQ() {
cleanupSubDLQ, err := t.subscribe(ctx, t.identity, q.DLQ(), preAck, postAck)
if err != nil {
@@ -579,7 +583,7 @@ func (t *MessageQueueImpl) initQueue(ch *amqp.Channel, q msgqueue.Queue) (string
name = fmt.Sprintf("%s-%s", q.Name(), suffix)
}
if !q.IsDLQ() && q.DLQ() != nil {
if !q.IsDLQ() && q.DLQ() != nil && q.DLQ().IsAutoDLQ() {
dlx1 := getTmpDLQName(q.DLQ().Name())
dlx2 := getProcDLQName(q.DLQ().Name())
@@ -609,6 +613,16 @@ func (t *MessageQueueImpl) initQueue(ch *amqp.Channel, q msgqueue.Queue) (string
}
}
if !q.IsDLQ() && q.DLQ() != nil && !q.DLQ().IsAutoDLQ() {
args["x-dead-letter-exchange"] = ""
args["x-dead-letter-routing-key"] = q.DLQ().Name()
}
if q.IsExpirable() {
args["x-message-ttl"] = int32(20000) // 20 seconds
args["x-expires"] = int32(600000) // 10 minutes
}
if _, err := ch.QueueDeclare(name, q.Durable(), q.AutoDeleted(), q.Exclusive(), false, args); err != nil {
t.l.Error().Msgf("cannot declare queue: %q, %v", name, err)
return "", err
@@ -712,7 +726,7 @@ func (t *MessageQueueImpl) subscribe(
poolCh.Release()
}
if q.IsDLQ() {
if q.IsDLQ() && q.IsAutoDLQ() {
queueName = getProcDLQName(q.Name())
}
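
To make the arguments above concrete, here is a standalone sketch of roughly the declaration initQueue issues for an expirable dispatcher queue, using the rabbitmq/amqp091-go client directly; the function name, queue name, and connection URL are placeholders of mine:

package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// Sketch only: a durable, exclusive queue whose unconsumed messages dead-letter
// to the shared dispatcher DLQ via the default exchange after 20 seconds, and
// which the broker deletes after 10 minutes without use.
func declareDispatcherQueue(ch *amqp.Channel, name string) error {
	args := amqp.Table{
		"x-dead-letter-exchange":    "",                  // default (direct) exchange
		"x-dead-letter-routing-key": "dispatcher_dlq_v2", // DISPATCHER_DEAD_LETTER_QUEUE
		"x-message-ttl":             int32(20000),        // 20 seconds
		"x-expires":                 int32(600000),       // 10 minutes
	}

	_, err := ch.QueueDeclare(name, true, false, true, false, args) // durable, !autoDelete, exclusive, !noWait
	return err
}

func main() {
	conn, err := amqp.Dial("amqp://user:password@localhost:5672/") // placeholder URL
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	if err := declareDispatcherQueue(ch, "3f9c0d2e_dispatcher_v1"); err != nil {
		log.Fatal(err)
	}
}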

View File

@@ -30,13 +30,15 @@ func TestMessageQueueIntegration(t *testing.T) {
url := "amqp://user:password@localhost:5672/"
// Initialize the task queue implementation
cleanup, tq := New(
cleanup, tq, err := New(
WithURL(url),
WithQos(100),
WithDeadLetterBackoff(5*time.Second),
WithMessageRejection(false, 10), // Disable message rejection for this test
)
require.Nil(t, err, "error should be nil")
require.NotNil(t, tq, "task queue implementation should not be nil")
id, _ := random.Generate(8) // nolint: errcheck
@@ -122,12 +124,14 @@ func TestBufferedSubMessageQueueIntegration(t *testing.T) {
url := "amqp://user:password@localhost:5672/"
// Initialize the task queue implementation
cleanup, tq := New(
cleanup, tq, err := New(
WithURL(url),
WithQos(100),
WithDeadLetterBackoff(5*time.Second),
)
require.Nil(t, err, "error should be nil")
require.NotNil(t, tq, "task queue implementation should not be nil")
id, _ := random.Generate(8) // nolint: errcheck
@@ -196,12 +200,14 @@ func TestBufferedPubMessageQueueIntegration(t *testing.T) {
url := "amqp://user:password@localhost:5672/"
// Initialize the task queue implementation
cleanup, tq := New(
cleanup, tq, err := New(
WithURL(url),
WithQos(100),
WithDeadLetterBackoff(5*time.Second),
)
require.Nil(t, err, "error should be nil")
require.NotNil(t, tq, "task queue implementation should not be nil")
id, _ := random.Generate(8) // nolint: errcheck
@@ -268,13 +274,15 @@ func TestDeadLetteringSuccess(t *testing.T) {
url := "amqp://user:password@localhost:5672/"
// Initialize the task queue implementation
cleanup, tq := New(
cleanup, tq, err := New(
WithURL(url),
WithQos(100),
WithDeadLetterBackoff(5*time.Second),
WithMessageRejection(false, 10), // Disable message rejection for this test
)
require.Nil(t, err, "error should be nil")
require.NotNil(t, tq, "task queue implementation should not be nil")
id, _ := random.Generate(8) // nolint: errcheck

View File

@@ -1,6 +1,7 @@
package queueutils
import (
"math"
"math/rand"
"time"
)
@@ -13,9 +14,17 @@ func SleepWithExponentialBackoff(base, max time.Duration, retryCount int) { // n
retryCount = 0
}
// prevent overflow
pow := time.Duration(math.MaxInt64)
if retryCount < 63 {
pow = 1 << retryCount
}
// Calculate exponential backoff
backoff := base * (1 << retryCount)
if backoff > max {
backoff := base * pow
// if backoff / pow does not recover base, we've overflowed
if backoff > max || backoff/pow != base {
backoff = max
}
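
A self-contained sketch of just the clamping arithmetic above (the function and sample values are mine; jitter and the actual sleep are omitted), showing why both the pow cap and the backoff/pow check are needed:

package main

import (
	"fmt"
	"math"
	"time"
)

// clampedBackoff mirrors the overflow-safe calculation from SleepWithExponentialBackoff.
func clampedBackoff(base, max time.Duration, retryCount int) time.Duration {
	if retryCount < 0 {
		retryCount = 0
	}
	// shifting 1 by 63 or more would overflow int64, so cap the multiplier first
	pow := time.Duration(math.MaxInt64)
	if retryCount < 63 {
		pow = 1 << retryCount
	}
	backoff := base * pow
	// if dividing back out does not recover base, the multiplication itself overflowed
	if backoff > max || backoff/pow != base {
		backoff = max
	}
	return backoff
}

func main() {
	base, max := 100*time.Millisecond, 30*time.Second
	for _, n := range []int{0, 4, 10, 40, 63, 200} {
		fmt.Printf("retry %3d -> %s\n", n, clampedBackoff(base, max, n))
	}
}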

View File

@@ -246,7 +246,11 @@ func New(fs ...OLAPControllerOpt) (*OLAPControllerImpl, error) {
}
func (o *OLAPControllerImpl) Start() (func() error, error) {
cleanupHeavyReadMQ, heavyReadMQ := o.mq.Clone()
cleanupHeavyReadMQ, heavyReadMQ, err := o.mq.Clone()
if err != nil {
return nil, err
}
heavyReadMQ.SetQOS(2000)
o.s.Start()
@@ -282,7 +286,7 @@ func (o *OLAPControllerImpl) Start() (func() error, error) {
}()
}
_, err := o.s.NewJob(
_, err = o.s.NewJob(
gocron.DurationJob(time.Minute*15),
gocron.NewTask(
o.runOLAPTablePartition(ctx),

View File

@@ -192,12 +192,17 @@ func New(
}
func (s *Scheduler) Start() (func() error, error) {
cleanupDLQ, err := s.mq.Subscribe(msgqueue.DISPATCHER_DEAD_LETTER_QUEUE, s.handleDeadLetteredMessages, msgqueue.NoOpHook)
if err != nil {
return nil, fmt.Errorf("could not start subscribe to dispatcher dead letter queue: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
_, err := s.s.NewJob(
_, err = s.s.NewJob(
gocron.DurationJob(time.Second*1),
gocron.NewTask(
s.runSetTenants(ctx),
@@ -287,6 +292,10 @@ func (s *Scheduler) Start() (func() error, error) {
return fmt.Errorf("could not cleanup job processing queue: %w", err)
}
if err := cleanupDLQ(); err != nil {
return fmt.Errorf("could not cleanup message queue buffer: %w", err)
}
if err := s.s.Shutdown(); err != nil {
return fmt.Errorf("could not shutdown scheduler: %w", err)
}
@@ -444,6 +453,7 @@ func (s *Scheduler) scheduleStepRuns(ctx context.Context, tenantId string, res *
if err != nil {
outerErr = multierror.Append(outerErr, fmt.Errorf("could not create bulk assigned task: %w", err))
continue
}
err = s.mq.SendMessage(
@@ -700,3 +710,147 @@ func taskBulkAssignedTask(tenantId string, workerIdsToTaskIds map[string][]int64
},
)
}
func (s *Scheduler) handleDeadLetteredMessages(msg *msgqueue.Message) (err error) {
defer func() {
if r := recover(); r != nil {
recoverErr := recoveryutils.RecoverWithAlert(s.l, s.a, r)
if recoverErr != nil {
err = recoverErr
}
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second)
defer cancel()
switch msg.ID {
case "task-assigned-bulk":
err = s.handleDeadLetteredTaskBulkAssigned(ctx, msg)
case "task-cancelled":
err = s.handleDeadLetteredTaskCancelled(ctx, msg)
default:
err = fmt.Errorf("unknown task: %s", msg.ID)
}
return err
}
func (s *Scheduler) handleDeadLetteredTaskBulkAssigned(ctx context.Context, msg *msgqueue.Message) error {
msgs := msgqueue.JSONConvert[tasktypes.TaskAssignedBulkTaskPayload](msg.Payloads)
taskIds := make([]int64, 0)
for _, innerMsg := range msgs {
for _, tasks := range innerMsg.WorkerIdToTaskIds {
s.l.Error().Msgf("handling dead-lettered task assignments for tenant %s, tasks: %v. This indicates an abrupt shutdown of a dispatcher and should be investigated.", msg.TenantID, tasks)
taskIds = append(taskIds, tasks...)
}
}
toFail, err := s.repov1.Tasks().ListTasks(ctx, msg.TenantID, taskIds)
if err != nil {
return fmt.Errorf("could not list tasks for dead lettered bulk assigned message: %w", err)
}
for _, _task := range toFail {
tenantId := msg.TenantID
task := _task
msg, err := tasktypes.FailedTaskMessage(
tenantId,
task.ID,
task.InsertedAt,
sqlchelpers.UUIDToStr(task.ExternalID),
sqlchelpers.UUIDToStr(task.WorkflowRunID),
task.RetryCount,
false,
"Could not send task to worker",
false,
)
if err != nil {
return fmt.Errorf("could not create failed task message: %w", err)
}
err = s.mq.SendMessage(ctx, msgqueue.TASK_PROCESSING_QUEUE, msg)
if err != nil {
// NOTE: failure to send on the MQ is likely not transient; ideally we could only retry individual
// tasks, but since this message has the tasks in a batch, we retry all of them instead. We're banking
// on the downstream `task-failed` processing to be idempotent.
return fmt.Errorf("could not send failed task message: %w", err)
}
}
return nil
}
func (s *Scheduler) handleDeadLetteredTaskCancelled(ctx context.Context, msg *msgqueue.Message) error {
payloads := msgqueue.JSONConvert[tasktypes.SignalTaskCancelledPayload](msg.Payloads)
// try to resend the cancellation signal to the impacted worker.
workerIds := make([]string, 0)
for _, p := range payloads {
s.l.Error().Msgf("handling dead-lettered task cancellations for tenant %s, task %d. This indicates an abrupt shutdown of a dispatcher and should be investigated.", msg.TenantID, p.TaskId)
workerIds = append(workerIds, p.WorkerId)
}
// since the dispatcher IDs may have changed since the previous send, we need to query them again
dispatcherIdWorkerIds, err := s.repo.Worker().GetDispatcherIdsForWorkers(ctx, msg.TenantID, workerIds)
if err != nil {
return fmt.Errorf("could not list dispatcher ids for workers: %w", err)
}
workerIdToDispatcherId := make(map[string]string)
for dispatcherId, workerIds := range dispatcherIdWorkerIds {
for _, workerId := range workerIds {
workerIdToDispatcherId[workerId] = dispatcherId
}
}
dispatcherIdsToPayloads := make(map[string][]tasktypes.SignalTaskCancelledPayload)
for _, p := range payloads {
// if we no longer have the worker attached to a dispatcher, discard the message
if _, ok := workerIdToDispatcherId[p.WorkerId]; !ok {
continue
}
pcp := *p
dispatcherId := workerIdToDispatcherId[pcp.WorkerId]
dispatcherIdsToPayloads[dispatcherId] = append(dispatcherIdsToPayloads[dispatcherId], pcp)
}
for dispatcherId, payloads := range dispatcherIdsToPayloads {
msg, err := msgqueue.NewTenantMessage(
msg.TenantID,
"task-cancelled",
false,
true,
payloads...,
)
if err != nil {
return fmt.Errorf("could not create message for task cancellation: %w", err)
}
err = s.mq.SendMessage(
ctx,
msgqueue.QueueTypeFromDispatcherID(dispatcherId),
msg,
)
if err != nil {
return fmt.Errorf("could not send message for task cancellation: %w", err)
}
}
return nil
}

View File

@@ -418,12 +418,16 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
postgres.WithQos(cf.MessageQueue.Postgres.Qos),
)
cleanupv1, mqv1 = pgmqv1.NewPostgresMQ(
cleanupv1, mqv1, err = pgmqv1.NewPostgresMQ(
dc.EngineRepository.MessageQueue(),
pgmqv1.WithLogger(&l),
pgmqv1.WithQos(cf.MessageQueue.Postgres.Qos),
)
if err != nil {
return nil, nil, fmt.Errorf("could not init postgres queue: %w", err)
}
cleanup1 = func() error {
if err := cleanupv0(); err != nil {
return err
@@ -447,7 +451,7 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
rabbitmq.WithMessageRejection(cf.MessageQueue.RabbitMQ.EnableMessageRejection, cf.MessageQueue.RabbitMQ.MaxDeathCount),
)
cleanupv1, mqv1 = rabbitmqv1.New(
cleanupv1, mqv1, err = rabbitmqv1.New(
rabbitmqv1.WithURL(cf.MessageQueue.RabbitMQ.URL),
rabbitmqv1.WithLogger(&l),
rabbitmqv1.WithQos(cf.MessageQueue.RabbitMQ.Qos),
@@ -461,6 +465,10 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers
rabbitmqv1.WithMessageRejection(cf.MessageQueue.RabbitMQ.EnableMessageRejection, cf.MessageQueue.RabbitMQ.MaxDeathCount),
)
if err != nil {
return nil, nil, fmt.Errorf("could not init rabbitmq: %w", err)
}
cleanup1 = func() error {
if err := cleanupv0(); err != nil {
return err