Feat: Batched replays (#1860)

* feat: batched replays

* fix: add some comments

* feat: e2e test for bulk replays

* chore: gen

* fix: improving tests a bit

* fix: copilot

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* chore: gen

* fix: tag

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Authored by Matt Kaye on 2025-06-13 13:47:50 -04:00, committed by GitHub.
Parent: 0518a2f97f
Commit: a73e34cd92
19 changed files with 481 additions and 54 deletions
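The change adds bulk replay support to the runs client: runs are matched by filter and replayed in batches. A minimal sketch of the call, assuming a configured Hatchet client (the workflow ID is a placeholder); it mirrors the usage in the e2e test below:

import asyncio
from datetime import datetime, timedelta, timezone

from hatchet_sdk import BulkCancelReplayOpts, Hatchet, RunFilter

hatchet = Hatchet()


async def replay_recent_runs() -> None:
    # A single call replays every run matching the filter. Per the test's
    # "two batches" comment, large matches appear to be split into batches.
    await hatchet.runs.aio_bulk_replay(
        opts=BulkCancelReplayOpts(
            filters=RunFilter(
                workflow_ids=["my-workflow-id"],  # placeholder ID
                since=datetime.now(tz=timezone.utc) - timedelta(minutes=5),
            )
        )
    )


if __name__ == "__main__":
    asyncio.run(replay_recent_runs())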

@@ -0,0 +1,106 @@
import asyncio
from datetime import datetime, timedelta, timezone
from uuid import uuid4

import pytest

from examples.bulk_operations.worker import (
    bulk_replay_test_1,
    bulk_replay_test_2,
    bulk_replay_test_3,
)
from hatchet_sdk import BulkCancelReplayOpts, Hatchet, RunFilter, TriggerWorkflowOptions
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus


@pytest.mark.asyncio(loop_scope="session")
async def test_bulk_replay(hatchet: Hatchet) -> None:
    test_run_id = str(uuid4())
    n = 100

    with pytest.raises(Exception):
        await bulk_replay_test_1.aio_run_many(
            [
                bulk_replay_test_1.create_bulk_run_item(
                    options=TriggerWorkflowOptions(
                        additional_metadata={
                            "test_run_id": test_run_id,
                        }
                    )
                )
                for _ in range(n + 1)
            ]
        )

    with pytest.raises(Exception):
        await bulk_replay_test_2.aio_run_many(
            [
                bulk_replay_test_2.create_bulk_run_item(
                    options=TriggerWorkflowOptions(
                        additional_metadata={
                            "test_run_id": test_run_id,
                        }
                    )
                )
                for _ in range((n // 2) - 1)
            ]
        )

    with pytest.raises(Exception):
        await bulk_replay_test_3.aio_run_many(
            [
                bulk_replay_test_3.create_bulk_run_item(
                    options=TriggerWorkflowOptions(
                        additional_metadata={
                            "test_run_id": test_run_id,
                        }
                    )
                )
                for _ in range((n // 2) - 2)
            ]
        )

    workflow_ids = [
        bulk_replay_test_1.id,
        bulk_replay_test_2.id,
        bulk_replay_test_3.id,
    ]

    ## Should result in two batches of replays
    await hatchet.runs.aio_bulk_replay(
        opts=BulkCancelReplayOpts(
            filters=RunFilter(
                workflow_ids=workflow_ids,
                since=datetime.now(tz=timezone.utc) - timedelta(minutes=2),
                additional_metadata={"test_run_id": test_run_id},
            )
        )
    )

    await asyncio.sleep(5)

    runs = await hatchet.runs.aio_list(
        workflow_ids=workflow_ids,
        since=datetime.now(tz=timezone.utc) - timedelta(minutes=2),
        additional_metadata={"test_run_id": test_run_id},
        limit=1000,
    )

    assert len(runs.rows) == n + 1 + (n // 2 - 1) + (n // 2 - 2)

    for run in runs.rows:
        assert run.status == V1TaskStatus.COMPLETED
        assert run.retry_count == 1
        assert run.attempt == 2

    assert (
        len([r for r in runs.rows if r.workflow_id == bulk_replay_test_1.id]) == n + 1
    )
    assert (
        len([r for r in runs.rows if r.workflow_id == bulk_replay_test_2.id])
        == n // 2 - 1
    )
    assert (
        len([r for r in runs.rows if r.workflow_id == bulk_replay_test_3.id])
        == n // 2 - 2
    )
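For reference, the seeded runs total 198, which is consistent with the test's "two batches" comment if the per-request replay batch size is 100; the batch size itself isn't visible in this diff, so treat that figure as an assumption:

# Arithmetic behind the "two batches" comment in the test above. The
# per-request batch size is not shown in this diff; 100 is an assumed
# value consistent with two batches.
n = 100
total = (n + 1) + (n // 2 - 1) + (n // 2 - 2)  # 101 + 49 + 48 = 198
assumed_batch_size = 100
batches = -(-total // assumed_batch_size)  # ceiling division
assert (total, batches) == (198, 2)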

@@ -0,0 +1,37 @@
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet(debug=True)


@hatchet.task()
def bulk_replay_test_1(input: EmptyModel, ctx: Context) -> None:
    print("retrying bulk replay test task", ctx.retry_count)

    if ctx.retry_count == 0:
        raise ValueError("This is a test error to trigger a retry.")


@hatchet.task()
def bulk_replay_test_2(input: EmptyModel, ctx: Context) -> None:
    print("retrying bulk replay test task", ctx.retry_count)

    if ctx.retry_count == 0:
        raise ValueError("This is a test error to trigger a retry.")


@hatchet.task()
def bulk_replay_test_3(input: EmptyModel, ctx: Context) -> None:
    print("retrying bulk replay test task", ctx.retry_count)

    if ctx.retry_count == 0:
        raise ValueError("This is a test error to trigger a retry.")


def main() -> None:
    worker = hatchet.worker(
        "bulk-replay-test-worker",
        workflows=[bulk_replay_test_1, bulk_replay_test_2, bulk_replay_test_3],
    )

    worker.start()


if __name__ == "__main__":
    main()
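Each task above fails deliberately on its first attempt (ctx.retry_count == 0) and succeeds on any later one, which is what lets the e2e test assert retry_count == 1 and attempt == 2 after the bulk replay. A hedged sketch of seeding a single failed run by hand, assuming create_bulk_run_item's arguments are all optional:

import asyncio

from examples.bulk_operations.worker import bulk_replay_test_1


async def seed_one_failed_run() -> None:
    try:
        # aio_run_many waits for results (the test wraps it in
        # pytest.raises), so the intentionally failing first attempt
        # raises here and leaves a FAILED run for a later bulk replay.
        await bulk_replay_test_1.aio_run_many(
            [bulk_replay_test_1.create_bulk_run_item()]
        )
    except Exception:
        pass


if __name__ == "__main__":
    asyncio.run(seed_one_failed_run())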

@@ -1,5 +1,10 @@
from examples.affinity_workers.worker import affinity_worker_workflow
from examples.bulk_fanout.worker import bulk_child_wf, bulk_parent_wf
from examples.bulk_operations.worker import (
    bulk_replay_test_1,
    bulk_replay_test_2,
    bulk_replay_test_3,
)
from examples.cancellation.worker import cancellation_workflow
from examples.concurrency_limit.worker import concurrency_limit_workflow
from examples.concurrency_limit_rr.worker import concurrency_limit_rr_workflow
@@ -57,6 +62,9 @@ def main() -> None:
            lifespan_task,
            simple,
            simple_durable,
            bulk_replay_test_1,
            bulk_replay_test_2,
            bulk_replay_test_3,
        ],
        lifespan=lifespan,
    )