Files
hatchet/pkg/client/workflow.go
Mohammed Nafees f66fe63ad0 [Go SDK] Resubscribe and get a new listener stream when gRPC connections fail (#2544)
* fix listener cache issue to resubscribe when erroring out

* worker retry message clarification (#2543)

* add another retry layer and add comments

* fix loop logic

* make listener channel retry
2025-11-20 19:13:24 +01:00

142 lines
3.6 KiB
Go

package client
import (
"encoding/json"
"fmt"
"time"
"github.com/google/uuid"
dispatchercontracts "github.com/hatchet-dev/hatchet/internal/services/dispatcher/contracts"
)
// Workflow represents a running workflow instance and provides methods to retrieve its results.
//
// Result subscribes to the run's events through the listener using a multi-layer, best-effort
// retry strategy (documented on Result). It is intended to ride out transient failures such as
// brief DB downtime or network interruptions without manual intervention.
type Workflow struct {
// workflowRunId identifies the run; it is returned by RunId and used as the
// subscription key when Result registers with the listener.
workflowRunId string
// listener delivers events for this run; Result registers and unregisters
// with it via AddWorkflowRun / RemoveWorkflowRun.
listener *WorkflowRunsListener
}
// NewWorkflow returns a handle for the workflow run identified by workflowRunId.
// The given listener is what Result later uses to subscribe to the run's events.
func NewWorkflow(
	workflowRunId string,
	listener *WorkflowRunsListener,
) *Workflow {
	w := new(Workflow)
	w.workflowRunId = workflowRunId
	w.listener = listener
	return w
}
// RunId returns the ID of the workflow run this handle refers to.
func (r *Workflow) RunId() string { return r.workflowRunId }
// WorkflowRunId returns the ID of the workflow run this handle refers to.
//
// Deprecated: Use RunId instead
func (r *Workflow) WorkflowRunId() string {
	return r.RunId()
}
// WorkflowResult wraps the workflow run event received by Workflow.Result and
// exposes accessors (StepOutput, Results) over the per-step results it contains.
type WorkflowResult struct {
// workflowRun is the event delivered to the listener callback; its Results
// slice carries each step's readable ID, output, and error.
workflowRun *dispatchercontracts.WorkflowRunEvent
}
// StepOutput decodes the JSON output of the step whose readable ID equals key into v.
//
// All results are scanned in order. If any result for key carries an error, that error
// is returned immediately. Otherwise the last non-nil output recorded for key is
// unmarshalled into v. If no matching result has an output, a "not found" error is returned.
func (r *WorkflowResult) StepOutput(key string, v interface{}) error {
	var raw []byte

	for _, res := range r.workflowRun.Results {
		if res.StepReadableId != key {
			continue
		}
		if res.Error != nil {
			return fmt.Errorf("%s", *res.Error)
		}
		if res.Output != nil {
			raw = []byte(*res.Output)
		}
	}

	if raw == nil {
		return fmt.Errorf("step output for %s not found", key)
	}

	err := json.Unmarshal(raw, v)
	if err != nil {
		return fmt.Errorf("failed to unmarshal output: %w", err)
	}
	return nil
}
// Results returns a map from step readable ID to that step's output.
//
// The map values are the steps' Output fields as-is (pointers, not decoded JSON). Steps
// without an output are omitted. If any step reports an error, Results returns that
// error and no map.
//
// Note: This method operates on an already-fetched WorkflowResult. The retry logic
// is handled by Workflow.Result() which obtains the WorkflowResult.
func (r *WorkflowResult) Results() (interface{}, error) {
	out := make(map[string]interface{})

	for _, res := range r.workflowRun.Results {
		if res.Error != nil {
			return nil, fmt.Errorf("run failed: %s", *res.Error)
		}
		if res.Output == nil {
			continue
		}
		out[res.StepReadableId] = res.Output
	}

	return out, nil
}
// Result waits for the workflow run to complete and returns the results.
//
// Retry strategy (best-effort):
//  1. This function tries AddWorkflowRun up to DefaultActionListenerRetryCount times,
//     sleeping DefaultActionListenerRetryInterval between attempts.
//  2. AddWorkflowRun calls retrySend which retries up to DefaultActionListenerRetryCount
//     times with DefaultActionListenerRetryInterval intervals.
//  3. Each retrySend attempt calls retrySubscribe which itself retries up to
//     DefaultActionListenerRetryCount times with DefaultActionListenerRetryInterval intervals.
//
// If every subscription attempt fails, the last error is returned, wrapped. Once
// subscribed, Result blocks until the first event for the run arrives and unregisters
// the subscription before returning. If any step in that event carries an error, the
// error is returned instead of the result.
func (r *Workflow) Result() (*WorkflowResult, error) {
	// Buffered so the first event can be handed over even if it arrives before we
	// start receiving (e.g. while AddWorkflowRun is still returning).
	resChan := make(chan *WorkflowResult, 1)
	sessionId := uuid.NewString()

	handler := func(event WorkflowRunEvent) error {
		// Only the first event is consumed. Drop any later ones instead of blocking
		// the listener's delivery path once the buffer is full and nobody reads.
		select {
		case resChan <- &WorkflowResult{workflowRun: event}:
		default:
		}
		return nil
	}

	var err error
	for attempt := 0; attempt < DefaultActionListenerRetryCount; attempt++ {
		if attempt > 0 {
			time.Sleep(DefaultActionListenerRetryInterval)
		}
		if err = r.listener.AddWorkflowRun(r.workflowRunId, sessionId, handler); err == nil {
			break
		}
	}
	if err != nil {
		return nil, fmt.Errorf("failed to listen for workflow events: %w", err)
	}
	defer r.listener.RemoveWorkflowRun(r.workflowRunId, sessionId)

	res := <-resChan

	for _, stepRunResult := range res.workflowRun.Results {
		if stepRunResult.Error != nil {
			return nil, fmt.Errorf("%s", *stepRunResult.Error)
		}
	}

	return res, nil
}