[Python]: Batch Bulk Run Calls (#1504)

* chore: ver

* feat: auto-batch to max batch size of 1000 workflows

* fix: magic number

* feat: bulk for TS

* Revert "feat: bulk for TS"

This reverts commit edbe731bfb.
This commit is contained in:
Matt Kaye
2025-04-07 17:10:50 -04:00
committed by GitHub
parent bc72465b65
commit 76c0bdfc17
2 changed files with 70 additions and 49 deletions

View File

@@ -1,7 +1,7 @@
import asyncio
import json
from datetime import datetime
from typing import Union, cast
from typing import Generator, TypeVar, Union, cast
import grpc
from google.protobuf import timestamp_pb2
@@ -27,6 +27,10 @@ from hatchet_sdk.utils.proto_enums import convert_python_enum_to_proto
from hatchet_sdk.utils.typing import JSONSerializableMapping
from hatchet_sdk.workflow_run import WorkflowRunRef
T = TypeVar("T")
MAX_BULK_WORKFLOW_RUN_BATCH_SIZE = 1000
class ScheduleTriggerWorkflowOptions(BaseModel):
parent_id: str | None = None
@@ -328,9 +332,6 @@ class AdminClient:
input: JSONSerializableMapping,
options: TriggerWorkflowOptions = TriggerWorkflowOptions(),
) -> WorkflowRunRef:
## IMPORTANT: The `pooled_workflow_listener` must be created 1) lazily, and not at `init` time, and 2) on the
## main thread. If 1) is not followed, you'll get an error about something being attached to the wrong event
## loop. If 2) is not followed, you'll get an error about the event loop not being set up.
async with spawn_index_lock:
request = self._create_workflow_run_request(workflow_name, input, options)
@@ -353,70 +354,90 @@ class AdminClient:
config=self.config,
)
def chunk(self, xs: list[T], n: int) -> Generator[list[T], None, None]:
for i in range(0, len(xs), n):
yield xs[i : i + n]
## IMPORTANT: Keep this method's signature in sync with the wrapper in the OTel instrumentor
@tenacity_retry
def run_workflows(
self,
workflows: list[WorkflowRunTriggerConfig],
) -> list[WorkflowRunRef]:
bulk_request = v0_workflow_protos.BulkTriggerWorkflowRequest(
workflows=[
self._create_workflow_run_request(
workflow.workflow_name, workflow.input, workflow.options
)
for workflow in workflows
]
)
resp = cast(
v0_workflow_protos.BulkTriggerWorkflowResponse,
self.v0_client.BulkTriggerWorkflow(
bulk_request,
metadata=get_metadata(self.token),
),
)
return [
WorkflowRunRef(
workflow_run_id=workflow_run_id,
config=self.config,
bulk_workflows = [
self._create_workflow_run_request(
workflow.workflow_name, workflow.input, workflow.options
)
for workflow_run_id in resp.workflow_run_ids
for workflow in workflows
]
refs: list[WorkflowRunRef] = []
for chunk in self.chunk(bulk_workflows, MAX_BULK_WORKFLOW_RUN_BATCH_SIZE):
bulk_request = v0_workflow_protos.BulkTriggerWorkflowRequest(
workflows=chunk
)
resp = cast(
v0_workflow_protos.BulkTriggerWorkflowResponse,
self.v0_client.BulkTriggerWorkflow(
bulk_request,
metadata=get_metadata(self.token),
),
)
refs.extend(
[
WorkflowRunRef(
workflow_run_id=workflow_run_id,
config=self.config,
)
for workflow_run_id in resp.workflow_run_ids
]
)
return refs
@tenacity_retry
async def aio_run_workflows(
self,
workflows: list[WorkflowRunTriggerConfig],
) -> list[WorkflowRunRef]:
## IMPORTANT: The `pooled_workflow_listener` must be created 1) lazily, and not at `init` time, and 2) on the
## main thread. If 1) is not followed, you'll get an error about something being attached to the wrong event
## loop. If 2) is not followed, you'll get an error about the event loop not being set up.
async with spawn_index_lock:
bulk_request = v0_workflow_protos.BulkTriggerWorkflowRequest(
workflows=[
chunks = self.chunk(workflows, MAX_BULK_WORKFLOW_RUN_BATCH_SIZE)
refs: list[WorkflowRunRef] = []
for chunk in chunks:
async with spawn_index_lock:
bulk_workflows = [
self._create_workflow_run_request(
workflow.workflow_name, workflow.input, workflow.options
)
for workflow in workflows
for workflow in chunk
]
bulk_request = v0_workflow_protos.BulkTriggerWorkflowRequest(
workflows=bulk_workflows
)
resp = cast(
v0_workflow_protos.BulkTriggerWorkflowResponse,
self.v0_client.BulkTriggerWorkflow(
bulk_request,
metadata=get_metadata(self.token),
),
)
refs.extend(
[
WorkflowRunRef(
workflow_run_id=workflow_run_id,
config=self.config,
)
for workflow_run_id in resp.workflow_run_ids
]
)
resp = cast(
v0_workflow_protos.BulkTriggerWorkflowResponse,
self.v0_client.BulkTriggerWorkflow(
bulk_request,
metadata=get_metadata(self.token),
),
)
return [
WorkflowRunRef(
workflow_run_id=workflow_run_id,
config=self.config,
)
for workflow_run_id in resp.workflow_run_ids
]
return refs
def get_workflow_run(self, workflow_run_id: str) -> WorkflowRunRef:
return WorkflowRunRef(

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "1.2.3"
version = "1.2.4"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"