Feat: Add event key to CEL env for filtering (#1748)

* feat: add event key to env

* chore: generate a bunch of stuff again

* feat: add a test for filtering by event key

* refactor: duped code

* fix: skip flaky test in ci

* Fix: Add filter payloads to context (#1743)

* feat: add payload to ctx

* chore: ver

* fix: add filter payload to go

* fix: tests + lint

* fix: lint

* fix: naming
This commit is contained in:
Matt Kaye
2025-05-19 12:52:41 -04:00
committed by GitHub
parent 400aadc317
commit 752e6bd5cf
31 changed files with 113 additions and 30 deletions

View File

@@ -7,7 +7,12 @@ from uuid import uuid4
import pytest
from pydantic import BaseModel
from examples.events.worker import EventWorkflowInput, event_workflow
from examples.events.worker import (
EVENT_KEY,
SECONDARY_KEY,
EventWorkflowInput,
event_workflow,
)
from hatchet_sdk.clients.events import (
BulkPushEventOptions,
BulkPushEventWithMetadata,
@@ -140,7 +145,7 @@ def bpi(
test_run_id: str = "",
should_skip: bool = False,
should_have_runs: bool = True,
key: str = "user:create",
key: str = EVENT_KEY,
payload: dict[str, str] = {},
scope: str | None = None,
) -> BulkPushEventWithMetadata:
@@ -165,14 +170,14 @@ def cp(should_skip: bool) -> dict[str, bool]:
@pytest.mark.asyncio(loop_scope="session")
async def test_event_push(hatchet: Hatchet) -> None:
e = hatchet.event.push("user:create", cp(False))
e = hatchet.event.push(EVENT_KEY, cp(False))
assert e.eventId is not None
@pytest.mark.asyncio(loop_scope="session")
async def test_async_event_push(hatchet: Hatchet) -> None:
e = await hatchet.event.aio_push("user:create", cp(False))
e = await hatchet.event.aio_push(EVENT_KEY, cp(False))
assert e.eventId is not None
@@ -338,7 +343,7 @@ async def test_event_payload_filtering(hatchet: Hatchet, test_run_id: str) -> No
{"foobar": "qux"},
):
event = await hatchet.event.aio_push(
event_key="user:create",
event_key=EVENT_KEY,
payload={"message": "This is event 1", "should_skip": False},
options=PushEventOptions(
scope=test_run_id,
@@ -365,7 +370,7 @@ async def test_event_payload_filtering_with_payload_match(
{"foobar": "baz"},
):
event = await hatchet.event.aio_push(
event_key="user:create",
event_key=EVENT_KEY,
payload={"message": "This is event 1", "should_skip": False},
options=PushEventOptions(
scope=test_run_id,
@@ -378,3 +383,45 @@ async def test_event_payload_filtering_with_payload_match(
)
runs = await wait_for_result(hatchet, [event])
assert len(runs) == 1
@pytest.mark.asyncio(loop_scope="session")
async def test_filtering_by_event_key(hatchet: Hatchet, test_run_id: str) -> None:
async with event_filter(
hatchet,
test_run_id,
f"event_key == '{SECONDARY_KEY}'",
):
event_1 = await hatchet.event.aio_push(
event_key=SECONDARY_KEY,
payload={
"message": "Should run because filter matches",
"should_skip": False,
},
options=PushEventOptions(
scope=test_run_id,
additional_metadata={
"should_have_runs": True,
"test_run_id": test_run_id,
},
),
)
event_2 = await hatchet.event.aio_push(
event_key=EVENT_KEY,
payload={
"message": "Should skip because filter does not match",
"should_skip": False,
},
options=PushEventOptions(
scope=test_run_id,
additional_metadata={
"should_have_runs": False,
"test_run_id": test_run_id,
},
),
)
event_to_runs = await wait_for_result(hatchet, [event_1, event_2])
for event, runs in event_to_runs.items():
await assert_event_runs_processed(event, runs)

View File

@@ -4,6 +4,7 @@ from hatchet_sdk import Context, Hatchet
hatchet = Hatchet()
EVENT_KEY = "user:create"
SECONDARY_KEY = "foobarbaz"
class EventWorkflowInput(BaseModel):
@@ -13,7 +14,7 @@ class EventWorkflowInput(BaseModel):
# > Event trigger
event_workflow = hatchet.workflow(
name="EventWorkflow",
on_events=[EVENT_KEY],
on_events=[EVENT_KEY, SECONDARY_KEY],
input_validator=EventWorkflowInput,
)
# !!

View File

@@ -283,6 +283,9 @@ def time_until_next_minute() -> float:
return (next_minute - now).total_seconds()
@pytest.mark.skip(
reason="Test is flaky because the first jobs that are picked up don't necessarily go in priority order"
)
@pytest.mark.parametrize(
"on_demand_worker",
[