V1 SDKs and Docs (#1361)

New SDKs and docs for the v1 release.
This commit is contained in:
Matt Kaye
2025-03-25 18:45:07 -04:00
committed by GitHub
parent 76e4fdaae5
commit 5062bf1e3e
623 changed files with 28812 additions and 4804 deletions

View File

@@ -1,5 +0,0 @@
from hatchet_sdk import new_client
client = new_client()
client.event.push("user:create", {"test": "test"})

View File

@@ -1,23 +1,20 @@
import pytest
from hatchet_sdk import Hatchet, Worker
from examples.timeout.worker import refresh_timeout_wf, timeout_wf
from hatchet_sdk import Hatchet
# requires scope module or higher for shared event loop
@pytest.mark.asyncio(scope="session")
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_timeout(hatchet: Hatchet, worker: Worker) -> None:
run = hatchet.admin.run_workflow("TimeoutWorkflow", {})
try:
@pytest.mark.asyncio(loop_scope="session")
async def test_execution_timeout(hatchet: Hatchet) -> None:
run = timeout_wf.run_no_wait()
with pytest.raises(Exception, match="(Task exceeded timeout|TIMED_OUT)"):
await run.aio_result()
assert False, "Expected workflow to timeout"
except Exception as e:
assert str(e) == "Workflow Errors: ['TIMED_OUT']"
@pytest.mark.asyncio(scope="session")
@pytest.mark.parametrize("worker", ["timeout"], indirect=True)
async def test_run_refresh_timeout(hatchet: Hatchet, worker: Worker) -> None:
run = hatchet.admin.run_workflow("RefreshTimeoutWorkflow", {})
result = await run.aio_result()
assert result["step1"]["status"] == "success"
@pytest.mark.asyncio(loop_scope="session")
async def test_run_refresh_timeout(hatchet: Hatchet) -> None:
result = await refresh_timeout_wf.aio_run()
assert result["refresh_task"]["status"] == "success"

View File

@@ -0,0 +1,4 @@
from examples.timeout.worker import refresh_timeout_wf, timeout_wf
timeout_wf.run()
refresh_timeout_wf.run()

View File

@@ -1,40 +1,50 @@
import time
from datetime import timedelta
from hatchet_sdk import BaseWorkflow, Context, Hatchet
from hatchet_sdk import Context, EmptyModel, Hatchet, TaskDefaults
hatchet = Hatchet(debug=True)
timeout_wf = hatchet.declare_workflow(on_events=["timeout:create"])
# ❓ ScheduleTimeout
timeout_wf = hatchet.workflow(
name="TimeoutWorkflow",
task_defaults=TaskDefaults(execution_timeout=timedelta(minutes=2)),
)
# ‼️
class TimeoutWorkflow(BaseWorkflow):
config = timeout_wf.config
@hatchet.step(timeout="4s")
def step1(self, context: Context) -> dict[str, str]:
time.sleep(5)
return {"status": "success"}
# ❓ ExecutionTimeout
# 👀 Specify an execution timeout on a task
@timeout_wf.task(
execution_timeout=timedelta(seconds=4), schedule_timeout=timedelta(minutes=10)
)
def timeout_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
time.sleep(5)
return {"status": "success"}
refresh_timeout_wf = hatchet.declare_workflow(on_events=["refresh:create"])
# ‼️
refresh_timeout_wf = hatchet.workflow(name="RefreshTimeoutWorkflow")
class RefreshTimeoutWorkflow(BaseWorkflow):
config = refresh_timeout_wf.config
# ❓ RefreshTimeout
@refresh_timeout_wf.task(execution_timeout=timedelta(seconds=4))
def refresh_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
@hatchet.step(timeout="4s")
def step1(self, context: Context) -> dict[str, str]:
ctx.refresh_timeout(timedelta(seconds=10))
time.sleep(5)
context.refresh_timeout("10s")
time.sleep(5)
return {"status": "success"}
return {"status": "success"}
# ‼️
def main() -> None:
worker = hatchet.worker("timeout-worker", max_runs=4)
worker.register_workflow(TimeoutWorkflow())
worker.register_workflow(RefreshTimeoutWorkflow())
worker = hatchet.worker(
"timeout-worker", slots=4, workflows=[timeout_wf, refresh_timeout_wf]
)
worker.start()