Files
hatchet/sdks/python/examples/priority/test_priority.py
Matt Kaye 80137736af Feat: Priority (#1513)
* feat: initial work wiring up priorities

* fix: add default to default prio in the db

* feat: wire priority through api on wf creation

* feat: extend python test

* feat: priority for scheduled workflows

* feat: wire priority through python api

* feat: more wiring priority through the api

* feat: I think it works?

* feat: e2e test for priority

* it works!

* feat: expand tests for default priorities

* feat: e2e scheduling test

* fix: skip broken test for now

* fix: lint

* feat: add priority columns to cron and schedule ref tables

* feat: update inserts to include prio

* feat: wire up more apis

* feat: more wiring

* feat: wire up more rest api fields

* chore: cruft

* fix: more wiring

* fix: lint

* chore: gen + wire up priorities

* fix: retries

* fix: try changing fixture scope

* chore: bump version again

* feat: send priority with action payload

* fix: generate script

* Feat  priority ts (#1518)

* feat: initial work wiring up priorities

* fix: add default to default prio in the db

* feat: wire priority through api on wf creation

* feat: extend python test

* feat: priority for scheduled workflows

* feat: wire priority through python api

* feat: more wiring priority through the api

* feat: I think it works?

* feat: e2e test for priority

* it works!

* feat: expand tests for default priorities

* feat: e2e scheduling test

* chore: minor version for priority

* fix: skip broken test for now

* fix: lint

* feat: add priority columns to cron and schedule ref tables

* feat: update inserts to include prio

* feat: wire up more apis

* feat: more wiring

* feat: wire up more rest api fields

* chore: cruft

* fix: more wiring

* fix: lint

* chore: gen + wire up priorities

* fix: increase timeout

* fix: retries

* fix: try changing fixture scope

* chore: generate

* fix: set schedule priority

* feat: priority

* fix: move priority to wf

* release: 1.2.0

* rm log

* fix: import

* fix: add priority to step

---------

Co-authored-by: mrkaye97 <mrkaye97@gmail.com>

* fix: add dummy runs to priority test to prevent race conditions

* fix: non-breaking field

* fix: gen

* feat: initial pass at docs

* feat: priority in go sdk

* feat: initial work on go example

* fix: doc examples

* fix: proofread

* chore: version

* feat: go sdk

* fix: lint

* fix: declarations and add back RunAsChild

* fix: child workflows

* fix: namespace

* fix: faster child workflows

* fix: sticky

* add back run as child

---------

Co-authored-by: Gabe Ruttner <gabriel.ruttner@gmail.com>
Co-authored-by: Alexander Belanger <alexander@hatchet.run>
2025-04-14 16:22:00 -04:00

319 lines
9.6 KiB
Python

import asyncio
from datetime import datetime, timedelta
from random import choice
from typing import AsyncGenerator, Literal
from uuid import uuid4
import pytest
import pytest_asyncio
from pydantic import BaseModel
from examples.priority.worker import DEFAULT_PRIORITY, SLEEP_TIME, priority_workflow
from hatchet_sdk import Hatchet, ScheduleTriggerWorkflowOptions, TriggerWorkflowOptions
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
# Human-readable priority labels used by the tests; translated to the engine's
# integer priorities via `priority_to_int`.
Priority = Literal["low", "medium", "high", "default"]


class RunPriorityStartedAt(BaseModel):
    """Priority label plus start/finish timestamps for a single workflow run."""

    # Priority label recovered from the run's additional metadata.
    priority: Priority
    # When the run started; callers substitute `datetime.min` when the API
    # returned no value.
    started_at: datetime
    # When the run finished; same `datetime.min` fallback as `started_at`.
    finished_at: datetime
def priority_to_int(priority: Priority) -> int:
    """Translate a human-readable priority label into the engine's integer form.

    "high"/"medium"/"low" map to 3/2/1; "default" maps to the worker's
    DEFAULT_PRIORITY constant.

    Raises:
        ValueError: if the label is not one of the known priorities.
    """
    if priority == "high":
        return 3
    if priority == "medium":
        return 2
    if priority == "low":
        return 1
    if priority == "default":
        return DEFAULT_PRIORITY
    raise ValueError(f"Invalid priority: {priority}")
@pytest_asyncio.fixture(loop_scope="session", scope="function")
async def dummy_runs() -> None:
    """Fire a burst of high-priority runs so the real tests don't race an idle queue."""
    fill_priority: Priority = "high"

    bulk_items = [
        priority_workflow.create_bulk_run_item(
            options=TriggerWorkflowOptions(
                priority=priority_to_int(fill_priority),
                additional_metadata={
                    "priority": fill_priority,
                    "key": index,
                    "type": "dummy",
                },
            )
        )
        for index in range(60)
    ]
    await priority_workflow.aio_run_many_no_wait(bulk_items)

    # Give the engine a moment to pick the dummy runs up before tests begin.
    await asyncio.sleep(5)
    return None
@pytest.mark.asyncio()
async def test_priority(hatchet: Hatchet, dummy_runs: None) -> None:
    """Runs triggered with a higher priority should start (and finish) earlier."""
    test_run_id = str(uuid4())
    levels: list[Priority] = ["low", "medium", "high", "default"]

    bulk_items = []
    for key in range(30):
        level = choice(levels)
        bulk_items.append(
            priority_workflow.create_bulk_run_item(
                options=TriggerWorkflowOptions(
                    priority=priority_to_int(level),
                    additional_metadata={
                        "priority": level,
                        "key": key,
                        "test_run_id": test_run_id,
                    },
                )
            )
        )

    run_refs = await priority_workflow.aio_run_many_no_wait(bulk_items)
    await asyncio.gather(*(ref.aio_result() for ref in run_refs))

    workflows = (
        await hatchet.workflows.aio_list(workflow_name=priority_workflow.name)
    ).rows
    assert workflows

    workflow = next((w for w in workflows if w.name == priority_workflow.name), None)
    assert workflow
    assert workflow.name == priority_workflow.name

    runs = await hatchet.runs.aio_list(
        workflow_ids=[workflow.metadata.id],
        additional_metadata={
            "test_run_id": test_run_id,
        },
        limit=1_000,
    )

    # Order the observed runs by start time so adjacent pairs can be compared.
    ordered: list[RunPriorityStartedAt] = sorted(
        (
            RunPriorityStartedAt(
                priority=(run.additional_metadata or {}).get("priority") or "low",
                started_at=run.started_at or datetime.min,
                finished_at=run.finished_at or datetime.min,
            )
            for run in runs.rows
        ),
        key=lambda row: row.started_at,
    )

    assert len(ordered) == len(run_refs)

    for curr, nxt in zip(ordered, ordered[1:]):
        # Run start times should be in order of priority.
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)
        # Runs should proceed one at a time.
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at
        # Runs should finish after starting (this is mostly a test for engine
        # datetime handling bugs).
        assert curr.finished_at >= curr.started_at
@pytest.mark.asyncio()
async def test_priority_via_scheduling(hatchet: Hatchet, dummy_runs: None) -> None:
    """Runs created via the scheduling API should also be dequeued in priority order."""
    test_run_id = str(uuid4())
    sleep_time = 3
    n = 100
    levels: list[Priority] = ["low", "medium", "high", "default"]
    run_at = datetime.now() + timedelta(seconds=sleep_time)

    schedule_coros = []
    for key in range(n):
        level = choice(levels)
        schedule_coros.append(
            priority_workflow.aio_schedule(
                run_at=run_at,
                options=ScheduleTriggerWorkflowOptions(
                    priority=priority_to_int(level),
                    additional_metadata={
                        "priority": level,
                        "key": key,
                        "test_run_id": test_run_id,
                    },
                ),
            )
        )
    versions = await asyncio.gather(*schedule_coros)

    await asyncio.sleep(sleep_time * 2)

    workflow_id = versions[0].workflow_id

    # Poll until every scheduled run has completed (or bail on failure/timeout).
    attempts = 0
    while True:
        if attempts >= SLEEP_TIME * n * 2:
            raise TimeoutError("Timed out waiting for runs to finish")

        attempts += 1
        await asyncio.sleep(1)

        runs = await hatchet.runs.aio_list(
            workflow_ids=[workflow_id],
            additional_metadata={
                "test_run_id": test_run_id,
            },
            limit=1_000,
        )

        if not runs.rows:
            continue

        statuses = [run.status for run in runs.rows]
        if any(
            status in (V1TaskStatus.FAILED, V1TaskStatus.CANCELLED)
            for status in statuses
        ):
            raise ValueError("One or more runs failed or were cancelled")
        if all(status == V1TaskStatus.COMPLETED for status in statuses):
            break

    # Order the observed runs by start time so adjacent pairs can be compared.
    ordered: list[RunPriorityStartedAt] = sorted(
        (
            RunPriorityStartedAt(
                priority=(run.additional_metadata or {}).get("priority") or "low",
                started_at=run.started_at or datetime.min,
                finished_at=run.finished_at or datetime.min,
            )
            for run in runs.rows
        ),
        key=lambda row: row.started_at,
    )

    assert len(ordered) == len(versions)

    for curr, nxt in zip(ordered, ordered[1:]):
        # Run start times should be in order of priority.
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)
        # Runs should proceed one at a time.
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at
        # Runs should finish after starting (this is mostly a test for engine
        # datetime handling bugs).
        assert curr.finished_at >= curr.started_at
