From 9b39ac92e4473dc86d2376247b137df3bdb3b77f Mon Sep 17 00:00:00 2001 From: Matt Kaye Date: Thu, 10 Jul 2025 11:50:55 -0400 Subject: [PATCH] Feat: Improve metrics feature client (#1976) * feat: add optional replacement to remove null unicode char * feat: rework metrics client * feat: rm unused * chore: changelog * chore: gen * feat: add tenant prom metrics scraper * feat: docs * chore: changelog, naming * fix: lint * fix: type * fix: pass replacement through --- .../sdks/python/feature-clients/metrics.mdx | 126 ++++----- sdks/python/CHANGELOG.md | 10 + .../clients/rest/api/tenant_api.py | 266 ++++++++++++++++++ .../clients/rest/models/v1_task_summary.py | 7 + sdks/python/hatchet_sdk/features/metrics.py | 199 +++++++------ sdks/python/hatchet_sdk/utils/serde.py | 28 +- sdks/python/pyproject.toml | 2 +- 7 files changed, 470 insertions(+), 168 deletions(-) diff --git a/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx b/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx index 262ba3c32..295ca2b5a 100644 --- a/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx +++ b/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx @@ -6,103 +6,93 @@ The metrics client is a client for reading metrics out of Hatchet's metrics API. Methods: -| Name | Description | -| -------------------------- | ------------------------------------------------------------------------- | -| `aio_get_queue_metrics` | Retrieve queue metrics for a set of workflow ids and additional metadata. | -| `aio_get_task_metrics` | Retrieve queue metrics | -| `aio_get_workflow_metrics` | Retrieve workflow metrics for a given workflow ID. | -| `get_queue_metrics` | Retrieve queue metrics for a set of workflow ids and additional metadata. | -| `get_task_metrics` | Retrieve queue metrics | -| `get_workflow_metrics` | Retrieve workflow metrics for a given workflow ID. | +| Name | Description | +| -------------------------------------- | ----------------------------------------------------------------------------------------- | +| `aio_get_queue_metrics` | Retrieve the current queue metrics for the tenant. | +| `aio_get_task_metrics` | Retrieve task metrics, grouped by status (queued, running, completed, failed, cancelled). | +| `aio_scrape_tenant_prometheus_metrics` | Scrape Prometheus metrics for the tenant. Returns the metrics in Prometheus text format. | +| `get_queue_metrics` | Retrieve the current queue metrics for the tenant. | +| `get_task_metrics` | Retrieve task metrics, grouped by status (queued, running, completed, failed, cancelled). | +| `scrape_tenant_prometheus_metrics` | Scrape Prometheus metrics for the tenant. Returns the metrics in Prometheus text format. | ### Functions #### `aio_get_queue_metrics` -Retrieve queue metrics for a set of workflow ids and additional metadata. - -Parameters: - -| Name | Type | Description | Default | -| --------------------- | --------------------------------- | ----------------------------------------------- | ------- | -| `workflow_ids` | `list[str] \| None` | A list of workflow IDs to retrieve metrics for. | `None` | -| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to filter the metrics by. | `None` | +Retrieve the current queue metrics for the tenant. Returns: -| Type | Description | -| -------------------- | ------------------------------------------------ | -| `TenantQueueMetrics` | Workflow metrics for the specified workflow IDs. | +| Type | Description | +| ---------------- | ------------------------- | +| `dict[str, Any]` | The current queue metrics | #### `aio_get_task_metrics` -Retrieve queue metrics - -Returns: - -| Type | Description | -| --------------------------- | ------------------------------------- | -| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant | - -#### `aio_get_workflow_metrics` - -Retrieve workflow metrics for a given workflow ID. +Retrieve task metrics, grouped by status (queued, running, completed, failed, cancelled). Parameters: -| Name | Type | Description | Default | -| ------------- | --------------------------- | ----------------------------------------------- | ---------- | -| `workflow_id` | `str` | The ID of the workflow to retrieve metrics for. | _required_ | -| `status` | `WorkflowRunStatus \| None` | The status of the workflow run to filter by. | `None` | -| `group_key` | `str \| None` | The key to group the metrics by. | `None` | +| Name | Type | Description | Default | +| ------------------------------ | ------------------- | -------------------------------------------------------------------------------------------------------------------- | ------- | +| `since` | `datetime \| None` | Start time for the metrics query (defaults to the past day if unset) | `None` | +| `until` | `datetime \| None` | End time for the metrics query | `None` | +| `workflow_ids` | `list[str] \| None` | List of workflow IDs to filter the metrics by | `None` | +| `parent_task_external_id` | `str \| None` | ID of the parent task to filter by (note that parent task here refers to the task that spawned this task as a child) | `None` | +| `triggering_event_external_id` | `str \| None` | ID of the triggering event to filter by | `None` | Returns: -| Type | Description | -| ----------------- | ----------------------------------------------- | -| `WorkflowMetrics` | Workflow metrics for the specified workflow ID. | +| Type | Description | +| ------------- | ------------ | +| `TaskMetrics` | Task metrics | + +#### `aio_scrape_tenant_prometheus_metrics` + +Scrape Prometheus metrics for the tenant. Returns the metrics in Prometheus text format. + +Returns: + +| Type | Description | +| ----- | ----------------------------------------------- | +| `str` | The metrics, returned in Prometheus text format | #### `get_queue_metrics` -Retrieve queue metrics for a set of workflow ids and additional metadata. - -Parameters: - -| Name | Type | Description | Default | -| --------------------- | --------------------------------- | ----------------------------------------------- | ------- | -| `workflow_ids` | `list[str] \| None` | A list of workflow IDs to retrieve metrics for. | `None` | -| `additional_metadata` | `JSONSerializableMapping \| None` | Additional metadata to filter the metrics by. | `None` | +Retrieve the current queue metrics for the tenant. Returns: -| Type | Description | -| -------------------- | ------------------------------------------------ | -| `TenantQueueMetrics` | Workflow metrics for the specified workflow IDs. | +| Type | Description | +| ---------------- | ------------------------- | +| `dict[str, Any]` | The current queue metrics | #### `get_task_metrics` -Retrieve queue metrics - -Returns: - -| Type | Description | -| --------------------------- | ------------------------------------- | -| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant | - -#### `get_workflow_metrics` - -Retrieve workflow metrics for a given workflow ID. +Retrieve task metrics, grouped by status (queued, running, completed, failed, cancelled). Parameters: -| Name | Type | Description | Default | -| ------------- | --------------------------- | ----------------------------------------------- | ---------- | -| `workflow_id` | `str` | The ID of the workflow to retrieve metrics for. | _required_ | -| `status` | `WorkflowRunStatus \| None` | The status of the workflow run to filter by. | `None` | -| `group_key` | `str \| None` | The key to group the metrics by. | `None` | +| Name | Type | Description | Default | +| ------------------------------ | ------------------- | -------------------------------------------------------------------------------------------------------------------- | ------- | +| `since` | `datetime \| None` | Start time for the metrics query (defaults to the past day if unset) | `None` | +| `until` | `datetime \| None` | End time for the metrics query | `None` | +| `workflow_ids` | `list[str] \| None` | List of workflow IDs to filter the metrics by | `None` | +| `parent_task_external_id` | `str \| None` | ID of the parent task to filter by (note that parent task here refers to the task that spawned this task as a child) | `None` | +| `triggering_event_external_id` | `str \| None` | ID of the triggering event to filter by | `None` | Returns: -| Type | Description | -| ----------------- | ----------------------------------------------- | -| `WorkflowMetrics` | Workflow metrics for the specified workflow ID. | +| Type | Description | +| ------------- | ------------ | +| `TaskMetrics` | Task metrics | + +#### `scrape_tenant_prometheus_metrics` + +Scrape Prometheus metrics for the tenant. Returns the metrics in Prometheus text format. + +Returns: + +| Type | Description | +| ----- | ----------------------------------------------- | +| `str` | The metrics, returned in Prometheus text format | diff --git a/sdks/python/CHANGELOG.md b/sdks/python/CHANGELOG.md index 054727b9b..4915192ec 100644 --- a/sdks/python/CHANGELOG.md +++ b/sdks/python/CHANGELOG.md @@ -5,6 +5,16 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [1.15.0] - 2025-07-10 + +### Added + +- The `Metrics` client now includes a method to scrape Prometheus metrics from the tenant. + +### Changed + +- The `Metrics` client's `get_task_metrics` and `get_queue_metrics` now return better-shaped, correctly-fetched data from the API. + ## [1.14.4] - 2025-07-09 ### Added diff --git a/sdks/python/hatchet_sdk/clients/rest/api/tenant_api.py b/sdks/python/hatchet_sdk/clients/rest/api/tenant_api.py index f64274bd9..ccbc017dc 100644 --- a/sdks/python/hatchet_sdk/clients/rest/api/tenant_api.py +++ b/sdks/python/hatchet_sdk/clients/rest/api/tenant_api.py @@ -2005,6 +2005,272 @@ class TenantApi: _request_auth=_request_auth, ) + @validate_call + def tenant_get_prometheus_metrics( + self, + tenant: Annotated[ + str, + Field( + min_length=36, strict=True, max_length=36, description="The tenant id" + ), + ], + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] + ], + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> str: + """Get prometheus metrics + + Get the prometheus metrics for the tenant + + :param tenant: The tenant id (required) + :type tenant: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._tenant_get_prometheus_metrics_serialize( + tenant=tenant, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + _response_types_map: Dict[str, Optional[str]] = { + "200": "str", + "400": "APIErrors", + "403": "APIErrors", + "404": "APIErrors", + } + response_data = self.api_client.call_api( + *_param, _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + @validate_call + def tenant_get_prometheus_metrics_with_http_info( + self, + tenant: Annotated[ + str, + Field( + min_length=36, strict=True, max_length=36, description="The tenant id" + ), + ], + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] + ], + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[str]: + """Get prometheus metrics + + Get the prometheus metrics for the tenant + + :param tenant: The tenant id (required) + :type tenant: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._tenant_get_prometheus_metrics_serialize( + tenant=tenant, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + _response_types_map: Dict[str, Optional[str]] = { + "200": "str", + "400": "APIErrors", + "403": "APIErrors", + "404": "APIErrors", + } + response_data = self.api_client.call_api( + *_param, _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + @validate_call + def tenant_get_prometheus_metrics_without_preload_content( + self, + tenant: Annotated[ + str, + Field( + min_length=36, strict=True, max_length=36, description="The tenant id" + ), + ], + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] + ], + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """Get prometheus metrics + + Get the prometheus metrics for the tenant + + :param tenant: The tenant id (required) + :type tenant: str + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._tenant_get_prometheus_metrics_serialize( + tenant=tenant, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + _response_types_map: Dict[str, Optional[str]] = { + "200": "str", + "400": "APIErrors", + "403": "APIErrors", + "404": "APIErrors", + } + response_data = self.api_client.call_api( + *_param, _request_timeout=_request_timeout + ) + return response_data.response + + def _tenant_get_prometheus_metrics_serialize( + self, + tenant, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = {} + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[ + str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] + ] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if tenant is not None: + _path_params["tenant"] = tenant + # process the query parameters + # process the header parameters + # process the form parameters + # process the body parameter + + # set the HTTP header `Accept` + if "Accept" not in _header_params: + _header_params["Accept"] = self.api_client.select_header_accept( + ["text/plain", "application/json"] + ) + + # authentication setting + _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] + + return self.api_client.param_serialize( + method="GET", + resource_path="/api/v1/tenants/{tenant}/prometheus-metrics", + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth, + ) + @validate_call def tenant_get_step_run_queue_metrics( self, diff --git a/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py index 8e0863eb7..b1d11add3 100644 --- a/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py +++ b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py @@ -114,6 +114,11 @@ class V1TaskSummary(BaseModel): workflow_config: Optional[Dict[str, Any]] = Field( default=None, alias="workflowConfig" ) + parent_task_external_id: Optional[StrictStr] = Field( + default=None, + description="The external ID of the parent task.", + alias="parentTaskExternalId", + ) __properties: ClassVar[List[str]] = [ "metadata", "actionId", @@ -142,6 +147,7 @@ class V1TaskSummary(BaseModel): "workflowRunExternalId", "workflowVersionId", "workflowConfig", + "parentTaskExternalId", ] model_config = ConfigDict( @@ -239,6 +245,7 @@ class V1TaskSummary(BaseModel): "workflowRunExternalId": obj.get("workflowRunExternalId"), "workflowVersionId": obj.get("workflowVersionId"), "workflowConfig": obj.get("workflowConfig"), + "parentTaskExternalId": obj.get("parentTaskExternalId"), } ) return _obj diff --git a/sdks/python/hatchet_sdk/features/metrics.py b/sdks/python/hatchet_sdk/features/metrics.py index 2bdd8f98d..312ebe213 100644 --- a/sdks/python/hatchet_sdk/features/metrics.py +++ b/sdks/python/hatchet_sdk/features/metrics.py @@ -1,20 +1,21 @@ import asyncio +from datetime import datetime, timedelta, timezone +from typing import Any +from pydantic import BaseModel + +from hatchet_sdk.clients.rest.api.task_api import TaskApi from hatchet_sdk.clients.rest.api.tenant_api import TenantApi -from hatchet_sdk.clients.rest.api.workflow_api import WorkflowApi from hatchet_sdk.clients.rest.api_client import ApiClient -from hatchet_sdk.clients.rest.models.tenant_queue_metrics import TenantQueueMetrics -from hatchet_sdk.clients.rest.models.tenant_step_run_queue_metrics import ( - TenantStepRunQueueMetrics, -) -from hatchet_sdk.clients.rest.models.workflow_metrics import WorkflowMetrics -from hatchet_sdk.clients.rest.models.workflow_run_status import WorkflowRunStatus -from hatchet_sdk.clients.v1.api_client import ( - BaseRestClient, - maybe_additional_metadata_to_kv, - retry, -) -from hatchet_sdk.utils.typing import JSONSerializableMapping +from hatchet_sdk.clients.v1.api_client import BaseRestClient, retry + + +class TaskMetrics(BaseModel): + cancelled: int + completed: int + failed: int + queued: int + running: int class MetricsClient(BaseRestClient): @@ -22,108 +23,126 @@ class MetricsClient(BaseRestClient): The metrics client is a client for reading metrics out of Hatchet's metrics API. """ - def _wa(self, client: ApiClient) -> WorkflowApi: - return WorkflowApi(client) + def _taskapi(self, client: ApiClient) -> TaskApi: + return TaskApi(client) def _ta(self, client: ApiClient) -> TenantApi: return TenantApi(client) - @retry - def get_workflow_metrics( - self, - workflow_id: str, - status: WorkflowRunStatus | None = None, - group_key: str | None = None, - ) -> WorkflowMetrics: - """ - Retrieve workflow metrics for a given workflow ID. - - :param workflow_id: The ID of the workflow to retrieve metrics for. - :param status: The status of the workflow run to filter by. - :param group_key: The key to group the metrics by. - - :return: Workflow metrics for the specified workflow ID. - """ - with self.client() as client: - return self._wa(client).workflow_get_metrics( - workflow=workflow_id, status=status, group_key=group_key - ) - - async def aio_get_workflow_metrics( - self, - workflow_id: str, - status: WorkflowRunStatus | None = None, - group_key: str | None = None, - ) -> WorkflowMetrics: - """ - Retrieve workflow metrics for a given workflow ID. - - :param workflow_id: The ID of the workflow to retrieve metrics for. - :param status: The status of the workflow run to filter by. - :param group_key: The key to group the metrics by. - - :return: Workflow metrics for the specified workflow ID. - """ - return await asyncio.to_thread( - self.get_workflow_metrics, workflow_id, status, group_key - ) - @retry def get_queue_metrics( self, - workflow_ids: list[str] | None = None, - additional_metadata: JSONSerializableMapping | None = None, - ) -> TenantQueueMetrics: + ) -> dict[str, Any]: """ - Retrieve queue metrics for a set of workflow ids and additional metadata. + Retrieve the current queue metrics for the tenant. - :param workflow_ids: A list of workflow IDs to retrieve metrics for. - :param additional_metadata: Additional metadata to filter the metrics by. - - :return: Workflow metrics for the specified workflow IDs. + :return: The current queue metrics """ with self.client() as client: - return self._wa(client).tenant_get_queue_metrics( - tenant=self.client_config.tenant_id, - workflows=workflow_ids, - additional_metadata=maybe_additional_metadata_to_kv( - additional_metadata - ), - ) + return ( + self._ta(client) + .tenant_get_step_run_queue_metrics( + tenant=self.client_config.tenant_id, + ) + .queues + ) or {} async def aio_get_queue_metrics( self, - workflow_ids: list[str] | None = None, - additional_metadata: JSONSerializableMapping | None = None, - ) -> TenantQueueMetrics: + ) -> dict[str, Any]: """ - Retrieve queue metrics for a set of workflow ids and additional metadata. + Retrieve the current queue metrics for the tenant. - :param workflow_ids: A list of workflow IDs to retrieve metrics for. - :param additional_metadata: Additional metadata to filter the metrics by. - - :return: Workflow metrics for the specified workflow IDs. + :return: The current queue metrics """ - return await asyncio.to_thread( - self.get_queue_metrics, workflow_ids, additional_metadata - ) + + return await asyncio.to_thread(self.get_queue_metrics) @retry - def get_task_metrics(self) -> TenantStepRunQueueMetrics: + def scrape_tenant_prometheus_metrics( + self, + ) -> str: """ - Retrieve queue metrics + Scrape Prometheus metrics for the tenant. Returns the metrics in Prometheus text format. - :return: Step run queue metrics for the tenant + :return: The metrics, returned in Prometheus text format """ with self.client() as client: - return self._ta(client).tenant_get_step_run_queue_metrics( - tenant=self.client_config.tenant_id + return self._ta(client).tenant_get_prometheus_metrics( + tenant=self.client_config.tenant_id, ) - async def aio_get_task_metrics(self) -> TenantStepRunQueueMetrics: + async def aio_scrape_tenant_prometheus_metrics( + self, + ) -> str: """ - Retrieve queue metrics + Scrape Prometheus metrics for the tenant. Returns the metrics in Prometheus text format. - :return: Step run queue metrics for the tenant + :return: The metrics, returned in Prometheus text format """ - return await asyncio.to_thread(self.get_task_metrics) + + return await asyncio.to_thread(self.scrape_tenant_prometheus_metrics) + + @retry + def get_task_metrics( + self, + since: datetime | None = None, + until: datetime | None = None, + workflow_ids: list[str] | None = None, + parent_task_external_id: str | None = None, + triggering_event_external_id: str | None = None, + ) -> TaskMetrics: + """ + Retrieve task metrics, grouped by status (queued, running, completed, failed, cancelled). + + :param since: Start time for the metrics query (defaults to the past day if unset) + :param until: End time for the metrics query + :param workflow_ids: List of workflow IDs to filter the metrics by + :param parent_task_external_id: ID of the parent task to filter by (note that parent task here refers to the task that spawned this task as a child) + :param triggering_event_external_id: ID of the triggering event to filter by + :return: Task metrics + """ + + since = since or datetime.now(timezone.utc) - timedelta(days=1) + until = until or datetime.now(timezone.utc) + with self.client() as client: + metrics = { + m.status.name.lower(): m.count + for m in self._taskapi(client).v1_task_list_status_metrics( + tenant=self.client_config.tenant_id, + since=since, + until=until, + workflow_ids=workflow_ids, + parent_task_external_id=parent_task_external_id, + triggering_event_external_id=triggering_event_external_id, + ) + } + + return TaskMetrics.model_validate(metrics) + + async def aio_get_task_metrics( + self, + since: datetime | None = None, + until: datetime | None = None, + workflow_ids: list[str] | None = None, + parent_task_external_id: str | None = None, + triggering_event_external_id: str | None = None, + ) -> TaskMetrics: + """ + Retrieve task metrics, grouped by status (queued, running, completed, failed, cancelled). + + :param since: Start time for the metrics query (defaults to the past day if unset) + :param until: End time for the metrics query + :param workflow_ids: List of workflow IDs to filter the metrics by + :param parent_task_external_id: ID of the parent task to filter by (note that parent task here refers to the task that spawned this task as a child) + :param triggering_event_external_id: ID of the triggering event to filter by + :return: Task metrics + """ + return await asyncio.to_thread( + self.get_task_metrics, + since, + until, + workflow_ids, + parent_task_external_id, + triggering_event_external_id, + ) diff --git a/sdks/python/hatchet_sdk/utils/serde.py b/sdks/python/hatchet_sdk/utils/serde.py index 932cb0161..7ac1cc7d0 100644 --- a/sdks/python/hatchet_sdk/utils/serde.py +++ b/sdks/python/hatchet_sdk/utils/serde.py @@ -5,23 +5,27 @@ K = TypeVar("K") @overload -def remove_null_unicode_character(data: str) -> str: ... +def remove_null_unicode_character(data: str, replacement: str = "") -> str: ... @overload -def remove_null_unicode_character(data: dict[K, T]) -> dict[K, T]: ... +def remove_null_unicode_character( + data: dict[K, T], replacement: str = "" +) -> dict[K, T]: ... @overload -def remove_null_unicode_character(data: list[T]) -> list[T]: ... +def remove_null_unicode_character(data: list[T], replacement: str = "") -> list[T]: ... @overload -def remove_null_unicode_character(data: tuple[T, ...]) -> tuple[T, ...]: ... +def remove_null_unicode_character( + data: tuple[T, ...], replacement: str = "" +) -> tuple[T, ...]: ... def remove_null_unicode_character( - data: str | dict[K, T] | list[T] | tuple[T, ...], + data: str | dict[K, T] | list[T] | tuple[T, ...], replacement: str = "" ) -> str | dict[K, T] | list[T] | tuple[T, ...]: """ Recursively traverse a dictionary (a task's output) and remove the unicode escape sequence \\u0000 which will cause unexpected behavior in Hatchet. @@ -29,23 +33,29 @@ def remove_null_unicode_character( Needed as Hatchet does not support \\u0000 in task outputs :param data: The task output (a JSON-serializable dictionary or mapping) + :param replacement: The string to replace \\u0000 with. + :return: The same dictionary with all \\u0000 characters removed from strings, and nested dictionaries/lists processed recursively. :raises TypeError: If the input is not a string, dictionary, list, or tuple. """ if isinstance(data, str): - return data.replace("\u0000", "") + return data.replace("\u0000", replacement) if isinstance(data, dict): return { - key: remove_null_unicode_character(cast(Any, value)) + key: remove_null_unicode_character(cast(Any, value), replacement) for key, value in data.items() } if isinstance(data, list): - return [remove_null_unicode_character(cast(Any, item)) for item in data] + return [ + remove_null_unicode_character(cast(Any, item), replacement) for item in data + ] if isinstance(data, tuple): - return tuple(remove_null_unicode_character(cast(Any, item)) for item in data) + return tuple( + remove_null_unicode_character(cast(Any, item), replacement) for item in data + ) raise TypeError( f"Unsupported type {type(data)}. Expected str, dict, list, or tuple." diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index 256f63c2e..57e8d3f78 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.14.4" +version = "1.15.0" description = "" authors = ["Alexander Belanger "] readme = "README.md"