Mirror of https://github.com/hatchet-dev/hatchet.git, synced 2026-01-07 01:09:38 -06:00
Fe overhaul docs (#1640)
* api changes
* doc changes
* move docs
* generated
* generate
* pkg
* backmerge main
* revert to main
* revert main
* race?
* remove go tests
examples/python/priority/test_priority.py (new file, 320 lines)
@@ -0,0 +1,320 @@
import asyncio
from datetime import datetime, timedelta
from random import choice
from typing import AsyncGenerator, Literal
from uuid import uuid4

import pytest
import pytest_asyncio
from pydantic import BaseModel

from examples.priority.worker import DEFAULT_PRIORITY, SLEEP_TIME, priority_workflow
from hatchet_sdk import Hatchet, ScheduleTriggerWorkflowOptions, TriggerWorkflowOptions
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus

Priority = Literal["low", "medium", "high", "default"]


class RunPriorityStartedAt(BaseModel):
    priority: Priority
    started_at: datetime
    finished_at: datetime


def priority_to_int(priority: Priority) -> int:
    match priority:
        case "high":
            return 3
        case "medium":
            return 2
        case "low":
            return 1
        case "default":
            return DEFAULT_PRIORITY
        case _:
            raise ValueError(f"Invalid priority: {priority}")


@pytest_asyncio.fixture(loop_scope="session", scope="function")
async def dummy_runs() -> None:
    priority: Priority = "high"

    await priority_workflow.aio_run_many_no_wait(
        [
            priority_workflow.create_bulk_run_item(
                options=TriggerWorkflowOptions(
                    priority=(priority_to_int(priority)),
                    additional_metadata={
                        "priority": priority,
                        "key": ix,
                        "type": "dummy",
                    },
                )
            )
            for ix in range(40)
        ]
    )

    await asyncio.sleep(3)

    return None


@pytest.mark.asyncio()
async def test_priority(hatchet: Hatchet, dummy_runs: None) -> None:
    test_run_id = str(uuid4())
    choices: list[Priority] = ["low", "medium", "high", "default"]
    N = 30

    run_refs = await priority_workflow.aio_run_many_no_wait(
        [
            priority_workflow.create_bulk_run_item(
                options=TriggerWorkflowOptions(
                    priority=(priority_to_int(priority := choice(choices))),
                    additional_metadata={
                        "priority": priority,
                        "key": ix,
                        "test_run_id": test_run_id,
                    },
                )
            )
            for ix in range(N)
        ]
    )

    await asyncio.gather(*[r.aio_result() for r in run_refs])

    workflows = (
        await hatchet.workflows.aio_list(workflow_name=priority_workflow.name)
    ).rows

    assert workflows

    workflow = next((w for w in workflows if w.name == priority_workflow.name), None)

    assert workflow

    assert workflow.name == priority_workflow.name

    runs = await hatchet.runs.aio_list(
        workflow_ids=[workflow.metadata.id],
        additional_metadata={
            "test_run_id": test_run_id,
        },
        limit=1_000,
    )

    runs_ids_started_ats: list[RunPriorityStartedAt] = sorted(
        [
            RunPriorityStartedAt(
                priority=(r.additional_metadata or {}).get("priority") or "low",
                started_at=r.started_at or datetime.min,
                finished_at=r.finished_at or datetime.min,
            )
            for r in runs.rows
        ],
        key=lambda x: x.started_at,
    )

    assert len(runs_ids_started_ats) == len(run_refs)
    assert len(runs_ids_started_ats) == N

    for i in range(len(runs_ids_started_ats) - 1):
        curr = runs_ids_started_ats[i]
        nxt = runs_ids_started_ats[i + 1]

        """Run start times should be in order of priority"""
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)

        """Runs should proceed one at a time"""
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at

        """Runs should finish after starting (this is mostly a test for engine datetime handling bugs)"""
        assert curr.finished_at >= curr.started_at


@pytest.mark.asyncio()
async def test_priority_via_scheduling(hatchet: Hatchet, dummy_runs: None) -> None:
    test_run_id = str(uuid4())
    sleep_time = 3
    n = 30
    choices: list[Priority] = ["low", "medium", "high", "default"]
    run_at = datetime.now() + timedelta(seconds=sleep_time)

    versions = await asyncio.gather(
        *[
            priority_workflow.aio_schedule(
                run_at=run_at,
                options=ScheduleTriggerWorkflowOptions(
                    priority=(priority_to_int(priority := choice(choices))),
                    additional_metadata={
                        "priority": priority,
                        "key": ix,
                        "test_run_id": test_run_id,
                    },
                ),
            )
            for ix in range(n)
        ]
    )

    await asyncio.sleep(sleep_time * 2)

    workflow_id = versions[0].workflow_id

    attempts = 0

    while True:
        if attempts >= SLEEP_TIME * n * 2:
            raise TimeoutError("Timed out waiting for runs to finish")

        attempts += 1
        await asyncio.sleep(1)
        runs = await hatchet.runs.aio_list(
            workflow_ids=[workflow_id],
            additional_metadata={
                "test_run_id": test_run_id,
            },
            limit=1_000,
        )

        if not runs.rows:
            continue

        if any(
            r.status in [V1TaskStatus.FAILED, V1TaskStatus.CANCELLED] for r in runs.rows
        ):
            raise ValueError("One or more runs failed or were cancelled")

        if all(r.status == V1TaskStatus.COMPLETED for r in runs.rows):
            break

    runs_ids_started_ats: list[RunPriorityStartedAt] = sorted(
        [
            RunPriorityStartedAt(
                priority=(r.additional_metadata or {}).get("priority") or "low",
                started_at=r.started_at or datetime.min,
                finished_at=r.finished_at or datetime.min,
            )
            for r in runs.rows
        ],
        key=lambda x: x.started_at,
    )

    assert len(runs_ids_started_ats) == len(versions)

    for i in range(len(runs_ids_started_ats) - 1):
        curr = runs_ids_started_ats[i]
        nxt = runs_ids_started_ats[i + 1]

        """Run start times should be in order of priority"""
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)

        """Runs should proceed one at a time"""
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at

        """Runs should finish after starting (this is mostly a test for engine datetime handling bugs)"""
        assert curr.finished_at >= curr.started_at


@pytest_asyncio.fixture(loop_scope="session", scope="function")
async def crons(
    hatchet: Hatchet, dummy_runs: None
) -> AsyncGenerator[tuple[str, str, int], None]:
    test_run_id = str(uuid4())
    choices: list[Priority] = ["low", "medium", "high"]
    n = 30

    crons = await asyncio.gather(
        *[
            hatchet.cron.aio_create(
                workflow_name=priority_workflow.name,
                cron_name=f"{test_run_id}-cron-{i}",
                expression="* * * * *",
                input={},
                additional_metadata={
                    "trigger": "cron",
                    "test_run_id": test_run_id,
                    "priority": (priority := choice(choices)),
                    "key": str(i),
                },
                priority=(priority_to_int(priority)),
            )
            for i in range(n)
        ]
    )

    yield crons[0].workflow_id, test_run_id, n

    await asyncio.gather(*[hatchet.cron.aio_delete(cron.metadata.id) for cron in crons])


def time_until_next_minute() -> float:
    now = datetime.now()
    # Roll forward with a timedelta so this also works at minute 59, where
    # now.replace(minute=now.minute + 1) would raise a ValueError.
    next_minute = (now + timedelta(minutes=1)).replace(second=0, microsecond=0)

    return (next_minute - now).total_seconds()


@pytest.mark.asyncio()
async def test_priority_via_cron(hatchet: Hatchet, crons: tuple[str, str, int]) -> None:
    workflow_id, test_run_id, n = crons

    await asyncio.sleep(time_until_next_minute() + 10)

    attempts = 0

    while True:
        if attempts >= SLEEP_TIME * n * 2:
            raise TimeoutError("Timed out waiting for runs to finish")

        attempts += 1
        await asyncio.sleep(1)
        runs = await hatchet.runs.aio_list(
            workflow_ids=[workflow_id],
            additional_metadata={
                "test_run_id": test_run_id,
            },
            limit=1_000,
        )

        if not runs.rows:
            continue

        if any(
            r.status in [V1TaskStatus.FAILED, V1TaskStatus.CANCELLED] for r in runs.rows
        ):
            raise ValueError("One or more runs failed or were cancelled")

        if all(r.status == V1TaskStatus.COMPLETED for r in runs.rows):
            break

    runs_ids_started_ats: list[RunPriorityStartedAt] = sorted(
        [
            RunPriorityStartedAt(
                priority=(r.additional_metadata or {}).get("priority") or "low",
                started_at=r.started_at or datetime.min,
                finished_at=r.finished_at or datetime.min,
            )
            for r in runs.rows
        ],
        key=lambda x: x.started_at,
    )

    assert len(runs_ids_started_ats) == n

    for i in range(len(runs_ids_started_ats) - 1):
        curr = runs_ids_started_ats[i]
        nxt = runs_ids_started_ats[i + 1]

        """Run start times should be in order of priority"""
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)

        """Runs should proceed one at a time"""
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at

        """Runs should finish after starting (this is mostly a test for engine datetime handling bugs)"""
        assert curr.finished_at >= curr.started_at
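Note: all three tests above assert the same three pairwise invariants over the priority-sorted run list. A small helper along the following lines (hypothetical, not part of this diff; it only uses names already defined in test_priority.py) could factor that loop out:

def assert_priority_ordering(rows: list[RunPriorityStartedAt]) -> None:
    # Hypothetical helper, not in the diff: the three invariants the tests check pairwise.
    for curr, nxt in zip(rows, rows[1:]):
        # Start order must follow priority (higher priorities start first).
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)
        # With a single worker slot, runs finish one at a time.
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at
        # Runs must finish after they start (guards against engine datetime bugs).
        assert curr.finished_at >= curr.started_at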
examples/python/priority/trigger.py (new file, 51 lines)
@@ -0,0 +1,51 @@
from datetime import datetime, timedelta

from examples.priority.worker import priority_workflow
from hatchet_sdk import ScheduleTriggerWorkflowOptions, TriggerWorkflowOptions

priority_workflow.run_no_wait()

# > Runtime priority
low_prio = priority_workflow.run_no_wait(
    options=TriggerWorkflowOptions(
        ## 👀 Adding priority and key to metadata to show them in the dashboard
        priority=1,
        additional_metadata={"priority": "low", "key": 1},
    )
)

high_prio = priority_workflow.run_no_wait(
    options=TriggerWorkflowOptions(
        ## 👀 Adding priority and key to metadata to show them in the dashboard
        priority=3,
        additional_metadata={"priority": "high", "key": 1},
    )
)

# > Scheduled priority
schedule = priority_workflow.schedule(
    run_at=datetime.now() + timedelta(minutes=1),
    options=ScheduleTriggerWorkflowOptions(priority=3),
)

cron = priority_workflow.create_cron(
    cron_name="my-scheduled-cron",
    expression="0 * * * *",
    priority=3,
)

# > Default priority
low_prio = priority_workflow.run_no_wait(
    options=TriggerWorkflowOptions(
        ## 👀 Adding priority and key to metadata to show them in the dashboard
        priority=1,
        additional_metadata={"priority": "low", "key": 2},
    )
)
high_prio = priority_workflow.run_no_wait(
    options=TriggerWorkflowOptions(
        ## 👀 Adding priority and key to metadata to show them in the dashboard
        priority=3,
        additional_metadata={"priority": "high", "key": 2},
    )
)
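trigger.py fires these runs without waiting for results. A minimal sketch for blocking on them afterwards, assuming the refs returned by run_no_wait expose aio_result() as used in test_priority.py:

import asyncio


async def wait_for_results() -> None:
    # low_prio and high_prio are the run refs created above (assumed to expose aio_result()).
    low_result, high_result = await asyncio.gather(
        low_prio.aio_result(), high_prio.aio_result()
    )
    print(low_result, high_result)


asyncio.run(wait_for_results())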
examples/python/priority/worker.py (new file, 45 lines)
@@ -0,0 +1,45 @@
import time

from hatchet_sdk import (
    ConcurrencyExpression,
    ConcurrencyLimitStrategy,
    Context,
    EmptyModel,
    Hatchet,
)

hatchet = Hatchet(debug=True)

# > Default priority
DEFAULT_PRIORITY = 1
SLEEP_TIME = 0.25

priority_workflow = hatchet.workflow(
    name="PriorityWorkflow",
    default_priority=DEFAULT_PRIORITY,
    concurrency=ConcurrencyExpression(
        max_runs=1,
        expression="'true'",
        limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
    ),
)


@priority_workflow.task()
def priority_task(input: EmptyModel, ctx: Context) -> None:
    print("Priority:", ctx.priority)
    time.sleep(SLEEP_TIME)


def main() -> None:
    worker = hatchet.worker(
        "priority-worker",
        slots=1,
        workflows=[priority_workflow],
    )

    worker.start()


if __name__ == "__main__":
    main()
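Because the workflow caps concurrency at max_runs=1 and the worker runs with slots=1, queued runs are dispatched one at a time, so higher-priority runs start first; that is the property the tests assert. As a rough stdlib-only analogy (plain Python, not Hatchet code), a single consumer draining a priority queue shows the same ordering:

import heapq

# Rough analogy: higher Hatchet priority ~ popped earlier when one slot drains the queue.
queue: list[tuple[int, int, str]] = []
for seq, (prio, name) in enumerate([(1, "low"), (3, "high"), (2, "medium"), (3, "high-2")]):
    heapq.heappush(queue, (-prio, seq, name))  # negate priority: heapq is a min-heap

started_order = []
while queue:
    started_order.append(heapq.heappop(queue)[2])

print(started_order)  # ['high', 'high-2', 'medium', 'low']: priority first, FIFO within a priority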