feat: cancel in progress (#325)

* chore: bad project path

* fix: remove tickerId for replay

* feat: cancel from request

* feat: cancel button in UI

* chore: rm comment

* fix: build error

* chore: reason case

* chore: reason string

* fix: linting

---------

Co-authored-by: gabriel ruttner <gabe@hatchet.run>
This commit is contained in:
Gabe Ruttner
2024-04-01 18:16:27 -07:00
committed by GitHub
parent d731a13afd
commit 5066547ce6
14 changed files with 2783 additions and 1887 deletions

View File

@@ -0,0 +1,29 @@
from hatchet_sdk import Hatchet, Context
from dotenv import load_dotenv
import asyncio
load_dotenv()
hatchet = Hatchet(debug=True)
@hatchet.workflow(on_events=["user:create"])
class CancelWorkflow:
def __init__(self):
self.my_value = "test"
@hatchet.step(timeout='10s', retries=1)
async def step1(self, context: Context):
i = 0
while not context.exit_flag.is_set() and i < 20:
print(f"Waiting for cancellation {i}")
await asyncio.sleep(1)
i += 1
if context.exit_flag.is_set():
print("Cancelled")
workflow = CancelWorkflow()
worker = hatchet.worker('test-worker', max_runs=4)
worker.register_workflow(workflow)
worker.start()

View File

@@ -605,6 +605,286 @@ class StepRunApi:
@validate_call
def step_run_update_cancel(
self,
tenant: Annotated[str, Field(min_length=36, strict=True, max_length=36, description="The tenant id")],
step_run: Annotated[str, Field(min_length=36, strict=True, max_length=36, description="The step run id")],
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Tuple[
Annotated[StrictFloat, Field(gt=0)],
Annotated[StrictFloat, Field(gt=0)]
]
] = None,
_request_auth: Optional[Dict[StrictStr, Any]] = None,
_content_type: Optional[StrictStr] = None,
_headers: Optional[Dict[StrictStr, Any]] = None,
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> StepRun:
"""Attempts to cancel a step run
Attempts to cancel a step run
:param tenant: The tenant id (required)
:type tenant: str
:param step_run: The step run id (required)
:type step_run: str
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:type _request_timeout: int, tuple(int, int), optional
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the
authentication in the spec for a single request.
:type _request_auth: dict, optional
:param _content_type: force content-type for the request.
:type _content_type: str, Optional
:param _headers: set to override the headers for a single
request; this effectively ignores the headers
in the spec for a single request.
:type _headers: dict, optional
:param _host_index: set to override the host_index for a single
request; this effectively ignores the host_index
in the spec for a single request.
:type _host_index: int, optional
:return: Returns the result object.
""" # noqa: E501
_param = self._step_run_update_cancel_serialize(
tenant=tenant,
step_run=step_run,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
_host_index=_host_index
)
_response_types_map: Dict[str, Optional[str]] = {
'200': "StepRun",
'400': "APIErrors",
'403': "APIErrors",
}
response_data = self.api_client.call_api(
*_param,
_request_timeout=_request_timeout
)
response_data.read()
return self.api_client.response_deserialize(
response_data=response_data,
response_types_map=_response_types_map,
).data
@validate_call
def step_run_update_cancel_with_http_info(
self,
tenant: Annotated[str, Field(min_length=36, strict=True, max_length=36, description="The tenant id")],
step_run: Annotated[str, Field(min_length=36, strict=True, max_length=36, description="The step run id")],
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Tuple[
Annotated[StrictFloat, Field(gt=0)],
Annotated[StrictFloat, Field(gt=0)]
]
] = None,
_request_auth: Optional[Dict[StrictStr, Any]] = None,
_content_type: Optional[StrictStr] = None,
_headers: Optional[Dict[StrictStr, Any]] = None,
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> ApiResponse[StepRun]:
"""Attempts to cancel a step run
Attempts to cancel a step run
:param tenant: The tenant id (required)
:type tenant: str
:param step_run: The step run id (required)
:type step_run: str
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:type _request_timeout: int, tuple(int, int), optional
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the
authentication in the spec for a single request.
:type _request_auth: dict, optional
:param _content_type: force content-type for the request.
:type _content_type: str, Optional
:param _headers: set to override the headers for a single
request; this effectively ignores the headers
in the spec for a single request.
:type _headers: dict, optional
:param _host_index: set to override the host_index for a single
request; this effectively ignores the host_index
in the spec for a single request.
:type _host_index: int, optional
:return: Returns the result object.
""" # noqa: E501
_param = self._step_run_update_cancel_serialize(
tenant=tenant,
step_run=step_run,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
_host_index=_host_index
)
_response_types_map: Dict[str, Optional[str]] = {
'200': "StepRun",
'400': "APIErrors",
'403': "APIErrors",
}
response_data = self.api_client.call_api(
*_param,
_request_timeout=_request_timeout
)
response_data.read()
return self.api_client.response_deserialize(
response_data=response_data,
response_types_map=_response_types_map,
)
@validate_call
def step_run_update_cancel_without_preload_content(
self,
tenant: Annotated[str, Field(min_length=36, strict=True, max_length=36, description="The tenant id")],
step_run: Annotated[str, Field(min_length=36, strict=True, max_length=36, description="The step run id")],
_request_timeout: Union[
None,
Annotated[StrictFloat, Field(gt=0)],
Tuple[
Annotated[StrictFloat, Field(gt=0)],
Annotated[StrictFloat, Field(gt=0)]
]
] = None,
_request_auth: Optional[Dict[StrictStr, Any]] = None,
_content_type: Optional[StrictStr] = None,
_headers: Optional[Dict[StrictStr, Any]] = None,
_host_index: Annotated[StrictInt, Field(ge=0, le=0)] = 0,
) -> RESTResponseType:
"""Attempts to cancel a step run
Attempts to cancel a step run
:param tenant: The tenant id (required)
:type tenant: str
:param step_run: The step run id (required)
:type step_run: str
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:type _request_timeout: int, tuple(int, int), optional
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the
authentication in the spec for a single request.
:type _request_auth: dict, optional
:param _content_type: force content-type for the request.
:type _content_type: str, Optional
:param _headers: set to override the headers for a single
request; this effectively ignores the headers
in the spec for a single request.
:type _headers: dict, optional
:param _host_index: set to override the host_index for a single
request; this effectively ignores the host_index
in the spec for a single request.
:type _host_index: int, optional
:return: Returns the result object.
""" # noqa: E501
_param = self._step_run_update_cancel_serialize(
tenant=tenant,
step_run=step_run,
_request_auth=_request_auth,
_content_type=_content_type,
_headers=_headers,
_host_index=_host_index
)
_response_types_map: Dict[str, Optional[str]] = {
'200': "StepRun",
'400': "APIErrors",
'403': "APIErrors",
}
response_data = self.api_client.call_api(
*_param,
_request_timeout=_request_timeout
)
return response_data.response
def _step_run_update_cancel_serialize(
self,
tenant,
step_run,
_request_auth,
_content_type,
_headers,
_host_index,
) -> RequestSerialized:
_host = None
_collection_formats: Dict[str, str] = {
}
_path_params: Dict[str, str] = {}
_query_params: List[Tuple[str, str]] = []
_header_params: Dict[str, Optional[str]] = _headers or {}
_form_params: List[Tuple[str, str]] = []
_files: Dict[str, str] = {}
_body_params: Optional[bytes] = None
# process the path parameters
if tenant is not None:
_path_params['tenant'] = tenant
if step_run is not None:
_path_params['step-run'] = step_run
# process the query parameters
# process the header parameters
# process the form parameters
# process the body parameter
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
[
'application/json'
]
)
# authentication setting
_auth_settings: List[str] = [
'cookieAuth',
'bearerAuth'
]
return self.api_client.param_serialize(
method='POST',
resource_path='/api/v1/tenants/{tenant}/step-runs/{step-run}/cancel',
path_params=_path_params,
query_params=_query_params,
header_params=_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
auth_settings=_auth_settings,
collection_formats=_collection_formats,
_host=_host,
_request_auth=_request_auth
)
@validate_call
def step_run_update_rerun(
self,

View File

@@ -0,0 +1,87 @@
# coding: utf-8
"""
Hatchet API
The Hatchet API
The version of the OpenAPI document: 1.0.0
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
""" # noqa: E501
from __future__ import annotations
import pprint
import re # noqa: F401
import json
from pydantic import BaseModel
from typing import Any, ClassVar, Dict, List
from typing import Optional, Set
from typing_extensions import Self
class CancelStepRunRequest(BaseModel):
"""
CancelStepRunRequest
""" # noqa: E501
input: Dict[str, Any]
__properties: ClassVar[List[str]] = ["input"]
model_config = {
"populate_by_name": True,
"validate_assignment": True,
"protected_namespaces": (),
}
def to_str(self) -> str:
"""Returns the string representation of the model using alias"""
return pprint.pformat(self.model_dump(by_alias=True))
def to_json(self) -> str:
"""Returns the JSON representation of the model using alias"""
# TODO: pydantic v2: use .model_dump_json(by_alias=True, exclude_unset=True) instead
return json.dumps(self.to_dict())
@classmethod
def from_json(cls, json_str: str) -> Optional[Self]:
"""Create an instance of CancelStepRunRequest from a JSON string"""
return cls.from_dict(json.loads(json_str))
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of the model using alias.
This has the following differences from calling pydantic's
`self.model_dump(by_alias=True)`:
* `None` is only added to the output dict for nullable fields that
were set at model initialization. Other fields with value `None`
are ignored.
"""
excluded_fields: Set[str] = set([
])
_dict = self.model_dump(
by_alias=True,
exclude=excluded_fields,
exclude_none=True,
)
return _dict
@classmethod
def from_dict(cls, obj: Optional[Dict[str, Any]]) -> Optional[Self]:
"""Create an instance of CancelStepRunRequest from a dict"""
if obj is None:
return None
if not isinstance(obj, dict):
return cls.model_validate(obj)
_obj = cls.model_validate({
"input": obj.get("input")
})
return _obj