add: CoroutineLike & AwaitableLike types (#1713)

* add: CoroutineLike & AwaitableLike types

* fix: type checker

* fix: reduce type narrowing on _parse_task_name

* fix: rm _aio_output helper

* Revert "fix: rm _aio_output helper"

This reverts commit ff51126c43.

* fix: rm _aio_output helper

* chore: version

* feat: add tests for all the standalone run flavors

* refactor: simplify internals a bit

---------

Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
This commit is contained in:
Nathan Gage
2025-05-14 11:06:14 -04:00
committed by GitHub
parent 8c1b93b4f8
commit 99bcfa1037
11 changed files with 127 additions and 64 deletions
@@ -0,0 +1,36 @@
import pytest
from examples.simple.worker import step1
@pytest.mark.asyncio(loop_scope="session")
async def test_simple_workflow_running_options() -> None:
    """Exercise every run flavor of the ``step1`` task and verify they all
    yield the same payload.

    Covers: blocking/async single runs, bulk runs, and the no-wait variants
    collected through both the blocking ``result()`` and the awaitable
    ``aio_result()`` accessors.
    """
    expected = {"result": "Hello, world!"}
    outcomes = []

    # Blocking and async single runs.
    outcomes.append(step1.run())
    outcomes.append(await step1.aio_run())

    # Bulk runs (sync and async), one item each.
    outcomes.append(step1.run_many([step1.create_bulk_run_item()])[0])
    outcomes.append((await step1.aio_run_many([step1.create_bulk_run_item()]))[0])

    # No-wait variants, collected via the blocking result() accessor.
    outcomes.append(step1.run_no_wait().result())
    outcomes.append((await step1.aio_run_no_wait()).result())
    outcomes.append(
        [
            ref.result()
            for ref in step1.run_many_no_wait([step1.create_bulk_run_item()])
        ][0]
    )
    outcomes.append(
        [
            ref.result()
            for ref in await step1.aio_run_many_no_wait(
                [step1.create_bulk_run_item()]
            )
        ][0]
    )

    # No-wait variants, collected via the awaitable aio_result() accessor.
    outcomes.append(await step1.run_no_wait().aio_result())
    outcomes.append(await (await step1.aio_run_no_wait()).aio_result())
    outcomes.append(
        [
            await ref.aio_result()
            for ref in step1.run_many_no_wait([step1.create_bulk_run_item()])
        ][0]
    )
    outcomes.append(
        [
            await ref.aio_result()
            for ref in await step1.aio_run_many_no_wait(
                [step1.create_bulk_run_item()]
            )
        ][0]
    )

    assert all(outcome == expected for outcome in outcomes)
+3 -3
View File
@@ -6,12 +6,12 @@ hatchet = Hatchet(debug=True)
@hatchet.task(name="SimpleWorkflow")
def step1(input: EmptyModel, ctx: Context) -> dict[str, str]:
    """Simple one-step workflow task.

    Returns a fixed payload so callers (and the standalone-run tests) can
    assert on a concrete result.
    """
    # NOTE(review): the diff rendering had lost its +/- markers, leaving two
    # conflicting defs of `step1` back to back; this keeps only the post-diff
    # version (the one returning a dict), which the test suite asserts on.
    return {"result": "Hello, world!"}
def main() -> None:
    """Start a worker that serves the `step1` workflow."""
    # NOTE(review): the rendered diff left both the removed line (with
    # `slots=1`) and its replacement; the first assignment was dead code,
    # so only the post-diff worker construction is kept.
    worker = hatchet.worker("test-worker", workflows=[step1])
    worker.start()