diff --git a/examples/go/v1/bulk-operations/main.go b/examples/go/v1/bulk-operations/main.go deleted file mode 100644 index b1bbd926b..000000000 --- a/examples/go/v1/bulk-operations/main.go +++ /dev/null @@ -1,101 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type ProcessInput struct { - ID int `json:"id"` - Message string `json:"message"` -} - -type ProcessOutput struct { - ID int `json:"id"` - Result string `json:"result"` -} - -func main() { - // Create a new Hatchet client - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create a workflow for bulk processing - workflow := client.NewWorkflow("bulk-processing-workflow") - - // Define the processing task - workflow.NewTask("process-item", func(ctx hatchet.Context, input ProcessInput) (ProcessOutput, error) { - // Simulate some processing work - time.Sleep(time.Duration(100+input.ID*50) * time.Millisecond) - - log.Printf("Processing item %d: %s", input.ID, input.Message) - - return ProcessOutput{ - ID: input.ID, - Result: fmt.Sprintf("Processed item %d: %s", input.ID, input.Message), - }, nil - }) - - // Create a worker to run the workflow - worker, err := client.NewWorker("bulk-operations-worker", hatchet.WithWorkflows(workflow)) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Start the worker in a goroutine - go func() { - log.Println("Starting bulk operations worker...") - if err := worker.StartBlocking(); err != nil { - log.Printf("worker failed: %v", err) - } - }() - - // Wait a moment for the worker to start - time.Sleep(2 * time.Second) - - // Prepare bulk data - bulkInputs := make([]ProcessInput, 10) - for i := 0; i < 10; i++ { - bulkInputs[i] = ProcessInput{ - ID: i + 1, - Message: fmt.Sprintf("Task number %d", i+1), - } - } - - log.Printf("Running bulk operations with %d items...", len(bulkInputs)) - - // Prepare inputs as []RunManyOpt for bulk run - inputs := make([]hatchet.RunManyOpt, len(bulkInputs)) - for i, input := range bulkInputs { - inputs[i] = hatchet.RunManyOpt{ - Input: input, - } - } - - // Run workflows in bulk - ctx := context.Background() - runIDs, err := client.RunMany(ctx, "bulk-processing-workflow", inputs) - if err != nil { - log.Fatalf("failed to run bulk workflows: %v", err) - } - - log.Printf("Started %d bulk workflows with run IDs: %v", len(runIDs), runIDs) - - // Optionally monitor some of the runs - for i, runID := range runIDs { - if i < 3 { // Monitor first 3 runs as examples - log.Printf("Monitoring run %d with ID: %s", i+1, runID) - } - } - - log.Println("All bulk operations started. Press Ctrl+C to stop the worker.") - - // Keep the main function running - select {} -} diff --git a/examples/go/v1/cancellations/main.go b/examples/go/v1/cancellations/main.go deleted file mode 100644 index 28c825f54..000000000 --- a/examples/go/v1/cancellations/main.go +++ /dev/null @@ -1,93 +0,0 @@ -package main - -import ( - "context" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type CancellationInput struct { - Message string `json:"message"` -} - -type CancellationOutput struct { - Status string `json:"status"` - Completed bool `json:"completed"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create a workflow that demonstrates cancellation handling - workflow := client.NewWorkflow("cancellation-demo", - hatchet.WithWorkflowDescription("Demonstrates workflow cancellation patterns"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Add a long-running task that can be cancelled - workflow.NewTask("long-running-task", func(ctx hatchet.Context, input CancellationInput) (CancellationOutput, error) { - log.Printf("Starting long-running task with message: %s", input.Message) - - // Simulate long-running work with cancellation checking - for i := 0; i < 10; i++ { - select { - case <-ctx.Done(): - log.Printf("Task cancelled after %d seconds", i) - return CancellationOutput{ - Status: "cancelled", - Completed: false, - }, nil - default: - log.Printf("Working... step %d/10", i+1) - time.Sleep(1 * time.Second) - } - } - - log.Println("Task completed successfully") - return CancellationOutput{ - Status: "completed", - Completed: true, - }, nil - }, hatchet.WithTimeout(30*time.Second)) - - // Create a worker - worker, err := client.NewWorker("cancellation-worker", - hatchet.WithWorkflows(workflow), - hatchet.WithSlots(3), - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run workflow instances to demonstrate cancellation - go func() { - time.Sleep(2 * time.Second) - - log.Println("Starting workflow instance...") - _, err := client.Run(context.Background(), "cancellation-demo", CancellationInput{ - Message: "This task will run for 10 seconds and can be cancelled", - }) - if err != nil { - log.Printf("failed to run workflow: %v", err) - } - - // You can demonstrate cancellation by manually cancelling the workflow - // through the Hatchet UI or API after starting it - }() - - log.Println("Starting worker for cancellation demo...") - log.Println("Features demonstrated:") - log.Println(" - Long-running task with cancellation checking") - log.Println(" - Context cancellation handling") - log.Println(" - Graceful shutdown on cancellation") - log.Println(" - Task timeout configuration") - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/child-workflows/main.go b/examples/go/v1/child-workflows/main.go deleted file mode 100644 index 4bf37458a..000000000 --- a/examples/go/v1/child-workflows/main.go +++ /dev/null @@ -1,127 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type ParentInput struct { - Count int `json:"count"` -} - -type ChildInput struct { - Value int `json:"value"` -} - -type ChildOutput struct { - Result int `json:"result"` -} - -type ParentOutput struct { - Sum int `json:"sum"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create child workflow - childWorkflow := client.NewWorkflow("child-workflow", - hatchet.WithWorkflowDescription("Child workflow that processes a single value"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - childWorkflow.NewTask("process-value", func(ctx hatchet.Context, input ChildInput) (ChildOutput, error) { - log.Printf("Child workflow processing value: %d", input.Value) - - // Simulate some processing - result := input.Value * 2 - - return ChildOutput{ - Result: result, - }, nil - }) - - // Create parent workflow that spawns multiple child workflows - parentWorkflow := client.NewWorkflow("parent-workflow", - hatchet.WithWorkflowDescription("Parent workflow that spawns child workflows"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - parentWorkflow.NewTask("spawn-children", func(ctx hatchet.Context, input ParentInput) (ParentOutput, error) { - log.Printf("Parent workflow spawning %d child workflows", input.Count) - - // Spawn multiple child workflows and collect results - sum := 0 - for i := 0; i < input.Count; i++ { - log.Printf("Spawning child workflow %d/%d", i+1, input.Count) - - // Spawn child workflow and wait for result - childResult, err := childWorkflow.Run(ctx.GetContext(), ChildInput{ - Value: i + 1, - }) - if err != nil { - return ParentOutput{}, fmt.Errorf("failed to spawn child workflow %d: %w", i, err) - } - - log.Printf("Child workflow %d completed with result: %d", i+1, childResult) - } - - log.Printf("All child workflows completed. Total sum: %d", sum) - return ParentOutput{ - Sum: sum, - }, nil - }) - - // Create a worker with both workflows - worker, err := client.NewWorker("child-workflow-worker", - hatchet.WithWorkflows(childWorkflow, parentWorkflow), - hatchet.WithSlots(10), // Allow parallel execution of child workflows - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run the parent workflow - go func() { - // Wait a bit for worker to start - for i := 0; i < 3; i++ { - log.Printf("Starting in %d seconds...", 3-i) - select { - case <-context.Background().Done(): - return - default: - time.Sleep(1 * time.Second) - } - } - - log.Println("Triggering parent workflow...") - _, err := client.Run(context.Background(), "parent-workflow", ParentInput{ - Count: 5, // Spawn 5 child workflows - }) - if err != nil { - log.Printf("failed to run parent workflow: %v", err) - } - }() - - log.Println("Starting worker for child workflows demo...") - log.Println("Features demonstrated:") - log.Println(" - Parent workflow spawning multiple child workflows") - log.Println(" - Child workflow execution and result collection") - log.Println(" - Parallel child workflow processing") - log.Println(" - Parent-child workflow communication") - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} - -func stringPtr(s string) *string { - return &s -} diff --git a/examples/go/v1/conditions/main.go b/examples/go/v1/conditions/main.go deleted file mode 100644 index ff1cac003..000000000 --- a/examples/go/v1/conditions/main.go +++ /dev/null @@ -1,182 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "math/rand" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type WorkflowInput struct { - ProcessID string `json:"process_id"` -} - -type StepOutput struct { - StepName string `json:"step_name"` - RandomNumber int `json:"random_number"` - ProcessedAt string `json:"processed_at"` -} - -type SumOutput struct { - Total int `json:"total"` - Summary string `json:"summary"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - workflow := client.NewWorkflow("conditional-workflow") - - // Initial task - start := workflow.NewTask("start", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) { - randomNum := rand.Intn(100) + 1 - log.Printf("Starting workflow for process %s with random number: %d", input.ProcessID, randomNum) - - return StepOutput{ - StepName: "start", - RandomNumber: randomNum, - ProcessedAt: time.Now().Format(time.RFC3339), - }, nil - }) - - // Task that waits for either 10 seconds or a user event - waitForEvent := workflow.NewTask("wait-for-event", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) { - log.Printf("Wait task completed for process %s", input.ProcessID) - return StepOutput{ - StepName: "wait-for-event", - RandomNumber: rand.Intn(50) + 1, - ProcessedAt: time.Now().Format(time.RFC3339), - }, nil - }, - hatchet.WithParents(start), - hatchet.WithWaitFor(hatchet.OrCondition( - hatchet.SleepCondition(10*time.Second), - hatchet.UserEventCondition("process:continue", "true"), - )), - ) - - // Left branch - only runs if start's random number <= 50 - leftBranch := workflow.NewTask("left-branch", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) { - log.Printf("Left branch executing for process %s", input.ProcessID) - return StepOutput{ - StepName: "left-branch", - RandomNumber: rand.Intn(25) + 1, - ProcessedAt: time.Now().Format(time.RFC3339), - }, nil - }, - hatchet.WithParents(start), - hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.randomNumber > 50")), - ) - - // Right branch - only runs if start's random number > 50 - rightBranch := workflow.NewTask("right-branch", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) { - log.Printf("Right branch executing for process %s", input.ProcessID) - return StepOutput{ - StepName: "right-branch", - RandomNumber: rand.Intn(25) + 26, - ProcessedAt: time.Now().Format(time.RFC3339), - }, nil - }, - hatchet.WithParents(start), - hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.randomNumber <= 50")), - ) - - // Task that might be skipped based on external event - skipableTask := workflow.NewTask("skipable-task", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) { - log.Printf("Skipable task executing for process %s", input.ProcessID) - return StepOutput{ - StepName: "skipable-task", - RandomNumber: rand.Intn(10) + 1, - ProcessedAt: time.Now().Format(time.RFC3339), - }, nil - }, - hatchet.WithParents(start), - hatchet.WithWaitFor(hatchet.SleepCondition(3*time.Second)), - hatchet.WithSkipIf(hatchet.UserEventCondition("process:skip", "true")), - ) - - // Final aggregation task - workflow.NewTask("summarize", func(ctx hatchet.Context, input WorkflowInput) (SumOutput, error) { - var total int - var summary string - - // Get start output - var startOutput StepOutput - if err := ctx.ParentOutput(start, &startOutput); err != nil { - return SumOutput{}, err - } - total += startOutput.RandomNumber - summary = fmt.Sprintf("Start: %d", startOutput.RandomNumber) - - // Get wait output - var waitOutput StepOutput - if err := ctx.ParentOutput(waitForEvent, &waitOutput); err != nil { - return SumOutput{}, err - } - total += waitOutput.RandomNumber - summary += fmt.Sprintf(", Wait: %d", waitOutput.RandomNumber) - - // Try to get left branch output (might be skipped) - var leftOutput StepOutput - if err := ctx.ParentOutput(leftBranch, &leftOutput); err == nil { - total += leftOutput.RandomNumber - summary += fmt.Sprintf(", Left: %d", leftOutput.RandomNumber) - } else { - summary += ", Left: skipped" - } - - // Try to get right branch output (might be skipped) - var rightOutput StepOutput - if err := ctx.ParentOutput(rightBranch, &rightOutput); err == nil { - total += rightOutput.RandomNumber - summary += fmt.Sprintf(", Right: %d", rightOutput.RandomNumber) - } else { - summary += ", Right: skipped" - } - - // Try to get skipable task output (might be skipped) - var skipableOutput StepOutput - if err := ctx.ParentOutput(skipableTask, &skipableOutput); err == nil { - total += skipableOutput.RandomNumber - summary += fmt.Sprintf(", Skipable: %d", skipableOutput.RandomNumber) - } else { - summary += ", Skipable: skipped" - } - - log.Printf("Final summary for process %s: total=%d, %s", input.ProcessID, total, summary) - - return SumOutput{ - Total: total, - Summary: summary, - }, nil - }, hatchet.WithParents( - waitForEvent, - leftBranch, - rightBranch, - skipableTask, - )) - - worker, err := client.NewWorker("conditional-worker", hatchet.WithWorkflows(workflow)) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run the workflow - _, err = client.Run(context.Background(), "conditional-workflow", WorkflowInput{ - ProcessID: "demo-process-1", - }) - if err != nil { - log.Fatalf("failed to run workflow: %v", err) - } - - log.Println("Starting conditional workflow worker...") - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/cron/main.go b/examples/go/v1/cron/main.go deleted file mode 100644 index 7c546d7ce..000000000 --- a/examples/go/v1/cron/main.go +++ /dev/null @@ -1,119 +0,0 @@ -package main - -import ( - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type CronInput struct { - Timestamp string `json:"timestamp"` -} - -type CronOutput struct { - JobName string `json:"job_name"` - ExecutedAt string `json:"executed_at"` - NextRun string `json:"next_run"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Daily cleanup job - runs at 2 AM every day - dailyCleanup := client.NewWorkflow("daily-cleanup", - hatchet.WithWorkflowCron("0 2 * * *"), // 2 AM daily - hatchet.WithWorkflowDescription("Daily cleanup and maintenance tasks"), - ) - - dailyCleanup.NewTask("cleanup-temp-files", func(ctx hatchet.Context, input CronInput) (CronOutput, error) { - log.Printf("Running daily cleanup at %s", input.Timestamp) - - // Simulate cleanup work - time.Sleep(2 * time.Second) - - return CronOutput{ - JobName: "daily-cleanup", - ExecutedAt: time.Now().Format(time.RFC3339), - NextRun: "Next run: tomorrow at 2 AM", - }, nil - }) - - // Hourly health check - runs every hour - healthCheck := client.NewWorkflow("health-check", - hatchet.WithWorkflowCron("0 * * * *"), // Every hour - hatchet.WithWorkflowDescription("Hourly system health monitoring"), - ) - - healthCheck.NewTask("check-system-health", func(ctx hatchet.Context, input CronInput) (CronOutput, error) { - log.Printf("Running health check at %s", input.Timestamp) - - // Simulate health check work - time.Sleep(500 * time.Millisecond) - - return CronOutput{ - JobName: "health-check", - ExecutedAt: time.Now().Format(time.RFC3339), - NextRun: "Next run: top of next hour", - }, nil - }) - - // Weekly report - runs every Monday at 9 AM - weeklyReport := client.NewWorkflow("weekly-report", - hatchet.WithWorkflowCron("0 9 * * 1"), // 9 AM every Monday - hatchet.WithWorkflowDescription("Weekly business metrics report"), - ) - - weeklyReport.NewTask("generate-report", func(ctx hatchet.Context, input CronInput) (CronOutput, error) { - log.Printf("Generating weekly report at %s", input.Timestamp) - - // Simulate report generation - time.Sleep(5 * time.Second) - - return CronOutput{ - JobName: "weekly-report", - ExecutedAt: time.Now().Format(time.RFC3339), - NextRun: "Next run: next Monday at 9 AM", - }, nil - }) - - // Multiple cron expressions for business hours monitoring - businessHoursMonitor := client.NewWorkflow("business-hours-monitor", - hatchet.WithWorkflowCron( - "0 9-17 * * 1-5", // Every hour from 9 AM to 5 PM, Monday to Friday - "0 12 * * 6", // Saturday at noon - ), - hatchet.WithWorkflowDescription("Monitor systems during business hours"), - ) - - businessHoursMonitor.NewTask("monitor-business-systems", func(ctx hatchet.Context, input CronInput) (CronOutput, error) { - log.Printf("Monitoring business systems at %s", input.Timestamp) - - return CronOutput{ - JobName: "business-hours-monitor", - ExecutedAt: time.Now().Format(time.RFC3339), - NextRun: "Next run: next business hour", - }, nil - }) - - worker, err := client.NewWorker("cron-worker", - hatchet.WithWorkflows(dailyCleanup, healthCheck, weeklyReport, businessHoursMonitor), - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - log.Println("Starting cron worker...") - log.Println("Scheduled jobs:") - log.Println(" - daily-cleanup: 0 2 * * * (2 AM daily)") - log.Println(" - health-check: 0 * * * * (every hour)") - log.Println(" - weekly-report: 0 9 * * 1 (9 AM every Monday)") - log.Println(" - business-hours-monitor: 0 9-17 * * 1-5, 0 12 * * 6 (business hours)") - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} \ No newline at end of file diff --git a/examples/go/v1/dag/main.go b/examples/go/v1/dag/main.go deleted file mode 100644 index 1e56c228e..000000000 --- a/examples/go/v1/dag/main.go +++ /dev/null @@ -1,96 +0,0 @@ -package main - -import ( - "context" - "log" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type Input struct { - Value int `json:"value"` -} - -type StepOutput struct { - Step int `json:"step"` - Result int `json:"result"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create a DAG workflow - workflow := client.NewWorkflow("dag-workflow") - - // Step 1: Initial processing - step1 := workflow.NewTask("step-1", func(ctx hatchet.Context, input Input) (StepOutput, error) { - return StepOutput{ - Step: 1, - Result: input.Value * 2, - }, nil - }) - - // Step 2: Depends on step 1 - step2 := workflow.NewTask("step-2", func(ctx hatchet.Context, input Input) (StepOutput, error) { - // Get output from step 1 - var step1Output StepOutput - if err := ctx.ParentOutput(step1, &step1Output); err != nil { - return StepOutput{}, err - } - - return StepOutput{ - Step: 2, - Result: step1Output.Result + 10, - }, nil - }, hatchet.WithParents(step1)) - - // Step 3: Also depends on step 1, parallel to step 2 - step3 := workflow.NewTask("step-3", func(ctx hatchet.Context, input Input) (StepOutput, error) { - // Get output from step 1 - var step1Output StepOutput - if err := ctx.ParentOutput(step1, &step1Output); err != nil { - return StepOutput{}, err - } - - return StepOutput{ - Step: 3, - Result: step1Output.Result * 3, - }, nil - }, hatchet.WithParents(step1)) - - // Final step: Combines outputs from step 2 and step 3 - finalStep := workflow.NewTask("final-step", func(ctx hatchet.Context, input Input) (StepOutput, error) { - var step2Output, step3Output StepOutput - - if err := ctx.ParentOutput(step2, &step2Output); err != nil { - return StepOutput{}, err - } - if err := ctx.ParentOutput(step3, &step3Output); err != nil { - return StepOutput{}, err - } - - return StepOutput{ - Step: 4, - Result: step2Output.Result + step3Output.Result, - }, nil - }, hatchet.WithParents(step2, step3)) - _ = finalStep // Task reference available - - worker, err := client.NewWorker("dag-worker", hatchet.WithWorkflows(workflow)) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run the workflow - _, err = client.Run(context.Background(), "dag-workflow", Input{Value: 5}) - if err != nil { - log.Fatalf("failed to run workflow: %v", err) - } - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/durable/main.go b/examples/go/v1/durable/main.go deleted file mode 100644 index 9f1826e14..000000000 --- a/examples/go/v1/durable/main.go +++ /dev/null @@ -1,68 +0,0 @@ -package main - -import ( - "context" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type DurableInput struct { - Message string `json:"message"` - Delay int `json:"delay"` // seconds -} - -type DurableOutput struct { - ProcessedAt string `json:"processed_at"` - Message string `json:"message"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create a workflow with a durable task that can sleep - workflow := client.NewWorkflow("durable-workflow") - - durableTask := workflow.NewDurableTask("long-running-task", func(ctx hatchet.DurableContext, input DurableInput) (DurableOutput, error) { - log.Printf("Starting task, will sleep for %d seconds", input.Delay) - - // Durable sleep - this can be resumed if the worker restarts - if _, err := ctx.SleepFor(time.Duration(input.Delay) * time.Second); err != nil { - return DurableOutput{}, err - } - - log.Printf("Finished sleeping, processing message: %s", input.Message) - - return DurableOutput{ - ProcessedAt: time.Now().Format(time.RFC3339), - Message: "Processed: " + input.Message, - }, nil - }) - _ = durableTask // Durable task reference available - - worker, err := client.NewWorker("durable-worker", - hatchet.WithWorkflows(workflow), - hatchet.WithDurableSlots(10), // Allow up to 10 concurrent durable tasks - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run the workflow with a 30-second delay - _, err = client.Run(context.Background(), "durable-workflow", DurableInput{ - Message: "Hello from durable task!", - Delay: 30, - }) - if err != nil { - log.Fatalf("failed to run workflow: %v", err) - } - - log.Println("Workflow started. Worker will process it...") - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} \ No newline at end of file diff --git a/examples/go/v1/events/main.go b/examples/go/v1/events/main.go deleted file mode 100644 index 62c87ffbd..000000000 --- a/examples/go/v1/events/main.go +++ /dev/null @@ -1,94 +0,0 @@ -package main - -import ( - "context" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type EventInput struct { - UserID string `json:"user_id"` - Action string `json:"action"` - Payload any `json:"payload"` -} - -type ProcessOutput struct { - ProcessedAt string `json:"processed_at"` - UserID string `json:"user_id"` - Action string `json:"action"` - Result string `json:"result"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create an event-triggered standalone task - workflow := client.NewWorkflow("process-user-event", - hatchet.WithWorkflowEvents("user:created", "user:updated"), - ) - - workflow.NewTask("process-user-event", func(ctx hatchet.Context, input EventInput) (ProcessOutput, error) { - log.Printf("Processing %s event for user %s", input.Action, input.UserID) - log.Printf("Event payload contains: %+v", input.Payload) - - return ProcessOutput{ - ProcessedAt: time.Now().Format(time.RFC3339), - UserID: input.UserID, - Action: input.Action, - Result: "Event processed successfully", - }, nil - }) - - worker, err := client.NewWorker("event-worker", hatchet.WithWorkflows(workflow)) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Send events in a separate goroutine - go func() { - time.Sleep(3 * time.Second) // Wait for worker to start - - // Send a user:created event - log.Println("Sending user:created event...") - err := client.Events().Push(context.Background(), "user:created", EventInput{ - UserID: "user-123", - Action: "created", - Payload: map[string]any{ - "email": "user@example.com", - "name": "John Doe", - }, - }) - if err != nil { - log.Printf("Failed to send user:created event: %v", err) - } - - // Send another event after a delay - time.Sleep(5 * time.Second) - log.Println("Sending user:updated event...") - err = client.Events().Push(context.Background(), "user:updated", EventInput{ - UserID: "user-123", - Action: "updated", - Payload: map[string]any{ - "email": "newemail@example.com", - "name": "John Doe", - }, - }) - if err != nil { - log.Printf("Failed to send user:updated event: %v", err) - } - }() - - log.Println("Starting event worker...") - log.Println("Features demonstrated:") - log.Println(" - Event-triggered standalone tasks") - log.Println(" - Processing event payloads") - log.Println(" - Real event sending and handling") - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/on-failure/main.go b/examples/go/v1/on-failure/main.go deleted file mode 100644 index 2d5bb2e16..000000000 --- a/examples/go/v1/on-failure/main.go +++ /dev/null @@ -1,227 +0,0 @@ -package main - -import ( - "context" - "errors" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type FailureInput struct { - Message string `json:"message"` - ShouldFail bool `json:"should_fail"` - FailureType string `json:"failure_type"` -} - -type TaskOutput struct { - Status string `json:"status"` - Message string `json:"message"` -} - -type FailureHandlerOutput struct { - FailureHandled bool `json:"failure_handled"` - ErrorDetails string `json:"error_details"` - OriginalInput string `json:"original_input"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create workflow with failure handling - failureWorkflow := client.NewWorkflow("failure-handling-demo", - hatchet.WithWorkflowDescription("Demonstrates workflow failure handling patterns"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Task that may fail based on input - failureWorkflow.NewTask("potentially-failing-task", func(ctx hatchet.Context, input FailureInput) (TaskOutput, error) { - log.Printf("Processing task with message: %s", input.Message) - - if input.ShouldFail { - switch input.FailureType { - case "panic": - log.Println("Task will panic!") - panic("intentional panic for demonstration") - case "timeout": - log.Println("Task will timeout!") - time.Sleep(10 * time.Second) // This will timeout - case "error": - log.Println("Task will return error!") - return TaskOutput{}, errors.New("intentional error for demonstration") - default: - log.Println("Task will return generic error!") - return TaskOutput{}, errors.New("generic failure") - } - } - - log.Println("Task completed successfully") - return TaskOutput{ - Status: "success", - Message: "Task completed: " + input.Message, - }, nil - }, hatchet.WithTimeout(5*time.Second)) - - // Add failure handler to the workflow - failureWorkflow.OnFailure(func(ctx hatchet.Context, input FailureInput) (FailureHandlerOutput, error) { - log.Printf("Failure handler called for input: %s", input.Message) - - // Access step run errors to understand what failed - stepErrors := ctx.StepRunErrors() - var errorDetails string - for stepName, errorMsg := range stepErrors { - log.Printf("Step '%s' failed with error: %s", stepName, errorMsg) - errorDetails += stepName + ": " + errorMsg + "; " - } - - // Log failure details - log.Printf("Handling failure for workflow. Error details: %s", errorDetails) - - // Perform cleanup or notification logic here - log.Println("Performing failure cleanup...") - - return FailureHandlerOutput{ - FailureHandled: true, - ErrorDetails: errorDetails, - OriginalInput: input.Message, - }, nil - }) - - // Create workflow with multi-step failure - multiStepWorkflow := client.NewWorkflow("multi-step-failure-demo", - hatchet.WithWorkflowDescription("Demonstrates failure handling in multi-step workflows"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // First task (succeeds) - step1 := multiStepWorkflow.NewTask("first-step", func(ctx hatchet.Context, input FailureInput) (TaskOutput, error) { - log.Printf("First step processing: %s", input.Message) - return TaskOutput{ - Status: "success", - Message: "First step completed", - }, nil - }) - - // Second task (may fail, depends on first) - multiStepWorkflow.NewTask("second-step", func(ctx hatchet.Context, input FailureInput) (TaskOutput, error) { - // Get output from previous step - var step1Output TaskOutput - if err := ctx.StepOutput("first-step", &step1Output); err != nil { - log.Printf("Failed to get first step output: %v", err) - return TaskOutput{}, err - } - - log.Printf("Second step processing after first step: %s", step1Output.Message) - - if input.ShouldFail { - log.Println("Second step will fail!") - return TaskOutput{}, errors.New("second step intentional failure") - } - - return TaskOutput{ - Status: "success", - Message: "Second step completed after: " + step1Output.Message, - }, nil - }, hatchet.WithParents(step1)) - - // Add failure handler for multi-step workflow - multiStepWorkflow.OnFailure(func(ctx hatchet.Context, input FailureInput) (FailureHandlerOutput, error) { - log.Printf("Multi-step failure handler called for input: %s", input.Message) - - stepErrors := ctx.StepRunErrors() - var errorDetails string - for stepName, errorMsg := range stepErrors { - log.Printf("Multi-step: Step '%s' failed with error: %s", stepName, errorMsg) - errorDetails += stepName + ": " + errorMsg + "; " - } - - // Access successful step outputs for cleanup - var step1Output TaskOutput - if err := ctx.StepOutput("first-step", &step1Output); err == nil { - log.Printf("First step completed successfully with: %s", step1Output.Message) - } - - return FailureHandlerOutput{ - FailureHandled: true, - ErrorDetails: "Multi-step workflow failed: " + errorDetails, - OriginalInput: input.Message, - }, nil - }) - - // Create a worker with both workflows - worker, err := client.NewWorker("failure-handling-worker", - hatchet.WithWorkflows(failureWorkflow, multiStepWorkflow), - hatchet.WithSlots(3), - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run workflow instances to demonstrate failure handling - go func() { - time.Sleep(2 * time.Second) - - // Demo 1: Successful workflow - log.Println("\n=== Demo 1: Successful Workflow ===") - _, err := client.Run(context.Background(), "failure-handling-demo", FailureInput{ - Message: "This workflow will succeed", - ShouldFail: false, - }) - if err != nil { - log.Printf("Error in successful workflow: %v", err) - } - - time.Sleep(2 * time.Second) - - // Demo 2: Workflow with error - log.Println("\n=== Demo 2: Workflow with Error ===") - _, err = client.Run(context.Background(), "failure-handling-demo", FailureInput{ - Message: "This workflow will fail with error", - ShouldFail: true, - FailureType: "error", - }) - if err != nil { - log.Printf("Expected error in failing workflow: %v", err) - } - - time.Sleep(2 * time.Second) - - // Demo 3: Multi-step workflow failure - log.Println("\n=== Demo 3: Multi-step Workflow Failure ===") - _, err = client.Run(context.Background(), "multi-step-failure-demo", FailureInput{ - Message: "This multi-step workflow will fail in second step", - ShouldFail: true, - }) - if err != nil { - log.Printf("Expected error in multi-step workflow: %v", err) - } - - time.Sleep(2 * time.Second) - - // Demo 4: Multi-step workflow success - log.Println("\n=== Demo 4: Multi-step Workflow Success ===") - _, err = client.Run(context.Background(), "multi-step-failure-demo", FailureInput{ - Message: "This multi-step workflow will succeed", - ShouldFail: false, - }) - if err != nil { - log.Printf("Error in successful multi-step workflow: %v", err) - } - }() - - log.Println("Starting worker for failure handling demos...") - log.Println("Features demonstrated:") - log.Println(" - Workflow failure handlers (OnFailure)") - log.Println(" - Error details access in failure handlers") - log.Println(" - Multi-step workflow failure handling") - log.Println(" - Successful step output access during failure") - log.Println(" - Different failure types (error, timeout, panic)") - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/priority/main.go b/examples/go/v1/priority/main.go deleted file mode 100644 index 8efa862c8..000000000 --- a/examples/go/v1/priority/main.go +++ /dev/null @@ -1,162 +0,0 @@ -package main - -import ( - "context" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type PriorityInput struct { - UserID string `json:"user_id"` - TaskType string `json:"task_type"` - Message string `json:"message"` - IsPremium bool `json:"is_premium"` -} - -type PriorityOutput struct { - ProcessedMessage string `json:"processed_message"` - Priority int32 `json:"priority"` - ProcessedAt time.Time `json:"processed_at"` - UserID string `json:"user_id"` -} - -func main() { - clientInstance, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create workflow that demonstrates priority-based processing - priorityWorkflow := clientInstance.NewWorkflow("priority-demo", - hatchet.WithWorkflowDescription("Demonstrates priority-based task processing"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Task that processes requests based on priority - priorityWorkflow.NewTask("process-request", func(ctx hatchet.Context, input PriorityInput) (PriorityOutput, error) { - // Access the current priority from context - currentPriority := ctx.Priority() - - log.Printf("Processing %s request for user %s with priority %d", - input.TaskType, input.UserID, currentPriority) - - // Simulate different processing times based on priority - // Higher priority = faster processing - var processingTime time.Duration - switch { - case currentPriority >= 3: // High priority - processingTime = 500 * time.Millisecond - log.Printf("High priority processing for user %s", input.UserID) - case currentPriority >= 2: // Medium priority - processingTime = 2 * time.Second - log.Printf("Medium priority processing for user %s", input.UserID) - default: // Low priority - processingTime = 5 * time.Second - log.Printf("Low priority processing for user %s", input.UserID) - } - - // Simulate processing work - time.Sleep(processingTime) - - processedMessage := "" - if input.IsPremium { - processedMessage = "PREMIUM: " + input.Message - } else { - processedMessage = "STANDARD: " + input.Message - } - - log.Printf("Completed processing for user %s with priority %d", input.UserID, currentPriority) - - return PriorityOutput{ - ProcessedMessage: processedMessage, - Priority: currentPriority, - ProcessedAt: time.Now(), - UserID: input.UserID, - }, nil - }) - - // Create a worker to process the workflows - worker, err := clientInstance.NewWorker("priority-worker", - hatchet.WithWorkflows(priorityWorkflow), - hatchet.WithSlots(5), // Allow parallel processing - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Function to run workflow with specific priority - runWithPriority := func(priority int32, input PriorityInput, delay time.Duration) { - time.Sleep(delay) - log.Printf("Submitting %s task with priority %d for user %s", - input.TaskType, priority, input.UserID) - - // Run workflow with specific priority - _, err := clientInstance.Run(context.Background(), "priority-demo", input, hatchet.WithPriority(priority)) - if err != nil { - log.Printf("Failed to run workflow with priority %d: %v", priority, err) - } - } - - // Run multiple workflows with different priorities to demonstrate priority handling - go func() { - time.Sleep(2 * time.Second) - - log.Println("\n=== Priority Demo Started ===") - log.Println("Submitting tasks in order: Low, High, Medium priority") - log.Println("Watch the processing order - high priority should be processed first!") - - // Submit low priority task first - go runWithPriority(1, PriorityInput{ - UserID: "user-001", - TaskType: "report", - Message: "Generate monthly report", - IsPremium: false, - }, 0) - - // Submit high priority task second (but should be processed first) - go runWithPriority(4, PriorityInput{ - UserID: "user-002", - TaskType: "alert", - Message: "Critical system alert", - IsPremium: true, - }, 100*time.Millisecond) - - // Submit medium priority task third - go runWithPriority(2, PriorityInput{ - UserID: "user-003", - TaskType: "notification", - Message: "User notification", - IsPremium: false, - }, 200*time.Millisecond) - - // Submit another high priority task - go runWithPriority(5, PriorityInput{ - UserID: "user-004", - TaskType: "emergency", - Message: "Emergency response needed", - IsPremium: true, - }, 300*time.Millisecond) - - // Submit more tasks to show queuing behavior - go runWithPriority(1, PriorityInput{ - UserID: "user-005", - TaskType: "backup", - Message: "System backup", - IsPremium: false, - }, 400*time.Millisecond) - }() - - log.Println("Starting worker for priority demo...") - log.Println("Features demonstrated:") - log.Println(" - Task priority configuration") - log.Println(" - Priority-based processing order") - log.Println(" - Accessing current priority in task context") - log.Println(" - Different processing behavior based on priority") - log.Println(" - Premium vs standard user handling") - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/rate-limiting/main.go b/examples/go/v1/rate-limiting/main.go deleted file mode 100644 index bf4d2a6ea..000000000 --- a/examples/go/v1/rate-limiting/main.go +++ /dev/null @@ -1,193 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" - "github.com/hatchet-dev/hatchet/pkg/client/types" -) - -type APIRequest struct { - UserID string `json:"user_id"` - Action string `json:"action"` - Timestamp string `json:"timestamp"` -} - -type APIResponse struct { - UserID string `json:"user_id"` - Action string `json:"action"` - ProcessedAt time.Time `json:"processed_at"` - Success bool `json:"success"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create workflow with static rate limiting (global limit) - staticRateLimitWorkflow := client.NewWorkflow("static-rate-limit-demo", - hatchet.WithWorkflowDescription("Demonstrates static rate limiting"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Task with static rate limit - 5 requests per second globally - units := 1 - staticRateLimitWorkflow.NewTask("api-call", - func(ctx hatchet.Context, input APIRequest) (APIResponse, error) { - log.Printf("Processing API call for user %s, action: %s", input.UserID, input.Action) - - // Simulate API processing time - time.Sleep(100 * time.Millisecond) - - return APIResponse{ - UserID: input.UserID, - Action: input.Action, - ProcessedAt: time.Now(), - Success: true, - }, nil - }, - hatchet.WithRateLimits(&types.RateLimit{ - Key: "global-api-limit", - Units: &units, - }), - ) - - // Create workflow with dynamic rate limiting (per-user limit) - dynamicRateLimitWorkflow := client.NewWorkflow("dynamic-rate-limit-demo", - hatchet.WithWorkflowDescription("Demonstrates dynamic rate limiting per user"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Task with dynamic rate limit - 3 requests per second per user - userUnits := 1 - perSecond := types.Second - keyExpression := "input.user_id" - dynamicRateLimitWorkflow.NewTask("user-api-call", - func(ctx hatchet.Context, input APIRequest) (APIResponse, error) { - log.Printf("Processing user-specific API call for user %s, action: %s", input.UserID, input.Action) - - // Simulate API processing time - time.Sleep(200 * time.Millisecond) - - return APIResponse{ - UserID: input.UserID, - Action: input.Action, - ProcessedAt: time.Now(), - Success: true, - }, nil - }, - hatchet.WithRateLimits(&types.RateLimit{ - KeyExpr: &keyExpression, - Units: &userUnits, - Duration: &perSecond, - }), - ) - - // Create workflow with multiple rate limits (both global and per-user) - multiRateLimitWorkflow := client.NewWorkflow("multi-rate-limit-demo", - hatchet.WithWorkflowDescription("Demonstrates multiple rate limits on a single task"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Task with both global and per-user rate limits - globalUnits := 2 - userSpecificUnits := 1 - multiRateLimitWorkflow.NewTask("premium-api-call", - func(ctx hatchet.Context, input APIRequest) (APIResponse, error) { - log.Printf("Processing premium API call for user %s, action: %s", input.UserID, input.Action) - - // Simulate more complex processing for premium API - time.Sleep(300 * time.Millisecond) - - return APIResponse{ - UserID: input.UserID, - Action: "PREMIUM_" + input.Action, - ProcessedAt: time.Now(), - Success: true, - }, nil - }, - hatchet.WithRateLimits( - // Global rate limit - 10 requests per second across all users - &types.RateLimit{ - Key: "premium-global-limit", - Units: &globalUnits, - }, - // Per-user rate limit - 2 requests per second per user - &types.RateLimit{ - KeyExpr: &keyExpression, - Units: &userSpecificUnits, - Duration: &perSecond, - }, - ), - ) - - // Create a worker with all rate-limited workflows - worker, err := client.NewWorker("rate-limit-worker", - hatchet.WithWorkflows(staticRateLimitWorkflow, dynamicRateLimitWorkflow, multiRateLimitWorkflow), - hatchet.WithSlots(10), // Allow more slots to see rate limiting in action - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Function to submit multiple requests rapidly to test rate limiting - submitRequests := func(workflowName string, userPrefix string, count int, delay time.Duration) { - for i := 0; i < count; i++ { - go func(index int) { - time.Sleep(time.Duration(index) * delay) - - userID := fmt.Sprintf("%s-user-%d", userPrefix, (index%3)+1) // Cycle through 3 users - - _, err := client.Run(context.Background(), workflowName, APIRequest{ - UserID: userID, - Action: fmt.Sprintf("action-%d", index+1), - Timestamp: time.Now().Format(time.RFC3339), - }) - if err != nil { - log.Printf("Failed to submit request %d: %v", index+1, err) - } - }(i) - } - } - - // Demonstrate rate limiting behavior - go func() { - time.Sleep(3 * time.Second) - - log.Println("\n=== Static Rate Limiting Demo ===") - log.Println("Submitting 10 requests rapidly to global rate limit workflow") - log.Println("Watch how they get processed at the rate limit speed...") - submitRequests("static-rate-limit-demo", "static", 10, 50*time.Millisecond) - - time.Sleep(8 * time.Second) - - log.Println("\n=== Dynamic Rate Limiting Demo ===") - log.Println("Submitting 15 requests from 3 different users") - log.Println("Each user has their own rate limit bucket...") - submitRequests("dynamic-rate-limit-demo", "dynamic", 15, 30*time.Millisecond) - - time.Sleep(10 * time.Second) - - log.Println("\n=== Multi Rate Limiting Demo ===") - log.Println("Submitting requests with both global and per-user limits") - log.Println("Requests will be throttled by both limits...") - submitRequests("multi-rate-limit-demo", "multi", 12, 25*time.Millisecond) - }() - - log.Println("Starting worker for rate limiting demos...") - log.Println("Features demonstrated:") - log.Println(" - Static/global rate limiting") - log.Println(" - Dynamic rate limiting with key expressions") - log.Println(" - Per-user rate limiting") - log.Println(" - Multiple rate limits on a single task") - log.Println(" - Rate limit units and duration configuration") - - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} \ No newline at end of file diff --git a/examples/go/v1/retries-concurrency/main.go b/examples/go/v1/retries-concurrency/main.go deleted file mode 100644 index 560c4bd6a..000000000 --- a/examples/go/v1/retries-concurrency/main.go +++ /dev/null @@ -1,103 +0,0 @@ -package main - -import ( - "context" - "errors" - "fmt" - "log" - "math/rand" - "time" - - "github.com/hatchet-dev/hatchet/pkg/client/types" - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type TaskInput struct { - ID string `json:"id"` - Category string `json:"category"` - Payload string `json:"payload"` -} - -type TaskOutput struct { - ID string `json:"id"` - Attempt int `json:"attempt"` - ProcessedAt string `json:"processed_at"` - Result string `json:"result"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - workflow := client.NewWorkflow("retry-concurrency-workflow") - - // Task with retries and concurrency control - var maxRuns int32 = 2 - strategy := types.GroupRoundRobin - - workflow.NewTask("unreliable-task", func(ctx hatchet.Context, input TaskInput) (TaskOutput, error) { - attempt := ctx.RetryCount() - log.Printf("Processing task %s (attempt %d)", input.ID, attempt) - - // Simulate unreliable processing - fails 70% of the time on first attempt - if attempt == 0 && rand.Float32() < 0.7 { - return TaskOutput{}, errors.New("simulated failure") - } - - // Simulate some processing time - time.Sleep(time.Duration(100+rand.Intn(400)) * time.Millisecond) - - return TaskOutput{ - ID: input.ID, - Attempt: attempt, - ProcessedAt: time.Now().Format(time.RFC3339), - Result: "Successfully processed " + input.Payload, - }, nil - }, - hatchet.WithRetries(3), - hatchet.WithRetryBackoff(2.0, 60), // Exponential backoff: 2s, 4s, 8s, then cap at 60s - hatchet.WithTimeout(30*time.Second), - hatchet.WithConcurrency(&types.Concurrency{ - Expression: "input.Category", // Limit concurrency per category - MaxRuns: &maxRuns, // Max 2 concurrent tasks per category - LimitStrategy: &strategy, // Round-robin distribution - }), - ) - - worker, err := client.NewWorker("retry-worker", - hatchet.WithWorkflows(workflow), - hatchet.WithSlots(10), // Allow up to 10 concurrent tasks total - ) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Submit multiple tasks with different categories - go func() { - time.Sleep(2 * time.Second) - - categories := []string{"high-priority", "normal", "batch"} - - for i := 0; i < 10; i++ { - category := categories[rand.Intn(len(categories))] - - _, err = client.Run(context.Background(), "retry-concurrency-workflow", TaskInput{ - ID: fmt.Sprintf("task-%d", i), - Category: category, - Payload: fmt.Sprintf("data for task %d", i), - }) - if err != nil { - log.Printf("failed to submit task %d: %v", i, err) - } - - time.Sleep(100 * time.Millisecond) - } - }() - - log.Println("Starting worker with retry and concurrency controls...") - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} diff --git a/examples/go/v1/simple/main.go b/examples/go/v1/simple/main.go deleted file mode 100644 index ae4843d36..000000000 --- a/examples/go/v1/simple/main.go +++ /dev/null @@ -1,51 +0,0 @@ -package main - -import ( - "context" - "log" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type SimpleInput struct { - Message string `json:"message"` -} - -type SimpleOutput struct { - Result string `json:"result"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create a simple workflow with one task - workflow := client.NewWorkflow("simple-workflow") - task := workflow.NewTask("process-message", func(ctx hatchet.Context, input SimpleInput) (SimpleOutput, error) { - return SimpleOutput{ - Result: "Processed: " + input.Message, - }, nil - }) - _ = task // Task reference available for building DAGs - - // Create a worker to run the workflow - worker, err := client.NewWorker("simple-worker", hatchet.WithWorkflows(workflow)) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Run a workflow instance - _, err = client.Run(context.Background(), "simple-workflow", SimpleInput{ - Message: "Hello, World!", - }) - if err != nil { - log.Fatalf("failed to run workflow: %v", err) - } - - // Start the worker (blocks) - if err := worker.StartBlocking(); err != nil { - log.Fatalf("failed to start worker: %v", err) - } -} \ No newline at end of file diff --git a/examples/go/v1/sticky-workers/main.go b/examples/go/v1/sticky-workers/main.go deleted file mode 100644 index 79379c4fd..000000000 --- a/examples/go/v1/sticky-workers/main.go +++ /dev/null @@ -1,278 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/hatchet-dev/hatchet/pkg/worker" - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type StickyInput struct { - SessionID string `json:"session_id"` - Message string `json:"message"` - Step int `json:"step"` -} - -type StickyOutput struct { - SessionID string `json:"session_id"` - WorkerID string `json:"worker_id"` - ProcessedAt string `json:"processed_at"` - Message string `json:"message"` - Step int `json:"step"` -} - -type SessionState struct { - SessionID string `json:"session_id"` - WorkerID string `json:"worker_id"` - StepCount int `json:"step_count"` - LastMessage string `json:"last_message"` -} - -func main() { - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create session workflow that maintains state on the same worker - sessionWorkflow := client.NewWorkflow("session-demo", - hatchet.WithWorkflowDescription("Demonstrates sticky session processing"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - // Step 1: Initialize session - step1 := sessionWorkflow.NewTask("initialize-session", func(ctx hatchet.Context, input StickyInput) (SessionState, error) { - workerID := ctx.Worker().ID() - log.Printf("[Worker %s] Initializing session %s", workerID, input.SessionID) - - return SessionState{ - SessionID: input.SessionID, - WorkerID: workerID, - StepCount: 1, - LastMessage: input.Message, - }, nil - }) - - // Step 2: Process session (runs on same worker) - sessionWorkflow.NewTask("process-session", func(ctx hatchet.Context, input StickyInput) (SessionState, error) { - workerID := ctx.Worker().ID() - - // Get previous step's output - var sessionState SessionState - if err := ctx.StepOutput("initialize-session", &sessionState); err != nil { - return SessionState{}, fmt.Errorf("failed to get session state: %w", err) - } - - log.Printf("[Worker %s] Processing session %s (was initialized on worker %s)", - workerID, input.SessionID, sessionState.WorkerID) - - // Update session state - sessionState.StepCount++ - sessionState.LastMessage = input.Message - - return sessionState, nil - }, hatchet.WithParents(step1)) - - // Child workflow for sticky child execution - childWorkflow := client.NewWorkflow("sticky-child-demo", - hatchet.WithWorkflowDescription("Child workflow that runs on same worker as parent"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - childWorkflow.NewTask("child-task", func(ctx hatchet.Context, input StickyInput) (StickyOutput, error) { - workerID := ctx.Worker().ID() - log.Printf("[Worker %s] Processing child task for session %s", workerID, input.SessionID) - - return StickyOutput{ - SessionID: input.SessionID, - WorkerID: workerID, - ProcessedAt: time.Now().Format(time.RFC3339), - Message: "CHILD: " + input.Message, - Step: input.Step, - }, nil - }) - - // Parent workflow that spawns sticky child workflows - parentWorkflow := client.NewWorkflow("sticky-parent-demo", - hatchet.WithWorkflowDescription("Parent workflow that spawns sticky child workflows"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - parentWorkflow.NewTask("spawn-sticky-children", func(ctx hatchet.Context, input StickyInput) ([]StickyOutput, error) { - workerID := ctx.Worker().ID() - log.Printf("[Worker %s] Parent spawning sticky children for session %s", workerID, input.SessionID) - - var results []StickyOutput - - // Spawn multiple child workflows that should all run on the same worker - for i := 1; i <= 3; i++ { - log.Printf("[Worker %s] Spawning sticky child %d", workerID, i) - - sticky := true - childResult, err := ctx.SpawnWorkflow("sticky-child-demo", StickyInput{ - SessionID: input.SessionID, - Message: fmt.Sprintf("Child %d message", i), - Step: i, - }, &worker.SpawnWorkflowOpts{ - Key: stringPtr(fmt.Sprintf("child-%s-%d", input.SessionID, i)), - Sticky: &sticky, // This ensures child runs on same worker - }) - if err != nil { - return nil, fmt.Errorf("failed to spawn sticky child %d: %w", i, err) - } - - // Wait for child to complete - result, err := childResult.Result() - if err != nil { - return nil, fmt.Errorf("sticky child %d failed: %w", i, err) - } - - var childOutput StickyOutput - if err := result.StepOutput("child-task", &childOutput); err != nil { - return nil, fmt.Errorf("failed to parse child %d output: %w", i, err) - } - - log.Printf("[Worker %s] Child %d completed on worker %s", workerID, i, childOutput.WorkerID) - results = append(results, childOutput) - } - - return results, nil - }) - - // Comparison workflow that spawns non-sticky children - nonStickyWorkflow := client.NewWorkflow("non-sticky-demo", - hatchet.WithWorkflowDescription("Demonstrates non-sticky child workflow execution"), - hatchet.WithWorkflowVersion("1.0.0"), - ) - - nonStickyWorkflow.NewTask("spawn-regular-children", func(ctx hatchet.Context, input StickyInput) ([]StickyOutput, error) { - workerID := ctx.Worker().ID() - log.Printf("[Worker %s] Parent spawning non-sticky children for session %s", workerID, input.SessionID) - - var results []StickyOutput - - // Spawn child workflows without sticky flag (may run on different workers) - for i := 1; i <= 3; i++ { - log.Printf("[Worker %s] Spawning regular child %d", workerID, i) - - childResult, err := ctx.SpawnWorkflow("sticky-child-demo", StickyInput{ - SessionID: input.SessionID, - Message: fmt.Sprintf("Non-sticky child %d message", i), - Step: i, - }, &worker.SpawnWorkflowOpts{ - Key: stringPtr(fmt.Sprintf("non-sticky-child-%s-%d", input.SessionID, i)), - // No Sticky flag - children may run on different workers - }) - if err != nil { - return nil, fmt.Errorf("failed to spawn regular child %d: %w", i, err) - } - - result, err := childResult.Result() - if err != nil { - return nil, fmt.Errorf("regular child %d failed: %w", i, err) - } - - var childOutput StickyOutput - if err := result.StepOutput("child-task", &childOutput); err != nil { - return nil, fmt.Errorf("failed to parse regular child %d output: %w", i, err) - } - - log.Printf("[Worker %s] Regular child %d completed on worker %s", workerID, i, childOutput.WorkerID) - results = append(results, childOutput) - } - - return results, nil - }) - - // Create multiple workers to demonstrate sticky behavior - worker1, err := client.NewWorker("sticky-worker-1", - hatchet.WithWorkflows(sessionWorkflow, childWorkflow, parentWorkflow, nonStickyWorkflow), - hatchet.WithSlots(10), - ) - if err != nil { - log.Fatalf("failed to create worker 1: %v", err) - } - - worker2, err := client.NewWorker("sticky-worker-2", - hatchet.WithWorkflows(sessionWorkflow, childWorkflow, parentWorkflow, nonStickyWorkflow), - hatchet.WithSlots(10), - ) - if err != nil { - log.Fatalf("failed to create worker 2: %v", err) - } - - // Start both workers - go func() { - log.Println("Starting worker 1...") - if err := worker1.StartBlocking(); err != nil { - log.Printf("Worker 1 failed: %v", err) - } - }() - - go func() { - log.Println("Starting worker 2...") - if err := worker2.StartBlocking(); err != nil { - log.Printf("Worker 2 failed: %v", err) - } - }() - - // Run demonstrations - go func() { - time.Sleep(3 * time.Second) - - log.Println("\n=== Session Workflow Demo ===") - log.Println("Running multi-step workflow that should stay on same worker...") - _, err := client.Run(context.Background(), "session-demo", StickyInput{ - SessionID: "session-001", - Message: "Initialize my session", - Step: 1, - }) - if err != nil { - log.Printf("Session workflow error: %v", err) - } - - time.Sleep(3 * time.Second) - - log.Println("\n=== Sticky Parent-Child Demo ===") - log.Println("Parent spawning sticky children - all should run on same worker...") - _, err = client.Run(context.Background(), "sticky-parent-demo", StickyInput{ - SessionID: "sticky-session-001", - Message: "Sticky parent message", - Step: 1, - }) - if err != nil { - log.Printf("Sticky parent workflow error: %v", err) - } - - time.Sleep(5 * time.Second) - - log.Println("\n=== Non-Sticky Comparison Demo ===") - log.Println("Parent spawning regular children - may distribute across workers...") - _, err = client.Run(context.Background(), "non-sticky-demo", StickyInput{ - SessionID: "regular-session-001", - Message: "Regular parent message", - Step: 1, - }) - if err != nil { - log.Printf("Non-sticky workflow error: %v", err) - } - }() - - log.Println("Starting sticky workers demo...") - log.Println("Features demonstrated:") - log.Println(" - Multi-step workflows running on same worker") - log.Println(" - Sticky child workflow execution") - log.Println(" - Worker ID access in task context") - log.Println(" - Session state maintenance across steps") - log.Println(" - Comparison with non-sticky execution") - - // Keep the main thread alive - select {} -} - -func stringPtr(s string) *string { - return &s -} diff --git a/examples/go/v1/streaming/main.go b/examples/go/v1/streaming/main.go deleted file mode 100644 index 395c04515..000000000 --- a/examples/go/v1/streaming/main.go +++ /dev/null @@ -1,246 +0,0 @@ -package main - -import ( - "context" - "fmt" - "log" - "net/http" - "strings" - "time" - - hatchet "github.com/hatchet-dev/hatchet/sdks/go" -) - -type StreamingInput struct { - Content string `json:"content"` - ChunkSize int `json:"chunk_size"` -} - -type StreamingOutput struct { - Message string `json:"message"` - TotalChunks int `json:"total_chunks"` -} - -const sampleText = ` -The Go programming language is an open source project to make programmers more productive. -Go is expressive, concise, clean, and efficient. Its concurrency mechanisms make it easy to -write programs that get the most out of multicore and networked machines, while its novel -type system enables flexible and modular program construction. Go compiles quickly to -machine code yet has the convenience of garbage collection and the power of run-time reflection. -It's a fast, statically typed, compiled language that feels like a dynamically typed, interpreted language. -` - -func main() { - // Create a new Hatchet client - client, err := hatchet.NewClient() - if err != nil { - log.Fatalf("failed to create hatchet client: %v", err) - } - - // Create a workflow for streaming - workflow := client.NewWorkflow("streaming-workflow") - - // Define the streaming task - workflow.NewTask("stream-content", func(ctx hatchet.Context, input StreamingInput) (StreamingOutput, error) { - content := input.Content - if content == "" { - content = sampleText - } - - chunkSize := input.ChunkSize - if chunkSize <= 0 { - chunkSize = 50 - } - - // Split content into chunks and stream them - chunks := createChunks(content, chunkSize) - - log.Printf("Starting to stream %d chunks...", len(chunks)) - - for i, chunk := range chunks { - // Stream each chunk - ctx.PutStream(fmt.Sprintf("Chunk %d: %s", i+1, strings.TrimSpace(chunk))) - - // Small delay between chunks to simulate processing - time.Sleep(300 * time.Millisecond) - } - - ctx.PutStream("Streaming completed!") - - return StreamingOutput{ - Message: "Content streaming finished", - TotalChunks: len(chunks), - }, nil - }) - - // Create a worker to run the workflow - worker, err := client.NewWorker("streaming-worker", hatchet.WithWorkflows(workflow)) - if err != nil { - log.Fatalf("failed to create worker: %v", err) - } - - // Start the worker in a goroutine - go func() { - log.Println("Starting streaming worker...") - if err := worker.StartBlocking(); err != nil { - log.Printf("worker failed: %v", err) - } - }() - - // Wait a moment for the worker to start - time.Sleep(2 * time.Second) - - // Start HTTP server to demonstrate streaming - http.HandleFunc("/stream", func(w http.ResponseWriter, r *http.Request) { - ctx := context.Background() - - // Set headers for streaming response - w.Header().Set("Content-Type", "text/plain; charset=utf-8") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - w.Header().Set("Access-Control-Allow-Origin", "*") - - // Run the streaming workflow - workflowRun, err := client.RunNoWait(ctx, "streaming-workflow", StreamingInput{ - Content: sampleText, - ChunkSize: 80, - }) - if err != nil { - http.Error(w, fmt.Sprintf("failed to run workflow: %v", err), http.StatusInternalServerError) - return - } - - // Subscribe to the stream - stream, err := client.Runs().SubscribeToStream(ctx, workflowRun.RunId) - if err != nil { - http.Error(w, fmt.Sprintf("failed to subscribe to stream: %v", err), http.StatusInternalServerError) - return - } - - // Stream the content to the HTTP response - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "streaming not supported", http.StatusInternalServerError) - return - } - - for message := range stream { - fmt.Fprintf(w, "data: %s\n\n", message) - flusher.Flush() - } - }) - - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - html := ` - - -
-Click the button below to start streaming content from a Hatchet workflow:
- - - -