fix race condition in child spawn (#2429)

This commit is contained in:
Mohammed Nafees
2025-10-17 16:56:41 +02:00
committed by GitHub
parent 0ef001c5ce
commit 8f57989730
+14 -7
View File
@@ -348,6 +348,9 @@ func (h *hatchetContext) RetryCount() int {
}
func (h *hatchetContext) CurChildIndex() int {
h.indexMu.Lock()
defer h.indexMu.Unlock()
return h.i
}
@@ -393,13 +396,18 @@ func (h *hatchetContext) SpawnWorkflow(workflowName string, input any, opts *Spa
workflowName = fmt.Sprintf("%s%s", ns, workflowName)
}
h.indexMu.Lock()
childIndex := h.i
h.i++
h.indexMu.Unlock()
workflowRunId, err := h.client().Admin().RunChildWorkflow(
workflowName,
input,
&client.ChildWorkflowOpts{
ParentId: h.WorkflowRunId(),
ParentStepRunId: h.StepRunId(),
ChildIndex: h.CurChildIndex(),
ChildIndex: childIndex,
ChildKey: opts.Key,
DesiredWorkerId: desiredWorker,
AdditionalMetadata: opts.AdditionalMetadata,
@@ -411,9 +419,6 @@ func (h *hatchetContext) SpawnWorkflow(workflowName string, input any, opts *Spa
return nil, fmt.Errorf("failed to spawn workflow: %w", err)
}
// increment the index
h.IncChildIndex()
return client.NewWorkflow(workflowRunId, listener), nil
}
@@ -451,8 +456,10 @@ func (h *hatchetContext) SpawnWorkflows(childWorkflows []*SpawnWorkflowsOpts) ([
workflowName = fmt.Sprintf("%s%s", ns, workflowName)
}
// increment the index
h.IncChildIndex()
h.indexMu.Lock()
childIndex := h.i
h.i++
h.indexMu.Unlock()
triggerWorkflows[i] = &client.RunChildWorkflowsOpts{
WorkflowName: workflowName,
@@ -460,7 +467,7 @@ func (h *hatchetContext) SpawnWorkflows(childWorkflows []*SpawnWorkflowsOpts) ([
Opts: &client.ChildWorkflowOpts{
ParentId: h.WorkflowRunId(),
ParentStepRunId: h.StepRunId(),
ChildIndex: h.CurChildIndex(),
ChildIndex: childIndex,
ChildKey: c.Key,
DesiredWorkerId: desiredWorker,
AdditionalMetadata: c.AdditionalMetadata,