Merge branch 'main' into feat-durable-execution

This commit is contained in:
mrkaye97
2026-02-26 14:21:53 -05:00
49 changed files with 1516 additions and 960 deletions
+1 -1
View File
@@ -63,7 +63,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version-file: go.mod
cache: true
+1 -1
View File
@@ -36,7 +36,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Set up Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+1 -1
View File
@@ -32,7 +32,7 @@ jobs:
run: git fetch --force --tags
- name: Set up Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
cache: true
+1 -1
View File
@@ -17,7 +17,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+1 -1
View File
@@ -8,7 +8,7 @@ jobs:
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
- uses: actions/setup-python@a309ff8b426b58ec0e2a45f0f869d46889d02405 # v6.2.0
+1 -1
View File
@@ -27,7 +27,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+1 -1
View File
@@ -25,7 +25,7 @@ jobs:
- name: Checkout
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
- name: Compile Go SDK examples
+1 -1
View File
@@ -77,7 +77,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+1 -1
View File
@@ -64,7 +64,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+1 -1
View File
@@ -104,7 +104,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+9 -9
View File
@@ -28,7 +28,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -58,7 +58,7 @@ jobs:
steps:
- uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -84,7 +84,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -131,7 +131,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -220,7 +220,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -315,7 +315,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -352,7 +352,7 @@ jobs:
fetch-tags: true
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -487,7 +487,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
@@ -562,7 +562,7 @@ jobs:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup Go
uses: actions/setup-go@7a3fe6cf4cb3a834922a1244abfce67bcef6a0c5 # v6.2.0
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
+79 -130
View File
@@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
@@ -11,19 +10,12 @@ import (
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type WorkflowInput struct {
ProcessID string `json:"process_id"`
}
type StepOutput struct {
StepName string `json:"step_name"`
RandomNumber int `json:"random_number"`
ProcessedAt string `json:"processed_at"`
RandomNumber int `json:"random_number"`
}
type SumOutput struct {
Total int `json:"total"`
Summary string `json:"summary"`
type RandomSum struct {
Sum int `json:"sum"`
}
func main() {
@@ -33,161 +25,121 @@ func main() {
}
// > Create a workflow
workflow := client.NewWorkflow("conditional-workflow")
workflow := client.NewWorkflow("TaskConditionWorkflow")
// Initial task
// > Add base task
start := workflow.NewTask("start", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
randomNum := rand.Intn(100) + 1 //nolint:gosec // This is a demo
log.Printf("Starting workflow for process %s with random number: %d", input.ProcessID, randomNum)
return StepOutput{
StepName: "start",
RandomNumber: randomNum,
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
start := workflow.NewTask("start", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
})
// > Add wait for sleep
waitForSleep := workflow.NewTask("wait-for-sleep", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
return StepOutput{
RandomNumber: rand.Intn(100) + 1,
}, nil
waitForSleep := workflow.NewTask("wait-for-sleep", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.SleepCondition(10*time.Second)),
)
// > Add skip condition override
_ = workflow.NewTask("skip-with-multiple-parents", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start, waitForSleep),
hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.random_number > 0")),
)
// > Add skip on event
// Task that waits for either 10 seconds or a user event
skipOnEvent := workflow.NewTask("skip-on-event", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Skip on event task completed for process %s", input.ProcessID)
return StepOutput{
StepName: "skip-on-event",
RandomNumber: rand.Intn(50) + 1, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
skipOnEvent := workflow.NewTask("skip-on-event", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.SleepCondition(10*time.Second)),
hatchet.WithSkipIf(hatchet.UserEventCondition("process:skip", "true")),
)
// > Add wait for event
// Task that might be skipped based on external event
skipableTask := workflow.NewTask("skipable-task", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Skipable task executing for process %s", input.ProcessID)
return StepOutput{
StepName: "skipable-task",
RandomNumber: rand.Intn(10) + 1, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.SleepCondition(3*time.Second)),
hatchet.WithSkipIf(hatchet.UserEventCondition("process:skip", "true")),
hatchet.WithWaitFor(hatchet.SleepCondition(30*time.Second)),
hatchet.WithSkipIf(hatchet.UserEventCondition("skip_on_event:skip", "")),
)
// > Add branching
// Left branch - only runs if start's random number <= 50
leftBranch := workflow.NewTask("left-branch", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Left branch executing for process %s", input.ProcessID)
return StepOutput{
StepName: "left-branch",
RandomNumber: rand.Intn(25) + 1, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
leftBranch := workflow.NewTask("left-branch", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(waitForSleep),
hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.randomNumber > 50")),
hatchet.WithSkipIf(hatchet.ParentCondition(waitForSleep, "output.random_number > 50")),
)
// Right branch - only runs if start's random number > 50
rightBranch := workflow.NewTask("right-branch", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Right branch executing for process %s", input.ProcessID)
return StepOutput{
StepName: "right-branch",
RandomNumber: rand.Intn(25) + 26, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
rightBranch := workflow.NewTask("right-branch", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(waitForSleep),
hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.randomNumber <= 50")),
hatchet.WithSkipIf(hatchet.ParentCondition(waitForSleep, "output.random_number <= 50")),
)
// > Add wait for event
waitForEvent := workflow.NewTask("wait-for-event", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.OrCondition(
hatchet.SleepCondition(1*time.Minute),
hatchet.UserEventCondition("wait_for_event:start", ""),
)),
)
// Final aggregation task
// > Add sum
_ = workflow.NewTask("summarize", func(ctx hatchet.Context, input WorkflowInput) (SumOutput, error) {
var total int
var summary string
// Get start output
var startOutput StepOutput
if err := ctx.ParentOutput(start, &startOutput); err != nil {
return SumOutput{}, err
}
total += startOutput.RandomNumber
summary = fmt.Sprintf("Start: %d", startOutput.RandomNumber)
// Get wait for sleep output
var waitForSleepOutput StepOutput
if err := ctx.ParentOutput(waitForSleep, &waitForSleepOutput); err != nil {
return SumOutput{}, err
}
total += waitForSleepOutput.RandomNumber
summary += fmt.Sprintf(", Wait for sleep: %d", waitForSleepOutput.RandomNumber)
// Get skip on event output
var skipOnEventOutput StepOutput
if err := ctx.ParentOutput(skipOnEvent, &skipOnEventOutput); err != nil {
return SumOutput{}, err
}
total += skipOnEventOutput.RandomNumber
summary += fmt.Sprintf(", Skip on event: %d", skipOnEventOutput.RandomNumber)
// Try to get left branch output (might be skipped)
var leftOutput StepOutput
if err := ctx.ParentOutput(leftBranch, &leftOutput); err == nil {
total += leftOutput.RandomNumber
summary += fmt.Sprintf(", Left: %d", leftOutput.RandomNumber)
} else {
summary += ", Left: skipped"
_ = workflow.NewTask("sum", func(ctx hatchet.Context, _ any) (RandomSum, error) {
var startOut StepOutput
err := ctx.ParentOutput(start, &startOut)
if err != nil {
return RandomSum{}, err
}
// Try to get right branch output (might be skipped)
var rightOutput StepOutput
if err := ctx.ParentOutput(rightBranch, &rightOutput); err == nil {
total += rightOutput.RandomNumber
summary += fmt.Sprintf(", Right: %d", rightOutput.RandomNumber)
} else {
summary += ", Right: skipped"
var waitForEventOut StepOutput
err = ctx.ParentOutput(waitForEvent, &waitForEventOut)
if err != nil {
return RandomSum{}, err
}
// Try to get skipable task output (might be skipped)
var skipableOutput StepOutput
if err := ctx.ParentOutput(skipableTask, &skipableOutput); err == nil {
total += skipableOutput.RandomNumber
summary += fmt.Sprintf(", Skipable: %d", skipableOutput.RandomNumber)
} else {
summary += ", Skipable: skipped"
var waitForSleepOut StepOutput
err = ctx.ParentOutput(waitForSleep, &waitForSleepOut)
if err != nil {
return RandomSum{}, err
}
log.Printf("Final summary for process %s: total=%d, %s", input.ProcessID, total, summary)
total := startOut.RandomNumber + waitForEventOut.RandomNumber + waitForSleepOut.RandomNumber
return SumOutput{
Total: total,
Summary: summary,
}, nil
if !ctx.WasSkipped(skipOnEvent) {
var out StepOutput
err = ctx.ParentOutput(skipOnEvent, &out)
if err == nil {
total += out.RandomNumber
}
}
if !ctx.WasSkipped(leftBranch) {
var out StepOutput
err = ctx.ParentOutput(leftBranch, &out)
if err == nil {
total += out.RandomNumber
}
}
if !ctx.WasSkipped(rightBranch) {
var out StepOutput
err = ctx.ParentOutput(rightBranch, &out)
if err == nil {
total += out.RandomNumber
}
}
return RandomSum{Sum: total}, nil
}, hatchet.WithParents(
start,
waitForSleep,
waitForEvent,
skipOnEvent,
leftBranch,
rightBranch,
skipableTask,
))
worker, err := client.NewWorker("conditional-worker", hatchet.WithWorkflows(workflow))
worker, err := client.NewWorker("dag-worker", hatchet.WithWorkflows(workflow))
if err != nil {
log.Fatalf("failed to create worker: %v", err)
}
@@ -202,10 +154,7 @@ func main() {
}
}()
// Run the workflow
_, err = client.Run(context.Background(), "conditional-workflow", WorkflowInput{
ProcessID: "demo-process-1",
})
_, err = client.Run(context.Background(), "TaskConditionWorkflow", nil)
if err != nil {
log.Fatalf("failed to run workflow: %v", err)
}
@@ -0,0 +1,25 @@
from hatchet_sdk.runnables.task import Depends
from hatchet_sdk import Context
from hatchet_sdk.runnables.types import EmptyModel
from typing import Annotated, TypeAlias
async def async_dep(input: EmptyModel, ctx: Context) -> bool:
return True
def sync_dep(input: EmptyModel, ctx: Context) -> bool:
return True
AsyncDepNoTypeAlias = Annotated[bool, Depends(async_dep)]
AsyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(async_dep)]
AsyncDepTypeSyntax: TypeAlias = (
AsyncDepTypeAlias # python <3.12 doesn't support `type` syntax for type alias so we use type alias again
)
SyncDepNoTypeAlias = Annotated[bool, Depends(sync_dep)]
SyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(sync_dep)]
SyncDepTypeSyntax: TypeAlias = (
SyncDepTypeAlias # python <3.12 doesn't support `type` syntax for type alias so we use type alias again
)
@@ -0,0 +1,21 @@
from hatchet_sdk.runnables.task import Depends
from hatchet_sdk import Context
from hatchet_sdk.runnables.types import EmptyModel
from typing import Annotated, TypeAlias
async def async_dep(input: EmptyModel, ctx: Context) -> bool:
return True
def sync_dep(input: EmptyModel, ctx: Context) -> bool:
return True
AsyncDepNoTypeAlias = Annotated[bool, Depends(async_dep)]
AsyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(async_dep)]
type AsyncDepTypeSyntax = Annotated[bool, Depends(async_dep)]
SyncDepNoTypeAlias = Annotated[bool, Depends(sync_dep)]
SyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(sync_dep)]
type SyncDepTypeSyntax = Annotated[bool, Depends(sync_dep)]
@@ -1,5 +1,6 @@
from contextlib import asynccontextmanager, contextmanager
from typing import Annotated, AsyncGenerator, Generator
import sys
from pydantic import BaseModel
@@ -14,6 +15,46 @@ ASYNC_CM_DEPENDENCY_VALUE = "async_cm_dependency_value"
CHAINED_CM_VALUE = "chained_cm_value"
CHAINED_ASYNC_CM_VALUE = "chained_async_cm_value"
if sys.version_info >= (3, 12):
from examples.dependency_injection.dependency_annotations312 import (
AsyncDepNoTypeAlias,
AsyncDepTypeAlias,
SyncDepNoTypeAlias,
AsyncDepTypeSyntax,
SyncDepTypeAlias,
SyncDepTypeSyntax,
)
else:
from examples.dependency_injection.dependency_annotations310 import (
AsyncDepNoTypeAlias,
AsyncDepTypeAlias,
SyncDepNoTypeAlias,
AsyncDepTypeSyntax,
SyncDepTypeAlias,
SyncDepTypeSyntax,
)
@hatchet.task()
async def task_with_type_aliases(
_i: EmptyModel,
ctx: Context,
async_dep_no_type_alias: AsyncDepNoTypeAlias,
async_dep_type_alias: AsyncDepTypeAlias,
async_dep_type_syntax: AsyncDepTypeSyntax,
sync_dep_no_type_alias: SyncDepNoTypeAlias,
sync_dep_type_alias: SyncDepTypeAlias,
sync_dep_type_syntax: SyncDepTypeSyntax,
) -> dict[str, bool]:
return {
"async_dep_no_type_alias": async_dep_no_type_alias,
"async_dep_type_alias": async_dep_type_alias,
"async_dep_type_syntax": async_dep_type_syntax,
"sync_dep_no_type_alias": sync_dep_no_type_alias,
"sync_dep_type_alias": sync_dep_type_alias,
"sync_dep_type_syntax": sync_dep_type_syntax,
}
# > Declare dependencies
async def async_dep(input: EmptyModel, ctx: Context) -> str:
@@ -229,6 +270,7 @@ def main() -> None:
sync_task_with_dependencies,
durable_async_task_with_dependencies,
di_workflow,
task_with_type_aliases,
],
)
worker.start()
+20 -4
View File
@@ -473,14 +473,14 @@ setuptools = "*"
[[package]]
name = "hatchet-sdk"
version = "1.25.2"
version = "1.26.0"
description = "This is the official Python SDK for Hatchet, a distributed, fault-tolerant task queue. The SDK allows you to easily integrate Hatchet's task scheduling and workflow orchestration capabilities into your Python applications."
optional = false
python-versions = "<4.0,>=3.10"
groups = ["main"]
files = [
{file = "hatchet_sdk-1.25.2-py3-none-any.whl", hash = "sha256:057964f451636a46f2365e550a8eeebefc75f13daf412a84ec739874b6ae4191"},
{file = "hatchet_sdk-1.25.2.tar.gz", hash = "sha256:879f7c0e2e20cb17e58df787bd9f160870a592567924d0bec673ab7df071632a"},
{file = "hatchet_sdk-1.26.0-py3-none-any.whl", hash = "sha256:77da522bc41d7798014b36180565f2252d19d0d98fefdde165fb1e2ef97fa936"},
{file = "hatchet_sdk-1.26.0.tar.gz", hash = "sha256:a60e86954f6ecb96367e3b84727ddc9c3f5836ca25f719e8b41ae39aca9fcb69"},
]
[package.dependencies]
@@ -493,6 +493,7 @@ pydantic = ">=2.6.3,<3.0.0"
pydantic-settings = ">=2.7.1,<3.0.0"
python-dateutil = ">=2.9.0.post0,<3.0.0"
tenacity = ">=8.4.1"
typing-inspection = ">=0.1.0"
urllib3 = ">=2.6.0,<3.0.0"
[package.extras]
@@ -1007,6 +1008,21 @@ files = [
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
]
[[package]]
name = "typing-inspection"
version = "0.4.2"
description = "Runtime typing introspection tools"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7"},
{file = "typing_inspection-0.4.2.tar.gz", hash = "sha256:ba561c48a67c5958007083d386c3295464928b01faa735ab8547c5692e87f464"},
]
[package.dependencies]
typing-extensions = ">=4.12.0"
[[package]]
name = "urllib3"
version = "2.6.3"
@@ -1125,4 +1141,4 @@ propcache = ">=0.2.0"
[metadata]
lock-version = "2.1"
python-versions = "^3.10"
content-hash = "9f8a336bf99bd769d22743dcec55bfc1d35df37c0bbfa544f3a8178ea4ad08df"
content-hash = "cbac77363268f7b45cc0e9274f455e8e88b6aa8f220d6f0450839eecc7b7aa39"
+1 -1
View File
@@ -8,7 +8,7 @@ package-mode = false
[tool.poetry.dependencies]
python = "^3.10"
hatchet-sdk = "1.25.2"
hatchet-sdk = "1.26.0"
[build-system]
+2
View File
@@ -28,6 +28,7 @@ from examples.dependency_injection.worker import (
di_workflow,
durable_async_task_with_dependencies,
sync_task_with_dependencies,
task_with_type_aliases,
)
from examples.dict_input.worker import say_hello_unsafely
from examples.durable.worker import (
@@ -118,6 +119,7 @@ def main() -> None:
async_task_with_dependencies,
sync_task_with_dependencies,
durable_async_task_with_dependencies,
task_with_type_aliases,
say_hello,
say_hello_unsafely,
serde_workflow,
@@ -1,4 +1,5 @@
import { ConfirmDialog } from '@/components/v1/molecules/confirm-dialog';
import RelativeDate from '@/components/v1/molecules/relative-date';
import { Badge } from '@/components/v1/ui/badge';
import { Button } from '@/components/v1/ui/button';
import {
@@ -20,7 +21,7 @@ import {
} from '@/lib/api/generated/cloud/data-contracts';
import { useApiError } from '@/lib/hooks';
import queryClient from '@/query-client';
import { useMutation } from '@tanstack/react-query';
import { useMutation, useQuery } from '@tanstack/react-query';
import React, { useCallback, useEffect, useMemo, useState } from 'react';
interface SubscriptionProps {
@@ -48,6 +49,45 @@ export const Subscription: React.FC<SubscriptionProps> = ({
const { tenant } = useTenantDetails();
const { handleApiError } = useApiError({});
const [portalLoading, setPortalLoading] = useState(false);
const creditBalanceQuery = useQuery({
...queries.cloud.creditBalance(tenantId),
});
const creditBalance = useMemo(() => {
const balanceCents = creditBalanceQuery.data?.balanceCents ?? 0;
// Stripe customer balance is negative when the customer has credits.
if (balanceCents >= 0) {
return null;
}
const currencyCode = (creditBalanceQuery.data?.currency || 'USD')
.toUpperCase()
.slice(0, 3);
let formatted: string;
try {
formatted = new Intl.NumberFormat('en-US', {
style: 'currency',
currency: currencyCode,
}).format(Math.abs(balanceCents) / 100);
} catch {
formatted = `$${(Math.abs(balanceCents) / 100).toFixed(2)}`;
}
const description = creditBalanceQuery.data?.description?.trim();
const expires = creditBalanceQuery.data?.expiresAt;
return {
amount: formatted,
description,
expires,
};
}, [
creditBalanceQuery.data?.balanceCents,
creditBalanceQuery.data?.currency,
creditBalanceQuery.data?.description,
creditBalanceQuery.data?.expiresAt,
]);
const manageClicked = async () => {
try {
@@ -238,6 +278,35 @@ export const Subscription: React.FC<SubscriptionProps> = ({
{portalLoading ? <Spinner /> : 'Visit Billing Portal'}
</Button>
</div>
{creditBalance && (
<Card className="mt-4 border-2 border-emerald-500/30 bg-emerald-50 dark:bg-emerald-950/20">
<CardHeader className="pb-4">
<div className="flex items-start justify-between gap-4">
<div>
<CardTitle className="text-base">
Available Credit
</CardTitle>
<CardDescription className="mt-1 text-sm">
{creditBalance.description ||
'Applied to upcoming invoices.'}
</CardDescription>
</div>
<div className="text-right">
<div className="text-xl font-semibold text-foreground whitespace-nowrap">
{creditBalance.amount}
</div>
{creditBalance.expires && (
<p className="mt-1 text-sm text-muted-foreground whitespace-nowrap">
Expires{' '}
<RelativeDate date={creditBalance.expires} future />
</p>
)}
</div>
</div>
</CardHeader>
</Card>
)}
{/* Current Subscription Section */}
{currentPlanDetails && (
<div className="mt-6 mb-6">
@@ -49,6 +49,7 @@ import {
RemoveOrganizationMembersRequest,
RuntimeConfigActionsResponse,
TenantBillingState,
TenantCreditBalance,
TenantPaymentMethodList,
UpdateManagedWorkerRequest,
UpdateOrganizationRequest,
@@ -834,6 +835,23 @@ export class Api<
format: "json",
...params,
});
/**
* @description Get the Stripe credit balance for a tenant
*
* @tags Billing
* @name TenantCreditBalanceGet
* @summary Get the Stripe credit balance for a tenant
* @request GET:/api/v1/billing/tenants/{tenant}/credit-balance
* @secure
*/
tenantCreditBalanceGet = (tenant: string, params: RequestParams = {}) =>
this.request<TenantCreditBalance, APIErrors>({
path: `/api/v1/billing/tenants/${tenant}/credit-balance`,
method: "GET",
secure: true,
format: "json",
...params,
});
/**
* @description Get all feature flags for the tenant
*
@@ -359,6 +359,20 @@ export interface TenantPaymentMethod {
export type TenantPaymentMethodList = TenantPaymentMethod[];
export interface TenantCreditBalance {
/** The Stripe customer balance in cents. Negative means customer credit. */
balanceCents: number;
/** ISO currency code for the Stripe customer balance. */
currency: string;
/** Human-readable description for the active credit balance, if available. */
description?: string;
/**
* The timestamp at which the current credit balance is scheduled to expire.
* @format date-time
*/
expiresAt?: string;
}
export interface SubscriptionPlan {
/** The code of the plan. */
planCode: string;
+4
View File
@@ -32,6 +32,10 @@ export const queries = createQueryKeyStore({
queryKey: ['billing-state:get', tenant],
queryFn: async () => (await cloudApi.tenantBillingStateGet(tenant)).data,
}),
creditBalance: (tenant: string) => ({
queryKey: ['credit-balance:get', tenant],
queryFn: async () => (await cloudApi.tenantCreditBalanceGet(tenant)).data,
}),
paymentMethods: (tenant: string) => ({
queryKey: ['payment-methods:get', tenant],
+4 -4
View File
@@ -60,7 +60,7 @@ require (
go.opentelemetry.io/proto/otlp v1.9.0
go.uber.org/goleak v1.3.0
golang.org/x/time v0.14.0
google.golang.org/api v0.268.0
google.golang.org/api v0.269.0
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409
k8s.io/api v0.35.1
k8s.io/apimachinery v0.35.1
@@ -70,7 +70,7 @@ require (
require (
cel.dev/expr v0.25.1 // indirect
cloud.google.com/go/auth v0.18.1 // indirect
cloud.google.com/go/auth v0.18.2 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
cloud.google.com/go/compute/metadata v0.9.0 // indirect
dario.cat/mergo v1.0.2 // indirect
@@ -126,7 +126,7 @@ require (
github.com/google/gnostic-models v0.7.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/s2a-go v0.1.9 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.11 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.12 // indirect
github.com/googleapis/gax-go/v2 v2.17.0 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 // indirect
@@ -210,7 +210,7 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/exaring/otelpgx v0.10.0
github.com/fsnotify/fsnotify v1.9.0
github.com/getsentry/sentry-go v0.42.0
github.com/getsentry/sentry-go v0.43.0
github.com/go-chi/chi v1.5.5
github.com/go-playground/validator/v10 v10.30.1
github.com/google/cel-go v0.27.0
+8 -8
View File
@@ -1,7 +1,7 @@
cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4=
cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4=
cloud.google.com/go/auth v0.18.1 h1:IwTEx92GFUo2pJ6Qea0EU3zYvKnTAeRCODxfA/G5UWs=
cloud.google.com/go/auth v0.18.1/go.mod h1:GfTYoS9G3CWpRA3Va9doKN9mjPGRS+v41jmZAhBzbrA=
cloud.google.com/go/auth v0.18.2 h1:+Nbt5Ev0xEqxlNjd6c+yYUeosQ5TtEUaNcN/3FozlaM=
cloud.google.com/go/auth v0.18.2/go.mod h1:xD+oY7gcahcu7G2SG2DsBerfFxgPAJz17zz2joOFF3M=
cloud.google.com/go/auth/oauth2adapt v0.2.8 h1:keo8NaayQZ6wimpNSmW5OPc283g65QNIiLpZnkHRbnc=
cloud.google.com/go/auth/oauth2adapt v0.2.8/go.mod h1:XQ9y31RkqZCcwJWNSx2Xvric3RrU88hAYYbjDWYDL+c=
cloud.google.com/go/compute/metadata v0.9.0 h1:pDUj4QMoPejqq20dK0Pg2N4yG9zIkYGdBtwLoEkH9Zs=
@@ -145,8 +145,8 @@ github.com/gabriel-vasile/mimetype v1.4.12 h1:e9hWvmLYvtp846tLHam2o++qitpguFiYCK
github.com/gabriel-vasile/mimetype v1.4.12/go.mod h1:d+9Oxyo1wTzWdyVUPMmXFvp4F9tea18J8ufA774AB3s=
github.com/getkin/kin-openapi v0.133.0 h1:pJdmNohVIJ97r4AUFtEXRXwESr8b0bD721u/Tz6k8PQ=
github.com/getkin/kin-openapi v0.133.0/go.mod h1:boAciF6cXk5FhPqe/NQeBTeenbjqU4LhWBf09ILVvWE=
github.com/getsentry/sentry-go v0.42.0 h1:eeFMACuZTbUQf90RE8dE4tXeSe4CZyfvR1MBL7RLEt8=
github.com/getsentry/sentry-go v0.42.0/go.mod h1:eRXCoh3uvmjQLY6qu63BjUZnaBu5L5WhMV1RwYO8W5s=
github.com/getsentry/sentry-go v0.43.0 h1:XbXLpFicpo8HmBDaInk7dum18G9KSLcjZiyUKS+hLW4=
github.com/getsentry/sentry-go v0.43.0/go.mod h1:XDotiNZbgf5U8bPDUAfvcFmOnMQQceESxyKaObSssW0=
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/go-co-op/gocron/v2 v2.19.1 h1:B4iLeA0NB/2iO3EKQ7NfKn5KsQgZfjb2fkvoZJU3yBI=
@@ -212,8 +212,8 @@ github.com/google/s2a-go v0.1.9 h1:LGD7gtMgezd8a/Xak7mEWL0PjoTQFvpRudN895yqKW0=
github.com/google/s2a-go v0.1.9/go.mod h1:YA0Ei2ZQL3acow2O62kdp9UlnvMmU7kA6Eutn0dXayM=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.11 h1:vAe81Msw+8tKUxi2Dqh/NZMz7475yUvmRIkXr4oN2ao=
github.com/googleapis/enterprise-certificate-proxy v0.3.11/go.mod h1:RFV7MUdlb7AgEq2v7FmMCfeSMCllAzWxFgRdusoGks8=
github.com/googleapis/enterprise-certificate-proxy v0.3.12 h1:Fg+zsqzYEs1ZnvmcztTYxhgCBsx3eEhEwQ1W/lHq/sQ=
github.com/googleapis/enterprise-certificate-proxy v0.3.12/go.mod h1:vqVt9yG9480NtzREnTlmGSBmFrA+bzb0yl0TxoBQXOg=
github.com/googleapis/gax-go/v2 v2.17.0 h1:RksgfBpxqff0EZkDWYuz9q/uWsTVz+kf43LsZ1J6SMc=
github.com/googleapis/gax-go/v2 v2.17.0/go.mod h1:mzaqghpQp4JDh3HvADwrat+6M3MOIDp5YKHhb9PAgDY=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
@@ -573,8 +573,8 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/api v0.268.0 h1:hgA3aS4lt9rpF5RCCkX0Q2l7DvHgvlb53y4T4u6iKkA=
google.golang.org/api v0.268.0/go.mod h1:HXMyMH496wz+dAJwD/GkAPLd3ZL33Kh0zEG32eNvy9w=
google.golang.org/api v0.269.0 h1:qDrTOxKUQ/P0MveH6a7vZ+DNHxJQjtGm/uvdbdGXCQg=
google.golang.org/api v0.269.0/go.mod h1:N8Wpcu23Tlccl0zSHEkcAZQKDLdquxK+l9r2LkwAauE=
google.golang.org/genproto v0.0.0-20260128011058-8636f8732409 h1:VQZ/yAbAtjkHgH80teYd2em3xtIkkHd7ZhqfH2N9CsM=
google.golang.org/genproto v0.0.0-20260128011058-8636f8732409/go.mod h1:rxKD3IEILWEu3P44seeNOAwZN4SaoKaQ/2eTg4mM6EM=
google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 h1:merA0rdPeUV3YIIfHHcH4qBkiQAc1nfCKSI7lB4cV2M=
+76
View File
@@ -10,6 +10,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -18,6 +19,7 @@ import (
admincontracts "github.com/hatchet-dev/hatchet/internal/services/admin/contracts/workflows"
v1contracts "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
"github.com/hatchet-dev/hatchet/pkg/client/rest"
"github.com/hatchet-dev/hatchet/pkg/client/types"
"github.com/hatchet-dev/hatchet/pkg/config/client"
"github.com/hatchet-dev/hatchet/pkg/validator"
@@ -39,6 +41,38 @@ type WorkflowRun struct {
Options []RunOptFunc
}
func taskStatusFromProto(s v1contracts.RunStatus) rest.V1TaskStatus {
switch s {
case v1contracts.RunStatus_COMPLETED:
return rest.V1TaskStatusCOMPLETED
case v1contracts.RunStatus_CANCELLED:
return rest.V1TaskStatusCANCELLED
case v1contracts.RunStatus_FAILED:
return rest.V1TaskStatusFAILED
case v1contracts.RunStatus_RUNNING:
return rest.V1TaskStatusRUNNING
default:
return rest.V1TaskStatusQUEUED
}
}
type TaskRunDetails struct {
ExternalId uuid.UUID
ReadableId string
Status rest.V1TaskStatus
Output json.RawMessage
Error *string
}
type RunDetails struct {
ExternalId uuid.UUID
Status rest.V1TaskStatus
Input json.RawMessage
AdditionalMetadata json.RawMessage
TaskRuns map[string]*TaskRunDetails
Done bool
}
type AdminClient interface {
// Deprecated: PutWorkflow is part of the legacy v0 workflow definition system.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
@@ -58,6 +92,8 @@ type AdminClient interface {
RunChildWorkflows(workflows []*RunChildWorkflowsOpts) ([]string, error)
PutRateLimit(key string, opts *types.RateLimitOpts) error
GetRunDetails(ctx context.Context, externalId uuid.UUID) (*RunDetails, error)
}
type DedupeViolationErr struct {
@@ -468,6 +504,46 @@ func (a *adminClientImpl) PutRateLimit(key string, opts *types.RateLimitOpts) er
return nil
}
func (a *adminClientImpl) GetRunDetails(ctx context.Context, externalId uuid.UUID) (*RunDetails, error) {
resp, err := a.v1Client.GetRunDetails(a.ctx.newContext(ctx), &v1contracts.GetRunDetailsRequest{
ExternalId: externalId.String(),
})
if err != nil {
return nil, fmt.Errorf("could not get run details: %w", err)
}
taskRuns := make(map[string]*TaskRunDetails, len(resp.GetTaskRuns()))
for readableId, detail := range resp.GetTaskRuns() {
var errStr *string
if detail.Error != nil {
errStr = detail.Error
}
externalId, err := uuid.Parse(detail.ExternalId)
if err != nil {
return nil, fmt.Errorf("could not parse task run external id: %w", err)
}
taskRuns[readableId] = &TaskRunDetails{
ExternalId: externalId,
ReadableId: detail.GetReadableId(),
Status: taskStatusFromProto(detail.GetStatus()),
Output: detail.GetOutput(),
Error: errStr,
}
}
return &RunDetails{
ExternalId: externalId,
Status: taskStatusFromProto(resp.GetStatus()),
Input: resp.GetInput(),
AdditionalMetadata: resp.GetAdditionalMetadata(),
TaskRuns: taskRuns,
Done: resp.GetDone(),
}, nil
}
func (a *adminClientImpl) getPutRequest(workflow *types.Workflow) (*admincontracts.PutWorkflowRequest, error) {
opts := &admincontracts.CreateWorkflowVersionOpts{
Name: workflow.Name,
+7 -1
View File
@@ -45,6 +45,8 @@ type EventClient interface {
PutLog(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32) error
PutLogWithTimestamp(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32, createdAt *timestamppb.Timestamp) error
PutStreamEvent(ctx context.Context, stepRunId string, message []byte, options ...StreamEventOption) error
}
@@ -193,8 +195,12 @@ func (a *eventClientImpl) BulkPush(ctx context.Context, payload []EventWithAddit
}
func (a *eventClientImpl) PutLog(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32) error {
return a.PutLogWithTimestamp(ctx, taskRunId, msg, level, taskRetryCount, timestamppb.Now())
}
func (a *eventClientImpl) PutLogWithTimestamp(ctx context.Context, taskRunId, msg string, level *string, taskRetryCount *int32, createdAt *timestamppb.Timestamp) error {
_, err := a.client.PutLog(a.ctx.newContext(ctx), &eventcontracts.PutLogRequest{
CreatedAt: timestamppb.Now(),
CreatedAt: createdAt,
TaskRunExternalId: taskRunId,
Message: msg,
Level: level,
+1 -1
View File
@@ -589,7 +589,7 @@ func (r *OLAPRepositoryImpl) ReadTaskRunData(ctx context.Context, tenantId uuid.
workflowRunId = taskRun.ExternalID
}
externalIds := make([]uuid.UUID, 0)
externalIds := []uuid.UUID{workflowRunId}
if taskRun.OutputEventExternalID != nil {
externalIds = append(externalIds, *taskRun.OutputEventExternalID)
+13
View File
@@ -30,6 +30,10 @@ type RunsClient interface {
// Deprecated: Use Get instead.
GetDetails(ctx context.Context, runId string) (*rest.V1WorkflowRunGetResponse, error)
// GetV1Details retrieves detailed information about a workflow run via gRPC,
// including task-level output, errors, and status.
GetV1Details(ctx context.Context, runId uuid.UUID) (*client.RunDetails, error)
// List retrieves a collection of workflow runs based on the provided parameters.
List(ctx context.Context, opts rest.V1WorkflowRunListParams) (*rest.V1WorkflowRunListResponse, error)
@@ -104,6 +108,15 @@ func (r *runsClientImpl) GetDetails(ctx context.Context, runId string) (*rest.V1
)
}
// Deprecated: GetV1Details is part of the old generics-based v1 Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
//
// GetV1Details retrieves detailed information about a workflow run via gRPC,
// including task-level output, errors, and status.
func (r *runsClientImpl) GetV1Details(ctx context.Context, runId uuid.UUID) (*client.RunDetails, error) {
return r.v0Client.Admin().GetRunDetails(ctx, runId)
}
// Deprecated: List is part of the old generics-based v1 Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead. Migration guide: https://docs.hatchet.run/home/migration-guide-go
//
+51 -4
View File
@@ -9,6 +9,7 @@ import (
"time"
"github.com/rs/zerolog"
"google.golang.org/protobuf/types/known/timestamppb"
v1 "github.com/hatchet-dev/hatchet/internal/services/shared/proto/v1"
"github.com/hatchet-dev/hatchet/pkg/client"
@@ -87,6 +88,8 @@ type HatchetContext interface {
ParentOutput(parent create.NamedTask, output interface{}) error
WasSkipped(parent create.NamedTask) bool
client() client.Client
action() *client.Action
@@ -252,6 +255,20 @@ func (h *hatchetContext) ParentOutput(parent create.NamedTask, output interface{
return fmt.Errorf("parent %s not found in action payload", stepName)
}
func (h *hatchetContext) WasSkipped(parent create.NamedTask) bool {
stepName := parent.GetName()
if val, ok := h.stepData.Parents[stepName]; ok {
if skipped, ok := val["skipped"]; ok {
if skippedBool, ok := skipped.(bool); ok {
return skippedBool
}
}
}
return false
}
// Deprecated: TriggeredByEvent is an internal method used by the new Go SDK.
// Use the new Go SDK at github.com/hatchet-dev/hatchet/sdks/go instead of using this directly. Migration guide: https://docs.hatchet.run/home/migration-guide-go
func (h *hatchetContext) TriggeredByEvent() bool {
@@ -344,11 +361,41 @@ func (h *hatchetContext) Log(message string) {
message = string(runes[:10_000])
}
err := h.c.Event().PutLog(h, h.a.StepRunId, message, &infoLevel, &h.a.RetryCount)
stepRunId := h.a.StepRunId
retryCount := h.a.RetryCount
createdAt := timestamppb.Now()
if err != nil {
h.l.Err(err).Msg("could not put log")
}
go func() {
const maxRetries = 3
baseDelay := 100 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var err error
for attempt := range maxRetries + 1 {
if attempt > 0 {
delay := baseDelay * time.Duration(1<<(attempt-1))
select {
case <-ctx.Done():
h.l.Warn().Err(err).Msg("log delivery timed out, abandoning")
return
case <-time.After(delay):
}
}
err = h.c.Event().PutLogWithTimestamp(ctx, stepRunId, message, &infoLevel, &retryCount, createdAt)
if err == nil {
return
}
h.l.Warn().Err(err).Msgf("failed to put log (attempt %d/%d)", attempt+1, maxRetries+1)
}
h.l.Err(err).Msg("could not put log after all retries")
}()
}
// Deprecated: ReleaseSlot is an internal method used by the new Go SDK.
+4
View File
@@ -39,6 +39,10 @@ func (c *testHatchetContext) ParentOutput(task create.NamedTask, target interfac
return nil
}
func (c *testHatchetContext) WasSkipped(task create.NamedTask) bool {
return false
}
func (c *testHatchetContext) TriggeredByEvent() bool {
return false
}
+81 -131
View File
@@ -2,7 +2,6 @@ package main
import (
"context"
"fmt"
"log"
"math/rand"
"time"
@@ -11,19 +10,12 @@ import (
hatchet "github.com/hatchet-dev/hatchet/sdks/go"
)
type WorkflowInput struct {
ProcessID string `json:"process_id"`
}
type StepOutput struct {
StepName string `json:"step_name"`
RandomNumber int `json:"random_number"`
ProcessedAt string `json:"processed_at"`
RandomNumber int `json:"random_number"`
}
type SumOutput struct {
Total int `json:"total"`
Summary string `json:"summary"`
type RandomSum struct {
Sum int `json:"sum"`
}
func main() {
@@ -33,168 +25,129 @@ func main() {
}
// > Create a workflow
workflow := client.NewWorkflow("conditional-workflow")
workflow := client.NewWorkflow("TaskConditionWorkflow")
// !!
// Initial task
// > Add base task
start := workflow.NewTask("start", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
randomNum := rand.Intn(100) + 1 //nolint:gosec // This is a demo
log.Printf("Starting workflow for process %s with random number: %d", input.ProcessID, randomNum)
return StepOutput{
StepName: "start",
RandomNumber: randomNum,
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
start := workflow.NewTask("start", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
})
// !!
// > Add wait for sleep
waitForSleep := workflow.NewTask("wait-for-sleep", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
return StepOutput{
RandomNumber: rand.Intn(100) + 1,
}, nil
waitForSleep := workflow.NewTask("wait-for-sleep", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.SleepCondition(10*time.Second)),
)
// !!
// > Add skip condition override
_ = workflow.NewTask("skip-with-multiple-parents", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start, waitForSleep),
hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.random_number > 0")),
)
// !!
// > Add skip on event
// Task that waits for either 10 seconds or a user event
skipOnEvent := workflow.NewTask("skip-on-event", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Skip on event task completed for process %s", input.ProcessID)
return StepOutput{
StepName: "skip-on-event",
RandomNumber: rand.Intn(50) + 1, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
skipOnEvent := workflow.NewTask("skip-on-event", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.SleepCondition(10*time.Second)),
hatchet.WithSkipIf(hatchet.UserEventCondition("process:skip", "true")),
)
// !!
// > Add wait for event
// Task that might be skipped based on external event
skipableTask := workflow.NewTask("skipable-task", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Skipable task executing for process %s", input.ProcessID)
return StepOutput{
StepName: "skipable-task",
RandomNumber: rand.Intn(10) + 1, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.SleepCondition(3*time.Second)),
hatchet.WithSkipIf(hatchet.UserEventCondition("process:skip", "true")),
hatchet.WithWaitFor(hatchet.SleepCondition(30*time.Second)),
hatchet.WithSkipIf(hatchet.UserEventCondition("skip_on_event:skip", "")),
)
// !!
// > Add branching
// Left branch - only runs if start's random number <= 50
leftBranch := workflow.NewTask("left-branch", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Left branch executing for process %s", input.ProcessID)
return StepOutput{
StepName: "left-branch",
RandomNumber: rand.Intn(25) + 1, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
leftBranch := workflow.NewTask("left-branch", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(waitForSleep),
hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.randomNumber > 50")),
hatchet.WithSkipIf(hatchet.ParentCondition(waitForSleep, "output.random_number > 50")),
)
// Right branch - only runs if start's random number > 50
rightBranch := workflow.NewTask("right-branch", func(ctx hatchet.Context, input WorkflowInput) (StepOutput, error) {
log.Printf("Right branch executing for process %s", input.ProcessID)
return StepOutput{
StepName: "right-branch",
RandomNumber: rand.Intn(25) + 26, //nolint:gosec // This is a demo
ProcessedAt: time.Now().Format(time.RFC3339),
}, nil
rightBranch := workflow.NewTask("right-branch", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(waitForSleep),
hatchet.WithSkipIf(hatchet.ParentCondition(start, "output.randomNumber <= 50")),
hatchet.WithSkipIf(hatchet.ParentCondition(waitForSleep, "output.random_number <= 50")),
)
// !!
// > Add wait for event
waitForEvent := workflow.NewTask("wait-for-event", func(ctx hatchet.Context, _ any) (StepOutput, error) {
return StepOutput{RandomNumber: rand.Intn(100) + 1}, nil //nolint:gosec
},
hatchet.WithParents(start),
hatchet.WithWaitFor(hatchet.OrCondition(
hatchet.SleepCondition(1*time.Minute),
hatchet.UserEventCondition("wait_for_event:start", ""),
)),
)
// !!
// Final aggregation task
// > Add sum
_ = workflow.NewTask("summarize", func(ctx hatchet.Context, input WorkflowInput) (SumOutput, error) {
var total int
var summary string
// Get start output
var startOutput StepOutput
if err := ctx.ParentOutput(start, &startOutput); err != nil {
return SumOutput{}, err
}
total += startOutput.RandomNumber
summary = fmt.Sprintf("Start: %d", startOutput.RandomNumber)
// Get wait for sleep output
var waitForSleepOutput StepOutput
if err := ctx.ParentOutput(waitForSleep, &waitForSleepOutput); err != nil {
return SumOutput{}, err
}
total += waitForSleepOutput.RandomNumber
summary += fmt.Sprintf(", Wait for sleep: %d", waitForSleepOutput.RandomNumber)
// Get skip on event output
var skipOnEventOutput StepOutput
if err := ctx.ParentOutput(skipOnEvent, &skipOnEventOutput); err != nil {
return SumOutput{}, err
}
total += skipOnEventOutput.RandomNumber
summary += fmt.Sprintf(", Skip on event: %d", skipOnEventOutput.RandomNumber)
// Try to get left branch output (might be skipped)
var leftOutput StepOutput
if err := ctx.ParentOutput(leftBranch, &leftOutput); err == nil {
total += leftOutput.RandomNumber
summary += fmt.Sprintf(", Left: %d", leftOutput.RandomNumber)
} else {
summary += ", Left: skipped"
_ = workflow.NewTask("sum", func(ctx hatchet.Context, _ any) (RandomSum, error) {
var startOut StepOutput
err := ctx.ParentOutput(start, &startOut)
if err != nil {
return RandomSum{}, err
}
// Try to get right branch output (might be skipped)
var rightOutput StepOutput
if err := ctx.ParentOutput(rightBranch, &rightOutput); err == nil {
total += rightOutput.RandomNumber
summary += fmt.Sprintf(", Right: %d", rightOutput.RandomNumber)
} else {
summary += ", Right: skipped"
var waitForEventOut StepOutput
err = ctx.ParentOutput(waitForEvent, &waitForEventOut)
if err != nil {
return RandomSum{}, err
}
// Try to get skipable task output (might be skipped)
var skipableOutput StepOutput
if err := ctx.ParentOutput(skipableTask, &skipableOutput); err == nil {
total += skipableOutput.RandomNumber
summary += fmt.Sprintf(", Skipable: %d", skipableOutput.RandomNumber)
} else {
summary += ", Skipable: skipped"
var waitForSleepOut StepOutput
err = ctx.ParentOutput(waitForSleep, &waitForSleepOut)
if err != nil {
return RandomSum{}, err
}
log.Printf("Final summary for process %s: total=%d, %s", input.ProcessID, total, summary)
total := startOut.RandomNumber + waitForEventOut.RandomNumber + waitForSleepOut.RandomNumber
return SumOutput{
Total: total,
Summary: summary,
}, nil
if !ctx.WasSkipped(skipOnEvent) {
var out StepOutput
err = ctx.ParentOutput(skipOnEvent, &out)
if err == nil {
total += out.RandomNumber
}
}
if !ctx.WasSkipped(leftBranch) {
var out StepOutput
err = ctx.ParentOutput(leftBranch, &out)
if err == nil {
total += out.RandomNumber
}
}
if !ctx.WasSkipped(rightBranch) {
var out StepOutput
err = ctx.ParentOutput(rightBranch, &out)
if err == nil {
total += out.RandomNumber
}
}
return RandomSum{Sum: total}, nil
}, hatchet.WithParents(
start,
waitForSleep,
waitForEvent,
skipOnEvent,
leftBranch,
rightBranch,
skipableTask,
))
// !!
worker, err := client.NewWorker("conditional-worker", hatchet.WithWorkflows(workflow))
worker, err := client.NewWorker("dag-worker", hatchet.WithWorkflows(workflow))
if err != nil {
log.Fatalf("failed to create worker: %v", err)
}
@@ -209,10 +162,7 @@ func main() {
}
}()
// Run the workflow
_, err = client.Run(context.Background(), "conditional-workflow", WorkflowInput{
ProcessID: "demo-process-1",
})
_, err = client.Run(context.Background(), "TaskConditionWorkflow", nil)
if err != nil {
log.Fatalf("failed to run workflow: %v", err)
}
+11
View File
@@ -72,6 +72,17 @@ func (r *RunsClient) GetStatus(ctx context.Context, runId string) (*rest.V1TaskS
return resp.JSON200, nil
}
// GetDetails retrieves detailed information about a workflow run via gRPC,
// including task-level output, errors, and status.
func (r *RunsClient) GetDetails(ctx context.Context, runId uuid.UUID) (*client.RunDetails, error) {
resp, err := r.v0Client.Admin().GetRunDetails(ctx, runId)
if err != nil {
return nil, errors.Wrap(err, "failed to get workflow run details")
}
return resp, nil
}
// List retrieves a collection of workflow runs based on the provided parameters.
func (r *RunsClient) List(ctx context.Context, opts rest.V1WorkflowRunListParams) (*rest.V1TaskSummaryList, error) {
resp, err := r.api.V1WorkflowRunListWithResponse(
+12 -2
View File
@@ -655,7 +655,13 @@ func (w *workflowDeclarationImpl[I, O]) Dump() (*contracts.CreateWorkflowVersion
// Call the original function using reflection
fnValue := reflect.ValueOf(originalFn)
inputs := []reflect.Value{reflect.ValueOf(ctx), reflect.ValueOf(input)}
inputVal := reflect.ValueOf(input)
if !inputVal.IsValid() {
// input is a nil interface (e.g. I = any with null input),
// use a zero value of the function's expected parameter type
inputVal = reflect.Zero(fnValue.Type().In(1))
}
inputs := []reflect.Value{reflect.ValueOf(ctx), inputVal}
results := fnValue.Call(inputs)
// Handle errors
@@ -689,7 +695,11 @@ func (w *workflowDeclarationImpl[I, O]) Dump() (*contracts.CreateWorkflowVersion
// Call the original function using reflection
fnValue := reflect.ValueOf(originalFn)
inputs := []reflect.Value{reflect.ValueOf(durableCtx), reflect.ValueOf(input)}
inputVal := reflect.ValueOf(input)
if !inputVal.IsValid() {
inputVal = reflect.Zero(fnValue.Type().In(1))
}
inputs := []reflect.Value{reflect.ValueOf(durableCtx), inputVal}
results := fnValue.Call(inputs)
// Handle errors
+17
View File
@@ -5,6 +5,23 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.26.1] - 2026-02-25
### Added
- Adds `retry_429` to `TenacityConfig` (default: `False`) to optionally retry REST HTTP 429 responses.
- Adds `TooManyRequestsException` and maps REST HTTP 429 responses to it.
## [1.26.0] - 2026-02-25
### Fixed
- Fixes dependencies not working when using `type Dependency = Annotated[..., ...]` syntax for annotations on python version 3.12 and 3.13. Adds `typing-inspection` as a dependency.
### Changed
- Changes one function in the python SDK to use `inspect.iscoroutinefunction` instead of `asyncio.iscoroutinefunction` which is deprecated.
## [1.25.2] - 2026-02-19
### Fixed
+45 -1
View File
@@ -91,6 +91,50 @@ def patch_grpc_init_signature(content: str) -> str:
)
def patch_rest_429_exception(content: str) -> str:
"""Add TooManyRequestsException with 429 mapping to generated exceptions.py."""
# Insert class definition once, before RestTransportError.
if "class TooManyRequestsException" not in content:
new_class = (
"class TooManyRequestsException(ApiException):\n"
' """Exception for HTTP 429 Too Many Requests."""\n'
" pass\n"
)
pattern = r"(?m)^class RestTransportError\b"
content, n = re.subn(
pattern,
new_class + "\n\nclass RestTransportError",
content,
count=1,
)
if n != 1:
raise ValueError(
"patch_rest_429_exception: expected 'class RestTransportError' anchor not found"
)
# insert mapping once, before the 5xx check.
if "http_resp.status == 429" not in content:
pattern = r"(?m)^(?P<indent>[ \t]*)if 500 <= http_resp\.status <= 599:"
def _insert_429(m: re.Match[str]) -> str:
indent = m.group("indent")
return (
f"{indent}if http_resp.status == 429:\n"
f"{indent} raise TooManyRequestsException(http_resp=http_resp, body=body, data=data)\n"
f"\n"
f"{indent}if 500 <= http_resp.status <= 599:"
)
content, n = re.subn(pattern, _insert_429, content, count=1)
if n != 1:
raise ValueError(
"patch_rest_429_exception: expected 5xx mapping anchor not found"
)
return content
def patch_rest_transport_exceptions(content: str) -> str:
"""Insert typed REST transport exception classes into exceptions.py.
@@ -367,7 +411,7 @@ if __name__ == "__main__":
atomically_patch_file(
"hatchet_sdk/clients/rest/exceptions.py",
[patch_rest_transport_exceptions],
[patch_rest_transport_exceptions, patch_rest_429_exception],
)
atomically_patch_file(
"hatchet_sdk/clients/rest/rest.py",
@@ -0,0 +1,25 @@
from hatchet_sdk.runnables.task import Depends
from hatchet_sdk import Context
from hatchet_sdk.runnables.types import EmptyModel
from typing import Annotated, TypeAlias
async def async_dep(input: EmptyModel, ctx: Context) -> bool:
return True
def sync_dep(input: EmptyModel, ctx: Context) -> bool:
return True
AsyncDepNoTypeAlias = Annotated[bool, Depends(async_dep)]
AsyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(async_dep)]
AsyncDepTypeSyntax: TypeAlias = (
AsyncDepTypeAlias # python <3.12 doesn't support `type` syntax for type alias so we use type alias again
)
SyncDepNoTypeAlias = Annotated[bool, Depends(sync_dep)]
SyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(sync_dep)]
SyncDepTypeSyntax: TypeAlias = (
SyncDepTypeAlias # python <3.12 doesn't support `type` syntax for type alias so we use type alias again
)
@@ -0,0 +1,21 @@
from hatchet_sdk.runnables.task import Depends
from hatchet_sdk import Context
from hatchet_sdk.runnables.types import EmptyModel
from typing import Annotated, TypeAlias
async def async_dep(input: EmptyModel, ctx: Context) -> bool:
return True
def sync_dep(input: EmptyModel, ctx: Context) -> bool:
return True
AsyncDepNoTypeAlias = Annotated[bool, Depends(async_dep)]
AsyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(async_dep)]
type AsyncDepTypeSyntax = Annotated[bool, Depends(async_dep)]
SyncDepNoTypeAlias = Annotated[bool, Depends(sync_dep)]
SyncDepTypeAlias: TypeAlias = Annotated[bool, Depends(sync_dep)]
type SyncDepTypeSyntax = Annotated[bool, Depends(sync_dep)]
@@ -12,6 +12,7 @@ from examples.dependency_injection.worker import (
di_workflow,
durable_async_task_with_dependencies,
sync_task_with_dependencies,
task_with_type_aliases,
)
from hatchet_sdk import EmptyModel
from hatchet_sdk.runnables.workflow import Standalone
@@ -62,3 +63,11 @@ async def test_di_workflows() -> None:
)
assert parsed.chained_dep == "chained_" + CHAINED_CM_VALUE
assert parsed.chained_async_dep == "chained_" + CHAINED_ASYNC_CM_VALUE
@pytest.mark.asyncio(loop_scope="session")
async def test_type_aliases() -> None:
result = await task_with_type_aliases.aio_run(EmptyModel())
assert result
for key in result:
assert result[key] is True
@@ -1,5 +1,6 @@
from contextlib import asynccontextmanager, contextmanager
from typing import Annotated, AsyncGenerator, Generator
import sys
from pydantic import BaseModel
@@ -14,6 +15,46 @@ ASYNC_CM_DEPENDENCY_VALUE = "async_cm_dependency_value"
CHAINED_CM_VALUE = "chained_cm_value"
CHAINED_ASYNC_CM_VALUE = "chained_async_cm_value"
if sys.version_info >= (3, 12):
from examples.dependency_injection.dependency_annotations312 import (
AsyncDepNoTypeAlias,
AsyncDepTypeAlias,
SyncDepNoTypeAlias,
AsyncDepTypeSyntax,
SyncDepTypeAlias,
SyncDepTypeSyntax,
)
else:
from examples.dependency_injection.dependency_annotations310 import (
AsyncDepNoTypeAlias,
AsyncDepTypeAlias,
SyncDepNoTypeAlias,
AsyncDepTypeSyntax,
SyncDepTypeAlias,
SyncDepTypeSyntax,
)
@hatchet.task()
async def task_with_type_aliases(
_i: EmptyModel,
ctx: Context,
async_dep_no_type_alias: AsyncDepNoTypeAlias,
async_dep_type_alias: AsyncDepTypeAlias,
async_dep_type_syntax: AsyncDepTypeSyntax,
sync_dep_no_type_alias: SyncDepNoTypeAlias,
sync_dep_type_alias: SyncDepTypeAlias,
sync_dep_type_syntax: SyncDepTypeSyntax,
) -> dict[str, bool]:
return {
"async_dep_no_type_alias": async_dep_no_type_alias,
"async_dep_type_alias": async_dep_type_alias,
"async_dep_type_syntax": async_dep_type_syntax,
"sync_dep_no_type_alias": sync_dep_no_type_alias,
"sync_dep_type_alias": sync_dep_type_alias,
"sync_dep_type_syntax": sync_dep_type_syntax,
}
# > Declare dependencies
async def async_dep(input: EmptyModel, ctx: Context) -> str:
@@ -231,6 +272,7 @@ def main() -> None:
sync_task_with_dependencies,
durable_async_task_with_dependencies,
di_workflow,
task_with_type_aliases,
],
)
worker.start()
@@ -62,6 +62,7 @@ async def dummy_runs() -> None:
return
@pytest.mark.skip(reason="Very flaky test")
@pytest.mark.parametrize(
"on_demand_worker",
[
@@ -149,6 +150,7 @@ async def test_priority(
assert curr.finished_at >= curr.started_at
@pytest.mark.skip(reason="Very flaky test")
@pytest.mark.parametrize(
"on_demand_worker",
[
+2
View File
@@ -28,6 +28,7 @@ from examples.dependency_injection.worker import (
di_workflow,
durable_async_task_with_dependencies,
sync_task_with_dependencies,
task_with_type_aliases,
)
from examples.dict_input.worker import say_hello_unsafely
from examples.durable.worker import (
@@ -118,6 +119,7 @@ def main() -> None:
async_task_with_dependencies,
sync_task_with_dependencies,
durable_async_task_with_dependencies,
task_with_type_aliases,
say_hello,
say_hello_unsafely,
serde_workflow,
@@ -161,6 +161,9 @@ class ApiException(OpenApiException):
http_resp=http_resp, body=body, data=data
)
if http_resp.status == 429:
raise TooManyRequestsException(http_resp=http_resp, body=body, data=data)
if 500 <= http_resp.status <= 599:
raise ServiceException(http_resp=http_resp, body=body, data=data)
raise ApiException(http_resp=http_resp, body=body, data=data)
@@ -209,6 +212,12 @@ class UnprocessableEntityException(ApiException):
pass
class TooManyRequestsException(ApiException):
"""Exception for HTTP 429 Too Many Requests."""
pass
class RestTransportError(ApiException):
"""Base exception for REST transport-level errors (network, timeout, TLS)."""
@@ -1,13 +1,21 @@
from __future__ import annotations
from collections.abc import Callable
from typing import ParamSpec, TypeVar
from typing import TYPE_CHECKING, ParamSpec, TypeVar
import grpc
import tenacity
from hatchet_sdk.clients.rest.exceptions import NotFoundException, ServiceException
from hatchet_sdk.config import TenacityConfig
from hatchet_sdk.clients.rest.exceptions import (
NotFoundException,
ServiceException,
TooManyRequestsException,
)
from hatchet_sdk.logger import logger
if TYPE_CHECKING:
from hatchet_sdk.config import TenacityConfig
P = ParamSpec("P")
R = TypeVar("R")
@@ -16,12 +24,15 @@ def tenacity_retry(func: Callable[P, R], config: TenacityConfig) -> Callable[P,
if config.max_attempts <= 0:
return func
def should_retry(ex: BaseException) -> bool:
return tenacity_should_retry(ex, config)
return tenacity.retry(
reraise=True,
wait=tenacity.wait_exponential_jitter(),
stop=tenacity.stop_after_attempt(config.max_attempts),
before_sleep=tenacity_alert_retry,
retry=tenacity.retry_if_exception(tenacity_should_retry),
retry=tenacity.retry_if_exception(should_retry),
)(func)
@@ -33,10 +44,16 @@ def tenacity_alert_retry(retry_state: tenacity.RetryCallState) -> None:
)
def tenacity_should_retry(ex: BaseException) -> bool:
def tenacity_should_retry(
ex: BaseException, config: TenacityConfig | None = None
) -> bool:
"""Return True when the exception should be retried."""
if isinstance(ex, ServiceException | NotFoundException):
return True
if isinstance(ex, TooManyRequestsException):
return bool(config and config.retry_429)
if isinstance(ex, grpc.aio.AioRpcError | grpc.RpcError):
return ex.code() not in [
grpc.StatusCode.UNIMPLEMENTED,
+5
View File
@@ -94,6 +94,11 @@ class TenacityConfig(BaseSettings):
max_attempts: int = 5
retry_429: bool = Field(
default=False,
description="Enable retries for HTTP 429 Too Many Requests responses. Default: off.",
)
DEFAULT_HOST_PORT = "localhost:7070"
@@ -24,6 +24,7 @@ from typing import (
)
from pydantic import BaseModel, ConfigDict, TypeAdapter
from typing_inspection.typing_objects import is_typealiastype
from hatchet_sdk.conditions import (
Action,
@@ -279,6 +280,8 @@ class Task(Generic[TWorkflowInput, R]):
) = None,
) -> DependencyToInject | None:
annotation = param.annotation
if is_typealiastype(annotation):
annotation = annotation.__value__
if get_origin(annotation) is Annotated:
args = get_args(annotation)
+650 -623
View File
File diff suppressed because it is too large Load Diff
+6 -4
View File
@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "1.25.2"
version = "1.26.1"
description = "This is the official Python SDK for Hatchet, a distributed, fault-tolerant task queue. The SDK allows you to easily integrate Hatchet's task scheduling and workflow orchestration capabilities into your Python applications."
authors = [
"Alexander Belanger <alexander@hatchet.run>",
@@ -23,6 +23,7 @@ aiohttp = "^3.10.5"
tenacity = ">=8.4.1"
prometheus-client = ">=0.21.1"
pydantic-settings = "^2.7.1"
typing-inspection = ">=0.1.0"
## OTel Extra
opentelemetry-api = { version = "^1.28.0", optional = true }
@@ -51,6 +52,7 @@ pytest-env = "^1.1.5"
pytest-retry = "^1.7.0"
psycopg = { extras = ["pool"], version = "^3.2.6" }
pytest-xdist = "^3.7.0"
requests = "^2.32.5"
[tool.poetry.group.docs.dependencies]
@@ -201,9 +203,9 @@ exclude = [
[tool.pydoclint]
style = 'sphinx'
exclude = 'v0|clients/rest/*|contracts/*|.venv|site/*'
arg-type-hints-in-docstring = false # Automatically checked by mypy and mkdocs
check-return-types = false # Automatically checked by mypy and mkdocs
exclude = 'v0|clients/rest/*|contracts/*|.venv|site/*|examples/dependency_injection/dependency_annotations312.py'
arg-type-hints-in-docstring = false # Automatically checked by mypy and mkdocs
check-return-types = false # Automatically checked by mypy and mkdocs
[tool.poetry.plugins."mkdocs.plugins"]
"markdown-export" = "docs.generator.markdown_export:MarkdownExportPlugin"
+67
View File
@@ -0,0 +1,67 @@
"""Unit tests for HTTP 429 Too Many Requests retry behavior."""
import pytest
from hatchet_sdk.clients.rest.exceptions import (
ApiException,
NotFoundException,
ServiceException,
TooManyRequestsException,
)
from hatchet_sdk.clients.rest.tenacity_utils import tenacity_should_retry
from hatchet_sdk.config import TenacityConfig
class FakeHttpResponse:
"""Minimal fake HTTP response for testing ApiException.from_response()."""
def __init__(self, status: int, reason: str = "", data: bytes = b"") -> None:
self.status = status
self.reason = reason
self.data = data
def getheaders(self) -> dict[str, str]:
return {}
def test_from_response__429_raises_too_many_requests_exception() -> None:
"""ApiException.from_response() should raise TooManyRequestsException for status 429."""
http_resp = FakeHttpResponse(status=429, reason="Too Many Requests")
with pytest.raises(TooManyRequestsException) as exc_info:
ApiException.from_response(http_resp=http_resp, body=None, data=None)
exc = exc_info.value
assert exc.status == 429
assert exc.reason == "Too Many Requests"
def test_default__429_not_retried() -> None:
"""By default (no config), TooManyRequestsException should NOT be retried."""
exc = TooManyRequestsException(status=429, reason="Too Many Requests")
assert tenacity_should_retry(exc) is False
def test_optin__429_retried_when_enabled() -> None:
"""TooManyRequestsException should be retried when retry_429=True."""
exc = TooManyRequestsException(status=429, reason="Too Many Requests")
config = TenacityConfig(retry_429=True)
assert tenacity_should_retry(exc, config) is True
def test_regression__service_exception_still_retried() -> None:
"""ServiceException (5xx) should still be retried."""
exc = ServiceException(status=500, reason="Internal Server Error")
assert tenacity_should_retry(exc) is True
def test_regression__not_found_exception_still_retried() -> None:
"""NotFoundException (404) should still be retried."""
exc = NotFoundException(status=404, reason="Not Found")
assert tenacity_should_retry(exc) is True
def test_config__default_retry_429_is_false() -> None:
"""retry_429 should default to False."""
config = TenacityConfig()
assert config.retry_429 is False
+8 -21
View File
@@ -1,14 +1,6 @@
"""Unit tests for the tenacity retry predicate.
"""Unit tests for the tenacity retry predicate (tenacity_should_retry).
These tests verify which exceptions trigger retries and which do not.
The retry predicate is used by the SDK to determine whether to retry
failed API calls.
Current retry behavior (as of this PR):
- REST: ServiceException (5xx) and NotFoundException (404) are retried
- REST: Transport errors (RestTimeoutError, etc.) are not retried
- REST: Other 4xx errors are not retried
- gRPC: Most errors are retried except specific status codes
These tests verify which REST exceptions and gRPC status codes are treated as retryable.
"""
import grpc
@@ -24,12 +16,11 @@ from hatchet_sdk.clients.rest.exceptions import (
RestTLSError,
RestTransportError,
ServiceException,
TooManyRequestsException,
UnauthorizedException,
)
from hatchet_sdk.clients.rest.tenacity_utils import tenacity_should_retry
# --- REST exception retry predicate tests ---
@pytest.mark.parametrize(
("exc", "expected"),
@@ -59,6 +50,11 @@ from hatchet_sdk.clients.rest.tenacity_utils import tenacity_should_retry
False,
id="ForbiddenException (HTTP 403) should not be retried",
),
pytest.param(
TooManyRequestsException(status=429, reason="Too Many Requests"),
False,
id="TooManyRequestsException (HTTP 429) should not be retried by default",
),
],
)
def test_rest__exception_retry_behavior(exc: BaseException, expected: bool) -> None:
@@ -66,9 +62,6 @@ def test_rest__exception_retry_behavior(exc: BaseException, expected: bool) -> N
assert tenacity_should_retry(exc) is expected
# --- REST transport error retry predicate tests ---
@pytest.mark.parametrize(
("exc", "expected"),
[
@@ -104,9 +97,6 @@ def test_transport__error_retry_behavior(exc: BaseException, expected: bool) ->
assert tenacity_should_retry(exc) is expected
# --- Generic exception retry predicate tests ---
@pytest.mark.parametrize(
("exc", "expected"),
[
@@ -132,9 +122,6 @@ def test_generic__exception_retry_behavior(exc: BaseException, expected: bool) -
assert tenacity_should_retry(exc) is expected
# --- gRPC exception retry predicate tests ---
class FakeRpcError(grpc.RpcError):
"""A fake gRPC RpcError for testing without real gRPC infrastructure."""