@pytest_asyncio.fixture(loop_scope="session", scope="function")
async def crons(
    hatchet: Hatchet, dummy_runs: None
) -> AsyncGenerator[tuple[str, str, int], None]:
    """Create `n` every-minute crons with random priorities.

    Yields `(workflow_id, test_run_id, n)` and deletes the crons on teardown.
    """
    test_run_id = str(uuid4())
    levels: list[Priority] = ["low", "medium", "high"]
    n = 100

    created = await asyncio.gather(
        *[
            hatchet.cron.aio_create(
                workflow_name=priority_workflow.name,
                cron_name=f"{test_run_id}-cron-{i}",
                expression="* * * * *",
                input={},
                additional_metadata={
                    "trigger": "cron",
                    "test_run_id": test_run_id,
                    # Walrus binds `level` here so the `priority` kwarg below
                    # (evaluated after this dict) reuses the same random draw.
                    "priority": (level := choice(levels)),
                    "key": str(i),
                },
                priority=priority_to_int(level),
            )
            for i in range(n)
        ]
    )

    yield created[0].workflow_id, test_run_id, n

    # Teardown: remove the crons so they stop firing after the test.
    await asyncio.gather(
        *[hatchet.cron.aio_delete(cron.metadata.id) for cron in created]
    )
def time_until_next_minute() -> float:
    """Return the number of seconds until the next wall-clock minute boundary.

    Always in the half-open interval (0, 60].
    """
    now = datetime.now()
    # Use timedelta arithmetic instead of `now.replace(minute=now.minute + 1)`:
    # the replace() form raises `ValueError: minute must be in 0..59` whenever
    # this runs during minute 59 (and would also mishandle the hour rollover).
    next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return (next_minute - now).total_seconds()
