Fix file lock at stop

f-trycua
2025-05-12 13:42:29 -07:00
parent 9fdcf2f7df
commit d0a0a344a0
16 changed files with 2193 additions and 322 deletions

View File

@@ -22,8 +22,6 @@ for path in pythonpath.split(":"):
from computer import Computer, VMProviderType
from computer.logger import LogLevel
from computer.utils import get_image_size
async def main():
try:
@@ -31,15 +29,20 @@ async def main():
# Create computer with configured host
computer = Computer(
display="1024x768", # Higher resolution
memory="8GB", # More memory
cpu="4", # More CPU cores
display="1024x768",
memory="8GB",
cpu="4",
os_type="macos",
verbosity=LogLevel.NORMAL, # Use QUIET to suppress most logs
use_host_computer_server=False,
provider_type=VMProviderType.LUME, # Explicitly use the Lume provider
provider_type=VMProviderType.LUME,
storage="/Users/francescobonacci/repos/trycua/computer/examples/storage",
# shared_directories=[
# "/Users/francescobonacci/repos/trycua/computer/examples/shared"
# ]
)
try:
# Run the computer with default parameters
await computer.run()
await computer.interface.hotkey("command", "space")
@@ -89,8 +92,7 @@ async def main():
finally:
# Important to clean up resources
pass
# await computer.stop()
await computer.stop()
except Exception as e:
print(f"Error in main: {e}")
traceback.print_exc()
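The substance of this change is that `await computer.stop()` in the `finally` block is now active rather than commented out, so the provider releases the VM's file lock on shutdown. A minimal sketch of the resulting pattern (parameter values are illustrative):

import asyncio
from computer import Computer, VMProviderType

async def main():
    computer = Computer(
        display="1024x768",
        memory="8GB",
        cpu="4",
        os_type="macos",
        provider_type=VMProviderType.LUME,
    )
    try:
        await computer.run()
        await computer.interface.hotkey("command", "space")
    finally:
        # Stopping the computer releases the provider's file lock on the VM
        await computer.stop()

asyncio.run(main())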

View File

@@ -35,8 +35,9 @@ class Computer:
telemetry_enabled: bool = True,
provider_type: Union[str, VMProviderType] = VMProviderType.LUME,
port: Optional[int] = 3000,
noVNC_port: Optional[int] = 8006,
host: str = os.environ.get("PYLUME_HOST", "localhost"),
storage_path: Optional[str] = None,
storage: Optional[str] = None # Path for persistent VM storage (Lumier provider)
):
"""Initialize a new Computer instance.
@@ -48,7 +49,7 @@ class Computer:
Defaults to "1024x768"
memory: The VM memory allocation. Defaults to "8GB"
cpu: The VM CPU allocation. Defaults to "4"
os: The operating system type ('macos' or 'linux')
os_type: The operating system type ('macos' or 'linux')
name: The VM name
image: The VM image name
shared_directories: Optional list of directory paths to share with the VM
@@ -58,9 +59,9 @@ class Computer:
telemetry_enabled: Whether to enable telemetry tracking. Defaults to True.
provider_type: The VM provider type to use (lume, qemu, cloud)
port: Optional port to use for the VM provider server
noVNC_port: Optional port for the noVNC web interface (Lumier provider)
host: Host to use for VM provider connections (e.g. "localhost", "host.docker.internal")
bin_path: Optional path to the VM provider binary
storage_path: Optional path to store VM data
storage: Optional path for persistent VM storage (Lumier provider)
"""
self.logger = Logger("cua.computer", verbosity)
@@ -69,10 +70,18 @@ class Computer:
# Store original parameters
self.image = image
self.port = port
self.noVNC_port = noVNC_port
self.host = host
self.os_type = os_type
self.provider_type = provider_type
self.storage_path = storage_path
self.storage = storage
# For Lumier provider, store the first shared directory path to use
# for VM file sharing
self.shared_path = None
if shared_directories and len(shared_directories) > 0:
self.shared_path = shared_directories[0]
self.logger.info(f"Using first shared directory for VM file sharing: {self.shared_path}")
# Store telemetry preference
self._telemetry_enabled = telemetry_enabled
@@ -202,12 +211,29 @@ class Computer:
# Configure provider based on initialization parameters
provider_kwargs = {
"storage_path": self.storage_path,
"storage": self.storage,
"verbose": self.verbosity >= LogLevel.DEBUG,
}
# Set port if specified
if self.port is not None:
# VM name is already set in self.config.name and will be used when calling provider methods
# For Lumier provider, add specific configuration
if self.provider_type == VMProviderType.LUMIER:
# Pass VM image to LumierProvider
provider_kwargs["image"] = self.image
self.logger.info(f"Using VM image for Lumier provider: {self.image}")
# Add shared_path if specified (for file sharing between host and VM)
if self.shared_path:
provider_kwargs["shared_path"] = self.shared_path
self.logger.info(f"Using shared path for Lumier provider: {self.shared_path}")
# Add noVNC_port if specified (for web interface)
if self.noVNC_port:
provider_kwargs["noVNC_port"] = self.noVNC_port
self.logger.info(f"Using noVNC port for Lumier provider: {self.noVNC_port}")
elif self.port is not None:
# For other providers, set port if specified
provider_kwargs["port"] = self.port
self.logger.verbose(f"Using specified port for provider: {self.port}")
@@ -257,53 +283,52 @@ class Computer:
path = os.path.abspath(os.path.expanduser(path))
if not os.path.exists(path):
self.logger.warning(f"Shared directory does not exist: {path}")
continue
shared_dirs.append({"host_path": path, "vm_path": path})
# Create VM run options with specs from config
# Account for optional shared directories
run_opts = {
"cpu": int(self.config.cpu),
"noDisplay": False,
"sharedDirectories": shared_dirs,
"memory": self.config.memory,
"display": {
"width": self.config.display.width,
"height": self.config.display.height
}
}
# Define VM run options
run_opts = {
"display": self.config.display,
"memory": self.config.memory,
"cpu": self.config.cpu
}
if shared_dirs:
run_opts["shared_directories"] = shared_dirs
# Log the run options for debugging
# For Lumier provider, pass the noVNC_port if specified
if self.provider_type == VMProviderType.LUMIER and self.noVNC_port is not None:
run_opts["noVNC_port"] = self.noVNC_port
self.logger.info(f"Using noVNC_port {self.noVNC_port} for Lumier provider")
self.logger.info(f"VM run options: {run_opts}")
# Log the equivalent curl command for debugging
payload = json.dumps({"noDisplay": False, "sharedDirectories": []})
curl_cmd = f"curl -X POST 'http://localhost:3000/lume/vms/{self.config.name}/run' -H 'Content-Type: application/json' -d '{payload}'"
# self.logger.info(f"Equivalent curl command:")
# self.logger.info(f"{curl_cmd}")
try:
if self.config.vm_provider is None:
raise RuntimeError(f"VM provider not initialized for {self.config.name}")
response = await self.config.vm_provider.run_vm(self.config.name, run_opts)
response = await self.config.vm_provider.run_vm(
name=self.config.name,
run_opts=run_opts,
storage=self.storage # Pass storage explicitly for clarity
)
self.logger.info(f"VM run response: {response if response else 'None'}")
except Exception as run_error:
self.logger.error(f"Failed to run VM: {run_error}")
raise RuntimeError(f"Failed to start VM: {run_error}")
# Wait for VM to be ready with required properties
self.logger.info("Waiting for VM to be ready...")
# Wait for VM to be ready with a valid IP address
self.logger.info("Waiting for VM to be ready with a valid IP address...")
try:
ip = await self.get_ip()
if ip:
self.logger.info(f"VM is ready with IP: {ip}")
# Store the IP address for later use instead of returning early
ip_address = ip
else:
# If no IP was found, try to raise a helpful error
raise RuntimeError(f"VM {self.config.name} failed to get IP address")
# Use the enhanced get_ip method that includes retry logic
max_retries = 30 # Increased for initial VM startup
retry_delay = 2 # 2 seconds between retries
self.logger.info(f"Waiting up to {max_retries * retry_delay} seconds for VM to be ready...")
ip = await self.get_ip(max_retries=max_retries, retry_delay=retry_delay)
# If we get here, we have a valid IP
self.logger.info(f"VM is ready with IP: {ip}")
ip_address = ip
except TimeoutError as timeout_error:
self.logger.error(str(timeout_error))
raise RuntimeError(f"VM startup timed out: {timeout_error}")
except Exception as wait_error:
self.logger.error(f"Error waiting for VM: {wait_error}")
raise RuntimeError(f"VM failed to become ready: {wait_error}")
@@ -312,6 +337,10 @@ class Computer:
raise RuntimeError(f"Failed to initialize computer: {e}")
try:
# Verify we have a valid IP before initializing the interface
if not ip_address or ip_address == "unknown" or ip_address == "0.0.0.0":
raise RuntimeError(f"Cannot initialize interface - invalid IP address: {ip_address}")
# Initialize the interface using the factory with the specified OS
self.logger.info(f"Initializing interface for {self.os_type} at {ip_address}")
from .interface.base import BaseComputerInterface
@@ -328,10 +357,11 @@ class Computer:
try:
# Use a single timeout for the entire connection process
await self._interface.wait_for_ready(timeout=60)
# The VM should already be ready at this point, so we're just establishing the connection
await self._interface.wait_for_ready(timeout=30)
self.logger.info("WebSocket interface connected successfully")
except TimeoutError as e:
self.logger.error("Failed to connect to WebSocket interface")
self.logger.error(f"Failed to connect to WebSocket interface at {ip_address}")
raise TimeoutError(
f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}"
)
@@ -359,44 +389,26 @@ class Computer:
start_time = time.time()
try:
if self._running:
self._running = False
self.logger.info("Stopping Computer...")
self.logger.info("Stopping Computer...")
if hasattr(self, "_stop_event"):
self._stop_event.set()
if hasattr(self, "_keep_alive_task"):
await self._keep_alive_task
if self._interface: # Only try to close interface if it exists
self.logger.verbose("Closing interface...")
# For host computer server, just use normal close to keep the server running
if self.use_host_computer_server:
self._interface.close()
else:
# For VM mode, force close the connection
if hasattr(self._interface, "force_close"):
self._interface.force_close()
else:
self._interface.close()
if not self.use_host_computer_server and self._provider_context:
# In VM mode, first explicitly stop the VM, then exit the provider context
if not self.use_host_computer_server and self._provider_context and self.config.vm_provider is not None:
try:
self.logger.info(f"Stopping VM {self.config.name}...")
if self.config.vm_provider is not None:
await self.config.vm_provider.stop_vm(self.config.name)
await self.config.vm_provider.stop_vm(
name=self.config.name,
storage=self.storage # Pass storage explicitly for clarity
)
except Exception as e:
self.logger.error(f"Error stopping VM: {e}")
self.logger.verbose("Closing VM provider context...")
if self.config.vm_provider is not None:
await self.config.vm_provider.__aexit__(None, None, None)
await self.config.vm_provider.__aexit__(None, None, None)
self._provider_context = None
self.logger.info("Computer stopped")
except Exception as e:
self.logger.debug(
f"Error during cleanup: {e}"
) # Log as debug since this might be expected
self.logger.debug(f"Error during cleanup: {e}") # Log as debug since this might be expected
finally:
# Log stop time for performance monitoring
duration_ms = (time.time() - start_time) * 1000
@@ -404,12 +416,69 @@ class Computer:
return
# @property
async def get_ip(self) -> str:
"""Get the IP address of the VM or localhost if using host computer server."""
async def get_ip(self, max_retries: int = 15, retry_delay: int = 2) -> str:
"""Get the IP address of the VM or localhost if using host computer server.
Args:
max_retries: Maximum number of retries to get the IP (default: 15)
retry_delay: Delay between retries in seconds (default: 2)
Returns:
IP address of the VM or localhost if using host computer server
Raises:
TimeoutError: If unable to get a valid IP address after retries
"""
if self.use_host_computer_server:
return "127.0.0.1"
ip = await self.config.get_ip()
return ip or "unknown" # Return "unknown" if ip is None
# Try multiple times to get a valid IP
for attempt in range(1, max_retries + 1):
if attempt > 1:
self.logger.info(f"Retrying to get VM IP address (attempt {attempt}/{max_retries})...")
try:
# Get VM information from the provider
if self.config.vm_provider is None:
raise RuntimeError("VM provider is not initialized")
# Get VM info from provider with explicit storage parameter
vm_info = await self.config.vm_provider.get_vm(
name=self.config.name,
storage=self.storage # Pass storage explicitly for clarity
)
# Check if we got a valid IP
ip = vm_info.get("ip_address", None)
if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
self.logger.info(f"Got valid VM IP address: {ip}")
return ip
# Check the VM status
status = vm_info.get("status", "unknown")
# If the VM is in a non-running state (stopped, paused, etc.)
# raise a more informative error instead of waiting
if status in ["stopped"]:
raise RuntimeError(f"VM is not running yet (status: {status})")
# If VM is starting or initializing, wait and retry
if status != "running":
self.logger.info(f"VM is not running yet (status: {status}). Waiting...")
await asyncio.sleep(retry_delay)
continue
# If VM is running but no IP yet, wait and retry
self.logger.info("VM is running but no valid IP address yet. Waiting...")
except Exception as e:
self.logger.warning(f"Error getting VM IP: {e}")
await asyncio.sleep(retry_delay)
# If we get here, we couldn't get a valid IP after all retries
raise TimeoutError(f"Failed to get valid IP address for VM {self.config.name} after {max_retries} attempts")
async def wait_vm_ready(self) -> Optional[Dict[str, Any]]:
"""Wait for VM to be ready with an IP address.
@@ -434,7 +503,11 @@ class Computer:
try:
# Keep polling for VM info
vm = await self.config.vm_provider.get_vm(self.config.name)
if self.config.vm_provider is None:
self.logger.error("VM provider is not initialized")
vm = None
else:
vm = await self.config.vm_provider.get_vm(self.config.name)
# Log full VM properties for debugging (every 30 attempts)
if attempts % 30 == 0:
@@ -492,9 +565,9 @@ class Computer:
try:
if self.config.vm_provider is not None:
vm = await self.config.vm_provider.get_vm(self.config.name)
# VMStatus is a Pydantic model with attributes, not a dictionary
status = vm.status if vm else "unknown"
ip = vm.ip_address if vm else None
# VM data is returned as a dictionary from the Lumier provider
status = vm.get('status', 'unknown') if vm else "unknown"
ip = vm.get('ip_address') if vm else None
else:
status = "unknown"
ip = None
@@ -516,7 +589,11 @@ class Computer:
"memory": memory or self.config.memory
}
if self.config.vm_provider is not None:
await self.config.vm_provider.update_vm(self.config.name, update_opts)
await self.config.vm_provider.update_vm(
name=self.config.name,
update_opts=update_opts,
storage=self.storage # Pass storage explicitly for clarity
)
else:
raise RuntimeError("VM provider not initialized")

View File

@@ -17,7 +17,6 @@ class MacOSComputerInterface(BaseComputerInterface):
def __init__(self, ip_address: str, username: str = "lume", password: str = "lume"):
super().__init__(ip_address, username, password)
self.ws_uri = f"ws://{ip_address}:8000/ws"
self._ws = None
self._reconnect_task = None
self._closed = False
@@ -31,6 +30,15 @@ class MacOSComputerInterface(BaseComputerInterface):
# Set logger name for MacOS interface
self.logger = Logger("cua.interface.macos", LogLevel.NORMAL)
@property
def ws_uri(self) -> str:
"""Get the WebSocket URI using the current IP address.
Returns:
WebSocket URI for the Computer API Server
"""
return f"ws://{self.ip_address}:8000/ws"
async def _keep_alive(self):
"""Keep the WebSocket connection alive with automatic reconnection."""
retry_count = 0

View File

@@ -37,6 +37,11 @@ class Computer:
return None
vm = await self.vm_provider.get_vm(self.name)
# PyLume returns a VMStatus object, not a dictionary
# Access ip_address as an attribute, not with .get()
return vm.ip_address if vm else None
# Handle both object attribute and dictionary access for ip_address
if vm:
if isinstance(vm, dict):
return vm.get("ip_address")
else:
# Access as attribute for object-based return values
return getattr(vm, "ip_address", None)
return None
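The same dual-access logic could be factored into a small helper; a hedged sketch:

from typing import Any, Optional

def extract_ip(vm: Any) -> Optional[str]:
    """Return the VM's IP whether vm is a dict (curl-based providers) or an object (pylume)."""
    if vm is None:
        return None
    if isinstance(vm, dict):
        return vm.get("ip_address")
    return getattr(vm, "ip_address", None)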

View File

@@ -8,6 +8,7 @@ from typing import Dict, List, Optional, Any, AsyncContextManager
class VMProviderType(str, Enum):
"""Enum of supported VM provider types."""
LUME = "lume"
LUMIER = "lumier"
QEMU = "qemu"
CLOUD = "cloud"
UNKNOWN = "unknown"
@@ -26,8 +27,17 @@ class BaseVMProvider(AsyncContextManager):
pass
@abc.abstractmethod
async def get_vm(self, name: str) -> Dict[str, Any]:
"""Get VM information by name."""
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by name.
Args:
name: Name of the VM to get information for
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM information including status, IP address, etc.
"""
pass
@abc.abstractmethod
@@ -36,16 +46,45 @@ class BaseVMProvider(AsyncContextManager):
pass
@abc.abstractmethod
async def run_vm(self, name: str, run_opts: Dict[str, Any]) -> Dict[str, Any]:
"""Run a VM with the given options."""
async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Run a VM by name with the given options.
Args:
name: Name of the VM to run
run_opts: Dictionary of run options (memory, cpu, etc.)
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM run status and information
"""
pass
@abc.abstractmethod
async def stop_vm(self, name: str) -> Dict[str, Any]:
"""Stop a running VM."""
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a VM by name.
Args:
name: Name of the VM to stop
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM stop status and information
"""
pass
@abc.abstractmethod
async def update_vm(self, name: str, update_opts: Dict[str, Any]) -> Dict[str, Any]:
"""Update VM configuration."""
async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Update VM configuration.
Args:
name: Name of the VM to update
update_opts: Dictionary of update options (memory, cpu, etc.)
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM update status and information
"""
pass
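Together these signatures form the contract a new provider must satisfy. A minimal conforming subclass might look like the sketch below; the class is hypothetical, the bodies are placeholders, and it assumes the base class also declares the `provider_type` property and `list_vms` method seen in the concrete providers:

from typing import Any, Dict, List, Optional
from .base import BaseVMProvider, VMProviderType

class NullProvider(BaseVMProvider):
    """Hypothetical provider that satisfies the interface without managing real VMs."""

    @property
    def provider_type(self) -> VMProviderType:
        return VMProviderType.UNKNOWN

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        pass

    async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        return {"name": name, "status": "unknown"}

    async def list_vms(self) -> List[Dict[str, Any]]:
        return []

    async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        return {"name": name, "status": "running"}

    async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
        return {"name": name, "status": "stopped"}

    async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
        return {"name": name, "status": "updated"}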

View File

@@ -14,13 +14,29 @@ class VMProviderFactory:
@staticmethod
def create_provider(
provider_type: Union[str, VMProviderType],
**kwargs
port: Optional[int] = None,
host: str = "localhost",
bin_path: Optional[str] = None,
storage: Optional[str] = None,
shared_path: Optional[str] = None,
image: Optional[str] = None,
verbose: bool = False,
ephemeral: bool = False,
noVNC_port: Optional[int] = None
) -> BaseVMProvider:
"""Create a VM provider of the specified type.
Args:
provider_type: Type of VM provider to create
**kwargs: Additional arguments to pass to the provider constructor
port: Port for the API server
host: Hostname for the API server
bin_path: Path to provider binary if needed
storage: Path for persistent VM storage
shared_path: Path for shared folder between host and VM
image: VM image to use (for Lumier provider)
verbose: Enable verbose logging
ephemeral: Use ephemeral (temporary) storage
noVNC_port: Specific port for noVNC interface (for Lumier provider)
Returns:
An instance of the requested VM provider
@@ -44,17 +60,53 @@ class VMProviderFactory:
"The pylume package is required for LumeProvider. "
"Please install it with 'pip install cua-computer[lume]'"
)
return LumeProvider(**kwargs)
return LumeProvider(
port=port,
host=host,
bin_path=bin_path,
storage=storage,
verbose=verbose
)
except ImportError as e:
logger.error(f"Failed to import LumeProvider: {e}")
raise ImportError(
"The pylume package is required for LumeProvider. "
"Please install it with 'pip install cua-computer[lume]'"
) from e
elif provider_type == VMProviderType.LUMIER:
try:
from .lumier import LumierProvider, HAS_LUMIER
if not HAS_LUMIER:
raise ImportError(
"Docker is required for LumierProvider. "
"Please install Docker for Apple Silicon and Lume CLI before using this provider."
)
return LumierProvider(
port=port,
host=host,
storage=storage,
shared_path=shared_path,
image=image or "macos-sequoia-cua:latest",
verbose=verbose,
ephemeral=ephemeral,
noVNC_port=noVNC_port
)
except ImportError as e:
logger.error(f"Failed to import LumierProvider: {e}")
raise ImportError(
"Docker and Lume CLI are required for LumierProvider. "
"Please install Docker for Apple Silicon and run the Lume installer script."
) from e
elif provider_type == VMProviderType.QEMU:
try:
from .qemu import QEMUProvider
return QEMUProvider(**kwargs)
return QEMUProvider(
bin_path=bin_path,
storage=storage,
port=port,
host=host,
verbose=verbose
)
except ImportError as e:
logger.error(f"Failed to import QEMUProvider: {e}")
raise ImportError(
@@ -64,7 +116,13 @@ class VMProviderFactory:
elif provider_type == VMProviderType.CLOUD:
try:
from .cloud import CloudProvider
return CloudProvider(**kwargs)
# Cloud provider might need different parameters, but including basic ones
return CloudProvider(
host=host,
port=port,
storage=storage,
verbose=verbose
)
except ImportError as e:
logger.error(f"Failed to import CloudProvider: {e}")
raise ImportError(
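With the factory now taking explicit keyword arguments instead of **kwargs, a caller selects a provider like this (a hedged sketch; the import paths, directory paths, and image tag are illustrative assumptions):

from computer.providers.factory import VMProviderFactory  # module path assumed
from computer.providers.base import VMProviderType

provider = VMProviderFactory.create_provider(
    VMProviderType.LUMIER,
    port=3000,
    host="localhost",
    storage="/path/to/storage",      # illustrative path
    shared_path="/path/to/shared",   # illustrative path
    image="macos-sequoia-cua:latest",
    verbose=True,
    noVNC_port=8006,
)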

View File

@@ -1,73 +1,70 @@
"""Lume VM provider implementation."""
"""Lume VM provider implementation using curl commands.
This provider uses direct curl commands to interact with the Lume API,
removing the dependency on the pylume Python package.
"""
import os
import re
import asyncio
import json
import subprocess
import logging
from typing import Dict, List, Optional, Any, Tuple, TypeVar, Type
# Only import pylume when this module is actually used
try:
from pylume import PyLume
from pylume.models import VMRunOpts, VMUpdateOpts, ImageRef, SharedDirectory, VMStatus
HAS_PYLUME = True
except ImportError:
HAS_PYLUME = False
# Create dummy classes for type checking
class PyLume:
pass
class VMRunOpts:
pass
class VMUpdateOpts:
pass
class ImageRef:
pass
class SharedDirectory:
pass
class VMStatus:
pass
from typing import Dict, Any, Optional, List, Tuple
from ..base import BaseVMProvider, VMProviderType
from ...logger import Logger, LogLevel
from ..lume_api import (
lume_api_get,
lume_api_run,
lume_api_stop,
lume_api_update,
HAS_CURL,
parse_memory
)
# Setup logging
logger = logging.getLogger(__name__)
class LumeProvider(BaseVMProvider):
"""Lume VM provider implementation using pylume."""
"""Lume VM provider implementation using direct curl commands.
This provider uses curl to interact with the Lume API server,
removing the dependency on the pylume Python package.
"""
def __init__(
self,
port: Optional[int] = None,
host: str = "localhost",
bin_path: Optional[str] = None,
storage_path: Optional[str] = None,
storage: Optional[str] = None,
verbose: bool = False,
**kwargs
):
"""Initialize the Lume provider.
Args:
port: Optional port to use for the PyLume server
host: Host to use for PyLume connections
bin_path: Optional path to the Lume binary
storage_path: Optional path to store VM data
port: Port for the Lume API server (default: 3000)
host: Host to use for API connections (default: localhost)
bin_path: Optional path to the Lume binary (not used directly)
storage: Path to store VM data
verbose: Enable verbose logging
"""
if not HAS_PYLUME:
if not HAS_CURL:
raise ImportError(
"The pylume package is required for LumeProvider. "
"Please install it with 'pip install cua-computer[lume]'"
"curl is required for LumeProvider. "
"Please ensure it is installed and in your PATH."
)
# PyLume doesn't accept bin_path or storage_path parameters
# Convert verbose to debug parameter for PyLume
self._pylume = PyLume(
port=port,
host=host,
debug=verbose,
**kwargs
)
# Store these for reference, even though PyLume doesn't use them directly
self._bin_path = bin_path
self._storage_path = storage_path
self._context = None
self.host = host
self.port = port or 3000 # Default port for Lume API
self.storage = storage
self.bin_path = bin_path
self.verbose = verbose
# Base API URL for Lume API calls
self.api_base_url = f"http://{self.host}:{self.port}"
@property
def provider_type(self) -> VMProviderType:
@@ -76,54 +73,225 @@ class LumeProvider(BaseVMProvider):
async def __aenter__(self):
"""Enter async context manager."""
self._context = await self._pylume.__aenter__()
# No initialization needed, just return self
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
if self._context:
await self._pylume.__aexit__(exc_type, exc_val, exc_tb)
self._context = None
# No cleanup needed
pass
async def get_vm(self, name: str) -> VMStatus:
"""Get VM information by name."""
# PyLume get_vm returns a VMStatus object, not a dictionary
return await self._pylume.get_vm(name)
def _lume_api_get(self, vm_name: str = "", storage: Optional[str] = None, debug: bool = False) -> Dict[str, Any]:
"""Get VM information using shared lume_api function.
Args:
vm_name: Optional name of the VM to get info for.
If empty, lists all VMs.
storage: Optional storage path override. If provided, this will be used instead of self.storage
debug: Whether to show debug output
Returns:
Dictionary with VM status information parsed from JSON response
"""
# Use the shared implementation from lume_api module
return lume_api_get(
vm_name=vm_name,
host=self.host,
port=self.port,
storage=storage if storage is not None else self.storage,
debug=debug,
verbose=self.verbose
)
def _lume_api_run(self, vm_name: str, run_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""Run a VM using shared lume_api function.
Args:
vm_name: Name of the VM to run
run_opts: Dictionary of run options
debug: Whether to show debug output
Returns:
Dictionary with API response or error information
"""
# Use the shared implementation from lume_api module
return lume_api_run(
vm_name=vm_name,
host=self.host,
port=self.port,
run_opts=run_opts,
storage=self.storage,
debug=debug,
verbose=self.verbose
)
def _lume_api_stop(self, vm_name: str, debug: bool = False) -> Dict[str, Any]:
"""Stop a VM using shared lume_api function.
Args:
vm_name: Name of the VM to stop
debug: Whether to show debug output
Returns:
Dictionary with API response or error information
"""
# Use the shared implementation from lume_api module
return lume_api_stop(
vm_name=vm_name,
host=self.host,
port=self.port,
storage=self.storage,
debug=debug,
verbose=self.verbose
)
def _lume_api_update(self, vm_name: str, update_opts: Dict[str, Any], debug: bool = False) -> Dict[str, Any]:
"""Update VM configuration using shared lume_api function.
Args:
vm_name: Name of the VM to update
update_opts: Dictionary of update options
debug: Whether to show debug output
Returns:
Dictionary with API response or error information
"""
# Use the shared implementation from lume_api module
return lume_api_update(
vm_name=vm_name,
host=self.host,
port=self.port,
update_opts=update_opts,
storage=self.storage,
debug=debug,
verbose=self.verbose
)
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by name.
Args:
name: Name of the VM to get information for
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM information including status, IP address, etc.
Note:
If storage is not provided, the provider's default storage path will be used.
The storage parameter allows overriding the storage location for this specific call.
"""
if not HAS_CURL:
logger.error("curl is not available. Cannot get VM status.")
return {
"name": name,
"status": "unavailable",
"error": "curl is not available"
}
# First try to get detailed VM info from the API
try:
# Query the Lume API for VM status using the provider's storage_path
vm_info = self._lume_api_get(
vm_name=name,
storage=storage if storage is not None else self.storage,
debug=self.verbose
)
# Check for API errors
if "error" in vm_info:
logger.debug(f"API request error: {vm_info['error']}")
# If we got an error from the API, report the VM as not ready yet
return {
"name": name,
"status": "starting", # VM is still starting - do not attempt to connect yet
"api_status": "error",
"error": vm_info["error"]
}
# Process the VM status information
vm_status = vm_info.get("status", "unknown")
# Check if VM is stopped or not running - don't wait for IP in this case
if vm_status == "stopped":
logger.info(f"VM {name} is in '{vm_status}' state - not waiting for IP address")
# Return the status as-is without waiting for an IP
result = {
"name": name,
"status": vm_status,
**vm_info # Include all original fields from the API response
}
return result
# Handle field name differences between APIs
# Some APIs use camelCase, others use snake_case
if "vncUrl" in vm_info:
vnc_url = vm_info["vncUrl"]
elif "vnc_url" in vm_info:
vnc_url = vm_info["vnc_url"]
else:
vnc_url = ""
if "ipAddress" in vm_info:
ip_address = vm_info["ipAddress"]
elif "ip_address" in vm_info:
ip_address = vm_info["ip_address"]
else:
# If no IP address is provided and VM is supposed to be running,
# report it as still starting
ip_address = None
logger.info(f"VM {name} is in '{vm_status}' state but no IP address found - reporting as still starting")
logger.info(f"VM {name} status: {vm_status}")
# Return the complete status information
result = {
"name": name,
"status": vm_status if vm_status else "running",
"ip_address": ip_address,
"vnc_url": vnc_url,
"api_status": "ok"
}
# Include all original fields from the API response
if isinstance(vm_info, dict):
for key, value in vm_info.items():
if key not in result: # Don't override our carefully processed fields
result[key] = value
return result
except Exception as e:
logger.error(f"Failed to get VM status: {e}")
# Return a fallback status that indicates the VM is not ready yet
return {
"name": name,
"status": "initializing", # VM is still initializing
"error": f"Failed to get VM status: {str(e)}"
}
async def list_vms(self) -> List[Dict[str, Any]]:
"""List all available VMs."""
return await self._pylume.list_vms()
result = self._lume_api_get(debug=self.verbose)
# Extract the VMs list from the response
if "vms" in result and isinstance(result["vms"], list):
return result["vms"]
elif "error" in result:
logger.error(f"Error listing VMs: {result['error']}")
return []
else:
return []
async def run_vm(self, name: str, run_opts: Dict[str, Any]) -> Dict[str, Any]:
async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Run a VM with the given options."""
# Convert dict to VMRunOpts if needed
if isinstance(run_opts, dict):
run_opts = VMRunOpts(**run_opts)
return await self._pylume.run_vm(name, run_opts)
return self._lume_api_run(name, run_opts, debug=self.verbose)
async def stop_vm(self, name: str) -> Dict[str, Any]:
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a running VM."""
return await self._pylume.stop_vm(name)
return self._lume_api_stop(name, debug=self.verbose)
async def update_vm(self, name: str, update_opts: Dict[str, Any]) -> Dict[str, Any]:
async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Update VM configuration."""
# Convert dict to VMUpdateOpts if needed
if isinstance(update_opts, dict):
update_opts = VMUpdateOpts(**update_opts)
return await self._pylume.update_vm(name, update_opts)
# Pylume-specific helper methods
def get_pylume_instance(self) -> PyLume:
"""Get the underlying PyLume instance."""
return self._pylume
# Helper methods for converting between PyLume and generic types
@staticmethod
def create_vm_run_opts(**kwargs) -> VMRunOpts:
"""Create VMRunOpts from kwargs."""
return VMRunOpts(**kwargs)
@staticmethod
def create_vm_update_opts(**kwargs) -> VMUpdateOpts:
"""Create VMUpdateOpts from kwargs."""
return VMUpdateOpts(**kwargs)
return self._lume_api_update(name, update_opts, debug=self.verbose)
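Since the provider now drives the Lume API with curl, exercising it needs only a running Lume API server. A hedged usage sketch (the module path, storage path, and VM name are assumptions):

import asyncio
from computer.providers.lume import LumeProvider  # module path assumed

async def main():
    async with LumeProvider(port=3000, host="localhost", storage="/path/to/storage") as provider:
        info = await provider.get_vm("my-vm")  # VM name is illustrative
        print(info.get("status"), info.get("ip_address"))

asyncio.run(main())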

View File

@@ -0,0 +1,380 @@
"""Shared API utilities for Lume and Lumier providers.
This module contains shared functions for interacting with the Lume API,
used by both the LumeProvider and LumierProvider classes.
"""
import logging
import json
import subprocess
import urllib.parse
from typing import Dict, List, Optional, Any
# Setup logging
logger = logging.getLogger(__name__)
# Check if curl is available
try:
subprocess.run(["curl", "--version"], capture_output=True, check=True)
HAS_CURL = True
except (subprocess.SubprocessError, FileNotFoundError):
HAS_CURL = False
def lume_api_get(
vm_name: str,
host: str,
port: int,
storage: Optional[str] = None,
debug: bool = False,
verbose: bool = False
) -> Dict[str, Any]:
"""Use curl to get VM information from Lume API.
Args:
vm_name: Name of the VM to get info for
host: API host
port: API port
storage: Storage path for the VM
debug: Whether to show debug output
verbose: Enable verbose logging
Returns:
Dictionary with VM status information parsed from JSON response
"""
# URL encode the storage parameter for the query
encoded_storage = ""
storage_param = ""
if storage:
# First encode the storage path properly
encoded_storage = urllib.parse.quote(storage, safe='')
storage_param = f"?storage={encoded_storage}"
# Construct API URL with encoded storage parameter if needed
api_url = f"http://{host}:{port}/lume/vms/{vm_name}{storage_param}"
# Construct the curl command with increased timeouts for more reliability
# --connect-timeout: Time to establish connection (15 seconds)
# --max-time: Maximum time for the whole operation (20 seconds)
# -f: Fail silently (no output at all) on server errors
# Add single quotes around URL to ensure special characters are handled correctly
cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", f"'{api_url}'"]
# For logging and display, show the properly escaped URL
display_cmd = ["curl", "--connect-timeout", "15", "--max-time", "20", "-s", "-f", api_url]
# Only print the curl command when debug is enabled
display_curl_string = ' '.join(display_cmd)
if debug or verbose:
print(f"DEBUG: Executing curl API call: {display_curl_string}")
logger.debug(f"Executing API request: {display_curl_string}")
# Execute the command - for execution we need to use shell=True to handle URLs with special characters
try:
# Use a single string with shell=True for proper URL handling
shell_cmd = ' '.join(cmd)
result = subprocess.run(shell_cmd, shell=True, capture_output=True, text=True)
# Handle curl exit codes
if result.returncode != 0:
curl_error = "Unknown error"
# Map common curl error codes to helpful messages
if result.returncode == 7:
curl_error = "Failed to connect to the API server - it might still be starting up"
elif result.returncode == 22:
curl_error = "HTTP error returned from API server"
elif result.returncode == 28:
curl_error = "Operation timeout - the API server is taking too long to respond"
elif result.returncode == 52:
curl_error = "Empty reply from server - the API server is starting but not fully ready yet"
elif result.returncode == 56:
curl_error = "Network problem during data transfer - check container networking"
# Only log at debug level to reduce noise during retries
logger.debug(f"API request failed with code {result.returncode}: {curl_error}")
# Return a more useful error message
return {
"error": f"API request failed: {curl_error}",
"curl_code": result.returncode,
"vm_name": vm_name,
"status": "unknown" # We don't know the actual status due to API error
}
# Try to parse the response as JSON
if result.stdout and result.stdout.strip():
try:
vm_status = json.loads(result.stdout)
if debug or verbose:
logger.info(f"Successfully parsed VM status: {vm_status.get('status', 'unknown')}")
return vm_status
except json.JSONDecodeError as e:
# Return the raw response if it's not valid JSON
logger.warning(f"Invalid JSON response: {e}")
if "Virtual machine not found" in result.stdout:
return {"status": "not_found", "message": "VM not found in Lume API"}
return {"error": f"Invalid JSON response: {result.stdout[:100]}...", "status": "unknown"}
else:
return {"error": "Empty response from API", "status": "unknown"}
except subprocess.SubprocessError as e:
logger.error(f"Failed to execute API request: {e}")
return {"error": f"Failed to execute API request: {str(e)}", "status": "unknown"}
def lume_api_run(
vm_name: str,
host: str,
port: int,
run_opts: Dict[str, Any],
storage: Optional[str] = None,
debug: bool = False,
verbose: bool = False
) -> Dict[str, Any]:
"""Run a VM using curl.
Args:
vm_name: Name of the VM to run
host: API host
port: API port
run_opts: Dictionary of run options
storage: Storage path for the VM
debug: Whether to show debug output
verbose: Enable verbose logging
Returns:
Dictionary with API response or error information
"""
# Construct API URL
api_url = f"http://{host}:{port}/lume/vms/{vm_name}/run"
# Prepare JSON payload with required parameters
payload = {}
# Add CPU cores if specified
if "cpu" in run_opts:
payload["cpu"] = run_opts["cpu"]
# Add memory if specified
if "memory" in run_opts:
payload["memory"] = run_opts["memory"]
# Add storage parameter if specified
if storage:
payload["storage"] = storage
elif "storage" in run_opts:
payload["storage"] = run_opts["storage"]
# Add shared directories if specified
if "shared_directories" in run_opts:
payload["sharedDirectories"] = run_opts["shared_directories"]
# Construct the curl command
cmd = [
"curl", "--connect-timeout", "30", "--max-time", "30",
"-s", "-X", "POST", "-H", "Content-Type: application/json",
"-d", json.dumps(payload),
api_url
]
# Always print the command for debugging
if debug or verbose:
print(f"DEBUG: Executing curl run API call: {' '.join(cmd)}")
print(f"Run payload: {json.dumps(payload, indent=2)}")
# Execute the command
try:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
return {"error": f"API request failed: {result.stderr}"}
# Try to parse the response as JSON
if result.stdout and result.stdout.strip():
try:
response = json.loads(result.stdout)
return response
except json.JSONDecodeError:
# Return the raw response if it's not valid JSON
return {"success": True, "message": "VM started successfully", "raw_response": result.stdout}
else:
return {"success": True, "message": "VM started successfully"}
except subprocess.SubprocessError as e:
logger.error(f"Failed to execute run request: {e}")
return {"error": f"Failed to execute run request: {str(e)}"}
def lume_api_stop(
vm_name: str,
host: str,
port: int,
storage: Optional[str] = None,
debug: bool = False,
verbose: bool = False
) -> Dict[str, Any]:
"""Stop a VM using curl.
Args:
vm_name: Name of the VM to stop
host: API host
port: API port
storage: Storage path for the VM
debug: Whether to show debug output
verbose: Enable verbose logging
Returns:
Dictionary with API response or error information
"""
# Construct API URL
api_url = f"http://{host}:{port}/lume/vms/{vm_name}/stop"
# Prepare JSON payload with required parameters
payload = {}
# Add storage path if specified
if storage:
payload["storage"] = storage
# Construct the curl command
cmd = [
"curl", "--connect-timeout", "15", "--max-time", "20",
"-s", "-X", "POST", "-H", "Content-Type: application/json",
"-d", json.dumps(payload),
api_url
]
# Execute the command
try:
if debug or verbose:
logger.info(f"Executing: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
return {"error": f"API request failed: {result.stderr}"}
# Try to parse the response as JSON
if result.stdout and result.stdout.strip():
try:
response = json.loads(result.stdout)
return response
except json.JSONDecodeError:
# Return the raw response if it's not valid JSON
return {"success": True, "message": "VM stopped successfully", "raw_response": result.stdout}
else:
return {"success": True, "message": "VM stopped successfully"}
except subprocess.SubprocessError as e:
logger.error(f"Failed to execute stop request: {e}")
return {"error": f"Failed to execute stop request: {str(e)}"}
def lume_api_update(
vm_name: str,
host: str,
port: int,
update_opts: Dict[str, Any],
storage: Optional[str] = None,
debug: bool = False,
verbose: bool = False
) -> Dict[str, Any]:
"""Update VM settings using curl.
Args:
vm_name: Name of the VM to update
host: API host
port: API port
update_opts: Dictionary of update options
storage: Storage path for the VM
debug: Whether to show debug output
verbose: Enable verbose logging
Returns:
Dictionary with API response or error information
"""
# Construct API URL
api_url = f"http://{host}:{port}/lume/vms/{vm_name}/update"
# Prepare JSON payload with required parameters
payload = {}
# Add CPU cores if specified
if "cpu" in update_opts:
payload["cpu"] = update_opts["cpu"]
# Add memory if specified
if "memory" in update_opts:
payload["memory"] = update_opts["memory"]
# Add storage path if specified
if storage:
payload["storage"] = storage
# Construct the curl command
cmd = [
"curl", "--connect-timeout", "15", "--max-time", "20",
"-s", "-X", "POST", "-H", "Content-Type: application/json",
"-d", json.dumps(payload),
api_url
]
# Execute the command
try:
if debug:
logger.info(f"Executing: {' '.join(cmd)}")
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
logger.warning(f"API request failed with code {result.returncode}: {result.stderr}")
return {"error": f"API request failed: {result.stderr}"}
# Try to parse the response as JSON
if result.stdout and result.stdout.strip():
try:
response = json.loads(result.stdout)
return response
except json.JSONDecodeError:
# Return the raw response if it's not valid JSON
return {"success": True, "message": "VM updated successfully", "raw_response": result.stdout}
else:
return {"success": True, "message": "VM updated successfully"}
except subprocess.SubprocessError as e:
logger.error(f"Failed to execute update request: {e}")
return {"error": f"Failed to execute update request: {str(e)}"}
def parse_memory(memory_str: str) -> int:
"""Parse memory string to MB integer.
Examples:
"8GB" -> 8192
"1024MB" -> 1024
"512" -> 512
Returns:
Memory value in MB
"""
if isinstance(memory_str, int):
return memory_str
if isinstance(memory_str, str):
# Extract number and unit
import re
match = re.match(r"(\d+)([A-Za-z]*)", memory_str)
if match:
value, unit = match.groups()
value = int(value)
unit = unit.upper()
if unit == "GB" or unit == "G":
return value * 1024
elif unit == "MB" or unit == "M" or unit == "":
return value
# Default fallback
logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
return 8192 # Default to 8GB
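The docstring's conversions can be checked directly; a quick sketch:

assert parse_memory("8GB") == 8192
assert parse_memory("1024MB") == 1024
assert parse_memory("512") == 512
assert parse_memory(2048) == 2048   # integers pass through unchanged
assert parse_memory("8G") == 8192   # single-letter units are accepted too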

View File

@@ -0,0 +1,8 @@
"""Lumier VM provider implementation."""
try:
# Use the same import approach as in the Lume provider
from .provider import LumierProvider
HAS_LUMIER = True
except ImportError:
HAS_LUMIER = False

View File

@@ -0,0 +1,724 @@
"""
Lumier VM provider implementation.
This provider uses Docker containers running the Lumier image to create
macOS and Linux VMs. It handles VM lifecycle operations through Docker
commands and container management.
"""
import logging
import os
import json
import asyncio
from typing import Dict, List, Optional, Any
import subprocess
import time
import re
from ..base import BaseVMProvider, VMProviderType
from ..lume_api import (
lume_api_get,
lume_api_run,
lume_api_stop,
lume_api_update
)
# Setup logging
logger = logging.getLogger(__name__)
# Check if Docker is available
try:
subprocess.run(["docker", "--version"], capture_output=True, check=True)
HAS_LUMIER = True
except (subprocess.SubprocessError, FileNotFoundError):
HAS_LUMIER = False
class LumierProvider(BaseVMProvider):
"""
Lumier VM Provider implementation using Docker containers.
This provider uses Docker to run Lumier containers that can create
macOS and Linux VMs through containerization.
"""
def __init__(
self,
port: Optional[int] = 3000,
host: str = "localhost",
storage: Optional[str] = None,
shared_path: Optional[str] = None,
image: str = "macos-sequoia-cua:latest", # VM image to use
verbose: bool = False,
ephemeral: bool = False,
noVNC_port: Optional[int] = 8006,
):
"""Initialize the Lumier VM Provider.
Args:
port: Port for the API server (default: 3000)
host: Hostname for the API server (default: localhost)
storage: Path for persistent VM storage
shared_path: Path for shared folder between host and VM
image: VM image to use (e.g. "macos-sequoia-cua:latest")
verbose: Enable verbose logging
ephemeral: Use ephemeral (temporary) storage
noVNC_port: Specific port for noVNC interface (default: 8006)
"""
self.host = host
# Always ensure api_port has a valid value (3000 is the default)
self.api_port = 3000 if port is None else port
self.vnc_port = noVNC_port # User-specified noVNC port (default 8006); used for the container port mapping in run_vm
self.ephemeral = ephemeral
# Handle ephemeral storage (temporary directory)
if ephemeral:
self.storage = "ephemeral"
else:
self.storage = storage
self.shared_path = shared_path
self.vm_image = image # Store the VM image name to use
# The container_name will be set in run_vm using the VM name
self.verbose = verbose
self._container_id = None
self._api_url = None # Will be set after container starts
@property
def provider_type(self) -> VMProviderType:
"""Return the provider type."""
return VMProviderType.LUMIER
def _parse_memory(self, memory_str: str) -> int:
"""Parse memory string to MB integer.
Examples:
"8GB" -> 8192
"1024MB" -> 1024
"512" -> 512
"""
if isinstance(memory_str, int):
return memory_str
if isinstance(memory_str, str):
# Extract number and unit
match = re.match(r"(\d+)([A-Za-z]*)", memory_str)
if match:
value, unit = match.groups()
value = int(value)
unit = unit.upper()
if unit == "GB" or unit == "G":
return value * 1024
elif unit == "MB" or unit == "M" or unit == "":
return value
# Default fallback
logger.warning(f"Could not parse memory string '{memory_str}', using 8GB default")
return 8192 # Default to 8GB
# Helper methods for interacting with the Lumier API through curl
# These methods handle the various VM operations via API calls
def _get_curl_error_message(self, return_code: int) -> str:
"""Get a descriptive error message for curl return codes.
Args:
return_code: The curl return code
Returns:
A descriptive error message
"""
# Map common curl error codes to helpful messages
if return_code == 7:
return "Failed to connect - API server is starting up"
elif return_code == 22:
return "HTTP error returned from API server"
elif return_code == 28:
return "Operation timeout - API server is slow to respond"
elif return_code == 52:
return "Empty reply from server - API is starting but not ready"
elif return_code == 56:
return "Network problem during data transfer"
else:
return f"Unknown curl error code: {return_code}"
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by name.
Args:
name: Name of the VM to get information for
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM information including status, IP address, etc.
"""
if not HAS_LUMIER:
logger.error("Docker is not available. Cannot get VM status.")
return {
"name": name,
"status": "unavailable",
"error": "Docker is not available"
}
# Store the current name for API requests
self.container_name = name
try:
# Check if the container exists and is running
check_cmd = ["docker", "ps", "-a", "--filter", f"name={name}", "--format", "{{.Status}}"]
check_result = subprocess.run(check_cmd, capture_output=True, text=True)
container_status = check_result.stdout.strip()
if not container_status:
logger.info(f"Container {name} does not exist. Will create when run_vm is called.")
return {
"name": name,
"status": "not_found",
"message": "Container doesn't exist yet"
}
# Container exists, check if it's running
is_running = container_status.startswith("Up")
if not is_running:
logger.info(f"Container {name} exists but is not running. Status: {container_status}")
return {
"name": name,
"status": "stopped",
"container_status": container_status,
}
# Container is running, get the IP address and API status from Lumier API
logger.info(f"Container {name} is running. Getting VM status from API.")
# Use the shared lume_api_get function directly
vm_info = lume_api_get(
vm_name=name,
host=self.host,
port=self.api_port,
storage=storage if storage is not None else self.storage,
debug=self.verbose,
verbose=self.verbose
)
# Check for API errors
if "error" in vm_info:
# Use debug level instead of warning to reduce log noise during polling
logger.debug(f"API request error: {vm_info['error']}")
return {
"name": name,
"status": "running", # Container is running even if API is not responsive
"api_status": "error",
"error": vm_info["error"],
"container_status": container_status
}
# Process the VM status information
vm_status = vm_info.get("status", "unknown")
vnc_url = vm_info.get("vncUrl", "")
ip_address = vm_info.get("ipAddress", "")
# IMPORTANT: Always ensure we have a valid IP address for connectivity
# If the API doesn't return an IP address, default to localhost (127.0.0.1)
# This makes the behavior consistent with LumeProvider
if not ip_address and vm_status == "running":
ip_address = "127.0.0.1"
logger.info(f"No IP address returned from API, defaulting to {ip_address}")
vm_info["ipAddress"] = ip_address
logger.info(f"VM {name} status: {vm_status}")
if ip_address and vnc_url:
logger.info(f"VM {name} has IP: {ip_address} and VNC URL: {vnc_url}")
elif not ip_address and not vnc_url and vm_status != "running":
# Not running is expected in this case
logger.info(f"VM {name} is not running yet. Status: {vm_status}")
else:
# Missing IP or VNC but status is running - this is unusual but handled with default IP
logger.warning(f"VM {name} is running but missing expected fields. API response: {vm_info}")
# Return the full status information
return {
"name": name,
"status": vm_status,
"ip_address": ip_address,
"vnc_url": vnc_url,
"api_status": "ok",
"container_status": container_status,
**vm_info # Include all fields from the API response
}
except subprocess.SubprocessError as e:
logger.error(f"Failed to check container status: {e}")
return {
"name": name,
"status": "error",
"error": f"Failed to check container status: {str(e)}"
}
async def list_vms(self) -> List[Dict[str, Any]]:
"""List all VMs managed by this provider.
For Lumier provider, there is only one VM per container.
"""
try:
status = await self.get_vm("default")
return [status] if status.get("status") != "unknown" else []
except Exception as e:
logger.error(f"Failed to list VMs: {e}")
return []
async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Run a VM with the given options.
Args:
name: Name of the VM to run (used for the container name and Docker image tag)
run_opts: Options for running the VM, including:
- cpu: Number of CPU cores
- memory: Amount of memory (e.g. "8GB")
- noVNC_port: Specific port for noVNC interface
Returns:
Dictionary with VM status information
"""
# Set the container name using the VM name for consistency
self.container_name = name or "lumier1-vm"
try:
# First, check if container already exists and remove it
try:
check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"]
check_result = subprocess.run(check_cmd, capture_output=True, text=True)
existing_container = check_result.stdout.strip()
if existing_container:
logger.info(f"Removing existing container: {self.container_name}")
remove_cmd = ["docker", "rm", "-f", self.container_name]
subprocess.run(remove_cmd, check=True)
except subprocess.CalledProcessError as e:
logger.warning(f"Error removing existing container: {e}")
# Continue anyway, next steps will fail if there's a real problem
# Prepare the Docker run command
cmd = ["docker", "run", "-d", "--name", self.container_name]
cmd.extend(["-p", f"{self.vnc_port}:8006"])
print(f"Using specified noVNC_port: {self.vnc_port}")
# Set API URL using the API port
self._api_url = f"http://{self.host}:{self.api_port}"
# Parse memory setting
memory_mb = self._parse_memory(run_opts.get("memory", "8GB"))
# Add storage volume mount if storage is specified (for persistent VM storage)
if self.storage and self.storage != "ephemeral":
# Create storage directory if it doesn't exist
storage_dir = os.path.abspath(os.path.expanduser(self.storage or ""))
os.makedirs(storage_dir, exist_ok=True)
# Add volume mount for storage
cmd.extend([
"-v", f"{storage_dir}:/storage",
"-e", f"HOST_STORAGE_PATH={storage_dir}"
])
print(f"Using persistent storage at: {storage_dir}")
# Add shared folder volume mount if shared_path is specified
if self.shared_path:
# Create shared directory if it doesn't exist
shared_dir = os.path.abspath(os.path.expanduser(self.shared_path or ""))
os.makedirs(shared_dir, exist_ok=True)
# Add volume mount for shared folder
cmd.extend([
"-v", f"{shared_dir}:/shared",
"-e", f"HOST_SHARED_PATH={shared_dir}"
])
print(f"Using shared folder at: {shared_dir}")
# Add environment variables
# Always use the container_name as the VM_NAME for consistency
# Use the VM image passed from the Computer class
print(f"Using VM image: {self.vm_image}")
cmd.extend([
"-e", f"VM_NAME={self.container_name}",
"-e", f"VERSION=ghcr.io/trycua/{self.vm_image}",
"-e", f"CPU_CORES={run_opts.get('cpu', '4')}",
"-e", f"RAM_SIZE={memory_mb}",
])
# Specify the Lumier image with the full image name
lumier_image = "trycua/lumier:latest"
# First check if the image exists locally
try:
print(f"Checking if Docker image {lumier_image} exists locally...")
check_image_cmd = ["docker", "image", "inspect", lumier_image]
subprocess.run(check_image_cmd, capture_output=True, check=True)
print(f"Docker image {lumier_image} found locally.")
except subprocess.CalledProcessError:
# Image doesn't exist locally
print(f"\nWARNING: Docker image {lumier_image} not found locally.")
print("The system will attempt to pull it from Docker Hub, which may fail if you have network connectivity issues.")
print("If the Docker pull fails, you may need to manually pull the image first with:")
print(f" docker pull {lumier_image}\n")
# Add the image to the command
cmd.append(lumier_image)
# Print the Docker command for debugging
print(f"DOCKER COMMAND: {' '.join(cmd)}")
# Run the container with improved error handling
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
except subprocess.CalledProcessError as e:
if "no route to host" in str(e.stderr).lower() or "failed to resolve reference" in str(e.stderr).lower():
error_msg = (f"Network error while trying to pull Docker image '{lumier_image}'\n"
f"Error: {e.stderr}\n\n"
f"SOLUTION: Please try one of the following:\n"
f"1. Check your internet connection\n"
f"2. Pull the image manually with: docker pull {lumier_image}\n"
f"3. Check if Docker is running properly\n")
logger.error(error_msg)
raise RuntimeError(error_msg)
raise
# Container started, now check VM status with polling
print("Container started, checking VM status...")
print("NOTE: This may take some time while the VM image is being pulled and initialized")
print("TIP: You can run 'lume logs -f' in another terminal to see the detailed initialization progress")
# Skip waiting for container readiness and just poll get_vm directly
# Poll the get_vm method indefinitely until the VM is ready with an IP address
attempt = 0
consecutive_errors = 0
vm_running = False
while True: # Wait indefinitely
try:
# Use longer delays to give the system time to initialize
if attempt > 0:
# Start with 5s delay, then increase gradually up to 30s for later attempts
# But use shorter delays while we're getting API errors
if consecutive_errors > 0 and consecutive_errors < 5:
wait_time = 3 # Use shorter delays when we're getting API errors
else:
wait_time = min(30, 5 + (attempt * 2))
print(f"Waiting {wait_time}s before retry #{attempt+1}...")
await asyncio.sleep(wait_time)
# Try to get VM status
print(f"Checking VM status (attempt {attempt+1})...")
vm_status = await self.get_vm(name)
# Check for API errors
if 'error' in vm_status:
consecutive_errors += 1
error_msg = vm_status.get('error', 'Unknown error')
# Only print a user-friendly status message, not the raw error
# since lume_api_get already logged the technical details
if consecutive_errors == 1 or attempt % 5 == 0:
if 'Empty reply from server' in error_msg:
print("API server is starting up - container is running, but API isn't fully initialized yet.")
print("This is expected during the initial VM setup - will continue polling...")
else:
# Don't repeat the exact same error message each time
logger.debug(f"API request error (attempt {attempt+1}): {error_msg}")
# Just log that we're still working on it
if attempt > 3:
print("Still waiting for the API server to become available...")
# If we're getting errors but container is running, that's normal during startup
if vm_status.get('status') == 'running':
if not vm_running:
print("Container is running, waiting for the VM within it to become fully ready...")
print("This might take a minute while the VM initializes...")
vm_running = True
# Increase counter and continue
attempt += 1
continue
# Reset consecutive error counter when we get a successful response
consecutive_errors = 0
# If the VM is running, check if it has an IP address (which means it's fully ready)
if vm_status.get('status') == 'running':
vm_running = True
# Check if we have an IP address, which means the VM is fully ready
if 'ip_address' in vm_status and vm_status['ip_address']:
print(f"VM is now fully running with IP: {vm_status.get('ip_address')}")
if 'vnc_url' in vm_status and vm_status['vnc_url']:
print(f"VNC URL: {vm_status.get('vnc_url')}")
return vm_status
else:
print("VM is running but still initializing network interfaces...")
print("Waiting for IP address to be assigned...")
else:
# VM exists but might still be starting up
status = vm_status.get('status', 'unknown')
print(f"VM found but status is: {status}. Continuing to poll...")
# Increase counter for next iteration's delay calculation
attempt += 1
# If we reach a very large number of attempts, give a reassuring message but continue
if attempt % 10 == 0:
print(f"Still waiting after {attempt} attempts. This might take several minutes for first-time setup.")
if not vm_running and attempt >= 20:
print("\nNOTE: First-time VM initialization can be slow as images are downloaded.")
print("If this continues for more than 10 minutes, you may want to check:")
print(" 1. Docker logs with: docker logs " + name)
print(" 2. If your network can access container registries")
print("Press Ctrl+C to abort if needed.\n")
# After 150 attempts (likely over 30-40 minutes), return current status
if attempt >= 150:
print(f"Reached 150 polling attempts. VM status is: {vm_status.get('status', 'unknown')}")
print("Returning current VM status, but please check Docker logs if there are issues.")
return vm_status
except Exception as e:
# Always continue retrying, but with increasing delays
logger.warning(f"Error checking VM status (attempt {attempt+1}): {e}. Will retry.")
consecutive_errors += 1
# If we've had too many consecutive errors, might be a deeper problem
if consecutive_errors >= 10:
print(f"\nWARNING: Encountered {consecutive_errors} consecutive errors while checking VM status.")
print("You may need to check the Docker container logs or restart the process.")
print(f"Error details: {str(e)}\n")
# Increase attempt counter for next iteration
attempt += 1
# After many consecutive errors, add a delay to avoid hammering the system
if attempt > 5:
error_delay = min(30, 10 + attempt)
print(f"Multiple connection errors, waiting {error_delay}s before next attempt...")
await asyncio.sleep(error_delay)
except subprocess.CalledProcessError as e:
error_msg = f"Failed to start Lumier container: {e.stderr if hasattr(e, 'stderr') else str(e)}"
logger.error(error_msg)
raise RuntimeError(error_msg)
async def _wait_for_container_ready(self, container_name: str, timeout: int = 90) -> bool:
"""Wait for the Lumier container to be fully ready with a valid API response.
Args:
container_name: Name of the Docker container to check
timeout: Maximum time to wait in seconds (default: 90 seconds)
Returns:
True if the container is running, even if API is not fully ready.
This allows operations to continue with appropriate fallbacks.
"""
start_time = time.time()
api_ready = False
container_running = False
print(f"Waiting for container {container_name} to be ready (timeout: {timeout}s)...")
while time.time() - start_time < timeout:
# Check if container is running
try:
check_cmd = ["docker", "ps", "--filter", f"name={container_name}", "--format", "{{.Status}}"]
result = subprocess.run(check_cmd, capture_output=True, text=True, check=True)
container_status = result.stdout.strip()
if container_status and container_status.startswith("Up"):
container_running = True
print(f"Container {container_name} is running")
logger.info(f"Container {container_name} is running with status: {container_status}")
else:
logger.warning(f"Container {container_name} not yet running, status: {container_status}")
# container is not running yet, wait and try again
await asyncio.sleep(2) # Longer sleep to give Docker time
continue
except subprocess.CalledProcessError as e:
logger.warning(f"Error checking container status: {e}")
await asyncio.sleep(2)
continue
# Container is running, check if API is responsive
try:
# First check the health endpoint
api_url = f"http://{self.host}:{self.api_port}/health"
logger.info(f"Checking API health at: {api_url}")
# Use longer timeout for API health check since it may still be initializing
curl_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", api_url]
result = subprocess.run(curl_cmd, capture_output=True, text=True)
if result.returncode == 0 and "ok" in result.stdout.lower():
api_ready = True
print(f"API is ready at {api_url}")
logger.info(f"API is ready at {api_url}")
break
else:
# API health check failed, now let's check if the VM status endpoint is responsive
# This covers cases where the health endpoint isn't implemented but the VM API is working
vm_api_url = f"http://{self.host}:{self.api_port}/lume/vms/{container_name}"
if self.storage:
import urllib.parse
encoded_storage = urllib.parse.quote_plus(self.storage)
vm_api_url += f"?storage={encoded_storage}"
curl_vm_cmd = ["curl", "-s", "--connect-timeout", "5", "--max-time", "10", vm_api_url]
vm_result = subprocess.run(curl_vm_cmd, capture_output=True, text=True)
if vm_result.returncode == 0 and vm_result.stdout.strip():
# VM API responded with something - consider the API ready
api_ready = True
print(f"VM API is ready at {vm_api_url}")
logger.info(f"VM API is ready at {vm_api_url}")
break
else:
curl_code = result.returncode
if curl_code == 0:
curl_code = vm_result.returncode
# Map common curl error codes to helpful messages
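                        # (These are curl's standard exit codes -- see `man curl`:
                        # e.g. 7 means the TCP connection itself failed, while 52
                        # means the server accepted the connection but sent nothing back.)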
if curl_code == 7:
curl_error = "Failed to connect - API server is starting up"
elif curl_code == 22:
curl_error = "HTTP error returned from API server"
elif curl_code == 28:
curl_error = "Operation timeout - API server is slow to respond"
elif curl_code == 52:
curl_error = "Empty reply from server - API is starting but not ready"
elif curl_code == 56:
curl_error = "Network problem during data transfer"
else:
curl_error = f"Unknown curl error code: {curl_code}"
print(f"API not ready yet: {curl_error}")
logger.info(f"API not ready yet: {curl_error}")
except subprocess.SubprocessError as e:
logger.warning(f"Error checking API status: {e}")
# If the container is running but API is not ready, that's OK - we'll just wait
# a bit longer before checking again, as the container may still be initializing
elapsed_seconds = time.time() - start_time
if int(elapsed_seconds) % 5 == 0: # Only print status every 5 seconds to reduce verbosity
print(f"Waiting for API to initialize... ({elapsed_seconds:.1f}s / {timeout}s)")
await asyncio.sleep(3) # Longer sleep between API checks
# Handle timeout - if the container is running but API is not ready, that's not
# necessarily an error - the API might just need more time to start up
if not container_running:
print(f"Timed out waiting for container {container_name} to start")
logger.warning(f"Timed out waiting for container {container_name} to start")
return False
if not api_ready:
print(f"Container {container_name} is running, but API is not fully ready yet.")
print("Proceeding with operations. API will become available shortly.")
print("NOTE: You may see some 'API request failed' messages while the API initializes.")
logger.warning(f"Container {container_name} is running, but API is not fully ready yet.")
# Return True if container is running, even if API isn't ready yet
# This allows VM operations to proceed, with appropriate retries for API calls
return container_running
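    # Illustrative caller pattern (hypothetical snippet, not part of this class):
    #
    #   ready = await provider._wait_for_container_ready("lumier-vm", timeout=90)
    #   if not ready:
    #       raise RuntimeError("Lumier container never reached 'Up' status")
    #   status = await provider.get_vm("lumier-vm")  # polling above absorbs API errors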
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a running VM by stopping the Lumier container."""
try:
# Use Docker commands to stop the container directly
if hasattr(self, '_container_id') and self._container_id:
logger.info(f"Stopping Lumier container: {self.container_name}")
cmd = ["docker", "stop", self.container_name]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
logger.info(f"Container stopped: {result.stdout.strip()}")
# Return minimal status info
return {
"name": name,
"status": "stopped",
"container_id": self._container_id,
}
else:
# Try to find the container by name
check_cmd = ["docker", "ps", "-a", "--filter", f"name={self.container_name}", "--format", "{{.ID}}"]
check_result = subprocess.run(check_cmd, capture_output=True, text=True)
container_id = check_result.stdout.strip()
if container_id:
logger.info(f"Found container ID: {container_id}")
cmd = ["docker", "stop", self.container_name]
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
logger.info(f"Container stopped: {result.stdout.strip()}")
return {
"name": name,
"status": "stopped",
"container_id": container_id,
}
else:
logger.warning(f"No container found with name {self.container_name}")
return {
"name": name,
"status": "unknown",
}
except subprocess.CalledProcessError as e:
error_msg = f"Failed to stop container: {e.stderr if hasattr(e, 'stderr') else str(e)}"
logger.error(error_msg)
raise RuntimeError(f"Failed to stop Lumier container: {error_msg}")
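    # For reference, the fallback path above is equivalent to this manual cleanup
    # from a shell (hypothetical container name shown):
    #
    #   docker ps -a --filter name=lumier-vm --format '{{.ID}}'
    #   docker stop lumier-vm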
# update_vm is not implemented as it's not needed for Lumier
# The BaseVMProvider requires it, so we provide a minimal implementation
async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Not implemented for Lumier provider."""
logger.warning("update_vm is not implemented for Lumier provider")
return {"name": name, "status": "unchanged"}
async def __aenter__(self):
"""Async context manager entry.
This method is called when entering an async context manager block.
Returns self to be used in the context.
"""
logger.debug("Entering LumierProvider context")
# Initialize the API URL with the default value if not already set
# This ensures get_vm can work before run_vm is called
if not hasattr(self, '_api_url') or not self._api_url:
self._api_url = f"http://{self.host}:{self.api_port}"
logger.info(f"Initialized default Lumier API URL: {self._api_url}")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Async context manager exit.
This method is called when exiting an async context manager block.
It handles proper cleanup of resources, including stopping any running containers.
"""
logger.debug(f"Exiting LumierProvider context, handling exceptions: {exc_type}")
try:
# If we have a container ID, we should stop it to clean up resources
if hasattr(self, '_container_id') and self._container_id:
logger.info(f"Stopping Lumier container on context exit: {self.container_name}")
try:
cmd = ["docker", "stop", self.container_name]
subprocess.run(cmd, capture_output=True, text=True, check=True)
logger.info(f"Container stopped during context exit: {self.container_name}")
except subprocess.CalledProcessError as e:
logger.warning(f"Failed to stop container during cleanup: {e.stderr}")
# Don't raise an exception here, we want to continue with cleanup
except Exception as e:
logger.error(f"Error during LumierProvider cleanup: {e}")
# We don't want to suppress the original exception if there was one
if exc_type is None:
raise
# Return False to indicate that any exception should propagate
return False
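    # Sketch of the intended context-manager usage (arguments are assumptions
    # based on this file, not a verified public API):
    #
    #   async with LumierProvider(host="localhost") as provider:
    #       await provider.run_vm("lumier-vm", {"memory": "8GB", "cpu": "4"})
    #       ...  # the container is stopped automatically in __aexit__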

View File

@@ -18,17 +18,16 @@ class QEMUProvider(BaseVMProvider):
def __init__(
self,
bin_path: Optional[str] = None,
storage: Optional[str] = None,
port: Optional[int] = None,
host: str = "localhost",
verbose: bool = False,
**kwargs
):
"""Initialize the QEMU provider.
Args:
bin_path: Optional path to the QEMU binary
storage: Optional path to store VM data
port: Optional port for management
host: Host to use for connections
verbose: Enable verbose logging
@@ -36,7 +35,7 @@ class QEMUProvider(BaseVMProvider):
self._context = None
self._verbose = verbose
self._bin_path = bin_path
self._storage = storage
self._port = port
self._host = host
@@ -56,22 +55,31 @@ class QEMUProvider(BaseVMProvider):
# In a real implementation, this would clean up QEMU resources
self._context = None
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by name.
Args:
name: Name of the VM to get information for
storage: Optional storage path override. If provided, this will be used
instead of the provider's default storage path.
Returns:
Dictionary with VM information including status, IP address, etc.
"""
raise NotImplementedError("QEMU provider is not implemented yet")
async def list_vms(self) -> List[Dict[str, Any]]:
"""List all available VMs."""
raise NotImplementedError("QEMU provider is not implemented yet")
async def run_vm(self, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Run a VM with the given options."""
raise NotImplementedError("QEMU provider is not implemented yet")
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a running VM."""
raise NotImplementedError("QEMU provider is not implemented yet")
async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Update VM configuration."""
raise NotImplementedError("QEMU provider is not implemented yet")

View File

@@ -24,6 +24,9 @@ requires-python = ">=3.10"
lume = [
"pylume>=0.1.8"
]
lumier = [
# No additional Python packages required - uses Docker CLI directly
]
ui = [
"gradio>=5.23.3,<6.0.0",
"python-dotenv>=1.0.1,<2.0.0",

View File

@@ -12,16 +12,55 @@ extension Server {
let vms = try vmController.list(storage: storage)
return try .json(vms)
} catch {
print(
"ERROR: Failed to list VMs: \(error.localizedDescription), storage=\(String(describing: storage))"
)
return .badRequest(message: error.localizedDescription)
}
}
func handleGetVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
print("Getting VM details: name=\(name), storage=\(String(describing: storage))")
do {
let vmController = LumeController()
print("Created VM controller, attempting to get VM")
let vm = try vmController.get(name: name, storage: storage)
print("Successfully retrieved VM")
// Check for nil values that might cause crashes
if vm.vmDirContext.config.macAddress == nil {
print("ERROR: VM has nil macAddress")
return .badRequest(message: "VM configuration is invalid (nil macAddress)")
}
print("MacAddress check passed")
// Log that we're about to access details
print("Preparing VM details response")
// Print the full details object for debugging
let details = vm.details
print("VM DETAILS: \(details)")
print(" name: \(details.name)")
print(" os: \(details.os)")
print(" cpuCount: \(details.cpuCount)")
print(" memorySize: \(details.memorySize)")
print(" diskSize: \(details.diskSize)")
print(" display: \(details.display)")
print(" status: \(details.status)")
print(" vncUrl: \(String(describing: details.vncUrl))")
print(" ipAddress: \(String(describing: details.ipAddress))")
print(" locationName: \(details.locationName)")
// Serialize the VM details
print("About to serialize VM details")
let response = try HTTPResponse.json(vm.details)
print("Successfully serialized VM details")
return response
} catch {
// This will catch errors from both vmController.get and the json serialization
print("ERROR: Failed to get VM details: \(error.localizedDescription)")
return .badRequest(message: error.localizedDescription)
}
}
@@ -158,15 +197,51 @@ extension Server {
}
func handleStopVM(name: String, storage: String? = nil) async throws -> HTTPResponse {
Logger.info(
"Stopping VM", metadata: ["name": name, "storage": String(describing: storage)])
do {
Logger.info("Creating VM controller", metadata: ["name": name])
let vmController = LumeController()
Logger.info("Calling stopVM on controller", metadata: ["name": name])
try await vmController.stopVM(name: name, storage: storage)
Logger.info(
"VM stopped, waiting 5 seconds for locks to clear", metadata: ["name": name])
// Add a delay to ensure locks are fully released before returning
for i in 1...5 {
try? await Task.sleep(nanoseconds: 1_000_000_000)
Logger.info("Lock clearing delay", metadata: ["name": name, "seconds": "\(i)/5"])
}
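            // The tick-by-tick logging shows the stop call is deliberately
            // pausing rather than hung; the pause gives the VM process time to
            // release its advisory lock on the config file before callers retry.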
// Verify the VM is really in a stopped state
Logger.info("Verifying VM is stopped", metadata: ["name": name])
let vm = try? vmController.get(name: name, storage: storage)
if let vm = vm, vm.details.status == "running" {
Logger.info(
"VM still reports as running despite stop operation",
metadata: ["name": name, "severity": "warning"])
} else {
Logger.info(
"Verification complete: VM is in stopped state", metadata: ["name": name])
}
Logger.info("Returning successful response", metadata: ["name": name])
return HTTPResponse(
statusCode: .ok,
headers: ["Content-Type": "application/json"],
body: try JSONEncoder().encode(["message": "VM stopped successfully"])
)
} catch {
Logger.error(
"Failed to stop VM",
metadata: [
"name": name,
"error": error.localizedDescription,
"storage": String(describing: storage),
])
return HTTPResponse(
statusCode: .badRequest,
headers: ["Content-Type": "application/json"],
@@ -176,14 +251,39 @@ extension Server {
}
func handleRunVM(name: String, body: Data?) async throws -> HTTPResponse {
Logger.info("Running VM", metadata: ["name": name])
// Log the raw body data if available
if let body = body, let bodyString = String(data: body, encoding: .utf8) {
Logger.info("Run VM raw request body", metadata: ["name": name, "body": bodyString])
} else {
Logger.info("No request body or could not decode as string", metadata: ["name": name])
}
do {
Logger.info("Creating VM controller and parsing request", metadata: ["name": name])
let request =
body.flatMap { try? JSONDecoder().decode(RunVMRequest.self, from: $0) }
?? RunVMRequest(
noDisplay: nil, sharedDirectories: nil, recoveryMode: nil, storage: nil)
Logger.info(
"Parsed request",
metadata: [
"name": name,
"noDisplay": String(describing: request.noDisplay),
"sharedDirectories": "\(request.sharedDirectories?.count ?? 0)",
"storage": String(describing: request.storage),
])
Logger.info("Parsing shared directories", metadata: ["name": name])
let dirs = try request.parse()
Logger.info(
"Successfully parsed shared directories",
metadata: ["name": name, "count": "\(dirs.count)"])
// Start VM in background
Logger.info("Starting VM in background", metadata: ["name": name])
startVM(
name: name,
noDisplay: request.noDisplay ?? false,
@@ -191,6 +291,7 @@ extension Server {
recoveryMode: request.recoveryMode ?? false,
storage: request.storage
)
Logger.info("VM start initiated in background", metadata: ["name": name])
// Return response immediately
return HTTPResponse(
@@ -203,6 +304,12 @@ extension Server {
])
)
} catch {
Logger.error(
"Failed to run VM",
metadata: [
"name": name,
"error": error.localizedDescription,
])
return HTTPResponse(
statusCode: .badRequest,
headers: ["Content-Type": "application/json"],
@@ -290,7 +397,7 @@ extension Server {
func handlePush(_ body: Data?) async throws -> HTTPResponse {
guard let body = body,
              let request = try? JSONDecoder().decode(PushRequest.self, from: body)
else {
return HTTPResponse(
statusCode: .badRequest,
@@ -311,15 +418,16 @@ extension Server {
organization: request.organization,
storage: request.storage,
chunkSizeMb: request.chunkSizeMb,
verbose: false, // Verbose typically handled by server logs
dryRun: false, // Default API behavior is likely non-dry-run
reassemble: false // Default API behavior is likely non-reassemble
)
print(
"Background push completed successfully for image: \(request.imageName):\(request.tags.joined(separator: ","))"
)
} catch {
print(
"Background push failed for image: \(request.imageName):\(request.tags.joined(separator: ",")) - Error: \(error.localizedDescription)"
)
}
}
@@ -520,25 +628,25 @@ extension Server {
}
// MARK: - Log Handlers
func handleGetLogs(type: String?, lines: Int?) async throws -> HTTPResponse {
do {
let logType = type?.lowercased() ?? "all"
let infoPath = "/tmp/lume_daemon.log"
let errorPath = "/tmp/lume_daemon.error.log"
let fileManager = FileManager.default
var response: [String: String] = [:]
// Function to read log files
func readLogFile(path: String) -> String? {
guard fileManager.fileExists(atPath: path) else {
return nil
}
do {
let content = try String(contentsOfFile: path, encoding: .utf8)
// If lines parameter is provided, return only the specified number of lines from the end
if let lineCount = lines {
let allLines = content.components(separatedBy: .newlines)
@@ -546,28 +654,28 @@ extension Server {
let lastLines = Array(allLines[startIndex...])
return lastLines.joined(separator: "\n")
}
return content
} catch {
return "Error reading log file: \(error.localizedDescription)"
}
}
// Get logs based on requested type
if logType == "info" || logType == "all" {
response["info"] = readLogFile(path: infoPath) ?? "Info log file not found"
}
if logType == "error" || logType == "all" {
response["error"] = readLogFile(path: errorPath) ?? "Error log file not found"
}
return try .json(response)
} catch {
return .badRequest(message: error.localizedDescription)
}
}
// MARK: - Private Helper Methods
nonisolated private func startVM(
@@ -577,10 +685,27 @@ extension Server {
recoveryMode: Bool = false,
storage: String? = nil
) {
Logger.info(
"Starting VM in detached task",
metadata: [
"name": name,
"noDisplay": "\(noDisplay)",
"recoveryMode": "\(recoveryMode)",
"storage": String(describing: storage),
])
Task.detached { @MainActor @Sendable in
Logger.info("Background task started for VM", metadata: ["name": name])
do {
Logger.info("Creating VM controller in background task", metadata: ["name": name])
let vmController = LumeController()
Logger.info(
"Calling runVM on controller",
metadata: [
"name": name,
"noDisplay": "\(noDisplay)",
])
try await vmController.runVM(
name: name,
noDisplay: noDisplay,
@@ -588,15 +713,16 @@ extension Server {
recoveryMode: recoveryMode,
storage: storage
)
Logger.info("VM started successfully in background task", metadata: ["name": name])
} catch {
                Logger.error(
                    "Failed to start VM in background task",
metadata: [
"name": name,
"error": error.localizedDescription,
])
}
}
Logger.info("Background task dispatched for VM", metadata: ["name": name])
}
}

View File

@@ -65,9 +65,16 @@ class VM {
// MARK: - VM State Management
private var isRunning: Bool {
// First check if we have a MAC address
guard let macAddress = vmDirContext.config.macAddress else {
Logger.info(
"Cannot check if VM is running: macAddress is nil",
metadata: ["name": vmDirContext.name])
return false
}
// Then check if we have an IP address
guard let ipAddress = DHCPLeaseParser.getIPAddress(forMAC: macAddress) else {
return false
}
@@ -78,37 +85,35 @@ class VM {
var details: VMDetails {
let isRunning: Bool = self.isRunning
let vncUrl = isRunning ? getVNCUrl() : nil
// Safely get disk size with fallback
let diskSizeValue: DiskSize
do {
diskSizeValue = try getDiskSize()
} catch {
Logger.error(
"Failed to get disk size",
metadata: ["name": vmDirContext.name, "error": "\(error)"])
// Provide a fallback value to avoid crashing
diskSizeValue = DiskSize(allocated: 0, total: vmDirContext.config.diskSize ?? 0)
}
// Safely access MAC address
let macAddress = vmDirContext.config.macAddress
let ipAddress: String? =
isRunning && macAddress != nil ? DHCPLeaseParser.getIPAddress(forMAC: macAddress!) : nil
return VMDetails(
name: vmDirContext.name,
os: getOSType(),
cpuCount: vmDirContext.config.cpuCount ?? 0,
memorySize: vmDirContext.config.memorySize ?? 0,
diskSize: diskSizeValue,
display: vmDirContext.config.display.string,
status: isRunning ? "running" : "stopped",
vncUrl: vncUrl,
ipAddress: ipAddress,
locationName: vmDirContext.storage ?? "default"
)
}
@@ -118,57 +123,84 @@ class VM {
noDisplay: Bool, sharedDirectories: [SharedDirectory], mount: Path?, vncPort: Int = 0,
recoveryMode: Bool = false, usbMassStoragePaths: [Path]? = nil
) async throws {
Logger.info(
"VM.run method called",
metadata: [
"name": vmDirContext.name,
"noDisplay": "\(noDisplay)",
"recoveryMode": "\(recoveryMode)",
])
guard vmDirContext.initialized else {
Logger.error("VM not initialized", metadata: ["name": vmDirContext.name])
throw VMError.notInitialized(vmDirContext.name)
}
guard let cpuCount = vmDirContext.config.cpuCount,
let memorySize = vmDirContext.config.memorySize
else {
Logger.error("VM missing cpuCount or memorySize", metadata: ["name": vmDirContext.name])
throw VMError.notInitialized(vmDirContext.name)
}
Logger.info(
"Attempting to acquire lock on config file",
metadata: [
"path": vmDirContext.dir.configPath.path,
"name": vmDirContext.name,
])
var fileHandle = try FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
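        // flock() with LOCK_EX | LOCK_NB requests an exclusive advisory lock
        // without blocking; a non-zero return means another process (normally an
        // already-running instance of this VM) still holds the lock.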
if flock(fileHandle.fileDescriptor, LOCK_EX | LOCK_NB) != 0 {
try? fileHandle.close()
Logger.error(
"VM already running (failed to acquire lock)", metadata: ["name": vmDirContext.name]
)
// Try to forcibly clear the lock before giving up
Logger.info("Attempting emergency lock cleanup", metadata: ["name": vmDirContext.name])
unlockConfigFile()
// Try one more time to acquire the lock
if let retryHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url),
flock(retryHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0
{
Logger.info("Emergency lock cleanup worked", metadata: ["name": vmDirContext.name])
// Continue with a fresh file handle
try? retryHandle.close()
// Get a completely new file handle to be safe
guard let newHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url)
else {
throw VMError.internalError("Failed to open file handle after lock cleanup")
}
// Update our main file handle
fileHandle = newHandle
} else {
// If we still can't get the lock, give up
Logger.error(
"Could not acquire lock even after emergency cleanup",
metadata: ["name": vmDirContext.name])
throw VMError.alreadyRunning(vmDirContext.name)
}
}
Logger.info("Successfully acquired lock", metadata: ["name": vmDirContext.name])
Logger.info(
"Running VM with configuration",
            metadata: [
                "name": vmDirContext.name,
                "cpuCount": "\(cpuCount)",
                "memorySize": "\(memorySize)",
                "diskSize": "\(vmDirContext.config.diskSize ?? 0)",
                "macAddress": vmDirContext.config.macAddress ?? "none",
                "sharedDirectoryCount": "\(sharedDirectories.count)",
                "mount": mount?.path ?? "none",
                "vncPort": "\(vncPort)",
                "recoveryMode": "\(recoveryMode)",
                "usbMassStorageDeviceCount": "\(usbMassStoragePaths?.count ?? 0)",
            ])
// Log disk paths and existence for debugging
Logger.info(
"VM disk paths",
metadata: [
"diskPath": vmDirContext.diskPath.path,
"diskExists":
"\(FileManager.default.fileExists(atPath: vmDirContext.diskPath.path))",
"nvramPath": vmDirContext.nvramPath.path,
"nvramExists":
"\(FileManager.default.fileExists(atPath: vmDirContext.nvramPath.path))",
"configPath": vmDirContext.dir.configPath.path,
"configExists":
"\(FileManager.default.fileExists(atPath: vmDirContext.dir.configPath.path))",
"locationName": vmDirContext.storage ?? "default",
])
// Create and configure the VM
do {
Logger.info(
"Creating virtualization service context", metadata: ["name": vmDirContext.name])
let config = try createVMVirtualizationServiceContext(
cpuCount: cpuCount,
memorySize: memorySize,
@@ -178,32 +210,64 @@ class VM {
recoveryMode: recoveryMode,
usbMassStoragePaths: usbMassStoragePaths
)
Logger.info(
"Successfully created virtualization service context",
metadata: ["name": vmDirContext.name])
Logger.info(
"Initializing virtualization service", metadata: ["name": vmDirContext.name])
virtualizationService = try virtualizationServiceFactory(config)
Logger.info(
"Successfully initialized virtualization service",
metadata: ["name": vmDirContext.name])
Logger.info(
"Setting up VNC",
metadata: [
"name": vmDirContext.name,
"noDisplay": "\(noDisplay)",
"port": "\(vncPort)",
])
let vncInfo = try await setupSession(
noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
Logger.info(
"VNC setup successful", metadata: ["name": vmDirContext.name, "vncInfo": vncInfo])
// Start the VM
guard let service = virtualizationService else {
Logger.error("Virtualization service is nil", metadata: ["name": vmDirContext.name])
throw VMError.internalError("Virtualization service not initialized")
}
Logger.info(
"Starting VM via virtualization service", metadata: ["name": vmDirContext.name])
try await service.start()
Logger.info("VM started successfully", metadata: ["name": vmDirContext.name])
while true {
try await Task.sleep(nanoseconds: UInt64(1e9))
}
} catch {
            Logger.error(
                "Failed in VM.run",
                metadata: [
                    "name": vmDirContext.name,
"error": error.localizedDescription,
"errorType": "\(type(of: error))",
])
virtualizationService = nil
vncService.stop()
// Release lock
Logger.info("Releasing file lock after error", metadata: ["name": vmDirContext.name])
flock(fileHandle.fileDescriptor, LOCK_UN)
try? fileHandle.close()
// Additionally, perform our aggressive unlock to ensure no locks remain
Logger.info(
"Performing additional lock cleanup after error",
metadata: ["name": vmDirContext.name])
unlockConfigFile()
throw error
}
}
@@ -219,34 +283,55 @@ class VM {
// If we have a virtualization service, try to stop it cleanly first
if let service = virtualizationService {
do {
Logger.info(
"Stopping VM via virtualization service", metadata: ["name": vmDirContext.name])
try await service.stop()
virtualizationService = nil
vncService.stop()
Logger.info(
"VM stopped successfully via virtualization service",
metadata: ["name": vmDirContext.name])
// Try to ensure any existing locks are released
Logger.info(
"Attempting to clear any locks on config file",
metadata: ["name": vmDirContext.name])
unlockConfigFile()
return
} catch let error {
                Logger.error(
                    "Failed to stop VM via virtualization service",
                    metadata: [
                        "name": vmDirContext.name,
                        "error": error.localizedDescription,
])
// Fall through to process termination
}
}
// Try to open config file to get file descriptor
Logger.info(
"Attempting to access config file lock",
metadata: [
"path": vmDirContext.dir.configPath.path,
"name": vmDirContext.name,
])
let fileHandle = try? FileHandle(forReadingFrom: vmDirContext.dir.configPath.url)
guard let fileHandle = fileHandle else {
Logger.info(
"Failed to open config file - VM may not be running",
metadata: ["name": vmDirContext.name])
// Even though we couldn't open the file, try to force unlock anyway
unlockConfigFile()
throw VMError.notRunning(vmDirContext.name)
}
// Get the PID of the process holding the lock using lsof command
Logger.info(
"Finding process holding lock on config file", metadata: ["name": vmDirContext.name])
let task = Process()
task.executableURL = URL(fileURLWithPath: "/usr/sbin/lsof")
task.arguments = ["-F", "p", vmDirContext.dir.configPath.path]
@@ -263,29 +348,44 @@ class VM {
let pid = pid_t(pidString)
else {
try? fileHandle.close()
Logger.info(
"Failed to find process holding lock - VM may not be running",
metadata: ["name": vmDirContext.name])
// Even though we couldn't find the process, try to force unlock
unlockConfigFile()
throw VMError.notRunning(vmDirContext.name)
}
Logger.info(
"Found process \(pid) holding lock on config file",
metadata: ["name": vmDirContext.name])
// First try graceful shutdown with SIGINT
if kill(pid, SIGINT) == 0 {
Logger.info("Sent SIGINT to VM process \(pid)", metadata: ["name": vmDirContext.name])
}
// Wait for process to stop with timeout
var attempts = 0
while attempts < 10 {
Logger.info(
"Waiting for process \(pid) to terminate (attempt \(attempts + 1)/10)",
metadata: ["name": vmDirContext.name])
try await Task.sleep(nanoseconds: 1_000_000_000)
// Check if process still exists
if kill(pid, 0) != 0 {
// Process is gone, do final cleanup
Logger.info("Process \(pid) has terminated", metadata: ["name": vmDirContext.name])
virtualizationService = nil
vncService.stop()
try? fileHandle.close()
// Force unlock the config file
unlockConfigFile()
Logger.info(
"VM stopped successfully via process termination",
metadata: ["name": vmDirContext.name])
@@ -296,8 +396,11 @@ class VM {
// If graceful shutdown failed, force kill the process
        Logger.info(
            "Graceful shutdown failed, forcing termination of process \(pid)",
            metadata: ["name": vmDirContext.name])
if kill(pid, SIGKILL) == 0 {
Logger.info("Sent SIGKILL to process \(pid)", metadata: ["name": vmDirContext.name])
// Wait a moment for the process to be fully killed
try await Task.sleep(nanoseconds: 2_000_000_000)
@@ -306,16 +409,124 @@ class VM {
vncService.stop()
try? fileHandle.close()
// Force unlock the config file
unlockConfigFile()
Logger.info("VM forcefully stopped", metadata: ["name": vmDirContext.name])
return
}
// If we get here, something went very wrong
try? fileHandle.close()
Logger.error(
"Failed to stop VM - could not terminate process \(pid)",
metadata: ["name": vmDirContext.name])
// As a last resort, try to force unlock
unlockConfigFile()
throw VMError.internalError("Failed to stop VM process")
}
// Helper method to forcibly clear any locks on the config file
private func unlockConfigFile() {
Logger.info(
"Forcibly clearing locks on config file",
metadata: [
"path": vmDirContext.dir.configPath.path,
"name": vmDirContext.name,
])
// First attempt: standard unlock methods
if let fileHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
// Use F_GETLK and F_SETLK to check and clear locks
var lockInfo = flock()
lockInfo.l_type = Int16(F_UNLCK)
lockInfo.l_whence = Int16(SEEK_SET)
lockInfo.l_start = 0
lockInfo.l_len = 0
// Try to unlock the file using fcntl
_ = fcntl(fileHandle.fileDescriptor, F_SETLK, &lockInfo)
// Also try the regular flock method
flock(fileHandle.fileDescriptor, LOCK_UN)
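            // fcntl(F_SETLK, F_UNLCK) and flock(LOCK_UN) target two separate
            // locking facilities (POSIX record locks vs BSD flock), so both are
            // cleared here. Note that neither call can remove a lock still held
            // by a live foreign process -- advisory locks die with their owner.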
try? fileHandle.close()
Logger.info("Standard unlock attempts performed", metadata: ["name": vmDirContext.name])
}
// Second attempt: try to acquire and immediately release a fresh lock
if let tempHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
if flock(tempHandle.fileDescriptor, LOCK_EX | LOCK_NB) == 0 {
Logger.info(
"Successfully acquired and released lock to reset state",
metadata: ["name": vmDirContext.name])
flock(tempHandle.fileDescriptor, LOCK_UN)
} else {
Logger.info(
"Could not acquire lock for resetting - may still be locked",
metadata: ["name": vmDirContext.name])
}
try? tempHandle.close()
}
// Third attempt (most aggressive): copy the config file, remove the original, and restore
Logger.info(
"Trying aggressive method: backup and restore config file",
metadata: ["name": vmDirContext.name])
// Only proceed if the config file exists
let fileManager = FileManager.default
let configPath = vmDirContext.dir.configPath.path
let backupPath = configPath + ".backup"
if fileManager.fileExists(atPath: configPath) {
// Create a backup of the config file
if let configData = try? Data(contentsOf: URL(fileURLWithPath: configPath)) {
// Make backup
try? configData.write(to: URL(fileURLWithPath: backupPath))
// Remove the original file to clear all locks
try? fileManager.removeItem(atPath: configPath)
Logger.info(
"Removed original config file to clear locks",
metadata: ["name": vmDirContext.name])
// Wait a moment for OS to fully release resources
Thread.sleep(forTimeInterval: 0.1)
// Restore from backup
try? configData.write(to: URL(fileURLWithPath: configPath))
Logger.info(
"Restored config file from backup", metadata: ["name": vmDirContext.name])
} else {
Logger.error(
"Could not read config file content for backup",
metadata: ["name": vmDirContext.name])
}
} else {
Logger.info(
"Config file does not exist, cannot perform aggressive unlock",
metadata: ["name": vmDirContext.name])
}
// Final check
if let finalHandle = try? FileHandle(forWritingTo: vmDirContext.dir.configPath.url) {
let lockResult = flock(finalHandle.fileDescriptor, LOCK_EX | LOCK_NB)
if lockResult == 0 {
Logger.info(
"Lock successfully cleared - verified by acquiring test lock",
metadata: ["name": vmDirContext.name])
flock(finalHandle.fileDescriptor, LOCK_UN)
} else {
Logger.info(
"Lock still present after all clearing attempts",
metadata: ["name": vmDirContext.name, "severity": "warning"])
}
try? finalHandle.close()
}
}
// MARK: - Resource Management
func updateVMConfig(vmConfig: VMConfig) throws {
@@ -422,40 +633,44 @@ class VM {
guard let url = vncService.url else {
throw VMError.vncNotConfigured
}
return url
}
/// Saves the session information including shared directories to disk
private func saveSessionData(url: String, sharedDirectories: [SharedDirectory]) {
do {
let session = VNCSession(
url: url, sharedDirectories: sharedDirectories.isEmpty ? nil : sharedDirectories)
try vmDirContext.dir.saveSession(session)
Logger.info(
"Saved VNC session with shared directories",
metadata: [
"count": "\(sharedDirectories.count)",
"dirs": "\(sharedDirectories.map { $0.hostPath }.joined(separator: ", "))",
"sessionsPath": "\(vmDirContext.dir.sessionsPath.path)",
])
} catch {
Logger.error("Failed to save VNC session", metadata: ["error": "\(error)"])
}
}
/// Main session setup method that handles VNC and persists session data
private func setupSession(
noDisplay: Bool, port: Int = 0, sharedDirectories: [SharedDirectory] = []
) async throws -> String {
// Start the VNC service and get the URL
let url = try await startVNCService(port: port)
// Save the session data
saveSessionData(url: url, sharedDirectories: sharedDirectories)
// Open the VNC client if needed
if !noDisplay {
Logger.info("Starting VNC session", metadata: ["name": vmDirContext.name])
try await vncService.openClient(url: url)
}
return url
}
@@ -599,7 +814,8 @@ class VM {
)
virtualizationService = try virtualizationServiceFactory(config)
let vncInfo = try await setupSession(
noDisplay: noDisplay, port: vncPort, sharedDirectories: sharedDirectories)
Logger.info("VNC info", metadata: ["vncInfo": vncInfo])
// Start the VM

View File

@@ -64,34 +64,58 @@ fi
echo "Lumier VM is starting..."
# Cleanup function to ensure VM and noVNC proxy shutdown on container stop
# Counter for signal handling
SIGNAL_COUNT=0

cleanup() {
    local signal_name=$1
    set +e  # Don't exit on error in cleanup

    # Increment signal counter
    SIGNAL_COUNT=$((SIGNAL_COUNT + 1))

    # If this is the first signal, try graceful shutdown
    if [ $SIGNAL_COUNT -eq 1 ]; then
        echo "[cleanup] Caught $signal_name signal, shutting down..."

        # Check if we're in the middle of an image pull
        if [[ "$PULL_IN_PROGRESS" == "1" ]]; then
            echo "[cleanup] Interrupted during image pull, skipping VM stop."
        else
            echo "[cleanup] Stopping VM..."
            stop_vm true
        fi

        # Attempt to clean up ephemeral storage if it's in the /private/tmp directory
        if [[ "$HOST_STORAGE_PATH" == "ephemeral" ]]; then
            # First check if VM actually exists
            VM_INFO=$(lume_get "$VM_NAME" "$HOST_STORAGE_PATH" "json" "false")
            # Only try VM deletion if VM exists and not in the middle of a pull
            if [[ "$PULL_IN_PROGRESS" != "1" && $VM_INFO != *"Virtual machine not found"* ]]; then
                echo "[cleanup] Cleaning up VM..."
                lume_delete "$VM_NAME" "$HOST_STORAGE_PATH" > /dev/null 2>&1
            fi
        fi
    else
        # For multiple signals, force an immediate exit
        echo "got $SIGNAL_COUNT SIGTERM/SIGINTs, forcefully exiting"
    fi

    # If we've received multiple signals, just exit immediately
    if [ $SIGNAL_COUNT -ge 3 ]; then
        exit 1
    fi

    # Exit with success for the first signal
    if [ $SIGNAL_COUNT -eq 1 ]; then
        exit 0
    fi
}

# Ensure we catch all typical container termination signals
trap 'cleanup SIGTERM' SIGTERM
trap 'cleanup SIGINT' SIGINT
trap 'cleanup SIGHUP' SIGHUP
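# Passing the signal name into cleanup is purely for logging: it lets the
# output distinguish SIGTERM (e.g. docker stop) from SIGINT (Ctrl+C) and SIGHUP.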
# Now enable strict error handling after initialization
set -euo pipefail
@@ -116,4 +140,14 @@ if [ -n "${VNC_PORT:-}" ] && [ -n "${VNC_PASSWORD:-}" ]; then
fi
echo "Lumier is running. Press Ctrl+C to stop."
# Instead of tail -f /dev/null, use a wait loop that can be interrupted by signals
while true; do
# Sleep in small increments to make signal handling more responsive
sleep 1 &
wait $!
# Break the loop if we've received a signal
if [ $SIGNAL_COUNT -gt 0 ]; then
break
fi
done
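# The 'sleep 1 &' + 'wait $!' pattern keeps the loop responsive to traps: bash
# defers trap handlers while a foreground command runs, but the 'wait' builtin
# returns as soon as a signal is delivered.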

View File

@@ -312,6 +312,21 @@ lume_pull() {
# Inform users how to check pull progress
echo "You can check the pull progress using: lume logs -f"
# Always print the curl command before executing
echo ""
echo "EXECUTING PULL COMMAND:"
echo "curl -X POST \\
-H \"Content-Type: application/json\" \\
-d '{
\"image\": \"$image\",
\"name\": \"$vm_name\",
\"registry\": \"$registry\",
\"organization\": \"$organization\",
\"storage\": \"$storage\"
}' \\
\"http://${api_host}:${api_port}/lume/pull\""
echo ""
# Pull image via API and capture response
local response
if [[ "${LUMIER_DEBUG:-0}" == "1" ]]; then