[Python] Fix: Task defaults, Patching fixes, Datetime conversions (#1667)

* feat: start wiring up defaults

* feat: add test coverage

* fix: test docs

* feat: expand tests

* fix: rm validators for now

* chore: minor version

* fix: skip prio tests in ci for now

* chore: docs

* debug: attempt to fix prio test by running with a single slot and no concurrency

* fix: python script to apply patches

* chore: gen

* feat: atomic patches

* fix: rm sed-based patches

* fix: use current tz

* fix: ordering
This commit is contained in:
Matt Kaye
2025-05-06 17:31:36 -04:00
committed by GitHub
parent 3f4424b0fc
commit 94d06a643c
24 changed files with 1226 additions and 247 deletions

View File

@@ -1,6 +1,6 @@
# Hatchet Python SDK Reference
This is the Python SDK reference, documenting methods available for interacting with Hatchet resources. Check out the [user guide](../../home) for an introduction for getting your first tasks running
This is the Python SDK reference, documenting methods available for interacting with Hatchet resources. Check out the [user guide](../../home) for an introduction for getting your first tasks running.
## The Hatchet Python Client
@@ -94,18 +94,18 @@ Define a Hatchet workflow, which can then declare `task`s and be `run`, `schedul
Parameters:
| Name | Type | Description | Default |
| ------------------ | -------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- |
| `name` | `str` | The name of the workflow. | _required_ |
| `description` | `str \| None` | A description for the workflow | `None` |
| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the `input` to the tasks in the workflow. If no validator is provided, defaults to an `EmptyModel` under the hood. The `EmptyModel` is a Pydantic model with no fields specified, and with the `extra` config option set to `"allow"`. | `None` |
| `on_events` | `list[str]` | A list of event triggers for the workflow - events which cause the workflow to be run. | `[]` |
| `on_crons` | `list[str]` | A list of cron triggers for the workflow. | `[]` |
| `version` | `str \| None` | A version for the workflow | `None` |
| `sticky` | `StickyStrategy \| None` | A sticky strategy for the workflow | `None` |
| `default_priority` | `int` | The priority of the workflow. Higher values will cause this workflow to have priority in scheduling over other, lower priority ones. | `1` |
| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this workflow. | `None` |
| `task_defaults` | `TaskDefaults` | A `TaskDefaults` object controlling the default task settings for this workflow. | `TaskDefaults()` |
| Name | Type | Description | Default |
| ------------------ | -------------------------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------- |
| `name` | `str` | The name of the workflow. | _required_ |
| `description` | `str \| None` | A description for the workflow | `None` |
| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the `input` to the tasks in the workflow. If no validator is provided, defaults to an `EmptyModel` under the hood. | `None` |
| `on_events` | `list[str]` | A list of event triggers for the workflow - events which cause the workflow to be run. | `[]` |
| `on_crons` | `list[str]` | A list of cron triggers for the workflow. | `[]` |
| `version` | `str \| None` | A version for the workflow | `None` |
| `sticky` | `StickyStrategy \| None` | A sticky strategy for the workflow | `None` |
| `default_priority` | `int` | The priority of the workflow. Higher values will cause this workflow to have priority in scheduling over other, lower priority ones. | `1` |
| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this workflow. | `None` |
| `task_defaults` | `TaskDefaults` | A `TaskDefaults` object controlling the default task settings for this workflow. | `TaskDefaults()` |
Returns:
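A minimal sketch of the new `task_defaults` parameter in use. The `TaskDefaults` import path and field names below are assumptions modeled on the task-level parameters documented further down, not confirmed API:

```python
from datetime import timedelta

from hatchet_sdk import Hatchet, TaskDefaults  # TaskDefaults import path assumed

hatchet = Hatchet()

# Every task declared on this workflow inherits these defaults unless it
# overrides them explicitly (field names are assumptions).
workflow = hatchet.workflow(
    name="ReportingWorkflow",
    default_priority=2,
    task_defaults=TaskDefaults(
        retries=3,
        execution_timeout=timedelta(minutes=2),
        schedule_timeout=timedelta(minutes=10),
    ),
)
```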
@@ -119,24 +119,24 @@ A decorator to transform a function into a standalone Hatchet task that runs as
Parameters:
| Name | Type | Description | Default |
| ----------------------- | -------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
| `description` | `str \| None` | An optional description for the task. | `None` |
| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
| `version` | `str \| None` | A version for the task. | `None` |
| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `DEFAULT_SCHEDULE_TIMEOUT` |
| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `DEFAULT_EXECUTION_TIMEOUT` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| Name | Type | Description | Default |
| ----------------------- | -------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
| `description` | `str \| None` | An optional description for the task. | `None` |
| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
| `version` | `str \| None` | A version for the task. | `None` |
| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
| `concurrency` | `ConcurrencyExpression \| list[ConcurrencyExpression] \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `timedelta(minutes=5)` |
| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `timedelta(seconds=60)` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
Returns:
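Given the concrete `timedelta` defaults now shown in the table, a standalone task might look like the following sketch (only parameters from the table are used; the `(input, ctx)` signature follows the SDK examples elsewhere in this commit):

```python
from datetime import timedelta

from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()

# Sketch: override the documented timeout defaults on a standalone task.
@hatchet.task(
    name="send-digest",
    execution_timeout=timedelta(seconds=90),  # default: timedelta(seconds=60)
    schedule_timeout=timedelta(minutes=10),  # default: timedelta(minutes=5)
    retries=2,
)
def send_digest(input: EmptyModel, ctx: Context) -> None: ...
```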
@@ -150,24 +150,24 @@ A decorator to transform a function into a standalone Hatchet _durable_ task tha
Parameters:
| Name | Type | Description | Default |
| ----------------------- | ------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
| `description` | `str \| None` | An optional description for the task. | `None` |
| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
| `version` | `str \| None` | A version for the task. | `None` |
| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
| `concurrency` | `ConcurrencyExpression \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `DEFAULT_SCHEDULE_TIMEOUT` |
| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `DEFAULT_EXECUTION_TIMEOUT` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| Name | Type | Description | Default |
| ----------------------- | ------------------------------- | --------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `name` | `str` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | _required_ |
| `description` | `str \| None` | An optional description for the task. | `None` |
| `input_validator` | `Type[TWorkflowInput] \| None` | A Pydantic model to use as a validator for the input to the task. If no validator is provided, defaults to an `EmptyModel`. | `None` |
| `on_events` | `list[str]` | A list of event triggers for the task - events which cause the task to be run. | `[]` |
| `on_crons` | `list[str]` | A list of cron triggers for the task. | `[]` |
| `version` | `str \| None` | A version for the task. | `None` |
| `sticky` | `StickyStrategy \| None` | A sticky strategy for the task. | `None` |
| `default_priority` | `int` | The priority of the task. Higher values will cause this task to have priority in scheduling. | `1` |
| `concurrency` | `ConcurrencyExpression \| None` | A concurrency object controlling the concurrency settings for this task. | `None` |
| `schedule_timeout` | `Duration` | The maximum time allowed for scheduling the task. | `timedelta(minutes=5)` |
| `execution_timeout` | `Duration` | The maximum time allowed for executing the task. | `timedelta(seconds=60)` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
Returns:
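The durable variant differs only in the decorator and the context type; the `DurableContext` name is an assumption:

```python
from hatchet_sdk import EmptyModel, Hatchet
from hatchet_sdk import DurableContext  # assumed import path

hatchet = Hatchet()

# Sketch: a standalone durable task, suitable for long waits between steps.
@hatchet.durable_task(name="long-running-sync", retries=1)
def long_running_sync(input: EmptyModel, ctx: DurableContext) -> None: ...
```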

View File

@@ -9,10 +9,10 @@ Methods:
| Name | Description |
| -------------------------- | ------------------------------------------------------------------------- |
| `aio_get_queue_metrics` | Retrieve queue metrics for a set of workflow ids and additional metadata. |
| `aio_get_task_metrics` | Retrieve queue metrics |
| `aio_get_task_metrics` | Retrieve queue metrics. |
| `aio_get_workflow_metrics` | Retrieve workflow metrics for a given workflow ID. |
| `get_queue_metrics` | Retrieve queue metrics for a set of workflow ids and additional metadata. |
| `get_task_metrics` | Retrieve queue metrics |
| `get_task_metrics` | Retrieve queue metrics. |
| `get_workflow_metrics` | Retrieve workflow metrics for a given workflow ID. |
### Functions
@@ -36,13 +36,13 @@ Returns:
#### `aio_get_task_metrics`
Retrieve queue metrics
Retrieve queue metrics.
Returns:
| Type | Description |
| --------------------------- | ------------------------------------- |
| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant |
| Type | Description |
| --------------------------- | -------------------------------------- |
| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant. |
#### `aio_get_workflow_metrics`
@@ -81,13 +81,13 @@ Returns:
#### `get_task_metrics`
Retrieve queue metrics
Retrieve queue metrics.
Returns:
| Type | Description |
| --------------------------- | ------------------------------------- |
| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant |
| Type | Description |
| --------------------------- | -------------------------------------- |
| `TenantStepRunQueueMetrics` | Step run queue metrics for the tenant. |
#### `get_workflow_metrics`

View File

@@ -9,7 +9,7 @@
Bases: `BaseWorkflow[TWorkflowInput]`
A Hatchet workflow, which allows you to define tasks to be run and perform actions on the workflow.
A Hatchet workflow allows you to define tasks to be run and perform actions on the workflow.
Workflows in Hatchet represent coordinated units of work that can be triggered, scheduled, or run on a cron schedule. Each workflow can contain multiple tasks that can be arranged in dependencies (DAGs), have customized retry behavior, timeouts, concurrency controls, and more.
@@ -40,9 +40,9 @@ Workflows support various execution patterns including:
- Cron-based recurring execution with `create_cron()`
- Bulk operations with `run_many()`
Tasks within workflows can be defined with `@workflow.task()` or `@workflow.durable_task()` decorators and can be arranged into complex dependency patterns.
Tasks within workflows can be defined with `@workflow.task()` or `@workflow.durable_task()` decorators and arranged into complex dependency patterns.
Methods:
### Methods
| Name | Description |
| ---------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ |
@@ -73,21 +73,21 @@ A decorator to transform a function into a Hatchet task that runs as part of a w
Parameters:
| Name | Type | Description | Default |
| ----------------------- | --------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
| Name | Type | Description | Default |
| ----------------------- | --------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
Returns:
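A sketch of a two-task DAG using `parents`. The `ctx.task_output(...)` accessor for reading a parent's result is an assumption:

```python
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()
workflow = hatchet.workflow(name="DagWorkflow")

@workflow.task()
def step_one(input: EmptyModel, ctx: Context) -> dict[str, int]:
    return {"count": 1}

# `step_two` is scheduled only after `step_one` completes. Per the table
# above, parents must be defined before their children.
@workflow.task(parents=[step_one], retries=2)
def step_two(input: EmptyModel, ctx: Context) -> None:
    one = ctx.task_output(step_one)  # assumed accessor for parent output
    print(one["count"])
```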
@@ -105,21 +105,21 @@ See the Hatchet docs for more information on durable execution to decide if this
Parameters:
| Name | Type | Description | Default |
| ----------------------- | --------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
| Name | Type | Description | Default |
| ----------------------- | --------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `name` | `str \| None` | The name of the task. If not specified, defaults to the name of the function being wrapped by the `task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
| `parents` | `list[Task[TWorkflowInput, Any]]` | A list of tasks that are parents of the task. Note: Parents must be defined before their children. | `[]` |
| `retries` | `int` | The number of times to retry the task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the task. | `[]` |
| `desired_worker_labels` | `dict[str, DesiredWorkerLabel]` | A dictionary of desired worker labels that determine to which worker the task should be assigned. See documentation and examples on affinity and worker labels for more details. | `{}` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the task. | `[]` |
| `wait_for` | `list[Condition \| OrGroup]` | A list of conditions that must be met before the task can run. | `[]` |
| `skip_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be skipped. | `[]` |
| `cancel_if` | `list[Condition \| OrGroup]` | A list of conditions that, if met, will cause the task to be canceled. | `[]` |
Returns:
@@ -133,16 +133,16 @@ A decorator to transform a function into a Hatchet on-failure task that runs as
Parameters:
| Name | Type | Description | Default |
| --------------------- | ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `name` | `str \| None` | The name of the on-failure task. If not specified, defaults to the name of the function being wrapped by the `on_failure_task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
| `retries` | `int` | The number of times to retry the on-failure task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-failure task. | `[]` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-success task. | `[]` |
| Name | Type | Description | Default |
| --------------------- | ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `name` | `str \| None` | The name of the on-failure task. If not specified, defaults to the name of the function being wrapped by the `on_failure_task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
| `retries` | `int` | The number of times to retry the on-failure task before failing. | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-failure task. | `[]` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-failure task. | `[]` |
Returns:
@@ -156,16 +156,16 @@ A decorator to transform a function into a Hatchet on-success task that runs as
Parameters:
| Name | Type | Description | Default |
| --------------------- | ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- | --------------------------- |
| `name` | `str \| None` | The name of the on-success task. If not specified, defaults to the name of the function being wrapped by the `on_success_task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `DEFAULT_SCHEDULE_TIMEOUT` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `DEFAULT_EXECUTION_TIMEOUT` |
| `retries` | `int` | The number of times to retry the on-success task before failing | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-success task. | `[]` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-success task. | `[]` |
| Name | Type | Description | Default |
| --------------------- | ----------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------- | ----------------------- |
| `name` | `str \| None` | The name of the on-success task. If not specified, defaults to the name of the function being wrapped by the `on_success_task` decorator. | `None` |
| `schedule_timeout` | `Duration` | The maximum time to wait for the task to be scheduled. The run will be canceled if the task does not begin within this time. | `timedelta(minutes=5)` |
| `execution_timeout` | `Duration` | The maximum time to wait for the task to complete. The run will be canceled if the task does not complete within this time. | `timedelta(seconds=60)` |
| `retries` | `int` | The number of times to retry the on-success task before failing | `0` |
| `rate_limits` | `list[RateLimit]` | A list of rate limit configurations for the on-success task. | `[]` |
| `backoff_factor` | `float \| None` | The backoff factor for controlling exponential backoff in retries. | `None` |
| `backoff_max_seconds` | `int \| None` | The maximum number of seconds to allow retries with exponential backoff to continue. | `None` |
| `concurrency` | `list[ConcurrencyExpression]` | A list of concurrency expressions for the on-success task. | `[]` |
Returns:
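A sketch of the failure and success hooks side by side, on the same assumed `workflow` object and task signature as above:

```python
from hatchet_sdk import Context, EmptyModel, Hatchet

hatchet = Hatchet()
workflow = hatchet.workflow(name="HookedWorkflow")

# Runs once if any task in the workflow run fails.
@workflow.on_failure_task()
def notify_failure(input: EmptyModel, ctx: Context) -> None:
    ...  # e.g. alert the on-call channel

# Runs once after all tasks in the workflow run succeed.
@workflow.on_success_task()
def record_success(input: EmptyModel, ctx: Context) -> None: ...
```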
@@ -441,7 +441,7 @@ Returns:
Bases: `BaseWorkflow[TWorkflowInput]`, `Generic[TWorkflowInput, R]`
Methods:
### Methods
| Name | Description |
| ---------------------- | ------------------------------------------------------------------------- |

View File

@@ -0,0 +1,140 @@
import re
from copy import deepcopy
from pathlib import Path
from typing import Callable
def prepend_import(content: str, import_statement: str) -> str:
if import_statement in content:
return content
match = re.search(r"^import\s+|^from\s+", content, re.MULTILINE)
insert_position = match.start() if match else 0
return (
content[:insert_position] + import_statement + "\n" + content[insert_position:]
)
def apply_patch(content: str, pattern: str, replacement: str) -> str:
return re.sub(pattern, replacement, content)
def atomically_patch_file(
file_path: str, patch_funcs: list[Callable[[str], str]]
) -> None:
path = Path(file_path)
original = path.read_text()
modified = deepcopy(original)
try:
for func in patch_funcs:
modified = func(modified)
except Exception as e:
print(f"Error patching {file_path}: {e}")
return
if modified != original:
path.write_text(modified)
print(f"Patched {file_path}")
else:
print(f"No changes made to {file_path}")
def patch_contract_import_paths(content: str) -> str:
return apply_patch(content, r"\bfrom v1\b", "from hatchet_sdk.contracts.v1")
def patch_grpc_dispatcher_import(content: str) -> str:
return apply_patch(
content,
r"\bimport dispatcher_pb2 as dispatcher__pb2\b",
"from hatchet_sdk.contracts import dispatcher_pb2 as dispatcher__pb2",
)
def patch_grpc_events_import(content: str) -> str:
return apply_patch(
content,
r"\bimport events_pb2 as events__pb2\b",
"from hatchet_sdk.contracts import events_pb2 as events__pb2",
)
def patch_grpc_workflows_import(content: str) -> str:
return apply_patch(
content,
r"\bimport workflows_pb2 as workflows__pb2\b",
"from hatchet_sdk.contracts import workflows_pb2 as workflows__pb2",
)
def patch_grpc_init_signature(content: str) -> str:
return apply_patch(
content,
r"def __init__\(self, channel\):",
"def __init__(self, channel: grpc.Channel | grpc.aio.Channel) -> None:",
)
def apply_patches_to_matching_files(
root: str, glob: str, patch_funcs: list[Callable[[str], str]]
) -> None:
for file_path in Path(root).rglob(glob):
atomically_patch_file(str(file_path), patch_funcs)
def patch_api_client_datetime_format_on_post(content: str) -> str:
content = prepend_import(content, "from hatchet_sdk.logger import logger")
pattern = r"([ \t]*)elif isinstance\(obj, \(datetime\.datetime, datetime\.date\)\):\s*\n\1[ \t]*return obj\.isoformat\(\)"
replacement = (
r"\1## IMPORTANT: Checking `datetime` must come before `date` since `datetime` is a subclass of `date`\n"
r"\1elif isinstance(obj, datetime.datetime):\n"
r"\1 if not obj.tzinfo:\n"
r"\1 current_tz = (datetime.datetime.now(datetime.timezone(datetime.timedelta(0))).astimezone().tzinfo or datetime.timezone.utc)\n"
r'\1 logger.warning(f"timezone-naive datetime found. assuming {current_tz}.")\n'
r"\1 obj = obj.replace(tzinfo=current_tz)\n\n"
r"\1 return obj.isoformat()\n"
r"\1elif isinstance(obj, datetime.date):\n"
r"\1 return obj.isoformat()"
)
return apply_patch(content, pattern, replacement)
def patch_workflow_run_metrics_counts_return_type(content: str) -> str:
content = prepend_import(
content,
"from hatchet_sdk.clients.rest.models.workflow_runs_metrics_counts import WorkflowRunsMetricsCounts",
)
pattern = r"([ \t]*)counts: Optional\[Dict\[str, Any\]\] = None"
replacement = r"\1counts: Optional[WorkflowRunsMetricsCounts] = None"
return apply_patch(content, pattern, replacement)
if __name__ == "__main__":
atomically_patch_file(
"hatchet_sdk/clients/rest/api_client.py",
[patch_api_client_datetime_format_on_post],
)
atomically_patch_file(
"hatchet_sdk/clients/rest/models/workflow_runs_metrics.py",
[patch_workflow_run_metrics_counts_return_type],
)
grpc_patches: list[Callable[[str], str]] = [
patch_contract_import_paths,
patch_grpc_dispatcher_import,
patch_grpc_events_import,
patch_grpc_workflows_import,
patch_grpc_init_signature,
]
pb2_patches: list[Callable[[str], str]] = [
patch_contract_import_paths,
]
apply_patches_to_matching_files("hatchet_sdk/contracts", "*_grpc.py", grpc_patches)
apply_patches_to_matching_files("hatchet_sdk/contracts", "*_pb2.py", pb2_patches)
apply_patches_to_matching_files("hatchet_sdk/contracts", "*_pb2.pyi", pb2_patches)

View File

@@ -51,7 +51,7 @@ class RunMetadata(BaseModel):
return self.key
@pytest.mark.asyncio()
@pytest.mark.asyncio(loop_scope="session")
async def test_workflow_level_concurrency(hatchet: Hatchet) -> None:
test_run_id = str(uuid4())

View File

@@ -1,7 +1,8 @@
import asyncio
from datetime import datetime, timedelta
from random import choice
from typing import AsyncGenerator, Literal
from subprocess import Popen
from typing import Any, AsyncGenerator, Literal
from uuid import uuid4
import pytest
@@ -60,8 +61,20 @@ async def dummy_runs() -> None:
return None
@pytest.mark.asyncio()
async def test_priority(hatchet: Hatchet, dummy_runs: None) -> None:
@pytest.mark.parametrize(
"on_demand_worker",
[
(
["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"],
8003,
)
],
indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_priority(
hatchet: Hatchet, dummy_runs: None, on_demand_worker: Popen[Any]
) -> None:
test_run_id = str(uuid4())
choices: list[Priority] = ["low", "medium", "high", "default"]
N = 30
@@ -134,8 +147,20 @@ async def test_priority(hatchet: Hatchet, dummy_runs: None) -> None:
assert curr.finished_at >= curr.started_at
@pytest.mark.asyncio()
async def test_priority_via_scheduling(hatchet: Hatchet, dummy_runs: None) -> None:
@pytest.mark.parametrize(
"on_demand_worker",
[
(
["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"],
8003,
)
],
indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_priority_via_scheduling(
hatchet: Hatchet, dummy_runs: None, on_demand_worker: Popen[Any]
) -> None:
test_run_id = str(uuid4())
sleep_time = 3
n = 30
@@ -258,8 +283,20 @@ def time_until_next_minute() -> float:
return (next_minute - now).total_seconds()
@pytest.mark.asyncio()
async def test_priority_via_cron(hatchet: Hatchet, crons: tuple[str, str, int]) -> None:
@pytest.mark.parametrize(
"on_demand_worker",
[
(
["poetry", "run", "python", "examples/priority/worker.py", "--slots", "1"],
8003,
)
],
indirect=True,
)
@pytest.mark.asyncio(loop_scope="session")
async def test_priority_via_cron(
hatchet: Hatchet, crons: tuple[str, str, int], on_demand_worker: Popen[Any]
) -> None:
workflow_id, test_run_id, n = crons
await asyncio.sleep(time_until_next_minute() + 10)
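These tests use pytest's `indirect=True` parametrization to hand a command and port to an `on_demand_worker` fixture. A hypothetical sketch of such a fixture (the real one lives in the test suite's conftest; the readiness wait is elided):

```python
import subprocess
from typing import Any, Generator

import pytest

@pytest.fixture
def on_demand_worker(
    request: pytest.FixtureRequest,
) -> Generator[subprocess.Popen[Any], None, None]:
    command, port = request.param  # e.g. (["poetry", "run", ...], 8003)
    proc = subprocess.Popen(command)
    # ... wait here until the worker is listening on `port` ...
    yield proc
    proc.terminate()
    proc.wait()
```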

View File

@@ -1,12 +1,6 @@
import time
from hatchet_sdk import (
ConcurrencyExpression,
ConcurrencyLimitStrategy,
Context,
EmptyModel,
Hatchet,
)
from hatchet_sdk import Context, EmptyModel, Hatchet
hatchet = Hatchet(debug=True)
@@ -17,11 +11,6 @@ SLEEP_TIME = 0.25
priority_workflow = hatchet.workflow(
name="PriorityWorkflow",
default_priority=DEFAULT_PRIORITY,
concurrency=ConcurrencyExpression(
max_runs=1,
expression="'true'",
limit_strategy=ConcurrencyLimitStrategy.GROUP_ROUND_ROBIN,
),
)
# !!

View File

@@ -16,7 +16,6 @@ from examples.lifespans.simple import lifespan, lifespan_task
from examples.logger.workflow import logging_workflow
from examples.non_retryable.worker import non_retryable_workflow
from examples.on_failure.worker import on_failure_wf, on_failure_wf_with_details
from examples.priority.worker import priority_workflow
from examples.timeout.worker import refresh_timeout_wf, timeout_wf
from examples.waits.worker import task_condition_workflow
from hatchet_sdk import Hatchet
@@ -52,7 +51,6 @@ def main() -> None:
sync_fanout_child,
non_retryable_workflow,
concurrency_workflow_level_workflow,
priority_workflow,
lifespan_task,
],
lifespan=lifespan,

View File

@@ -94,24 +94,13 @@ git restore pyproject.toml poetry.lock
poetry install --all-extras
# Fix relative imports in _grpc.py files
find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/from v1/from hatchet_sdk.contracts.v1/g'
find ./hatchet_sdk/contracts -type f -name '*_pb2.pyi' -print0 | xargs -0 sed -i '' 's/from v1/from hatchet_sdk.contracts.v1/g'
find ./hatchet_sdk/contracts -type f -name '*_pb2.py' -print0 | xargs -0 sed -i '' 's/from v1/from hatchet_sdk.contracts.v1/g'
find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/import dispatcher_pb2 as dispatcher__pb2/from hatchet_sdk.contracts import dispatcher_pb2 as dispatcher__pb2/g'
find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/import events_pb2 as events__pb2/from hatchet_sdk.contracts import events_pb2 as events__pb2/g'
find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/import workflows_pb2 as workflows__pb2/from hatchet_sdk.contracts import workflows_pb2 as workflows__pb2/g'
find ./hatchet_sdk/contracts -type f -name '*_grpc.py' -print0 | xargs -0 sed -i '' 's/def __init__(self, channel):/def __init__(self, channel: grpc.Channel | grpc.aio.Channel) -> None:/g'
set +e
# Note: Hack to run the linters without failing so that we can apply the patch
# Note: Hack to run the linters without failing so that we can apply the patches
./lint.sh
set -e
# apply patch to openapi-generator generated code
patch -p1 --no-backup-if-mismatch <./openapi_patch.patch
# apply patches to openapi-generator generated code
poetry run python apply_patches.py
# Rerun the linters and fail if there are any issues
./lint.sh

View File

@@ -245,6 +245,8 @@ from hatchet_sdk.clients.rest.models.v1_task_run_status import V1TaskRunStatus
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary
from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList
from hatchet_sdk.clients.rest.models.v1_task_timing import V1TaskTiming
from hatchet_sdk.clients.rest.models.v1_task_timing_list import V1TaskTimingList
from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import (
V1TriggerWorkflowRunRequest,
)

View File

@@ -1555,6 +1555,9 @@ class TaskApi:
since: Annotated[
datetime, Field(description="The start time to get metrics for")
],
until: Annotated[
Optional[datetime], Field(description="The end time to get metrics for")
] = None,
workflow_ids: Annotated[
Optional[
List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]]
@@ -1585,6 +1588,8 @@ class TaskApi:
:type tenant: str
:param since: The start time to get metrics for (required)
:type since: datetime
:param until: The end time to get metrics for
:type until: datetime
:param workflow_ids: The workflow id to find runs for
:type workflow_ids: List[str]
:param parent_task_external_id: The parent task's external id
@@ -1614,6 +1619,7 @@ class TaskApi:
_param = self._v1_task_list_status_metrics_serialize(
tenant=tenant,
since=since,
until=until,
workflow_ids=workflow_ids,
parent_task_external_id=parent_task_external_id,
_request_auth=_request_auth,
@@ -1649,6 +1655,9 @@ class TaskApi:
since: Annotated[
datetime, Field(description="The start time to get metrics for")
],
until: Annotated[
Optional[datetime], Field(description="The end time to get metrics for")
] = None,
workflow_ids: Annotated[
Optional[
List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]]
@@ -1679,6 +1688,8 @@ class TaskApi:
:type tenant: str
:param since: The start time to get metrics for (required)
:type since: datetime
:param until: The end time to get metrics for
:type until: datetime
:param workflow_ids: The workflow id to find runs for
:type workflow_ids: List[str]
:param parent_task_external_id: The parent task's external id
@@ -1708,6 +1719,7 @@ class TaskApi:
_param = self._v1_task_list_status_metrics_serialize(
tenant=tenant,
since=since,
until=until,
workflow_ids=workflow_ids,
parent_task_external_id=parent_task_external_id,
_request_auth=_request_auth,
@@ -1743,6 +1755,9 @@ class TaskApi:
since: Annotated[
datetime, Field(description="The start time to get metrics for")
],
until: Annotated[
Optional[datetime], Field(description="The end time to get metrics for")
] = None,
workflow_ids: Annotated[
Optional[
List[Annotated[str, Field(min_length=36, strict=True, max_length=36)]]
@@ -1773,6 +1788,8 @@ class TaskApi:
:type tenant: str
:param since: The start time to get metrics for (required)
:type since: datetime
:param until: The end time to get metrics for
:type until: datetime
:param workflow_ids: The workflow id to find runs for
:type workflow_ids: List[str]
:param parent_task_external_id: The parent task's external id
@@ -1802,6 +1819,7 @@ class TaskApi:
_param = self._v1_task_list_status_metrics_serialize(
tenant=tenant,
since=since,
until=until,
workflow_ids=workflow_ids,
parent_task_external_id=parent_task_external_id,
_request_auth=_request_auth,
@@ -1825,6 +1843,7 @@ class TaskApi:
self,
tenant,
since,
until,
workflow_ids,
parent_task_external_id,
_request_auth,
@@ -1863,6 +1882,17 @@ class TaskApi:
else:
_query_params.append(("since", since))
if until is not None:
if isinstance(until, datetime):
_query_params.append(
(
"until",
until.strftime(self.api_client.configuration.datetime_format),
)
)
else:
_query_params.append(("until", until))
if workflow_ids is not None:
_query_params.append(("workflow_ids", workflow_ids))

View File

@@ -23,6 +23,7 @@ from hatchet_sdk.clients.rest.api_response import ApiResponse
from hatchet_sdk.clients.rest.models.v1_task_event_list import V1TaskEventList
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList
from hatchet_sdk.clients.rest.models.v1_task_timing_list import V1TaskTimingList
from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import (
V1TriggerWorkflowRunRequest,
)
@@ -914,6 +915,304 @@ class WorkflowRunsApi:
_request_auth=_request_auth,
)
@validate_call
def v1_workflow_run_get_timings(
self,
v1_workflow_run: Annotated[
str,
Field(
min_length=36,
strict=True,
max_length=36,
description="The workflow run id to get",
),
],
depth: Annotated[
Optional[StrictInt], Field(description="The depth to retrieve children")
] = None,
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Tuple[
Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
],
] = None,
_request_auth: Optional[Dict[StrictStr, Any]] = None,
_content_type: Optional[StrictStr] = None,
_headers: Optional[Dict[StrictStr, Any]] = None,
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> V1TaskTimingList:
"""List timings for a workflow run
Get the timings for a workflow run
:param v1_workflow_run: The workflow run id to get (required)
:type v1_workflow_run: str
:param depth: The depth to retrieve children
:type depth: int
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:type _request_timeout: int, tuple(int, int), optional
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the
authentication in the spec for a single request.
:type _request_auth: dict, optional
:param _content_type: force content-type for the request.
:type _content_type: str, Optional
:param _headers: set to override the headers for a single
request; this effectively ignores the headers
in the spec for a single request.
:type _headers: dict, optional
:param _host_index: set to override the host_index for a single
request; this effectively ignores the host_index
in the spec for a single request.
:type _host_index: int, optional
:return: Returns the result object.
""" # noqa: E501
_param = self._v1_workflow_run_get_timings_serialize(
v1_workflow_run=v1_workflow_run,
depth=depth,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
_host_index=_host_index,
)
_response_types_map: Dict[str, Optional[str]] = {
"200": "V1TaskTimingList",
"400": "APIErrors",
"403": "APIErrors",
"501": "APIErrors",
}
response_data = self.api_client.call_api(
*_param, _request_timeout=_request_timeout
)
response_data.read()
return self.api_client.response_deserialize(
response_data=response_data,
response_types_map=_response_types_map,
).data
@validate_call
def v1_workflow_run_get_timings_with_http_info(
self,
v1_workflow_run: Annotated[
str,
Field(
min_length=36,
strict=True,
max_length=36,
description="The workflow run id to get",
),
],
depth: Annotated[
Optional[StrictInt], Field(description="The depth to retrieve children")
] = None,
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Tuple[
Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
],
] = None,
_request_auth: Optional[Dict[StrictStr, Any]] = None,
_content_type: Optional[StrictStr] = None,
_headers: Optional[Dict[StrictStr, Any]] = None,
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[V1TaskTimingList]:
"""List timings for a workflow run
Get the timings for a workflow run
:param v1_workflow_run: The workflow run id to get (required)
:type v1_workflow_run: str
:param depth: The depth to retrieve children
:type depth: int
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:type _request_timeout: int, tuple(int, int), optional
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the
authentication in the spec for a single request.
:type _request_auth: dict, optional
:param _content_type: force content-type for the request.
:type _content_type: str, Optional
:param _headers: set to override the headers for a single
request; this effectively ignores the headers
in the spec for a single request.
:type _headers: dict, optional
:param _host_index: set to override the host_index for a single
request; this effectively ignores the host_index
in the spec for a single request.
:type _host_index: int, optional
:return: Returns the result object.
""" # noqa: E501
_param = self._v1_workflow_run_get_timings_serialize(
v1_workflow_run=v1_workflow_run,
depth=depth,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
_host_index=_host_index,
)
_response_types_map: Dict[str, Optional[str]] = {
"200": "V1TaskTimingList",
"400": "APIErrors",
"403": "APIErrors",
"501": "APIErrors",
}
response_data = self.api_client.call_api(
*_param, _request_timeout=_request_timeout
)
response_data.read()
return self.api_client.response_deserialize(
response_data=response_data,
response_types_map=_response_types_map,
)
@validate_call
def v1_workflow_run_get_timings_without_preload_content(
self,
v1_workflow_run: Annotated[
str,
Field(
min_length=36,
strict=True,
max_length=36,
description="The workflow run id to get",
),
],
depth: Annotated[
Optional[StrictInt], Field(description="The depth to retrieve children")
] = None,
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Tuple[
Annotated[StrictFloat, Field(gt=0)], Annotated[StrictFloat, Field(gt=0)]
],
] = None,
_request_auth: Optional[Dict[StrictStr, Any]] = None,
_content_type: Optional[StrictStr] = None,
_headers: Optional[Dict[StrictStr, Any]] = None,
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
"""List timings for a workflow run
Get the timings for a workflow run
:param v1_workflow_run: The workflow run id to get (required)
:type v1_workflow_run: str
:param depth: The depth to retrieve children
:type depth: int
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:type _request_timeout: int, tuple(int, int), optional
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the
authentication in the spec for a single request.
:type _request_auth: dict, optional
:param _content_type: force content-type for the request.
:type _content_type: str, Optional
:param _headers: set to override the headers for a single
request; this effectively ignores the headers
in the spec for a single request.
:type _headers: dict, optional
:param _host_index: set to override the host_index for a single
request; this effectively ignores the host_index
in the spec for a single request.
:type _host_index: int, optional
:return: Returns the result object.
""" # noqa: E501
_param = self._v1_workflow_run_get_timings_serialize(
v1_workflow_run=v1_workflow_run,
depth=depth,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
_host_index=_host_index,
)
_response_types_map: Dict[str, Optional[str]] = {
"200": "V1TaskTimingList",
"400": "APIErrors",
"403": "APIErrors",
"501": "APIErrors",
}
response_data = self.api_client.call_api(
*_param, _request_timeout=_request_timeout
)
return response_data.response
def _v1_workflow_run_get_timings_serialize(
self,
v1_workflow_run,
depth,
_request_auth,
_content_type,
_headers,
_host_index,
) -> RequestSerialized:
_host = None
_collection_formats: Dict[str, str] = {}
_path_params: Dict[str, str] = {}
_query_params: List[Tuple[str, str]] = []
_header_params: Dict[str, Optional[str]] = _headers or {}
_form_params: List[Tuple[str, str]] = []
_files: Dict[
str, Union[str, bytes, List[str], List[bytes], List[Tuple[str, bytes]]]
] = {}
_body_params: Optional[bytes] = None
# process the path parameters
if v1_workflow_run is not None:
_path_params["v1-workflow-run"] = v1_workflow_run
# process the query parameters
if depth is not None:
_query_params.append(("depth", depth))
# process the header parameters
# process the form parameters
# process the body parameter
# set the HTTP header `Accept`
if "Accept" not in _header_params:
_header_params["Accept"] = self.api_client.select_header_accept(
["application/json"]
)
# authentication setting
_auth_settings: List[str] = ["cookieAuth", "bearerAuth"]
return self.api_client.param_serialize(
method="GET",
resource_path="/api/v1/stable/workflow-runs/{v1-workflow-run}/task-timings",
path_params=_path_params,
query_params=_query_params,
header_params=_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
auth_settings=_auth_settings,
collection_formats=_collection_formats,
_host=_host,
_request_auth=_request_auth,
)
@validate_call
def v1_workflow_run_list(
self,

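A hypothetical call to the new task-timings endpoint, assuming the generated `V1TaskTimingList` exposes `rows` like its sibling list models (`workflow_runs_api` construction is elided):

```python
timings = workflow_runs_api.v1_workflow_run_get_timings(
    v1_workflow_run="bb214b70-2d0e-4a06-9a4a-7b720d6c8a0f",  # hypothetical run id
    depth=2,  # include two levels of child tasks
)

for t in timings.rows or []:  # `rows` attribute is an assumption
    print(t.task_display_name, t.status, t.queued_at, t.started_at, t.finished_at)
```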
View File

@@ -40,6 +40,7 @@ from hatchet_sdk.clients.rest.exceptions import (
ServiceException,
UnauthorizedException,
)
from hatchet_sdk.logger import logger
RequestSerialized = Tuple[str, str, Dict[str, str], Optional[str], List[str]]
@@ -356,7 +357,20 @@ class ApiClient:
return [self.sanitize_for_serialization(sub_obj) for sub_obj in obj]
elif isinstance(obj, tuple):
return tuple(self.sanitize_for_serialization(sub_obj) for sub_obj in obj)
elif isinstance(obj, (datetime.datetime, datetime.date)):
## IMPORTANT: Checking `datetime` must come before `date` since `datetime` is a subclass of `date`
elif isinstance(obj, datetime.datetime):
if not obj.tzinfo:
current_tz = (
datetime.datetime.now(datetime.timezone(datetime.timedelta(0)))
.astimezone()
.tzinfo
or datetime.timezone.utc
)
logger.warning(f"timezone-naive datetime found. assuming {current_tz}.")
obj = obj.replace(tzinfo=current_tz)
return obj.isoformat()
elif isinstance(obj, datetime.date):
return obj.isoformat()
elif isinstance(obj, decimal.Decimal):
return str(obj)
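A standalone illustration of what the patched branch does with a timezone-naive `datetime` (the offset in the final comment is just an example):

```python
import datetime

naive = datetime.datetime(2025, 5, 6, 17, 31, 36)

# Resolve the current local timezone the same way the patch does, falling
# back to UTC if it cannot be determined.
current_tz = (
    datetime.datetime.now(datetime.timezone(datetime.timedelta(0)))
    .astimezone()
    .tzinfo
    or datetime.timezone.utc
)

aware = naive.replace(tzinfo=current_tz)
print(aware.isoformat())  # e.g. "2025-05-06T17:31:36-04:00" on US Eastern
```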

View File

@@ -210,6 +210,8 @@ from hatchet_sdk.clients.rest.models.v1_task_run_status import V1TaskRunStatus
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
from hatchet_sdk.clients.rest.models.v1_task_summary import V1TaskSummary
from hatchet_sdk.clients.rest.models.v1_task_summary_list import V1TaskSummaryList
from hatchet_sdk.clients.rest.models.v1_task_timing import V1TaskTiming
from hatchet_sdk.clients.rest.models.v1_task_timing_list import V1TaskTimingList
from hatchet_sdk.clients.rest.models.v1_trigger_workflow_run_request import (
V1TriggerWorkflowRunRequest,
)

View File

@@ -95,10 +95,8 @@ class V1TaskSummary(BaseModel):
)
workflow_id: StrictStr = Field(alias="workflowId")
workflow_name: Optional[StrictStr] = Field(default=None, alias="workflowName")
workflow_run_external_id: Optional[StrictStr] = Field(
default=None,
description="The external ID of the workflow run",
alias="workflowRunExternalId",
workflow_run_external_id: StrictStr = Field(
description="The external ID of the workflow run", alias="workflowRunExternalId"
)
workflow_version_id: Optional[StrictStr] = Field(
default=None,

View File

@@ -0,0 +1,159 @@
# coding: utf-8
"""
Hatchet API
The Hatchet API
The version of the OpenAPI document: 1.0.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import json
import pprint
import re # noqa: F401
from datetime import datetime
from typing import Any, ClassVar, Dict, List, Optional, Set
from pydantic import BaseModel, ConfigDict, Field, StrictInt, StrictStr
from typing_extensions import Annotated, Self
from hatchet_sdk.clients.rest.models.api_resource_meta import APIResourceMeta
from hatchet_sdk.clients.rest.models.v1_task_status import V1TaskStatus
class V1TaskTiming(BaseModel):
"""
V1TaskTiming
""" # noqa: E501
metadata: APIResourceMeta
depth: StrictInt = Field(description="The depth of the task in the waterfall.")
status: V1TaskStatus
task_display_name: StrictStr = Field(
description="The display name of the task run.", alias="taskDisplayName"
)
task_external_id: Annotated[
str, Field(min_length=36, strict=True, max_length=36)
] = Field(description="The external ID of the task.", alias="taskExternalId")
task_id: StrictInt = Field(description="The ID of the task.", alias="taskId")
task_inserted_at: datetime = Field(
description="The timestamp the task was inserted.", alias="taskInsertedAt"
)
tenant_id: Annotated[str, Field(min_length=36, strict=True, max_length=36)] = Field(
description="The ID of the tenant.", alias="tenantId"
)
parent_task_external_id: Optional[
Annotated[str, Field(min_length=36, strict=True, max_length=36)]
] = Field(
default=None,
description="The external ID of the parent task.",
alias="parentTaskExternalId",
)
queued_at: Optional[datetime] = Field(
default=None,
description="The timestamp the task run was queued.",
alias="queuedAt",
)
started_at: Optional[datetime] = Field(
default=None,
description="The timestamp the task run started.",
alias="startedAt",
)
finished_at: Optional[datetime] = Field(
default=None,
description="The timestamp the task run finished.",
alias="finishedAt",
)
__properties: ClassVar[List[str]] = [
"metadata",
"depth",
"status",
"taskDisplayName",
"taskExternalId",
"taskId",
"taskInsertedAt",
"tenantId",
"parentTaskExternalId",
"queuedAt",
"startedAt",
"finishedAt",
]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of V1TaskTiming from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of metadata
if self.metadata:
_dict["metadata"] = self.metadata.to_dict()
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of V1TaskTiming from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate(
{
"metadata": (
APIResourceMeta.from_dict(obj["metadata"])
if obj.get("metadata") is not None
else None
),
"depth": obj.get("depth"),
"status": obj.get("status"),
"taskDisplayName": obj.get("taskDisplayName"),
"taskExternalId": obj.get("taskExternalId"),
"taskId": obj.get("taskId"),
"taskInsertedAt": obj.get("taskInsertedAt"),
"tenantId": obj.get("tenantId"),
"parentTaskExternalId": obj.get("parentTaskExternalId"),
"queuedAt": obj.get("queuedAt"),
"startedAt": obj.get("startedAt"),
"finishedAt": obj.get("finishedAt"),
}
)
return _obj

View File

@@ -0,0 +1,110 @@
# coding: utf-8
"""
Hatchet API
The Hatchet API
The version of the OpenAPI document: 1.0.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import json
import pprint
import re # noqa: F401
from typing import Any, ClassVar, Dict, List, Optional, Set
from pydantic import BaseModel, ConfigDict, Field
from typing_extensions import Self
from hatchet_sdk.clients.rest.models.pagination_response import PaginationResponse
from hatchet_sdk.clients.rest.models.v1_task_timing import V1TaskTiming
class V1TaskTimingList(BaseModel):
"""
V1TaskTimingList
""" # noqa: E501
pagination: PaginationResponse
rows: List[V1TaskTiming] = Field(description="The list of task timings")
__properties: ClassVar[List[str]] = ["pagination", "rows"]
model_config = ConfigDict(
populate_by_name=True,
validate_assignment=True,
protected_namespaces=(),
)
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of V1TaskTimingList from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
# override the default output from pydantic by calling `to_dict()` of pagination
if self.pagination:
_dict["pagination"] = self.pagination.to_dict()
# override the default output from pydantic by calling `to_dict()` of each item in rows (list)
_items = []
if self.rows:
for _item_rows in self.rows:
if _item_rows:
_items.append(_item_rows.to_dict())
_dict["rows"] = _items
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of V1TaskTimingList from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate(
{
"pagination": (
PaginationResponse.from_dict(obj["pagination"])
if obj.get("pagination") is not None
else None
),
"rows": (
[V1TaskTiming.from_dict(_item) for _item in obj["rows"]]
if obj.get("rows") is not None
else None
),
}
)
return _obj

View File

@@ -1,5 +1,6 @@
import asyncio
import logging
from datetime import timedelta
from typing import Any, Callable, Type, Union, cast, overload
from hatchet_sdk import Context, DurableContext
@@ -21,8 +22,6 @@ from hatchet_sdk.logger import logger
from hatchet_sdk.rate_limit import RateLimit
from hatchet_sdk.runnables.standalone import Standalone
from hatchet_sdk.runnables.types import (
DEFAULT_EXECUTION_TIMEOUT,
DEFAULT_SCHEDULE_TIMEOUT,
ConcurrencyExpression,
EmptyModel,
R,
@@ -294,8 +293,8 @@ class Hatchet:
sticky: StickyStrategy | None = None,
default_priority: int = 1,
concurrency: ConcurrencyExpression | list[ConcurrencyExpression] | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
@@ -316,8 +315,8 @@ class Hatchet:
sticky: StickyStrategy | None = None,
default_priority: int = 1,
concurrency: ConcurrencyExpression | list[ConcurrencyExpression] | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
@@ -339,8 +338,8 @@ class Hatchet:
sticky: StickyStrategy | None = None,
default_priority: int = 1,
concurrency: ConcurrencyExpression | list[ConcurrencyExpression] | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
@@ -451,8 +450,8 @@ class Hatchet:
sticky: StickyStrategy | None = None,
default_priority: int = 1,
concurrency: ConcurrencyExpression | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
@@ -475,8 +474,8 @@ class Hatchet:
sticky: StickyStrategy | None = None,
default_priority: int = 1,
concurrency: ConcurrencyExpression | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
@@ -498,8 +497,8 @@ class Hatchet:
sticky: StickyStrategy | None = None,
default_priority: int = 1,
concurrency: ConcurrencyExpression | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
desired_worker_labels: dict[str, DesiredWorkerLabel] = {},
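
These `Hatchet.task` and `Hatchet.durable_task` overloads now spell their defaults out inline (`timedelta(minutes=5)`, `timedelta(seconds=60)`) rather than importing shared constants, and the workflow layer uses those literals to detect when a caller left a parameter unset. The resulting precedence is: explicit argument, then `TaskDefaults`, then the built-in default. A hedged end-to-end sketch, assuming a configured client (the workflow and task names here are illustrative):

```python
from datetime import timedelta

from hatchet_sdk import Context, EmptyModel, Hatchet, TaskDefaults

hatchet = Hatchet()  # assumes client credentials are configured in the environment

wf = hatchet.workflow(
    name="defaults-demo",
    task_defaults=TaskDefaults(execution_timeout=timedelta(seconds=30), retries=2),
)

@wf.task()  # no explicit values: inherits execution_timeout=30s and retries=2
def step_one(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"status": "ok"}

@wf.task(execution_timeout=timedelta(seconds=5))  # explicit value beats TaskDefaults
def step_two(input: EmptyModel, ctx: Context) -> dict[str, str]:
    return {"status": "ok"}
```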

View File

@@ -1,10 +1,10 @@
from datetime import timedelta
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Generic,
TypeVar,
Union,
cast,
get_type_hints,
@@ -18,8 +18,6 @@ from hatchet_sdk.contracts.v1.workflows_pb2 import (
DesiredWorkerLabels,
)
from hatchet_sdk.runnables.types import (
DEFAULT_EXECUTION_TIMEOUT,
DEFAULT_SCHEDULE_TIMEOUT,
ConcurrencyExpression,
R,
StepType,
@@ -43,18 +41,6 @@ if TYPE_CHECKING:
from hatchet_sdk.runnables.workflow import Workflow
T = TypeVar("T")
def fall_back_to_default(value: T, default: T, fallback_value: T) -> T:
## If the value is not the default, it's set
if value != default:
return value
## Otherwise, it's unset, so return the fallback value
return fallback_value
class Task(Generic[TWorkflowInput, R]):
def __init__(
self,
@@ -68,8 +54,8 @@ class Task(Generic[TWorkflowInput, R]):
type: StepType,
workflow: "Workflow[TWorkflowInput]",
name: str,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = timedelta(seconds=60),
schedule_timeout: Duration = timedelta(minutes=5),
parents: "list[Task[TWorkflowInput, Any]]" = [],
retries: int = 0,
rate_limits: list[CreateTaskRateLimit] = [],
@@ -89,12 +75,8 @@ class Task(Generic[TWorkflowInput, R]):
self.workflow = workflow
self.type = type
self.execution_timeout = fall_back_to_default(
execution_timeout, DEFAULT_EXECUTION_TIMEOUT, DEFAULT_EXECUTION_TIMEOUT
)
self.schedule_timeout = fall_back_to_default(
schedule_timeout, DEFAULT_SCHEDULE_TIMEOUT, DEFAULT_SCHEDULE_TIMEOUT
)
self.execution_timeout = execution_timeout
self.schedule_timeout = schedule_timeout
self.name = name
self.parents = parents
self.retries = retries

View File

@@ -1,9 +1,8 @@
import asyncio
from datetime import timedelta
from enum import Enum
from typing import Any, Awaitable, Callable, ParamSpec, Type, TypeGuard, TypeVar, Union
from pydantic import BaseModel, ConfigDict, Field, StrictInt, model_validator
from pydantic import BaseModel, ConfigDict, Field, model_validator
from hatchet_sdk.context.context import Context, DurableContext
from hatchet_sdk.contracts.v1.workflows_pb2 import Concurrency
@@ -16,11 +15,6 @@ R = TypeVar("R", bound=Union[ValidTaskReturnType, Awaitable[ValidTaskReturnType]
P = ParamSpec("P")
DEFAULT_EXECUTION_TIMEOUT = timedelta(seconds=60)
DEFAULT_SCHEDULE_TIMEOUT = timedelta(minutes=5)
DEFAULT_PRIORITY = 1
class EmptyModel(BaseModel):
model_config = ConfigDict(extra="allow", frozen=True)
@@ -65,9 +59,12 @@ TWorkflowInput = TypeVar("TWorkflowInput", bound=BaseModel)
class TaskDefaults(BaseModel):
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT
priority: StrictInt = Field(gt=0, lt=4, default=DEFAULT_PRIORITY)
schedule_timeout: Duration | None = None
execution_timeout: Duration | None = None
priority: int | None = Field(gt=0, lt=4, default=None)
retries: int | None = None
backoff_factor: float | None = None
backoff_max_seconds: int | None = None
class WorkflowConfig(BaseModel):
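
With this change `TaskDefaults` carries no concrete defaults of its own: every field is `None` until explicitly set, which is what lets the fallback logic distinguish "unset" from "set to the built-in default". A quick sketch of the resulting behavior (imports mirror the new tests at the end of this commit):

```python
from datetime import timedelta

from hatchet_sdk import TaskDefaults

defaults = TaskDefaults(execution_timeout=timedelta(seconds=30), retries=3)

assert defaults.schedule_timeout is None  # unset: the decorator's own default applies
assert defaults.retries == 3  # set: tasks without an explicit value inherit it

# `priority` is still range-checked when provided (it must be between 1 and 3),
# so TaskDefaults(priority=5) raises a pydantic ValidationError.
```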

View File

@@ -1,9 +1,9 @@
import asyncio
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, Callable, Generic, cast
from typing import TYPE_CHECKING, Any, Callable, Generic, TypeVar, cast
from google.protobuf import timestamp_pb2
from pydantic import BaseModel
from pydantic import BaseModel, model_validator
from hatchet_sdk.clients.admin import (
ScheduleTriggerWorkflowOptions,
@@ -25,12 +25,11 @@ from hatchet_sdk.logger import logger
from hatchet_sdk.rate_limit import RateLimit
from hatchet_sdk.runnables.task import Task
from hatchet_sdk.runnables.types import (
DEFAULT_EXECUTION_TIMEOUT,
DEFAULT_SCHEDULE_TIMEOUT,
ConcurrencyExpression,
EmptyModel,
R,
StepType,
TaskDefaults,
TWorkflowInput,
WorkflowConfig,
)
@@ -45,6 +44,62 @@ if TYPE_CHECKING:
from hatchet_sdk.runnables.standalone import Standalone
T = TypeVar("T")
def fall_back_to_default(value: T, param_default: T, fallback_value: T | None) -> T:
## If the value is not the param default, it's set
if value != param_default:
return value
## Otherwise, it's unset, so return the fallback value if it's set
if fallback_value is not None:
return fallback_value
## Otherwise, no fallback is set either, so keep the param default value
return value
class ComputedTaskParameters(BaseModel):
schedule_timeout: Duration
execution_timeout: Duration
retries: int
backoff_factor: float | None
backoff_max_seconds: int | None
task_defaults: TaskDefaults
@model_validator(mode="after")
def validate_params(self) -> "ComputedTaskParameters":
self.execution_timeout = fall_back_to_default(
value=self.execution_timeout,
param_default=timedelta(seconds=60),
fallback_value=self.task_defaults.execution_timeout,
)
self.schedule_timeout = fall_back_to_default(
value=self.schedule_timeout,
param_default=timedelta(minutes=5),
fallback_value=self.task_defaults.schedule_timeout,
)
self.backoff_factor = fall_back_to_default(
value=self.backoff_factor,
param_default=None,
fallback_value=self.task_defaults.backoff_factor,
)
self.backoff_max_seconds = fall_back_to_default(
value=self.backoff_max_seconds,
param_default=None,
fallback_value=self.task_defaults.backoff_max_seconds,
)
self.retries = fall_back_to_default(
value=self.retries,
param_default=0,
fallback_value=self.task_defaults.retries,
)
return self
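
`ComputedTaskParameters` centralizes the fallback resolution that previously lived in `Task.__init__`. Note that because `fall_back_to_default` compares against the parameter default, explicitly passing the default value is indistinguishable from omitting the argument: both fall through to `TaskDefaults`. A standalone illustration of the precedence (the values are arbitrary):

```python
from datetime import timedelta

PARAM_DEFAULT = timedelta(seconds=60)

def fall_back_to_default(value, param_default, fallback_value):
    if value != param_default:
        return value  # explicitly set by the caller
    if fallback_value is not None:
        return fallback_value  # unset, but TaskDefaults supplies a value
    return value  # unset everywhere: keep the parameter default

# An explicit argument wins over the TaskDefaults fallback:
assert fall_back_to_default(timedelta(seconds=5), PARAM_DEFAULT, timedelta(seconds=30)) == timedelta(seconds=5)
# An unset argument falls back to TaskDefaults:
assert fall_back_to_default(PARAM_DEFAULT, PARAM_DEFAULT, timedelta(seconds=30)) == timedelta(seconds=30)
# Unset everywhere keeps the built-in default:
assert fall_back_to_default(PARAM_DEFAULT, PARAM_DEFAULT, None) == PARAM_DEFAULT
```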
def transform_desired_worker_label(d: DesiredWorkerLabel) -> DesiredWorkerLabels:
value = d.value
return DesiredWorkerLabels(
@@ -530,8 +585,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
def task(
self,
name: str | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
parents: list[Task[TWorkflowInput, Any]] = [],
retries: int = 0,
rate_limits: list[RateLimit] = [],
@@ -575,6 +630,15 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
:returns: A decorator which creates a `Task` object.
"""
computed_params = ComputedTaskParameters(
schedule_timeout=schedule_timeout,
execution_timeout=execution_timeout,
retries=retries,
backoff_factor=backoff_factor,
backoff_max_seconds=backoff_max_seconds,
task_defaults=self.config.task_defaults,
)
def inner(
func: Callable[[TWorkflowInput, Context], R]
) -> Task[TWorkflowInput, R]:
@@ -584,17 +648,17 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
workflow=self,
type=StepType.DEFAULT,
name=self._parse_task_name(name, func),
execution_timeout=execution_timeout,
schedule_timeout=schedule_timeout,
execution_timeout=computed_params.execution_timeout,
schedule_timeout=computed_params.schedule_timeout,
parents=parents,
retries=retries,
retries=computed_params.retries,
rate_limits=[r.to_proto() for r in rate_limits],
desired_worker_labels={
key: transform_desired_worker_label(d)
for key, d in desired_worker_labels.items()
},
backoff_factor=backoff_factor,
backoff_max_seconds=backoff_max_seconds,
backoff_factor=computed_params.backoff_factor,
backoff_max_seconds=computed_params.backoff_max_seconds,
concurrency=concurrency,
wait_for=wait_for,
skip_if=skip_if,
@@ -610,8 +674,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
def durable_task(
self,
name: str | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
parents: list[Task[TWorkflowInput, Any]] = [],
retries: int = 0,
rate_limits: list[RateLimit] = [],
@@ -661,6 +725,15 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
:returns: A decorator which creates a `Task` object.
"""
computed_params = ComputedTaskParameters(
schedule_timeout=schedule_timeout,
execution_timeout=execution_timeout,
retries=retries,
backoff_factor=backoff_factor,
backoff_max_seconds=backoff_max_seconds,
task_defaults=self.config.task_defaults,
)
def inner(
func: Callable[[TWorkflowInput, DurableContext], R]
) -> Task[TWorkflowInput, R]:
@@ -670,17 +743,17 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
workflow=self,
type=StepType.DEFAULT,
name=self._parse_task_name(name, func),
execution_timeout=execution_timeout,
schedule_timeout=schedule_timeout,
execution_timeout=computed_params.execution_timeout,
schedule_timeout=computed_params.schedule_timeout,
parents=parents,
retries=retries,
retries=computed_params.retries,
rate_limits=[r.to_proto() for r in rate_limits],
desired_worker_labels={
key: transform_desired_worker_label(d)
for key, d in desired_worker_labels.items()
},
backoff_factor=backoff_factor,
backoff_max_seconds=backoff_max_seconds,
backoff_factor=computed_params.backoff_factor,
backoff_max_seconds=computed_params.backoff_max_seconds,
concurrency=concurrency,
wait_for=wait_for,
skip_if=skip_if,
@@ -696,8 +769,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
def on_failure_task(
self,
name: str | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
backoff_factor: float | None = None,
@@ -756,8 +829,8 @@ class Workflow(BaseWorkflow[TWorkflowInput]):
def on_success_task(
self,
name: str | None = None,
schedule_timeout: Duration = DEFAULT_SCHEDULE_TIMEOUT,
execution_timeout: Duration = DEFAULT_EXECUTION_TIMEOUT,
schedule_timeout: Duration = timedelta(minutes=5),
execution_timeout: Duration = timedelta(seconds=60),
retries: int = 0,
rate_limits: list[RateLimit] = [],
backoff_factor: float | None = None,

View File

@@ -1,23 +0,0 @@
diff --git a/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py b/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py
index 71b6351..5f70c44 100644
--- a/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py
+++ b/hatchet_sdk/clients/rest/models/workflow_runs_metrics.py
@@ -22,13 +22,17 @@ from typing import Any, ClassVar, Dict, List, Optional, Set
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self
+from hatchet_sdk.clients.rest.models.workflow_runs_metrics_counts import (
+ WorkflowRunsMetricsCounts,
+)
+
class WorkflowRunsMetrics(BaseModel):
"""
WorkflowRunsMetrics
""" # noqa: E501
- counts: Optional[Dict[str, Any]] = None
+ counts: Optional[WorkflowRunsMetricsCounts] = None
__properties: ClassVar[List[str]] = ["counts"]
model_config = ConfigDict(

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "hatchet-sdk"
version = "1.7.0"
version = "1.8.0"
description = ""
authors = ["Alexander Belanger <alexander@hatchet.run>"]
readme = "README.md"

View File

@@ -0,0 +1,184 @@
"""
IMPORTANT
----------
These tests are intended to prevent us from changing defaults in one place and
forgetting to change them in other places, or otherwise breaking the default-handling logic.
If you get a failure here, you likely changed the default value of one of the parameters below
in one of the task decorators, e.g. `Workflow.task`.
The intention of these tests is to:
1. Ensure that the behavior of falling back to `TaskDefaults` works as expected, which means that
if no value for a certain parameter to one of these decorators is provided, it should fall back to the
value in `TaskDefaults` if one is set.
2. Ensure that the default values set in the rest of the codebase don't change, and are consistent with each other.
If you change the default values for any of these parameters, please update the tests accordingly.
"""
from datetime import timedelta
from typing import Any
import pytest
from hatchet_sdk import Context, DurableContext, EmptyModel, Hatchet, Task, TaskDefaults
def dummy_task(input: EmptyModel, context: Context) -> dict[str, str]:
return {"foo": "bar"}
def dummy_durable_task(input: EmptyModel, context: DurableContext) -> dict[str, str]:
return {"foo": "bar"}
DEFAULT_SCHEDULE_TIMEOUT = timedelta(minutes=5)
DEFAULT_EXECUTION_TIMEOUT = timedelta(seconds=60)
DEFAULT_RETRIES = 0
DEFAULT_BACKOFF_FACTOR = None
DEFAULT_BACKOFF_MAX_SECONDS = None
def task(
hatchet: Hatchet,
is_durable: bool,
task_defaults: TaskDefaults,
**kwargs: Any,
) -> Task[EmptyModel, dict[str, str]]:
workflow = hatchet.workflow(
name="foo",
task_defaults=task_defaults,
)
task_fn = workflow.durable_task if is_durable else workflow.task
return task_fn(**kwargs)(dummy_durable_task if is_durable else dummy_task) # type: ignore
def standalone_task(
hatchet: Hatchet,
is_durable: bool,
**kwargs: Any,
) -> Task[EmptyModel, dict[str, str]]:
task_fn = hatchet.durable_task if is_durable else hatchet.task
return task_fn(**kwargs)(dummy_durable_task if is_durable else dummy_task)._task  # type: ignore
@pytest.mark.parametrize("is_durable", [False, True])
def test_task_defaults_applied_correctly(hatchet: Hatchet, is_durable: bool) -> None:
schedule_timeout = timedelta(seconds=3)
execution_timeout = timedelta(seconds=1)
retries = 4
backoff_factor = 1
backoff_max_seconds = 5
t = task(
hatchet=hatchet,
is_durable=is_durable,
task_defaults=TaskDefaults(
schedule_timeout=schedule_timeout,
execution_timeout=execution_timeout,
retries=retries,
backoff_factor=backoff_factor,
backoff_max_seconds=backoff_max_seconds,
),
)
assert t.schedule_timeout == schedule_timeout
assert t.execution_timeout == execution_timeout
assert t.retries == retries
assert t.backoff_factor == backoff_factor
assert t.backoff_max_seconds == backoff_max_seconds
@pytest.mark.parametrize(
"is_durable,is_standalone",
[
(False, False),
(True, False),
(False, True),
(True, True),
],
)
def test_fallbacking_ensure_default_unchanged(
hatchet: Hatchet, is_durable: bool, is_standalone: bool
) -> None:
"""If this test fails, it means that you changed the default values for the params to one of the `task` or `durable_task` decorators"""
t = task(
hatchet=hatchet,
is_durable=is_durable,
task_defaults=TaskDefaults(),
)
assert t.schedule_timeout == DEFAULT_SCHEDULE_TIMEOUT
assert t.execution_timeout == DEFAULT_EXECUTION_TIMEOUT
assert t.retries == DEFAULT_RETRIES
assert t.backoff_factor == DEFAULT_BACKOFF_FACTOR
assert t.backoff_max_seconds == DEFAULT_BACKOFF_MAX_SECONDS
@pytest.mark.parametrize(
"is_durable,is_standalone",
[
(False, False),
(True, False),
(False, True),
(True, True),
],
)
def test_defaults_correctly_overridden_by_params_passed_in(
hatchet: Hatchet, is_durable: bool, is_standalone: bool
) -> None:
t = task(
hatchet=hatchet,
is_durable=is_durable,
task_defaults=TaskDefaults(
schedule_timeout=timedelta(seconds=3),
execution_timeout=timedelta(seconds=1),
retries=4,
backoff_factor=1,
backoff_max_seconds=5,
),
schedule_timeout=timedelta(seconds=9),
execution_timeout=timedelta(seconds=2),
retries=6,
backoff_factor=3,
backoff_max_seconds=5,
)
assert t.schedule_timeout == timedelta(seconds=9)
assert t.execution_timeout == timedelta(seconds=2)
assert t.retries == 6
assert t.backoff_factor == 3
assert t.backoff_max_seconds == 5
@pytest.mark.parametrize(
"is_durable,is_standalone",
[
(False, False),
(True, False),
(False, True),
(True, True),
],
)
def test_params_correctly_set_with_no_defaults(
hatchet: Hatchet, is_durable: bool, is_standalone: bool
) -> None:
t = task(
hatchet=hatchet,
is_durable=is_durable,
task_defaults=TaskDefaults(),
schedule_timeout=timedelta(seconds=9),
execution_timeout=timedelta(seconds=2),
retries=6,
backoff_factor=3,
backoff_max_seconds=5,
)
assert t.schedule_timeout == timedelta(seconds=9)
assert t.execution_timeout == timedelta(seconds=2)
assert t.retries == 6
assert t.backoff_factor == 3
assert t.backoff_max_seconds == 5