fix: retries cause semaphore to go to zero

This commit is contained in:
Alexander Belanger
2024-04-15 12:41:18 -04:00
committed by abelanger5
parent 181a8e48aa
commit 75172c4a05
2 changed files with 19 additions and 4 deletions

View File

@@ -51,7 +51,7 @@ func (r *retryWorkflow) StepOne(ctx worker.HatchetContext) (result *stepOneOutpu
return nil, err
}
if r.retries < 5 {
if r.retries < 2 {
r.retries++
return nil, fmt.Errorf("error")
}
@@ -73,6 +73,7 @@ func run(ch <-chan interface{}, events chan<- string) error {
worker.WithClient(
c,
),
worker.WithMaxRuns(1),
)
if err != nil {
return fmt.Errorf("error creating worker: %w", err)
@@ -89,7 +90,7 @@ func run(ch <-chan interface{}, events chan<- string) error {
Description: "This runs after an update to the user model.",
Concurrency: worker.Concurrency(getConcurrencyKey),
Steps: []*worker.WorkflowStep{
worker.Fn(wk.StepOne).SetName("step-one").SetRetries(3),
worker.Fn(wk.StepOne).SetName("step-one").SetRetries(4),
},
},
)

View File

@@ -274,6 +274,20 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste
defer deferRollback(context.Background(), s.l, tx.Rollback)
// Update the old worker semaphore. This will only increment if the step run was already assigned to a worker,
// which means the step run is being retried or rerun.
_, err = s.queries.UpdateWorkerSemaphore(ctx, tx, dbsqlc.UpdateWorkerSemaphoreParams{
Inc: 1,
Steprunid: stepRun.StepRun.ID,
Tenantid: stepRun.StepRun.TenantId,
})
isNoRowsErr := err != nil && errors.Is(err, pgx.ErrNoRows)
if err != nil && !isNoRowsErr {
return "", "", fmt.Errorf("could not upsert old worker semaphore: %w", err)
}
assigned, err := s.queries.AssignStepRunToWorker(ctx, tx, dbsqlc.AssignStepRunToWorkerParams{
Steprunid: stepRun.StepRun.ID,
Tenantid: stepRun.StepRun.TenantId,
@@ -295,10 +309,10 @@ func (s *stepRunEngineRepository) AssignStepRunToWorker(ctx context.Context, ste
Tenantid: stepRun.StepRun.TenantId,
})
isNoRowsErr := err != nil && errors.Is(err, pgx.ErrNoRows)
isNoRowsErr = err != nil && errors.Is(err, pgx.ErrNoRows)
if err != nil && !isNoRowsErr {
return "", "", fmt.Errorf("could not upsert worker semaphore: %w", err)
return "", "", fmt.Errorf("could not upsert new worker semaphore: %w", err)
}
if !isNoRowsErr && semaphore.Slots < 0 {