mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-05-12 21:28:50 -05:00
Feat: Durable Execution Revamp (#2954)
* Feat: Durable Execution Revamp --------- Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>
This commit is contained in:
@@ -1,8 +1,11 @@
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from hatchet_sdk import (
|
||||
Context,
|
||||
DurableContext,
|
||||
@@ -12,9 +15,47 @@ from hatchet_sdk import (
|
||||
UserEventCondition,
|
||||
or_,
|
||||
)
|
||||
from hatchet_sdk.exceptions import NonDeterminismError
|
||||
|
||||
hatchet = Hatchet(debug=True)
|
||||
|
||||
|
||||
dag_child_workflow = hatchet.workflow(name="dag-child-workflow")
|
||||
|
||||
|
||||
@dag_child_workflow.task()
|
||||
async def dag_child_1(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
await asyncio.sleep(1)
|
||||
return {"result": "child1"}
|
||||
|
||||
|
||||
@dag_child_workflow.task(parents=[dag_child_1])
|
||||
async def dag_child_2(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
await asyncio.sleep(5)
|
||||
return {"result": "child2"}
|
||||
|
||||
|
||||
@hatchet.durable_task(execution_timeout=timedelta(seconds=10))
|
||||
async def durable_spawn_dag(input: EmptyModel, ctx: DurableContext) -> dict[str, Any]:
|
||||
# NOTE: typically its not safe to use time.time() in a durable task, but
|
||||
# this test assumes that the task is not replayed or evicted and it is
|
||||
# used to ensure that the waits are accurate relative to the single invocation.
|
||||
sleep_start = time.time()
|
||||
sleep_result = await ctx.aio_sleep_for(timedelta(seconds=1))
|
||||
sleep_duration = time.time() - sleep_start
|
||||
|
||||
spawn_start = time.time()
|
||||
spawn_result = await dag_child_workflow.aio_run()
|
||||
spawn_duration = time.time() - spawn_start
|
||||
|
||||
return {
|
||||
"sleep_duration": sleep_duration,
|
||||
"sleep_result": sleep_result,
|
||||
"spawn_duration": spawn_duration,
|
||||
"spawn_result": spawn_result,
|
||||
}
|
||||
|
||||
|
||||
# > Create a durable workflow
|
||||
durable_workflow = hatchet.workflow(name="DurableWorkflow")
|
||||
|
||||
@@ -25,6 +66,7 @@ ephemeral_workflow = hatchet.workflow(name="EphemeralWorkflow")
|
||||
# > Add durable task
|
||||
EVENT_KEY = "durable-example:event"
|
||||
SLEEP_TIME = 5
|
||||
REPLAY_RESET_SLEEP_TIME = 3
|
||||
|
||||
|
||||
@durable_workflow.task()
|
||||
@@ -32,21 +74,26 @@ async def ephemeral_task(input: EmptyModel, ctx: Context) -> None:
|
||||
print("Running non-durable task")
|
||||
|
||||
|
||||
class AwaitedEvent(BaseModel):
|
||||
id: str
|
||||
|
||||
|
||||
@durable_workflow.durable_task()
|
||||
async def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]:
|
||||
async def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str | int]:
|
||||
print("Waiting for sleep")
|
||||
await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))
|
||||
sleep = await ctx.aio_sleep_for(duration=timedelta(seconds=SLEEP_TIME))
|
||||
print("Sleep finished")
|
||||
|
||||
print("Waiting for event")
|
||||
await ctx.aio_wait_for(
|
||||
"event",
|
||||
UserEventCondition(event_key=EVENT_KEY, expression="true"),
|
||||
event = await ctx.aio_wait_for_event(
|
||||
EVENT_KEY, "true", payload_validator=AwaitedEvent
|
||||
)
|
||||
print("Event received")
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
"event_id": event.id,
|
||||
"sleep_duration_seconds": sleep.duration.seconds,
|
||||
}
|
||||
|
||||
|
||||
@@ -58,7 +105,7 @@ async def durable_task(input: EmptyModel, ctx: DurableContext) -> dict[str, str]
|
||||
@durable_workflow.durable_task()
|
||||
async def wait_for_or_group_1(
|
||||
_i: EmptyModel, ctx: DurableContext
|
||||
) -> dict[str, str | int]:
|
||||
) -> dict[str, str | int | float]:
|
||||
start = time.time()
|
||||
wait_result = await ctx.aio_wait_for(
|
||||
uuid4().hex,
|
||||
@@ -72,7 +119,7 @@ async def wait_for_or_group_1(
|
||||
event_id = list(wait_result[key].keys())[0]
|
||||
|
||||
return {
|
||||
"runtime": int(time.time() - start),
|
||||
"runtime": time.time() - start,
|
||||
"key": key,
|
||||
"event_id": event_id,
|
||||
}
|
||||
@@ -83,7 +130,7 @@ async def wait_for_or_group_1(
|
||||
@durable_workflow.durable_task()
|
||||
async def wait_for_or_group_2(
|
||||
_i: EmptyModel, ctx: DurableContext
|
||||
) -> dict[str, str | int]:
|
||||
) -> dict[str, str | int | float]:
|
||||
start = time.time()
|
||||
wait_result = await ctx.aio_wait_for(
|
||||
uuid4().hex,
|
||||
@@ -97,7 +144,7 @@ async def wait_for_or_group_2(
|
||||
event_id = list(wait_result[key].keys())[0]
|
||||
|
||||
return {
|
||||
"runtime": int(time.time() - start),
|
||||
"runtime": time.time() - start,
|
||||
"key": key,
|
||||
"event_id": event_id,
|
||||
}
|
||||
@@ -106,7 +153,7 @@ async def wait_for_or_group_2(
|
||||
@durable_workflow.durable_task()
|
||||
async def wait_for_multi_sleep(
|
||||
_i: EmptyModel, ctx: DurableContext
|
||||
) -> dict[str, str | int]:
|
||||
) -> dict[str, str | float]:
|
||||
start = time.time()
|
||||
|
||||
for _ in range(3):
|
||||
@@ -115,7 +162,7 @@ async def wait_for_multi_sleep(
|
||||
)
|
||||
|
||||
return {
|
||||
"runtime": int(time.time() - start),
|
||||
"runtime": time.time() - start,
|
||||
}
|
||||
|
||||
|
||||
@@ -124,10 +171,18 @@ def ephemeral_task_2(input: EmptyModel, ctx: Context) -> None:
|
||||
print("Running non-durable task")
|
||||
|
||||
|
||||
@hatchet.durable_task()
|
||||
async def memo_now_caching(_i: EmptyModel, ctx: DurableContext) -> dict[str, str]:
|
||||
now = await ctx.aio_now()
|
||||
return {
|
||||
"start_time": now.isoformat(),
|
||||
}
|
||||
|
||||
|
||||
@hatchet.durable_task()
|
||||
async def wait_for_sleep_twice(
|
||||
input: EmptyModel, ctx: DurableContext
|
||||
) -> dict[str, int]:
|
||||
) -> dict[str, float]:
|
||||
try:
|
||||
start = time.time()
|
||||
|
||||
@@ -136,16 +191,163 @@ async def wait_for_sleep_twice(
|
||||
)
|
||||
|
||||
return {
|
||||
"runtime": int(time.time() - start),
|
||||
"runtime": time.time() - start,
|
||||
}
|
||||
except asyncio.CancelledError:
|
||||
return {"runtime": -1}
|
||||
return {"runtime": -1.0}
|
||||
|
||||
|
||||
class DurableBulkSpawnInput(BaseModel):
|
||||
n: int = 1
|
||||
|
||||
|
||||
@hatchet.task(input_validator=DurableBulkSpawnInput)
|
||||
def spawn_child_task(input: DurableBulkSpawnInput, ctx: Context) -> dict[str, str]:
|
||||
return {"message": "hello from child " + str(input.n)}
|
||||
|
||||
|
||||
@hatchet.durable_task(execution_timeout=timedelta(seconds=10))
|
||||
async def durable_with_spawn(input: EmptyModel, ctx: DurableContext) -> dict[str, Any]:
|
||||
child_result = await spawn_child_task.aio_run()
|
||||
return {"child_output": child_result}
|
||||
|
||||
|
||||
@hatchet.durable_task(input_validator=DurableBulkSpawnInput)
|
||||
async def durable_with_bulk_spawn(
|
||||
input: DurableBulkSpawnInput, ctx: DurableContext
|
||||
) -> dict[str, Any]:
|
||||
child_results = await spawn_child_task.aio_run_many(
|
||||
[
|
||||
spawn_child_task.create_bulk_run_item(
|
||||
input=DurableBulkSpawnInput(n=i),
|
||||
)
|
||||
for i in range(input.n)
|
||||
]
|
||||
)
|
||||
return {"child_outputs": child_results}
|
||||
|
||||
|
||||
@hatchet.durable_task()
|
||||
async def durable_sleep_event_spawn(
|
||||
input: EmptyModel, ctx: DurableContext
|
||||
) -> dict[str, Any]:
|
||||
start = time.time()
|
||||
|
||||
await ctx.aio_sleep_for(timedelta(seconds=SLEEP_TIME))
|
||||
|
||||
await ctx.aio_wait_for_event(
|
||||
EVENT_KEY,
|
||||
"true",
|
||||
)
|
||||
|
||||
child_result = await spawn_child_task.aio_run()
|
||||
|
||||
return {
|
||||
"runtime": time.time() - start,
|
||||
"child_output": child_result,
|
||||
}
|
||||
|
||||
|
||||
class NonDeterminismOutput(BaseModel):
|
||||
attempt_number: int
|
||||
sleep_time: int
|
||||
|
||||
non_determinism_detected: bool = False
|
||||
node_id: int | None = None
|
||||
|
||||
|
||||
@hatchet.durable_task(execution_timeout=timedelta(seconds=10))
|
||||
async def durable_non_determinism(
|
||||
input: EmptyModel, ctx: DurableContext
|
||||
) -> NonDeterminismOutput:
|
||||
sleep_time = ctx.attempt_number * 2
|
||||
|
||||
try:
|
||||
await ctx.aio_sleep_for(timedelta(seconds=sleep_time))
|
||||
except NonDeterminismError as e:
|
||||
return NonDeterminismOutput(
|
||||
attempt_number=ctx.attempt_number,
|
||||
sleep_time=sleep_time,
|
||||
non_determinism_detected=True,
|
||||
node_id=e.node_id,
|
||||
)
|
||||
|
||||
return NonDeterminismOutput(
|
||||
attempt_number=ctx.attempt_number,
|
||||
sleep_time=sleep_time,
|
||||
)
|
||||
|
||||
|
||||
class ReplayResetResponse(BaseModel):
|
||||
sleep_1_duration: float
|
||||
sleep_2_duration: float
|
||||
sleep_3_duration: float
|
||||
|
||||
|
||||
@hatchet.durable_task(execution_timeout=timedelta(seconds=20))
|
||||
async def durable_replay_reset(
|
||||
input: EmptyModel, ctx: DurableContext
|
||||
) -> ReplayResetResponse:
|
||||
start = time.time()
|
||||
await ctx.aio_sleep_for(timedelta(seconds=REPLAY_RESET_SLEEP_TIME))
|
||||
sleep_1_duration = time.time() - start
|
||||
|
||||
start = time.time()
|
||||
await ctx.aio_sleep_for(timedelta(seconds=REPLAY_RESET_SLEEP_TIME))
|
||||
sleep_2_duration = time.time() - start
|
||||
|
||||
start = time.time()
|
||||
await ctx.aio_sleep_for(timedelta(seconds=REPLAY_RESET_SLEEP_TIME))
|
||||
sleep_3_duration = time.time() - start
|
||||
|
||||
return ReplayResetResponse(
|
||||
sleep_1_duration=sleep_1_duration,
|
||||
sleep_2_duration=sleep_2_duration,
|
||||
sleep_3_duration=sleep_3_duration,
|
||||
)
|
||||
|
||||
|
||||
class SleepResult(BaseModel):
|
||||
message: str
|
||||
duration: float
|
||||
|
||||
|
||||
class MemoInput(BaseModel):
|
||||
message: str
|
||||
|
||||
|
||||
async def expensive_computation(message: str) -> SleepResult:
|
||||
await asyncio.sleep(SLEEP_TIME)
|
||||
|
||||
return SleepResult(message=message, duration=SLEEP_TIME)
|
||||
|
||||
|
||||
@hatchet.durable_task(input_validator=MemoInput)
|
||||
async def memo_task(input: MemoInput, ctx: DurableContext) -> SleepResult:
|
||||
start = time.time()
|
||||
res = await ctx._aio_memo(
|
||||
expensive_computation,
|
||||
SleepResult,
|
||||
input.message,
|
||||
)
|
||||
|
||||
return SleepResult(message=res.message, duration=time.time() - start)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
worker = hatchet.worker(
|
||||
"durable-worker",
|
||||
workflows=[durable_workflow, ephemeral_workflow, wait_for_sleep_twice],
|
||||
workflows=[
|
||||
durable_workflow,
|
||||
ephemeral_workflow,
|
||||
wait_for_sleep_twice,
|
||||
spawn_child_task,
|
||||
durable_with_spawn,
|
||||
durable_with_bulk_spawn,
|
||||
durable_sleep_event_spawn,
|
||||
durable_non_determinism,
|
||||
durable_replay_reset,
|
||||
],
|
||||
)
|
||||
worker.start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user