Files
hatchet/sdks/python/examples/non_retryable/test_no_retry.py
T
Matt Kaye 993817b049 [Python] Single listener base class + bug fixes + refactors (#1470)
* fix: register durable steps and workflows separately

* chore: initial copy of pooled listener

* feat: initial generic impl

* feat: use pooled listener for wf run listener

* refactor: move listeners to subdir

* feat: refactor durable event listener

* fix: bug

* feat: share single pooled workflow listener and event listener everywhere

* cruft: rm hatchet fixture

* fix: rebase issue

* feat: remove asyncio api client in favor of sync one

* chore: minor version

* proposal: crazy hack idea to make the workflow run listener work

* fix: sleeps and error handling

* Revert "cruft: rm hatchet fixture"

This reverts commit b75f625e6ccec095e8c4e294d6727db166796411.

* fix: set timeout

* fix: rm pytest-timeout

* fix: rm retry

* fix: use v1 by default

* fix: try removing retry state

* fix: try using async client?

* fix: try running sequentially

* debug: loop

* debug: maybe it's this?

* fix: lint

* fix: re-remove unused fixtures

* fix: lazily create clients in admin client

* fix: default

* fix: lazily initialize dispatcher client

* fix: hint

* fix: no. way.

* feat: add back retries in ci

* fix: clients + imports

* fix: loop scope

* debug: try running skipped tests in ci again

* Revert "debug: try running skipped tests in ci again"

This reverts commit 8d9e18150e5207ee6051d8df8a6fe2a7504c722e.

* fix: rm duped code

* refactor: rename everything as `to_proto`

* refactor: removals of `namespace` being passed around

* fix: task output stupidity

* feat: add deprecation warning

* fix: remove more unused code

* feat: mix sync and async in dag example

* fix: autouse

* fix: more input types

* feat: remove ability to pass in loop

* fix: overload key gen
2025-04-10 08:18:17 -04:00

52 lines
1.6 KiB
Python

import pytest
from examples.non_retryable.worker import (
non_retryable_workflow,
should_not_retry,
should_not_retry_successful_task,
should_retry_wrong_exception_type,
)
from hatchet_sdk import Hatchet
from hatchet_sdk.clients.rest.models.v1_task_event_type import V1TaskEventType
from hatchet_sdk.clients.rest.models.v1_workflow_run_details import V1WorkflowRunDetails
def find_id(runs: V1WorkflowRunDetails, match: str) -> str:
return next(t.metadata.id for t in runs.tasks if match in t.display_name)
@pytest.mark.asyncio(loop_scope="session")
async def test_no_retry(hatchet: Hatchet) -> None:
ref = await non_retryable_workflow.aio_run_no_wait()
with pytest.raises(Exception, match="retry"):
await ref.aio_result()
runs = await hatchet.runs.aio_get(ref.workflow_run_id)
task_to_id = {
task: find_id(runs, task.name)
for task in [
should_not_retry_successful_task,
should_retry_wrong_exception_type,
should_not_retry,
]
}
retrying_events = [
e for e in runs.task_events if e.event_type == V1TaskEventType.RETRYING
]
"""Only one task should be retried."""
assert len(retrying_events) == 1
"""The task id of the retrying events should match the tasks that are retried"""
assert {e.task_id for e in retrying_events} == {
task_to_id[should_retry_wrong_exception_type],
}
"""Three failed events should emit, one each for the two failing initial runs and one for the retry."""
assert (
len([e for e in runs.task_events if e.event_type == V1TaskEventType.FAILED])
== 3
)