mirror of https://github.com/hatchet-dev/hatchet.git
synced 2026-01-05 16:19:43 -06:00
fix: don't wait for grpc stream send on rabbitmq loop (#2115)
* fix: don't wait for grpc stream send on rabbitmq loop
* fix: ctx cancel
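The pattern behind this change, as a minimal sketch (not hatchet's actual code; the handler and send function here are invented for illustration): previously the handler blocked the message-queue consumer loop until the gRPC stream send finished, and now it returns immediately and lets a goroutine with a detached timeout context finish the work.

// Minimal sketch of the pattern in this commit, not hatchet's actual code:
// the handler returns to the consumer loop immediately and finishes the
// slow send in a goroutine with a detached timeout context.
package main

import (
	"context"
	"fmt"
	"time"
)

// handle runs once per message on the consumer loop; the send function
// stands in for a gRPC stream send that can occasionally be slow.
func handle(msg string, send func(ctx context.Context, msg string) error) error {
	// Detach from the per-message context: the send must be able to
	// outlive the handler, so it gets its own 30-second deadline.
	sendCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)

	go func() {
		// cancel fires inside the goroutine, after the send is done,
		// rather than when the handler returns.
		defer cancel()

		if err := send(sendCtx, msg); err != nil {
			fmt.Println("send failed:", err) // log; nothing left to return the error to
		}
	}()

	return nil // the queue loop moves on to the next message immediately
}

func main() {
	slowSend := func(ctx context.Context, msg string) error {
		select {
		case <-time.After(2 * time.Second): // simulate a slow stream send
			fmt.Println("sent:", msg)
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}

	fmt.Println("handler returned:", handle("task-1", slowSend))
	time.Sleep(3 * time.Second) // keep main alive so the goroutine can finish
}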
@@ -115,7 +115,6 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
 	// we set a timeout of 25 seconds because we don't want to hold the semaphore for longer than the visibility timeout (30 seconds)
 	// on the worker
 	ctx, cancel := context.WithTimeout(ctx, 25*time.Second)
 	defer cancel()
-
 	msgs := msgqueuev1.JSONConvert[tasktypesv1.TaskAssignedBulkTaskPayload](msg.Payloads)
 	outerEg := errgroup.Group{}
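The 25-second figure in the comment above only works because it stays below the broker's 30-second visibility timeout. A hedged sketch of that relationship (the constants and margin are illustrative, not taken from hatchet):

// Hedged sketch: keep the per-message processing deadline below the
// broker's visibility timeout so a message is never redelivered while a
// handler is still working on it. Constants are illustrative.
package main

import (
	"context"
	"fmt"
	"time"
)

const (
	visibilityTimeout = 30 * time.Second                 // broker redelivers after this
	safetyMargin      = 5 * time.Second                  // headroom for teardown
	handlerTimeout    = visibilityTimeout - safetyMargin // 25s, as in the diff
)

func process(parent context.Context) error {
	ctx, cancel := context.WithTimeout(parent, handlerTimeout)
	defer cancel()

	select {
	case <-time.After(50 * time.Millisecond): // simulated work
		return nil
	case <-ctx.Done():
		return ctx.Err() // gave up before the broker could redeliver
	}
}

func main() {
	fmt.Println(process(context.Background()))
}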
@@ -260,53 +259,63 @@ func (d *DispatcherImpl) handleTaskBulkAssignedTask(ctx context.Context, msg *ms
 		}
 	}
 
-	outerErr := outerEg.Wait()
-
 	if len(toRetry) > 0 {
 		retryCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
-		defer cancel()
 
-		retryGroup := errgroup.Group{}
+		// we spawn a goroutine to wait for the outer error group to finish and handle retries, because sending over the gRPC stream
+		// can occasionally take a long time and we don't want to block the RabbitMQ queue processing
+		go func() {
+			defer cancel()
 
-		for _, _task := range toRetry {
-			tenantId := msg.TenantID
-			task := _task
+			retryGroup := errgroup.Group{}
+			outerErr := outerEg.Wait()
 
-			retryGroup.Go(func() error {
-				msg, err := tasktypesv1.FailedTaskMessage(
-					tenantId,
-					task.ID,
-					task.InsertedAt,
-					sqlchelpers.UUIDToStr(task.ExternalID),
-					sqlchelpers.UUIDToStr(task.WorkflowRunID),
-					task.RetryCount,
-					false,
-					"Could not send task to worker",
-					false,
-				)
+			for _, _task := range toRetry {
+				tenantId := msg.TenantID
+				task := _task
 
-				if err != nil {
-					return fmt.Errorf("could not create failed task message: %w", err)
-				}
+				retryGroup.Go(func() error {
+					msg, err := tasktypesv1.FailedTaskMessage(
+						tenantId,
+						task.ID,
+						task.InsertedAt,
+						sqlchelpers.UUIDToStr(task.ExternalID),
+						sqlchelpers.UUIDToStr(task.WorkflowRunID),
+						task.RetryCount,
+						false,
+						"Could not send task to worker",
+						false,
+					)
 
-				queueutils.SleepWithExponentialBackoff(100*time.Millisecond, 5*time.Second, int(task.InternalRetryCount))
+					if err != nil {
+						return fmt.Errorf("could not create failed task message: %w", err)
+					}
 
-				err = d.mqv1.SendMessage(retryCtx, msgqueuev1.TASK_PROCESSING_QUEUE, msg)
+					queueutils.SleepWithExponentialBackoff(100*time.Millisecond, 5*time.Second, int(task.InternalRetryCount))
 
-				if err != nil {
-					return fmt.Errorf("could not send failed task message: %w", err)
-				}
+					err = d.mqv1.SendMessage(retryCtx, msgqueuev1.TASK_PROCESSING_QUEUE, msg)
 
-				return nil
-			})
-		}
+					if err != nil {
+						return fmt.Errorf("could not send failed task message: %w", err)
+					}
 
-		if err := retryGroup.Wait(); err != nil {
-			outerErr = multierror.Append(outerErr, fmt.Errorf("could not retry failed tasks: %w", err))
-		}
-	}
+					return nil
+				})
+			}
 
-	return outerErr
+			if err := retryGroup.Wait(); err != nil {
+				outerErr = multierror.Append(outerErr, fmt.Errorf("could not retry failed tasks: %w", err))
+			}
+
+			if outerErr != nil {
+				d.l.Error().Err(outerErr).Msg("failed to handle task assigned bulk message")
+			}
+		}()
+	}
+
+	return nil
 }
 
 func (d *DispatcherImpl) handleTaskCancelled(ctx context.Context, msg *msgqueuev1.Message) error {
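Before each re-publish, the retry loop sleeps with exponential backoff. The implementation of queueutils.SleepWithExponentialBackoff is not part of this diff; a plausible stand-in with the same base/cap/attempt shape would be:

// Plausible stand-in for queueutils.SleepWithExponentialBackoff, whose
// real implementation is not shown in this diff: wait base * 2^retries,
// capped at max.
package main

import (
	"fmt"
	"time"
)

func sleepWithExponentialBackoff(base, max time.Duration, retries int) {
	d := base << uint(retries) // base * 2^retries
	if d <= 0 || d > max {     // d <= 0 guards against shift overflow
		d = max
	}
	time.Sleep(d)
}

func main() {
	for attempt := 0; attempt < 6; attempt++ {
		start := time.Now()
		sleepWithExponentialBackoff(100*time.Millisecond, 5*time.Second, attempt)
		fmt.Printf("attempt %d waited ~%v\n", attempt, time.Since(start).Round(50*time.Millisecond))
	}
}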