Files
hatchet/sdks/python/tests/worker_fixture.py
Matt Kaye 1eeb8e915d Fix: Queue blocking on many concurrency keys + failures (#1622)
* fix: move around case ordering

* feat: move worker fixture around

* fix: clean up fixtures

* feat: expand test, try to repro

* debug: more minimal repro

* fix: bug bashing

* fix: factor out concurrency queries into overwrite

* feat: improve test

* fix: improve test

* fix: lint

* feat: migration for trigger

* Fix: Retry + Cancel Bugs (#1620)

* fix: send original retry count to cancel

* fix: key threads, contexts, and tasks on retry count

* fix: store the key on the action and use it everywhere

* refactor: signature consistency

* fix: instrumentor types

* chore: version

* feat: comment

* fix: thank you mypy

* fix: simplify callback

* fix: ts implementation

* chore: lint

* fix: rework how retries are passed

* Fix: Add Retries to Log Pushes (#1594)

* fix: retry log pushes

* chore: version
2025-04-25 21:49:30 -04:00

79 lines
2.1 KiB
Python

import logging
import os
import subprocess
import time
from contextlib import contextmanager
from io import BytesIO
from threading import Thread
from typing import Callable, Generator
import psutil
import requests
def wait_for_worker_health(healthcheck_port: int, max_attempts: int = 25) -> bool:
    """Block until the worker's healthcheck endpoint answers an HTTP request.

    Polls ``http://localhost:{healthcheck_port}/health`` up to ``max_attempts``
    times, sleeping one second between failed attempts. Each request has a
    five second timeout, so the worst-case wait is considerably longer than
    ``max_attempts`` seconds.

    Any HTTP response counts as success; the status code is not inspected.

    Args:
        healthcheck_port: Local port the healthcheck endpoint is polled on.
        max_attempts: Maximum number of requests to make before giving up.

    Returns:
        ``True`` as soon as a request completes without raising.

    Raises:
        Exception: If every attempt failed. The last underlying error is
            attached as ``__cause__`` to make startup failures diagnosable.
    """
    last_error: Exception | None = None

    for attempt in range(1, max_attempts + 1):
        try:
            requests.get(f"http://localhost:{healthcheck_port}/health", timeout=5)
            return True
        except Exception as exc:
            # Best-effort polling: the worker is expected to refuse
            # connections until it has finished starting up.
            last_error = exc

        # No point in sleeping after the final attempt; fail right away.
        if attempt < max_attempts:
            time.sleep(1)

    raise Exception(
        f"Worker failed to become healthy after {max_attempts} healthcheck attempts"
    ) from last_error
def log_output(pipe: BytesIO, log_func: Callable[[str], None]) -> None:
    """Drain ``pipe`` line by line, echoing each stripped line to stdout.

    Reads until ``pipe.readline`` returns ``b""`` (end of stream). Bytes that
    are not valid UTF-8 are replaced with U+FFFD instead of raising, so one
    malformed line cannot stop the pipe from being drained.

    Args:
        pipe: Binary stream to read from.
        log_func: Accepted for interface compatibility.
            NOTE(review): this callable is not invoked; output goes through
            ``print``. Confirm whether that is intentional.
    """
    for raw_line in iter(pipe.readline, b""):
        print(raw_line.decode(errors="replace").strip())
def _terminate_process_tree(pid: int, timeout: float = 5) -> None:
    """Terminate the process ``pid`` and all its descendants, killing stragglers.

    Children are asked to terminate before the parent. Anything still alive
    after ``timeout`` seconds is force killed. Processes that have already
    exited are ignored.
    """
    try:
        parent = psutil.Process(pid)
    except psutil.NoSuchProcess:
        # Already gone; nothing to clean up.
        return

    try:
        children = parent.children(recursive=True)
    except psutil.NoSuchProcess:
        children = []

    for process in children + [parent]:
        try:
            process.terminate()
        except psutil.NoSuchProcess:
            # Exited between enumeration and the signal.
            pass

    _, alive = psutil.wait_procs([parent] + children, timeout=timeout)

    for process in alive:
        logging.warning(f"Force killing process {process.pid}")
        try:
            process.kill()
        except psutil.NoSuchProcess:
            pass


@contextmanager
def hatchet_worker(
    command: list[str],
    healthcheck_port: int = 8001,
) -> Generator[subprocess.Popen[bytes], None, None]:
    """Run ``command`` as a background worker for the duration of a ``with`` block.

    Sets ``HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT`` in this process's
    environment, starts the command with a copy of that environment, pumps
    its stdout/stderr on daemon threads, and waits for the healthcheck
    endpoint before yielding. On exit the worker and all of its descendants
    are terminated, including when the ``with`` body or the healthcheck raises.

    Args:
        command: Argument vector used to start the worker process.
        healthcheck_port: Port exported to the worker and polled for health.

    Yields:
        The ``subprocess.Popen`` handle of the started worker.

    Raises:
        Exception: If the process has already exited right after being
            spawned, or if the healthcheck never succeeds.
    """
    logging.info(f"Starting background worker: {' '.join(command)}")

    os.environ["HATCHET_CLIENT_WORKER_HEALTHCHECK_PORT"] = str(healthcheck_port)
    env = os.environ.copy()

    proc = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env
    )

    # Everything after the spawn is guarded so the process tree is always
    # torn down, even when startup fails or the caller's block raises.
    try:
        # Check if the process is still running
        if proc.poll() is not None:
            raise Exception(
                f"Worker failed to start with return code {proc.returncode}"
            )

        # Drain both pipes continuously so the worker never blocks on a
        # full pipe buffer.
        Thread(
            target=log_output, args=(proc.stdout, logging.info), daemon=True
        ).start()
        Thread(
            target=log_output, args=(proc.stderr, logging.error), daemon=True
        ).start()

        wait_for_worker_health(healthcheck_port=healthcheck_port)

        yield proc
    finally:
        logging.info("Cleaning up background worker")
        _terminate_process_tree(proc.pid)