Files
hatchet/examples/python/events/test_event.py
T
Gabe Ruttner 8e80faf2d6 Fe overhaul docs (#1640)
* api changes

* doc changes

* move docs

* generated

* generate

* pkg

* backmerge main

* revert to main

* revert main

* race?

* remove go tests
2025-04-30 14:10:09 -07:00

55 lines
1.8 KiB
Python

import pytest
from hatchet_sdk.clients.events import BulkPushEventOptions, BulkPushEventWithMetadata
from hatchet_sdk.hatchet import Hatchet
@pytest.mark.asyncio(loop_scope="session")
async def test_event_push(hatchet: Hatchet) -> None:
e = hatchet.event.push("user:create", {"test": "test"})
assert e.eventId is not None
@pytest.mark.asyncio(loop_scope="session")
async def test_async_event_push(hatchet: Hatchet) -> None:
e = await hatchet.event.aio_push("user:create", {"test": "test"})
assert e.eventId is not None
@pytest.mark.asyncio(loop_scope="session")
async def test_async_event_bulk_push(hatchet: Hatchet) -> None:
events = [
BulkPushEventWithMetadata(
key="event1",
payload={"message": "This is event 1"},
additional_metadata={"source": "test", "user_id": "user123"},
),
BulkPushEventWithMetadata(
key="event2",
payload={"message": "This is event 2"},
additional_metadata={"source": "test", "user_id": "user456"},
),
BulkPushEventWithMetadata(
key="event3",
payload={"message": "This is event 3"},
additional_metadata={"source": "test", "user_id": "user789"},
),
]
opts = BulkPushEventOptions(namespace="bulk-test")
e = await hatchet.event.aio_bulk_push(events, opts)
assert len(e) == 3
# Sort both lists of events by their key to ensure comparison order
sorted_events = sorted(events, key=lambda x: x.key)
sorted_returned_events = sorted(e, key=lambda x: x.key)
namespace = "bulk-test"
# Check that the returned events match the original events
for original_event, returned_event in zip(sorted_events, sorted_returned_events):
assert returned_event.key == namespace + original_event.key