diff --git a/internal/services/dispatcher/dispatcher_v1.go b/internal/services/dispatcher/dispatcher_v1.go
index e75ca9395..c799e37c8 100644
--- a/internal/services/dispatcher/dispatcher_v1.go
+++ b/internal/services/dispatcher/dispatcher_v1.go
@@ -115,7 +115,6 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
 	// we set a timeout of 25 seconds because we don't want to hold the semaphore for longer than the visibility timeout (30 seconds)
 	// on the worker
 	ctx, cancel := context.WithTimeout(ctx, 25*time.Second)
-	defer cancel()
 
 	msgs := msgqueuev1.JSONConvert[tasktypesv1.TaskAssignedBulkTaskPayload](msg.Payloads)
 	outerEg := errgroup.Group{}
@@ -260,53 +259,63 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
 		}
 	}
 
-	outerErr := outerEg.Wait()
-
-	if len(toRetry) > 0 {
-		retryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	// we spawn a goroutine to wait for the outer error group to finish and handle retries, because sending over the gRPC stream
+	// can occasionally take a long time and we don't want to block the RabbitMQ queue processing
+	go func() {
 		defer cancel()
 
-		retryGroup := errgroup.Group{}
+		outerErr := outerEg.Wait()
 
-		for _, _task := range toRetry {
-			tenantId := msg.TenantID
-			task := _task
+		if len(toRetry) > 0 {
+			retryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+			defer cancel()
 
-			retryGroup.Go(func() error {
-				msg, err := tasktypesv1.FailedTaskMessage(
-					tenantId,
-					task.ID,
-					task.InsertedAt,
-					sqlchelpers.UUIDToStr(task.ExternalID),
-					sqlchelpers.UUIDToStr(task.WorkflowRunID),
-					task.RetryCount,
-					false,
-					"Could not send task to worker",
-					false,
-				)
+			retryGroup := errgroup.Group{}
 
-				if err != nil {
-					return fmt.Errorf("could not create failed task message: %w", err)
-				}
+			for _, _task := range toRetry {
+				tenantId := msg.TenantID
+				task := _task
 
-				queueutils.SleepWithExponentialBackoff(100*time.Millisecond, 5*time.Second, int(task.InternalRetryCount))
+				retryGroup.Go(func() error {
+					msg, err := tasktypesv1.FailedTaskMessage(
+						tenantId,
+						task.ID,
+						task.InsertedAt,
+						sqlchelpers.UUIDToStr(task.ExternalID),
+						sqlchelpers.UUIDToStr(task.WorkflowRunID),
+						task.RetryCount,
+						false,
+						"Could not send task to worker",
+						false,
+					)
 
-				err = d.mqv1.SendMessage(retryCtx, msgqueuev1.TASK_PROCESSING_QUEUE, msg)
+					if err != nil {
+						return fmt.Errorf("could not create failed task message: %w", err)
+					}
 
-				if err != nil {
-					return fmt.Errorf("could not send failed task message: %w", err)
-				}
+					queueutils.SleepWithExponentialBackoff(100*time.Millisecond, 5*time.Second, int(task.InternalRetryCount))
 
-				return nil
-			})
+					err = d.mqv1.SendMessage(retryCtx, msgqueuev1.TASK_PROCESSING_QUEUE, msg)
+
+					if err != nil {
+						return fmt.Errorf("could not send failed task message: %w", err)
+					}
+
+					return nil
+				})
+			}
+
+			if err := retryGroup.Wait(); err != nil {
+				outerErr = multierror.Append(outerErr, fmt.Errorf("could not retry failed tasks: %w", err))
+			}
 		}
 
-		if err := retryGroup.Wait(); err != nil {
-			outerErr = multierror.Append(outerErr, fmt.Errorf("could not retry failed tasks: %w", err))
+		if outerErr != nil {
+			d.l.Error().Err(outerErr).Msg("failed to handle task assigned bulk message")
 		}
-	}
+	}()
 
-	return outerErr
+	return nil
 }
 
 func (d *DispatcherImpl) handleTaskCancelled(ctx context.Context, msg *msgqueuev1.Message) error {
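
Reviewer note: below is a minimal, self-contained sketch of the pattern this diff introduces: the handler hands ownership of cancel() to a background goroutine and returns immediately, so the queue consumer is not blocked by slow gRPC sends. All names in the sketch (handleMessage, payloads, the simulated send) are hypothetical stand-ins and not the Hatchet codebase.

```go
// Sketch only: illustrates handing context cancellation to a detached goroutine
// so the message handler can return (and ack) immediately.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func handleMessage(parent context.Context, payloads []string) error {
	// Same bound as the real handler: keep the work shorter than the broker's
	// visibility timeout.
	ctx, cancel := context.WithTimeout(parent, 25*time.Second)

	eg := errgroup.Group{}
	for _, p := range payloads {
		p := p
		eg.Go(func() error {
			// Stand-in for sending the task over a gRPC stream, which can be slow.
			select {
			case <-time.After(50 * time.Millisecond):
				fmt.Println("sent:", p)
				return nil
			case <-ctx.Done():
				return ctx.Err()
			}
		})
	}

	// The goroutine now owns cancel(): deferring it in handleMessage would end
	// the context before the sends complete, since the handler returns right away.
	go func() {
		defer cancel()

		if err := eg.Wait(); err != nil {
			// In the real handler this is where failed tasks are re-queued and
			// the error is logged rather than returned.
			fmt.Println("background send failed:", err)
		}
	}()

	// Returning nil acks the message immediately; retries happen out of band.
	return nil
}

func main() {
	_ = handleMessage(context.Background(), []string{"task-1", "task-2"})
	// Give the detached goroutine a moment to finish in this toy example.
	time.Sleep(200 * time.Millisecond)
}
```

The trade-off mirrors the diff: because the handler no longer returns send errors, failures must be surfaced through logging and re-queued failure messages instead of the handler's return value.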