mirror of
https://github.com/hatchet-dev/hatchet.git
synced 2026-02-05 07:39:03 -06:00
bugfix: allow python healthcheck server to bind on ipv6 hosts (#2855)
* feat: python healthcheck bind address config
* fix(healthcheck): config issue
* fix(pyproject): bump version
* fix(healthcheck): bind address default to all hosts?
* fix(): linting via ./lint.sh
* fix(pr): review
* fix(healthcheck): cast "None" settings value -> None
* fix: simplify validation logic
* chore: changelog
* fix: allow none
* fix: add back validator
* chore: clean up action listener arg passing
* chore: version
* fix: clean up instance vars a little bit

---------

Co-authored-by: mrkaye97 <mrkaye97@gmail.com>
This commit is contained in:
@@ -5,6 +5,12 @@ All notable changes to Hatchet's Python SDK will be documented in this changelog
|
||||
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
|
||||
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [1.22.10] - 2026-01-26
|
||||
|
||||
### Added
|
||||
|
||||
- `HATCHET_CLIENT_WORKER_HEALTHCHECK_BIND_ADDRESS` setting for configuring the address the worker healthcheck server binds to (default: `0.0.0.0`), so the server can be bound on IPv6 hosts. An empty value or `none` is treated as unset.
|
||||
|
||||
## [1.22.9] - 2026-01-26
|
||||
|
||||
### Added
|
||||
|
||||
@@ -37,11 +37,11 @@ class HealthcheckConfig(BaseSettings):
|
||||
|
||||
port: int = 8001
|
||||
enabled: bool = False
|
||||
# HATCHET_CLIENT_WORKER_HEALTHCHECK_EVENT_LOOP_BLOCK_THRESHOLD_SECONDS
|
||||
event_loop_block_threshold_seconds: timedelta = Field(
|
||||
default=timedelta(seconds=5),
|
||||
description="If the worker listener process event loop appears blocked longer than this threshold, /health returns 503. Value is interpreted as seconds.",
|
||||
)
|
||||
bind_address: str | None = "0.0.0.0"
|
||||
|
||||
@field_validator("event_loop_block_threshold_seconds", mode="before")
|
||||
@classmethod
|
||||
@@ -62,6 +62,17 @@ class HealthcheckConfig(BaseSettings):
|
||||
|
||||
return timedelta(seconds=float(v))
|
||||
|
||||
@field_validator("bind_address", mode="after")
@classmethod
def validate_bind_address(cls, value: str | None) -> str | None:
    """Normalize the configured healthcheck bind address.

    A value supplied as text may be the literal word "none" (in any
    letter case) or an empty/blank string; both are mapped to ``None``.
    Any other value is returned with surrounding whitespace removed.

    Args:
        value: The bind address after field validation, or ``None``.

    Returns:
        ``None`` when no usable address was given, otherwise the trimmed
        address string.
    """
    if value is None:
        return None

    # Trim once up front so padded input (" none ", " :: ") is treated the
    # same as its unpadded form instead of leaking whitespace into the
    # host passed to the healthcheck server.
    address = value.strip()

    if not address or address.lower() == "none":
        return None

    return address
|
||||
|
||||
|
||||
class OpenTelemetryConfig(BaseSettings):
|
||||
model_config = create_settings_config(
|
||||
|
||||
@@ -73,8 +73,6 @@ class WorkerActionListenerProcess:
|
||||
handle_kill: bool,
|
||||
debug: bool,
|
||||
labels: dict[str, str | int],
|
||||
enable_health_server: bool = False,
|
||||
healthcheck_port: int = 8001,
|
||||
) -> None:
|
||||
self.name = name
|
||||
self.actions = actions
|
||||
@@ -85,8 +83,6 @@ class WorkerActionListenerProcess:
|
||||
self.debug = debug
|
||||
self.labels = labels
|
||||
self.handle_kill = handle_kill
|
||||
self.enable_health_server = enable_health_server
|
||||
self.healthcheck_port = healthcheck_port
|
||||
|
||||
self._health_runner: web.AppRunner | None = None
|
||||
self._listener_health_gauge: Gauge | None = None
|
||||
@@ -94,9 +90,6 @@ class WorkerActionListenerProcess:
|
||||
self._event_loop_monitor_task: asyncio.Task[None] | None = None
|
||||
self._event_loop_last_lag_seconds: float = 0.0
|
||||
self._event_loop_blocked_since: float | None = None
|
||||
self._event_loop_block_threshold: timedelta = (
|
||||
self.config.healthcheck.event_loop_block_threshold_seconds
|
||||
)
|
||||
self._waiting_steps_blocked_since: float | None = None
|
||||
self._starting_since: float = time.time()
|
||||
|
||||
@@ -125,7 +118,7 @@ class WorkerActionListenerProcess:
|
||||
signal.SIGQUIT, lambda: asyncio.create_task(self.exit_gracefully())
|
||||
)
|
||||
|
||||
if self.enable_health_server:
|
||||
if self.config.healthcheck.enabled:
|
||||
self._listener_health_gauge = Gauge(
|
||||
"hatchet_worker_listener_health",
|
||||
"Listener health (1 healthy, 0 unhealthy)",
|
||||
@@ -147,7 +140,10 @@ class WorkerActionListenerProcess:
|
||||
lag = max(0.0, elapsed - interval)
|
||||
# If the loop is "completely blocked" across multiple monitor ticks,
|
||||
# report a continuously increasing lag value (time since first detected block).
|
||||
if timedelta(seconds=lag) >= self._event_loop_block_threshold:
|
||||
if (
|
||||
timedelta(seconds=lag)
|
||||
>= self.config.healthcheck.event_loop_block_threshold_seconds
|
||||
):
|
||||
if self._event_loop_blocked_since is None:
|
||||
self._event_loop_blocked_since = start + interval
|
||||
self._event_loop_last_lag_seconds = max(
|
||||
@@ -156,7 +152,10 @@ class WorkerActionListenerProcess:
|
||||
else:
|
||||
self._event_loop_last_lag_seconds = lag
|
||||
|
||||
if timedelta(seconds=lag) < self._event_loop_block_threshold:
|
||||
if (
|
||||
timedelta(seconds=lag)
|
||||
< self.config.healthcheck.event_loop_block_threshold_seconds
|
||||
):
|
||||
self._event_loop_blocked_since = None
|
||||
|
||||
def _starting_timed_out(self) -> bool:
|
||||
@@ -170,7 +169,7 @@ class WorkerActionListenerProcess:
|
||||
if (
|
||||
self._event_loop_blocked_since is not None
|
||||
and timedelta(seconds=(time.time() - self._event_loop_blocked_since))
|
||||
> self._event_loop_block_threshold
|
||||
> self.config.healthcheck.event_loop_block_threshold_seconds
|
||||
):
|
||||
return HealthStatus.UNHEALTHY
|
||||
|
||||
@@ -180,7 +179,7 @@ class WorkerActionListenerProcess:
|
||||
if (
|
||||
self._waiting_steps_blocked_since is not None
|
||||
and timedelta(seconds=(time.time() - self._waiting_steps_blocked_since))
|
||||
> self._event_loop_block_threshold
|
||||
> self.config.healthcheck.event_loop_block_threshold_seconds
|
||||
):
|
||||
return HealthStatus.UNHEALTHY
|
||||
|
||||
@@ -241,7 +240,7 @@ class WorkerActionListenerProcess:
|
||||
return web.Response(body=generate_latest(), content_type="text/plain")
|
||||
|
||||
async def start_health_server(self) -> None:
|
||||
if not self.enable_health_server:
|
||||
if not self.config.healthcheck.enabled:
|
||||
return
|
||||
|
||||
if self._health_runner is not None:
|
||||
@@ -259,14 +258,18 @@ class WorkerActionListenerProcess:
|
||||
|
||||
try:
|
||||
await runner.setup()
|
||||
await web.TCPSite(runner, "0.0.0.0", self.healthcheck_port).start()
|
||||
await web.TCPSite(
|
||||
runner,
|
||||
host=self.config.healthcheck.bind_address,
|
||||
port=self.config.healthcheck.port,
|
||||
).start()
|
||||
except Exception:
|
||||
logger.exception("failed to start healthcheck server (listener process)")
|
||||
return
|
||||
|
||||
self._health_runner = runner
|
||||
logger.info(
|
||||
f"healthcheck server (listener process) running on port {self.healthcheck_port}"
|
||||
f"healthcheck server (listener process) running on {self.config.healthcheck.bind_address}:{self.config.healthcheck.port}"
|
||||
)
|
||||
|
||||
if self._event_loop_monitor_task is None:
|
||||
@@ -510,9 +513,29 @@ class WorkerActionListenerProcess:
|
||||
logger.debug("forcefully closing listener...")
|
||||
|
||||
|
||||
def worker_action_listener_process(*args: Any, **kwargs: Any) -> None:
|
||||
def worker_action_listener_process(
|
||||
name: str,
|
||||
actions: list[str],
|
||||
slots: int,
|
||||
config: ClientConfig,
|
||||
action_queue: "Queue[Action]",
|
||||
event_queue: "Queue[ActionEvent | STOP_LOOP_TYPE]",
|
||||
handle_kill: bool,
|
||||
debug: bool,
|
||||
labels: dict[str, str | int],
|
||||
) -> None:
|
||||
async def run() -> None:
|
||||
process = WorkerActionListenerProcess(*args, **kwargs)
|
||||
process = WorkerActionListenerProcess(
|
||||
name=name,
|
||||
actions=actions,
|
||||
slots=slots,
|
||||
config=config,
|
||||
action_queue=action_queue,
|
||||
event_queue=event_queue,
|
||||
handle_kill=handle_kill,
|
||||
debug=debug,
|
||||
labels=labels,
|
||||
)
|
||||
await process.start_health_server()
|
||||
await process.start()
|
||||
# Keep the process running
|
||||
|
||||
@@ -333,8 +333,6 @@ class Worker:
|
||||
self.handle_kill,
|
||||
self.client.debug,
|
||||
self.labels,
|
||||
enable_health_server,
|
||||
healthcheck_port,
|
||||
),
|
||||
)
|
||||
process.start()
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[tool.poetry]
|
||||
name = "hatchet-sdk"
|
||||
version = "1.22.9"
|
||||
version = "1.22.10"
|
||||
description = "This is the official Python SDK for Hatchet, a distributed, fault-tolerant task queue. The SDK allows you to easily integrate Hatchet's task scheduling and workflow orchestration capabilities into your Python applications."
|
||||
authors = [
|
||||
"Alexander Belanger <alexander@hatchet.run>",
|
||||
|
||||
Reference in New Issue
Block a user