hatchet/examples/python/events/test_event.py
Matt Kaye 92e86dc163 Feat: Next UI improvements, filters improvements, Langfuse docs, tenant getter, workflow status getter (#1801)
2025-06-10 11:48:31 -04:00

529 lines
15 KiB
Python

import asyncio
import json
from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import AsyncGenerator, cast
from uuid import uuid4

import pytest
from pydantic import BaseModel

from examples.events.worker import (
    EVENT_KEY,
    SECONDARY_KEY,
    WILDCARD_KEY,
    EventWorkflowInput,
    event_workflow,
)
from hatchet_sdk.clients.events import (
    BulkPushEventOptions,
    BulkPushEventWithMetadata,
    PushEventOptions,
)
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary
from hatchet_sdk.contracts.events_pb2 import Event
from hatchet_sdk.hatchet import Hatchet


class ProcessedEvent(BaseModel):
    id: str
    payload: dict[str, str | bool]
    meta: dict[str, str | bool | int]
    should_have_runs: bool
    test_run_id: str

    def __hash__(self) -> int:
        return hash(self.model_dump_json())


@asynccontextmanager
async def event_filter(
    hatchet: Hatchet,
    test_run_id: str,
    expression: str | None = None,
    payload: dict[str, str] = {},
) -> AsyncGenerator[None, None]:
    expression = (
        expression
        or f"input.should_skip == false && payload.test_run_id == '{test_run_id}'"
    )

    f = await hatchet.filters.aio_create(
        workflow_id=event_workflow.id,
        expression=expression,
        scope=test_run_id,
        payload={"test_run_id": test_run_id, **payload},
    )

    try:
        yield
    finally:
        await hatchet.filters.aio_delete(f.metadata.id)


async def fetch_runs_for_event(
    hatchet: Hatchet, event: Event
) -> tuple[ProcessedEvent, list[V1TaskSummary]]:
    runs = await hatchet.runs.aio_list(triggering_event_external_id=event.eventId)

    meta = (
        cast(dict[str, str | int | bool], json.loads(event.additionalMetadata))
        if event.additionalMetadata
        else {}
    )
    payload = (
        cast(dict[str, str | bool], json.loads(event.payload)) if event.payload else {}
    )

    processed_event = ProcessedEvent(
        id=event.eventId,
        payload=payload,
        meta=meta,
        should_have_runs=meta.get("should_have_runs", False) is True,
        test_run_id=cast(str, meta["test_run_id"]),
    )

    if not all([r.output for r in runs.rows]):
        return (processed_event, [])

    return (
        processed_event,
        runs.rows or [],
    )


async def wait_for_result(
    hatchet: Hatchet, events: list[Event]
) -> dict[ProcessedEvent, list[V1TaskSummary]]:
    await asyncio.sleep(3)

    since = datetime.now(tz=timezone.utc) - timedelta(minutes=2)

    persisted = (await hatchet.event.aio_list(limit=100, since=since)).rows or []

    assert {e.eventId for e in events}.issubset({e.metadata.id for e in persisted})

    iters = 0
    while True:
        print("Waiting for event runs to complete...")
        if iters > 15:
            print("Timed out waiting for event runs to complete.")

            # On timeout, report every event as having no runs.
            timed_out: dict[ProcessedEvent, list[V1TaskSummary]] = {}
            for event in events:
                # Parse the metadata once and guard against it being empty,
                # since json.loads("") would raise.
                meta = (
                    json.loads(event.additionalMetadata)
                    if event.additionalMetadata
                    else {}
                )
                timed_out[
                    ProcessedEvent(
                        id=event.eventId,
                        payload=json.loads(event.payload) if event.payload else {},
                        meta=meta,
                        should_have_runs=False,
                        test_run_id=cast(str, meta.get("test_run_id", "")),
                    )
                ] = []

            return timed_out

        iters += 1

        event_runs = await asyncio.gather(
            *[fetch_runs_for_event(hatchet, event) for event in events]
        )

        all_empty = all(not event_run for _, event_run in event_runs)

        if all_empty:
            await asyncio.sleep(1)
            continue

        event_id_to_runs = {event_id: runs for (event_id, runs) in event_runs}

        any_queued_or_running = any(
            run.status in [V1TaskStatus.QUEUED, V1TaskStatus.RUNNING]
            for runs in event_id_to_runs.values()
            for run in runs
        )

        if any_queued_or_running:
            await asyncio.sleep(1)
            continue

        break

    return event_id_to_runs


async def wait_for_result_and_assert(hatchet: Hatchet, events: list[Event]) -> None:
    event_to_runs = await wait_for_result(hatchet, events)

    for event, runs in event_to_runs.items():
        await assert_event_runs_processed(event, runs)


async def assert_event_runs_processed(
    event: ProcessedEvent,
    runs: list[V1TaskSummary],
) -> None:
    runs = [
        run
        for run in runs
        if (run.additional_metadata or {}).get("hatchet__event_id") == event.id
    ]

    if event.should_have_runs:
        assert len(runs) > 0

        for run in runs:
            assert run.status == V1TaskStatus.COMPLETED
            assert run.output.get("test_run_id") == event.test_run_id
    else:
        assert len(runs) == 0


def bpi(
    index: int = 1,
    test_run_id: str = "",
    should_skip: bool = False,
    should_have_runs: bool = True,
    key: str = EVENT_KEY,
    payload: dict[str, str] = {},
    scope: str | None = None,
) -> BulkPushEventWithMetadata:
    return BulkPushEventWithMetadata(
        key=key,
        payload={
            "should_skip": should_skip,
            **payload,
        },
        additional_metadata={
            "should_have_runs": should_have_runs,
            "test_run_id": test_run_id,
            "key": index,
        },
        scope=scope,
    )


def cp(should_skip: bool) -> dict[str, bool]:
    return EventWorkflowInput(should_skip=should_skip).model_dump()


@pytest.mark.asyncio(loop_scope="session")
async def test_event_push(hatchet: Hatchet) -> None:
    e = hatchet.event.push(EVENT_KEY, cp(False))

    assert e.eventId is not None


@pytest.mark.asyncio(loop_scope="session")
async def test_async_event_push(hatchet: Hatchet) -> None:
    e = await hatchet.event.aio_push(EVENT_KEY, cp(False))

    assert e.eventId is not None


@pytest.mark.asyncio(loop_scope="session")
async def test_async_event_bulk_push(hatchet: Hatchet) -> None:
    events = [
        BulkPushEventWithMetadata(
            key="event1",
            payload={"message": "This is event 1", "should_skip": False},
            additional_metadata={"source": "test", "user_id": "user123"},
        ),
        BulkPushEventWithMetadata(
            key="event2",
            payload={"message": "This is event 2", "should_skip": False},
            additional_metadata={"source": "test", "user_id": "user456"},
        ),
        BulkPushEventWithMetadata(
            key="event3",
            payload={"message": "This is event 3", "should_skip": False},
            additional_metadata={"source": "test", "user_id": "user789"},
        ),
    ]
    opts = BulkPushEventOptions(namespace="bulk-test")

    e = await hatchet.event.aio_bulk_push(events, opts)

    assert len(e) == 3

    # Sort both lists of events by their key to ensure comparison order
    sorted_events = sorted(events, key=lambda x: x.key)
    sorted_returned_events = sorted(e, key=lambda x: x.key)
    namespace = "bulk-test"

    # Check that the returned events match the original events
    for original_event, returned_event in zip(sorted_events, sorted_returned_events):
        assert returned_event.key == namespace + original_event.key


@pytest.fixture(scope="function")
def test_run_id() -> str:
    return str(uuid4())


@pytest.mark.asyncio(loop_scope="session")
async def test_event_engine_behavior(hatchet: Hatchet) -> None:
    test_run_id = str(uuid4())
    events = [
        bpi(
            test_run_id=test_run_id,
        ),
        bpi(
            test_run_id=test_run_id,
            key="thisisafakeeventfoobarbaz",
            should_have_runs=False,
        ),
    ]

    result = await hatchet.event.aio_bulk_push(events)

    await wait_for_result_and_assert(hatchet, result)


def gen_bulk_events(test_run_id: str) -> list[BulkPushEventWithMetadata]:
    return [
        ## No scope, so it shouldn't have any runs
        bpi(
            index=1,
            test_run_id=test_run_id,
            should_skip=False,
            should_have_runs=False,
        ),
        ## No scope, so it shouldn't have any runs
        bpi(
            index=2,
            test_run_id=test_run_id,
            should_skip=True,
            should_have_runs=False,
        ),
        ## Scope is set and `should_skip` is False, so it should have runs
        bpi(
            index=3,
            test_run_id=test_run_id,
            should_skip=False,
            should_have_runs=True,
            scope=test_run_id,
        ),
        ## Scope is set and `should_skip` is True, so it shouldn't have runs
        bpi(
            index=4,
            test_run_id=test_run_id,
            should_skip=True,
            should_have_runs=False,
            scope=test_run_id,
        ),
        ## Scope is set, `should_skip` is True, and key is different, so it shouldn't have runs
        bpi(
            index=5,
            test_run_id=test_run_id,
            should_skip=True,
            should_have_runs=False,
            scope=test_run_id,
            key="thisisafakeeventfoobarbaz",
        ),
        ## Scope is set, `should_skip` is False, but key is different, so it shouldn't have runs
        bpi(
            index=6,
            test_run_id=test_run_id,
            should_skip=False,
            should_have_runs=False,
            scope=test_run_id,
            key="thisisafakeeventfoobarbaz",
        ),
    ]


@pytest.mark.asyncio(loop_scope="session")
async def test_event_skipping_filtering(hatchet: Hatchet, test_run_id: str) -> None:
    async with event_filter(hatchet, test_run_id):
        events = gen_bulk_events(test_run_id)

        result = await hatchet.event.aio_bulk_push(events)

        await wait_for_result_and_assert(hatchet, result)


async def bulk_to_single(hatchet: Hatchet, event: BulkPushEventWithMetadata) -> Event:
    return await hatchet.event.aio_push(
        event_key=event.key,
        payload=event.payload,
        options=PushEventOptions(
            scope=event.scope,
            additional_metadata=event.additional_metadata,
            priority=event.priority,
        ),
    )


@pytest.mark.asyncio(loop_scope="session")
async def test_event_skipping_filtering_no_bulk(
    hatchet: Hatchet, test_run_id: str
) -> None:
    async with event_filter(hatchet, test_run_id):
        raw_events = gen_bulk_events(test_run_id)
        events = await asyncio.gather(
            *[bulk_to_single(hatchet, event) for event in raw_events]
        )

        await wait_for_result_and_assert(hatchet, events)


@pytest.mark.asyncio(loop_scope="session")
async def test_event_payload_filtering(hatchet: Hatchet, test_run_id: str) -> None:
    async with event_filter(
        hatchet,
        test_run_id,
        "input.should_skip == false && payload.foobar == 'baz'",
        {"foobar": "qux"},
    ):
        event = await hatchet.event.aio_push(
            event_key=EVENT_KEY,
            payload={"message": "This is event 1", "should_skip": False},
            options=PushEventOptions(
                scope=test_run_id,
                additional_metadata={
                    "should_have_runs": False,
                    "test_run_id": test_run_id,
                    "key": 1,
                },
            ),
        )

        await wait_for_result_and_assert(hatchet, [event])


@pytest.mark.asyncio(loop_scope="session")
async def test_event_payload_filtering_with_payload_match(
    hatchet: Hatchet, test_run_id: str
) -> None:
    async with event_filter(
        hatchet,
        test_run_id,
        "input.should_skip == false && payload.foobar == 'baz'",
        {"foobar": "baz"},
    ):
        event = await hatchet.event.aio_push(
            event_key=EVENT_KEY,
            payload={"message": "This is event 1", "should_skip": False},
            options=PushEventOptions(
                scope=test_run_id,
                additional_metadata={
                    "should_have_runs": True,
                    "test_run_id": test_run_id,
                    "key": 1,
                },
            ),
        )

        await wait_for_result_and_assert(hatchet, [event])


@pytest.mark.asyncio(loop_scope="session")
async def test_filtering_by_event_key(hatchet: Hatchet, test_run_id: str) -> None:
    async with event_filter(
        hatchet,
        test_run_id,
        f"event_key == '{SECONDARY_KEY}'",
    ):
        event_1 = await hatchet.event.aio_push(
            event_key=SECONDARY_KEY,
            payload={
                "message": "Should run because filter matches",
                "should_skip": False,
            },
            options=PushEventOptions(
                scope=test_run_id,
                additional_metadata={
                    "should_have_runs": True,
                    "test_run_id": test_run_id,
                },
            ),
        )
        event_2 = await hatchet.event.aio_push(
            event_key=EVENT_KEY,
            payload={
                "message": "Should skip because filter does not match",
                "should_skip": False,
            },
            options=PushEventOptions(
                scope=test_run_id,
                additional_metadata={
                    "should_have_runs": False,
                    "test_run_id": test_run_id,
                },
            ),
        )

        await wait_for_result_and_assert(hatchet, [event_1, event_2])


@pytest.mark.asyncio(loop_scope="session")
async def test_key_wildcards(hatchet: Hatchet, test_run_id: str) -> None:
    keys = [
        WILDCARD_KEY.replace("*", "1"),
        WILDCARD_KEY.replace("*", "2"),
        "foobar",
        EVENT_KEY,
    ]

    async with event_filter(
        hatchet,
        test_run_id,
    ):
        events = [
            await hatchet.event.aio_push(
                event_key=key,
                payload={
                    "should_skip": False,
                },
                options=PushEventOptions(
                    scope=test_run_id,
                    additional_metadata={
                        "should_have_runs": key != "foobar",
                        "test_run_id": test_run_id,
                    },
                ),
            )
            for key in keys
        ]

        await wait_for_result_and_assert(hatchet, events)


@pytest.mark.asyncio(loop_scope="session")
async def test_multiple_runs_for_multiple_scope_matches(
    hatchet: Hatchet, test_run_id: str
) -> None:
    async with event_filter(
        hatchet, test_run_id, payload={"filter_id": "1"}, expression="1 == 1"
    ):
        async with event_filter(
            hatchet, test_run_id, payload={"filter_id": "2"}, expression="2 == 2"
        ):
            event = await hatchet.event.aio_push(
                event_key=EVENT_KEY,
                payload={
                    "should_skip": False,
                },
                options=PushEventOptions(
                    scope=test_run_id,
                    additional_metadata={
                        "should_have_runs": True,
                        "test_run_id": test_run_id,
                    },
                ),
            )

            event_to_runs = await wait_for_result(hatchet, [event])

            assert len(event_to_runs.keys()) == 1

            runs = list(event_to_runs.values())[0]

            assert len(runs) == 2

            assert {r.output.get("filter_id") for r in runs} == {"1", "2"}