Files
hatchet/examples/python/priority/test_priority.py
Matt Kaye 92e86dc163 Feat: Next UI improvements, filters improvements, Langfuse docs, tenant getter, workflow status getter (#1801)
2025-06-10 11:48:31 -04:00

361 lines
11 KiB
Python

import asyncio
from datetime import datetime, timedelta, timezone
from random import choice
from subprocess import Popen
from typing import Any, AsyncGenerator, Literal
from uuid import uuid4

import pytest
import pytest_asyncio
from pydantic import BaseModel

from examples.priority.worker import DEFAULT_PRIORITY, SLEEP_TIME, priority_workflow
from hatchet_sdk import Hatchet, ScheduleTriggerWorkflowOptions, TriggerWorkflowOptions
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus

Priority = Literal["low", "medium", "high", "default"]


class RunPriorityStartedAt(BaseModel):
    priority: Priority
    started_at: datetime
    finished_at: datetime


def priority_to_int(priority: Priority) -> int:
    match priority:
        case "high":
            return 3
        case "medium":
            return 2
        case "low":
            return 1
        case "default":
            return DEFAULT_PRIORITY
        case _:
            raise ValueError(f"Invalid priority: {priority}")
@pytest_asyncio.fixture(loop_scope="session", scope="function")
async def dummy_runs() -> None:
    priority: Priority = "high"

    await priority_workflow.aio_run_many_no_wait(
        [
            priority_workflow.create_bulk_run_item(
                options=TriggerWorkflowOptions(
                    priority=priority_to_int(priority),
                    additional_metadata={
                        "priority": priority,
                        "key": ix,
                        "type": "dummy",
                    },
                )
            )
            for ix in range(40)
        ]
    )

    await asyncio.sleep(3)

    return None
@pytest.mark.parametrize(
    "on_demand_worker",
    [
        (
            ["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"],
            8003,
        )
    ],
    indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_priority(
    hatchet: Hatchet, dummy_runs: None, on_demand_worker: Popen[Any]
) -> None:
    test_run_id = str(uuid4())
    choices: list[Priority] = ["low", "medium", "high", "default"]
    N = 30

    run_refs = await priority_workflow.aio_run_many_no_wait(
        [
            priority_workflow.create_bulk_run_item(
                options=TriggerWorkflowOptions(
                    priority=priority_to_int(priority := choice(choices)),
                    additional_metadata={
                        "priority": priority,
                        "key": ix,
                        "test_run_id": test_run_id,
                    },
                )
            )
            for ix in range(N)
        ]
    )

    await asyncio.gather(*[r.aio_result() for r in run_refs])

    workflows = (
        await hatchet.workflows.aio_list(workflow_name=priority_workflow.name)
    ).rows

    assert workflows

    workflow = next((w for w in workflows if w.name == priority_workflow.name), None)

    assert workflow
    assert workflow.name == priority_workflow.name

    runs = await hatchet.runs.aio_list(
        workflow_ids=[workflow.metadata.id],
        additional_metadata={
            "test_run_id": test_run_id,
        },
        limit=1_000,
    )

    runs_ids_started_ats: list[RunPriorityStartedAt] = sorted(
        [
            RunPriorityStartedAt(
                priority=(r.additional_metadata or {}).get("priority") or "low",
                started_at=r.started_at or datetime.min,
                finished_at=r.finished_at or datetime.min,
            )
            for r in runs.rows
        ],
        key=lambda x: x.started_at,
    )

    assert len(runs_ids_started_ats) == len(run_refs)
    assert len(runs_ids_started_ats) == N

    for i in range(len(runs_ids_started_ats) - 1):
        curr = runs_ids_started_ats[i]
        nxt = runs_ids_started_ats[i + 1]

        # Run start times should be in order of priority
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)

        # Runs should proceed one at a time
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at

        # Runs should finish after starting (this is mostly a test for engine
        # datetime handling bugs)
        assert curr.finished_at >= curr.started_at
@pytest.mark.parametrize(
    "on_demand_worker",
    [
        (
            ["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"],
            8003,
        )
    ],
    indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_priority_via_scheduling(
    hatchet: Hatchet, dummy_runs: None, on_demand_worker: Popen[Any]
) -> None:
    test_run_id = str(uuid4())
    sleep_time = 3
    n = 30
    choices: list[Priority] = ["low", "medium", "high", "default"]

    run_at = datetime.now(tz=timezone.utc) + timedelta(seconds=sleep_time)

    versions = await asyncio.gather(
        *[
            priority_workflow.aio_schedule(
                run_at=run_at,
                options=ScheduleTriggerWorkflowOptions(
                    priority=priority_to_int(priority := choice(choices)),
                    additional_metadata={
                        "priority": priority,
                        "key": ix,
                        "test_run_id": test_run_id,
                    },
                ),
            )
            for ix in range(n)
        ]
    )

    await asyncio.sleep(sleep_time * 2)

    workflow_id = versions[0].workflow_id

    attempts = 0

    while True:
        if attempts >= SLEEP_TIME * n * 2:
            raise TimeoutError("Timed out waiting for runs to finish")

        attempts += 1
        await asyncio.sleep(1)

        runs = await hatchet.runs.aio_list(
            workflow_ids=[workflow_id],
            additional_metadata={
                "test_run_id": test_run_id,
            },
            limit=1_000,
        )

        if not runs.rows:
            continue

        if any(
            r.status in [V1TaskStatus.FAILED, V1TaskStatus.CANCELLED] for r in runs.rows
        ):
            raise ValueError("One or more runs failed or were cancelled")

        if all(r.status == V1TaskStatus.COMPLETED for r in runs.rows):
            break

    runs_ids_started_ats: list[RunPriorityStartedAt] = sorted(
        [
            RunPriorityStartedAt(
                priority=(r.additional_metadata or {}).get("priority") or "low",
                started_at=r.started_at or datetime.min,
                finished_at=r.finished_at or datetime.min,
            )
            for r in runs.rows
        ],
        key=lambda x: x.started_at,
    )

    assert len(runs_ids_started_ats) == len(versions)

    for i in range(len(runs_ids_started_ats) - 1):
        curr = runs_ids_started_ats[i]
        nxt = runs_ids_started_ats[i + 1]

        # Run start times should be in order of priority
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)

        # Runs should proceed one at a time
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at

        # Runs should finish after starting (this is mostly a test for engine
        # datetime handling bugs)
        assert curr.finished_at >= curr.started_at
@pytest_asyncio.fixture(loop_scope="session", scope="function")
async def crons(
    hatchet: Hatchet, dummy_runs: None
) -> AsyncGenerator[tuple[str, str, int], None]:
    test_run_id = str(uuid4())
    choices: list[Priority] = ["low", "medium", "high"]
    n = 30

    crons = await asyncio.gather(
        *[
            hatchet.cron.aio_create(
                workflow_name=priority_workflow.name,
                cron_name=f"{test_run_id}-cron-{i}",
                expression="* * * * *",
                input={},
                additional_metadata={
                    "trigger": "cron",
                    "test_run_id": test_run_id,
                    "priority": (priority := choice(choices)),
                    "key": str(i),
                },
                priority=priority_to_int(priority),
            )
            for i in range(n)
        ]
    )

    yield crons[0].workflow_id, test_run_id, n

    await asyncio.gather(*[hatchet.cron.aio_delete(cron.metadata.id) for cron in crons])


def time_until_next_minute() -> float:
    now = datetime.now(tz=timezone.utc)
    next_minute = (now + timedelta(minutes=1)).replace(second=0, microsecond=0)

    return (next_minute - now).total_seconds()
@pytest.mark.skip(
    reason="Test is flaky because the first jobs that are picked up don't necessarily go in priority order"
)
@pytest.mark.parametrize(
    "on_demand_worker",
    [
        (
            ["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"],
            8003,
        )
    ],
    indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_priority_via_cron(
    hatchet: Hatchet, crons: tuple[str, str, int], on_demand_worker: Popen[Any]
) -> None:
    workflow_id, test_run_id, n = crons

    await asyncio.sleep(time_until_next_minute() + 10)

    attempts = 0

    while True:
        if attempts >= SLEEP_TIME * n * 2:
            raise TimeoutError("Timed out waiting for runs to finish")

        attempts += 1
        await asyncio.sleep(1)

        runs = await hatchet.runs.aio_list(
            workflow_ids=[workflow_id],
            additional_metadata={
                "test_run_id": test_run_id,
            },
            limit=1_000,
        )

        if not runs.rows:
            continue

        if any(
            r.status in [V1TaskStatus.FAILED, V1TaskStatus.CANCELLED] for r in runs.rows
        ):
            raise ValueError("One or more runs failed or were cancelled")

        if all(r.status == V1TaskStatus.COMPLETED for r in runs.rows):
            break

    runs_ids_started_ats: list[RunPriorityStartedAt] = sorted(
        [
            RunPriorityStartedAt(
                priority=(r.additional_metadata or {}).get("priority") or "low",
                started_at=r.started_at or datetime.min,
                finished_at=r.finished_at or datetime.min,
            )
            for r in runs.rows
        ],
        key=lambda x: x.started_at,
    )

    assert len(runs_ids_started_ats) == n

    for i in range(len(runs_ids_started_ats) - 1):
        curr = runs_ids_started_ats[i]
        nxt = runs_ids_started_ats[i + 1]

        # Run start times should be in order of priority
        assert priority_to_int(curr.priority) >= priority_to_int(nxt.priority)

        # Runs should proceed one at a time
        assert curr.finished_at <= nxt.finished_at
        assert nxt.finished_at >= nxt.started_at

        # Runs should finish after starting (this is mostly a test for engine
        # datetime handling bugs)
        assert curr.finished_at >= curr.started_at