mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-03-20 19:50:47 -05:00
Feat: Durable event log wiring (#2956)
* feat: initial protos * chore: lint * fix: work on improving naming * chore: rename session id to invocation count * feat: scaffold implementation of durabletask rpc * fix: one more session rename * feat: initial work on the server scaffolding * chore: gen protos for python * feat: initial durable task client * feat: initial durable context work for python * fix: pass client through to runner * fix: clean up type checking errors * fix: cruft * feat: initial work wiring up durable events * fix: get -> getorcreate * feat: query + wiring for updating latest node id * fix: simplify, bump latest node ids in the same query * chore: note * feat: wire up sleeps with internal signal matches * chore: gen * fix: callback data writes * feat: cache previous events * fix: wire up external id writes * feat: got sleeps sorta working! * fix: tenant and external id wiring * chore: comments * fix: clean up some types a bit * feat: add run triggering params to proto to allow for spawning children * feat: first pass at child spawning * feat: start wiring up child spawning * fix: use `triggerWriter` for spawn * feat: update trigger proto def * chore: regen python * feat: start wiring up spawning correctly with all opts * refactor: share trigger code * chore: remove log lines, lint * fix: add triggered run external id * feat: start wiring up child key storage better * chore: gen again * fix: gen, colname * fix: trigger opts panicking * hack: get things working for now * feat: shared rpc message * chore: fix imports * feat: add tenant id to tables * fix: improve ingest logic * refactor: shared trigger opt type * fix: send tenant id through everywhere * chore: fix log file insert on conflict * fix: repo * fix: generate external id upstream * feat: add columns to the match * feat: first pass at durable waits on the controllers instead of the dispatcher * fix: types * feat: wire up callbacks * fix: invoc counts * fix: typing, lint * driveby: more constants for message ids * refactor: struct for callback keys everywhere * fix: bugs, passing tests * fix: return errnorows * fix: schema * fix: remove current callback flow * feat: new message types * fix: remove key from callback model * fix: rm unused queries * refactor: start reworking flow * fix: start working on feedback * fix: query * fix: wire up external ids * revert: drive by * refactor: rm extra interface * chore: move listener, lint * refactor: remove old listener, rename * refactor: consolidate migrations * fix: immediately send already-satisfied callbacks * fix: union * chore: rm unused queries * fix: check if entry already exists before re-spawning / signaling * fix: node id incrementation * fix: rm json dump * fix: don't pass node id * fix: store latest invocation, update query * fix: upsert logic * Revert "fix: upsert logic" This reverts commitcf7c609c1d. * fix: change logic slightly * fix: split up get and create queries * fix: err * fix: pass node ids around properly * fix: invocation handling * fix: callback bug * fix: naming * fix: rm cruft method, dynamic kind * fix: wire up memo payload and kind stuff * fix: propagate trigger opts * fix: child spawn signaling + olap wiring * fix: extract output method * feat: improve test coverage a bit * fix: child spawning * feat: another test * fix: query fixes, overwrite * fix: match bug * fix: proto indexes, regen * fix: eviction comment * fix: warning for non-async durable tasks * fix: rm contracts import * fix: basic locking, rm sync durable tasks * fix: invocation counts, etc. * chore: add fixme * fix: rm unused invocation count param from callback response * fix: rm dispatcher id from the callback * fix: di test * Revert "fix: rm dispatcher id from the callback" This reverts commit26e6c82797. * fix: migration * fix: use optimistictx * fix: lift grpc codes out of trigger repo * fix: span names * fix: rm comment * fix: consolidate kind types, batching, not-null kinds * fix: null bug * fix: satisfied claim bug, simplify queries * fix: add back payload storage * fix: match bug, simplification * fix: factor out trigger opts to the dispatcher level * fix: factor out conditions * fix: rm unused structs * fix: rm dupes * fix: migration * refactor: switch case helpers * fix: panic * fix: couple warnings * fix: lint * fix: generate external ids properly * refactor: return trigger task data from helper * fix: handle matches correctly for dag spawns * fix: add validators, one more uuid type * chore: gen * chore: bump pytest-asyncio to latest * fix: store the worker instead of the dispatcher, then look up the dispatcher * fix: store dispatcher id on the worker * chore: lint
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import timedelta
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from hatchet_sdk import (
|
||||
@@ -15,6 +16,40 @@ from hatchet_sdk import (
|
||||
|
||||
hatchet = Hatchet(debug=True)
|
||||
|
||||
|
||||
dag_child_workflow = hatchet.workflow(name="dag-child-workflow")
|
||||
|
||||
|
||||
@dag_child_workflow.task()
|
||||
async def dag_child_1(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
await asyncio.sleep(1)
|
||||
return {"result": "child1"}
|
||||
|
||||
|
||||
@dag_child_workflow.task()
|
||||
async def dag_child_2(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
await asyncio.sleep(5)
|
||||
return {"result": "child2"}
|
||||
|
||||
|
||||
@hatchet.durable_task()
|
||||
async def durable_spawn_dag(input: EmptyModel, ctx: DurableContext) -> dict[str, Any]:
|
||||
sleep_start = time.time()
|
||||
sleep_result = await ctx.aio_sleep_for(timedelta(seconds=1))
|
||||
sleep_duration = time.time() - sleep_start
|
||||
|
||||
spawn_start = time.time()
|
||||
spawn_result = await dag_child_workflow.aio_run()
|
||||
spawn_duration = time.time() - spawn_start
|
||||
|
||||
return {
|
||||
"sleep_duration": sleep_duration,
|
||||
"sleep_result": sleep_result,
|
||||
"spawn_duration": spawn_duration,
|
||||
"spawn_result": spawn_result,
|
||||
}
|
||||
|
||||
|
||||
# > Create a durable workflow
|
||||
durable_workflow = hatchet.workflow(name="DurableWorkflow")
|
||||
|
||||
@@ -142,10 +177,49 @@ async def wait_for_sleep_twice(
|
||||
return {"runtime": -1}
|
||||
|
||||
|
||||
@hatchet.task()
|
||||
def spawn_child_task(input: EmptyModel, ctx: Context) -> dict[str, str]:
|
||||
return {"message": "hello from child"}
|
||||
|
||||
|
||||
@hatchet.durable_task()
|
||||
async def durable_with_spawn(input: EmptyModel, ctx: DurableContext) -> dict[str, Any]:
|
||||
child_result = await spawn_child_task.aio_run()
|
||||
return {"child_output": child_result}
|
||||
|
||||
|
||||
@hatchet.durable_task()
|
||||
async def durable_sleep_event_spawn(
|
||||
input: EmptyModel, ctx: DurableContext
|
||||
) -> dict[str, Any]:
|
||||
start = time.time()
|
||||
|
||||
await ctx.aio_sleep_for(timedelta(seconds=SLEEP_TIME))
|
||||
|
||||
await ctx.aio_wait_for(
|
||||
"event",
|
||||
UserEventCondition(event_key=EVENT_KEY, expression="true"),
|
||||
)
|
||||
|
||||
child_result = await spawn_child_task.aio_run()
|
||||
|
||||
return {
|
||||
"runtime": int(time.time() - start),
|
||||
"child_output": child_result,
|
||||
}
|
||||
|
||||
|
||||
def main() -> None:
|
||||
worker = hatchet.worker(
|
||||
"durable-worker",
|
||||
workflows=[durable_workflow, ephemeral_workflow, wait_for_sleep_twice],
|
||||
workflows=[
|
||||
durable_workflow,
|
||||
ephemeral_workflow,
|
||||
wait_for_sleep_twice,
|
||||
spawn_child_task,
|
||||
durable_with_spawn,
|
||||
durable_sleep_event_spawn,
|
||||
],
|
||||
)
|
||||
worker.start()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user