mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-02-04 07:08:46 -06:00
* fix listener cache issue to resubscribe when erroring out * worker retry message clarification (#2543) * add another retry layer and add comments * fix loop logic * make listener channel retry
142 lines
3.6 KiB
Go
142 lines
3.6 KiB
Go
package client
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
|
|
dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
|
|
)
|
|
|
|
// Workflow represents a running workflow instance and provides methods to retrieve its results.
//
// The workflow listener uses a multi-layer best-effort retry strategy to handle transient failures
// and provides robust recovery from temporary connection issues like brief DB downtime
// or network interruptions without requiring manual intervention.
type Workflow struct {
	// workflowRunId identifies the workflow run this handle refers to.
	workflowRunId string

	// listener delivers WorkflowRunEvents; Result subscribes to it to wait
	// for this run's outcome.
	listener *WorkflowRunsListener
}
|
|
|
|
func NewWorkflow(
|
|
workflowRunId string,
|
|
listener *WorkflowRunsListener,
|
|
) *Workflow {
|
|
return &Workflow{
|
|
workflowRunId: workflowRunId,
|
|
listener: listener,
|
|
}
|
|
}
|
|
|
|
func (r *Workflow) RunId() string {
|
|
return r.workflowRunId
|
|
}
|
|
|
|
// Deprecated: Use RunId instead
|
|
func (r *Workflow) WorkflowRunId() string {
|
|
return r.workflowRunId
|
|
}
|
|
|
|
// WorkflowResult wraps the workflow run event obtained by Workflow.Result and
// exposes accessors (StepOutput, Results) over its per-step results.
type WorkflowResult struct {
	// workflowRun is the event delivered by the listener; its Results field
	// carries each step's readable ID, output, and error.
	workflowRun *dispatchercontracts.WorkflowRunEvent
}
|
|
|
|
func (r *WorkflowResult) StepOutput(key string, v interface{}) error {
|
|
var outputBytes []byte
|
|
for _, stepRunResult := range r.workflowRun.Results {
|
|
if stepRunResult.StepReadableId == key {
|
|
if stepRunResult.Error != nil {
|
|
return fmt.Errorf("%s", *stepRunResult.Error)
|
|
}
|
|
|
|
if stepRunResult.Output != nil {
|
|
outputBytes = []byte(*stepRunResult.Output)
|
|
}
|
|
}
|
|
}
|
|
|
|
if outputBytes == nil {
|
|
return fmt.Errorf("step output for %s not found", key)
|
|
}
|
|
|
|
if err := json.Unmarshal(outputBytes, v); err != nil {
|
|
return fmt.Errorf("failed to unmarshal output: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Results returns a map of all step outputs from the workflow run.
|
|
//
|
|
// Note: This method operates on an already-fetched WorkflowResult. The retry logic
|
|
// is handled by Workflow.Result() which obtains the WorkflowResult.
|
|
func (r *WorkflowResult) Results() (interface{}, error) {
|
|
results := make(map[string]interface{})
|
|
|
|
for _, stepRunResult := range r.workflowRun.Results {
|
|
if stepRunResult.Error != nil {
|
|
return nil, fmt.Errorf("run failed: %s", *stepRunResult.Error)
|
|
}
|
|
|
|
if stepRunResult.Output != nil {
|
|
results[stepRunResult.StepReadableId] = stepRunResult.Output
|
|
}
|
|
}
|
|
|
|
return results, nil
|
|
}
|
|
|
|
// Result waits for the workflow run to complete and returns the results.
|
|
//
|
|
// Retry strategy (best-effort):
|
|
// 1. This function retries AddWorkflowRun up to DefaultActionListenerRetryCount times with DefaultActionListenerRetryInterval intervals
|
|
// 2. AddWorkflowRun calls retrySend which retries up to DefaultActionListenerRetryCount times with DefaultActionListenerRetryInterval intervals
|
|
// 3. Each retrySend attempt calls retrySubscribe which itself retries up to DefaultActionListenerRetryCount times with DefaultActionListenerRetryInterval intervals
|
|
func (c *Workflow) Result() (*WorkflowResult, error) {
|
|
resChan := make(chan *WorkflowResult, 1)
|
|
sessionId := uuid.NewString()
|
|
|
|
var err error
|
|
retries := 0
|
|
|
|
for retries < DefaultActionListenerRetryCount {
|
|
if retries > 0 {
|
|
time.Sleep(DefaultActionListenerRetryInterval)
|
|
}
|
|
|
|
err = c.listener.AddWorkflowRun(
|
|
c.workflowRunId,
|
|
sessionId,
|
|
func(event WorkflowRunEvent) error {
|
|
resChan <- &WorkflowResult{
|
|
workflowRun: event,
|
|
}
|
|
|
|
return nil
|
|
},
|
|
)
|
|
|
|
if err == nil {
|
|
defer c.listener.RemoveWorkflowRun(c.workflowRunId, sessionId)
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if retries == DefaultActionListenerRetryCount && err != nil {
|
|
return nil, fmt.Errorf("failed to listen for workflow events: %w", err)
|
|
}
|
|
|
|
res := <-resChan
|
|
|
|
for _, stepRunResult := range res.workflowRun.Results {
|
|
if stepRunResult.Error != nil {
|
|
return nil, fmt.Errorf("%s", *stepRunResult.Error)
|
|
}
|
|
}
|
|
|
|
return res, nil
|
|
}
|