@pytest.mark.asyncio()
async def test_priority_via_cron(hatchet: Hatchet, crons: tuple[str, str, int]) -> None:
    """Cron-triggered runs should be dequeued in priority order."""
    workflow_id, test_run_id, n = crons

    # Wait for the next cron tick, plus a buffer for the runs to be created.
    await asyncio.sleep(time_until_next_minute() + 10)

    # Poll until every cron-triggered run has completed (or bail on failure/timeout).
    attempts = 0
    while True:
        if attempts >= SLEEP_TIME * n * 2:
            raise TimeoutError("Timed out waiting for runs to finish")

        attempts += 1
        await asyncio.sleep(1)

        runs = await hatchet.runs.aio_list(
            workflow_ids=[workflow_id],
            additional_metadata={
                "test_run_id": test_run_id,
            },
            limit=1_000,
        )

        if not runs.rows:
            continue

        statuses = [run.status for run in runs.rows]
        if any(
            status in (V1TaskStatus.FAILED, V1TaskStatus.CANCELLED)
            for status in statuses
        ):
            raise ValueError("One or more runs failed or were cancelled")
        if all(status == V1TaskStatus.COMPLETED for status in statuses):
            break

    # Order the observed runs by start time so adjacent pairs can be compared.
    ordered: list[RunPriorityStartedAt] = sorted(
        (
            RunPriorityStartedAt(
                priority=(run.additional_metadata or {}).get("priority") or "low",
                started_at=run.started_at or datetime.min,
                finished_at=run.finished_at or datetime.min,
            )
            for run in runs.rows
        ),
        key=lambda row: row.started_at,
    )

    assert len(ordered) == n

    for curr, nxt in zip(ordered, ordered[1:]):
        # Run start times should be in order of priority.
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)
        # Runs should proceed one at a time.
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at
        # Runs should finish after starting (this is mostly a test for engine
        # datetime handling bugs).
        assert curr.finished_at >= curr.started_at