Files
hatchet/sdks/python/apply_patches.py
Matt Kaye 2f33dd4dbd Feat: Misc. Python improvements + Streaming Improvements (#1846)
* fix: contextvars explicit copy

* feat: fix a ton of ruff errors

* fix: couple more ruff rules

* fix: ignore unhelpful rule

* fix: exception group in newer Python versions for improved handling

* fix: workflow docs

* feat: context docs

* feat: simple task counter

* feat: config for setting max tasks

* feat: graceful exit once worker exceeds max tasks

* fix: optional

* fix: docs

* fix: events docs + gen

* chore: gen

* fix: one more dangling task

* feat: add xdist in ci

* fix: CI

* fix: xdist fails me once again

* fix: fix + extend some tests

* fix: test cleanup

* fix: exception group

* fix: ugh

* feat: changelog

* Add Ruff linter callout to post

* refactor: clean up runner error handling

* feat: improved errors

* fix: lint

* feat: hacky serde impl

* fix: improve serde + formatting

* fix: logging

* fix: lint

* fix: unexpected errors

* fix: naming, ruff

* fix: rm cruft

* Fix: Attempt to fix namespacing issue in event waits (#1885)

* feat: add xdist in ci

* fix: attempt to fix namespacing issue in event waits

* fix: namespaced worker names

* fix: applied namespace to the wrong thing

* fix: rm hack

* drive by: namespacing improvement

* fix: delay

* fix: changelog

* fix: initial log work

* fix: more logging work

* fix: rm print cruft

* feat: use a queue to send logs

* fix: sentinel value to stop the loop

* fix: use the log sender everywhere

* fix: make streaming blocking, remove more thread pools

* feat: changelog

* fix: linting issues

* fix: broken test

* chore: bunch more generated stuff

* fix: changelog

* fix: one more

* fix: mypy

* chore: gen

* Feat: Streaming Improvements (#1886)

* Fix: Filter list improvements (#1899)

* fix: uuid validation

* fix: improve filter filtering

* fix: inner join

* fix: bug in workflow cached prop

* chore: bump

* fix: lint

* chore: changelog

* fix: separate filter queries

* feat: improve filter filtering

* fix: queries and the like

* feat: add xdist in ci

* feat: streaming test + gen

* feat: add index to stream event

* fix: rm langfuse dep

* fix: lf

* chore: gen

* feat: impl index for stream on context

* feat: tweak protos

* feat: extend test

* feat: send event index through queue

* feat: first pass + debug logging

* debug: fixes

* debug: more possible issues

* feat: generate new stream event protos

* feat: first pass at using an alternate exchange for replaying incoming stream events

* fix: exchange create timing

* fix: rm unused protos

* chore: gen

* feat: python cleanup

* fix: revert rabbit changes

* fix: unwind a bunch of cruft

* fix: optional index

* chore: gen python

* fix: event index nil handling

* feat: improve test

* fix: stream impl in sdk

* fix: make test faster

* chore: gen a ton more stuff

* fix: test

* fix: sorting helper

* fix: bug

* fix: one more ordering bug

* feat: add some tests for buffering logic

* feat: hangup test

* feat: test no buffering if no index sent

* fix: regular mutex

* fix: pr feedback

* fix: conflicts
2025-06-25 10:11:01 -04:00

141 lines
4.6 KiB
Python

import re
from collections.abc import Callable
from copy import deepcopy
from pathlib import Path
def prepend_import(content: str, import_statement: str) -> str:
    """Insert *import_statement* above the first import line of *content*.

    If the statement already appears anywhere in *content*, the text is
    returned unchanged. When no import line exists at all, the statement
    is placed at the very top of the file.
    """
    if import_statement in content:
        return content

    first_import = re.search(r"^import\s+|^from\s+", content, re.MULTILINE)
    at = first_import.start() if first_import else 0
    return f"{content[:at]}{import_statement}\n{content[at:]}"
def apply_patch(content: str, pattern: str, replacement: str) -> str:
    """Run a single regex substitution over *content* and return the result."""
    compiled = re.compile(pattern)
    return compiled.sub(replacement, content)
def atomically_patch_file(
    file_path: str, patch_funcs: list[Callable[[str], str]]
) -> None:
    """Apply *patch_funcs* in order to a file, writing only if all succeed.

    "Atomic" in the all-or-nothing sense: if any patch function raises, the
    error is reported and the file is left untouched. The file is rewritten
    only when the patched text actually differs from what was read.
    """
    path = Path(file_path)
    original = path.read_text()

    # Strings are immutable, so no defensive copy is needed (the previous
    # deepcopy was a no-op on a str); just thread the text through each patch.
    modified = original
    try:
        for func in patch_funcs:
            modified = func(modified)
    except Exception as e:
        # Best-effort tooling: report the failure and bail without writing.
        print(f"Error patching {file_path}: {e}")
        return

    if modified != original:
        path.write_text(modified)
        print(f"Patched {file_path}")
    else:
        print(f"No changes made to {file_path}")
def patch_contract_import_paths(content: str) -> str:
    """Rewrite bare ``from v1`` imports to the fully-qualified SDK path."""
    return re.sub(r"\bfrom v1\b", "from hatchet_sdk.contracts.v1", content)
def patch_grpc_dispatcher_import(content: str) -> str:
    """Qualify the generated dispatcher pb2 import with the SDK package."""
    return re.sub(
        r"\bimport dispatcher_pb2 as dispatcher__pb2\b",
        "from hatchet_sdk.contracts import dispatcher_pb2 as dispatcher__pb2",
        content,
    )
def patch_grpc_events_import(content: str) -> str:
    """Qualify the generated events pb2 import with the SDK package."""
    return re.sub(
        r"\bimport events_pb2 as events__pb2\b",
        "from hatchet_sdk.contracts import events_pb2 as events__pb2",
        content,
    )
def patch_grpc_workflows_import(content: str) -> str:
    """Qualify the generated workflows pb2 import with the SDK package."""
    return re.sub(
        r"\bimport workflows_pb2 as workflows__pb2\b",
        "from hatchet_sdk.contracts import workflows_pb2 as workflows__pb2",
        content,
    )
def patch_grpc_init_signature(content: str) -> str:
    """Add channel and return type annotations to generated stub __init__s."""
    return re.sub(
        r"def __init__\(self, channel\):",
        "def __init__(self, channel: grpc.Channel | grpc.aio.Channel) -> None:",
        content,
    )
def apply_patches_to_matching_files(
    root: str, glob: str, patch_funcs: list[Callable[[str], str]]
) -> None:
    """Recursively patch every file under *root* whose name matches *glob*."""
    matches = Path(root).rglob(glob)
    for match in matches:
        atomically_patch_file(str(match), patch_funcs)
def patch_api_client_datetime_format_on_post(content: str) -> str:
    """Patch the generated API client to localize naive datetimes on serialize.

    The generated client serializes both ``datetime.datetime`` and
    ``datetime.date`` in one ``isoformat()`` branch. This patch splits that
    branch: timezone-naive datetimes are first given the current local
    timezone (falling back to UTC), with a warning logged, before
    ``isoformat()`` is called. The ``datetime`` check must come before the
    ``date`` check because ``datetime`` is a subclass of ``date``.
    """
    # Make the SDK logger importable in the patched module so the injected
    # warning line resolves.
    content = prepend_import(content, "from hatchet_sdk.logger import logger")
    # Matches the original combined isinstance branch plus its return line;
    # group 1 captures the leading indent so the replacement stays aligned.
    pattern = r"([ \t]*)elif isinstance\(obj, \(datetime\.datetime, datetime\.date\)\):\s*\n\1[ \t]*return obj\.isoformat\(\)"
    # \1 backreferences re-apply the captured indentation on every line.
    replacement = (
        r"\1## IMPORTANT: Checking `datetime` must come before `date` since `datetime` is a subclass of `date`\n"
        r"\1elif isinstance(obj, datetime.datetime):\n"
        r"\1    if not obj.tzinfo:\n"
        r"\1        current_tz = (datetime.datetime.now(datetime.timezone(datetime.timedelta(0))).astimezone().tzinfo or datetime.timezone.utc)\n"
        r'\1        logger.warning(f"timezone-naive datetime found. assuming {current_tz}.")\n'
        r"\1        obj = obj.replace(tzinfo=current_tz)\n\n"
        r"\1    return obj.isoformat()\n"
        r"\1elif isinstance(obj, datetime.date):\n"
        r"\1    return obj.isoformat()"
    )
    return apply_patch(content, pattern, replacement)
def patch_workflow_run_metrics_counts_return_type(content: str) -> str:
    """Tighten the generated ``counts`` field type on workflow-run metrics.

    Replaces the generated ``Optional[Dict[str, Any]]`` annotation with the
    concrete ``Optional[WorkflowRunsMetricsCounts]`` model, adding the import
    the new annotation needs.
    """
    content = prepend_import(
        content,
        "from hatchet_sdk.clients.rest.models.workflow_runs_metrics_counts import WorkflowRunsMetricsCounts",
    )
    # Group 1 captures the leading indent so the rewritten field stays aligned.
    pattern = r"([ \t]*)counts: Optional\[Dict\[str, Any\]\] = None"
    replacement = r"\1counts: Optional[WorkflowRunsMetricsCounts] = None"
    return apply_patch(content, pattern, replacement)
if __name__ == "__main__":
    # Entry point: rewrite generated REST/gRPC code in place. The relative
    # paths mean this is expected to run from the SDK package root
    # (presumably hatchet/sdks/python — confirm against the build scripts).

    # Fix naive-datetime serialization in the generated REST client.
    atomically_patch_file(
        "hatchet_sdk/clients/rest/api_client.py",
        [patch_api_client_datetime_format_on_post],
    )
    # Tighten the metrics `counts` annotation to the concrete model.
    atomically_patch_file(
        "hatchet_sdk/clients/rest/models/workflow_runs_metrics.py",
        [patch_workflow_run_metrics_counts_return_type],
    )
    # *_grpc.py stubs need both import fixes and annotated __init__ signatures.
    grpc_patches: list[Callable[[str], str]] = [
        patch_contract_import_paths,
        patch_grpc_dispatcher_import,
        patch_grpc_events_import,
        patch_grpc_workflows_import,
        patch_grpc_init_signature,
    ]
    # *_pb2.py / *_pb2.pyi only need their import paths qualified.
    pb2_patches: list[Callable[[str], str]] = [
        patch_contract_import_paths,
    ]
    apply_patches_to_matching_files("hatchet_sdk/contracts", "*_grpc.py", grpc_patches)
    apply_patches_to_matching_files("hatchet_sdk/contracts", "*_pb2.py", pb2_patches)
    apply_patches_to_matching_files("hatchet_sdk/contracts", "*_pb2.pyi", pb2_patches)