From 2cfb345dcf46658caae691d7e9ca5ea5c22dfc97 Mon Sep 17 00:00:00 2001 From: Matt Kaye Date: Wed, 9 Jul 2025 14:18:41 -0400 Subject: [PATCH] Feat: Handle unicode error, fix OTel error capturing (#1959) * feat: raise illegal task output error out of the runner if we get a null unicode escape sequence * feat: add helper for sanitizing outputs * feat: improve error * fix: lock index * feat: use async stream method in example * chore: ver * chore: changelog * fix: turns out we don't need to lock * fix: return exception for instrumentation * chore: changelog * chore: bunch of generated crap * fix: comment placement * fix: copilot Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- examples/go/bulk-operations/main.go | 11 +- examples/go/z_v0/procedural/main.go | 201 +++--------------- examples/python/quickstart/poetry.lock | 74 ++++--- examples/python/streaming/worker.py | 2 +- .../snips/go/bulk-operations/index.ts | 3 + .../snips/go/bulk-operations/main.ts | 29 +++ .../next/lib/docs/generated/snips/go/index.ts | 2 + .../snips/python/streaming/worker.ts | 2 +- .../snips/go/bulk-operations/main.ts | 18 +- .../snips/python/streaming/worker.ts | 2 +- sdks/python/CHANGELOG.md | 11 + sdks/python/examples/streaming/worker.py | 2 +- sdks/python/hatchet_sdk/__init__.py | 4 +- sdks/python/hatchet_sdk/utils/serde.py | 52 +++++ .../hatchet_sdk/worker/runner/runner.py | 42 +++- sdks/python/pyproject.toml | 2 +- 16 files changed, 221 insertions(+), 236 deletions(-) create mode 100644 frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/index.ts create mode 100644 frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/main.ts create mode 100644 sdks/python/hatchet_sdk/utils/serde.py diff --git a/examples/go/bulk-operations/main.go b/examples/go/bulk-operations/main.go index cd2a0ddb0..ef17f4b35 100644 --- a/examples/go/bulk-operations/main.go +++ b/examples/go/bulk-operations/main.go @@ -15,6 +15,7 @@ import ( func main() { // > Setup + hatchet, err := v1.NewHatchetClient() if err != nil { log.Fatalf("failed to create hatchet client: %v", err) @@ -34,7 +35,7 @@ func main() { selectedWorkflow := (*workflows.Rows)[0] selectedWorkflowUUID := uuid.MustParse(selectedWorkflow.Metadata.Id) - // > List Runs + // > List runs workflowRuns, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{ WorkflowIds: &[]types.UUID{selectedWorkflowUUID}, }) @@ -48,9 +49,7 @@ func main() { runIds = append(runIds, uuid.MustParse(run.Metadata.Id)) } - tNow := time.Now().UTC() - - // > Cancel By Run IDs + // > Cancel by run ids _, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{ ExternalIds: &runIds, }) @@ -58,7 +57,9 @@ func main() { log.Fatalf("failed to cancel runs by ids: %v", err) } - // > Cancel By Filters + // > Cancel by filters + tNow := time.Now().UTC() + _, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{ Filter: &rest.V1TaskFilter{ Since: tNow.Add(-24 * time.Hour), diff --git a/examples/go/z_v0/procedural/main.go b/examples/go/z_v0/procedural/main.go index 17564bf88..8d1271c80 100644 --- a/examples/go/z_v0/procedural/main.go +++ b/examples/go/z_v0/procedural/main.go @@ -13,55 +13,18 @@ import ( "github.com/hatchet-dev/hatchet/pkg/worker" ) -// Global depth configuration -type DepthConfig struct { - MaxDepth int `json:"max_depth"` - Branching map[int]int `json:"branching"` // depth -> number of children -} - -// Default configuration: 3 levels with decreasing branching factor 
-var GlobalDepthConfig = DepthConfig{ - MaxDepth: 3, - Branching: map[int]int{ - 0: 5, // Root level spawns 5 children - 1: 3, // Level 1 spawns 3 children each - 2: 2, // Level 2 spawns 2 children each - }, -} +const NUM_CHILDREN = 50 type proceduralChildInput struct { - Index int `json:"index"` - Depth int `json:"depth"` - Config DepthConfig `json:"config"` - ParentPath string `json:"parent_path"` + Index int `json:"index"` } type proceduralChildOutput struct { - Index int `json:"index"` - Depth int `json:"depth"` - ChildSum int `json:"child_sum"` - TotalNodes int `json:"total_nodes"` + Index int `json:"index"` } type proceduralParentOutput struct { - ChildSum int `json:"child_sum"` - TotalNodes int `json:"total_nodes"` -} - -// Helper function to calculate maximum possible nodes -func calculateMaxNodes(config DepthConfig) int { - total := 1 // Root node - currentLevelNodes := 1 - - for depth := 0; depth < config.MaxDepth; depth++ { - if branching, exists := config.Branching[depth]; exists && branching > 0 { - currentLevelNodes *= branching - total += currentLevelNodes - } else { - break - } - } - return total + ChildSum int `json:"child_sum"` } func main() { @@ -70,9 +33,7 @@ func main() { panic(err) } - // Calculate max possible events based on config - maxEvents := calculateMaxNodes(GlobalDepthConfig) * 2 // *2 for start/complete events - events := make(chan string, maxEvents) + events := make(chan string, 5*NUM_CHILDREN) interrupt := cmdutils.InterruptChan() cleanup, err := run(events) @@ -109,31 +70,20 @@ func run(events chan<- string) (func() error, error) { worker.NoTrigger(), &worker.WorkflowJob{ Name: "procedural-parent-workflow", - Description: "This is a test of procedural workflows with hierarchical depth.", + Description: "This is a test of procedural workflows.", Steps: []*worker.WorkflowStep{ worker.Fn( func(ctx worker.HatchetContext) (result *proceduralParentOutput, err error) { - // Root level starts at depth 0 - numChildren := GlobalDepthConfig.Branching[0] - if numChildren == 0 { - return &proceduralParentOutput{ChildSum: 0, TotalNodes: 1}, nil - } + childWorkflows := make([]*client.Workflow, NUM_CHILDREN) - childWorkflows := make([]*client.Workflow, numChildren) - - for i := 0; i < numChildren; i++ { + for i := 0; i < NUM_CHILDREN; i++ { childInput := proceduralChildInput{ - Index: i, - Depth: 1, // Children start at depth 1 - Config: GlobalDepthConfig, - ParentPath: "root", + Index: i, } childWorkflow, err := ctx.SpawnWorkflow("procedural-child-workflow", childInput, &worker.SpawnWorkflowOpts{ AdditionalMetadata: &map[string]string{ - "childKey": "childValue", - "depth": fmt.Sprintf("%d", 1), - "parentPath": "root", + "childKey": "childValue", }, }) @@ -143,13 +93,14 @@ func run(events chan<- string) (func() error, error) { childWorkflows[i] = childWorkflow - events <- fmt.Sprintf("root-child-%d-started", i) + events <- fmt.Sprintf("child-%d-started", i) } eg := errgroup.Group{} - eg.SetLimit(numChildren) - childOutputs := make([]proceduralChildOutput, 0) + eg.SetLimit(NUM_CHILDREN) + + childOutputs := make([]int, 0) childOutputsMu := sync.Mutex{} for i, childWorkflow := range childWorkflows { @@ -170,12 +121,13 @@ func run(events chan<- string) (func() error, error) { } childOutputsMu.Lock() - childOutputs = append(childOutputs, childOutput) + childOutputs = append(childOutputs, childOutput.Index) childOutputsMu.Unlock() - events <- fmt.Sprintf("root-child-%d-completed", childOutput.Index) + events <- fmt.Sprintf("child-%d-completed", childOutput.Index) return nil + 
} }(i, childWorkflow)) } @@ -187,7 +139,7 @@ func run(events chan<- string) (func() error, error) { err = eg.Wait() }() - timer := time.NewTimer(120 * time.Second) // Increased timeout for deeper hierarchy + timer := time.NewTimer(60 * time.Second) select { case <-finishedCh: @@ -200,7 +152,7 @@ func run(events chan<- string) (func() error, error) { for i := range childWorkflows { completed := false for _, childOutput := range childOutputs { - if childOutput.Index == i { + if childOutput == i { completed = true break } @@ -215,21 +167,16 @@ func run(events chan<- string) (func() error, error) { } sum := 0 - totalNodes := 1 // Count root node for _, childOutput := range childOutputs { - sum += childOutput.ChildSum - totalNodes += childOutput.TotalNodes + sum += childOutput } - fmt.Printf("🎯 Parent workflow completed: ChildSum=%d, TotalNodes=%d\n", sum, totalNodes) - return &proceduralParentOutput{ - ChildSum: sum, - TotalNodes: totalNodes, + ChildSum: sum, }, nil }, - ).SetTimeout("15m"), + ).SetTimeout("10m"), }, }, ) @@ -242,7 +189,7 @@ func run(events chan<- string) (func() error, error) { worker.NoTrigger(), &worker.WorkflowJob{ Name: "procedural-child-workflow", - Description: "This is a hierarchical child workflow that can spawn its own children.", + Description: "This is a test of procedural workflows.", Steps: []*worker.WorkflowStep{ worker.Fn( func(ctx worker.HatchetContext) (result *proceduralChildOutput, err error) { @@ -254,105 +201,11 @@ func run(events chan<- string) (func() error, error) { return nil, err } - childSum := input.Index // Start with own index - totalNodes := 1 // Count self - - fmt.Printf("🌲 Node at depth %d (index %d, path %s.%d) starting\n", - input.Depth, input.Index, input.ParentPath, input.Index) - - // Check if we should spawn children at this depth - numChildren, shouldSpawn := input.Config.Branching[input.Depth] - if shouldSpawn && input.Depth < input.Config.MaxDepth { - fmt.Printf("🌱 Spawning %d children at depth %d\n", numChildren, input.Depth+1) - - // Spawn children recursively - childWorkflows := make([]*client.Workflow, numChildren) - - for i := 0; i < numChildren; i++ { - childInput := proceduralChildInput{ - Index: i, - Depth: input.Depth + 1, - Config: input.Config, - ParentPath: fmt.Sprintf("%s.%d", input.ParentPath, input.Index), - } - - childWorkflow, err := ctx.SpawnWorkflow("procedural-child-workflow", childInput, &worker.SpawnWorkflowOpts{ - AdditionalMetadata: &map[string]string{ - "childKey": "childValue", - "depth": fmt.Sprintf("%d", input.Depth+1), - "parentPath": fmt.Sprintf("%s.%d", input.ParentPath, input.Index), - }, - }) - - if err != nil { - return nil, err - } - - childWorkflows[i] = childWorkflow - - events <- fmt.Sprintf("%s.%d-child-%d-started", input.ParentPath, input.Index, i) - } - - // Wait for all children to complete - eg := errgroup.Group{} - eg.SetLimit(numChildren) - - childOutputs := make([]proceduralChildOutput, 0) - childOutputsMu := sync.Mutex{} - - for i, childWorkflow := range childWorkflows { - eg.Go(func(i int, childWorkflow *client.Workflow) func() error { - return func() error { - childResult, err := childWorkflow.Result() - - if err != nil { - return err - } - - childOutput := proceduralChildOutput{} - - err = childResult.StepOutput("step-one", &childOutput) - - if err != nil { - return err - } - - childOutputsMu.Lock() - childOutputs = append(childOutputs, childOutput) - childOutputsMu.Unlock() - - events <- fmt.Sprintf("%s.%d-child-%d-completed", input.ParentPath, input.Index, childOutput.Index) - - 
return nil - } - }(i, childWorkflow)) - } - - err = eg.Wait() - if err != nil { - return nil, err - } - - // Aggregate child results - for _, childOutput := range childOutputs { - childSum += childOutput.ChildSum - totalNodes += childOutput.TotalNodes - } - - fmt.Printf("🌳 Node at depth %d completed with %d children: ChildSum=%d, TotalNodes=%d\n", - input.Depth, len(childOutputs), childSum, totalNodes) - } else { - fmt.Printf("🍃 Leaf node at depth %d (max depth reached or no branching configured)\n", input.Depth) - } - return &proceduralChildOutput{ - Index: input.Index, - Depth: input.Depth, - ChildSum: childSum, - TotalNodes: totalNodes, + Index: input.Index, }, nil }, - ).SetName("step-one").SetTimeout("10m"), + ).SetName("step-one"), }, }, ) @@ -364,10 +217,6 @@ func run(events chan<- string) (func() error, error) { go func() { time.Sleep(1 * time.Second) - fmt.Printf("🚀 Starting hierarchical workflow with config: MaxDepth=%d, Branching=%v\n", - GlobalDepthConfig.MaxDepth, GlobalDepthConfig.Branching) - fmt.Printf("📊 Expected total nodes: %d\n", calculateMaxNodes(GlobalDepthConfig)) - _, err := c.Admin().RunWorkflow("procedural-parent-workflow", nil) if err != nil { diff --git a/examples/python/quickstart/poetry.lock b/examples/python/quickstart/poetry.lock index 5a29ec4c8..0bb6a6dde 100644 --- a/examples/python/quickstart/poetry.lock +++ b/examples/python/quickstart/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 2.0.0 and should not be changed by hand. [[package]] name = "aiohappyeyeballs" @@ -114,7 +114,7 @@ propcache = ">=0.2.0" yarl = ">=1.17.0,<2.0" [package.extras] -speedups = ["Brotli ; platform_python_implementation == \"CPython\"", "aiodns (>=3.2.0) ; sys_platform == \"linux\" or sys_platform == \"darwin\"", "brotlicffi ; platform_python_implementation != \"CPython\""] +speedups = ["Brotli", "aiodns (>=3.2.0)", "brotlicffi"] [[package]] name = "aiohttp-retry" @@ -199,12 +199,12 @@ files = [ ] [package.extras] -benchmark = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -cov = ["cloudpickle ; platform_python_implementation == \"CPython\"", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -dev = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] +benchmark = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-codspeed", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +cov = ["cloudpickle", "coverage[toml] (>=5.3)", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +dev = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", 
"pre-commit-uv", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] docs = ["cogapp", "furo", "myst-parser", "sphinx", "sphinx-notfound-page", "sphinxcontrib-towncrier", "towncrier"] -tests = ["cloudpickle ; platform_python_implementation == \"CPython\"", "hypothesis", "mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-xdist[psutil]"] -tests-mypy = ["mypy (>=1.11.1) ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\"", "pytest-mypy-plugins ; platform_python_implementation == \"CPython\" and python_version >= \"3.10\""] +tests = ["cloudpickle", "hypothesis", "mypy (>=1.11.1)", "pympler", "pytest (>=4.3.0)", "pytest-mypy-plugins", "pytest-xdist[psutil]"] +tests-mypy = ["mypy (>=1.11.1)", "pytest-mypy-plugins"] [[package]] name = "cel-python" @@ -460,14 +460,14 @@ setuptools = "*" [[package]] name = "hatchet-sdk" -version = "1.14.2" +version = "1.0.0a1" description = "" optional = false python-versions = "<4.0,>=3.10" groups = ["main"] files = [ - {file = "hatchet_sdk-1.14.2-py3-none-any.whl", hash = "sha256:75af4bf45e516ad3212463764ffe4242a71a1a10fcad278a6c04321e465b733c"}, - {file = "hatchet_sdk-1.14.2.tar.gz", hash = "sha256:00404bc431abdf285c66d1cf31684f4f884fc22f515fa3e79fa11b30801f228f"}, + {file = "hatchet_sdk-1.0.0a1-py3-none-any.whl", hash = "sha256:bfc84358c8842cecd0d95b30645109733b7292dff0db1a776ca862785ee93d7f"}, + {file = "hatchet_sdk-1.0.0a1.tar.gz", hash = "sha256:f0272bbaac6faed75ff727826e9f7b1ac42ae597f9b590e14d392aada9c9692f"}, ] [package.dependencies] @@ -483,11 +483,13 @@ grpcio-tools = [ {version = ">=1.64.1,<1.68.dev0 || >=1.69.dev0", markers = "python_version < \"3.13\""}, {version = ">=1.69.0", markers = "python_version >= \"3.13\""}, ] +nest-asyncio = ">=1.6.0,<2.0.0" prometheus-client = ">=0.21.1,<0.22.0" -protobuf = ">=5.29.5,<6.0.0" +protobuf = ">=5.29.1,<6.0.0" pydantic = ">=2.6.3,<3.0.0" pydantic-settings = ">=2.7.1,<3.0.0" python-dateutil = ">=2.9.0.post0,<3.0.0" +pyyaml = ">=6.0.1,<7.0.0" tenacity = ">=8.4.1" urllib3 = ">=1.26.20" @@ -643,6 +645,18 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.11\""} +[[package]] +name = "nest-asyncio" +version = "1.6.0" +description = "Patch asyncio to allow nested event loops" +optional = false +python-versions = ">=3.5" +groups = ["main"] +files = [ + {file = "nest_asyncio-1.6.0-py3-none-any.whl", hash = "sha256:87af6efd6b5e897c81050477ef65c62e2b2f35d51703cae01aff2905b1852e1c"}, + {file = "nest_asyncio-1.6.0.tar.gz", hash = "sha256:6f172d5449aca15afd6c646851f4e31e02c598d553a667e38cafa997cfec55fe"}, +] + [[package]] name = "prometheus-client" version = "0.21.1" @@ -768,23 +782,23 @@ files = [ [[package]] name = "protobuf" -version = "5.29.5" +version = "5.29.4" description = "" optional = false python-versions = ">=3.8" groups = ["main"] files = [ - {file = "protobuf-5.29.5-cp310-abi3-win32.whl", hash = "sha256:3f1c6468a2cfd102ff4703976138844f78ebd1fb45f49011afc5139e9e283079"}, - {file = "protobuf-5.29.5-cp310-abi3-win_amd64.whl", hash = "sha256:3f76e3a3675b4a4d867b52e4a5f5b78a2ef9565549d4037e06cf7b0942b1d3fc"}, - {file = "protobuf-5.29.5-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:e38c5add5a311f2a6eb0340716ef9b039c1dfa428b28f25a7838ac329204a671"}, - {file = "protobuf-5.29.5-cp38-abi3-manylinux2014_aarch64.whl", 
hash = "sha256:fa18533a299d7ab6c55a238bf8629311439995f2e7eca5caaff08663606e9015"}, - {file = "protobuf-5.29.5-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:63848923da3325e1bf7e9003d680ce6e14b07e55d0473253a690c3a8b8fd6e61"}, - {file = "protobuf-5.29.5-cp38-cp38-win32.whl", hash = "sha256:ef91363ad4faba7b25d844ef1ada59ff1604184c0bcd8b39b8a6bef15e1af238"}, - {file = "protobuf-5.29.5-cp38-cp38-win_amd64.whl", hash = "sha256:7318608d56b6402d2ea7704ff1e1e4597bee46d760e7e4dd42a3d45e24b87f2e"}, - {file = "protobuf-5.29.5-cp39-cp39-win32.whl", hash = "sha256:6f642dc9a61782fa72b90878af134c5afe1917c89a568cd3476d758d3c3a0736"}, - {file = "protobuf-5.29.5-cp39-cp39-win_amd64.whl", hash = "sha256:470f3af547ef17847a28e1f47200a1cbf0ba3ff57b7de50d22776607cd2ea353"}, - {file = "protobuf-5.29.5-py3-none-any.whl", hash = "sha256:6cf42630262c59b2d8de33954443d94b746c952b01434fc58a417fdbd2e84bd5"}, - {file = "protobuf-5.29.5.tar.gz", hash = "sha256:bc1463bafd4b0929216c35f437a8e28731a2b7fe3d98bb77a600efced5a15c84"}, + {file = "protobuf-5.29.4-cp310-abi3-win32.whl", hash = "sha256:13eb236f8eb9ec34e63fc8b1d6efd2777d062fa6aaa68268fb67cf77f6839ad7"}, + {file = "protobuf-5.29.4-cp310-abi3-win_amd64.whl", hash = "sha256:bcefcdf3976233f8a502d265eb65ea740c989bacc6c30a58290ed0e519eb4b8d"}, + {file = "protobuf-5.29.4-cp38-abi3-macosx_10_9_universal2.whl", hash = "sha256:307ecba1d852ec237e9ba668e087326a67564ef83e45a0189a772ede9e854dd0"}, + {file = "protobuf-5.29.4-cp38-abi3-manylinux2014_aarch64.whl", hash = "sha256:aec4962f9ea93c431d5714ed1be1c93f13e1a8618e70035ba2b0564d9e633f2e"}, + {file = "protobuf-5.29.4-cp38-abi3-manylinux2014_x86_64.whl", hash = "sha256:d7d3f7d1d5a66ed4942d4fefb12ac4b14a29028b209d4bfb25c68ae172059922"}, + {file = "protobuf-5.29.4-cp38-cp38-win32.whl", hash = "sha256:1832f0515b62d12d8e6ffc078d7e9eb06969aa6dc13c13e1036e39d73bebc2de"}, + {file = "protobuf-5.29.4-cp38-cp38-win_amd64.whl", hash = "sha256:476cb7b14914c780605a8cf62e38c2a85f8caff2e28a6a0bad827ec7d6c85d68"}, + {file = "protobuf-5.29.4-cp39-cp39-win32.whl", hash = "sha256:fd32223020cb25a2cc100366f1dedc904e2d71d9322403224cdde5fdced0dabe"}, + {file = "protobuf-5.29.4-cp39-cp39-win_amd64.whl", hash = "sha256:678974e1e3a9b975b8bc2447fca458db5f93a2fb6b0c8db46b6675b5b5346812"}, + {file = "protobuf-5.29.4-py3-none-any.whl", hash = "sha256:3fde11b505e1597f71b875ef2fc52062b6a9740e5f7c8997ce878b6009145862"}, + {file = "protobuf-5.29.4.tar.gz", hash = "sha256:4f1dfcd7997b31ef8f53ec82781ff434a28bf71d9102ddde14d076adcfc78c99"}, ] [[package]] @@ -806,7 +820,7 @@ typing-extensions = ">=4.12.2" [package.extras] email = ["email-validator (>=2.0.0)"] -timezone = ["tzdata ; python_version >= \"3.9\" and platform_system == \"Windows\""] +timezone = ["tzdata"] [[package]] name = "pydantic-core" @@ -1048,13 +1062,13 @@ files = [ ] [package.extras] -check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1) ; sys_platform != \"cygwin\"", "ruff (>=0.8.0) ; sys_platform != \"cygwin\""] -core = ["importlib_metadata (>=6) ; python_version < \"3.10\"", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1) ; python_version < \"3.11\"", "wheel (>=0.43.0)"] +check = ["pytest-checkdocs (>=2.4)", "pytest-ruff (>=0.2.1)", "ruff (>=0.8.0)"] +core = ["importlib_metadata (>=6)", "jaraco.functools (>=4)", "jaraco.text (>=3.7)", "more_itertools", "more_itertools (>=8.8)", "packaging (>=24.2)", "platformdirs (>=4.2.2)", "tomli (>=2.0.1)", "wheel (>=0.43.0)"] cover = 
["pytest-cov"] doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier", "towncrier (<24.7)"] enabler = ["pytest-enabler (>=2.2)"] -test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21) ; python_version >= \"3.9\" and sys_platform != \"cygwin\"", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf ; sys_platform != \"cygwin\"", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] -type = ["importlib_metadata (>=7.0.2) ; python_version < \"3.10\"", "jaraco.develop (>=7.21) ; sys_platform != \"cygwin\"", "mypy (==1.14.*)", "pytest-mypy"] +test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.7.2)", "jaraco.test (>=5.5)", "packaging (>=24.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.*)", "pytest-home (>=0.5)", "pytest-perf", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel (>=0.44.0)"] +type = ["importlib_metadata (>=7.0.2)", "jaraco.develop (>=7.21)", "mypy (==1.14.*)", "pytest-mypy"] [[package]] name = "six" @@ -1133,7 +1147,7 @@ files = [ ] [package.extras] -brotli = ["brotli (>=1.0.9) ; platform_python_implementation == \"CPython\"", "brotlicffi (>=0.8.0) ; platform_python_implementation != \"CPython\""] +brotli = ["brotli (>=1.0.9)", "brotlicffi (>=0.8.0)"] h2 = ["h2 (>=4,<5)"] socks = ["pysocks (>=1.5.6,!=1.5.7,<2.0)"] zstd = ["zstandard (>=0.18.0)"] @@ -1238,4 +1252,4 @@ propcache = ">=0.2.0" [metadata] lock-version = "2.1" python-versions = "^3.10" -content-hash = "f97ce5425c150f6fd3bfc9433378bfee784d9b3694a95b1162cd578881727928" +content-hash = "74c12e499aa797ca5c8559af579f1212b0e4e3a77f068f9385db39d70ba304e0" diff --git a/examples/python/streaming/worker.py b/examples/python/streaming/worker.py index fe0c73d1e..f9a41d409 100644 --- a/examples/python/streaming/worker.py +++ b/examples/python/streaming/worker.py @@ -28,7 +28,7 @@ async def stream_task(input: EmptyModel, ctx: Context) -> None: await asyncio.sleep(2) for chunk in chunks: - ctx.put_stream(chunk) + await ctx.aio_put_stream(chunk) await asyncio.sleep(0.20) diff --git a/frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/index.ts b/frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/index.ts new file mode 100644 index 000000000..65b980543 --- /dev/null +++ b/frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/index.ts @@ -0,0 +1,3 @@ +import main from './main'; + +export { main }; diff --git a/frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/main.ts b/frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/main.ts new file mode 100644 index 000000000..66a01cc6e --- /dev/null +++ b/frontend/app/src/next/lib/docs/generated/snips/go/bulk-operations/main.ts @@ -0,0 +1,29 @@ +import { Snippet } from '@/next/lib/docs/generated/snips/types'; + +const snippet: Snippet = { + language: 'go', + content: + 'package main\n\nimport 
(\n\t"context"\n\t"fmt"\n\t"log"\n\t"time"\n\n\t"github.com/google/uuid"\n\t"github.com/oapi-codegen/runtime/types"\n\n\t"github.com/hatchet-dev/hatchet/pkg/client/rest"\n\tv1 "github.com/hatchet-dev/hatchet/pkg/v1"\n)\n\nfunc main() {\n\t// > Setup\n\n\thatchet, err := v1.NewHatchetClient()\n\tif err != nil {\n\t\tlog.Fatalf("failed to create hatchet client: %v", err)\n\t}\n\n\tctx := context.Background()\n\n\tworkflows, err := hatchet.Workflows().List(ctx, nil)\n\tif err != nil {\n\t\tlog.Fatalf("failed to list workflows: %v", err)\n\t}\n\n\tif workflows == nil || workflows.Rows == nil || len(*workflows.Rows) == 0 {\n\t\tlog.Fatalf("no workflows found")\n\t}\n\n\tselectedWorkflow := (*workflows.Rows)[0]\n\tselectedWorkflowUUID := uuid.MustParse(selectedWorkflow.Metadata.Id)\n\n\n\t// > List runs\n\tworkflowRuns, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{\n\t\tWorkflowIds: &[]types.UUID{selectedWorkflowUUID},\n\t})\n\tif err != nil || workflowRuns == nil || workflowRuns.JSON200 == nil || workflowRuns.JSON200.Rows == nil {\n\t\tlog.Fatalf("failed to list workflow runs for workflow %s: %v", selectedWorkflow.Name, err)\n\t}\n\n\tvar runIds []types.UUID\n\n\tfor _, run := range workflowRuns.JSON200.Rows {\n\t\trunIds = append(runIds, uuid.MustParse(run.Metadata.Id))\n\t}\n\n\n\t// > Cancel by run ids\n\t_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{\n\t\tExternalIds: &runIds,\n\t})\n\tif err != nil {\n\t\tlog.Fatalf("failed to cancel runs by ids: %v", err)\n\t}\n\n\n\t// > Cancel by filters\n\ttNow := time.Now().UTC()\n\n\t_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{\n\t\tFilter: &rest.V1TaskFilter{\n\t\t\tSince: tNow.Add(-24 * time.Hour),\n\t\t\tUntil: &tNow,\n\t\t\tStatuses: &[]rest.V1TaskStatus{rest.V1TaskStatusRUNNING},\n\t\t\tWorkflowIds: &[]types.UUID{selectedWorkflowUUID},\n\t\t\tAdditionalMetadata: &[]string{`{"key": "value"}`},\n\t\t},\n\t})\n\tif err != nil {\n\t\tlog.Fatalf("failed to cancel runs by filters: %v", err)\n\t}\n\n\n\tfmt.Println("cancelled all runs for workflow", selectedWorkflow.Name)\n}\n', + source: 'out/go/bulk-operations/main.go', + blocks: { + setup: { + start: 18, + stop: 37, + }, + list_runs: { + start: 40, + stop: 52, + }, + cancel_by_run_ids: { + start: 55, + stop: 61, + }, + cancel_by_filters: { + start: 64, + stop: 78, + }, + }, + highlights: {}, +}; + +export default snippet; diff --git a/frontend/app/src/next/lib/docs/generated/snips/go/index.ts b/frontend/app/src/next/lib/docs/generated/snips/go/index.ts index 6d38417ed..0918d56d3 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/go/index.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/go/index.ts @@ -1,3 +1,4 @@ +import * as bulk_operations from './bulk-operations'; import * as migration_guides from './migration-guides'; import * as quickstart from './quickstart'; import * as run from './run'; @@ -5,6 +6,7 @@ import * as worker from './worker'; import * as workflows from './workflows'; import * as z_v0 from './z_v0'; +export { bulk_operations }; export { migration_guides }; export { quickstart }; export { run }; diff --git a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/worker.ts b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/worker.ts index f9f784ffe..d6b8d97a8 100644 --- a/frontend/app/src/next/lib/docs/generated/snips/python/streaming/worker.ts +++ b/frontend/app/src/next/lib/docs/generated/snips/python/streaming/worker.ts @@ -3,7 +3,7 @@ import { Snippet } from 
'@/next/lib/docs/generated/snips/types'; const snippet: Snippet = { language: 'python', content: - 'import asyncio\nfrom typing import Generator\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\n# > Streaming\n\nanna_karenina = """\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys\' house. The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n"""\n\n\ndef create_chunks(content: str, n: int) -> Generator[str, None, None]:\n for i in range(0, len(content), n):\n yield content[i : i + n]\n\n\nchunks = list(create_chunks(anna_karenina, 10))\n\n\n@hatchet.task()\nasync def stream_task(input: EmptyModel, ctx: Context) -> None:\n # 👀 Sleeping to avoid race conditions\n await asyncio.sleep(2)\n\n for chunk in chunks:\n ctx.put_stream(chunk)\n await asyncio.sleep(0.20)\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker("test-worker", workflows=[stream_task])\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', + 'import asyncio\nfrom typing import Generator\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\n# > Streaming\n\nanna_karenina = """\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys\' house. The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n"""\n\n\ndef create_chunks(content: str, n: int) -> Generator[str, None, None]:\n for i in range(0, len(content), n):\n yield content[i : i + n]\n\n\nchunks = list(create_chunks(anna_karenina, 10))\n\n\n@hatchet.task()\nasync def stream_task(input: EmptyModel, ctx: Context) -> None:\n # 👀 Sleeping to avoid race conditions\n await asyncio.sleep(2)\n\n for chunk in chunks:\n await ctx.aio_put_stream(chunk)\n await asyncio.sleep(0.20)\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker("test-worker", workflows=[stream_task])\n worker.start()\n\n\nif __name__ == "__main__":\n main()\n', source: 'out/python/streaming/worker.py', blocks: { streaming: { diff --git a/frontend/docs/lib/generated/snips/go/bulk-operations/main.ts b/frontend/docs/lib/generated/snips/go/bulk-operations/main.ts index 71b41ec2c..d7b60792c 100644 --- a/frontend/docs/lib/generated/snips/go/bulk-operations/main.ts +++ b/frontend/docs/lib/generated/snips/go/bulk-operations/main.ts @@ -2,24 +2,24 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "go", - "content": "package main\n\nimport (\n\t\"context\"\n\t\"fmt\"\n\t\"log\"\n\t\"time\"\n\n\t\"github.com/google/uuid\"\n\t\"github.com/hatchet-dev/hatchet/pkg/client/rest\"\n\tv1 \"github.com/hatchet-dev/hatchet/pkg/v1\"\n\t\"github.com/oapi-codegen/runtime/types\"\n)\n\nfunc main() {\n\t// > Setup\n\n\thatchet, err := v1.NewHatchetClient()\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to create hatchet client: %v\", err)\n\t}\n\n\tctx := context.Background()\n\n\tworkflows, err := hatchet.Workflows().List(ctx, nil)\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to list workflows: %v\", err)\n\t}\n\n\tif workflows == nil || workflows.Rows == nil || len(*workflows.Rows) == 0 {\n\t\tlog.Fatalf(\"no 
workflows found\")\n\t}\n\n\tselectedWorkflow := (*workflows.Rows)[0]\n\tselectedWorkflowUUID := types.UUID(uuid.MustParse(selectedWorkflow.Metadata.Id))\n\n\n\t// > List runs\n\tworkflowRuns, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{\n\t\tWorkflowIds: &[]types.UUID{selectedWorkflowUUID},\n\t})\n\tif err != nil || workflowRuns == nil || workflowRuns.JSON200 == nil || workflowRuns.JSON200.Rows == nil {\n\t\tlog.Fatalf(\"failed to list workflow runs for workflow %s: %v\", selectedWorkflow.Name, err)\n\t}\n\n\tvar runIds []types.UUID\n\n\tfor _, run := range workflowRuns.JSON200.Rows {\n\t\trunIds = append(runIds, types.UUID(uuid.MustParse(run.Metadata.Id)))\n\t}\n\n\n\t// > Cancel by run ids\n\t_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{\n\t\tExternalIds: &runIds,\n\t})\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to cancel runs by ids: %v\", err)\n\t}\n\n\n\t// > Cancel by filters\n\ttNow := time.Now().UTC()\n\n\t_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{\n\t\tFilter: &rest.V1TaskFilter{\n\t\t\tSince: tNow.Add(-24 * time.Hour),\n\t\t\tUntil: &tNow,\n\t\t\tStatuses: &[]rest.V1TaskStatus{rest.V1TaskStatusRUNNING},\n\t\t\tWorkflowIds: &[]types.UUID{selectedWorkflowUUID},\n\t\t\tAdditionalMetadata: &[]string{`{\"key\": \"value\"}`},\n\t\t},\n\t})\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to cancel runs by filters: %v\", err)\n\t}\n\n\n\tfmt.Println(\"cancelled all runs for workflow\", selectedWorkflow.Name)\n}\n", + "content": "package main\n\nimport (\n\t\"context\"\n\t\"fmt\"\n\t\"log\"\n\t\"time\"\n\n\t\"github.com/google/uuid\"\n\t\"github.com/oapi-codegen/runtime/types\"\n\n\t\"github.com/hatchet-dev/hatchet/pkg/client/rest\"\n\tv1 \"github.com/hatchet-dev/hatchet/pkg/v1\"\n)\n\nfunc main() {\n\t// > Setup\n\n\thatchet, err := v1.NewHatchetClient()\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to create hatchet client: %v\", err)\n\t}\n\n\tctx := context.Background()\n\n\tworkflows, err := hatchet.Workflows().List(ctx, nil)\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to list workflows: %v\", err)\n\t}\n\n\tif workflows == nil || workflows.Rows == nil || len(*workflows.Rows) == 0 {\n\t\tlog.Fatalf(\"no workflows found\")\n\t}\n\n\tselectedWorkflow := (*workflows.Rows)[0]\n\tselectedWorkflowUUID := uuid.MustParse(selectedWorkflow.Metadata.Id)\n\n\n\t// > List runs\n\tworkflowRuns, err := hatchet.Runs().List(ctx, rest.V1WorkflowRunListParams{\n\t\tWorkflowIds: &[]types.UUID{selectedWorkflowUUID},\n\t})\n\tif err != nil || workflowRuns == nil || workflowRuns.JSON200 == nil || workflowRuns.JSON200.Rows == nil {\n\t\tlog.Fatalf(\"failed to list workflow runs for workflow %s: %v\", selectedWorkflow.Name, err)\n\t}\n\n\tvar runIds []types.UUID\n\n\tfor _, run := range workflowRuns.JSON200.Rows {\n\t\trunIds = append(runIds, uuid.MustParse(run.Metadata.Id))\n\t}\n\n\n\t// > Cancel by run ids\n\t_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{\n\t\tExternalIds: &runIds,\n\t})\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to cancel runs by ids: %v\", err)\n\t}\n\n\n\t// > Cancel by filters\n\ttNow := time.Now().UTC()\n\n\t_, err = hatchet.Runs().Cancel(ctx, rest.V1CancelTaskRequest{\n\t\tFilter: &rest.V1TaskFilter{\n\t\t\tSince: tNow.Add(-24 * time.Hour),\n\t\t\tUntil: &tNow,\n\t\t\tStatuses: &[]rest.V1TaskStatus{rest.V1TaskStatusRUNNING},\n\t\t\tWorkflowIds: &[]types.UUID{selectedWorkflowUUID},\n\t\t\tAdditionalMetadata: &[]string{`{\"key\": \"value\"}`},\n\t\t},\n\t})\n\tif err != nil {\n\t\tlog.Fatalf(\"failed to cancel runs by 
filters: %v\", err)\n\t}\n\n\n\tfmt.Println(\"cancelled all runs for workflow\", selectedWorkflow.Name)\n}\n", "source": "out/go/bulk-operations/main.go", "blocks": { "setup": { - "start": 17, - "stop": 36 + "start": 18, + "stop": 37 }, "list_runs": { - "start": 39, - "stop": 51 + "start": 40, + "stop": 52 }, "cancel_by_run_ids": { - "start": 54, - "stop": 60 + "start": 55, + "stop": 61 }, "cancel_by_filters": { - "start": 63, - "stop": 77 + "start": 64, + "stop": 78 } }, "highlights": {} diff --git a/frontend/docs/lib/generated/snips/python/streaming/worker.ts b/frontend/docs/lib/generated/snips/python/streaming/worker.ts index 83803f8b2..8216deca8 100644 --- a/frontend/docs/lib/generated/snips/python/streaming/worker.ts +++ b/frontend/docs/lib/generated/snips/python/streaming/worker.ts @@ -2,7 +2,7 @@ import { Snippet } from '@/lib/generated/snips/types'; const snippet: Snippet = { "language": "python", - "content": "import asyncio\nfrom typing import Generator\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\n# > Streaming\n\nanna_karenina = \"\"\"\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys' house. The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n\"\"\"\n\n\ndef create_chunks(content: str, n: int) -> Generator[str, None, None]:\n for i in range(0, len(content), n):\n yield content[i : i + n]\n\n\nchunks = list(create_chunks(anna_karenina, 10))\n\n\n@hatchet.task()\nasync def stream_task(input: EmptyModel, ctx: Context) -> None:\n # 👀 Sleeping to avoid race conditions\n await asyncio.sleep(2)\n\n for chunk in chunks:\n ctx.put_stream(chunk)\n await asyncio.sleep(0.20)\n\n\n\n\ndef main() -> None:\n worker = hatchet.worker(\"test-worker\", workflows=[stream_task])\n worker.start()\n\n\nif __name__ == \"__main__\":\n main()\n", + "content": "import asyncio\nfrom typing import Generator\n\nfrom hatchet_sdk import Context, EmptyModel, Hatchet\n\nhatchet = Hatchet(debug=False)\n\n# > Streaming\n\nanna_karenina = \"\"\"\nHappy families are all alike; every unhappy family is unhappy in its own way.\n\nEverything was in confusion in the Oblonskys' house. 
The wife had discovered that the husband was carrying on an intrigue with a French girl, who had been a governess in their family, and she had announced to her husband that she could not go on living in the same house with him.\n\"\"\"\n\n\ndef create_chunks(content: str, n: int) -> Generator[str, None, None]:\n    for i in range(0, len(content), n):\n        yield content[i : i + n]\n\n\nchunks = list(create_chunks(anna_karenina, 10))\n\n\n@hatchet.task()\nasync def stream_task(input: EmptyModel, ctx: Context) -> None:\n    # 👀 Sleeping to avoid race conditions\n    await asyncio.sleep(2)\n\n    for chunk in chunks:\n        await ctx.aio_put_stream(chunk)\n        await asyncio.sleep(0.20)\n\n\n\n\ndef main() -> None:\n    worker = hatchet.worker(\"test-worker\", workflows=[stream_task])\n    worker.start()\n\n\nif __name__ == \"__main__\":\n    main()\n",
  "source": "out/python/streaming/worker.py",
  "blocks": {
    "streaming": {
diff --git a/sdks/python/CHANGELOG.md b/sdks/python/CHANGELOG.md
index e8a51cccc..9076d3644 100644
--- a/sdks/python/CHANGELOG.md
+++ b/sdks/python/CHANGELOG.md
@@ -5,6 +5,17 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog
 
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.14.3] - 2025-07-03
+
+### Added
+
+- Added a `remove_null_unicode_character` utility function for removing null unicode characters from data structures.
+
+### Changed
+
+- Task outputs that contain a null unicode character (\u0000) will now raise an exception instead of being serialized.
+- OpenTelemetry instrumentor now correctly reports exceptions raised in tasks to the OTel collector.
+
 ## [1.14.2] - 2025-07-03
 
 ### Added
diff --git a/sdks/python/examples/streaming/worker.py b/sdks/python/examples/streaming/worker.py
index 8302bcdd4..75e6caf06 100644
--- a/sdks/python/examples/streaming/worker.py
+++ b/sdks/python/examples/streaming/worker.py
@@ -28,7 +28,7 @@ async def stream_task(input: EmptyModel, ctx: Context) -> None:
     await asyncio.sleep(2)
 
     for chunk in chunks:
-        ctx.put_stream(chunk)
+        await ctx.aio_put_stream(chunk)
         await asyncio.sleep(0.20)
 
diff --git a/sdks/python/hatchet_sdk/__init__.py b/sdks/python/hatchet_sdk/__init__.py
index 697e8ed12..e3246d52a 100644
--- a/sdks/python/hatchet_sdk/__init__.py
+++ b/sdks/python/hatchet_sdk/__init__.py
@@ -11,9 +11,9 @@ from hatchet_sdk.clients.listeners.run_event_listener import (
     StepRunEventType,
     WorkflowRunEventType,
 )
-from hatchet_sdk.clients.rest.models.accept_invite_request import AcceptInviteRequest
 
 # import models into sdk package
+from hatchet_sdk.clients.rest.models.accept_invite_request import AcceptInviteRequest
 from hatchet_sdk.clients.rest.models.api_error import APIError
 from hatchet_sdk.clients.rest.models.api_errors import APIErrors
 from hatchet_sdk.clients.rest.models.api_meta import APIMeta
@@ -166,6 +166,7 @@ from hatchet_sdk.runnables.types import (
 )
 from hatchet_sdk.runnables.workflow import TaskRunRef
 from hatchet_sdk.utils.opentelemetry import OTelAttribute
+from hatchet_sdk.utils.serde import remove_null_unicode_character
 from hatchet_sdk.worker.worker import Worker, WorkerStartOptions, WorkerStatus
 from hatchet_sdk.workflow_run import WorkflowRunRef
@@ -290,5 +291,6 @@ __all__ = [
     "WorkflowVersionDefinition",
     "WorkflowVersionMeta",
     "or_",
+    "remove_null_unicode_character",
     "workflow",
 ]
diff --git a/sdks/python/hatchet_sdk/utils/serde.py b/sdks/python/hatchet_sdk/utils/serde.py
new file mode 100644
index 
000000000..932cb0161
--- /dev/null
+++ b/sdks/python/hatchet_sdk/utils/serde.py
@@ -0,0 +1,52 @@
+from typing import Any, TypeVar, cast, overload
+
+T = TypeVar("T")
+K = TypeVar("K")
+
+
+@overload
+def remove_null_unicode_character(data: str) -> str: ...
+
+
+@overload
+def remove_null_unicode_character(data: dict[K, T]) -> dict[K, T]: ...
+
+
+@overload
+def remove_null_unicode_character(data: list[T]) -> list[T]: ...
+
+
+@overload
+def remove_null_unicode_character(data: tuple[T, ...]) -> tuple[T, ...]: ...
+
+
+def remove_null_unicode_character(
+    data: str | dict[K, T] | list[T] | tuple[T, ...],
+) -> str | dict[K, T] | list[T] | tuple[T, ...]:
+    """
+    Recursively traverse a task's output and remove the null unicode character \\u0000, which causes unexpected behavior in Hatchet.
+
+    Needed because Hatchet does not support \\u0000 in task outputs.
+
+    :param data: The task output (a string, or a JSON-serializable dictionary, list, or tuple)
+    :return: The same structure with all \\u0000 characters removed from strings, and nested containers processed recursively.
+    :raises TypeError: If the input is not a string, dictionary, list, or tuple.
+    """
+    if isinstance(data, str):
+        return data.replace("\u0000", "")
+
+    if isinstance(data, dict):
+        return {
+            key: remove_null_unicode_character(cast(Any, value))
+            for key, value in data.items()
+        }
+
+    if isinstance(data, list):
+        return [remove_null_unicode_character(cast(Any, item)) for item in data]
+
+    if isinstance(data, tuple):
+        return tuple(remove_null_unicode_character(cast(Any, item)) for item in data)
+
+    raise TypeError(
+        f"Unsupported type {type(data)}. Expected str, dict, list, or tuple."
+    )
diff --git a/sdks/python/hatchet_sdk/worker/runner/runner.py b/sdks/python/hatchet_sdk/worker/runner/runner.py
index d6249bf02..9955748c5 100644
--- a/sdks/python/hatchet_sdk/worker/runner/runner.py
+++ b/sdks/python/hatchet_sdk/worker/runner/runner.py
@@ -4,9 +4,9 @@ import functools
 import json
 from collections.abc import Callable
 from concurrent.futures import ThreadPoolExecutor
-from contextlib import suppress
 from enum import Enum
 from multiprocessing import Queue
+from textwrap import dedent
 from threading import Thread, current_thread
 from typing import Any, Literal, cast, overload
@@ -49,6 +49,7 @@ from hatchet_sdk.runnables.contextvars import (
 )
 from hatchet_sdk.runnables.task import Task
 from hatchet_sdk.runnables.types import R, TWorkflowInput
+from hatchet_sdk.utils.serde import remove_null_unicode_character
 from hatchet_sdk.worker.action_listener_process import ActionEvent
 from hatchet_sdk.worker.runner.utils.capture_logs import (
     AsyncLogSender,
@@ -410,7 +411,7 @@ class Runner:
         )
 
     ## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
-    async def handle_start_step_run(self, action: Action) -> None:
+    async def handle_start_step_run(self, action: Action) -> Exception | None:
         action_name = action.action_id
 
         # Find the corresponding action function from the registry
@@ -444,8 +445,11 @@ class Runner:
 
         ## FIXME: Handle cancelled exceptions and other special exceptions
         ## that we don't want to suppress here
-        with suppress(Exception):
+        try:
             await task
+        except Exception as e:
+            ## Return the exception so the OTel instrumentor can capture it
+            return e
 
         ## Once the step run completes, we need to remove the workflow spawn index
         ## so we don't leak memory
@@ -453,6 +457,8 @@ class Runner:
         async with spawn_index_lock:
             workflow_spawn_indices.pop(action.key)
 
+        return None
+
     ## IMPORTANT: Keep this 
method's signature in sync with the wrapper in the OTel instrumentor async def handle_start_group_key_run(self, action: Action) -> Exception | None: action_name = action.action_id @@ -557,14 +563,30 @@ class Runner: f"Tasks must return either a dictionary or a Pydantic BaseModel which can be serialized to a JSON object. Got object of type {type(output)} instead." ) - if output is not None: - try: - return json.dumps(output, default=str) - except Exception as e: - logger.error(f"Could not serialize output: {e}") - return str(output) + if output is None: + return "" - return "" + try: + serialized_output = json.dumps(output, default=str) + except Exception as e: + logger.error(f"Could not serialize output: {e}") + serialized_output = str(output) + + if "\\u0000" in serialized_output: + raise IllegalTaskOutputError( + dedent( + f""" + Task outputs cannot contain the unicode null character \\u0000 + + Please see this Discord thread: https://discord.com/channels/1088927970518909068/1384324576166678710/1386714014565928992 + Relevant Postgres documentation: https://www.postgresql.org/docs/current/datatype-json.html + + Use `hatchet_sdk.{remove_null_unicode_character.__name__}` to sanitize your output if you'd like to remove the character. + """ + ) + ) + + return serialized_output async def wait_for_tasks(self) -> None: running = len(self.tasks.keys()) diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 4d8a1b34b..80c41ff31 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.14.2" +version = "1.14.3" description = "" authors = ["Alexander Belanger "] readme = "README.md"
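
---

Editor's notes (illustrative sketches, not part of the patch):

1. Usage of the new sanitizer. A minimal sketch assuming hatchet-sdk >= 1.14.3, where
`remove_null_unicode_character` is re-exported from the package root (per the __init__.py
hunk above); the payload below is made up for illustration:

    from hatchet_sdk import remove_null_unicode_character

    # Strings have \u0000 stripped; dicts, lists, and tuples are traversed
    # recursively (values only; dict keys are left untouched). Any other
    # type raises TypeError, matching the implementation in utils/serde.py.
    dirty = {
        "title": "report\u0000",
        "rows": ["ok", "bad\u0000value"],
        "meta": ("a\u0000", "b"),
    }

    clean = remove_null_unicode_character(dirty)
    assert clean == {"title": "report", "rows": ["ok", "badvalue"], "meta": ("a", "b")}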
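2. Why the runner inspects the serialized string for "\\u0000" rather than scanning the raw
output object: json.dumps escapes U+0000 as the six-character sequence \u0000, and Postgres'
json/jsonb types reject that sequence (see the Postgres documentation linked in the error
message). A standalone check:

    import json

    payload = {"text": "hello\u0000world"}
    serialized = json.dumps(payload)

    # The null character survives serialization only as its escape sequence,
    # so a substring test on the serialized output catches nulls anywhere
    # in the tree.
    assert serialized == '{"text": "hello\\u0000world"}'
    assert "\\u0000" in serialized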
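3. The OTel fix. handle_start_step_run now returns the task's exception instead of
suppressing it, matching handle_start_group_key_run, so the instrumentor's wrapper can
record it on the active span. A hedged sketch of such a wrapper; the wrapper name and how
it is attached to the Runner are hypothetical, while the span calls are standard
opentelemetry-api:

    from opentelemetry import trace

    tracer = trace.get_tracer(__name__)

    async def traced_handle_start_step_run(runner, action):
        # `runner` is a Runner instance; its method now returns
        # Exception | None rather than raising or swallowing the error.
        with tracer.start_as_current_span("hatchet.handle_start_step_run") as span:
            exc = await runner.handle_start_step_run(action)
            if exc is not None:
                span.record_exception(exc)
                span.set_status(trace.Status(trace.StatusCode.ERROR))
            return exc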