feat: sdk-side workflow pause

This commit is contained in:
mrkaye97
2025-05-09 16:28:36 -04:00
parent 62dea8aca5
commit 83b4a63c20
5 changed files with 243 additions and 180 deletions
@@ -13,9 +13,13 @@ Methods:
| `aio_get` | Get a workflow by its ID. |
| `aio_get_version` | Get a workflow version by the workflow ID and an optional version. |
| `aio_list` | List all workflows in the tenant determined by the client config that match optional filters. |
| `aio_pause` | Pause a workflow by its ID. |
| `aio_unpause` | Unpause a workflow by its ID. |
| `get` | Get a workflow by its ID. |
| `get_version` | Get a workflow version by the workflow ID and an optional version. |
| `list` | List all workflows in the tenant determined by the client config that match optional filters. |
| `pause` | Pause a workflow by its ID. |
| `unpause` | Unpause a workflow by its ID. |
### Functions
@@ -70,6 +74,38 @@ Returns:
| -------------- | -------------------- |
| `WorkflowList` | A list of workflows. |
#### `aio_pause`
Pause a workflow by its ID.
Parameters:
| Name | Type | Description | Default |
| ------------- | ----- | -------------------------------- | ---------- |
| `workflow_id` | `str` | The ID of the workflow to pause. | _required_ |
Returns:
| Type | Description |
| ---------- | --------------------- |
| `Workflow` | The workflow version. |
#### `aio_unpause`
Unpause a workflow by its ID.
Parameters:
| Name | Type | Description | Default |
| ------------- | ----- | ---------------------------------- | ---------- |
| `workflow_id` | `str` | The ID of the workflow to unpause. | _required_ |
Returns:
| Type | Description |
| ---------- | --------------------- |
| `Workflow` | The workflow version. |
#### `get`
Get a workflow by its ID.
@@ -120,3 +156,35 @@ Returns:
| Type | Description |
| -------------- | -------------------- |
| `WorkflowList` | A list of workflows. |
#### `pause`
Pause a workflow by its ID.
Parameters:
| Name | Type | Description | Default |
| ------------- | ----- | -------------------------------- | ---------- |
| `workflow_id` | `str` | The ID of the workflow to pause. | _required_ |
Returns:
| Type | Description |
| ---------- | --------------------- |
| `Workflow` | The workflow version. |
#### `unpause`
Unpause a workflow by its ID.
Parameters:
| Name | Type | Description | Default |
| ------------- | ----- | ---------------------------------- | ---------- |
| `workflow_id` | `str` | The ID of the workflow to unpause. | _required_ |
Returns:
| Type | Description |
| ---------- | --------------------- |
| `Workflow` | The workflow version. |
+8
View File
@@ -32,6 +32,10 @@
- is_durable
- list_runs
- aio_list_runs
- pause
- aio_pause
- unpause
- aio_unpause
## Standalone
@@ -54,3 +58,7 @@
- is_durable
- list_runs
- aio_list_runs
- pause
- aio_pause
- unpause
- aio_unpause
@@ -5,6 +5,9 @@ from hatchet_sdk.clients.rest.api.workflow_run_api import WorkflowRunApi
from hatchet_sdk.clients.rest.api_client import ApiClient
from hatchet_sdk.clients.rest.models.workflow import Workflow
from hatchet_sdk.clients.rest.models.workflow_list import WorkflowList
from hatchet_sdk.clients.rest.models.workflow_update_request import (
WorkflowUpdateRequest,
)
from hatchet_sdk.clients.rest.models.workflow_version import WorkflowVersion
from hatchet_sdk.clients.v1.api_client import BaseRestClient
@@ -105,3 +108,55 @@ class WorkflowsClient(BaseRestClient):
:return: The workflow version.
"""
return await asyncio.to_thread(self.get_version, workflow_id, version)
def pause(self, workflow_id: str) -> Workflow:
"""
Pause a workflow by its ID.
:param workflow_id: The ID of the workflow to pause.
:return: The workflow version.
"""
with self.client() as client:
return self._wa(client).workflow_update(
workflow_id,
WorkflowUpdateRequest(
isPaused=True,
),
)
async def aio_pause(self, workflow_id: str) -> Workflow:
"""
Pause a workflow by its ID.
:param workflow_id: The ID of the workflow to pause.
:return: The workflow version.
"""
return await asyncio.to_thread(self.pause, workflow_id)
def unpause(self, workflow_id: str) -> Workflow:
"""
Unpause a workflow by its ID.
:param workflow_id: The ID of the workflow to unpause.
:return: The workflow version.
"""
with self.client() as client:
return self._wa(client).workflow_update(
workflow_id,
WorkflowUpdateRequest(
isPaused=False,
),
)
async def aio_unpause(self, workflow_id: str) -> Workflow:
"""
Unpause a workflow by its ID.
:param workflow_id: The ID of the workflow to unpause.
:return: The workflow version.
"""
return await asyncio.to_thread(self.unpause, workflow_id)
@@ -1,5 +1,4 @@
import asyncio
from datetime import datetime, timedelta
from datetime import datetime
from typing import Any, Generic, cast, get_type_hints
from hatchet_sdk.clients.admin import (
@@ -8,10 +7,7 @@ from hatchet_sdk.clients.admin import (
WorkflowRunTriggerConfig,
)
from hatchet_sdk.clients.rest.models.cron_workflows import CronWorkflows
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary
from hatchet_sdk.contracts.workflows_pb2 import WorkflowVersion
from hatchet_sdk.logger import logger
from hatchet_sdk.runnables.task import Task
from hatchet_sdk.runnables.types import EmptyModel, R, TWorkflowInput
from hatchet_sdk.runnables.workflow import BaseWorkflow, Workflow
@@ -298,88 +294,3 @@ class Standalone(BaseWorkflow[TWorkflowInput], Generic[TWorkflowInput, R]):
def to_task(self) -> Task[TWorkflowInput, R]:
return self._task
def list_runs(
self,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int | None = None,
statuses: list[V1TaskStatus] | None = None,
additional_metadata: dict[str, str] | None = None,
worker_id: str | None = None,
parent_task_external_id: str | None = None,
) -> list[V1TaskSummary]:
"""
List runs of the workflow.
:param since: The start time for the runs to be listed.
:param until: The end time for the runs to be listed.
:param limit: The maximum number of runs to be listed.
:param offset: The offset for pagination.
:param statuses: The statuses of the runs to be listed.
:param additional_metadata: Additional metadata for filtering the runs.
:param worker_id: The ID of the worker that ran the tasks.
:param parent_task_external_id: The external ID of the parent task.
:returns: A list of `V1TaskSummary` objects representing the runs of the workflow.
"""
workflows = self.client.workflows.list(workflow_name=self._workflow.name)
if not workflows.rows:
logger.warning(f"No runs found for {self.name}")
return []
workflow = workflows.rows[0]
response = self.client.runs.list(
workflow_ids=[workflow.metadata.id],
since=since or datetime.now() - timedelta(days=1),
only_tasks=True,
offset=offset,
limit=limit,
statuses=statuses,
until=until,
additional_metadata=additional_metadata,
worker_id=worker_id,
parent_task_external_id=parent_task_external_id,
)
return response.rows
async def aio_list_runs(
self,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int | None = None,
statuses: list[V1TaskStatus] | None = None,
additional_metadata: dict[str, str] | None = None,
worker_id: str | None = None,
parent_task_external_id: str | None = None,
) -> list[V1TaskSummary]:
"""
List runs of the workflow.
:param since: The start time for the runs to be listed.
:param until: The end time for the runs to be listed.
:param limit: The maximum number of runs to be listed.
:param offset: The offset for pagination.
:param statuses: The statuses of the runs to be listed.
:param additional_metadata: Additional metadata for filtering the runs.
:param worker_id: The ID of the worker that ran the tasks.
:param parent_task_external_id: The external ID of the parent task.
:returns: A list of `V1TaskSummary` objects representing the runs of the workflow.
"""
return await asyncio.to_thread(
self.list_runs,
since=since or datetime.now() - timedelta(days=1),
offset=offset,
limit=limit,
statuses=statuses,
until=until,
additional_metadata=additional_metadata,
worker_id=worker_id,
parent_task_external_id=parent_task_external_id,
)
+111 -90
View File
@@ -124,6 +124,7 @@ class BaseWorkflow(Generic[TWorkflowInput]):
self._on_failure_task: Task[TWorkflowInput, Any] | None = None
self._on_success_task: Task[TWorkflowInput, Any] | None = None
self.client = client
self._id: str | None = None
@property
def service_name(self) -> str:
@@ -279,6 +280,116 @@ class BaseWorkflow(Generic[TWorkflowInput]):
f"Input must be a BaseModel or `None`, got {type(input)} instead."
)
@property
def id(self) -> str:
if self._id:
return self._id
workflows = self.client.workflows.list(workflow_name=self.name)
if not workflows.rows:
raise ValueError(f"Workflow {self.name} not found.")
workflow = workflows.rows[0]
self._id = workflow.metadata.id
return self.id
def pause(self) -> None:
self.client.workflows.pause(self.id)
async def aio_pause(self) -> None:
await self.client.workflows.aio_pause(self.id)
def unpause(self) -> None:
self.client.workflows.unpause(self.id)
async def aio_unpause(self) -> None:
await self.client.workflows.aio_unpause(self.id)
def list_runs(
self,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int | None = None,
statuses: list[V1TaskStatus] | None = None,
additional_metadata: dict[str, str] | None = None,
worker_id: str | None = None,
parent_task_external_id: str | None = None,
only_tasks: bool = False,
) -> list[V1TaskSummary]:
"""
List runs of the workflow.
:param since: The start time for the runs to be listed.
:param until: The end time for the runs to be listed.
:param limit: The maximum number of runs to be listed.
:param offset: The offset for pagination.
:param statuses: The statuses of the runs to be listed.
:param additional_metadata: Additional metadata for filtering the runs.
:param worker_id: The ID of the worker that ran the tasks.
:param parent_task_external_id: The external ID of the parent task.
:param only_tasks: Whether to list only task runs.
:returns: A list of `V1TaskSummary` objects representing the runs of the workflow.
"""
response = self.client.runs.list(
workflow_ids=[self.id],
since=since or datetime.now() - timedelta(days=1),
only_tasks=only_tasks,
offset=offset,
limit=limit,
statuses=statuses,
until=until,
additional_metadata=additional_metadata,
worker_id=worker_id,
parent_task_external_id=parent_task_external_id,
)
return response.rows
async def aio_list_runs(
self,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int | None = None,
statuses: list[V1TaskStatus] | None = None,
additional_metadata: dict[str, str] | None = None,
worker_id: str | None = None,
parent_task_external_id: str | None = None,
only_tasks: bool = False,
) -> list[V1TaskSummary]:
"""
List runs of the workflow.
:param since: The start time for the runs to be listed.
:param until: The end time for the runs to be listed.
:param limit: The maximum number of runs to be listed.
:param offset: The offset for pagination.
:param statuses: The statuses of the runs to be listed.
:param additional_metadata: Additional metadata for filtering the runs.
:param worker_id: The ID of the worker that ran the tasks.
:param parent_task_external_id: The external ID of the parent task.
:param only_tasks: Whether to list only task runs.
:returns: A list of `V1TaskSummary` objects representing the runs of the workflow.
"""
return await asyncio.to_thread(
self.list_runs,
since=since or datetime.now() - timedelta(days=1),
only_tasks=only_tasks,
offset=offset,
limit=limit,
statuses=statuses,
until=until,
additional_metadata=additional_metadata,
worker_id=worker_id,
parent_task_external_id=parent_task_external_id,
)
class Workflow(BaseWorkflow[TWorkflowInput]):
"""
@@ -921,93 +1032,3 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
self._on_success_task = _task
case _:
raise ValueError("Invalid task type")
def list_runs(
self,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int | None = None,
statuses: list[V1TaskStatus] | None = None,
additional_metadata: dict[str, str] | None = None,
worker_id: str | None = None,
parent_task_external_id: str | None = None,
only_tasks: bool = False,
) -> list[V1TaskSummary]:
"""
List runs of the workflow.
:param since: The start time for the runs to be listed.
:param until: The end time for the runs to be listed.
:param limit: The maximum number of runs to be listed.
:param offset: The offset for pagination.
:param statuses: The statuses of the runs to be listed.
:param additional_metadata: Additional metadata for filtering the runs.
:param worker_id: The ID of the worker that ran the tasks.
:param parent_task_external_id: The external ID of the parent task.
:param only_tasks: Whether to list only task runs.
:returns: A list of `V1TaskSummary` objects representing the runs of the workflow.
"""
workflows = self.client.workflows.list(workflow_name=self.name)
if not workflows.rows:
logger.warning(f"No runs found for {self.name}")
return []
workflow = workflows.rows[0]
response = self.client.runs.list(
workflow_ids=[workflow.metadata.id],
since=since or datetime.now() - timedelta(days=1),
only_tasks=only_tasks,
offset=offset,
limit=limit,
statuses=statuses,
until=until,
additional_metadata=additional_metadata,
worker_id=worker_id,
parent_task_external_id=parent_task_external_id,
)
return response.rows
async def aio_list_runs(
self,
since: datetime | None = None,
until: datetime | None = None,
limit: int = 100,
offset: int | None = None,
statuses: list[V1TaskStatus] | None = None,
additional_metadata: dict[str, str] | None = None,
worker_id: str | None = None,
parent_task_external_id: str | None = None,
only_tasks: bool = False,
) -> list[V1TaskSummary]:
"""
List runs of the workflow.
:param since: The start time for the runs to be listed.
:param until: The end time for the runs to be listed.
:param limit: The maximum number of runs to be listed.
:param offset: The offset for pagination.
:param statuses: The statuses of the runs to be listed.
:param additional_metadata: Additional metadata for filtering the runs.
:param worker_id: The ID of the worker that ran the tasks.
:param parent_task_external_id: The external ID of the parent task.
:param only_tasks: Whether to list only task runs.
:returns: A list of `V1TaskSummary` objects representing the runs of the workflow.
"""
return await asyncio.to_thread(
self.list_runs,
since=since or datetime.now() - timedelta(days=1),
only_tasks=only_tasks,
offset=offset,
limit=limit,
statuses=statuses,
until=until,
additional_metadata=additional_metadata,
worker_id=worker_id,
parent_task_external_id=parent_task_external_id,
)