diff --git a/examples/computer_examples.py b/examples/computer_examples.py index 212d6041..4d4482ab 100644 --- a/examples/computer_examples.py +++ b/examples/computer_examples.py @@ -22,8 +22,6 @@ for path in pythonpath.split(":"): from computer import Computer, VMProviderType from computer.logger import LogLevel -from computer.utils import get_image_size - async def main(): try: @@ -31,15 +29,20 @@ async def main(): # Create computer with configured host computer = Computer( - display="1024x768", # Higher resolution - memory="8GB", # More memory - cpu="4", # More CPU cores + display="1024x768", + memory="8GB", + cpu="4", os_type="macos", verbosity=LogLevel.NORMAL, # Use QUIET to suppress most logs - use_host_computer_server=False, - provider_type=VMProviderType.LUME, # Explicitly use the Lume provider + provider_type=VMProviderType.LUME, + storage="/Users/francescobonacci/repos/trycua/computer/examples/storage", + # shared_directories=[ + # "/Users/francescobonacci/repos/trycua/computer/examples/shared" + # ] ) + try: + # Run the computer with default parameters await computer.run() await computer.interface.hotkey("command", "space") @@ -89,8 +92,7 @@ async def main(): finally: # Important to clean up resources - pass - # await computer.stop() + await computer.stop() except Exception as e: print(f"Error in main: {e}") traceback.print_exc() diff --git a/libs/computer/computer/computer.py b/libs/computer/computer/computer.py index d294d659..22413ddf 100644 --- a/libs/computer/computer/computer.py +++ b/libs/computer/computer/computer.py @@ -35,8 +35,9 @@ class Computer: telemetry_enabled: bool = True, provider_type: Union[str, VMProviderType] = VMProviderType.LUME, port: Optional[int] = 3000, + noVNC_port: Optional[int] = 8006, host: str = os.environ.get("PYLUME_HOST", "localhost"), - storage_path: Optional[str] = None, + storage: Optional[str] = None # Path for persistent VM storage (Lumier provider) ): """Initialize a new Computer instance. @@ -48,7 +49,7 @@ class Computer: Defaults to "1024x768" memory: The VM memory allocation. Defaults to "8GB" cpu: The VM CPU allocation. Defaults to "4" - os: The operating system type ('macos' or 'linux') + os_type: The operating system type ('macos' or 'linux') name: The VM name image: The VM image name shared_directories: Optional list of directory paths to share with the VM @@ -58,9 +59,9 @@ class Computer: telemetry_enabled: Whether to enable telemetry tracking. Defaults to True. provider_type: The VM provider type to use (lume, qemu, cloud) port: Optional port to use for the VM provider server + noVNC_port: Optional port for the noVNC web interface (Lumier provider) host: Host to use for VM provider connections (e.g. 
"localhost", "host.docker.internal") - bin_path: Optional path to the VM provider binary - storage_path: Optional path to store VM data + storage: Optional path for persistent VM storage (Lumier provider) """ self.logger = Logger("cua.computer", verbosity) @@ -69,10 +70,18 @@ class Computer: # Store original parameters self.image = image self.port = port + self.noVNC_port = noVNC_port self.host = host self.os_type = os_type self.provider_type = provider_type - self.storage_path = storage_path + self.storage = storage + + # For Lumier provider, store the first shared directory path to use + # for VM file sharing + self.shared_path = None + if shared_directories and len(shared_directories) > 0: + self.shared_path = shared_directories[0] + self.logger.info(f"Using first shared directory for VM file sharing: {self.shared_path}") # Store telemetry preference self._telemetry_enabled = telemetry_enabled @@ -202,12 +211,29 @@ class Computer: # Configure provider based on initialization parameters provider_kwargs = { - "storage_path": self.storage_path, + "storage": self.storage, "verbose": self.verbosity >= LogLevel.DEBUG, } - - # Set port if specified - if self.port is not None: + + # VM name is already set in self.config.name and will be used when calling provider methods + + # For Lumier provider, add specific configuration + if self.provider_type == VMProviderType.LUMIER: + # Pass VM image to LumierProvider + provider_kwargs["image"] = self.image + self.logger.info(f"Using VM image for Lumier provider: {self.image}") + + # Add shared_path if specified (for file sharing between host and VM) + if self.shared_path: + provider_kwargs["shared_path"] = self.shared_path + self.logger.info(f"Using shared path for Lumier provider: {self.shared_path}") + + # Add noVNC_port if specified (for web interface) + if self.noVNC_port: + provider_kwargs["noVNC_port"] = self.noVNC_port + self.logger.info(f"Using noVNC port for Lumier provider: {self.noVNC_port}") + elif self.port is not None: + # For other providers, set port if specified provider_kwargs["port"] = self.port self.logger.verbose(f"Using specified port for provider: {self.port}") @@ -257,53 +283,52 @@ class Computer: path = os.path.abspath(os.path.expanduser(path)) if not os.path.exists(path): self.logger.warning(f"Shared directory does not exist: {path}") - continue - shared_dirs.append({"host_path": path, "vm_path": path}) - - # Create VM run options with specs from config - # Account for optional shared directories + + # Define VM run options run_opts = { - "cpu": int(self.config.cpu), + "noDisplay": False, + "sharedDirectories": shared_dirs, + "display": self.config.display, "memory": self.config.memory, - "display": { - "width": self.config.display.width, - "height": self.config.display.height - } + "cpu": self.config.cpu } - if shared_dirs: - run_opts["shared_directories"] = shared_dirs - - # Log the run options for debugging + # For Lumier provider, pass the noVNC_port if specified + if self.provider_type == VMProviderType.LUMIER and self.noVNC_port is not None: + run_opts["noVNC_port"] = self.noVNC_port + self.logger.info(f"Using noVNC_port {self.noVNC_port} for Lumier provider") self.logger.info(f"VM run options: {run_opts}") - # Log the equivalent curl command for debugging - payload = json.dumps({"noDisplay": False, "sharedDirectories": []}) - curl_cmd = f"curl -X POST 'http://localhost:3000/lume/vms/{self.config.name}/run' -H 'Content-Type: application/json' -d '{payload}'" - # self.logger.info(f"Equivalent curl command:") - # 
self.logger.info(f"{curl_cmd}") - try: if self.config.vm_provider is None: raise RuntimeError(f"VM provider not initialized for {self.config.name}") - response = await self.config.vm_provider.run_vm(self.config.name, run_opts) + response = await self.config.vm_provider.run_vm( + name=self.config.name, + run_opts=run_opts, + storage=self.storage # Pass storage explicitly for clarity + ) self.logger.info(f"VM run response: {response if response else 'None'}") except Exception as run_error: self.logger.error(f"Failed to run VM: {run_error}") raise RuntimeError(f"Failed to start VM: {run_error}") - # Wait for VM to be ready with required properties - self.logger.info("Waiting for VM to be ready...") + # Wait for VM to be ready with a valid IP address + self.logger.info("Waiting for VM to be ready with a valid IP address...") try: - ip = await self.get_ip() - if ip: - self.logger.info(f"VM is ready with IP: {ip}") - # Store the IP address for later use instead of returning early - ip_address = ip - else: - # If no IP was found, try to raise a helpful error - raise RuntimeError(f"VM {self.config.name} failed to get IP address") + # Use the enhanced get_ip method that includes retry logic + max_retries = 30 # Increased for initial VM startup + retry_delay = 2 # 2 seconds between retries + + self.logger.info(f"Waiting up to {max_retries * retry_delay} seconds for VM to be ready...") + ip = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay) + + # If we get here, we have a valid IP + self.logger.info(f"VM is ready with IP: {ip}") + ip_address = ip + except TimeoutError as timeout_error: + self.logger.error(str(timeout_error)) + raise RuntimeError(f"VM startup timed out: {timeout_error}") except Exception as wait_error: self.logger.error(f"Error waiting for VM: {wait_error}") raise RuntimeError(f"VM failed to become ready: {wait_error}") @@ -312,6 +337,10 @@ class Computer: raise RuntimeError(f"Failed to initialize computer: {e}") try: + # Verify we have a valid IP before initializing the interface + if not ip_address or ip_address == "unknown" or ip_address == "0.0.0.0": + raise RuntimeError(f"Cannot initialize interface - invalid IP address: {ip_address}") + # Initialize the interface using the factory with the specified OS self.logger.info(f"Initializing interface for {self.os_type} at {ip_address}") from .interface.base import BaseComputerInterface @@ -328,10 +357,11 @@ class Computer: try: # Use a single timeout for the entire connection process - await self._interface.wait_for_ready(timeout=60) + # The VM should already be ready at this point, so we're just establishing the connection + await self._interface.wait_for_ready(timeout=30) self.logger.info("WebSocket interface connected successfully") except TimeoutError as e: - self.logger.error("Failed to connect to WebSocket interface") + self.logger.error(f"Failed to connect to WebSocket interface at {ip_address}") raise TimeoutError( f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}" ) @@ -359,44 +389,26 @@ class Computer: start_time = time.time() try: - if self._running: - self._running = False - self.logger.info("Stopping Computer...") + self.logger.info("Stopping Computer...") - if hasattr(self, "_stop_event"): - self._stop_event.set() - if hasattr(self, "_keep_alive_task"): - await self._keep_alive_task - - if self._interface: # Only try to close interface if it exists - self.logger.verbose("Closing interface...") - # For host computer server, just use normal close to keep the server running - if 
self.use_host_computer_server: - self._interface.close() - else: - # For VM mode, force close the connection - if hasattr(self._interface, "force_close"): - self._interface.force_close() - else: - self._interface.close() - - if not self.use_host_computer_server and self._provider_context: + # In VM mode, first explicitly stop the VM, then exit the provider context + if not self.use_host_computer_server and self._provider_context and self.config.vm_provider is not None: try: self.logger.info(f"Stopping VM {self.config.name}...") - if self.config.vm_provider is not None: - await self.config.vm_provider.stop_vm(self.config.name) + await self.config.vm_provider.stop_vm( + name=self.config.name, + storage=self.storage # Pass storage explicitly for clarity + ) except Exception as e: self.logger.error(f"Error stopping VM: {e}") self.logger.verbose("Closing VM provider context...") - if self.config.vm_provider is not None: - await self.config.vm_provider.__aexit__(None, None, None) + await self.config.vm_provider.__aexit__(None, None, None) self._provider_context = None + self.logger.info("Computer stopped") except Exception as e: - self.logger.debug( - f"Error during cleanup: {e}" - ) # Log as debug since this might be expected + self.logger.debug(f"Error during cleanup: {e}") # Log as debug since this might be expected finally: # Log stop time for performance monitoring duration_ms = (time.time() - start_time) * 1000 @@ -404,12 +416,69 @@ class Computer: return # @property - async def get_ip(self) -> str: - """Get the IP address of the VM or localhost if using host computer server.""" + async def get_ip(self, max_retries: int = 15, retry_delay: int = 2) -> str: + """Get the IP address of the VM or localhost if using host computer server. + + Args: + max_retries: Maximum number of retries to get the IP (default: 15) + retry_delay: Delay between retries in seconds (default: 2) + + Returns: + IP address of the VM or localhost if using host computer server + + Raises: + TimeoutError: If unable to get a valid IP address after retries + """ if self.use_host_computer_server: return "127.0.0.1" - ip = await self.config.get_ip() - return ip or "unknown" # Return "unknown" if ip is None + + # Try multiple times to get a valid IP + for attempt in range(1, max_retries + 1): + if attempt > 1: + self.logger.info(f"Retrying to get VM IP address (attempt {attempt}/{max_retries})...") + + try: + # Get VM information from the provider + if self.config.vm_provider is None: + raise RuntimeError("VM provider is not initialized") + + # Get VM info from provider with explicit storage parameter + vm_info = await self.config.vm_provider.get_vm( + name=self.config.name, + storage=self.storage # Pass storage explicitly for clarity + ) + + # Check if we got a valid IP + ip = vm_info.get("ip_address", None) + if ip and ip != "unknown" and not ip.startswith("0.0.0.0"): + self.logger.info(f"Got valid VM IP address: {ip}") + return ip + + # Check the VM status + status = vm_info.get("status", "unknown") + + # If the VM is in a non-running state (stopped, paused, etc.) + # raise a more informative error instead of waiting + if status in ["stopped"]: + raise RuntimeError(f"VM is not running yet (status: {status})") + + # If VM is starting or initializing, wait and retry + if status != "running": + self.logger.info(f"VM is not running yet (status: {status}). Waiting...") + await asyncio.sleep(retry_delay) + continue + + # If VM is running but no IP yet, wait and retry + self.logger.info("VM is running but no valid IP address yet. 
Waiting...") + + except Exception as e: + self.logger.warning(f"Error getting VM IP: {e}") + + await asyncio.sleep(retry_delay) + + # If we get here, we couldn't get a valid IP after all retries + raise TimeoutError(f"Failed to get valid IP address for VM {self.config.name} after {max_retries} attempts") + async def wait_vm_ready(self) -> Optional[Dict[str, Any]]: """Wait for VM to be ready with an IP address. @@ -434,7 +503,11 @@ class Computer: try: # Keep polling for VM info - vm = await self.config.vm_provider.get_vm(self.config.name) + if self.config.vm_provider is None: + self.logger.error("VM provider is not initialized") + vm = None + else: + vm = await self.config.vm_provider.get_vm(self.config.name) # Log full VM properties for debugging (every 30 attempts) if attempts % 30 == 0: @@ -492,9 +565,9 @@ class Computer: try: if self.config.vm_provider is not None: vm = await self.config.vm_provider.get_vm(self.config.name) - # VMStatus is a Pydantic model with attributes, not a dictionary - status = vm.status if vm else "unknown" - ip = vm.ip_address if vm else None + # VM data is returned as a dictionary from the Lumier provider + status = vm.get('status', 'unknown') if vm else "unknown" + ip = vm.get('ip_address') if vm else None else: status = "unknown" ip = None @@ -516,7 +589,11 @@ class Computer: "memory": memory or self.config.memory } if self.config.vm_provider is not None: - await self.config.vm_provider.update_vm(self.config.name, update_opts) + await self.config.vm_provider.update_vm( + name=self.config.name, + update_opts=update_opts, + storage=self.storage # Pass storage explicitly for clarity + ) else: raise RuntimeError("VM provider not initialized") diff --git a/libs/computer/computer/interface/macos.py b/libs/computer/computer/interface/macos.py index 2460086c..cac0b9e5 100644 --- a/libs/computer/computer/interface/macos.py +++ b/libs/computer/computer/interface/macos.py @@ -17,7 +17,6 @@ class MacOSComputerInterface(BaseComputerInterface): def __init__(self, ip_address: str, username: str = "lume", password: str = "lume"): super().__init__(ip_address, username, password) - self.ws_uri = f"ws://{ip_address}:8000/ws" self._ws = None self._reconnect_task = None self._closed = False @@ -31,6 +30,15 @@ class MacOSComputerInterface(BaseComputerInterface): # Set logger name for MacOS interface self.logger = Logger("cua.interface.macos", LogLevel.NORMAL) + @property + def ws_uri(self) -> str: + """Get the WebSocket URI using the current IP address. 
+ + Returns: + WebSocket URI for the Computer API Server + """ + return f"ws://{self.ip_address}:8000/ws" + async def _keep_alive(self): """Keep the WebSocket connection alive with automatic reconnection.""" retry_count = 0 diff --git a/libs/computer/computer/models.py b/libs/computer/computer/models.py index f042641a..5ead143f 100644 --- a/libs/computer/computer/models.py +++ b/libs/computer/computer/models.py @@ -37,6 +37,11 @@ class Computer: return None vm = await self.vm_provider.get_vm(self.name) - # PyLume returns a VMStatus object, not a dictionary - # Access ip_address as an attribute, not with .get() - return vm.ip_address if vm else None \ No newline at end of file + # Handle both object attribute and dictionary access for ip_address + if vm: + if isinstance(vm, dict): + return vm.get("ip_address") + else: + # Access as attribute for object-based return values + return getattr(vm, "ip_address", None) + return None \ No newline at end of file diff --git a/libs/computer/computer/providers/base.py b/libs/computer/computer/providers/base.py index b9e0c6ac..555a7567 100644 --- a/libs/computer/computer/providers/base.py +++ b/libs/computer/computer/providers/base.py @@ -8,6 +8,7 @@ from typing import Dict, List, Optional, Any, AsyncContextManager class VMProviderType(str, Enum): """Enum of supported VM provider types.""" LUME = "lume" + LUMIER = "lumier" QEMU = "qemu" CLOUD = "cloud" UNKNOWN = "unknown" @@ -26,8 +27,17 @@ class BaseVMProvider(AsyncContextManager): pass @abc.abstractmethod - async def get_vm(self, name: str) -> Dict[str, Any]: - """Get VM information by name.""" + async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: + """Get VM information by name. + + Args: + name: Name of the VM to get information for + storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM information including status, IP address, etc. + """ pass @abc.abstractmethod @@ -36,16 +46,45 @@ class BaseVMProvider(AsyncContextManager): pass @abc.abstractmethod - async def run_vm(self, name: str, run_opts: Dict[str, Any]) -> Dict[str, Any]: - """Run a VM with the given options.""" + async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: + """Run a VM by name with the given options. + + Args: + name: Name of the VM to run + run_opts: Dictionary of run options (memory, cpu, etc.) + storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM run status and information + """ pass @abc.abstractmethod - async def stop_vm(self, name: str) -> Dict[str, Any]: - """Stop a running VM.""" + async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: + """Stop a VM by name. + + Args: + name: Name of the VM to stop + storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM stop status and information + """ pass @abc.abstractmethod - async def update_vm(self, name: str, update_opts: Dict[str, Any]) -> Dict[str, Any]: - """Update VM configuration.""" + async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: + """Update VM configuration. + + Args: + name: Name of the VM to update + update_opts: Dictionary of update options (memory, cpu, etc.) 
+ storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM update status and information + """ pass diff --git a/libs/computer/computer/providers/factory.py b/libs/computer/computer/providers/factory.py index d6e81660..428c1af3 100644 --- a/libs/computer/computer/providers/factory.py +++ b/libs/computer/computer/providers/factory.py @@ -14,13 +14,29 @@ class VMProviderFactory: @staticmethod def create_provider( provider_type: Union[str, VMProviderType], - **kwargs + port: Optional[int] = None, + host: str = "localhost", + bin_path: Optional[str] = None, + storage: Optional[str] = None, + shared_path: Optional[str] = None, + image: Optional[str] = None, + verbose: bool = False, + ephemeral: bool = False, + noVNC_port: Optional[int] = None ) -> BaseVMProvider: """Create a VM provider of the specified type. Args: provider_type: Type of VM provider to create - **kwargs: Additional arguments to pass to the provider constructor + port: Port for the API server + host: Hostname for the API server + bin_path: Path to provider binary if needed + storage: Path for persistent VM storage + shared_path: Path for shared folder between host and VM + image: VM image to use (for Lumier provider) + verbose: Enable verbose logging + ephemeral: Use ephemeral (temporary) storage + noVNC_port: Specific port for noVNC interface (for Lumier provider) Returns: An instance of the requested VM provider @@ -44,17 +60,53 @@ class VMProviderFactory: "The pylume package is required for LumeProvider. " "Please install it with 'pip install cua-computer[lume]'" ) - return LumeProvider(**kwargs) + return LumeProvider( + port=port, + host=host, + bin_path=bin_path, + storage=storage, + verbose=verbose + ) except ImportError as e: logger.error(f"Failed to import LumeProvider: {e}") raise ImportError( "The pylume package is required for LumeProvider. " "Please install it with 'pip install cua-computer[lume]'" ) from e + elif provider_type == VMProviderType.LUMIER: + try: + from .lumier import LumierProvider, HAS_LUMIER + if not HAS_LUMIER: + raise ImportError( + "Docker is required for LumierProvider. " + "Please install Docker for Apple Silicon and Lume CLI before using this provider." + ) + return LumierProvider( + port=port, + host=host, + storage=storage, + shared_path=shared_path, + image=image or "macos-sequoia-cua:latest", + verbose=verbose, + ephemeral=ephemeral, + noVNC_port=noVNC_port + ) + except ImportError as e: + logger.error(f"Failed to import LumierProvider: {e}") + raise ImportError( + "Docker and Lume CLI are required for LumierProvider. " + "Please install Docker for Apple Silicon and run the Lume installer script." 
+ ) from e elif provider_type == VMProviderType.QEMU: try: from .qemu import QEMUProvider - return QEMUProvider(**kwargs) + return QEMUProvider( + bin_path=bin_path, + storage=storage, + port=port, + host=host, + verbose=verbose + ) except ImportError as e: logger.error(f"Failed to import QEMUProvider: {e}") raise ImportError( @@ -64,7 +116,13 @@ class VMProviderFactory: elif provider_type == VMProviderType.CLOUD: try: from .cloud import CloudProvider - return CloudProvider(**kwargs) + # Cloud provider might need different parameters, but including basic ones + return CloudProvider( + host=host, + port=port, + storage=storage, + verbose=verbose + ) except ImportError as e: logger.error(f"Failed to import CloudProvider: {e}") raise ImportError( diff --git a/libs/computer/computer/providers/lume/provider.py b/libs/computer/computer/providers/lume/provider.py index e433fce3..31671d98 100644 --- a/libs/computer/computer/providers/lume/provider.py +++ b/libs/computer/computer/providers/lume/provider.py @@ -1,73 +1,70 @@ -"""Lume VM provider implementation.""" +"""Lume VM provider implementation using curl commands. +This provider uses direct curl commands to interact with the Lume API, +removing the dependency on the pylume Python package. +""" + +import os +import re +import asyncio +import json +import subprocess import logging -from typing import Dict, List, Optional, Any, Tuple, TypeVar, Type - -# Only import pylume when this module is actually used -try: - from pylume import PyLume - from pylume.models import VMRunOpts, VMUpdateOpts, ImageRef, SharedDirectory, VMStatus - HAS_PYLUME = True -except ImportError: - HAS_PYLUME = False - # Create dummy classes for type checking - class PyLume: - pass - class VMRunOpts: - pass - class VMUpdateOpts: - pass - class ImageRef: - pass - class SharedDirectory: - pass - class VMStatus: - pass +from typing import Dict, Any, Optional, List, Tuple from ..base import BaseVMProvider, VMProviderType +from ...logger import Logger, LogLevel +from ..lume_api import ( + lume_api_get, + lume_api_run, + lume_api_stop, + lume_api_update, + HAS_CURL, + parse_memory +) +# Setup logging logger = logging.getLogger(__name__) class LumeProvider(BaseVMProvider): - """Lume VM provider implementation using pylume.""" + """Lume VM provider implementation using direct curl commands. + + This provider uses curl to interact with the Lume API server, + removing the dependency on the pylume Python package. + """ def __init__( self, port: Optional[int] = None, host: str = "localhost", bin_path: Optional[str] = None, - storage_path: Optional[str] = None, + storage: Optional[str] = None, verbose: bool = False, - **kwargs ): """Initialize the Lume provider. Args: - port: Optional port to use for the PyLume server - host: Host to use for PyLume connections - bin_path: Optional path to the Lume binary - storage_path: Optional path to store VM data + port: Port for the Lume API server (default: 3000) + host: Host to use for API connections (default: localhost) + bin_path: Optional path to the Lume binary (not used directly) + storage: Path to store VM data verbose: Enable verbose logging """ - if not HAS_PYLUME: + if not HAS_CURL: raise ImportError( - "The pylume package is required for LumeProvider. " - "Please install it with 'pip install cua-computer[lume]'" + "curl is required for LumeProvider. " + "Please ensure it is installed and in your PATH." 
) - # PyLume doesn't accept bin_path or storage_path parameters - # Convert verbose to debug parameter for PyLume - self._pylume = PyLume( - port=port, - host=host, - debug=verbose, - **kwargs - ) - # Store these for reference, even though PyLume doesn't use them directly - self._bin_path = bin_path - self._storage_path = storage_path - self._context = None + self.host = host + self.port = port or 3000 # Default port for Lume API + self.storage = storage + self.bin_path = bin_path + self.verbose = verbose + + # Base API URL for Lume API calls + self.api_base_url = f"http://{self.host}:{self.port}" @property def provider_type(self) -> VMProviderType: @@ -76,54 +73,225 @@ class LumeProvider(BaseVMProvider): async def __aenter__(self): """Enter async context manager.""" - self._context = await self._pylume.__aenter__() + # No initialization needed, just return self return self async def __aexit__(self, exc_type, exc_val, exc_tb): """Exit async context manager.""" - if self._context: - await self._pylume.__aexit__(exc_type, exc_val, exc_tb) - self._context = None + # No cleanup needed + pass - async def get_vm(self, name: str) -> VMStatus: - """Get VM information by name.""" - # PyLume get_vm returns a VMStatus object, not a dictionary - return await self._pylume.get_vm(name) + def _lume_api_get(self, vm_name: str = "", storage: Optional[str] = None, debug: bool = False) -> Dict[str, Any]: + """Get VM information using shared lume_api function. + + Args: + vm_name: Optional name of the VM to get info for. + If empty, lists all VMs. + storage: Optional storage path override. If provided, this will be used instead of self.storage + debug: Whether to show debug output + + Returns: + Dictionary with VM status information parsed from JSON response + """ + # Use the shared implementation from lume_api module + return lume_api_get( + vm_name=vm_name, + host=self.host, + port=self.port, + storage=storage if storage is not None else self.storage, + debug=debug, + verbose=self.verbose + ) + + def _lume_api_run(self, vm_name: str, run_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: + """Run a VM using shared lume_api function. + + Args: + vm_name: Name of the VM to run + run_opts: Dictionary of run options + debug: Whether to show debug output + + Returns: + Dictionary with API response or error information + """ + # Use the shared implementation from lume_api module + return lume_api_run( + vm_name=vm_name, + host=self.host, + port=self.port, + run_opts=run_opts, + storage=self.storage, + debug=debug, + verbose=self.verbose + ) + + def _lume_api_stop(self, vm_name: str, debug: bool = False) -> Dict[str, Any]: + """Stop a VM using shared lume_api function. + + Args: + vm_name: Name of the VM to stop + debug: Whether to show debug output + + Returns: + Dictionary with API response or error information + """ + # Use the shared implementation from lume_api module + return lume_api_stop( + vm_name=vm_name, + host=self.host, + port=self.port, + storage=self.storage, + debug=debug, + verbose=self.verbose + ) + + def _lume_api_update(self, vm_name: str, update_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]: + """Update VM configuration using shared lume_api function. 
+ + Args: + vm_name: Name of the VM to update + update_opts: Dictionary of update options + debug: Whether to show debug output + + Returns: + Dictionary with API response or error information + """ + # Use the shared implementation from lume_api module + return lume_api_update( + vm_name=vm_name, + host=self.host, + port=self.port, + update_opts=update_opts, + storage=self.storage, + debug=debug, + verbose=self.verbose + ) + + async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: + """Get VM information by name. + + Args: + name: Name of the VM to get information for + storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM information including status, IP address, etc. + + Note: + If storage is not provided, the provider's default storage path will be used. + The storage parameter allows overriding the storage location for this specific call. + """ + if not HAS_CURL: + logger.error("curl is not available. Cannot get VM status.") + return { + "name": name, + "status": "unavailable", + "error": "curl is not available" + } + + # First try to get detailed VM info from the API + try: + # Query the Lume API for VM status using the provider's storage_path + vm_info = self._lume_api_get( + vm_name=name, + storage=storage if storage is not None else self.storage, + debug=self.verbose + ) + + # Check for API errors + if "error" in vm_info: + logger.debug(f"API request error: {vm_info['error']}") + # If we got an error from the API, report the VM as not ready yet + return { + "name": name, + "status": "starting", # VM is still starting - do not attempt to connect yet + "api_status": "error", + "error": vm_info["error"] + } + + # Process the VM status information + vm_status = vm_info.get("status", "unknown") + + # Check if VM is stopped or not running - don't wait for IP in this case + if vm_status == "stopped": + logger.info(f"VM {name} is in '{vm_status}' state - not waiting for IP address") + # Return the status as-is without waiting for an IP + result = { + "name": name, + "status": vm_status, + **vm_info # Include all original fields from the API response + } + return result + + # Handle field name differences between APIs + # Some APIs use camelCase, others use snake_case + if "vncUrl" in vm_info: + vnc_url = vm_info["vncUrl"] + elif "vnc_url" in vm_info: + vnc_url = vm_info["vnc_url"] + else: + vnc_url = "" + + if "ipAddress" in vm_info: + ip_address = vm_info["ipAddress"] + elif "ip_address" in vm_info: + ip_address = vm_info["ip_address"] + else: + # If no IP address is provided and VM is supposed to be running, + # report it as still starting + ip_address = None + logger.info(f"VM {name} is in '{vm_status}' state but no IP address found - reporting as still starting") + + logger.info(f"VM {name} status: {vm_status}") + + # Return the complete status information + result = { + "name": name, + "status": vm_status if vm_status else "running", + "ip_address": ip_address, + "vnc_url": vnc_url, + "api_status": "ok" + } + + # Include all original fields from the API response + if isinstance(vm_info, dict): + for key, value in vm_info.items(): + if key not in result: # Don't override our carefully processed fields + result[key] = value + + return result + + except Exception as e: + logger.error(f"Failed to get VM status: {e}") + # Return a fallback status that indicates the VM is not ready yet + return { + "name": name, + "status": "initializing", # VM is still 
initializing + "error": f"Failed to get VM status: {str(e)}" + } async def list_vms(self) -> List[Dict[str, Any]]: """List all available VMs.""" - return await self._pylume.list_vms() + result = self._lume_api_get(debug=self.verbose) - async def run_vm(self, name: str, run_opts: Dict[str, Any]) -> Dict[str, Any]: + # Extract the VMs list from the response + if "vms" in result and isinstance(result["vms"], list): + return result["vms"] + elif "error" in result: + logger.error(f"Error listing VMs: {result['error']}") + return [] + else: + return [] + + async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Run a VM with the given options.""" - # Convert dict to VMRunOpts if needed - if isinstance(run_opts, dict): - run_opts = VMRunOpts(**run_opts) - return await self._pylume.run_vm(name, run_opts) + return self._lume_api_run(name, run_opts, debug=self.verbose) - async def stop_vm(self, name: str) -> Dict[str, Any]: + async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Stop a running VM.""" - return await self._pylume.stop_vm(name) + return self._lume_api_stop(name, debug=self.verbose) - async def update_vm(self, name: str, update_opts: Dict[str, Any]) -> Dict[str, Any]: + async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Update VM configuration.""" - # Convert dict to VMUpdateOpts if needed - if isinstance(update_opts, dict): - update_opts = VMUpdateOpts(**update_opts) - return await self._pylume.update_vm(name, update_opts) - - # Pylume-specific helper methods - def get_pylume_instance(self) -> PyLume: - """Get the underlying PyLume instance.""" - return self._pylume - - # Helper methods for converting between PyLume and generic types - @staticmethod - def create_vm_run_opts(**kwargs) -> VMRunOpts: - """Create VMRunOpts from kwargs.""" - return VMRunOpts(**kwargs) - - @staticmethod - def create_vm_update_opts(**kwargs) -> VMUpdateOpts: - """Create VMUpdateOpts from kwargs.""" - return VMUpdateOpts(**kwargs) + return self._lume_api_update(name, update_opts, debug=self.verbose) diff --git a/libs/computer/computer/providers/lume_api.py b/libs/computer/computer/providers/lume_api.py new file mode 100644 index 00000000..5010299b --- /dev/null +++ b/libs/computer/computer/providers/lume_api.py @@ -0,0 +1,380 @@ +"""Shared API utilities for Lume and Lumier providers. + +This module contains shared functions for interacting with the Lume API, +used by both the LumeProvider and LumierProvider classes. +""" + +import logging +import json +import subprocess +import urllib.parse +from typing import Dict, List, Optional, Any + +# Setup logging +logger = logging.getLogger(__name__) + +# Check if curl is available +try: + subprocess.run(["curl", "--version"], capture_output=True, check=True) + HAS_CURL = True +except (subprocess.SubprocessError, FileNotFoundError): + HAS_CURL = False + + +def lume_api_get( + vm_name: str, + host: str, + port: int, + storage: Optional[str] = None, + debug: bool = False, + verbose: bool = False +) -> Dict[str, Any]: + """Use curl to get VM information from Lume API. 
+ + Args: + vm_name: Name of the VM to get info for + host: API host + port: API port + storage: Storage path for the VM + debug: Whether to show debug output + verbose: Enable verbose logging + + Returns: + Dictionary with VM status information parsed from JSON response + """ + # URL encode the storage parameter for the query + encoded_storage = "" + storage_param = "" + + if storage: + # First encode the storage path properly + encoded_storage = urllib.parse.quote(storage, safe='') + storage_param = f"?storage={encoded_storage}" + + # Construct API URL with encoded storage parameter if needed + api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}" + + # Construct the curl command with increased timeouts for more reliability + # --connect-timeout: Time to establish connection (15 seconds) + # --max-time: Maximum time for the whole operation (20 seconds) + # -f: Fail silently (no output at all) on server errors + # Add single quotes around URL to ensure special characters are handled correctly + cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", f"'{api_url}'"] + + # For logging and display, show the properly escaped URL + display_cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", api_url] + + # Only print the curl command when debug is enabled + display_curl_string = ' '.join(display_cmd) + if debug or verbose: + print(f"DEBUG: Executing curl API call: {display_curl_string}") + logger.debug(f"Executing API request: {display_curl_string}") + + # Execute the command - for execution we need to use shell=True to handle URLs with special characters + try: + # Use a single string with shell=True for proper URL handling + shell_cmd = ' '.join(cmd) + result = subprocess.run(shell_cmd, shell=True, capture_output=True, text=True) + + # Handle curl exit codes + if result.returncode != 0: + curl_error = "Unknown error" + + # Map common curl error codes to helpful messages + if result.returncode == 7: + curl_error = "Failed to connect to the API server - it might still be starting up" + elif result.returncode == 22: + curl_error = "HTTP error returned from API server" + elif result.returncode == 28: + curl_error = "Operation timeout - the API server is taking too long to respond" + elif result.returncode == 52: + curl_error = "Empty reply from server - the API server is starting but not fully ready yet" + elif result.returncode == 56: + curl_error = "Network problem during data transfer - check container networking" + + # Only log at debug level to reduce noise during retries + logger.debug(f"API request failed with code {result.returncode}: {curl_error}") + + # Return a more useful error message + return { + "error": f"API request failed: {curl_error}", + "curl_code": result.returncode, + "vm_name": vm_name, + "status": "unknown" # We don't know the actual status due to API error + } + + # Try to parse the response as JSON + if result.stdout and result.stdout.strip(): + try: + vm_status = json.loads(result.stdout) + if debug or verbose: + logger.info(f"Successfully parsed VM status: {vm_status.get('status', 'unknown')}") + return vm_status + except json.JSONDecodeError as e: + # Return the raw response if it's not valid JSON + logger.warning(f"Invalid JSON response: {e}") + if "Virtual machine not found" in result.stdout: + return {"status": "not_found", "message": "VM not found in Lume API"} + + return {"error": f"Invalid JSON response: {result.stdout[:100]}...", "status": "unknown"} + else: + return {"error": "Empty response from API", 
"status": "unknown"} + except subprocess.SubprocessError as e: + logger.error(f"Failed to execute API request: {e}") + return {"error": f"Failed to execute API request: {str(e)}", "status": "unknown"} + + +def lume_api_run( + vm_name: str, + host: str, + port: int, + run_opts: Dict[str, Any], + storage: Optional[str] = None, + debug: bool = False, + verbose: bool = False +) -> Dict[str, Any]: + """Run a VM using curl. + + Args: + vm_name: Name of the VM to run + host: API host + port: API port + run_opts: Dictionary of run options + storage: Storage path for the VM + debug: Whether to show debug output + verbose: Enable verbose logging + + Returns: + Dictionary with API response or error information + """ + # Construct API URL + api_url = f"http://{host}:{port}/lume/vms/{vm_name}/run" + + # Prepare JSON payload with required parameters + payload = {} + + # Add CPU cores if specified + if "cpu" in run_opts: + payload["cpu"] = run_opts["cpu"] + + # Add memory if specified + if "memory" in run_opts: + payload["memory"] = run_opts["memory"] + + # Add storage parameter if specified + if storage: + payload["storage"] = storage + elif "storage" in run_opts: + payload["storage"] = run_opts["storage"] + + # Add shared directories if specified + if "shared_directories" in run_opts: + payload["sharedDirectories"] = run_opts["shared_directories"] + + # Construct the curl command + cmd = [ + "curl", "--connect-timeout", "30", "--max-time", "30", + "-s", "-X", "POST", "-H", "Content-Type: application/json", + "-d", json.dumps(payload), + api_url + ] + + # Always print the command for debugging + if debug or verbose: + print(f"DEBUG: Executing curl run API call: {' '.join(cmd)}") + print(f"Run payload: {json.dumps(payload, indent=2)}") + + # Execute the command + try: + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + logger.warning(f"API request failed with code {result.returncode}: {result.stderr}") + return {"error": f"API request failed: {result.stderr}"} + + # Try to parse the response as JSON + if result.stdout and result.stdout.strip(): + try: + response = json.loads(result.stdout) + return response + except json.JSONDecodeError: + # Return the raw response if it's not valid JSON + return {"success": True, "message": "VM started successfully", "raw_response": result.stdout} + else: + return {"success": True, "message": "VM started successfully"} + except subprocess.SubprocessError as e: + logger.error(f"Failed to execute run request: {e}") + return {"error": f"Failed to execute run request: {str(e)}"} + + +def lume_api_stop( + vm_name: str, + host: str, + port: int, + storage: Optional[str] = None, + debug: bool = False, + verbose: bool = False +) -> Dict[str, Any]: + """Stop a VM using curl. 
+ + Args: + vm_name: Name of the VM to stop + host: API host + port: API port + storage: Storage path for the VM + debug: Whether to show debug output + verbose: Enable verbose logging + + Returns: + Dictionary with API response or error information + """ + # Construct API URL + api_url = f"http://{host}:{port}/lume/vms/{vm_name}/stop" + + # Prepare JSON payload with required parameters + payload = {} + + # Add storage path if specified + if storage: + payload["storage"] = storage + + # Construct the curl command + cmd = [ + "curl", "--connect-timeout", "15", "--max-time", "20", + "-s", "-X", "POST", "-H", "Content-Type: application/json", + "-d", json.dumps(payload), + api_url + ] + + # Execute the command + try: + if debug or verbose: + logger.info(f"Executing: {' '.join(cmd)}") + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + logger.warning(f"API request failed with code {result.returncode}: {result.stderr}") + return {"error": f"API request failed: {result.stderr}"} + + # Try to parse the response as JSON + if result.stdout and result.stdout.strip(): + try: + response = json.loads(result.stdout) + return response + except json.JSONDecodeError: + # Return the raw response if it's not valid JSON + return {"success": True, "message": "VM stopped successfully", "raw_response": result.stdout} + else: + return {"success": True, "message": "VM stopped successfully"} + except subprocess.SubprocessError as e: + logger.error(f"Failed to execute stop request: {e}") + return {"error": f"Failed to execute stop request: {str(e)}"} + + +def lume_api_update( + vm_name: str, + host: str, + port: int, + update_opts: Dict[str, Any], + storage: Optional[str] = None, + debug: bool = False, + verbose: bool = False +) -> Dict[str, Any]: + """Update VM settings using curl. 
+ + Args: + vm_name: Name of the VM to update + host: API host + port: API port + update_opts: Dictionary of update options + storage: Storage path for the VM + debug: Whether to show debug output + verbose: Enable verbose logging + + Returns: + Dictionary with API response or error information + """ + # Construct API URL + api_url = f"http://{host}:{port}/lume/vms/{vm_name}/update" + + # Prepare JSON payload with required parameters + payload = {} + + # Add CPU cores if specified + if "cpu" in update_opts: + payload["cpu"] = update_opts["cpu"] + + # Add memory if specified + if "memory" in update_opts: + payload["memory"] = update_opts["memory"] + + # Add storage path if specified + if storage: + payload["storage"] = storage + + # Construct the curl command + cmd = [ + "curl", "--connect-timeout", "15", "--max-time", "20", + "-s", "-X", "POST", "-H", "Content-Type: application/json", + "-d", json.dumps(payload), + api_url + ] + + # Execute the command + try: + if debug: + logger.info(f"Executing: {' '.join(cmd)}") + + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode != 0: + logger.warning(f"API request failed with code {result.returncode}: {result.stderr}") + return {"error": f"API request failed: {result.stderr}"} + + # Try to parse the response as JSON + if result.stdout and result.stdout.strip(): + try: + response = json.loads(result.stdout) + return response + except json.JSONDecodeError: + # Return the raw response if it's not valid JSON + return {"success": True, "message": "VM updated successfully", "raw_response": result.stdout} + else: + return {"success": True, "message": "VM updated successfully"} + except subprocess.SubprocessError as e: + logger.error(f"Failed to execute update request: {e}") + return {"error": f"Failed to execute update request: {str(e)}"} + + +def parse_memory(memory_str: str) -> int: + """Parse memory string to MB integer. + + Examples: + "8GB" -> 8192 + "1024MB" -> 1024 + "512" -> 512 + + Returns: + Memory value in MB + """ + if isinstance(memory_str, int): + return memory_str + + if isinstance(memory_str, str): + # Extract number and unit + import re + match = re.match(r"(\d+)([A-Za-z]*)", memory_str) + if match: + value, unit = match.groups() + value = int(value) + unit = unit.upper() + + if unit == "GB" or unit == "G": + return value * 1024 + elif unit == "MB" or unit == "M" or unit == "": + return value + + # Default fallback + logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default") + return 8192 # Default to 8GB diff --git a/libs/computer/computer/providers/lumier/__init__.py b/libs/computer/computer/providers/lumier/__init__.py new file mode 100644 index 00000000..32a3954b --- /dev/null +++ b/libs/computer/computer/providers/lumier/__init__.py @@ -0,0 +1,8 @@ +"""Lumier VM provider implementation.""" + +try: + # Use the same import approach as in the Lume provider + from .provider import LumierProvider + HAS_LUMIER = True +except ImportError: + HAS_LUMIER = False diff --git a/libs/computer/computer/providers/lumier/provider.py b/libs/computer/computer/providers/lumier/provider.py new file mode 100644 index 00000000..1ddf6250 --- /dev/null +++ b/libs/computer/computer/providers/lumier/provider.py @@ -0,0 +1,724 @@ +""" +Lumier VM provider implementation. + +This provider uses Docker containers running the Lumier image to create +macOS and Linux VMs. It handles VM lifecycle operations through Docker +commands and container management. 
+""" + +import logging +import os +import json +import asyncio +from typing import Dict, List, Optional, Any +import subprocess +import time +import re + +from ..base import BaseVMProvider, VMProviderType +from ..lume_api import ( + lume_api_get, + lume_api_run, + lume_api_stop, + lume_api_update +) + +# Setup logging +logger = logging.getLogger(__name__) + +# Check if Docker is available +try: + subprocess.run(["docker", "--version"], capture_output=True, check=True) + HAS_LUMIER = True +except (subprocess.SubprocessError, FileNotFoundError): + HAS_LUMIER = False + + +class LumierProvider(BaseVMProvider): + """ + Lumier VM Provider implementation using Docker containers. + + This provider uses Docker to run Lumier containers that can create + macOS and Linux VMs through containerization. + """ + + def __init__( + self, + port: Optional[int] = 3000, + host: str = "localhost", + storage: Optional[str] = None, + shared_path: Optional[str] = None, + image: str = "macos-sequoia-cua:latest", # VM image to use + verbose: bool = False, + ephemeral: bool = False, + noVNC_port: Optional[int] = 8006, + ): + """Initialize the Lumier VM Provider. + + Args: + port: Port for the API server (default: 3000) + host: Hostname for the API server (default: localhost) + storage: Path for persistent VM storage + shared_path: Path for shared folder between host and VM + image: VM image to use (e.g. "macos-sequoia-cua:latest") + verbose: Enable verbose logging + ephemeral: Use ephemeral (temporary) storage + noVNC_port: Specific port for noVNC interface (default: 8006) + """ + self.host = host + # Always ensure api_port has a valid value (3000 is the default) + self.api_port = 3000 if port is None else port + self.vnc_port = noVNC_port # User-specified noVNC port, will be set in run_vm if provided + self.ephemeral = ephemeral + + # Handle ephemeral storage (temporary directory) + if ephemeral: + self.storage = "ephemeral" + else: + self.storage = storage + + self.shared_path = shared_path + self.vm_image = image # Store the VM image name to use + # The container_name will be set in run_vm using the VM name + self.verbose = verbose + self._container_id = None + self._api_url = None # Will be set after container starts + + @property + def provider_type(self) -> VMProviderType: + """Return the provider type.""" + return VMProviderType.LUMIER + + def _parse_memory(self, memory_str: str) -> int: + """Parse memory string to MB integer. + + Examples: + "8GB" -> 8192 + "1024MB" -> 1024 + "512" -> 512 + """ + if isinstance(memory_str, int): + return memory_str + + if isinstance(memory_str, str): + # Extract number and unit + match = re.match(r"(\d+)([A-Za-z]*)", memory_str) + if match: + value, unit = match.groups() + value = int(value) + unit = unit.upper() + + if unit == "GB" or unit == "G": + return value * 1024 + elif unit == "MB" or unit == "M" or unit == "": + return value + + # Default fallback + logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default") + return 8192 # Default to 8GB + + # Helper methods for interacting with the Lumier API through curl + # These methods handle the various VM operations via API calls + + def _get_curl_error_message(self, return_code: int) -> str: + """Get a descriptive error message for curl return codes. 
+ + Args: + return_code: The curl return code + + Returns: + A descriptive error message + """ + # Map common curl error codes to helpful messages + if return_code == 7: + return "Failed to connect - API server is starting up" + elif return_code == 22: + return "HTTP error returned from API server" + elif return_code == 28: + return "Operation timeout - API server is slow to respond" + elif return_code == 52: + return "Empty reply from server - API is starting but not ready" + elif return_code == 56: + return "Network problem during data transfer" + else: + return f"Unknown curl error code: {return_code}" + + + async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: + """Get VM information by name. + + Args: + name: Name of the VM to get information for + storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM information including status, IP address, etc. + """ + if not HAS_LUMIER: + logger.error("Docker is not available. Cannot get VM status.") + return { + "name": name, + "status": "unavailable", + "error": "Docker is not available" + } + + # Store the current name for API requests + self.container_name = name + + try: + # Check if the container exists and is running + check_cmd = ["docker", "ps", "-a", "--filter", f"name={name}", "--format", "{{.Status}}"] + check_result = subprocess.run(check_cmd, capture_output=True, text=True) + container_status = check_result.stdout.strip() + + if not container_status: + logger.info(f"Container {name} does not exist. Will create when run_vm is called.") + return { + "name": name, + "status": "not_found", + "message": "Container doesn't exist yet" + } + + # Container exists, check if it's running + is_running = container_status.startswith("Up") + + if not is_running: + logger.info(f"Container {name} exists but is not running. Status: {container_status}") + return { + "name": name, + "status": "stopped", + "container_status": container_status, + } + + # Container is running, get the IP address and API status from Lumier API + logger.info(f"Container {name} is running. 
Getting VM status from API.") + + # Use the shared lume_api_get function directly + vm_info = lume_api_get( + vm_name=name, + host=self.host, + port=self.api_port, + storage=storage if storage is not None else self.storage, + debug=self.verbose, + verbose=self.verbose + ) + + # Check for API errors + if "error" in vm_info: + # Use debug level instead of warning to reduce log noise during polling + logger.debug(f"API request error: {vm_info['error']}") + return { + "name": name, + "status": "running", # Container is running even if API is not responsive + "api_status": "error", + "error": vm_info["error"], + "container_status": container_status + } + + # Process the VM status information + vm_status = vm_info.get("status", "unknown") + vnc_url = vm_info.get("vncUrl", "") + ip_address = vm_info.get("ipAddress", "") + + # IMPORTANT: Always ensure we have a valid IP address for connectivity + # If the API doesn't return an IP address, default to localhost (127.0.0.1) + # This makes the behavior consistent with LumeProvider + if not ip_address and vm_status == "running": + ip_address = "127.0.0.1" + logger.info(f"No IP address returned from API, defaulting to {ip_address}") + vm_info["ipAddress"] = ip_address + + logger.info(f"VM {name} status: {vm_status}") + + if ip_address and vnc_url: + logger.info(f"VM {name} has IP: {ip_address} and VNC URL: {vnc_url}") + elif not ip_address and not vnc_url and vm_status != "running": + # Not running is expected in this case + logger.info(f"VM {name} is not running yet. Status: {vm_status}") + else: + # Missing IP or VNC but status is running - this is unusual but handled with default IP + logger.warning(f"VM {name} is running but missing expected fields. API response: {vm_info}") + + # Return the full status information + return { + "name": name, + "status": vm_status, + "ip_address": ip_address, + "vnc_url": vnc_url, + "api_status": "ok", + "container_status": container_status, + **vm_info # Include all fields from the API response + } + except subprocess.SubprocessError as e: + logger.error(f"Failed to check container status: {e}") + return { + "name": name, + "status": "error", + "error": f"Failed to check container status: {str(e)}" + } + + async def list_vms(self) -> List[Dict[str, Any]]: + """List all VMs managed by this provider. + + For Lumier provider, there is only one VM per container. + """ + try: + status = await self.get_vm("default") + return [status] if status.get("status") != "unknown" else [] + except Exception as e: + logger.error(f"Failed to list VMs: {e}") + return [] + + async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: + """Run a VM with the given options. + + Args: + name: Name of the VM to run (used for the container name and Docker image tag) + run_opts: Options for running the VM, including: + - cpu: Number of CPU cores + - memory: Amount of memory (e.g. 
"8GB") + - noVNC_port: Specific port for noVNC interface + + Returns: + Dictionary with VM status information + """ + # Set the container name using the VM name for consistency + self.container_name = name or "lumier1-vm" + try: + # First, check if container already exists and remove it + try: + check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"] + check_result = subprocess.run(check_cmd, capture_output=True, text=True) + existing_container = check_result.stdout.strip() + + if existing_container: + logger.info(f"Removing existing container: {self.container_name}") + remove_cmd = ["docker", "rm", "-f", self.container_name] + subprocess.run(remove_cmd, check=True) + except subprocess.CalledProcessError as e: + logger.warning(f"Error removing existing container: {e}") + # Continue anyway, next steps will fail if there's a real problem + + # Prepare the Docker run command + cmd = ["docker", "run", "-d", "--name", self.container_name] + + cmd.extend(["-p", f"{self.vnc_port}:8006"]) + print(f"Using specified noVNC_port: {self.vnc_port}") + + # Set API URL using the API port + self._api_url = f"http://{self.host}:{self.api_port}" + + # Parse memory setting + memory_mb = self._parse_memory(run_opts.get("memory", "8GB")) + + # Add storage volume mount if storage is specified (for persistent VM storage) + if self.storage and self.storage != "ephemeral": + # Create storage directory if it doesn't exist + storage_dir = os.path.abspath(os.path.expanduser(self.storage or "")) + os.makedirs(storage_dir, exist_ok=True) + + # Add volume mount for storage + cmd.extend([ + "-v", f"{storage_dir}:/storage", + "-e", f"HOST_STORAGE_PATH={storage_dir}" + ]) + print(f"Using persistent storage at: {storage_dir}") + + # Add shared folder volume mount if shared_path is specified + if self.shared_path: + # Create shared directory if it doesn't exist + shared_dir = os.path.abspath(os.path.expanduser(self.shared_path or "")) + os.makedirs(shared_dir, exist_ok=True) + + # Add volume mount for shared folder + cmd.extend([ + "-v", f"{shared_dir}:/shared", + "-e", f"HOST_SHARED_PATH={shared_dir}" + ]) + print(f"Using shared folder at: {shared_dir}") + + # Add environment variables + # Always use the container_name as the VM_NAME for consistency + # Use the VM image passed from the Computer class + print(f"Using VM image: {self.vm_image}") + + cmd.extend([ + "-e", f"VM_NAME={self.container_name}", + "-e", f"VERSION=ghcr.io/trycua/{self.vm_image}", + "-e", f"CPU_CORES={run_opts.get('cpu', '4')}", + "-e", f"RAM_SIZE={memory_mb}", + ]) + + # Specify the Lumier image with the full image name + lumier_image = "trycua/lumier:latest" + + # First check if the image exists locally + try: + print(f"Checking if Docker image {lumier_image} exists locally...") + check_image_cmd = ["docker", "image", "inspect", lumier_image] + subprocess.run(check_image_cmd, capture_output=True, check=True) + print(f"Docker image {lumier_image} found locally.") + except subprocess.CalledProcessError: + # Image doesn't exist locally + print(f"\nWARNING: Docker image {lumier_image} not found locally.") + print("The system will attempt to pull it from Docker Hub, which may fail if you have network connectivity issues.") + print("If the Docker pull fails, you may need to manually pull the image first with:") + print(f" docker pull {lumier_image}\n") + + # Add the image to the command + cmd.append(lumier_image) + + # Print the Docker command for debugging + print(f"DOCKER COMMAND: {' '.join(cmd)}") + + # Run the 
container with improved error handling + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + except subprocess.CalledProcessError as e: + if "no route to host" in str(e.stderr).lower() or "failed to resolve reference" in str(e.stderr).lower(): + error_msg = (f"Network error while trying to pull Docker image '{lumier_image}'\n" + f"Error: {e.stderr}\n\n" + f"SOLUTION: Please try one of the following:\n" + f"1. Check your internet connection\n" + f"2. Pull the image manually with: docker pull {lumier_image}\n" + f"3. Check if Docker is running properly\n") + logger.error(error_msg) + raise RuntimeError(error_msg) + raise + + # Container started, now check VM status with polling + print("Container started, checking VM status...") + print("NOTE: This may take some time while the VM image is being pulled and initialized") + print("TIP: You can run 'lume logs -f' in another terminal to see the detailed initialization progress") + + # Skip waiting for container readiness and just poll get_vm directly + # Poll the get_vm method indefinitely until the VM is ready with an IP address + attempt = 0 + consecutive_errors = 0 + vm_running = False + + while True: # Wait indefinitely + try: + # Use longer delays to give the system time to initialize + if attempt > 0: + # Start with 5s delay, then increase gradually up to 30s for later attempts + # But use shorter delays while we're getting API errors + if consecutive_errors > 0 and consecutive_errors < 5: + wait_time = 3 # Use shorter delays when we're getting API errors + else: + wait_time = min(30, 5 + (attempt * 2)) + + print(f"Waiting {wait_time}s before retry #{attempt+1}...") + await asyncio.sleep(wait_time) + + # Try to get VM status + print(f"Checking VM status (attempt {attempt+1})...") + vm_status = await self.get_vm(name) + + # Check for API errors + if 'error' in vm_status: + consecutive_errors += 1 + error_msg = vm_status.get('error', 'Unknown error') + + # Only print a user-friendly status message, not the raw error + # since _lume_api_get already logged the technical details + if consecutive_errors == 1 or attempt % 5 == 0: + if 'Empty reply from server' in error_msg: + print("API server is starting up - container is running, but API isn't fully initialized yet.") + print("This is expected during the initial VM setup - will continue polling...") + else: + # Don't repeat the exact same error message each time + logger.debug(f"API request error (attempt {attempt+1}): {error_msg}") + # Just log that we're still working on it + if attempt > 3: + print("Still waiting for the API server to become available...") + + # If we're getting errors but container is running, that's normal during startup + if vm_status.get('status') == 'running': + if not vm_running: + print("Container is running, waiting for the VM within it to become fully ready...") + print("This might take a minute while the VM initializes...") + vm_running = True + + # Increase counter and continue + attempt += 1 + continue + + # Reset consecutive error counter when we get a successful response + consecutive_errors = 0 + + # If the VM is running, check if it has an IP address (which means it's fully ready) + if vm_status.get('status') == 'running': + vm_running = True + + # Check if we have an IP address, which means the VM is fully ready + if 'ip_address' in vm_status and vm_status['ip_address']: + print(f"VM is now fully running with IP: {vm_status.get('ip_address')}") + if 'vnc_url' in vm_status and vm_status['vnc_url']: + print(f"VNC URL: 
{vm_status.get('vnc_url')}") + return vm_status + else: + print("VM is running but still initializing network interfaces...") + print("Waiting for IP address to be assigned...") + else: + # VM exists but might still be starting up + status = vm_status.get('status', 'unknown') + print(f"VM found but status is: {status}. Continuing to poll...") + + # Increase counter for next iteration's delay calculation + attempt += 1 + + # If we reach a very large number of attempts, give a reassuring message but continue + if attempt % 10 == 0: + print(f"Still waiting after {attempt} attempts. This might take several minutes for first-time setup.") + if not vm_running and attempt >= 20: + print("\nNOTE: First-time VM initialization can be slow as images are downloaded.") + print("If this continues for more than 10 minutes, you may want to check:") + print(" 1. Docker logs with: docker logs " + name) + print(" 2. If your network can access container registries") + print("Press Ctrl+C to abort if needed.\n") + + # After 150 attempts (likely over 30-40 minutes), return current status + if attempt >= 150: + print(f"Reached 150 polling attempts. VM status is: {vm_status.get('status', 'unknown')}") + print("Returning current VM status, but please check Docker logs if there are issues.") + return vm_status + + except Exception as e: + # Always continue retrying, but with increasing delays + logger.warning(f"Error checking VM status (attempt {attempt+1}): {e}. Will retry.") + consecutive_errors += 1 + + # If we've had too many consecutive errors, might be a deeper problem + if consecutive_errors >= 10: + print(f"\nWARNING: Encountered {consecutive_errors} consecutive errors while checking VM status.") + print("You may need to check the Docker container logs or restart the process.") + print(f"Error details: {str(e)}\n") + + # Increase attempt counter for next iteration + attempt += 1 + + # After many consecutive errors, add a delay to avoid hammering the system + if attempt > 5: + error_delay = min(30, 10 + attempt) + print(f"Multiple connection errors, waiting {error_delay}s before next attempt...") + await asyncio.sleep(error_delay) + + except subprocess.CalledProcessError as e: + error_msg = f"Failed to start Lumier container: {e.stderr if hasattr(e, 'stderr') else str(e)}" + logger.error(error_msg) + raise RuntimeError(error_msg) + + async def _wait_for_container_ready(self, container_name: str, timeout: int = 90) -> bool: + """Wait for the Lumier container to be fully ready with a valid API response. + + Args: + container_name: Name of the Docker container to check + timeout: Maximum time to wait in seconds (default: 90 seconds) + + Returns: + True if the container is running, even if API is not fully ready. + This allows operations to continue with appropriate fallbacks. 
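[Editor's note] The polling loop above mixes the backoff schedule, error counting, and user feedback inline. The same wait schedule can be factored into a small helper; the following is an illustrative sketch only (the poll_with_backoff helper and its arguments are hypothetical, not part of this diff):

    import asyncio
    from typing import Any, Awaitable, Callable, Dict, Optional

    async def poll_with_backoff(
        check: Callable[[], Awaitable[Dict[str, Any]]],
        is_ready: Callable[[Dict[str, Any]], bool],
        max_attempts: int = 150,
    ) -> Optional[Dict[str, Any]]:
        """Poll `check` until `is_ready` returns True, mirroring the schedule
        above: 3s while errors persist, otherwise 5s ramping up to a 30s cap."""
        consecutive_errors = 0
        for attempt in range(max_attempts):
            if attempt > 0:
                if 0 < consecutive_errors < 5:
                    wait_time = 3  # back off gently while the API is erroring
                else:
                    wait_time = min(30, 5 + attempt * 2)
                await asyncio.sleep(wait_time)
            try:
                status = await check()
            except Exception:
                consecutive_errors += 1
                continue
            if "error" in status:
                consecutive_errors += 1
                continue
            consecutive_errors = 0
            if is_ready(status):
                return status
        return None  # caller decides how to report the timeout

With a helper like this, the loop body in run_vm would reduce to roughly: await poll_with_backoff(lambda: self.get_vm(name), lambda s: bool(s.get("ip_address"))).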
+ """ + start_time = time.time() + api_ready = False + container_running = False + + print(f"Waiting for container {container_name} to be ready (timeout: {timeout}s)...") + + while time.time() - start_time < timeout: + # Check if container is running + try: + check_cmd = ["docker", "ps", "--filter", f"name={container_name}", "--format", "{{.Status}}"] + result = subprocess.run(check_cmd, capture_output=True, text=True, check=True) + container_status = result.stdout.strip() + + if container_status and container_status.startswith("Up"): + container_running = True + print(f"Container {container_name} is running") + logger.info(f"Container {container_name} is running with status: {container_status}") + else: + logger.warning(f"Container {container_name} not yet running, status: {container_status}") + # container is not running yet, wait and try again + await asyncio.sleep(2) # Longer sleep to give Docker time + continue + except subprocess.CalledProcessError as e: + logger.warning(f"Error checking container status: {e}") + await asyncio.sleep(2) + continue + + # Container is running, check if API is responsive + try: + # First check the health endpoint + api_url = f"http://{self.host}:{self.api_port}/health" + logger.info(f"Checking API health at: {api_url}") + + # Use longer timeout for API health check since it may still be initializing + curl_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", api_url] + result = subprocess.run(curl_cmd, capture_output=True, text=True) + + if result.returncode == 0 and "ok" in result.stdout.lower(): + api_ready = True + print(f"API is ready at {api_url}") + logger.info(f"API is ready at {api_url}") + break + else: + # API health check failed, now let's check if the VM status endpoint is responsive + # This covers cases where the health endpoint isn't implemented but the VM API is working + vm_api_url = f"http://{self.host}:{self.api_port}/lume/vms/{container_name}" + if self.storage: + import urllib.parse + encoded_storage = urllib.parse.quote_plus(self.storage) + vm_api_url += f"?storage={encoded_storage}" + + curl_vm_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", vm_api_url] + vm_result = subprocess.run(curl_vm_cmd, capture_output=True, text=True) + + if vm_result.returncode == 0 and vm_result.stdout.strip(): + # VM API responded with something - consider the API ready + api_ready = True + print(f"VM API is ready at {vm_api_url}") + logger.info(f"VM API is ready at {vm_api_url}") + break + else: + curl_code = result.returncode + if curl_code == 0: + curl_code = vm_result.returncode + + # Map common curl error codes to helpful messages + if curl_code == 7: + curl_error = "Failed to connect - API server is starting up" + elif curl_code == 22: + curl_error = "HTTP error returned from API server" + elif curl_code == 28: + curl_error = "Operation timeout - API server is slow to respond" + elif curl_code == 52: + curl_error = "Empty reply from server - API is starting but not ready" + elif curl_code == 56: + curl_error = "Network problem during data transfer" + else: + curl_error = f"Unknown curl error code: {curl_code}" + + print(f"API not ready yet: {curl_error}") + logger.info(f"API not ready yet: {curl_error}") + except subprocess.SubprocessError as e: + logger.warning(f"Error checking API status: {e}") + + # If the container is running but API is not ready, that's OK - we'll just wait + # a bit longer before checking again, as the container may still be initializing + elapsed_seconds = time.time() - start_time + if 
int(elapsed_seconds) % 5 == 0: # Only print status every 5 seconds to reduce verbosity + print(f"Waiting for API to initialize... ({elapsed_seconds:.1f}s / {timeout}s)") + + await asyncio.sleep(3) # Longer sleep between API checks + + # Handle timeout - if the container is running but API is not ready, that's not + # necessarily an error - the API might just need more time to start up + if not container_running: + print(f"Timed out waiting for container {container_name} to start") + logger.warning(f"Timed out waiting for container {container_name} to start") + return False + + if not api_ready: + print(f"Container {container_name} is running, but API is not fully ready yet.") + print("Proceeding with operations. API will become available shortly.") + print("NOTE: You may see some 'API request failed' messages while the API initializes.") + logger.warning(f"Container {container_name} is running, but API is not fully ready yet.") + + # Return True if container is running, even if API isn't ready yet + # This allows VM operations to proceed, with appropriate retries for API calls + return container_running + + async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: + """Stop a running VM by stopping the Lumier container.""" + try: + # Use Docker commands to stop the container directly + if hasattr(self, '_container_id') and self._container_id: + logger.info(f"Stopping Lumier container: {self.container_name}") + cmd = ["docker", "stop", self.container_name] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + logger.info(f"Container stopped: {result.stdout.strip()}") + + # Return minimal status info + return { + "name": name, + "status": "stopped", + "container_id": self._container_id, + } + else: + # Try to find the container by name + check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"] + check_result = subprocess.run(check_cmd, capture_output=True, text=True) + container_id = check_result.stdout.strip() + + if container_id: + logger.info(f"Found container ID: {container_id}") + cmd = ["docker", "stop", self.container_name] + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + logger.info(f"Container stopped: {result.stdout.strip()}") + + return { + "name": name, + "status": "stopped", + "container_id": container_id, + } + else: + logger.warning(f"No container found with name {self.container_name}") + return { + "name": name, + "status": "unknown", + } + except subprocess.CalledProcessError as e: + error_msg = f"Failed to stop container: {e.stderr if hasattr(e, 'stderr') else str(e)}" + logger.error(error_msg) + raise RuntimeError(f"Failed to stop Lumier container: {error_msg}") + + # update_vm is not implemented as it's not needed for Lumier + # The BaseVMProvider requires it, so we provide a minimal implementation + async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: + """Not implemented for Lumier provider.""" + logger.warning("update_vm is not implemented for Lumier provider") + return {"name": name, "status": "unchanged"} + + async def __aenter__(self): + """Async context manager entry. + + This method is called when entering an async context manager block. + Returns self to be used in the context. 
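[Editor's note] The readiness check above shells out to curl. A pure-Python equivalent using only the standard library, assuming the same /health endpoint the code already probes (sketch, not part of this diff):

    import urllib.error
    import urllib.request

    def api_health_ok(host: str, port: int, timeout: float = 5.0) -> bool:
        """Return True if the Lumier API health endpoint answers with 'ok'.
        A stdlib alternative to shelling out to curl."""
        url = f"http://{host}:{port}/health"
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                body = resp.read().decode("utf-8", errors="replace")
                return resp.status == 200 and "ok" in body.lower()
        except (urllib.error.URLError, TimeoutError):
            # Connection refused or timeout while the API is still starting up
            return False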
+ """ + logger.debug("Entering LumierProvider context") + + # Initialize the API URL with the default value if not already set + # This ensures get_vm can work before run_vm is called + if not hasattr(self, '_api_url') or not self._api_url: + self._api_url = f"http://{self.host}:{self.api_port}" + logger.info(f"Initialized default Lumier API URL: {self._api_url}") + + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Async context manager exit. + + This method is called when exiting an async context manager block. + It handles proper cleanup of resources, including stopping any running containers. + """ + logger.debug(f"Exiting LumierProvider context, handling exceptions: {exc_type}") + try: + # If we have a container ID, we should stop it to clean up resources + if hasattr(self, '_container_id') and self._container_id: + logger.info(f"Stopping Lumier container on context exit: {self.container_name}") + try: + cmd = ["docker", "stop", self.container_name] + subprocess.run(cmd, capture_output=True, text=True, check=True) + logger.info(f"Container stopped during context exit: {self.container_name}") + except subprocess.CalledProcessError as e: + logger.warning(f"Failed to stop container during cleanup: {e.stderr}") + # Don't raise an exception here, we want to continue with cleanup + except Exception as e: + logger.error(f"Error during LumierProvider cleanup: {e}") + # We don't want to suppress the original exception if there was one + if exc_type is None: + raise + # Return False to indicate that any exception should propagate + return False diff --git a/libs/computer/computer/providers/qemu/provider.py b/libs/computer/computer/providers/qemu/provider.py index 22907eb4..cec21b00 100644 --- a/libs/computer/computer/providers/qemu/provider.py +++ b/libs/computer/computer/providers/qemu/provider.py @@ -18,17 +18,16 @@ class QEMUProvider(BaseVMProvider): def __init__( self, bin_path: Optional[str] = None, - storage_path: Optional[str] = None, + storage: Optional[str] = None, port: Optional[int] = None, host: str = "localhost", verbose: bool = False, - **kwargs ): """Initialize the QEMU provider. Args: bin_path: Optional path to the QEMU binary - storage_path: Optional path to store VM data + storage: Optional path to store VM data port: Optional port for management host: Host to use for connections verbose: Enable verbose logging @@ -36,7 +35,7 @@ class QEMUProvider(BaseVMProvider): self._context = None self._verbose = verbose self._bin_path = bin_path - self._storage_path = storage_path + self._storage = storage self._port = port self._host = host @@ -56,22 +55,31 @@ class QEMUProvider(BaseVMProvider): # In a real implementation, this would clean up QEMU resources self._context = None - async def get_vm(self, name: str) -> Dict[str, Any]: - """Get VM information by name.""" + async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: + """Get VM information by name. + + Args: + name: Name of the VM to get information for + storage: Optional storage path override. If provided, this will be used + instead of the provider's default storage path. + + Returns: + Dictionary with VM information including status, IP address, etc. 
+ """ raise NotImplementedError("QEMU provider is not implemented yet") async def list_vms(self) -> List[Dict[str, Any]]: """List all available VMs.""" raise NotImplementedError("QEMU provider is not implemented yet") - async def run_vm(self, name: str, run_opts: Dict[str, Any]) -> Dict[str, Any]: + async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Run a VM with the given options.""" raise NotImplementedError("QEMU provider is not implemented yet") - async def stop_vm(self, name: str) -> Dict[str, Any]: + async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]: """Stop a running VM.""" raise NotImplementedError("QEMU provider is not implemented yet") - async def update_vm(self, name: str, update_opts: Dict[str, Any]) -> Dict[str, Any]: + async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]: """Update VM configuration.""" raise NotImplementedError("QEMU provider is not implemented yet") diff --git a/libs/computer/pyproject.toml b/libs/computer/pyproject.toml index 67a377e9..7a2bb6c6 100644 --- a/libs/computer/pyproject.toml +++ b/libs/computer/pyproject.toml @@ -24,6 +24,9 @@ requires-python = ">=3.10" lume = [ "pylume>=0.1.8" ] +lumier = [ + # No additional Python packages required - uses Docker CLI directly +] ui = [ "gradio>=5.23.3,<6.0.0", "python-dotenv>=1.0.1,<2.0.0", diff --git a/libs/lume/src/Server/Handlers.swift b/libs/lume/src/Server/Handlers.swift index 9ac20732..0fe61e3b 100644 --- a/libs/lume/src/Server/Handlers.swift +++ b/libs/lume/src/Server/Handlers.swift @@ -12,16 +12,55 @@ extension Server { let vms = try vmController.list(storage: storage) return try .json(vms) } catch { + print( + "ERROR: Failed to list VMs: \(error.localizedDescription), storage=\(String(describing: storage))" + ) return .badRequest(message: error.localizedDescription) } } func handleGetVM(name: String, storage: String? 
diff --git a/libs/lume/src/Server/Handlers.swift b/libs/lume/src/Server/Handlers.swift
index 9ac20732..0fe61e3b 100644
--- a/libs/lume/src/Server/Handlers.swift
+++ b/libs/lume/src/Server/Handlers.swift
@@ -12,16 +12,55 @@ extension Server {
             let vms = try vmController.list(storage: storage)
             return try .json(vms)
         } catch {
+            print(
+                "ERROR: Failed to list VMs: \(error.localizedDescription), storage=\(String(describing: storage))"
+            )
             return .badRequest(message: error.localizedDescription)
         }
     }

     func handleGetVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
+        print("Getting VM details: name=\(name), storage=\(String(describing: storage))")
+
         do {
             let vmController = LumeController()
+            print("Created VM controller, attempting to get VM")
             let vm = try vmController.get(name: name, storage: storage)
-            return try .json(vm.details)
+            print("Successfully retrieved VM")
+
+            // Check for nil values that might cause crashes
+            if vm.vmDirContext.config.macAddress == nil {
+                print("ERROR: VM has nil macAddress")
+                return .badRequest(message: "VM configuration is invalid (nil macAddress)")
+            }
+            print("macAddress check passed")
+
+            // Log that we're about to access the details
+            print("Preparing VM details response")
+
+            // Print the full details object for debugging
+            let details = vm.details
+            print("VM DETAILS: \(details)")
+            print("  name: \(details.name)")
+            print("  os: \(details.os)")
+            print("  cpuCount: \(details.cpuCount)")
+            print("  memorySize: \(details.memorySize)")
+            print("  diskSize: \(details.diskSize)")
+            print("  display: \(details.display)")
+            print("  status: \(details.status)")
+            print("  vncUrl: \(String(describing: details.vncUrl))")
+            print("  ipAddress: \(String(describing: details.ipAddress))")
+            print("  locationName: \(details.locationName)")
+
+            // Serialize the VM details
+            print("About to serialize VM details")
+            let response = try HTTPResponse.json(vm.details)
+            print("Successfully serialized VM details")
+            return response
+        } catch {
+            // This catches errors from both vmController.get and the JSON serialization
+            print("ERROR: Failed to get VM details: \(error.localizedDescription)")
             return .badRequest(message: error.localizedDescription)
         }
     }
@@ -158,15 +197,51 @@ extension Server {
     }

     func handleStopVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
+        Logger.info(
+            "Stopping VM", metadata: ["name": name, "storage": String(describing: storage)])
+
         do {
+            Logger.info("Creating VM controller", metadata: ["name": name])
             let vmController = LumeController()
+
+            Logger.info("Calling stopVM on controller", metadata: ["name": name])
             try await vmController.stopVM(name: name, storage: storage)
+
+            Logger.info(
+                "VM stopped, waiting 5 seconds for locks to clear", metadata: ["name": name])
+
+            // Add a delay to ensure locks are fully released before returning
+            for i in 1...5 {
+                try? await Task.sleep(nanoseconds: 1_000_000_000)
+                Logger.info("Lock clearing delay", metadata: ["name": name, "seconds": "\(i)/5"])
+            }
+
+            // Verify that the VM is really in a stopped state
+            Logger.info("Verifying VM is stopped", metadata: ["name": name])
+            let vm = try? vmController.get(name: name, storage: storage)
+            if let vm = vm, vm.details.status == "running" {
+                Logger.info(
+                    "VM still reports as running despite stop operation",
+                    metadata: ["name": name, "severity": "warning"])
+            } else {
+                Logger.info(
+                    "Verification complete: VM is in stopped state", metadata: ["name": name])
+            }
+
+            Logger.info("Returning successful response", metadata: ["name": name])
             return HTTPResponse(
                 statusCode: .ok,
                 headers: ["Content-Type": "application/json"],
                 body: try JSONEncoder().encode(["message": "VM stopped successfully"])
             )
         } catch {
+            Logger.error(
+                "Failed to stop VM",
+                metadata: [
+                    "name": name,
+                    "error": error.localizedDescription,
+                    "storage": String(describing: storage),
+                ])
             return HTTPResponse(
                 statusCode: .badRequest,
                 headers: ["Content-Type": "application/json"],
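[Editor's note] handleStopVM now waits five seconds and then re-reads the VM to confirm it actually stopped. The same stop-then-verify pattern, sketched in Python against the provider interface shown earlier (the helper name is hypothetical):

    import asyncio

    async def stop_and_verify(provider, name: str, checks: int = 5) -> bool:
        """Issue a stop, then poll until the VM no longer reports 'running'."""
        await provider.stop_vm(name)
        for _ in range(checks):
            await asyncio.sleep(1.0)  # give locks and processes time to clear
            vm = await provider.get_vm(name)
            if vm.get("status") != "running":
                return True
        return False  # still reports running; the caller may log a warning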
@@ -176,14 +251,39 @@ extension Server {
     }

     func handleRunVM(name: String, body: Data?) async throws -> HTTPResponse {
-        let request =
-            body.flatMap { try? JSONDecoder().decode(RunVMRequest.self, from: $0) }
-            ?? RunVMRequest(noDisplay: nil, sharedDirectories: nil, recoveryMode: nil, storage: nil)
+        Logger.info("Running VM", metadata: ["name": name])
+
+        // Log the raw body data if available
+        if let body = body, let bodyString = String(data: body, encoding: .utf8) {
+            Logger.info("Run VM raw request body", metadata: ["name": name, "body": bodyString])
+        } else {
+            Logger.info("No request body, or could not decode it as a string", metadata: ["name": name])
+        }

         do {
+            Logger.info("Creating VM controller and parsing request", metadata: ["name": name])
+            let request =
+                body.flatMap { try? JSONDecoder().decode(RunVMRequest.self, from: $0) }
+                ?? RunVMRequest(
+                    noDisplay: nil, sharedDirectories: nil, recoveryMode: nil, storage: nil)
+
+            Logger.info(
+                "Parsed request",
+                metadata: [
+                    "name": name,
+                    "noDisplay": String(describing: request.noDisplay),
+                    "sharedDirectories": "\(request.sharedDirectories?.count ?? 0)",
+                    "storage": String(describing: request.storage),
+                ])
+
+            Logger.info("Parsing shared directories", metadata: ["name": name])
             let dirs = try request.parse()
+            Logger.info(
+                "Successfully parsed shared directories",
+                metadata: ["name": name, "count": "\(dirs.count)"])

             // Start VM in background
+            Logger.info("Starting VM in background", metadata: ["name": name])
             startVM(
                 name: name,
                 noDisplay: request.noDisplay ?? false,
@@ -191,6 +291,7 @@ extension Server {
                 recoveryMode: request.recoveryMode ?? false,
                 storage: request.storage
             )
+            Logger.info("VM start initiated in background", metadata: ["name": name])

             // Return response immediately
             return HTTPResponse(
@@ -203,6 +304,12 @@ extension Server {
                 ])
             )
         } catch {
+            Logger.error(
+                "Failed to run VM",
+                metadata: [
+                    "name": name,
+                    "error": error.localizedDescription,
+                ])
             return HTTPResponse(
                 statusCode: .badRequest,
                 headers: ["Content-Type": "application/json"],
@@ -290,7 +397,7 @@ extension Server {

     func handlePush(_ body: Data?) async throws -> HTTPResponse {
         guard let body = body,
-              let request = try? JSONDecoder().decode(PushRequest.self, from: body)
+            let request = try? JSONDecoder().decode(PushRequest.self, from: body)
         else {
             return HTTPResponse(
                 statusCode: .badRequest,
@@ -311,15 +418,16 @@ extension Server {
                     organization: request.organization,
                     storage: request.storage,
                     chunkSizeMb: request.chunkSizeMb,
-                    verbose: false, // Verbose typically handled by server logs
-                    dryRun: false, // Default API behavior is likely non-dry-run
-                    reassemble: false // Default API behavior is likely non-reassemble
+                    verbose: false,  // Verbose typically handled by server logs
+                    dryRun: false,  // Default API behavior is likely non-dry-run
+                    reassemble: false  // Default API behavior is likely non-reassemble
+                )
+                print(
+                    "Background push completed successfully for image: \(request.imageName):\(request.tags.joined(separator: ","))"
                 )
-                Logger.info("Background push completed successfully for image: \(request.imageName):\(request.tags.joined(separator: ","))")
             } catch {
-                Logger.error(
-                    "Background push failed for image: \(request.imageName):\(request.tags.joined(separator: ","))",
-                    metadata: ["error": error.localizedDescription]
+                print(
+                    "Background push failed for image: \(request.imageName):\(request.tags.joined(separator: ",")) - Error: \(error.localizedDescription)"
                 )
             }
         }
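[Editor's note] handleRunVM responds immediately and starts the VM in a detached task. In Python's asyncio, the same fire-and-return shape looks like this (an illustrative sketch; the handler and names are hypothetical):

    import asyncio

    async def handle_run_vm(provider, name: str, run_opts: dict) -> dict:
        """Kick off the VM start in the background and return at once,
        like the Swift handler's Task.detached block."""
        async def start_in_background() -> None:
            try:
                await provider.run_vm(name, run_opts)
            except Exception as e:
                print(f"Failed to start VM in background task: {e}")

        # Keep a reference so the task isn't garbage-collected mid-flight.
        task = asyncio.create_task(start_in_background())
        return {"message": "VM start initiated", "name": name, "task": task.get_name()}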
@@ -520,25 +628,25 @@ extension Server {
     }

     // MARK: - Log Handlers
-    
+
     func handleGetLogs(type: String?, lines: Int?) async throws -> HTTPResponse {
         do {
             let logType = type?.lowercased() ?? "all"
             let infoPath = "/tmp/lume_daemon.log"
             let errorPath = "/tmp/lume_daemon.error.log"
-            
+
             let fileManager = FileManager.default
             var response: [String: String] = [:]
-            
+
             // Function to read log files
             func readLogFile(path: String) -> String? {
                 guard fileManager.fileExists(atPath: path) else {
                     return nil
                 }
-                
+
                 do {
                     let content = try String(contentsOfFile: path, encoding: .utf8)
-                    
+
                     // If lines parameter is provided, return only the specified number of lines from the end
                     if let lineCount = lines {
                         let allLines = content.components(separatedBy: .newlines)
@@ -546,28 +654,28 @@ extension Server {
                         let lastLines = Array(allLines[startIndex...])
                         return lastLines.joined(separator: "\n")
                     }
-                    
+
                     return content
                 } catch {
                     return "Error reading log file: \(error.localizedDescription)"
                 }
             }
-            
+
             // Get logs based on requested type
             if logType == "info" || logType == "all" {
                 response["info"] = readLogFile(path: infoPath) ?? "Info log file not found"
             }
-            
+
             if logType == "error" || logType == "all" {
                 response["error"] = readLogFile(path: errorPath) ?? "Error log file not found"
             }
-            
+
             return try .json(response)
         } catch {
             return .badRequest(message: error.localizedDescription)
         }
     }
-    
+
     // MARK: - Private Helper Methods

     nonisolated private func startVM(
@@ -577,10 +685,27 @@ extension Server {
         recoveryMode: Bool = false,
         storage: String? = nil
     ) {
+        Logger.info(
+            "Starting VM in detached task",
+            metadata: [
+                "name": name,
+                "noDisplay": "\(noDisplay)",
+                "recoveryMode": "\(recoveryMode)",
+                "storage": String(describing: storage),
+            ])
+
         Task.detached { @MainActor @Sendable in
-            Logger.info("Starting VM in background", metadata: ["name": name])
+            Logger.info("Background task started for VM", metadata: ["name": name])
             do {
+                Logger.info("Creating VM controller in background task", metadata: ["name": name])
                 let vmController = LumeController()
+
+                Logger.info(
+                    "Calling runVM on controller",
+                    metadata: [
+                        "name": name,
+                        "noDisplay": "\(noDisplay)",
+                    ])
                 try await vmController.runVM(
                     name: name,
                     noDisplay: noDisplay,
@@ -588,15 +713,16 @@ extension Server {
                     recoveryMode: recoveryMode,
                     storage: storage
                 )
-                Logger.info("VM started successfully in background", metadata: ["name": name])
+                Logger.info("VM started successfully in background task", metadata: ["name": name])
             } catch {
                 Logger.error(
-                    "Failed to start VM in background",
+                    "Failed to start VM in background task",
                     metadata: [
                         "name": name,
                         "error": error.localizedDescription,
                     ])
             }
         }
+        Logger.info("Background task dispatched for VM", metadata: ["name": name])
     }
 }
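[Editor's note] readLogFile above returns the last N lines when a line count is given. The equivalent logic in Python, for comparison (standalone sketch):

    import os

    def tail_log(path: str, lines: int | None = None) -> str | None:
        """Read a log file, optionally returning only the last `lines` lines,
        mirroring readLogFile in the Swift handler."""
        if not os.path.exists(path):
            return None
        try:
            with open(path, encoding="utf-8") as f:
                content = f.read()
        except OSError as e:
            return f"Error reading log file: {e}"
        if lines:
            # splitlines + slice keeps only the trailing N lines
            return "\n".join(content.splitlines()[-lines:])
        return content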
diff --git a/libs/lume/src/VM/VM.swift b/libs/lume/src/VM/VM.swift
index b9e22b98..fef55bf9 100644
--- a/libs/lume/src/VM/VM.swift
+++ b/libs/lume/src/VM/VM.swift
@@ -65,9 +65,16 @@ class VM {

     // MARK: - VM State Management

     private var isRunning: Bool {
-        // First check if we have an IP address
-        guard let ipAddress = DHCPLeaseParser.getIPAddress(forMAC: vmDirContext.config.macAddress!)
-        else {
+        // First check if we have a MAC address
+        guard let macAddress = vmDirContext.config.macAddress else {
+            Logger.info(
+                "Cannot check if VM is running: macAddress is nil",
+                metadata: ["name": vmDirContext.name])
+            return false
+        }
+
+        // Then check if we have an IP address
+        guard let ipAddress = DHCPLeaseParser.getIPAddress(forMAC: macAddress) else {
             return false
         }

@@ -78,37 +85,35 @@ class VM {
     var details: VMDetails {
         let isRunning: Bool = self.isRunning
         let vncUrl = isRunning ? getVNCUrl() : nil
-
-        // Try to load shared directories from the session file
-        var sharedDirs: [SharedDirectory]? = nil
-
-        // Check if sessions file exists and load shared directories
-        let sessionsPath = vmDirContext.dir.sessionsPath.path
-        let fileExists = FileManager.default.fileExists(atPath: sessionsPath)
-
+
+        // Safely get the disk size with a fallback
+        let diskSizeValue: DiskSize
         do {
-            if fileExists {
-                let session = try vmDirContext.dir.loadSession()
-                sharedDirs = session.sharedDirectories
-            }
+            diskSizeValue = try getDiskSize()
         } catch {
-            // It's okay if we don't have a saved session
-            Logger.error("Failed to load session data", metadata: ["name": vmDirContext.name, "error": "\(error)"])
+            Logger.error(
+                "Failed to get disk size",
+                metadata: ["name": vmDirContext.name, "error": "\(error)"])
+            // Provide a fallback value to avoid crashing
+            diskSizeValue = DiskSize(allocated: 0, total: vmDirContext.config.diskSize ?? 0)
         }

+        // Safely access the MAC address
+        let macAddress = vmDirContext.config.macAddress
+        let ipAddress: String? =
+            isRunning && macAddress != nil ? DHCPLeaseParser.getIPAddress(forMAC: macAddress!) : nil
+
         return VMDetails(
             name: vmDirContext.name,
             os: getOSType(),
             cpuCount: vmDirContext.config.cpuCount ?? 0,
             memorySize: vmDirContext.config.memorySize ?? 0,
-            diskSize: try! getDiskSize(),
+            diskSize: diskSizeValue,
             display: vmDirContext.config.display.string,
             status: isRunning ? "running" : "stopped",
             vncUrl: vncUrl,
-            ipAddress: isRunning
-                ? DHCPLeaseParser.getIPAddress(forMAC: vmDirContext.config.macAddress!) : nil,
-            locationName: vmDirContext.storage ?? "default",
-            sharedDirectories: sharedDirs
+            ipAddress: ipAddress,
+            locationName: vmDirContext.storage ?? "default"
         )
     }

@@ -118,57 +123,84 @@ class VM {
         noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0,
         recoveryMode: Bool = false, usbMassStoragePaths: [Path]? = nil
     ) async throws {
+        Logger.info(
+            "VM.run method called",
+            metadata: [
+                "name": vmDirContext.name,
+                "noDisplay": "\(noDisplay)",
+                "recoveryMode": "\(recoveryMode)",
+            ])
+
         guard vmDirContext.initialized else {
+            Logger.error("VM not initialized", metadata: ["name": vmDirContext.name])
             throw VMError.notInitialized(vmDirContext.name)
         }

         guard let cpuCount = vmDirContext.config.cpuCount,
             let memorySize = vmDirContext.config.memorySize
         else {
+            Logger.error("VM missing cpuCount or memorySize", metadata: ["name": vmDirContext.name])
             throw VMError.notInitialized(vmDirContext.name)
         }

         // Try to acquire lock on config file
-        let fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
-        guard flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 else {
-            try? fileHandle.close()
-            throw VMError.alreadyRunning(vmDirContext.name)
-        }
+        Logger.info(
+            "Attempting to acquire lock on config file",
+            metadata: [
+                "path": vmDirContext.dir.configPath.path,
+                "name": vmDirContext.name,
+            ])
+        var fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)

-        // Keep track of shared directories for logging
+        if flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) != 0 {
+            try? fileHandle.close()
+            Logger.error(
+                "VM already running (failed to acquire lock)", metadata: ["name": vmDirContext.name]
+            )
+
+            // Try to forcibly clear the lock before giving up
+            Logger.info("Attempting emergency lock cleanup", metadata: ["name": vmDirContext.name])
+            unlockConfigFile()
+
+            // Try one more time to acquire the lock
+            if let retryHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url),
+                flock(retryHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0
+            {
+                Logger.info("Emergency lock cleanup worked", metadata: ["name": vmDirContext.name])
+                // Continue with a fresh file handle
+                try? retryHandle.close()
+                // Get a completely new file handle to be safe
+                guard let newHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
+                else {
+                    throw VMError.internalError("Failed to open file handle after lock cleanup")
+                }
+                // Re-acquire the lock on the new handle: closing retryHandle released it
+                guard flock(newHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 else {
+                    try? newHandle.close()
+                    throw VMError.alreadyRunning(vmDirContext.name)
+                }
+                // Update our main file handle
+                fileHandle = newHandle
+            } else {
+                // If we still can't get the lock, give up
+                Logger.error(
+                    "Could not acquire lock even after emergency cleanup",
+                    metadata: ["name": vmDirContext.name])
+                throw VMError.alreadyRunning(vmDirContext.name)
+            }
+        }
+        Logger.info("Successfully acquired lock", metadata: ["name": vmDirContext.name])

         Logger.info(
             "Running VM with configuration",
             metadata: [
+                "name": vmDirContext.name,
                 "cpuCount": "\(cpuCount)",
                 "memorySize": "\(memorySize)",
                 "diskSize": "\(vmDirContext.config.diskSize ?? 0)",
-                "macAddress": vmDirContext.config.macAddress ?? "none",
-                "sharedDirectoryCount": "\(sharedDirectories.count)",
-                "mount": mount?.path ?? "none",
-                "vncPort": "\(vncPort)",
+                "sharedDirectories": sharedDirectories.map { $0.string }.joined(separator: ", "),
                 "recoveryMode": "\(recoveryMode)",
-                "usbMassStorageDeviceCount": "\(usbMassStoragePaths?.count ?? 0)",
-            ])
-
-        // Log disk paths and existence for debugging
-        Logger.info(
-            "VM disk paths",
-            metadata: [
-                "diskPath": vmDirContext.diskPath.path,
-                "diskExists":
-                    "\(FileManager.default.fileExists(atPath: vmDirContext.diskPath.path))",
-                "nvramPath": vmDirContext.nvramPath.path,
-                "nvramExists":
-                    "\(FileManager.default.fileExists(atPath: vmDirContext.nvramPath.path))",
-                "configPath": vmDirContext.dir.configPath.path,
-                "configExists":
-                    "\(FileManager.default.fileExists(atPath: vmDirContext.dir.configPath.path))",
-                "locationName": vmDirContext.storage ?? "default",
             ])

         // Create and configure the VM
         do {
+            Logger.info(
+                "Creating virtualization service context", metadata: ["name": vmDirContext.name])
             let config = try createVMVirtualizationServiceContext(
                 cpuCount: cpuCount,
                 memorySize: memorySize,
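[Editor's note] The lock dance above uses flock(LOCK_EX | LOCK_NB) on the VM's config file so a second `run` fails fast instead of blocking. The same non-blocking acquisition in Python, via the standard fcntl module (sketch with a hypothetical config path):

    import fcntl

    def try_lock(path: str):
        """Attempt a non-blocking exclusive lock. Return the open handle on
        success (the lock lives as long as the handle stays open) or None if
        another process holds it - the equivalent of the Swift guard above."""
        f = open(path, "a")
        try:
            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            return f
        except BlockingIOError:
            f.close()
            return None  # "VM already running" in the Swift code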
"default", ]) // Create and configure the VM do { + Logger.info( + "Creating virtualization service context", metadata: ["name": vmDirContext.name]) let config = try createVMVirtualizationServiceContext( cpuCount: cpuCount, memorySize: memorySize, @@ -178,32 +210,64 @@ class VM { recoveryMode: recoveryMode, usbMassStoragePaths: usbMassStoragePaths ) - virtualizationService = try virtualizationServiceFactory(config) + Logger.info( + "Successfully created virtualization service context", + metadata: ["name": vmDirContext.name]) - let vncInfo = try await setupSession(noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories) - Logger.info("VNC info", metadata: ["vncInfo": vncInfo]) + Logger.info( + "Initializing virtualization service", metadata: ["name": vmDirContext.name]) + virtualizationService = try virtualizationServiceFactory(config) + Logger.info( + "Successfully initialized virtualization service", + metadata: ["name": vmDirContext.name]) + + Logger.info( + "Setting up VNC", + metadata: [ + "name": vmDirContext.name, + "noDisplay": "\(noDisplay)", + "port": "\(vncPort)", + ]) + let vncInfo = try await setupSession( + noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories) + Logger.info( + "VNC setup successful", metadata: ["name": vmDirContext.name, "vncInfo": vncInfo]) // Start the VM guard let service = virtualizationService else { + Logger.error("Virtualization service is nil", metadata: ["name": vmDirContext.name]) throw VMError.internalError("Virtualization service not initialized") } + Logger.info( + "Starting VM via virtualization service", metadata: ["name": vmDirContext.name]) try await service.start() + Logger.info("VM started successfully", metadata: ["name": vmDirContext.name]) while true { try await Task.sleep(nanoseconds: UInt64(1e9)) } } catch { Logger.error( - "Failed to create/start VM", + "Failed in VM.run", metadata: [ - "error": "\(error)", + "name": vmDirContext.name, + "error": error.localizedDescription, "errorType": "\(type(of: error))", ]) virtualizationService = nil vncService.stop() + // Release lock + Logger.info("Releasing file lock after error", metadata: ["name": vmDirContext.name]) flock(fileHandle.fileDescriptor, LOCK_UN) try? 
fileHandle.close() + + // Additionally, perform our aggressive unlock to ensure no locks remain + Logger.info( + "Performing additional lock cleanup after error", + metadata: ["name": vmDirContext.name]) + unlockConfigFile() + throw error } } @@ -219,34 +283,55 @@ class VM { // If we have a virtualization service, try to stop it cleanly first if let service = virtualizationService { do { + Logger.info( + "Stopping VM via virtualization service", metadata: ["name": vmDirContext.name]) try await service.stop() virtualizationService = nil vncService.stop() Logger.info( "VM stopped successfully via virtualization service", metadata: ["name": vmDirContext.name]) + + // Try to ensure any existing locks are released + Logger.info( + "Attempting to clear any locks on config file", + metadata: ["name": vmDirContext.name]) + unlockConfigFile() + return } catch let error { Logger.error( - "Failed to stop VM via virtualization service, falling back to process termination", + "Failed to stop VM via virtualization service", metadata: [ "name": vmDirContext.name, - "error": "\(error)", + "error": error.localizedDescription, ]) // Fall through to process termination } } - // Try to open config file to get file descriptor - note that this matches with the serve process - so this is only for the command line + // Try to open config file to get file descriptor + Logger.info( + "Attempting to access config file lock", + metadata: [ + "path": vmDirContext.dir.configPath.path, + "name": vmDirContext.name, + ]) let fileHandle = try? FileHandle(forReadingFrom: vmDirContext.dir.configPath.url) guard let fileHandle = fileHandle else { - Logger.error( - "Failed to open config file - VM not running", metadata: ["name": vmDirContext.name] - ) + Logger.info( + "Failed to open config file - VM may not be running", + metadata: ["name": vmDirContext.name]) + + // Even though we couldn't open the file, try to force unlock anyway + unlockConfigFile() + throw VMError.notRunning(vmDirContext.name) } // Get the PID of the process holding the lock using lsof command + Logger.info( + "Finding process holding lock on config file", metadata: ["name": vmDirContext.name]) let task = Process() task.executableURL = URL(fileURLWithPath: "/usr/sbin/lsof") task.arguments = ["-F", "p", vmDirContext.dir.configPath.path] @@ -263,29 +348,44 @@ class VM { let pid = pid_t(pidString) else { try? 
fileHandle.close() - Logger.error( - "Failed to find VM process - VM not running", metadata: ["name": vmDirContext.name]) + Logger.info( + "Failed to find process holding lock - VM may not be running", + metadata: ["name": vmDirContext.name]) + + // Even though we couldn't find the process, try to force unlock + unlockConfigFile() + throw VMError.notRunning(vmDirContext.name) } + Logger.info( + "Found process \(pid) holding lock on config file", + metadata: ["name": vmDirContext.name]) + // First try graceful shutdown with SIGINT if kill(pid, SIGINT) == 0 { - Logger.info( - "Sent SIGINT to VM process", metadata: ["name": vmDirContext.name, "pid": "\(pid)"]) + Logger.info("Sent SIGINT to VM process \(pid)", metadata: ["name": vmDirContext.name]) } // Wait for process to stop with timeout var attempts = 0 while attempts < 10 { + Logger.info( + "Waiting for process \(pid) to terminate (attempt \(attempts + 1)/10)", + metadata: ["name": vmDirContext.name]) try await Task.sleep(nanoseconds: 1_000_000_000) // Check if process still exists if kill(pid, 0) != 0 { // Process is gone, do final cleanup + Logger.info("Process \(pid) has terminated", metadata: ["name": vmDirContext.name]) virtualizationService = nil vncService.stop() try? fileHandle.close() + // Force unlock the config file + unlockConfigFile() + Logger.info( "VM stopped successfully via process termination", metadata: ["name": vmDirContext.name]) @@ -296,8 +396,11 @@ class VM { // If graceful shutdown failed, force kill the process Logger.info( - "Graceful shutdown failed, forcing termination", metadata: ["name": vmDirContext.name]) + "Graceful shutdown failed, forcing termination of process \(pid)", + metadata: ["name": vmDirContext.name]) if kill(pid, SIGKILL) == 0 { + Logger.info("Sent SIGKILL to process \(pid)", metadata: ["name": vmDirContext.name]) + // Wait a moment for the process to be fully killed try await Task.sleep(nanoseconds: 2_000_000_000) @@ -306,16 +409,124 @@ class VM { vncService.stop() try? fileHandle.close() + // Force unlock the config file + unlockConfigFile() + Logger.info("VM forcefully stopped", metadata: ["name": vmDirContext.name]) return } // If we get here, something went very wrong try? fileHandle.close() - Logger.error("Failed to stop VM", metadata: ["name": vmDirContext.name, "pid": "\(pid)"]) + Logger.error( + "Failed to stop VM - could not terminate process \(pid)", + metadata: ["name": vmDirContext.name]) + + // As a last resort, try to force unlock + unlockConfigFile() + throw VMError.internalError("Failed to stop VM process") } + // Helper method to forcibly clear any locks on the config file + private func unlockConfigFile() { + Logger.info( + "Forcibly clearing locks on config file", + metadata: [ + "path": vmDirContext.dir.configPath.path, + "name": vmDirContext.name, + ]) + + // First attempt: standard unlock methods + if let fileHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) { + // Use F_GETLK and F_SETLK to check and clear locks + var lockInfo = flock() + lockInfo.l_type = Int16(F_UNLCK) + lockInfo.l_whence = Int16(SEEK_SET) + lockInfo.l_start = 0 + lockInfo.l_len = 0 + + // Try to unlock the file using fcntl + _ = fcntl(fileHandle.fileDescriptor, F_SETLK, &lockInfo) + + // Also try the regular flock method + flock(fileHandle.fileDescriptor, LOCK_UN) + + try? 
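[Editor's note] The stop() method above escalates from SIGINT to SIGKILL, using kill(pid, 0) to test whether the process is gone. A compact Python rendering of that escalation, for illustration:

    import os
    import signal
    import time

    def process_alive(pid: int) -> bool:
        """kill(pid, 0) probes for existence without delivering a signal."""
        try:
            os.kill(pid, 0)
            return True
        except ProcessLookupError:
            return False

    def terminate(pid: int, grace_seconds: int = 10) -> bool:
        """SIGINT first, poll once per second, then SIGKILL - the same
        escalation the Swift stop() performs."""
        if not process_alive(pid):
            return True
        os.kill(pid, signal.SIGINT)
        for _ in range(grace_seconds):
            time.sleep(1)
            if not process_alive(pid):
                return True
        try:
            os.kill(pid, signal.SIGKILL)
        except ProcessLookupError:
            return True  # exited between the probe and the kill
        time.sleep(2)
        return not process_alive(pid)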
+    // Helper method to forcibly clear any locks on the config file
+    private func unlockConfigFile() {
+        Logger.info(
+            "Forcibly clearing locks on config file",
+            metadata: [
+                "path": vmDirContext.dir.configPath.path,
+                "name": vmDirContext.name,
+            ])
+
+        // First attempt: standard unlock methods
+        if let fileHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
+            // Use F_GETLK and F_SETLK to check and clear locks
+            var lockInfo = flock()
+            lockInfo.l_type = Int16(F_UNLCK)
+            lockInfo.l_whence = Int16(SEEK_SET)
+            lockInfo.l_start = 0
+            lockInfo.l_len = 0
+
+            // Try to unlock the file using fcntl
+            _ = fcntl(fileHandle.fileDescriptor, F_SETLK, &lockInfo)
+
+            // Also try the regular flock method
+            flock(fileHandle.fileDescriptor, LOCK_UN)
+
+            try? fileHandle.close()
+            Logger.info("Standard unlock attempts performed", metadata: ["name": vmDirContext.name])
+        }
+
+        // Second attempt: try to acquire and immediately release a fresh lock
+        if let tempHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
+            if flock(tempHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 {
+                Logger.info(
+                    "Successfully acquired and released lock to reset state",
+                    metadata: ["name": vmDirContext.name])
+                flock(tempHandle.fileDescriptor, LOCK_UN)
+            } else {
+                Logger.info(
+                    "Could not acquire lock for resetting - may still be locked",
+                    metadata: ["name": vmDirContext.name])
+            }
+            try? tempHandle.close()
+        }
+
+        // Third attempt (most aggressive): back up the config file, remove the original, and restore it
+        Logger.info(
+            "Trying aggressive method: backup and restore config file",
+            metadata: ["name": vmDirContext.name])
+        // Only proceed if the config file exists
+        let fileManager = FileManager.default
+        let configPath = vmDirContext.dir.configPath.path
+        let backupPath = configPath + ".backup"
+
+        if fileManager.fileExists(atPath: configPath) {
+            // Create a backup of the config file
+            if let configData = try? Data(contentsOf: URL(fileURLWithPath: configPath)) {
+                // Make the backup
+                try? configData.write(to: URL(fileURLWithPath: backupPath))
+
+                // Remove the original file to clear all locks
+                try? fileManager.removeItem(atPath: configPath)
+                Logger.info(
+                    "Removed original config file to clear locks",
+                    metadata: ["name": vmDirContext.name])
+
+                // Wait a moment for the OS to fully release resources
+                Thread.sleep(forTimeInterval: 0.1)
+
+                // Restore from the backup
+                try? configData.write(to: URL(fileURLWithPath: configPath))
+                Logger.info(
+                    "Restored config file from backup", metadata: ["name": vmDirContext.name])
+            } else {
+                Logger.error(
+                    "Could not read config file content for backup",
+                    metadata: ["name": vmDirContext.name])
+            }
+        } else {
+            Logger.info(
+                "Config file does not exist, cannot perform aggressive unlock",
+                metadata: ["name": vmDirContext.name])
+        }
+
+        // Final check
+        if let finalHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
+            let lockResult = flock(finalHandle.fileDescriptor, LOCK_EX | LOCK_NB)
+            if lockResult == 0 {
+                Logger.info(
+                    "Lock successfully cleared - verified by acquiring test lock",
+                    metadata: ["name": vmDirContext.name])
+                flock(finalHandle.fileDescriptor, LOCK_UN)
+            } else {
+                Logger.info(
+                    "Lock still present after all clearing attempts",
+                    metadata: ["name": vmDirContext.name, "severity": "warning"])
+            }
+            try? finalHandle.close()
+        }
+    }
+
     // MARK: - Resource Management

     func updateVMConfig(vmConfig: VMConfig) throws {
@@ -422,40 +633,44 @@ class VM {
         guard let url = vncService.url else {
             throw VMError.vncNotConfigured
         }
-        
+
         return url
     }
-    
+
     /// Saves the session information including shared directories to disk
     private func saveSessionData(url: String, sharedDirectories: [SharedDirectory]) {
         do {
-            let session = VNCSession(url: url, sharedDirectories: sharedDirectories.isEmpty ? nil : sharedDirectories)
+            let session = VNCSession(
+                url: url, sharedDirectories: sharedDirectories.isEmpty ? nil : sharedDirectories)
             try vmDirContext.dir.saveSession(session)
-            Logger.info("Saved VNC session with shared directories", 
-                metadata: [
-                    "count": "\(sharedDirectories.count)",
-                    "dirs": "\(sharedDirectories.map { $0.hostPath }.joined(separator: ", "))",
-                    "sessionsPath": "\(vmDirContext.dir.sessionsPath.path)"
-                ])
+            Logger.info(
+                "Saved VNC session with shared directories",
+                metadata: [
+                    "count": "\(sharedDirectories.count)",
+                    "dirs": "\(sharedDirectories.map { $0.hostPath }.joined(separator: ", "))",
+                    "sessionsPath": "\(vmDirContext.dir.sessionsPath.path)",
+                ])
         } catch {
            Logger.error("Failed to save VNC session", metadata: ["error": "\(error)"])
        }
    }
-    
+
     /// Main session setup method that handles VNC and persists session data
-    private func setupSession(noDisplay: Bool, port: Int = 0, sharedDirectories: [SharedDirectory] = []) async throws -> String {
+    private func setupSession(
+        noDisplay: Bool, port: Int = 0, sharedDirectories: [SharedDirectory] = []
+    ) async throws -> String {
         // Start the VNC service and get the URL
         let url = try await startVNCService(port: port)
-        
+
         // Save the session data
         saveSessionData(url: url, sharedDirectories: sharedDirectories)
-        
+
         // Open the VNC client if needed
         if !noDisplay {
-            Logger.info("Starting VNC session")
+            Logger.info("Starting VNC session", metadata: ["name": vmDirContext.name])
             try await vncService.openClient(url: url)
         }
-        
+
         return url
     }

@@ -599,7 +814,8 @@ class VM {
         )

         virtualizationService = try virtualizationServiceFactory(config)
-        let vncInfo = try await setupSession(noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
+        let vncInfo = try await setupSession(
+            noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
         Logger.info("VNC info", metadata: ["vncInfo": vncInfo])

         // Start the VM
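[Editor's note] The entry.sh changes below count repeated SIGTERM/SIGINT/SIGHUP deliveries so a first signal triggers graceful cleanup while a third forces an exit. The same escalation pattern in Python, for comparison (sketch only):

    import signal
    import sys
    import time

    signal_count = 0

    def on_signal(signum, frame):
        """First signal: clean up and exit 0; third signal: force exit 1."""
        global signal_count
        signal_count += 1
        if signal_count == 1:
            print(f"[cleanup] Caught {signal.Signals(signum).name}, shutting down...")
            # ... stop the VM and clean up ephemeral storage here ...
            sys.exit(0)
        if signal_count >= 3:
            print(f"[cleanup] Received {signal_count} termination signals, exiting forcefully")
            sys.exit(1)

    for sig in (signal.SIGTERM, signal.SIGINT, signal.SIGHUP):
        signal.signal(sig, on_signal)

    # Short sleeps keep signal handling responsive, like the wait loop below.
    while signal_count == 0:
        time.sleep(1)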
diff --git a/libs/lumier/src/bin/entry.sh b/libs/lumier/src/bin/entry.sh
index c9cd2f0f..d6f96f52 100755
--- a/libs/lumier/src/bin/entry.sh
+++ b/libs/lumier/src/bin/entry.sh
@@ -64,34 +64,58 @@ fi
 echo "Lumier VM is starting..."

 # Cleanup function to ensure VM and noVNC proxy shutdown on container stop
+# Counter for signal handling
+SIGNAL_COUNT=0
+
 cleanup() {
+    local signal_name=$1
     set +e  # Don't exit on error in cleanup
-    echo "[cleanup] Caught signal, shutting down..."
-    # Check if we're in the middle of an image pull
-    if [[ "$PULL_IN_PROGRESS" == "1" ]]; then
-        echo "[cleanup] Interrupted during image pull, skipping VM stop."
-    else
-        echo "[cleanup] Stopping VM..."
-        stop_vm true
-    fi
+    # Increment the signal counter
+    SIGNAL_COUNT=$((SIGNAL_COUNT + 1))

-    # Attempt to clean up ephemeral storage if it's in the /private/tmp directory
-    if [[ "$HOST_STORAGE_PATH" == "ephemeral" ]]; then
-        # First check if VM actually exists
-        VM_INFO=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH" "json" "false")
+    # On the first signal, try a graceful shutdown
+    if [ $SIGNAL_COUNT -eq 1 ]; then
+        echo "[cleanup] Caught $signal_name signal, shutting down..."

-        # Only try VM deletion if VM exists and not in the middle of a pull
-        if [[ "$PULL_IN_PROGRESS" != "1" && $VM_INFO != *"Virtual machine not found"* ]]; then
-            echo "[cleanup] Cleaning up VM..."
-            lume_delete "$VM_NAME" "$HOST_STORAGE_PATH" > /dev/null 2>&1
+        # Check if we're in the middle of an image pull
+        if [[ "$PULL_IN_PROGRESS" == "1" ]]; then
+            echo "[cleanup] Interrupted during image pull, skipping VM stop."
+        else
+            echo "[cleanup] Stopping VM..."
+            stop_vm true
         fi
+
+        # Attempt to clean up ephemeral storage if it's in the /private/tmp directory
+        if [[ "$HOST_STORAGE_PATH" == "ephemeral" ]]; then
+            # First check if the VM actually exists
+            VM_INFO=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH" "json" "false")
+
+            # Only try VM deletion if the VM exists and we're not in the middle of a pull
+            if [[ "$PULL_IN_PROGRESS" != "1" && $VM_INFO != *"Virtual machine not found"* ]]; then
+                echo "[cleanup] Cleaning up VM..."
+                lume_delete "$VM_NAME" "$HOST_STORAGE_PATH" > /dev/null 2>&1
+            fi
+        fi
+    else
+        # For repeated signals, force an immediate exit
+        echo "[cleanup] Received $SIGNAL_COUNT termination signals, exiting forcefully"
     fi

-    exit 0
+    # If we've received several signals, exit immediately
+    if [ $SIGNAL_COUNT -ge 3 ]; then
+        exit 1
+    fi
+
+    # Exit with success for the first signal
+    if [ $SIGNAL_COUNT -eq 1 ]; then
+        exit 0
+    fi
 }

 # Ensure we catch all typical container termination signals
-trap cleanup SIGTERM SIGINT SIGHUP
+trap 'cleanup SIGTERM' SIGTERM
+trap 'cleanup SIGINT' SIGINT
+trap 'cleanup SIGHUP' SIGHUP

 # Now enable strict error handling after initialization
 set -euo pipefail
@@ -116,4 +140,14 @@ if [ -n "${VNC_PORT:-}" ] && [ -n "${VNC_PASSWORD:-}" ]; then
 fi

 echo "Lumier is running. Press Ctrl+C to stop."
-tail -f /dev/null
\ No newline at end of file
+
+# Instead of tail -f /dev/null, use a wait loop that can be interrupted by signals
+while true; do
+    # Sleep in small increments to make signal handling more responsive
+    sleep 1 &
+    wait $!
+    # Break the loop if we've received a signal
+    if [ $SIGNAL_COUNT -gt 0 ]; then
+        break
+    fi
+done
\ No newline at end of file
diff --git a/libs/lumier/src/lib/vm.sh b/libs/lumier/src/lib/vm.sh
index 54379767..8e87ffa5 100755
--- a/libs/lumier/src/lib/vm.sh
+++ b/libs/lumier/src/lib/vm.sh
@@ -312,6 +312,21 @@ lume_pull() {
     # Inform users how to check pull progress
     echo "You can check the pull progress using: lume logs -f"

+    # Always print the curl command before executing it
+    echo ""
+    echo "EXECUTING PULL COMMAND:"
+    echo "curl -X POST \\
+      -H \"Content-Type: application/json\" \\
+      -d '{
+        \"image\": \"$image\",
+        \"name\": \"$vm_name\",
+        \"registry\": \"$registry\",
+        \"organization\": \"$organization\",
+        \"storage\": \"$storage\"
+      }' \\
+      \"http://${api_host}:${api_port}/lume/pull\""
+    echo ""
+
     # Pull image via API and capture response
     local response
     if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then