From 57b6efe83302d4a18a333cca8029e81109bceeb8 Mon Sep 17 00:00:00 2001
From: Dillon DuPont
Date: Tue, 1 Jul 2025 12:24:59 -0400
Subject: [PATCH] implemented watchdog on server

---
 .../computer-server/computer_server/cli.py    |  68 ++++
 .../computer_server/watchdog.py               | 337 ++++++++++++++++++
 libs/python/computer-server/pyproject.toml    |   3 +-
 3 files changed, 407 insertions(+), 1 deletion(-)
 create mode 100644 libs/python/computer-server/computer_server/watchdog.py

diff --git a/libs/python/computer-server/computer_server/cli.py b/libs/python/computer-server/computer_server/cli.py
index 30f7e519..0fcbda00 100644
--- a/libs/python/computer-server/computer_server/cli.py
+++ b/libs/python/computer-server/computer_server/cli.py
@@ -3,11 +3,15 @@ Command-line interface for the Computer API server.
 """
 
 import argparse
+import asyncio
 import logging
+import os
 import sys
+import threading
 from typing import List, Optional
 
 from .server import Server
+from .watchdog import Watchdog
 
 logger = logging.getLogger(__name__)
 
@@ -37,6 +41,22 @@ def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
         type=str,
         help="Path to SSL certificate file (enables HTTPS)",
     )
+    parser.add_argument(
+        "--watchdog",
+        action="store_true",
+        help="Enable watchdog monitoring (automatically enabled if CONTAINER_NAME env var is set)",
+    )
+    parser.add_argument(
+        "--watchdog-interval",
+        type=int,
+        default=30,
+        help="Watchdog ping interval in seconds (default: 30)",
+    )
+    parser.add_argument(
+        "--no-restart",
+        action="store_true",
+        help="Disable automatic server restart in watchdog",
+    )
 
     return parser.parse_args(args)
 
@@ -51,6 +71,54 @@ def main() -> None:
         format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
     )
 
+    # Check if watchdog should be enabled
+    container_name = os.environ.get("CONTAINER_NAME")
+    enable_watchdog = args.watchdog or bool(container_name)
+
+    if container_name:
+        logger.info(f"Container environment detected (CONTAINER_NAME={container_name}), enabling watchdog")
+    elif args.watchdog:
+        logger.info("Watchdog explicitly enabled via --watchdog flag")
+
+    # Start watchdog if enabled
+    if enable_watchdog:
+        logger.info(f"Starting watchdog monitoring with {args.watchdog_interval}s interval")
+
+        def run_watchdog_thread():
+            """Run watchdog in a separate thread."""
+            loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(loop)
+            try:
+                # Create CLI args dict for watchdog
+                cli_args = {
+                    'host': args.host,
+                    'port': args.port,
+                    'log_level': args.log_level,
+                    'ssl_keyfile': args.ssl_keyfile,
+                    'ssl_certfile': args.ssl_certfile
+                }
+
+                # Create watchdog with restart settings
+                watchdog = Watchdog(
+                    cli_args=cli_args,
+                    ping_interval=args.watchdog_interval
+                )
+                watchdog.restart_enabled = not args.no_restart
+
+                loop.run_until_complete(watchdog.start_monitoring())
+            except Exception as e:
+                logger.error(f"Watchdog error: {e}")
+            finally:
+                loop.close()
+
+        # Start watchdog in background thread
+        watchdog_thread = threading.Thread(
+            target=run_watchdog_thread,
+            daemon=True,
+            name="watchdog"
+        )
+        watchdog_thread.start()
+
     # Create and start the server
     logger.info(f"Starting CUA Computer API server on {args.host}:{args.port}...")
 
diff --git a/libs/python/computer-server/computer_server/watchdog.py b/libs/python/computer-server/computer_server/watchdog.py
new file mode 100644
index 00000000..bfa9fc65
--- /dev/null
+++ b/libs/python/computer-server/computer_server/watchdog.py
@@ -0,0 +1,337 @@
+"""
+Watchdog module for monitoring the Computer API server health.
+Unix/Linux only - provides process management and restart capabilities.
+"""
+
+import asyncio
+import fcntl
+import json
+import logging
+import os
+import platform
+import subprocess
+import sys
+import time
+import websockets
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+
+def instance_already_running(label="watchdog"):
+    """
+    Detect if an instance with the label is already running, globally
+    at the operating system level.
+
+    Using `os.open` ensures that the file pointer won't be closed
+    by Python's garbage collector after the function's scope is exited.
+
+    The lock will be released when the program exits, or could be
+    released if the file pointer were closed.
+    """
+
+    lock_file_pointer = os.open(f"/tmp/instance_{label}.lock", os.O_WRONLY | os.O_CREAT)
+
+    try:
+        fcntl.lockf(lock_file_pointer, fcntl.LOCK_EX | fcntl.LOCK_NB)
+        already_running = False
+    except IOError:
+        already_running = True
+
+    return already_running
+
+
+class Watchdog:
+    """Watchdog class to monitor server health via WebSocket connection.
+    Unix/Linux only - provides restart capabilities.
+    """
+
+    def __init__(self, cli_args: Optional[dict] = None, ping_interval: int = 30):
+        """
+        Initialize the watchdog.
+
+        Args:
+            cli_args: Dictionary of CLI arguments to replicate when restarting
+            ping_interval: Interval between ping checks in seconds
+        """
+        # Check if running on a supported platform (Linux or macOS/Darwin)
+        if platform.system() not in ['Linux', 'Darwin']:
+            raise RuntimeError("Watchdog is only supported on Unix/Linux systems")
+
+        # Store CLI arguments for restart
+        self.cli_args = cli_args or {}
+        self.host = self.cli_args.get('host', 'localhost')
+        self.port = self.cli_args.get('port', 8000)
+        self.ping_interval = ping_interval
+        self.container_name = os.environ.get("CONTAINER_NAME")
+        self.running = False
+        self.restart_enabled = True
+
+    @property
+    def ws_uri(self) -> str:
+        """Get the WebSocket URI to ping (localhost on the configured port, or the container endpoint).
+
+        Returns:
+            WebSocket URI for the Computer API Server
+        """
+        ip_address = "localhost" if not self.container_name else f"{self.container_name}.containers.cloud.trycua.com"
+        protocol = "wss" if self.container_name else "ws"
+        port = "8443" if self.container_name else str(self.port)  # same port restart_server() kills/restarts
+        return f"{protocol}://{ip_address}:{port}/ws"
+
+    async def ping(self) -> bool:
+        """
+        Test connection to the WebSocket endpoint.
+
+        Returns:
+            True if connection successful, False otherwise
+        """
+        try:
+            # Create a simple ping message
+            ping_message = {
+                "type": "ping",
+                "timestamp": time.time()
+            }
+
+            # Try to connect to the WebSocket
+            async with websockets.connect(
+                self.ws_uri,
+                open_timeout=10,  # bound the opening handshake ("timeout" is not the open-timeout option)
+                max_size=1024 * 1024 * 10  # 10MB limit to match server
+            ) as websocket:
+                # Send ping message
+                await websocket.send(json.dumps(ping_message))
+
+                # Wait for any response or just close
+                try:
+                    response = await asyncio.wait_for(websocket.recv(), timeout=5)
+                    logger.debug(f"Ping response received: {response[:100]}...")
+                except asyncio.TimeoutError:
+                    # No response is fine for a ping
+                    logger.debug("Ping sent successfully (no response expected)")
+
+                return True
+
+        except Exception as e:
+            logger.warning(f"Ping failed: {e}")
+            return False
+
+    def kill_processes_on_port(self, port: int) -> bool:
+        """
+        Kill any processes using the specified port.
+
+        Args:
+            port: Port number to check and kill processes on
+
+        Returns:
+            True if processes were killed or none found, False on error
+        """
+        try:
+            # Find processes using the port
+            result = subprocess.run(
+                ["lsof", "-ti", f":{port}"],
+                capture_output=True,
+                text=True,
+                timeout=10
+            )
+
+            if result.returncode == 0 and result.stdout.strip():
+                pids = result.stdout.strip().split('\n')
+                logger.info(f"Found {len(pids)} processes using port {port}: {pids}")
+
+                # Kill each process
+                for pid in pids:
+                    if pid.strip():
+                        try:
+                            subprocess.run(["kill", "-9", pid.strip()], timeout=5)
+                            logger.info(f"Killed process {pid}")
+                        except subprocess.TimeoutExpired:
+                            logger.warning(f"Timeout killing process {pid}")
+                        except Exception as e:
+                            logger.warning(f"Error killing process {pid}: {e}")
+
+                return True
+            else:
+                logger.debug(f"No processes found using port {port}")
+                return True
+
+        except subprocess.TimeoutExpired:
+            logger.error(f"Timeout finding processes on port {port}")
+            return False
+        except Exception as e:
+            logger.error(f"Error finding processes on port {port}: {e}")
+            return False
+
+    def restart_server(self) -> bool:
+        """
+        Attempt to restart the server by killing existing processes and starting new one.
+
+        Returns:
+            True if restart was attempted, False on error
+        """
+        if not self.restart_enabled:
+            logger.info("Server restart is disabled")
+            return False
+
+        try:
+            logger.info("Attempting to restart server...")
+
+            # Kill processes on the port
+            port_to_kill = 8443 if self.container_name else self.port
+            if not self.kill_processes_on_port(port_to_kill):
+                logger.error("Failed to kill processes on port, restart aborted")
+                return False
+
+            # Wait a moment for processes to die
+            time.sleep(2)
+
+            # Try to restart the server
+            # In container mode, we can't easily restart, so just log
+            if self.container_name:
+                logger.warning("Container mode detected - cannot restart server automatically")
+                logger.warning("Container orchestrator should handle restart")
+                return False
+            else:
+                # For local mode, try to restart the CLI
+                logger.info("Attempting to restart local server...")
+
+                # Get the current Python executable and script
+                python_exe = sys.executable
+
+                # Try to find the CLI module
+                try:
+                    # Build command with all original CLI arguments
+                    cmd = [python_exe, "-m", "computer_server.cli"]
+
+                    # Add all CLI arguments except unset (None) and watchdog-related ones
+                    for key, value in self.cli_args.items():
+                        if value is None or key in ['watchdog', 'watchdog_interval', 'no_restart']:
+                            continue  # Unset options would be passed as the string "None"; watchdog args would recurse
+
+                        # Convert underscores to hyphens for CLI args
+                        arg_name = f"--{key.replace('_', '-')}"
+
+                        if isinstance(value, bool):
+                            if value:  # Only add flag if True
+                                cmd.append(arg_name)
+                        else:
+                            cmd.extend([arg_name, str(value)])
+
+                    logger.info(f"Starting server with command: {' '.join(cmd)}")
+
+                    # Start process in background
+                    subprocess.Popen(
+                        cmd,
+                        stdout=subprocess.DEVNULL,
+                        stderr=subprocess.DEVNULL,
+                        start_new_session=True
+                    )
+
+                    logger.info("Server restart initiated")
+                    return True
+
+                except Exception as e:
+                    logger.error(f"Failed to restart server: {e}")
+                    return False
+
+        except Exception as e:
+            logger.error(f"Error during server restart: {e}")
+            return False
+
+    async def start_monitoring(self) -> None:
+        """Start the watchdog monitoring loop."""
+        self.running = True
+        logger.info(f"Starting watchdog monitoring for {self.ws_uri}")
+        logger.info(f"Ping interval: {self.ping_interval} seconds")
+        if self.container_name:
+            logger.info(f"Container mode detected: {self.container_name}")
+
+        consecutive_failures = 0
+        max_failures = 3
+
+        while self.running:
+            try:
+                success = await self.ping()
+
+                if success:
+                    if consecutive_failures > 0:
+                        logger.info("Server connection restored")
+                    consecutive_failures = 0
+                    logger.debug("Ping successful")
+                else:
+                    consecutive_failures += 1
+                    logger.warning(f"Ping failed ({consecutive_failures}/{max_failures})")
+
+                    if consecutive_failures >= max_failures:
+                        logger.error(f"Server appears to be down after {max_failures} consecutive failures")
+
+                        # Attempt to restart the server
+                        if self.restart_enabled:
+                            logger.info("Attempting automatic server restart...")
+                            restart_success = self.restart_server()
+
+                            if restart_success:
+                                logger.info("Server restart initiated, waiting before next ping...")
+                                # Wait longer after restart attempt
+                                await asyncio.sleep(self.ping_interval * 2)
+                                consecutive_failures = 0  # Reset counter after restart attempt
+                            else:
+                                logger.error("Server restart failed")
+                        else:
+                            logger.warning("Automatic restart is disabled")
+
+                # Wait for next ping interval
+                await asyncio.sleep(self.ping_interval)
+
+            except asyncio.CancelledError:
+                logger.info("Watchdog monitoring cancelled")
+                break
+            except Exception as e:
+                logger.error(f"Unexpected error in watchdog loop: {e}")
+                await asyncio.sleep(self.ping_interval)
+
+    def stop_monitoring(self) -> None:
+        """Stop the watchdog monitoring."""
+        self.running = False
+        logger.info("Stopping watchdog monitoring")
+
+
+async def run_watchdog(cli_args: Optional[dict] = None, ping_interval: int = 30) -> None:
+    """
+    Run the watchdog monitoring.
+
+    Args:
+        cli_args: Dictionary of CLI arguments to replicate when restarting
+        ping_interval: Interval between ping checks in seconds
+    """
+    watchdog = Watchdog(cli_args=cli_args, ping_interval=ping_interval)
+
+    try:
+        await watchdog.start_monitoring()
+    except KeyboardInterrupt:
+        logger.info("Watchdog stopped by user")
+    finally:
+        watchdog.stop_monitoring()
+
+
+if __name__ == "__main__":
+    # For testing the watchdog standalone
+    import argparse
+
+    parser = argparse.ArgumentParser(description="Run Computer API server watchdog")
+    parser.add_argument("--host", default="localhost", help="Server host to monitor")
+    parser.add_argument("--port", type=int, default=8000, help="Server port to monitor")
+    parser.add_argument("--ping-interval", type=int, default=30, help="Ping interval in seconds")
+
+    args = parser.parse_args()
+
+    logging.basicConfig(
+        level=logging.INFO,
+        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
+    )
+
+    cli_args = {
+        'host': args.host,
+        'port': args.port
+    }
+    asyncio.run(run_watchdog(cli_args, args.ping_interval))
diff --git a/libs/python/computer-server/pyproject.toml b/libs/python/computer-server/pyproject.toml
index 090da016..6e9e7240 100644
--- a/libs/python/computer-server/pyproject.toml
+++ b/libs/python/computer-server/pyproject.toml
@@ -20,7 +20,8 @@ dependencies = [
     "pynput>=1.8.1",
     "pillow>=10.2.0",
     "aiohttp>=3.9.1",
-    "pyperclip>=1.9.0"
+    "pyperclip>=1.9.0",
+    "websockets>=12.0"
 ]
 
 [project.optional-dependencies]