diff --git a/frontend/docs/pages/sdks/python/client.mdx b/frontend/docs/pages/sdks/python/client.mdx
index 31d512b6d..1cd893c5c 100644
--- a/frontend/docs/pages/sdks/python/client.mdx
+++ b/frontend/docs/pages/sdks/python/client.mdx
@@ -1,6 +1,6 @@
 # Hatchet Python SDK Reference

-This is the Python SDK reference, documenting methods available for interacting with Hatchet resources. Check out the [user guide](../../home) for an introduction for getting your first tasks running
+This is the Python SDK reference, documenting methods available for interacting with Hatchet resources. Check out the [user guide](../../home) for an introduction to getting your first tasks running.

 ## The Hatchet Python Client

@@ -94,18 +94,18 @@ Define a Hatchet workflow, which can then declare `task`s and be `run`, `schedul

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str` | The name of the workflow. | _required_ |
-| `description` | `str \| None` | A description for the workflow | `None` |
-| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the `input` to the tasks in the workflow. If no validator is provided, defaults to an `EmptyModel` under the hood. The `EmptyModel` is a Pydantic model with no fields specified, and with the `extra` config option set to `"allow"`. | `None` |
-| `on_events` | `list[str]` | A list of event triggers for the workflow - events which cause the workflow to be run. | `[]` |
-| `on_crons` | `list[str]` | A list of cron triggers for the workflow. | `[]` |
-| `version` | `str \| None` | A version for the workflow | `None` |
-| `sticky` | `StickyStrategy \| None` | A sticky strategy for the workflow | `None` |
-| `default_priority` | `int` | The priority of the workflow. Higher values will cause this workflow to have priority in scheduling over other, lower priority ones. | `1` |
-| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this workflow. | `None` |
-| `task_defaults` | `TaskDefaults` | A `TaskDefaults` object controlling the default task settings for this workflow. | `TaskDefaults()` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str` | The name of the workflow. | _required_ |
+| `description` | `str \| None` | A description for the workflow. | `None` |
+| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the `input` to the tasks in the workflow. If no validator is provided, defaults to an `EmptyModel` under the hood. | `None` |
+| `on_events` | `list[str]` | A list of event triggers for the workflow - events which cause the workflow to be run. | `[]` |
+| `on_crons` | `list[str]` | A list of cron triggers for the workflow. | `[]` |
+| `version` | `str \| None` | A version for the workflow. | `None` |
+| `sticky` | `StickyStrategy \| None` | A sticky strategy for the workflow. | `None` |
+| `default_priority` | `int` | The priority of the workflow. Higher values will cause this workflow to have priority in scheduling over other, lower priority ones. | `1` |
+| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this workflow. | `None` |
+| `task_defaults` | `TaskDefaults` | A `TaskDefaults` object controlling the default task settings for this workflow. | `TaskDefaults()` |
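+
+For example, a workflow can be declared with a Pydantic input validator and an event trigger. This is a minimal sketch; the model, workflow name, and event key are illustrative:
+
+```python
+from pydantic import BaseModel
+
+from hatchet_sdk import Hatchet
+
+hatchet = Hatchet()
+
+
+class MessageInput(BaseModel):
+    message: str
+
+
+# Tasks are attached to the returned workflow with @simple.task()
+simple = hatchet.workflow(
+    name="SimpleWorkflow",
+    input_validator=MessageInput,
+    on_events=["message:created"],
+)
+```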

 Returns:

@@ -119,24 +119,24 @@ A decorator to transform a function into a standalone Hatchet task that runs as

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
-| `description` | `str \| None` | An optional description for the task. | `None` |
-| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
-| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
-| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
-| `version` | `str \| None` | A version for the task. | `None` |
-| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
-| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
-| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
-| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `DEFAULT_SCHEDULE_TIMEOUT` |
-| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `DEFAULT_EXECUTION_TIMEOUT` |
-| `retries` | `int` | The number of times to retry the task before failing. | `0` |
-| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
-| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
-| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
-| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
+| `description` | `str \| None` | An optional description for the task. | `None` |
+| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
+| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
+| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
+| `version` | `str \| None` | A version for the task. | `None` |
+| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
+| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
+| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
+| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `timedelta(minutes=5)` |
+| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `timedelta(seconds=60)` |
+| `retries` | `int` | The number of times to retry the task before failing. | `0` |
+| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
+| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
+| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
+| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
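+
+A standalone task is just a decorated function. As a sketch (the task name and return payload are illustrative):
+
+```python
+from hatchet_sdk import Context, EmptyModel, Hatchet
+
+hatchet = Hatchet()
+
+
+# With no input_validator, the input defaults to the permissive EmptyModel
+@hatchet.task(name="say-hello", retries=2)
+def say_hello(input: EmptyModel, ctx: Context) -> dict[str, str]:
+    return {"greeting": "hello"}
+```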

 Returns:

@@ -150,24 +150,24 @@ A decorator to transform a function into a standalone Hatchet _durable_ task tha

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
-| `description` | `str \| None` | An optional description for the task. | `None` |
-| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
-| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
-| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
-| `version` | `str \| None` | A version for the task. | `None` |
-| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
-| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
-| `concurrency` | `ConcurrencyExpression \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
-| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `DEFAULT_SCHEDULE_TIMEOUT` |
-| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `DEFAULT_EXECUTION_TIMEOUT` |
-| `retries` | `int` | The number of times to retry the task before failing. | `0` |
-| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
-| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
-| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
-| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
+| `description` | `str \| None` | An optional description for the task. | `None` |
+| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
+| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
+| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
+| `version` | `str \| None` | A version for the task. | `None` |
+| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
+| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
+| `concurrency` | `ConcurrencyExpression \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
+| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `timedelta(minutes=5)` |
+| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `timedelta(seconds=60)` |
+| `retries` | `int` | The number of times to retry the task before failing. | `0` |
+| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
+| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
+| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
+| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |

 Returns:

diff --git a/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx b/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx
index 262ba3c32..338d7a847 100644
--- a/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx
+++ b/frontend/docs/pages/sdks/python/feature-clients/metrics.mdx
@@ -9,10 +9,10 @@ Methods:

 | Name | Description |
 | --- | --- |
 | `aio_get_queue_metrics` | Retrieve queue metrics for a set of workflow ids and additional metadata. |
-| `aio_get_task_metrics` | Retrieve queue metrics |
+| `aio_get_task_metrics` | Retrieve queue metrics. |
 | `aio_get_workflow_metrics` | Retrieve workflow metrics for a given workflow ID. |
 | `get_queue_metrics` | Retrieve queue metrics for a set of workflow ids and additional metadata. |
-| `get_task_metrics` | Retrieve queue metrics |
+| `get_task_metrics` | Retrieve queue metrics. |
 | `get_workflow_metrics` | Retrieve workflow metrics for a given workflow ID. |

 ### Functions

@@ -36,13 +36,13 @@ Returns:

 #### `aio_get_task_metrics`

-Retrieve queue metrics
+Retrieve queue metrics.

 Returns:

-| Type | Description |
-| --- | --- |
-| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant |
+| Type | Description |
+| --- | --- |
+| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant. |

 #### `aio_get_workflow_metrics`

@@ -81,13 +81,13 @@ Returns:

 #### `get_task_metrics`

-Retrieve queue metrics
+Retrieve queue metrics.

 Returns:

-| Type | Description |
-| --- | --- |
-| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant |
+| Type | Description |
+| --- | --- |
+| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant. |

 #### `get_workflow_metrics`

diff --git a/frontend/docs/pages/sdks/python/runnables.mdx b/frontend/docs/pages/sdks/python/runnables.mdx
index c84daf3dc..396f7dd97 100644
--- a/frontend/docs/pages/sdks/python/runnables.mdx
+++ b/frontend/docs/pages/sdks/python/runnables.mdx
@@ -9,7 +9,7 @@

 Bases: `BaseWorkflow[TWorkflowInput]`

-A Hatchet workflow, which allows you to define tasks to be run and perform actions on the workflow.
+A Hatchet workflow allows you to define tasks to be run and perform actions on the workflow.

 Workflows in Hatchet represent coordinated units of work that can be triggered, scheduled, or run on a cron schedule. Each workflow can contain multiple tasks that can be arranged in dependencies (DAGs), have customized retry behavior, timeouts, concurrency controls, and more.

@@ -40,9 +40,9 @@ Workflows support various execution patterns including:

 - Cron-based recurring execution with `create_cron()`
 - Bulk operations with `run_many()`

-Tasks within workflows can be defined with `@workflow.task()` or `@workflow.durable_task()` decorators and can be arranged into complex dependency patterns.
+Tasks within workflows can be defined with `@workflow.task()` or `@workflow.durable_task()` decorators and arranged into complex dependency patterns.

-Methods:
+### Methods

 | Name | Description |
 | --- | --- |

@@ -73,21 +73,21 @@ A decorator to transform a function into a Hatchet task that runs as part of a w

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
-| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
-| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
-| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
-| `retries` | `int` | The number of times to retry the task before failing. | `0` |
-| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
-| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
-| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
-| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
-| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
-| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
-| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
-| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
+| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
+| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
+| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
+| `retries` | `int` | The number of times to retry the task before failing. | `0` |
+| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
+| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
+| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
+| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
+| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
+| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
+| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
+| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
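+
+For example, a two-step DAG can be declared by passing `parents`. A minimal sketch (names are illustrative; it assumes the parent's return value is read back with `ctx.task_output`):
+
+```python
+from hatchet_sdk import Context, EmptyModel, Hatchet
+
+hatchet = Hatchet()
+
+dag = hatchet.workflow(name="DagWorkflow")
+
+
+@dag.task()
+def step_1(input: EmptyModel, ctx: Context) -> dict[str, str]:
+    return {"result": "step 1 done"}
+
+
+# step_2 only runs after step_1 completes, because step_1 is declared as its parent
+@dag.task(parents=[step_1])
+def step_2(input: EmptyModel, ctx: Context) -> dict[str, str]:
+    parent = ctx.task_output(step_1)
+    return {"result": f"step 2 saw: {parent['result']}"}
+```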

 Returns:

@@ -105,21 +105,21 @@ See the Hatchet docs for more information on durable execution to decide if this

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
-| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
-| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
-| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
-| `retries` | `int` | The number of times to retry the task before failing. | `0` |
-| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
-| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
-| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
-| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
-| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
-| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
-| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
-| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
+| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
+| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
+| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
+| `retries` | `int` | The number of times to retry the task before failing. | `0` |
+| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
+| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
+| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
+| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
+| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
+| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
+| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
+| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |

 Returns:

@@ -133,16 +133,16 @@ A decorator to transform a function into a Hatchet on-failure task that runs as

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str \| None` | The name of the on-failure task. If not specified, defaults to the name of the function being wrapped by the `on_failure_task` decorator. | `None` |
-| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
-| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
-| `retries` | `int` | The number of times to retry the on-failure task before failing. | `0` |
-| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-failure task. | `[]` |
-| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
-| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
-| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-success task. | `[]` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str \| None` | The name of the on-failure task. If not specified, defaults to the name of the function being wrapped by the `on_failure_task` decorator. | `None` |
+| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
+| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
+| `retries` | `int` | The number of times to retry the on-failure task before failing. | `0` |
+| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-failure task. | `[]` |
+| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
+| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
+| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-failure task. | `[]` |

 Returns:

@@ -156,16 +156,16 @@ A decorator to transform a function into a Hatchet on-success task that runs as

 Parameters:

-| Name | Type | Description | Default |
-| --- | --- | --- | --- |
-| `name` | `str \| None` | The name of the on-success task. If not specified, defaults to the name of the function being wrapped by the `on_success_task` decorator. | `None` |
-| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
-| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
-| `retries` | `int` | The number of times to retry the on-success task before failing | `0` |
-| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-success task. | `[]` |
-| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
-| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
-| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-success task. | `[]` |
+| Name | Type | Description | Default |
+| --- | --- | --- | --- |
+| `name` | `str \| None` | The name of the on-success task. If not specified, defaults to the name of the function being wrapped by the `on_success_task` decorator. | `None` |
+| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
+| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
+| `retries` | `int` | The number of times to retry the on-success task before failing. | `0` |
+| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-success task. | `[]` |
+| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
+| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
+| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-success task.
| `[]` | Returns: @@ -441,7 +441,7 @@ Returns: Bases: `BaseWorkflow[TWorkflowInput]`, `Generic[TWorkflowInput, R]` -Methods: +### Methods | Name | Description | | ---------------------- | ------------------------------------------------------------------------- | diff --git a/sdks/python/apply_patches.py b/sdks/python/apply_patches.py new file mode 100644 index 000000000..ee8725ed7 --- /dev/null +++ b/sdks/python/apply_patches.py @@ -0,0 +1,140 @@ +import re +from copy import deepcopy +from pathlib import Path +from typing import Callable + + +def prepend_import(content: str, import_statement: str) -> str: + if import_statement in content: + return content + + match = re.search(r"^import\s+|^from\s+", content, re.MULTILINE) + insert_position = match.start() if match else 0 + + return ( + content[:insert_position] + import_statement + "\n" + content[insert_position:] + ) + + +def apply_patch(content: str, pattern: str, replacement: str) -> str: + return re.sub(pattern, replacement, content) + + +def atomically_patch_file( + file_path: str, patch_funcs: list[Callable[[str], str]] +) -> None: + path = Path(file_path) + original = path.read_text() + + modified = deepcopy(original) + + try: + for func in patch_funcs: + modified = func(modified) + except Exception as e: + print(f"Error patching {file_path}: {e}") + return + + if modified != original: + path.write_text(modified) + print(f"Patched {file_path}") + else: + print(f"No changes made to {file_path}") + + +def patch_contract_import_paths(content: str) -> str: + return apply_patch(content, r"\bfrom v1\b", "from hatchet_sdk.contracts.v1") + + +def patch_grpc_dispatcher_import(content: str) -> str: + return apply_patch( + content, + r"\bimport dispatcher_pb2 as dispatcher__pb2\b", + "from hatchet_sdk.contracts import dispatcher_pb2 as dispatcher__pb2", + ) + + +def patch_grpc_events_import(content: str) -> str: + return apply_patch( + content, + r"\bimport events_pb2 as events__pb2\b", + "from hatchet_sdk.contracts import events_pb2 as events__pb2", + ) + + +def patch_grpc_workflows_import(content: str) -> str: + return apply_patch( + content, + r"\bimport workflows_pb2 as workflows__pb2\b", + "from hatchet_sdk.contracts import workflows_pb2 as workflows__pb2", + ) + + +def patch_grpc_init_signature(content: str) -> str: + return apply_patch( + content, + r"def __init__\(self, channel\):", + "def __init__(self, channel: grpc.Channel | grpc.aio.Channel) -> None:", + ) + + +def apply_patches_to_matching_files( + root: str, glob: str, patch_funcs: list[Callable[[str], str]] +) -> None: + for file_path in Path(root).rglob(glob): + atomically_patch_file(str(file_path), patch_funcs) + + +def patch_api_client_datetime_format_on_post(content: str) -> str: + content = prepend_import(content, "from hatchet_sdk.logger import logger") + pattern = r"([ \t]*)elif isinstance\(obj, \(datetime\.datetime, datetime\.date\)\):\s*\n\1[ \t]*return obj\.isoformat\(\)" + + replacement = ( + r"\1## IMPORTANT: Checking `datetime` must come before `date` since `datetime` is a subclass of `date`\n" + r"\1elif isinstance(obj, datetime.datetime):\n" + r"\1 if not obj.tzinfo:\n" + r"\1 current_tz = (datetime.datetime.now(datetime.timezone(datetime.timedelta(0))).astimezone().tzinfo or datetime.timezone.utc)\n" + r'\1 logger.warning(f"timezone-naive datetime found. 
assuming {current_tz}.")\n' + r"\1 obj = obj.replace(tzinfo=current_tz)\n\n" + r"\1 return obj.isoformat()\n" + r"\1elif isinstance(obj, datetime.date):\n" + r"\1 return obj.isoformat()" + ) + return apply_patch(content, pattern, replacement) + + +def patch_workflow_run_metrics_counts_return_type(content: str) -> str: + content = prepend_import( + content, + "from hatchet_sdk.clients.rest.models.workflow_runs_metrics_counts import WorkflowRunsMetricsCounts", + ) + pattern = r"([ \t]*)counts: Optional\[Dict\[str, Any\]\] = None" + replacement = r"\1counts: Optional[WorkflowRunsMetricsCounts] = None" + return apply_patch(content, pattern, replacement) + + +if __name__ == "__main__": + atomically_patch_file( + "hatchet_sdk/clients/rest/api_client.py", + [patch_api_client_datetime_format_on_post], + ) + atomically_patch_file( + "hatchet_sdk/clients/rest/models/workflow_runs_metrics.py", + [patch_workflow_run_metrics_counts_return_type], + ) + + grpc_patches: list[Callable[[str], str]] = [ + patch_contract_import_paths, + patch_grpc_dispatcher_import, + patch_grpc_events_import, + patch_grpc_workflows_import, + patch_grpc_init_signature, + ] + + pb2_patches: list[Callable[[str], str]] = [ + patch_contract_import_paths, + ] + + apply_patches_to_matching_files("hatchet_sdk/contracts", "*_grpc.py", grpc_patches) + apply_patches_to_matching_files("hatchet_sdk/contracts", "*_pb2.py", pb2_patches) + apply_patches_to_matching_files("hatchet_sdk/contracts", "*_pb2.pyi", pb2_patches) diff --git a/sdks/python/examples/concurrency_workflow_level/test_workflow_level_concurrency.py b/sdks/python/examples/concurrency_workflow_level/test_workflow_level_concurrency.py index 262188ccd..3af1ad3b6 100644 --- a/sdks/python/examples/concurrency_workflow_level/test_workflow_level_concurrency.py +++ b/sdks/python/examples/concurrency_workflow_level/test_workflow_level_concurrency.py @@ -51,7 +51,7 @@ class RunMetadata(BaseModel): return self.key -@pytest.mark.asyncio() +@pytest.mark.asyncio(loop_scope="session") async def test_workflow_level_concurrency(hatchet: Hatchet) -> None: test_run_id = str(uuid4()) diff --git a/sdks/python/examples/priority/test_priority.py b/sdks/python/examples/priority/test_priority.py index e488d2405..6445aaa2b 100644 --- a/sdks/python/examples/priority/test_priority.py +++ b/sdks/python/examples/priority/test_priority.py @@ -1,7 +1,8 @@ import asyncio from datetime import datetime, timedelta from random import choice -from typing import AsyncGenerator, Literal +from subprocess import Popen +from typing import Any, AsyncGenerator, Literal from uuid import uuid4 import pytest @@ -60,8 +61,20 @@ async def dummy_runs() -> None: return None -@pytest.mark.asyncio() -async def test_priority(hatchet: Hatchet, dummy_runs: None) -> None: +@pytest.mark.parametrize( + "on_demand_worker", + [ + ( + ["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"], + 8003, + ) + ], + indirect=True, +) +@pytest.mark.asyncio(loop_scope="session") +async def test_priority( + hatchet: Hatchet, dummy_runs: None, on_demand_worker: Popen[Any] +) -> None: test_run_id = str(uuid4()) choices: list[Priority] = ["low", "medium", "high", "default"] N = 30 @@ -134,8 +147,20 @@ async def test_priority(hatchet: Hatchet, dummy_runs: None) -> None: assert curr.finished_at >= curr.started_at -@pytest.mark.asyncio() -async def test_priority_via_scheduling(hatchet: Hatchet, dummy_runs: None) -> None: +@pytest.mark.parametrize( + "on_demand_worker", + [ + ( + ["poetry", "run", "python", 
"examples/priority/worker.py", "--slots", "1"], + 8003, + ) + ], + indirect=True, +) +@pytest.mark.asyncio(loop_scope="session") +async def test_priority_via_scheduling( + hatchet: Hatchet, dummy_runs: None, on_demand_worker: Popen[Any] +) -> None: test_run_id = str(uuid4()) sleep_time = 3 n = 30 @@ -258,8 +283,20 @@ def time_until_next_minute() -> float: return (next_minute - now).total_seconds() -@pytest.mark.asyncio() -async def test_priority_via_cron(hatchet: Hatchet, crons: tuple[str, str, int]) -> None: +@pytest.mark.parametrize( + "on_demand_worker", + [ + ( + ["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"], + 8003, + ) + ], + indirect=True, +) +@pytest.mark.asyncio(loop_scope="session") +async def test_priority_via_cron( + hatchet: Hatchet, crons: tuple[str, str, int], on_demand_worker: Popen[Any] +) -> None: workflow_id, test_run_id, n = crons await asyncio.sleep(time_until_next_minute() + 10) diff --git a/sdks/python/examples/priority/worker.py b/sdks/python/examples/priority/worker.py index 23894cebf..269d76e3e 100644 --- a/sdks/python/examples/priority/worker.py +++ b/sdks/python/examples/priority/worker.py @@ -1,12 +1,6 @@ import time -from hatchet_sdk import ( - ConcurrencyExpression, - ConcurrencyLimitStrategy, - Context, - EmptyModel, - Hatchet, -) +from hatchet_sdk import Context, EmptyModel, Hatchet hatchet = Hatchet(debug=True) @@ -17,11 +11,6 @@ SLEEP_TIME = 0.25 priority_workflow = hatchet.workflow( name="PriorityWorkflow", default_priority=DEFAULT_PRIORITY, - concurrency=ConcurrencyExpression( - max_runs=1, - expression="'true'", - limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN, - ), ) # !! diff --git a/sdks/python/examples/worker.py b/sdks/python/examples/worker.py index dd109e53a..b6cc7fec6 100644 --- a/sdks/python/examples/worker.py +++ b/sdks/python/examples/worker.py @@ -16,7 +16,6 @@ from examples.lifespans.simple import lifespan, lifespan_task from examples.logger.workflow import logging_workflow from examples.non_retryable.worker import non_retryable_workflow from examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details -from examples.priority.worker import priority_workflow from examples.timeout.worker import refresh_timeout_wf, timeout_wf from examples.waits.worker import task_condition_workflow from hatchet_sdk import Hatchet @@ -52,7 +51,6 @@ def main() -> None: sync_fanout_child, non_retryable_workflow, concurrency_workflow_level_workflow, - priority_workflow, lifespan_task, ], lifespan=lifespan, diff --git a/sdks/python/generate.sh b/sdks/python/generate.sh index 2161ee7c6..e777b0f5f 100755 --- a/sdks/python/generate.sh +++ b/sdks/python/generate.sh @@ -94,24 +94,13 @@ git restore pyproject.toml poetry.lock poetry install --all-extras -# Fix relative imports in _grpc.py files -find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/from v1/from hatchet_sdk.contracts.v1/g' -find ./hatchet_sdk/contracts -type f -name '*_pb2.pyi' -print0 | xargs -0 sed -i '' 's/from v1/from hatchet_sdk.contracts.v1/g' -find ./hatchet_sdk/contracts -type f -name '*_pb2.py' -print0 | xargs -0 sed -i '' 's/from v1/from hatchet_sdk.contracts.v1/g' - -find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/import dispatcher_pb2 as dispatcher__pb2/from hatchet_sdk.contracts import dispatcher_pb2 as dispatcher__pb2/g' -find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/import events_pb2 as events__pb2/from hatchet_sdk.contracts 
import events_pb2 as events__pb2/g' -find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/import workflows_pb2 as workflows__pb2/from hatchet_sdk.contracts import workflows_pb2 as workflows__pb2/g' - -find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/def __init__(self, channel):/def __init__(self, channel: grpc.Channel | grpc.aio.Channel) -> None:/g' - set +e -# Note: Hack to run the linters without failing so that we can apply the patch +# Note: Hack to run the linters without failing so that we can apply the patches ./lint.sh set -e -# apply patch to openapi-generator generated code -patch -p1 --no-backup-if-mismatch <./openapi_patch.patch +# apply patches to openapi-generator generated code +poetry run python apply_patches.py # Rerun the linters and fail if there are any issues ./lint.sh diff --git a/sdks/python/hatchet_sdk/clients/rest/__init__.py b/sdks/python/hatchet_sdk/clients/rest/__init__.py index c4c4e9280..3b01f54f7 100644 --- a/sdks/python/hatchet_sdk/clients/rest/__init__.py +++ b/sdks/python/hatchet_sdk/clients/rest/__init__.py @@ -245,6 +245,8 @@ from hatchet_sdk.clients.rest.models.v1_task_run_status import V1TaskRunStatus from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList +from hatchet_sdk.clients.rest.models.v1_task_timing import V1TaskTiming +from hatchet_sdk.clients.rest.models.v1_task_timing_list import V1TaskTimingList from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import ( V1TriggerWorkflowRunRequest, ) diff --git a/sdks/python/hatchet_sdk/clients/rest/api/task_api.py b/sdks/python/hatchet_sdk/clients/rest/api/task_api.py index b5981a66c..9ca57126c 100644 --- a/sdks/python/hatchet_sdk/clients/rest/api/task_api.py +++ b/sdks/python/hatchet_sdk/clients/rest/api/task_api.py @@ -1555,6 +1555,9 @@ class TaskApi: since: Annotated[ datetime, Field(description="The start time to get metrics for") ], + until: Annotated[ + Optional[datetime], Field(description="The end time to get metrics for") + ] = None, workflow_ids: Annotated[ Optional[ List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]] @@ -1585,6 +1588,8 @@ class TaskApi: :type tenant: str :param since: The start time to get metrics for (required) :type since: datetime + :param until: The end time to get metrics for + :type until: datetime :param workflow_ids: The workflow id to find runs for :type workflow_ids: List[str] :param parent_task_external_id: The parent task's external id @@ -1614,6 +1619,7 @@ class TaskApi: _param = self._v1_task_list_status_metrics_serialize( tenant=tenant, since=since, + until=until, workflow_ids=workflow_ids, parent_task_external_id=parent_task_external_id, _request_auth=_request_auth, @@ -1649,6 +1655,9 @@ class TaskApi: since: Annotated[ datetime, Field(description="The start time to get metrics for") ], + until: Annotated[ + Optional[datetime], Field(description="The end time to get metrics for") + ] = None, workflow_ids: Annotated[ Optional[ List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]] @@ -1679,6 +1688,8 @@ class TaskApi: :type tenant: str :param since: The start time to get metrics for (required) :type since: datetime + :param until: The end time to get metrics for + :type until: datetime :param workflow_ids: The workflow id to find runs for :type workflow_ids: List[str] 
:param parent_task_external_id: The parent task's external id @@ -1708,6 +1719,7 @@ class TaskApi: _param = self._v1_task_list_status_metrics_serialize( tenant=tenant, since=since, + until=until, workflow_ids=workflow_ids, parent_task_external_id=parent_task_external_id, _request_auth=_request_auth, @@ -1743,6 +1755,9 @@ class TaskApi: since: Annotated[ datetime, Field(description="The start time to get metrics for") ], + until: Annotated[ + Optional[datetime], Field(description="The end time to get metrics for") + ] = None, workflow_ids: Annotated[ Optional[ List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]] @@ -1773,6 +1788,8 @@ class TaskApi: :type tenant: str :param since: The start time to get metrics for (required) :type since: datetime + :param until: The end time to get metrics for + :type until: datetime :param workflow_ids: The workflow id to find runs for :type workflow_ids: List[str] :param parent_task_external_id: The parent task's external id @@ -1802,6 +1819,7 @@ class TaskApi: _param = self._v1_task_list_status_metrics_serialize( tenant=tenant, since=since, + until=until, workflow_ids=workflow_ids, parent_task_external_id=parent_task_external_id, _request_auth=_request_auth, @@ -1825,6 +1843,7 @@ class TaskApi: self, tenant, since, + until, workflow_ids, parent_task_external_id, _request_auth, @@ -1863,6 +1882,17 @@ class TaskApi: else: _query_params.append(("since", since)) + if until is not None: + if isinstance(until, datetime): + _query_params.append( + ( + "until", + until.strftime(self.api_client.configuration.datetime_format), + ) + ) + else: + _query_params.append(("until", until)) + if workflow_ids is not None: _query_params.append(("workflow_ids", workflow_ids)) diff --git a/sdks/python/hatchet_sdk/clients/rest/api/workflow_runs_api.py b/sdks/python/hatchet_sdk/clients/rest/api/workflow_runs_api.py index 85b9ddbcf..656761f3f 100644 --- a/sdks/python/hatchet_sdk/clients/rest/api/workflow_runs_api.py +++ b/sdks/python/hatchet_sdk/clients/rest/api/workflow_runs_api.py @@ -23,6 +23,7 @@ from hatchet_sdk.clients.rest.api_response import ApiResponse from hatchet_sdk.clients.rest.models.v1_task_event_list import V1TaskEventList from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList +from hatchet_sdk.clients.rest.models.v1_task_timing_list import V1TaskTimingList from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import ( V1TriggerWorkflowRunRequest, ) @@ -914,6 +915,304 @@ class WorkflowRunsApi: _request_auth=_request_auth, ) + @validate_call + def v1_workflow_run_get_timings( + self, + v1_workflow_run: Annotated[ + str, + Field( + min_length=36, + strict=True, + max_length=36, + description="The workflow run id to get", + ), + ], + depth: Annotated[ + Optional[StrictInt], Field(description="The depth to retrieve children") + ] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] + ], + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> V1TaskTimingList: + """List timings for a workflow run + + Get the timings for a workflow run + + :param v1_workflow_run: The workflow run id to get (required) + :type v1_workflow_run: str + :param depth: The depth to 
retrieve children + :type depth: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._v1_workflow_run_get_timings_serialize( + v1_workflow_run=v1_workflow_run, + depth=depth, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + _response_types_map: Dict[str, Optional[str]] = { + "200": "V1TaskTimingList", + "400": "APIErrors", + "403": "APIErrors", + "501": "APIErrors", + } + response_data = self.api_client.call_api( + *_param, _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ).data + + @validate_call + def v1_workflow_run_get_timings_with_http_info( + self, + v1_workflow_run: Annotated[ + str, + Field( + min_length=36, + strict=True, + max_length=36, + description="The workflow run id to get", + ), + ], + depth: Annotated[ + Optional[StrictInt], Field(description="The depth to retrieve children") + ] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] + ], + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> ApiResponse[V1TaskTimingList]: + """List timings for a workflow run + + Get the timings for a workflow run + + :param v1_workflow_run: The workflow run id to get (required) + :type v1_workflow_run: str + :param depth: The depth to retrieve children + :type depth: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. 
+ :type _host_index: int, optional + :return: Returns the result object. + """ # noqa: E501 + + _param = self._v1_workflow_run_get_timings_serialize( + v1_workflow_run=v1_workflow_run, + depth=depth, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + _response_types_map: Dict[str, Optional[str]] = { + "200": "V1TaskTimingList", + "400": "APIErrors", + "403": "APIErrors", + "501": "APIErrors", + } + response_data = self.api_client.call_api( + *_param, _request_timeout=_request_timeout + ) + response_data.read() + return self.api_client.response_deserialize( + response_data=response_data, + response_types_map=_response_types_map, + ) + + @validate_call + def v1_workflow_run_get_timings_without_preload_content( + self, + v1_workflow_run: Annotated[ + str, + Field( + min_length=36, + strict=True, + max_length=36, + description="The workflow run id to get", + ), + ], + depth: Annotated[ + Optional[StrictInt], Field(description="The depth to retrieve children") + ] = None, + _request_timeout: Union[ + None, + Annotated[StrictFloat, Field(gt=0)], + Tuple[ + Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)] + ], + ] = None, + _request_auth: Optional[Dict[StrictStr, Any]] = None, + _content_type: Optional[StrictStr] = None, + _headers: Optional[Dict[StrictStr, Any]] = None, + _host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0, + ) -> RESTResponseType: + """List timings for a workflow run + + Get the timings for a workflow run + + :param v1_workflow_run: The workflow run id to get (required) + :type v1_workflow_run: str + :param depth: The depth to retrieve children + :type depth: int + :param _request_timeout: timeout setting for this request. If one + number provided, it will be total request + timeout. It can also be a pair (tuple) of + (connection, read) timeouts. + :type _request_timeout: int, tuple(int, int), optional + :param _request_auth: set to override the auth_settings for an a single + request; this effectively ignores the + authentication in the spec for a single request. + :type _request_auth: dict, optional + :param _content_type: force content-type for the request. + :type _content_type: str, Optional + :param _headers: set to override the headers for a single + request; this effectively ignores the headers + in the spec for a single request. + :type _headers: dict, optional + :param _host_index: set to override the host_index for a single + request; this effectively ignores the host_index + in the spec for a single request. + :type _host_index: int, optional + :return: Returns the result object. 
+ """ # noqa: E501 + + _param = self._v1_workflow_run_get_timings_serialize( + v1_workflow_run=v1_workflow_run, + depth=depth, + _request_auth=_request_auth, + _content_type=_content_type, + _headers=_headers, + _host_index=_host_index, + ) + + _response_types_map: Dict[str, Optional[str]] = { + "200": "V1TaskTimingList", + "400": "APIErrors", + "403": "APIErrors", + "501": "APIErrors", + } + response_data = self.api_client.call_api( + *_param, _request_timeout=_request_timeout + ) + return response_data.response + + def _v1_workflow_run_get_timings_serialize( + self, + v1_workflow_run, + depth, + _request_auth, + _content_type, + _headers, + _host_index, + ) -> RequestSerialized: + + _host = None + + _collection_formats: Dict[str, str] = {} + + _path_params: Dict[str, str] = {} + _query_params: List[Tuple[str, str]] = [] + _header_params: Dict[str, Optional[str]] = _headers or {} + _form_params: List[Tuple[str, str]] = [] + _files: Dict[ + str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]] + ] = {} + _body_params: Optional[bytes] = None + + # process the path parameters + if v1_workflow_run is not None: + _path_params["v1-workflow-run"] = v1_workflow_run + # process the query parameters + if depth is not None: + + _query_params.append(("depth", depth)) + + # process the header parameters + # process the form parameters + # process the body parameter + + # set the HTTP header `Accept` + if "Accept" not in _header_params: + _header_params["Accept"] = self.api_client.select_header_accept( + ["application/json"] + ) + + # authentication setting + _auth_settings: List[str] = ["cookieAuth", "bearerAuth"] + + return self.api_client.param_serialize( + method="GET", + resource_path="/api/v1/stable/workflow-runs/{v1-workflow-run}/task-timings", + path_params=_path_params, + query_params=_query_params, + header_params=_header_params, + body=_body_params, + post_params=_form_params, + files=_files, + auth_settings=_auth_settings, + collection_formats=_collection_formats, + _host=_host, + _request_auth=_request_auth, + ) + @validate_call def v1_workflow_run_list( self, diff --git a/sdks/python/hatchet_sdk/clients/rest/api_client.py b/sdks/python/hatchet_sdk/clients/rest/api_client.py index e143e3d70..417ed9d16 100644 --- a/sdks/python/hatchet_sdk/clients/rest/api_client.py +++ b/sdks/python/hatchet_sdk/clients/rest/api_client.py @@ -40,6 +40,7 @@ from hatchet_sdk.clients.rest.exceptions import ( ServiceException, UnauthorizedException, ) +from hatchet_sdk.logger import logger RequestSerialized = Tuple[str, str, Dict[str, str], Optional[str], List[str]] @@ -356,7 +357,20 @@ class ApiClient: return [self.sanitize_for_serialization(sub_obj) for sub_obj in obj] elif isinstance(obj, tuple): return tuple(self.sanitize_for_serialization(sub_obj) for sub_obj in obj) - elif isinstance(obj, (datetime.datetime, datetime.date)): + ## IMPORTANT: Checking `datetime` must come before `date` since `datetime` is a subclass of `date` + elif isinstance(obj, datetime.datetime): + if not obj.tzinfo: + current_tz = ( + datetime.datetime.now(datetime.timezone(datetime.timedelta(0))) + .astimezone() + .tzinfo + or datetime.timezone.utc + ) + logger.warning(f"timezone-naive datetime found. 
assuming {current_tz}.") + obj = obj.replace(tzinfo=current_tz) + + return obj.isoformat() + elif isinstance(obj, datetime.date): return obj.isoformat() elif isinstance(obj, decimal.Decimal): return str(obj) diff --git a/sdks/python/hatchet_sdk/clients/rest/models/__init__.py b/sdks/python/hatchet_sdk/clients/rest/models/__init__.py index 9f363850f..0691183df 100644 --- a/sdks/python/hatchet_sdk/clients/rest/models/__init__.py +++ b/sdks/python/hatchet_sdk/clients/rest/models/__init__.py @@ -210,6 +210,8 @@ from hatchet_sdk.clients.rest.models.v1_task_run_status import V1TaskRunStatus from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList +from hatchet_sdk.clients.rest.models.v1_task_timing import V1TaskTiming +from hatchet_sdk.clients.rest.models.v1_task_timing_list import V1TaskTimingList from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import ( V1TriggerWorkflowRunRequest, ) diff --git a/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py index 9d5107fd9..01939ebc8 100644 --- a/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py +++ b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_summary.py @@ -95,10 +95,8 @@ class V1TaskSummary(BaseModel): ) workflow_id: StrictStr = Field(alias="workflowId") workflow_name: Optional[StrictStr] = Field(default=None, alias="workflowName") - workflow_run_external_id: Optional[StrictStr] = Field( - default=None, - description="The external ID of the workflow run", - alias="workflowRunExternalId", + workflow_run_external_id: StrictStr = Field( + description="The external ID of the workflow run", alias="workflowRunExternalId" ) workflow_version_id: Optional[StrictStr] = Field( default=None, diff --git a/sdks/python/hatchet_sdk/clients/rest/models/v1_task_timing.py b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_timing.py new file mode 100644 index 000000000..4b3700a22 --- /dev/null +++ b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_timing.py @@ -0,0 +1,159 @@ +# coding: utf-8 + +""" + Hatchet API + + The Hatchet API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. 
+""" # noqa: E501 + + +from __future__ import annotations + +import json +import pprint +import re # noqa: F401 +from datetime import datetime +from typing import Any, ClassVar, Dict, List, Optional, Set + +from pydantic import BaseModel, ConfigDict, Field, StrictInt, StrictStr +from typing_extensions import Annotated, Self + +from hatchet_sdk.clients.rest.models.api_resource_meta import APIResourceMeta +from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus + + +class V1TaskTiming(BaseModel): + """ + V1TaskTiming + """ # noqa: E501 + + metadata: APIResourceMeta + depth: StrictInt = Field(description="The depth of the task in the waterfall.") + status: V1TaskStatus + task_display_name: StrictStr = Field( + description="The display name of the task run.", alias="taskDisplayName" + ) + task_external_id: Annotated[ + str, Field(min_length=36, strict=True, max_length=36) + ] = Field(description="The external ID of the task.", alias="taskExternalId") + task_id: StrictInt = Field(description="The ID of the task.", alias="taskId") + task_inserted_at: datetime = Field( + description="The timestamp the task was inserted.", alias="taskInsertedAt" + ) + tenant_id: Annotated[str, Field(min_length=36, strict=True, max_length=36)] = Field( + description="The ID of the tenant.", alias="tenantId" + ) + parent_task_external_id: Optional[ + Annotated[str, Field(min_length=36, strict=True, max_length=36)] + ] = Field( + default=None, + description="The external ID of the parent task.", + alias="parentTaskExternalId", + ) + queued_at: Optional[datetime] = Field( + default=None, + description="The timestamp the task run was queued.", + alias="queuedAt", + ) + started_at: Optional[datetime] = Field( + default=None, + description="The timestamp the task run started.", + alias="startedAt", + ) + finished_at: Optional[datetime] = Field( + default=None, + description="The timestamp the task run finished.", + alias="finishedAt", + ) + __properties: ClassVar[List[str]] = [ + "metadata", + "depth", + "status", + "taskDisplayName", + "taskExternalId", + "taskId", + "taskInsertedAt", + "tenantId", + "parentTaskExternalId", + "queuedAt", + "startedAt", + "finishedAt", + ] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of V1TaskTiming from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. 
+ """ + excluded_fields: Set[str] = set([]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + # override the default output from pydantic by calling `to_dict()` of metadata + if self.metadata: + _dict["metadata"] = self.metadata.to_dict() + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of V1TaskTiming from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate( + { + "metadata": ( + APIResourceMeta.from_dict(obj["metadata"]) + if obj.get("metadata") is not None + else None + ), + "depth": obj.get("depth"), + "status": obj.get("status"), + "taskDisplayName": obj.get("taskDisplayName"), + "taskExternalId": obj.get("taskExternalId"), + "taskId": obj.get("taskId"), + "taskInsertedAt": obj.get("taskInsertedAt"), + "tenantId": obj.get("tenantId"), + "parentTaskExternalId": obj.get("parentTaskExternalId"), + "queuedAt": obj.get("queuedAt"), + "startedAt": obj.get("startedAt"), + "finishedAt": obj.get("finishedAt"), + } + ) + return _obj diff --git a/sdks/python/hatchet_sdk/clients/rest/models/v1_task_timing_list.py b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_timing_list.py new file mode 100644 index 000000000..0cd2c2d43 --- /dev/null +++ b/sdks/python/hatchet_sdk/clients/rest/models/v1_task_timing_list.py @@ -0,0 +1,110 @@ +# coding: utf-8 + +""" + Hatchet API + + The Hatchet API + + The version of the OpenAPI document: 1.0.0 + Generated by OpenAPI Generator (https://openapi-generator.tech) + + Do not edit the class manually. +""" # noqa: E501 + + +from __future__ import annotations + +import json +import pprint +import re # noqa: F401 +from typing import Any, ClassVar, Dict, List, Optional, Set + +from pydantic import BaseModel, ConfigDict, Field +from typing_extensions import Self + +from hatchet_sdk.clients.rest.models.pagination_response import PaginationResponse +from hatchet_sdk.clients.rest.models.v1_task_timing import V1TaskTiming + + +class V1TaskTimingList(BaseModel): + """ + V1TaskTimingList + """ # noqa: E501 + + pagination: PaginationResponse + rows: List[V1TaskTiming] = Field(description="The list of task timings") + __properties: ClassVar[List[str]] = ["pagination", "rows"] + + model_config = ConfigDict( + populate_by_name=True, + validate_assignment=True, + protected_namespaces=(), + ) + + def to_str(self) -> str: + """Returns the string representation of the model using alias""" + return pprint.pformat(self.model_dump(by_alias=True)) + + def to_json(self) -> str: + """Returns the JSON representation of the model using alias""" + # TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead + return json.dumps(self.to_dict()) + + @classmethod + def from_json(cls, json_str: str) -> Optional[Self]: + """Create an instance of V1TaskTimingList from a JSON string""" + return cls.from_dict(json.loads(json_str)) + + def to_dict(self) -> Dict[str, Any]: + """Return the dictionary representation of the model using alias. + + This has the following differences from calling pydantic's + `self.model_dump(by_alias=True)`: + + * `None` is only added to the output dict for nullable fields that + were set at model initialization. Other fields with value `None` + are ignored. 
+ """ + excluded_fields: Set[str] = set([]) + + _dict = self.model_dump( + by_alias=True, + exclude=excluded_fields, + exclude_none=True, + ) + # override the default output from pydantic by calling `to_dict()` of pagination + if self.pagination: + _dict["pagination"] = self.pagination.to_dict() + # override the default output from pydantic by calling `to_dict()` of each item in rows (list) + _items = [] + if self.rows: + for _item_rows in self.rows: + if _item_rows: + _items.append(_item_rows.to_dict()) + _dict["rows"] = _items + return _dict + + @classmethod + def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]: + """Create an instance of V1TaskTimingList from a dict""" + if obj is None: + return None + + if not isinstance(obj, dict): + return cls.model_validate(obj) + + _obj = cls.model_validate( + { + "pagination": ( + PaginationResponse.from_dict(obj["pagination"]) + if obj.get("pagination") is not None + else None + ), + "rows": ( + [V1TaskTiming.from_dict(_item) for _item in obj["rows"]] + if obj.get("rows") is not None + else None + ), + } + ) + return _obj diff --git a/sdks/python/hatchet_sdk/hatchet.py b/sdks/python/hatchet_sdk/hatchet.py index 377fcd405..39ce34235 100644 --- a/sdks/python/hatchet_sdk/hatchet.py +++ b/sdks/python/hatchet_sdk/hatchet.py @@ -1,5 +1,6 @@ import asyncio import logging +from datetime import timedelta from typing import Any, Callable, Type, Union, cast, overload from hatchet_sdk import Context, DurableContext @@ -21,8 +22,6 @@ from hatchet_sdk.logger import logger from hatchet_sdk.rate_limit import RateLimit from hatchet_sdk.runnables.standalone import Standalone from hatchet_sdk.runnables.types import ( - DEFAULT_EXECUTION_TIMEOUT, - DEFAULT_SCHEDULE_TIMEOUT, ConcurrencyExpression, EmptyModel, R, @@ -294,8 +293,8 @@ class Hatchet: sticky: StickyStrategy | None = None, default_priority: int = 1, concurrency: ConcurrencyExpression | list[ConcurrencyExpression] | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, @@ -316,8 +315,8 @@ class Hatchet: sticky: StickyStrategy | None = None, default_priority: int = 1, concurrency: ConcurrencyExpression | list[ConcurrencyExpression] | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, @@ -339,8 +338,8 @@ class Hatchet: sticky: StickyStrategy | None = None, default_priority: int = 1, concurrency: ConcurrencyExpression | list[ConcurrencyExpression] | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, @@ -451,8 +450,8 @@ class Hatchet: sticky: StickyStrategy | None = None, default_priority: int = 1, concurrency: ConcurrencyExpression | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - 
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, @@ -475,8 +474,8 @@ class Hatchet: sticky: StickyStrategy | None = None, default_priority: int = 1, concurrency: ConcurrencyExpression | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, @@ -498,8 +497,8 @@ class Hatchet: sticky: StickyStrategy | None = None, default_priority: int = 1, concurrency: ConcurrencyExpression | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], desired_worker_labels: dict[str, DesiredWorkerLabel] = {}, diff --git a/sdks/python/hatchet_sdk/runnables/task.py b/sdks/python/hatchet_sdk/runnables/task.py index ad1c38dd3..2970026eb 100644 --- a/sdks/python/hatchet_sdk/runnables/task.py +++ b/sdks/python/hatchet_sdk/runnables/task.py @@ -1,10 +1,10 @@ +from datetime import timedelta from typing import ( TYPE_CHECKING, Any, Awaitable, Callable, Generic, - TypeVar, Union, cast, get_type_hints, @@ -18,8 +18,6 @@ from hatchet_sdk.contracts.v1.workflows_pb2 import ( DesiredWorkerLabels, ) from hatchet_sdk.runnables.types import ( - DEFAULT_EXECUTION_TIMEOUT, - DEFAULT_SCHEDULE_TIMEOUT, ConcurrencyExpression, R, StepType, @@ -43,18 +41,6 @@ if TYPE_CHECKING: from hatchet_sdk.runnables.workflow import Workflow -T = TypeVar("T") - - -def fall_back_to_default(value: T, default: T, fallback_value: T) -> T: - ## If the value is not the default, it's set - if value != default: - return value - - ## Otherwise, it's unset, so return the fallback value - return fallback_value - - class Task(Generic[TWorkflowInput, R]): def __init__( self, @@ -68,8 +54,8 @@ class Task(Generic[TWorkflowInput, R]): type: StepType, workflow: "Workflow[TWorkflowInput]", name: str, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, + execution_timeout: Duration = timedelta(seconds=60), + schedule_timeout: Duration = timedelta(minutes=5), parents: "list[Task[TWorkflowInput, Any]]" = [], retries: int = 0, rate_limits: list[CreateTaskRateLimit] = [], @@ -89,12 +75,8 @@ class Task(Generic[TWorkflowInput, R]): self.workflow = workflow self.type = type - self.execution_timeout = fall_back_to_default( - execution_timeout, DEFAULT_EXECUTION_TIMEOUT, DEFAULT_EXECUTION_TIMEOUT - ) - self.schedule_timeout = fall_back_to_default( - schedule_timeout, DEFAULT_SCHEDULE_TIMEOUT, DEFAULT_SCHEDULE_TIMEOUT - ) + self.execution_timeout = execution_timeout + self.schedule_timeout = schedule_timeout self.name = name self.parents = parents self.retries = retries diff --git a/sdks/python/hatchet_sdk/runnables/types.py b/sdks/python/hatchet_sdk/runnables/types.py index 6abf66f50..07da14433 100644 --- a/sdks/python/hatchet_sdk/runnables/types.py +++ b/sdks/python/hatchet_sdk/runnables/types.py @@ -1,9 +1,8 @@ import asyncio -from datetime import timedelta from enum 
import Enum from typing import Any, Awaitable, Callable, ParamSpec, Type, TypeGuard, TypeVar, Union -from pydantic import BaseModel, ConfigDict, Field, StrictInt, model_validator +from pydantic import BaseModel, ConfigDict, Field, model_validator from hatchet_sdk.context.context import Context, DurableContext from hatchet_sdk.contracts.v1.workflows_pb2 import Concurrency @@ -16,11 +15,6 @@ R = TypeVar("R", bound=Union[ValidTaskReturnType, Awaitable[ValidTaskReturnType] P = ParamSpec("P") -DEFAULT_EXECUTION_TIMEOUT = timedelta(seconds=60) -DEFAULT_SCHEDULE_TIMEOUT = timedelta(minutes=5) -DEFAULT_PRIORITY = 1 - - class EmptyModel(BaseModel): model_config = ConfigDict(extra="allow", frozen=True) @@ -65,9 +59,12 @@ TWorkflowInput = TypeVar("TWorkflowInput", bound=BaseModel) class TaskDefaults(BaseModel): - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT - priority: StrictInt = Field(gt=0, lt=4, default=DEFAULT_PRIORITY) + schedule_timeout: Duration | None = None + execution_timeout: Duration | None = None + priority: int | None = Field(gt=0, lt=4, default=None) + retries: int | None = None + backoff_factor: float | None = None + backoff_max_seconds: int | None = None class WorkflowConfig(BaseModel): diff --git a/sdks/python/hatchet_sdk/runnables/workflow.py b/sdks/python/hatchet_sdk/runnables/workflow.py index aab134d23..f8a729bb8 100644 --- a/sdks/python/hatchet_sdk/runnables/workflow.py +++ b/sdks/python/hatchet_sdk/runnables/workflow.py @@ -1,9 +1,9 @@ import asyncio from datetime import datetime, timedelta -from typing import TYPE_CHECKING, Any, Callable, Generic, cast +from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar, cast from google.protobuf import timestamp_pb2 -from pydantic import BaseModel +from pydantic import BaseModel, model_validator from hatchet_sdk.clients.admin import ( ScheduleTriggerWorkflowOptions, @@ -25,12 +25,11 @@ from hatchet_sdk.logger import logger from hatchet_sdk.rate_limit import RateLimit from hatchet_sdk.runnables.task import Task from hatchet_sdk.runnables.types import ( - DEFAULT_EXECUTION_TIMEOUT, - DEFAULT_SCHEDULE_TIMEOUT, ConcurrencyExpression, EmptyModel, R, StepType, + TaskDefaults, TWorkflowInput, WorkflowConfig, ) @@ -45,6 +44,62 @@ if TYPE_CHECKING: from hatchet_sdk.runnables.standalone import Standalone +T = TypeVar("T") + + +def fall_back_to_default(value: T, param_default: T, fallback_value: T | None) -> T: + ## If the value is not the param default, it's set + if value != param_default: + return value + + ## Otherwise, it's unset, so return the fallback value if it's set + if fallback_value is not None: + return fallback_value + + ## Otherwise return the param value + return value + + +class ComputedTaskParameters(BaseModel): + schedule_timeout: Duration + execution_timeout: Duration + retries: int + backoff_factor: float | None + backoff_max_seconds: int | None + + task_defaults: TaskDefaults + + @model_validator(mode="after") + def validate_params(self) -> "ComputedTaskParameters": + self.execution_timeout = fall_back_to_default( + value=self.execution_timeout, + param_default=timedelta(seconds=60), + fallback_value=self.task_defaults.execution_timeout, + ) + self.schedule_timeout = fall_back_to_default( + value=self.schedule_timeout, + param_default=timedelta(minutes=5), + fallback_value=self.task_defaults.schedule_timeout, + ) + self.backoff_factor = fall_back_to_default( + value=self.backoff_factor, + param_default=None, + 
fallback_value=self.task_defaults.backoff_factor, + ) + self.backoff_max_seconds = fall_back_to_default( + value=self.backoff_max_seconds, + param_default=None, + fallback_value=self.task_defaults.backoff_max_seconds, + ) + self.retries = fall_back_to_default( + value=self.retries, + param_default=0, + fallback_value=self.task_defaults.retries, + ) + + return self + + def transform_desired_worker_label(d: DesiredWorkerLabel) -> DesiredWorkerLabels: value = d.value return DesiredWorkerLabels( @@ -530,8 +585,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]): def task( self, name: str | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), parents: list[Task[TWorkflowInput, Any]] = [], retries: int = 0, rate_limits: list[RateLimit] = [], @@ -575,6 +630,15 @@ class Workflow(BaseWorkflow[TWorkflowInput]): :returns: A decorator which creates a `Task` object. """ + computed_params = ComputedTaskParameters( + schedule_timeout=schedule_timeout, + execution_timeout=execution_timeout, + retries=retries, + backoff_factor=backoff_factor, + backoff_max_seconds=backoff_max_seconds, + task_defaults=self.config.task_defaults, + ) + def inner( func: Callable[[TWorkflowInput, Context], R] ) -> Task[TWorkflowInput, R]: @@ -584,17 +648,17 @@ class Workflow(BaseWorkflow[TWorkflowInput]): workflow=self, type=StepType.DEFAULT, name=self._parse_task_name(name, func), - execution_timeout=execution_timeout, - schedule_timeout=schedule_timeout, + execution_timeout=computed_params.execution_timeout, + schedule_timeout=computed_params.schedule_timeout, parents=parents, - retries=retries, + retries=computed_params.retries, rate_limits=[r.to_proto() for r in rate_limits], desired_worker_labels={ key: transform_desired_worker_label(d) for key, d in desired_worker_labels.items() }, - backoff_factor=backoff_factor, - backoff_max_seconds=backoff_max_seconds, + backoff_factor=computed_params.backoff_factor, + backoff_max_seconds=computed_params.backoff_max_seconds, concurrency=concurrency, wait_for=wait_for, skip_if=skip_if, @@ -610,8 +674,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]): def durable_task( self, name: str | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), parents: list[Task[TWorkflowInput, Any]] = [], retries: int = 0, rate_limits: list[RateLimit] = [], @@ -661,6 +725,15 @@ class Workflow(BaseWorkflow[TWorkflowInput]): :returns: A decorator which creates a `Task` object. 
""" + computed_params = ComputedTaskParameters( + schedule_timeout=schedule_timeout, + execution_timeout=execution_timeout, + retries=retries, + backoff_factor=backoff_factor, + backoff_max_seconds=backoff_max_seconds, + task_defaults=self.config.task_defaults, + ) + def inner( func: Callable[[TWorkflowInput, DurableContext], R] ) -> Task[TWorkflowInput, R]: @@ -670,17 +743,17 @@ class Workflow(BaseWorkflow[TWorkflowInput]): workflow=self, type=StepType.DEFAULT, name=self._parse_task_name(name, func), - execution_timeout=execution_timeout, - schedule_timeout=schedule_timeout, + execution_timeout=computed_params.execution_timeout, + schedule_timeout=computed_params.schedule_timeout, parents=parents, - retries=retries, + retries=computed_params.retries, rate_limits=[r.to_proto() for r in rate_limits], desired_worker_labels={ key: transform_desired_worker_label(d) for key, d in desired_worker_labels.items() }, - backoff_factor=backoff_factor, - backoff_max_seconds=backoff_max_seconds, + backoff_factor=computed_params.backoff_factor, + backoff_max_seconds=computed_params.backoff_max_seconds, concurrency=concurrency, wait_for=wait_for, skip_if=skip_if, @@ -696,8 +769,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]): def on_failure_task( self, name: str | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], backoff_factor: float | None = None, @@ -756,8 +829,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]): def on_success_task( self, name: str | None = None, - schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT, - execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT, + schedule_timeout: Duration = timedelta(minutes=5), + execution_timeout: Duration = timedelta(seconds=60), retries: int = 0, rate_limits: list[RateLimit] = [], backoff_factor: float | None = None, diff --git a/sdks/python/openapi_patch.patch b/sdks/python/openapi_patch.patch deleted file mode 100644 index df5fdd443..000000000 --- a/sdks/python/openapi_patch.patch +++ /dev/null @@ -1,23 +0,0 @@ -diff --git a/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py b/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py -index 71b6351..5f70c44 100644 ---- a/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py -+++ b/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py -@@ -22,13 +22,17 @@ from typing import Any, ClassVar, Dict, List, Optional, Set - from pydantic import BaseModel, ConfigDict - from typing_extensions import Self - -+from hatchet_sdk.clients.rest.models.workflow_runs_metrics_counts import ( -+ WorkflowRunsMetricsCounts, -+) -+ - - class WorkflowRunsMetrics(BaseModel): - """ - WorkflowRunsMetrics - """ # noqa: E501 - -- counts: Optional[Dict[str, Any]] = None -+ counts: Optional[WorkflowRunsMetricsCounts] = None - __properties: ClassVar[List[str]] = ["counts"] - - model_config = ConfigDict( diff --git a/sdks/python/pyproject.toml b/sdks/python/pyproject.toml index f1adab4e8..f19a14aaa 100644 --- a/sdks/python/pyproject.toml +++ b/sdks/python/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "hatchet-sdk" -version = "1.7.0" +version = "1.8.0" description = "" authors = ["Alexander Belanger "] readme = "README.md" diff --git a/sdks/python/tests/test_task_default_fallbacks.py b/sdks/python/tests/test_task_default_fallbacks.py new file mode 100644 index 
000000000..b76012e53
--- /dev/null
+++ b/sdks/python/tests/test_task_default_fallbacks.py
@@ -0,0 +1,208 @@
+"""
+IMPORTANT
+----------
+
+These tests are intended to prevent us from changing defaults in one place and
+forgetting to change them in other places, or otherwise breaking the default handling logic.
+If you get a failure here, you likely changed the default values for some of the params (below)
+in one of the task decorators, e.g. `Workflow.task`.
+
+The intention of these tests is to:
+    1. Ensure that the behavior of falling back to `TaskDefaults` works as expected, which means that
+       if no value for a certain parameter to one of these decorators is provided, it should fall back to the
+       value in `TaskDefaults` if one is set.
+    2. Ensure that the default values set in the rest of the codebase don't change, and are consistent with each other.
+
+If you change the default values for any of these parameters, please update the tests accordingly.
+"""
+
+from datetime import timedelta
+from typing import Any
+
+import pytest
+
+from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet, Task, TaskDefaults
+
+
+def dummy_task(input: EmptyModel, context: Context) -> dict[str, str]:
+    return {"foo": "bar"}
+
+
+def dummy_durable_task(input: EmptyModel, context: DurableContext) -> dict[str, str]:
+    return {"foo": "bar"}
+
+
+DEFAULT_SCHEDULE_TIMEOUT = timedelta(minutes=5)
+DEFAULT_EXECUTION_TIMEOUT = timedelta(seconds=60)
+DEFAULT_RETRIES = 0
+DEFAULT_BACKOFF_FACTOR = None
+DEFAULT_BACKOFF_MAX_SECONDS = None
+
+
+def task(
+    hatchet: Hatchet,
+    is_durable: bool,
+    task_defaults: TaskDefaults,
+    **kwargs: Any,
+) -> Task[EmptyModel, dict[str, str]]:
+    workflow = hatchet.workflow(
+        name="foo",
+        task_defaults=task_defaults,
+    )
+
+    task_fn = workflow.durable_task if is_durable else workflow.task
+
+    return task_fn(**kwargs)(dummy_durable_task if is_durable else dummy_task)  # type: ignore
+
+
+def standalone_task(
+    hatchet: Hatchet,
+    is_durable: bool,
+    **kwargs: Any,
+) -> Task[EmptyModel, dict[str, str]]:
+    task_fn = hatchet.durable_task if is_durable else hatchet.task
+
+    return task_fn(**kwargs)(dummy_durable_task if is_durable else dummy_task)._task  # type: ignore
+
+
+def make_task(
+    hatchet: Hatchet,
+    is_durable: bool,
+    is_standalone: bool,
+    task_defaults: TaskDefaults,
+    **kwargs: Any,
+) -> Task[EmptyModel, dict[str, str]]:
+    if is_standalone:
+        # Standalone tasks have no workflow-level `TaskDefaults` to fall back to,
+        # so only the decorator params apply.
+        return standalone_task(hatchet=hatchet, is_durable=is_durable, **kwargs)
+
+    return task(
+        hatchet=hatchet,
+        is_durable=is_durable,
+        task_defaults=task_defaults,
+        **kwargs,
+    )
+
+
+@pytest.mark.parametrize("is_durable", [False, True])
+def test_task_defaults_applied_correctly(hatchet: Hatchet, is_durable: bool) -> None:
+    schedule_timeout = timedelta(seconds=3)
+    execution_timeout = timedelta(seconds=1)
+    retries = 4
+    backoff_factor = 1
+    backoff_max_seconds = 5
+
+    t = task(
+        hatchet=hatchet,
+        is_durable=is_durable,
+        task_defaults=TaskDefaults(
+            schedule_timeout=schedule_timeout,
+            execution_timeout=execution_timeout,
+            retries=retries,
+            backoff_factor=backoff_factor,
+            backoff_max_seconds=backoff_max_seconds,
+        ),
+    )
+
+    assert t.schedule_timeout == schedule_timeout
+    assert t.execution_timeout == execution_timeout
+    assert t.retries == retries
+    assert t.backoff_factor == backoff_factor
+    assert t.backoff_max_seconds == backoff_max_seconds
+
+
+@pytest.mark.parametrize(
+    "is_durable,is_standalone",
+    [
+        (False, False),
+        (True, False),
+        (False, True),
+        (True, True),
+    ],
+)
+def test_fallbacking_ensure_default_unchanged(
+    hatchet: Hatchet, is_durable: bool, is_standalone: bool
+) -> None:
+    t = make_task(
+        hatchet=hatchet,
+        is_durable=is_durable,
+        is_standalone=is_standalone,
+        task_defaults=TaskDefaults(),
+    )
+
+    # If this test fails, it means that you changed the default values for the
+    # params to one of the `task` or `durable_task` decorators.
+    assert t.schedule_timeout == DEFAULT_SCHEDULE_TIMEOUT
+    assert t.execution_timeout == DEFAULT_EXECUTION_TIMEOUT
+    assert t.retries == DEFAULT_RETRIES
+    assert t.backoff_factor == DEFAULT_BACKOFF_FACTOR
+    assert t.backoff_max_seconds == DEFAULT_BACKOFF_MAX_SECONDS
+
+
+@pytest.mark.parametrize(
+    "is_durable,is_standalone",
+    [
+        (False, False),
+        (True, False),
+        (False, True),
+        (True, True),
+    ],
+)
+def test_defaults_correctly_overridden_by_params_passed_in(
+    hatchet: Hatchet, is_durable: bool, is_standalone: bool
+) -> None:
+    t = make_task(
+        hatchet=hatchet,
+        is_durable=is_durable,
+        is_standalone=is_standalone,
+        task_defaults=TaskDefaults(
+            schedule_timeout=timedelta(seconds=3),
+            execution_timeout=timedelta(seconds=1),
+            retries=4,
+            backoff_factor=1,
+            backoff_max_seconds=5,
+        ),
+        schedule_timeout=timedelta(seconds=9),
+        execution_timeout=timedelta(seconds=2),
+        retries=6,
+        backoff_factor=3,
+        backoff_max_seconds=5,
+    )
+
+    assert t.schedule_timeout == timedelta(seconds=9)
+    assert t.execution_timeout == timedelta(seconds=2)
+    assert t.retries == 6
+    assert t.backoff_factor == 3
+    assert t.backoff_max_seconds == 5
+
+
+@pytest.mark.parametrize(
+    "is_durable,is_standalone",
+    [
+        (False, False),
+        (True, False),
+        (False, True),
+        (True, True),
+    ],
+)
+def test_params_correctly_set_with_no_defaults(
+    hatchet: Hatchet, is_durable: bool, is_standalone: bool
+) -> None:
+    t = make_task(
+        hatchet=hatchet,
+        is_durable=is_durable,
+        is_standalone=is_standalone,
+        task_defaults=TaskDefaults(),
+        schedule_timeout=timedelta(seconds=9),
+        execution_timeout=timedelta(seconds=2),
+        retries=6,
+        backoff_factor=3,
+        backoff_max_seconds=5,
+    )
+
+    assert t.schedule_timeout == timedelta(seconds=9)
+    assert t.execution_timeout == timedelta(seconds=2)
+    assert t.retries == 6
+    assert t.backoff_factor == 3
+    assert t.backoff_max_seconds == 5
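
Taken together, the changes above replace the module-level timeout constants with per-workflow fallbacks: a decorator parameter left at its default falls back to the matching `TaskDefaults` value when one is set, and an explicitly passed value wins. A minimal usage sketch of the intended behavior follows; the workflow and task names are illustrative only and not part of this changeset, and it assumes a `Hatchet` client configured via the usual environment variables:

from datetime import timedelta

from hatchet_sdk import Context, EmptyModel, Hatchet, TaskDefaults

hatchet = Hatchet()  # assumes HATCHET_CLIENT_TOKEN etc. are set in the environment

wf = hatchet.workflow(
    name="defaults-example",
    task_defaults=TaskDefaults(execution_timeout=timedelta(minutes=2), retries=3),
)


@wf.task()
def uses_defaults(input: EmptyModel, ctx: Context) -> dict[str, str]:
    # No explicit params: execution_timeout falls back to 2 minutes, retries to 3.
    return {"status": "ok"}


@wf.task(execution_timeout=timedelta(seconds=10), retries=5)
def overrides_defaults(input: EmptyModel, ctx: Context) -> dict[str, str]:
    # Explicit params take precedence over the workflow-level TaskDefaults.
    return {"status": "ok"}

One consequence of the `fall_back_to_default` comparison approach: because "unset" is detected by comparing the argument against the parameter default, explicitly passing the default value (e.g. `retries=0` while `TaskDefaults(retries=3)` is set) is indistinguishable from omitting it, so the fallback still applies in that case.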