From 9dabe7d902026122a2300a25349d521fdd22d584 Mon Sep 17 00:00:00 2001 From: abelanger5 Date: Thu, 4 Dec 2025 14:19:01 -0500 Subject: [PATCH] feat: dlq for dispatcher queues (#2600) * feat: dlq for dispatcher queues * reduce dispatcher message ttl to 20 seconds * rename dispatcher queue for clarity * add error logs when dead lettering * address comment --- internal/msgqueue/v1/msg.go | 1 + internal/msgqueue/v1/msgqueue.go | 104 ++++++++++-- internal/msgqueue/v1/postgres/msgqueue.go | 28 +++- internal/msgqueue/v1/rabbitmq/rabbitmq.go | 40 +++-- .../msgqueue/v1/rabbitmq/rabbitmq_test.go | 16 +- internal/queueutils/backoff.go | 13 +- .../controllers/v1/olap/controller.go | 8 +- internal/services/scheduler/v1/scheduler.go | 156 +++++++++++++++++- pkg/config/loader/loader.go | 12 +- 9 files changed, 333 insertions(+), 45 deletions(-) diff --git a/internal/msgqueue/v1/msg.go b/internal/msgqueue/v1/msg.go index a32e05e11..637a75354 100644 --- a/internal/msgqueue/v1/msg.go +++ b/internal/msgqueue/v1/msg.go @@ -27,6 +27,7 @@ type Message struct { OtelCarrier map[string]string `json:"otel_carrier"` // Retries is the number of retries for the task. + // Deprecated: retries are set globally at the moment. Retries int `json:"retries"` // Compressed indicates whether the payloads are gzip compressed diff --git a/internal/msgqueue/v1/msgqueue.go b/internal/msgqueue/v1/msgqueue.go index bc7cb1cff..8ec69085a 100644 --- a/internal/msgqueue/v1/msgqueue.go +++ b/internal/msgqueue/v1/msgqueue.go @@ -32,13 +32,22 @@ type Queue interface { // IsDLQ returns true if the queue is a dead letter queue. IsDLQ() bool + + // We distinguish between a static DLQ or an automatic DLQ. An automatic DLQ will automatically retry messages + // in a loop with a 5-second backoff, and each subscription to the regular Queue will include a subscription + // to the DLQ. + IsAutoDLQ() bool + + // IsExpirable refers to whether the queue itself is expirable + IsExpirable() bool } type staticQueue string const ( - TASK_PROCESSING_QUEUE staticQueue = "task_processing_queue_v2" - OLAP_QUEUE staticQueue = "olap_queue_v2" + TASK_PROCESSING_QUEUE staticQueue = "task_processing_queue_v2" + OLAP_QUEUE staticQueue = "olap_queue_v2" + DISPATCHER_DEAD_LETTER_QUEUE staticQueue = "dispatcher_dlq_v2" ) func (s staticQueue) Name() string { @@ -61,28 +70,43 @@ func (s staticQueue) FanoutExchangeKey() string { return "" } +func (s staticQueue) DLQ() Queue { + name := fmt.Sprintf("%s_dlq", s) + + return dlq{ + staticQueue: staticQueue(name), + isAutoDLQ: true, + } +} + func (s staticQueue) IsDLQ() bool { return false } +func (s staticQueue) IsAutoDLQ() bool { + return false +} + +func (s staticQueue) IsExpirable() bool { + return false +} + type dlq struct { staticQueue + + isAutoDLQ bool } func (d dlq) IsDLQ() bool { return true } -func (d dlq) DLQ() Queue { - return nil +func (d dlq) IsAutoDLQ() bool { + return d.isAutoDLQ } -func (s staticQueue) DLQ() Queue { - name := fmt.Sprintf("%s_dlq", s) - - return dlq{ - staticQueue: staticQueue(name), - } +func (d dlq) DLQ() Queue { + return nil } func NewRandomStaticQueue() staticQueue { @@ -90,6 +114,55 @@ func NewRandomStaticQueue() staticQueue { return staticQueue(fmt.Sprintf("random_static_queue_v2_%s", randBytes)) } +// dispatcherQueue is a type of queue which is durable and exclusive, but utilizes a per-queue TTL +// and per-message TTL + DLX to handle messages which are not consumed within a certain time period, +// for example if the dispatcher goes down. 
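+// Expired messages are dead-lettered to the shared DISPATCHER_DEAD_LETTER_QUEUE, which the scheduler
+// consumes via handleDeadLetteredMessages. The TTLs are applied when the RabbitMQ implementation declares
+// the queue (x-message-ttl of 20 seconds, x-expires of 10 minutes); the Postgres implementation approximates
+// expiry by marking the queue as auto-deleted.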
+type dispatcherQueue string + +func (d dispatcherQueue) Name() string { + return string(d) +} + +func (d dispatcherQueue) Durable() bool { + return true +} + +func (d dispatcherQueue) AutoDeleted() bool { + return false +} + +func (d dispatcherQueue) Exclusive() bool { + return true +} + +func (d dispatcherQueue) FanoutExchangeKey() string { + return "" +} + +func (d dispatcherQueue) DLQ() Queue { + return dlq{ + staticQueue: DISPATCHER_DEAD_LETTER_QUEUE, + // we maintain a completely separate DLQ for dispatcher queues + isAutoDLQ: false, + } +} + +func (d dispatcherQueue) IsDLQ() bool { + return false +} + +func (d dispatcherQueue) IsAutoDLQ() bool { + return false +} + +func (d dispatcherQueue) IsExpirable() bool { + return true +} + +func QueueTypeFromDispatcherID(d string) dispatcherQueue { + return dispatcherQueue(d + "_dispatcher_v1") +} + type consumerQueue string func (s consumerQueue) Name() string { @@ -120,12 +193,13 @@ func (n consumerQueue) IsDLQ() bool { return false } -func QueueTypeFromDispatcherID(d string) consumerQueue { - return consumerQueue(d + "_v1") +func (n consumerQueue) IsAutoDLQ() bool { + return false } -func QueueTypeFromTickerID(t string) consumerQueue { - return consumerQueue(t + "_v1") +func (n consumerQueue) IsExpirable() bool { + // since exclusive and auto-deleted, it's not expirable + return false } const ( @@ -160,7 +234,7 @@ type AckHook func(task *Message) error type MessageQueue interface { // Clone copies the message queue with a new instance. - Clone() (func() error, MessageQueue) + Clone() (func() error, MessageQueue, error) // SetQOS sets the quality of service for the message queue. SetQOS(prefetchCount int) diff --git a/internal/msgqueue/v1/postgres/msgqueue.go b/internal/msgqueue/v1/postgres/msgqueue.go index 330b3c79c..b1dc87b61 100644 --- a/internal/msgqueue/v1/postgres/msgqueue.go +++ b/internal/msgqueue/v1/postgres/msgqueue.go @@ -3,6 +3,7 @@ package postgres import ( "context" "encoding/json" + "fmt" "sync" "time" @@ -61,7 +62,7 @@ func WithQos(qos int) MessageQueueImplOpt { } } -func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue) { +func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImplOpt) (func() error, *PostgresMessageQueue, error) { opts := defaultMessageQueueImplOpts() for _, f := range fs { @@ -84,22 +85,28 @@ func NewPostgresMQ(repo repository.MessageQueueRepository, fs ...MessageQueueImp err := p.upsertQueue(context.Background(), msgqueue.TASK_PROCESSING_QUEUE) if err != nil { - p.l.Fatal().Msgf("error upserting queue %s", msgqueue.TASK_PROCESSING_QUEUE.Name()) + return nil, nil, fmt.Errorf("error upserting queue %s: %w", msgqueue.TASK_PROCESSING_QUEUE.Name(), err) } err = p.upsertQueue(context.Background(), msgqueue.OLAP_QUEUE) if err != nil { - p.l.Fatal().Msgf("error upserting queue %s", msgqueue.OLAP_QUEUE.Name()) + return nil, nil, fmt.Errorf("error upserting queue %s: %w", msgqueue.OLAP_QUEUE.Name(), err) + } + + err = p.upsertQueue(context.Background(), msgqueue.DISPATCHER_DEAD_LETTER_QUEUE) + + if err != nil { + return nil, nil, fmt.Errorf("error upserting queue %s: %w", msgqueue.DISPATCHER_DEAD_LETTER_QUEUE.Name(), err) } return func() error { c.Stop() return nil - }, p + }, p, nil } -func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue) { +func (p *PostgresMessageQueue) Clone() (func() error, msgqueue.MessageQueue, error) { return NewPostgresMQ(p.repo, p.configFs...) 
} @@ -361,8 +368,17 @@ func (p *PostgresMessageQueue) upsertQueue(ctx context.Context, queue msgqueue.Q consumer = &str } + autoDeleted := queue.AutoDeleted() + + // FIXME: note that this differs from the RabbitMQ implementation, since we auto-delete Postgres MQs after + // 1 hour of inactivity instead of immediately. So if the queue is expirable, we set it to autoDeleted and + // will get effectively the same behavior. + if queue.IsExpirable() { + autoDeleted = true + } + // bind the queue - err := p.repo.BindQueue(ctx, queue.Name(), queue.Durable(), queue.AutoDeleted(), exclusive, consumer) + err := p.repo.BindQueue(ctx, queue.Name(), queue.Durable(), autoDeleted, exclusive, consumer) if err != nil { p.l.Error().Err(err).Msg("error binding queue") diff --git a/internal/msgqueue/v1/rabbitmq/rabbitmq.go b/internal/msgqueue/v1/rabbitmq/rabbitmq.go index 2eec90d92..f6620bf89 100644 --- a/internal/msgqueue/v1/rabbitmq/rabbitmq.go +++ b/internal/msgqueue/v1/rabbitmq/rabbitmq.go @@ -152,7 +152,7 @@ func WithMessageRejection(enabled bool, maxDeathCount int) MessageQueueImplOpt { } // New creates a new MessageQueueImpl. -func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { +func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl, error) { ctx, cancel := context.WithCancel(context.Background()) opts := defaultMessageQueueImplOpts() @@ -174,7 +174,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { if err != nil { cancel() - return nil, nil + return nil, nil, err } subMaxChans := opts.maxSubChannels @@ -188,7 +188,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { if err != nil { pubChannelPool.Close() cancel() - return nil, nil + return nil, nil, err } t := &MessageQueueImpl{ @@ -217,7 +217,7 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { if err != nil { t.l.Error().Msgf("[New] cannot acquire channel: %v", err) cancel() - return nil, nil + return nil, nil, err } ch := poolCh.Value() @@ -225,24 +225,27 @@ func New(fs ...MessageQueueImplOpt) (func() error, *MessageQueueImpl) { defer poolCh.Release() if _, err := t.initQueue(ch, msgqueue.TASK_PROCESSING_QUEUE); err != nil { - t.l.Debug().Msgf("error initializing queue: %v", err) cancel() - return nil, nil + return nil, nil, fmt.Errorf("failed to initialize queue: %w", err) } if _, err := t.initQueue(ch, msgqueue.OLAP_QUEUE); err != nil { - t.l.Debug().Msgf("error initializing queue: %v", err) cancel() - return nil, nil + return nil, nil, fmt.Errorf("failed to initialize queue: %w", err) + } + + if _, err := t.initQueue(ch, msgqueue.DISPATCHER_DEAD_LETTER_QUEUE); err != nil { + cancel() + return nil, nil, fmt.Errorf("failed to initialize queue: %w", err) } return func() error { cancel() return nil - }, t + }, t, nil } -func (t *MessageQueueImpl) Clone() (func() error, msgqueue.MessageQueue) { +func (t *MessageQueueImpl) Clone() (func() error, msgqueue.MessageQueue, error) { return New(t.configFs...) 
} @@ -481,7 +484,8 @@ func (t *MessageQueueImpl) Subscribe( return nil, err } - if q.DLQ() != nil { + // only automatic DLQs get subscribed to, static DLQs require a separate subscription + if q.DLQ() != nil && q.DLQ().IsAutoDLQ() { cleanupSubDLQ, err := t.subscribe(ctx, t.identity, q.DLQ(), preAck, postAck) if err != nil { @@ -579,7 +583,7 @@ func (t *MessageQueueImpl) initQueue(ch *amqp.Channel, q msgqueue.Queue) (string name = fmt.Sprintf("%s-%s", q.Name(), suffix) } - if !q.IsDLQ() && q.DLQ() != nil { + if !q.IsDLQ() && q.DLQ() != nil && q.DLQ().IsAutoDLQ() { dlx1 := getTmpDLQName(q.DLQ().Name()) dlx2 := getProcDLQName(q.DLQ().Name()) @@ -609,6 +613,16 @@ func (t *MessageQueueImpl) initQueue(ch *amqp.Channel, q msgqueue.Queue) (string } } + if !q.IsDLQ() && q.DLQ() != nil && !q.DLQ().IsAutoDLQ() { + args["x-dead-letter-exchange"] = "" + args["x-dead-letter-routing-key"] = q.DLQ().Name() + } + + if q.IsExpirable() { + args["x-message-ttl"] = int32(20000) // 20 seconds + args["x-expires"] = int32(600000) // 10 minutes + } + if _, err := ch.QueueDeclare(name, q.Durable(), q.AutoDeleted(), q.Exclusive(), false, args); err != nil { t.l.Error().Msgf("cannot declare queue: %q, %v", name, err) return "", err @@ -712,7 +726,7 @@ func (t *MessageQueueImpl) subscribe( poolCh.Release() } - if q.IsDLQ() { + if q.IsDLQ() && q.IsAutoDLQ() { queueName = getProcDLQName(q.Name()) } diff --git a/internal/msgqueue/v1/rabbitmq/rabbitmq_test.go b/internal/msgqueue/v1/rabbitmq/rabbitmq_test.go index ca40d7e05..bee612d51 100644 --- a/internal/msgqueue/v1/rabbitmq/rabbitmq_test.go +++ b/internal/msgqueue/v1/rabbitmq/rabbitmq_test.go @@ -30,13 +30,15 @@ func TestMessageQueueIntegration(t *testing.T) { url := "amqp://user:password@localhost:5672/" // Initialize the task queue implementation - cleanup, tq := New( + cleanup, tq, err := New( WithURL(url), WithQos(100), WithDeadLetterBackoff(5*time.Second), WithMessageRejection(false, 10), // Disable message rejection for this test ) + require.Nil(t, err, "error should be nil") + require.NotNil(t, tq, "task queue implementation should not be nil") id, _ := random.Generate(8) // nolint: errcheck @@ -122,12 +124,14 @@ func TestBufferedSubMessageQueueIntegration(t *testing.T) { url := "amqp://user:password@localhost:5672/" // Initialize the task queue implementation - cleanup, tq := New( + cleanup, tq, err := New( WithURL(url), WithQos(100), WithDeadLetterBackoff(5*time.Second), ) + require.Nil(t, err, "error should be nil") + require.NotNil(t, tq, "task queue implementation should not be nil") id, _ := random.Generate(8) // nolint: errcheck @@ -196,12 +200,14 @@ func TestBufferedPubMessageQueueIntegration(t *testing.T) { url := "amqp://user:password@localhost:5672/" // Initialize the task queue implementation - cleanup, tq := New( + cleanup, tq, err := New( WithURL(url), WithQos(100), WithDeadLetterBackoff(5*time.Second), ) + require.Nil(t, err, "error should be nil") + require.NotNil(t, tq, "task queue implementation should not be nil") id, _ := random.Generate(8) // nolint: errcheck @@ -268,13 +274,15 @@ func TestDeadLetteringSuccess(t *testing.T) { url := "amqp://user:password@localhost:5672/" // Initialize the task queue implementation - cleanup, tq := New( + cleanup, tq, err := New( WithURL(url), WithQos(100), WithDeadLetterBackoff(5*time.Second), WithMessageRejection(false, 10), // Disable message rejection for this test ) + require.Nil(t, err, "error should be nil") + require.NotNil(t, tq, "task queue implementation should not be nil") id, _ := 
random.Generate(8) // nolint: errcheck diff --git a/internal/queueutils/backoff.go b/internal/queueutils/backoff.go index 060ba8bbc..27f370635 100644 --- a/internal/queueutils/backoff.go +++ b/internal/queueutils/backoff.go @@ -1,6 +1,7 @@ package queueutils import ( + "math" "math/rand" "time" ) @@ -13,9 +14,17 @@ func SleepWithExponentialBackoff(base, max time.Duration, retryCount int) { // n retryCount = 0 } + // prevent overflow + pow := time.Duration(math.MaxInt64) + if retryCount < 63 { + pow = 1 << retryCount + } + // Calculate exponential backoff - backoff := base * (1 << retryCount) - if backoff > max { + backoff := base * pow + + // if backoff / pow does not recover base, we've overflowed + if backoff > max || backoff/pow != base { backoff = max } diff --git a/internal/services/controllers/v1/olap/controller.go b/internal/services/controllers/v1/olap/controller.go index 967ea0de3..0f5a4843e 100644 --- a/internal/services/controllers/v1/olap/controller.go +++ b/internal/services/controllers/v1/olap/controller.go @@ -246,7 +246,11 @@ func New(fs ...OLAPControllerOpt) (*OLAPControllerImpl, error) { } func (o *OLAPControllerImpl) Start() (func() error, error) { - cleanupHeavyReadMQ, heavyReadMQ := o.mq.Clone() + cleanupHeavyReadMQ, heavyReadMQ, err := o.mq.Clone() + + if err != nil { + return nil, err + } heavyReadMQ.SetQOS(2000) o.s.Start() @@ -282,7 +286,7 @@ func (o *OLAPControllerImpl) Start() (func() error, error) { }() } - _, err := o.s.NewJob( + _, err = o.s.NewJob( gocron.DurationJob(time.Minute*15), gocron.NewTask( o.runOLAPTablePartition(ctx), diff --git a/internal/services/scheduler/v1/scheduler.go b/internal/services/scheduler/v1/scheduler.go index 5b0a9cbf1..50be5de7b 100644 --- a/internal/services/scheduler/v1/scheduler.go +++ b/internal/services/scheduler/v1/scheduler.go @@ -192,12 +192,17 @@ func New( } func (s *Scheduler) Start() (func() error, error) { + cleanupDLQ, err := s.mq.Subscribe(msgqueue.DISPATCHER_DEAD_LETTER_QUEUE, s.handleDeadLetteredMessages, msgqueue.NoOpHook) + + if err != nil { + return nil, fmt.Errorf("could not start subscribe to dispatcher dead letter queue: %w", err) + } ctx, cancel := context.WithCancel(context.Background()) wg := sync.WaitGroup{} - _, err := s.s.NewJob( + _, err = s.s.NewJob( gocron.DurationJob(time.Second*1), gocron.NewTask( s.runSetTenants(ctx), @@ -287,6 +292,10 @@ func (s *Scheduler) Start() (func() error, error) { return fmt.Errorf("could not cleanup job processing queue: %w", err) } + if err := cleanupDLQ(); err != nil { + return fmt.Errorf("could not cleanup message queue buffer: %w", err) + } + if err := s.s.Shutdown(); err != nil { return fmt.Errorf("could not shutdown scheduler: %w", err) } @@ -444,6 +453,7 @@ func (s *Scheduler) scheduleStepRuns(ctx context.Context, tenantId string, res * if err != nil { outerErr = multierror.Append(outerErr, fmt.Errorf("could not create bulk assigned task: %w", err)) + continue } err = s.mq.SendMessage( @@ -700,3 +710,147 @@ func taskBulkAssignedTask(tenantId string, workerIdsToTaskIds map[string][]int64 }, ) } + +func (s *Scheduler) handleDeadLetteredMessages(msg *msgqueue.Message) (err error) { + defer func() { + if r := recover(); r != nil { + recoverErr := recoveryutils.RecoverWithAlert(s.l, s.a, r) + + if recoverErr != nil { + err = recoverErr + } + } + }() + + ctx, cancel := context.WithTimeout(context.Background(), 25*time.Second) + defer cancel() + + switch msg.ID { + case "task-assigned-bulk": + err = s.handleDeadLetteredTaskBulkAssigned(ctx, msg) + case "task-cancelled": + 
err = s.handleDeadLetteredTaskCancelled(ctx, msg) + default: + err = fmt.Errorf("unknown task: %s", msg.ID) + } + + return err +} + +func (s *Scheduler) handleDeadLetteredTaskBulkAssigned(ctx context.Context, msg *msgqueue.Message) error { + msgs := msgqueue.JSONConvert[tasktypes.TaskAssignedBulkTaskPayload](msg.Payloads) + + taskIds := make([]int64, 0) + + for _, innerMsg := range msgs { + for _, tasks := range innerMsg.WorkerIdToTaskIds { + s.l.Error().Msgf("handling dead-lettered task assignments for tenant %s, tasks: %v. This indicates an abrupt shutdown of a dispatcher and should be investigated.", msg.TenantID, tasks) + taskIds = append(taskIds, tasks...) + } + } + + toFail, err := s.repov1.Tasks().ListTasks(ctx, msg.TenantID, taskIds) + + if err != nil { + return fmt.Errorf("could not list tasks for dead lettered bulk assigned message: %w", err) + } + + for _, _task := range toFail { + tenantId := msg.TenantID + task := _task + + msg, err := tasktypes.FailedTaskMessage( + tenantId, + task.ID, + task.InsertedAt, + sqlchelpers.UUIDToStr(task.ExternalID), + sqlchelpers.UUIDToStr(task.WorkflowRunID), + task.RetryCount, + false, + "Could not send task to worker", + false, + ) + + if err != nil { + return fmt.Errorf("could not create failed task message: %w", err) + } + + err = s.mq.SendMessage(ctx, msgqueue.TASK_PROCESSING_QUEUE, msg) + + if err != nil { + // NOTE: failure to send on the MQ is likely not transient; ideally we could only retry individual + // tasks but since this message has the tasks in a batch, we retry all of them instead. we're banking + // on the downstream `task-failed` processing to be idempotent. + return fmt.Errorf("could not send failed task message: %w", err) + } + } + + return nil +} + +func (s *Scheduler) handleDeadLetteredTaskCancelled(ctx context.Context, msg *msgqueue.Message) error { + payloads := msgqueue.JSONConvert[tasktypes.SignalTaskCancelledPayload](msg.Payloads) + + // try to resend the cancellation signal to the impacted worker. + workerIds := make([]string, 0) + + for _, p := range payloads { + s.l.Error().Msgf("handling dead-lettered task cancellations for tenant %s, task %d. 
This indicates an abrupt shutdown of a dispatcher and should be investigated.", msg.TenantID, p.TaskId) + workerIds = append(workerIds, p.WorkerId) + } + + // since the dispatcher IDs may have changed since the previous send, we need to query them again + dispatcherIdWorkerIds, err := s.repo.Worker().GetDispatcherIdsForWorkers(ctx, msg.TenantID, workerIds) + + if err != nil { + return fmt.Errorf("could not list dispatcher ids for workers: %w", err) + } + + workerIdToDispatcherId := make(map[string]string) + + for dispatcherId, workerIds := range dispatcherIdWorkerIds { + for _, workerId := range workerIds { + workerIdToDispatcherId[workerId] = dispatcherId + } + } + + dispatcherIdsToPayloads := make(map[string][]tasktypes.SignalTaskCancelledPayload) + + for _, p := range payloads { + // if we no longer have the worker attached to a dispatcher, discard the message + if _, ok := workerIdToDispatcherId[p.WorkerId]; !ok { + continue + } + + pcp := *p + dispatcherId := workerIdToDispatcherId[pcp.WorkerId] + + dispatcherIdsToPayloads[dispatcherId] = append(dispatcherIdsToPayloads[dispatcherId], pcp) + } + + for dispatcherId, payloads := range dispatcherIdsToPayloads { + msg, err := msgqueue.NewTenantMessage( + msg.TenantID, + "task-cancelled", + false, + true, + payloads..., + ) + + if err != nil { + return fmt.Errorf("could not create message for task cancellation: %w", err) + } + + err = s.mq.SendMessage( + ctx, + msgqueue.QueueTypeFromDispatcherID(dispatcherId), + msg, + ) + + if err != nil { + return fmt.Errorf("could not send message for task cancellation: %w", err) + } + } + + return nil +} diff --git a/pkg/config/loader/loader.go b/pkg/config/loader/loader.go index 53809f45c..1c28be88e 100644 --- a/pkg/config/loader/loader.go +++ b/pkg/config/loader/loader.go @@ -418,12 +418,16 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers postgres.WithQos(cf.MessageQueue.Postgres.Qos), ) - cleanupv1, mqv1 = pgmqv1.NewPostgresMQ( + cleanupv1, mqv1, err = pgmqv1.NewPostgresMQ( dc.EngineRepository.MessageQueue(), pgmqv1.WithLogger(&l), pgmqv1.WithQos(cf.MessageQueue.Postgres.Qos), ) + if err != nil { + return nil, nil, fmt.Errorf("could not init postgres queue: %w", err) + } + cleanup1 = func() error { if err := cleanupv0(); err != nil { return err @@ -447,7 +451,7 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers rabbitmq.WithMessageRejection(cf.MessageQueue.RabbitMQ.EnableMessageRejection, cf.MessageQueue.RabbitMQ.MaxDeathCount), ) - cleanupv1, mqv1 = rabbitmqv1.New( + cleanupv1, mqv1, err = rabbitmqv1.New( rabbitmqv1.WithURL(cf.MessageQueue.RabbitMQ.URL), rabbitmqv1.WithLogger(&l), rabbitmqv1.WithQos(cf.MessageQueue.RabbitMQ.Qos), @@ -461,6 +465,10 @@ func createControllerLayer(dc *database.Layer, cf *server.ServerConfigFile, vers rabbitmqv1.WithMessageRejection(cf.MessageQueue.RabbitMQ.EnableMessageRejection, cf.MessageQueue.RabbitMQ.MaxDeathCount), ) + if err != nil { + return nil, nil, fmt.Errorf("could not init rabbitmq: %w", err) + } + cleanup1 = func() error { if err := cleanupv0(); err != nil { return err