Merge pull request #527 from trycua/chore/remove-pylume

Remove PyLume library
This commit is contained in:
James Murdza
2025-10-28 16:57:01 -07:00
committed by GitHub
10 changed files with 0 additions and 1409 deletions

View File

@@ -1,10 +0,0 @@
[bumpversion]
current_version = 0.2.1
commit = True
tag = True
tag_name = pylume-v{new_version}
message = Bump pylume to v{new_version}
[bumpversion:file:pylume/__init__.py]
search = __version__ = "{current_version}"
replace = __version__ = "{new_version}"

View File

@@ -1,46 +0,0 @@
<div align="center">
<h1>
<div class="image-wrapper" style="display: inline-block;">
<picture>
<source media="(prefers-color-scheme: dark)" alt="logo" height="150" srcset="https://raw.githubusercontent.com/trycua/cua/main/img/logo_white.png" style="display: block; margin: auto;">
<source media="(prefers-color-scheme: light)" alt="logo" height="150" srcset="https://raw.githubusercontent.com/trycua/cua/main/img/logo_black.png" style="display: block; margin: auto;">
<img alt="Shows my svg">
</picture>
</div>
[![Python](https://img.shields.io/badge/Python-333333?logo=python&logoColor=white&labelColor=333333)](#)
[![macOS](https://img.shields.io/badge/macOS-000000?logo=apple&logoColor=F0F0F0)](#)
[![Discord](https://img.shields.io/badge/Discord-%235865F2.svg?&logo=discord&logoColor=white)](https://discord.com/invite/mVnXXpdE85)
[![PyPI](https://img.shields.io/pypi/v/pylume?color=333333)](https://pypi.org/project/pylume/)
</h1>
</div>
**pylume** is a lightweight Python library based on [lume](https://github.com/trycua/lume) to create, run and manage macOS and Linux virtual machines (VMs) natively on Apple Silicon.
```bash
pip install pylume
```
## Usage
Please refer to this [Notebook](./samples/nb.ipynb) for a quickstart. More details about the underlying API used by pylume are available [here](https://github.com/trycua/lume/docs/API-Reference.md).
## Prebuilt Images
Pre-built images are available on [ghcr.io/trycua](https://github.com/orgs/trycua/packages).
These images come pre-configured with an SSH server and auto-login enabled.
## Contributing
We welcome and greatly appreciate contributions to lume! Whether you're improving documentation, adding new features, fixing bugs, or adding new VM images, your efforts help make pylume better for everyone.
Join our [Discord community](https://discord.com/invite/mVnXXpdE85) to discuss ideas or get assistance.
## License
lume is open-sourced under the MIT License - see the [LICENSE](LICENSE) file for details.
## Stargazers over time
[![Stargazers over time](https://starchart.cc/trycua/pylume.svg?variant=adaptive)](https://starchart.cc/trycua/pylume)

View File

@@ -1,9 +0,0 @@
"""
PyLume Python SDK - A client library for managing macOS VMs with PyLume.
"""
from pylume.exceptions import *
from pylume.models import *
from pylume.pylume import *
__version__ = "0.1.0"

View File

@@ -1,59 +0,0 @@
"""
PyLume Python SDK - A client library for managing macOS VMs with PyLume.
Example:
>>> from pylume import PyLume, VMConfig
>>> client = PyLume()
>>> config = VMConfig(name="my-vm", cpu=4, memory="8GB", disk_size="64GB")
>>> client.create_vm(config)
>>> client.run_vm("my-vm")
"""
# Import exceptions then all models
from .exceptions import (
LumeConfigError,
LumeConnectionError,
LumeError,
LumeImageError,
LumeNotFoundError,
LumeServerError,
LumeTimeoutError,
LumeVMError,
)
from .models import (
CloneSpec,
ImageInfo,
ImageList,
ImageRef,
SharedDirectory,
VMConfig,
VMRunOpts,
VMStatus,
VMUpdateOpts,
)
# Import main class last to avoid circular imports
from .pylume import PyLume
__version__ = "0.2.1"
__all__ = [
"PyLume",
"VMConfig",
"VMStatus",
"VMRunOpts",
"VMUpdateOpts",
"ImageRef",
"CloneSpec",
"SharedDirectory",
"ImageList",
"ImageInfo",
"LumeError",
"LumeServerError",
"LumeConnectionError",
"LumeTimeoutError",
"LumeNotFoundError",
"LumeConfigError",
"LumeVMError",
"LumeImageError",
]

View File

@@ -1,119 +0,0 @@
import asyncio
import json
import shlex
import subprocess
from typing import Any, Dict, Optional
from .exceptions import (
LumeConfigError,
LumeConnectionError,
LumeError,
LumeNotFoundError,
LumeServerError,
LumeTimeoutError,
)
class LumeClient:
def __init__(self, base_url: str, timeout: float = 60.0, debug: bool = False):
self.base_url = base_url
self.timeout = timeout
self.debug = debug
def _log_debug(self, message: str, **kwargs) -> None:
"""Log debug information if debug mode is enabled."""
if self.debug:
print(f"DEBUG: {message}")
if kwargs:
print(json.dumps(kwargs, indent=2))
async def _run_curl(
self,
method: str,
path: str,
data: Optional[Dict[str, Any]] = None,
params: Optional[Dict[str, Any]] = None,
) -> Any:
"""Execute a curl command and return the response."""
url = f"{self.base_url}{path}"
if params:
param_str = "&".join(f"{k}={v}" for k, v in params.items())
url = f"{url}?{param_str}"
cmd = ["curl", "-X", method, "-s", "-w", "%{http_code}", "-m", str(self.timeout)]
if data is not None:
cmd.extend(["-H", "Content-Type: application/json", "-d", json.dumps(data)])
cmd.append(url)
self._log_debug(f"Running curl command: {' '.join(map(shlex.quote, cmd))}")
try:
process = await asyncio.create_subprocess_exec(
*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise LumeConnectionError(f"Curl command failed: {stderr.decode()}")
# The last 3 characters are the status code
response = stdout.decode()
status_code = int(response[-3:])
response_body = response[:-3] # Remove status code from response
if status_code >= 400:
if status_code == 404:
raise LumeNotFoundError(f"Resource not found: {path}")
elif status_code == 400:
raise LumeConfigError(f"Invalid request: {response_body}")
elif status_code >= 500:
raise LumeServerError(f"Server error: {response_body}")
else:
raise LumeError(f"Request failed with status {status_code}: {response_body}")
return json.loads(response_body) if response_body.strip() else None
except asyncio.TimeoutError:
raise LumeTimeoutError(f"Request timed out after {self.timeout} seconds")
async def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
"""Make a GET request."""
return await self._run_curl("GET", path, params=params)
async def post(
self, path: str, data: Optional[Dict[str, Any]] = None, timeout: Optional[float] = None
) -> Any:
"""Make a POST request."""
old_timeout = self.timeout
if timeout is not None:
self.timeout = timeout
try:
return await self._run_curl("POST", path, data=data)
finally:
self.timeout = old_timeout
async def patch(self, path: str, data: Dict[str, Any]) -> None:
"""Make a PATCH request."""
await self._run_curl("PATCH", path, data=data)
async def delete(self, path: str) -> None:
"""Make a DELETE request."""
await self._run_curl("DELETE", path)
def print_curl(self, method: str, path: str, data: Optional[Dict[str, Any]] = None) -> None:
"""Print equivalent curl command for debugging."""
curl_cmd = f"""curl -X {method} \\
'{self.base_url}{path}'"""
if data:
curl_cmd += f" \\\n -H 'Content-Type: application/json' \\\n -d '{json.dumps(data)}'"
print("\nEquivalent curl command:")
print(curl_cmd)
print()
async def close(self) -> None:
"""Close the client resources."""
pass # No shared resources to clean up

View File

@@ -1,54 +0,0 @@
from typing import Optional
class LumeError(Exception):
"""Base exception for all PyLume errors."""
pass
class LumeServerError(LumeError):
"""Raised when there's an error with the PyLume server."""
def __init__(
self, message: str, status_code: Optional[int] = None, response_text: Optional[str] = None
):
self.status_code = status_code
self.response_text = response_text
super().__init__(message)
class LumeConnectionError(LumeError):
"""Raised when there's an error connecting to the PyLume server."""
pass
class LumeTimeoutError(LumeError):
"""Raised when a request to the PyLume server times out."""
pass
class LumeNotFoundError(LumeError):
"""Raised when a requested resource is not found."""
pass
class LumeConfigError(LumeError):
"""Raised when there's an error with the configuration."""
pass
class LumeVMError(LumeError):
"""Raised when there's an error with a VM operation."""
pass
class LumeImageError(LumeError):
"""Raised when there's an error with an image operation."""
pass

View File

@@ -1,265 +0,0 @@
import re
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field, RootModel, computed_field, validator
class DiskInfo(BaseModel):
"""Information about disk storage allocation.
Attributes:
total: Total disk space in bytes
allocated: Currently allocated disk space in bytes
"""
total: int
allocated: int
class VMConfig(BaseModel):
"""Configuration for creating a new VM.
Note: Memory and disk sizes should be specified with units (e.g., "4GB", "64GB")
Attributes:
name: Name of the virtual machine
os: Operating system type, either "macOS" or "linux"
cpu: Number of CPU cores to allocate
memory: Amount of memory to allocate with units
disk_size: Size of the disk to create with units
display: Display resolution in format "widthxheight"
ipsw: IPSW path or 'latest' for macOS VMs, None for other OS types
"""
name: str
os: Literal["macOS", "linux"] = "macOS"
cpu: int = Field(default=2, ge=1)
memory: str = "4GB"
disk_size: str = Field(default="64GB", alias="diskSize")
display: str = "1024x768"
ipsw: Optional[str] = Field(default=None, description="IPSW path or 'latest', for macOS VMs")
class Config:
populate_by_alias = True
class SharedDirectory(BaseModel):
"""Configuration for a shared directory.
Attributes:
host_path: Path to the directory on the host system
read_only: Whether the directory should be mounted as read-only
"""
host_path: str = Field(..., alias="hostPath") # Allow host_path but serialize as hostPath
read_only: bool = False
class Config:
populate_by_name = True # Allow both alias and original name
alias_generator = lambda s: "".join(
word.capitalize() if i else word for i, word in enumerate(s.split("_"))
)
class VMRunOpts(BaseModel):
"""Configuration for running a VM.
Args:
no_display: Whether to not display the VNC client
shared_directories: List of directories to share with the VM
"""
no_display: bool = Field(default=False, alias="noDisplay")
shared_directories: Optional[list[SharedDirectory]] = Field(
default=None, alias="sharedDirectories"
)
model_config = ConfigDict(
populate_by_name=True,
alias_generator=lambda s: "".join(
word.capitalize() if i else word for i, word in enumerate(s.split("_"))
),
)
def model_dump(self, **kwargs):
"""Export model data with proper field name conversion.
Converts shared directory fields to match API expectations when using aliases.
Args:
**kwargs: Keyword arguments passed to parent model_dump method
Returns:
dict: Model data with properly formatted field names
"""
data = super().model_dump(**kwargs)
# Convert shared directory fields to match API expectations
if self.shared_directories and "by_alias" in kwargs and kwargs["by_alias"]:
data["sharedDirectories"] = [
{"hostPath": d.host_path, "readOnly": d.read_only} for d in self.shared_directories
]
# Remove the snake_case version if it exists
data.pop("shared_directories", None)
return data
class VMStatus(BaseModel):
"""Status information for a virtual machine.
Attributes:
name: Name of the virtual machine
status: Current status of the VM
os: Operating system type
cpu_count: Number of CPU cores allocated
memory_size: Amount of memory allocated in bytes
disk_size: Disk storage information
vnc_url: URL for VNC connection if available
ip_address: IP address of the VM if available
"""
name: str
status: str
os: Literal["macOS", "linux"]
cpu_count: int = Field(alias="cpuCount")
memory_size: int = Field(alias="memorySize") # API returns memory size in bytes
disk_size: DiskInfo = Field(alias="diskSize")
vnc_url: Optional[str] = Field(default=None, alias="vncUrl")
ip_address: Optional[str] = Field(default=None, alias="ipAddress")
class Config:
populate_by_alias = True
@computed_field
@property
def state(self) -> str:
"""Get the current state of the VM.
Returns:
str: Current VM status
"""
return self.status
@computed_field
@property
def cpu(self) -> int:
"""Get the number of CPU cores.
Returns:
int: Number of CPU cores allocated to the VM
"""
return self.cpu_count
@computed_field
@property
def memory(self) -> str:
"""Get memory allocation in human-readable format.
Returns:
str: Memory size formatted as "{size}GB"
"""
# Convert bytes to GB
gb = self.memory_size / (1024 * 1024 * 1024)
return f"{int(gb)}GB"
class VMUpdateOpts(BaseModel):
"""Options for updating VM configuration.
Attributes:
cpu: Number of CPU cores to update to
memory: Amount of memory to update to with units
disk_size: Size of disk to update to with units
"""
cpu: Optional[int] = None
memory: Optional[str] = None
disk_size: Optional[str] = None
class ImageRef(BaseModel):
"""Reference to a VM image.
Attributes:
image: Name of the image
tag: Tag version of the image
registry: Registry hostname where image is stored
organization: Organization or namespace in the registry
"""
image: str
tag: str = "latest"
registry: Optional[str] = "ghcr.io"
organization: Optional[str] = "trycua"
def model_dump(self, **kwargs):
"""Override model_dump to return just the image:tag format.
Args:
**kwargs: Keyword arguments (ignored)
Returns:
str: Image reference in "image:tag" format
"""
return f"{self.image}:{self.tag}"
class CloneSpec(BaseModel):
"""Specification for cloning a VM.
Attributes:
name: Name of the source VM to clone
new_name: Name for the new cloned VM
"""
name: str
new_name: str = Field(alias="newName")
class Config:
populate_by_alias = True
class ImageInfo(BaseModel):
"""Model for individual image information.
Attributes:
imageId: Unique identifier for the image
"""
imageId: str
class ImageList(RootModel):
"""Response model for the images endpoint.
A list-like container for ImageInfo objects that provides
iteration and indexing capabilities.
"""
root: List[ImageInfo]
def __iter__(self):
"""Iterate over the image list.
Returns:
Iterator over ImageInfo objects
"""
return iter(self.root)
def __getitem__(self, item):
"""Get an item from the image list by index.
Args:
item: Index or slice to retrieve
Returns:
ImageInfo or list of ImageInfo objects
"""
return self.root[item]
def __len__(self):
"""Get the number of images in the list.
Returns:
int: Number of images in the list
"""
return len(self.root)

View File

@@ -1,315 +0,0 @@
import asyncio
import json
import os
import re
import signal
import subprocess
import sys
import time
from functools import wraps
from typing import Any, Callable, List, Optional, TypeVar, Union
from .client import LumeClient
from .exceptions import (
LumeConfigError,
LumeConnectionError,
LumeError,
LumeImageError,
LumeNotFoundError,
LumeServerError,
LumeTimeoutError,
LumeVMError,
)
from .models import (
CloneSpec,
ImageList,
ImageRef,
SharedDirectory,
VMConfig,
VMRunOpts,
VMStatus,
VMUpdateOpts,
)
from .server import LumeServer
# Type variable for the decorator
T = TypeVar("T")
def ensure_server(func: Callable[..., T]) -> Callable[..., T]:
"""Decorator to ensure server is running before executing the method."""
@wraps(func)
async def wrapper(self: "PyLume", *args: Any, **kwargs: Any) -> T:
# ensure_running is an async method, so we need to await it
await self.server.ensure_running()
# Initialize client if needed
await self._init_client()
return await func(self, *args, **kwargs) # type: ignore
return wrapper # type: ignore
class PyLume:
def __init__(
self,
debug: bool = False,
server_start_timeout: int = 60,
port: Optional[int] = None,
use_existing_server: bool = False,
host: str = "localhost",
):
"""Initialize the async PyLume client.
Args:
debug: Enable debug logging
auto_start_server: Whether to automatically start the lume server if not running
server_start_timeout: Timeout in seconds to wait for server to start
port: Port number for the lume server. Required when use_existing_server is True.
use_existing_server: If True, will try to connect to an existing server on the specified port
instead of starting a new one.
host: Host to use for connections (e.g., "localhost", "127.0.0.1", "host.docker.internal")
"""
if use_existing_server and port is None:
raise LumeConfigError("Port must be specified when using an existing server")
self.server = LumeServer(
debug=debug,
server_start_timeout=server_start_timeout,
port=port,
use_existing_server=use_existing_server,
host=host,
)
self.client = None
async def __aenter__(self) -> "PyLume":
"""Async context manager entry."""
if self.server.use_existing_server:
# Just ensure base_url is set for existing server
if self.server.requested_port is None:
raise LumeConfigError("Port must be specified when using an existing server")
if not self.server.base_url:
self.server.port = self.server.requested_port
self.server.base_url = f"http://{self.server.host}:{self.server.port}/lume"
# Ensure the server is running (will connect to existing or start new as needed)
await self.server.ensure_running()
# Initialize the client
await self._init_client()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
"""Async context manager exit."""
if self.client is not None:
await self.client.close()
await self.server.stop()
async def _init_client(self) -> None:
"""Initialize the client if not already initialized."""
if self.client is None:
if self.server.base_url is None:
raise RuntimeError("Server base URL not set")
self.client = LumeClient(self.server.base_url, debug=self.server.debug)
def _log_debug(self, message: str, **kwargs) -> None:
"""Log debug information if debug mode is enabled."""
if self.server.debug:
print(f"DEBUG: {message}")
if kwargs:
print(json.dumps(kwargs, indent=2))
async def _handle_api_error(self, e: Exception, operation: str) -> None:
"""Handle API errors and raise appropriate custom exceptions."""
if isinstance(e, subprocess.SubprocessError):
raise LumeConnectionError(f"Failed to connect to PyLume server: {str(e)}")
elif isinstance(e, asyncio.TimeoutError):
raise LumeTimeoutError(f"Request timed out: {str(e)}")
if not hasattr(e, "status") and not isinstance(e, subprocess.CalledProcessError):
raise LumeServerError(f"Unknown error during {operation}: {str(e)}")
status_code = getattr(e, "status", 500)
response_text = str(e)
self._log_debug(
f"{operation} request failed", status_code=status_code, response_text=response_text
)
if status_code == 404:
raise LumeNotFoundError(f"Resource not found during {operation}")
elif status_code == 400:
raise LumeConfigError(f"Invalid configuration for {operation}: {response_text}")
elif status_code >= 500:
raise LumeServerError(
f"Server error during {operation}",
status_code=status_code,
response_text=response_text,
)
else:
raise LumeServerError(
f"Error during {operation}", status_code=status_code, response_text=response_text
)
async def _read_output(self) -> None:
"""Read and log server output."""
try:
while True:
if not self.server.server_process or self.server.server_process.poll() is not None:
self._log_debug("Server process ended")
break
# Read stdout without blocking
if self.server.server_process.stdout:
while True:
line = self.server.server_process.stdout.readline()
if not line:
break
line = line.strip()
self._log_debug(f"Server stdout: {line}")
if "Server started" in line.decode("utf-8"):
self._log_debug("Detected server started message")
return
# Read stderr without blocking
if self.server.server_process.stderr:
while True:
line = self.server.server_process.stderr.readline()
if not line:
break
line = line.strip()
self._log_debug(f"Server stderr: {line}")
if "error" in line.decode("utf-8").lower():
raise RuntimeError(f"Server error: {line}")
await asyncio.sleep(0.1) # Small delay to prevent CPU spinning
except Exception as e:
self._log_debug(f"Error in output reader: {str(e)}")
raise
@ensure_server
async def create_vm(self, spec: Union[VMConfig, dict]) -> None:
"""Create a VM with the given configuration."""
# Ensure client is initialized
await self._init_client()
if isinstance(spec, VMConfig):
spec = spec.model_dump(by_alias=True, exclude_none=True)
# Suppress optional attribute access errors
self.client.print_curl("POST", "/vms", spec) # type: ignore[attr-defined]
await self.client.post("/vms", spec) # type: ignore[attr-defined]
@ensure_server
async def run_vm(self, name: str, opts: Optional[Union[VMRunOpts, dict]] = None) -> None:
"""Run a VM."""
if opts is None:
opts = VMRunOpts(no_display=False) # type: ignore[attr-defined]
elif isinstance(opts, dict):
opts = VMRunOpts(**opts)
payload = opts.model_dump(by_alias=True, exclude_none=True)
self.client.print_curl("POST", f"/vms/{name}/run", payload) # type: ignore[attr-defined]
await self.client.post(f"/vms/{name}/run", payload) # type: ignore[attr-defined]
@ensure_server
async def list_vms(self) -> List[VMStatus]:
"""List all VMs."""
data = await self.client.get("/vms") # type: ignore[attr-defined]
return [VMStatus.model_validate(vm) for vm in data]
@ensure_server
async def get_vm(self, name: str) -> VMStatus:
"""Get VM details."""
data = await self.client.get(f"/vms/{name}") # type: ignore[attr-defined]
return VMStatus.model_validate(data)
@ensure_server
async def update_vm(self, name: str, params: Union[VMUpdateOpts, dict]) -> None:
"""Update VM settings."""
if isinstance(params, dict):
params = VMUpdateOpts(**params)
payload = params.model_dump(by_alias=True, exclude_none=True)
self.client.print_curl("PATCH", f"/vms/{name}", payload) # type: ignore[attr-defined]
await self.client.patch(f"/vms/{name}", payload) # type: ignore[attr-defined]
@ensure_server
async def stop_vm(self, name: str) -> None:
"""Stop a VM."""
await self.client.post(f"/vms/{name}/stop") # type: ignore[attr-defined]
@ensure_server
async def delete_vm(self, name: str) -> None:
"""Delete a VM."""
await self.client.delete(f"/vms/{name}") # type: ignore[attr-defined]
@ensure_server
async def pull_image(
self, spec: Union[ImageRef, dict, str], name: Optional[str] = None
) -> None:
"""Pull a VM image."""
await self._init_client()
if isinstance(spec, str):
if ":" in spec:
image_str = spec
else:
image_str = f"{spec}:latest"
registry = "ghcr.io"
organization = "trycua"
elif isinstance(spec, dict):
image = spec.get("image", "")
tag = spec.get("tag", "latest")
image_str = f"{image}:{tag}"
registry = spec.get("registry", "ghcr.io")
organization = spec.get("organization", "trycua")
else:
image_str = f"{spec.image}:{spec.tag}"
registry = spec.registry
organization = spec.organization
payload = {
"image": image_str,
"name": name,
"registry": registry,
"organization": organization,
}
self.client.print_curl("POST", "/pull", payload) # type: ignore[attr-defined]
await self.client.post("/pull", payload, timeout=300.0) # type: ignore[attr-defined]
@ensure_server
async def clone_vm(self, name: str, new_name: str) -> None:
"""Clone a VM with the given name to a new VM with new_name."""
config = CloneSpec(name=name, newName=new_name)
self.client.print_curl("POST", "/vms/clone", config.model_dump()) # type: ignore[attr-defined]
await self.client.post("/vms/clone", config.model_dump()) # type: ignore[attr-defined]
@ensure_server
async def get_latest_ipsw_url(self) -> str:
"""Get the latest IPSW URL."""
await self._init_client()
data = await self.client.get("/ipsw") # type: ignore[attr-defined]
return data["url"]
@ensure_server
async def get_images(self, organization: Optional[str] = None) -> ImageList:
"""Get list of available images."""
await self._init_client()
params = {"organization": organization} if organization else None
data = await self.client.get("/images", params) # type: ignore[attr-defined]
return ImageList(root=data)
async def close(self) -> None:
"""Close the client and stop the server."""
if self.client is not None:
await self.client.close()
self.client = None
await asyncio.sleep(1)
await self.server.stop()
async def _ensure_client(self) -> None:
"""Ensure client is initialized."""
if self.client is None:
await self._init_client()

View File

@@ -1,481 +0,0 @@
import asyncio
import json
import logging
import os
import random
import shlex
import signal
import socket
import subprocess
import sys
import tempfile
import time
from logging import getLogger
from typing import Optional
from .exceptions import LumeConnectionError
class LumeServer:
def __init__(
self,
debug: bool = False,
server_start_timeout: int = 60,
port: Optional[int] = None,
use_existing_server: bool = False,
host: str = "localhost",
):
"""Initialize the LumeServer.
Args:
debug: Enable debug logging
server_start_timeout: Timeout in seconds to wait for server to start
port: Specific port to use for the server
use_existing_server: If True, will try to connect to an existing server
instead of starting a new one
host: Host to use for connections (e.g., "localhost", "127.0.0.1", "host.docker.internal")
"""
self.debug = debug
self.server_start_timeout = server_start_timeout
self.server_process = None
self.output_file = None
self.requested_port = port
self.port = None
self.base_url = None
self.use_existing_server = use_existing_server
self.host = host
# Configure logging
self.logger = getLogger("pylume.server")
if not self.logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.DEBUG if debug else logging.INFO)
self.logger.debug(f"Server initialized with host: {self.host}")
def _check_port_available(self, port: int) -> bool:
"""Check if a port is available."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
result = s.connect_ex(("127.0.0.1", port))
if result == 0: # Port is in use on localhost
return False
except:
pass
# Check the specified host (e.g., "host.docker.internal") if it's not a localhost alias
if self.host not in ["localhost", "127.0.0.1"]:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(0.5)
result = s.connect_ex((self.host, port))
if result == 0: # Port is in use on host
return False
except:
pass
return True
def _get_server_port(self) -> int:
"""Get an available port for the server."""
# Use requested port if specified
if self.requested_port is not None:
if not self._check_port_available(self.requested_port):
raise RuntimeError(f"Requested port {self.requested_port} is not available")
return self.requested_port
# Find a free port
for _ in range(10): # Try up to 10 times
port = random.randint(49152, 65535)
if self._check_port_available(port):
return port
raise RuntimeError("Could not find an available port")
async def _ensure_server_running(self) -> None:
"""Ensure the lume server is running, start it if it's not."""
try:
self.logger.debug("Checking if lume server is running...")
# Try to connect to the server with a short timeout
cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "5", f"{self.base_url}/vms"]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode()
status_code = int(response[-3:])
if status_code == 200:
self.logger.debug("PyLume server is running")
return
self.logger.debug("PyLume server not running, attempting to start it")
# Server not running, try to start it
lume_path = os.path.join(os.path.dirname(__file__), "lume")
if not os.path.exists(lume_path):
raise RuntimeError(f"Could not find lume binary at {lume_path}")
# Make sure the file is executable
os.chmod(lume_path, 0o755)
# Create a temporary file for server output
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
self.logger.debug(f"Using temporary file for server output: {self.output_file.name}")
# Start the server
self.logger.debug(f"Starting lume server with: {lume_path} serve --port {self.port}")
# Start server in background using subprocess.Popen
try:
self.server_process = subprocess.Popen(
[lume_path, "serve", "--port", str(self.port)],
stdout=self.output_file,
stderr=self.output_file,
cwd=os.path.dirname(lume_path),
start_new_session=True, # Run in new session to avoid blocking
)
except Exception as e:
self.output_file.close()
os.unlink(self.output_file.name)
raise RuntimeError(f"Failed to start lume server process: {str(e)}")
# Wait for server to start
self.logger.debug(
f"Waiting up to {self.server_start_timeout} seconds for server to start..."
)
start_time = time.time()
server_ready = False
last_size = 0
while time.time() - start_time < self.server_start_timeout:
if self.server_process.poll() is not None:
# Process has terminated
self.output_file.seek(0)
output = self.output_file.read()
self.output_file.close()
os.unlink(self.output_file.name)
error_msg = (
f"Server process terminated unexpectedly.\n"
f"Exit code: {self.server_process.returncode}\n"
f"Output: {output}"
)
raise RuntimeError(error_msg)
# Check output file for server ready message
self.output_file.seek(0, os.SEEK_END)
size = self.output_file.tell()
if size > last_size: # Only read if there's new content
self.output_file.seek(last_size)
new_output = self.output_file.read()
if new_output.strip(): # Only log non-empty output
self.logger.debug(f"Server output: {new_output.strip()}")
last_size = size
if "Server started" in new_output:
server_ready = True
self.logger.debug("Server startup detected")
break
# Try to connect to the server periodically
try:
cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "5", f"{self.base_url}/vms"]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode == 0:
response = stdout.decode()
status_code = int(response[-3:])
if status_code == 200:
server_ready = True
self.logger.debug("Server is responding to requests")
break
except:
pass # Server not ready yet
await asyncio.sleep(1.0)
if not server_ready:
# Cleanup if server didn't start
if self.server_process:
self.server_process.terminate()
try:
self.server_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.server_process.kill()
self.output_file.close()
os.unlink(self.output_file.name)
raise RuntimeError(
f"Failed to start lume server after {self.server_start_timeout} seconds. "
"Check the debug output for more details."
)
# Give the server a moment to fully initialize
await asyncio.sleep(2.0)
# Verify server is responding
try:
cmd = ["curl", "-s", "-w", "%{http_code}", "-m", "10", f"{self.base_url}/vms"]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"Curl command failed: {stderr.decode()}")
response = stdout.decode()
status_code = int(response[-3:])
if status_code != 200:
raise RuntimeError(f"Server returned status code {status_code}")
self.logger.debug("PyLume server started successfully")
except Exception as e:
self.logger.debug(f"Server verification failed: {str(e)}")
if self.server_process:
self.server_process.terminate()
try:
self.server_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.server_process.kill()
self.output_file.close()
os.unlink(self.output_file.name)
raise RuntimeError(f"Server started but is not responding: {str(e)}")
self.logger.debug("Server startup completed successfully")
except Exception as e:
raise RuntimeError(f"Failed to start lume server: {str(e)}")
async def _start_server(self) -> None:
"""Start the lume server using the lume executable."""
self.logger.debug("Starting PyLume server")
# Get absolute path to lume executable in the same directory as this file
lume_path = os.path.join(os.path.dirname(__file__), "lume")
if not os.path.exists(lume_path):
raise RuntimeError(f"Could not find lume binary at {lume_path}")
try:
# Make executable
os.chmod(lume_path, 0o755)
# Get and validate port
self.port = self._get_server_port()
self.base_url = f"http://{self.host}:{self.port}/lume"
# Set up output handling
self.output_file = tempfile.NamedTemporaryFile(mode="w+", delete=False)
# Start the server process with the lume executable
env = os.environ.copy()
env["RUST_BACKTRACE"] = "1" # Enable backtrace for better error reporting
# Specify the host to bind to (0.0.0.0 to allow external connections)
self.server_process = subprocess.Popen(
[lume_path, "serve", "--port", str(self.port)],
stdout=self.output_file,
stderr=subprocess.STDOUT,
cwd=os.path.dirname(lume_path), # Run from same directory as executable
env=env,
)
# Wait for server to initialize
await asyncio.sleep(2)
await self._wait_for_server()
except Exception as e:
await self._cleanup()
raise RuntimeError(f"Failed to start lume server process: {str(e)}")
async def _tail_log(self) -> None:
"""Read and display server log output in debug mode."""
while True:
try:
self.output_file.seek(0, os.SEEK_END) # type: ignore[attr-defined]
line = self.output_file.readline() # type: ignore[attr-defined]
if line:
line = line.strip()
if line:
print(f"SERVER: {line}")
if self.server_process.poll() is not None: # type: ignore[attr-defined]
print("Server process ended")
break
await asyncio.sleep(0.1)
except Exception as e:
print(f"Error reading log: {e}")
await asyncio.sleep(0.1)
async def _wait_for_server(self) -> None:
"""Wait for server to start and become responsive with increased timeout."""
start_time = time.time()
while time.time() - start_time < self.server_start_timeout:
if self.server_process.poll() is not None: # type: ignore[attr-defined]
error_msg = await self._get_error_output()
await self._cleanup()
raise RuntimeError(error_msg)
try:
await self._verify_server()
self.logger.debug("Server is now responsive")
return
except Exception as e:
self.logger.debug(f"Server not ready yet: {str(e)}")
await asyncio.sleep(1.0)
await self._cleanup()
raise RuntimeError(f"Server failed to start after {self.server_start_timeout} seconds")
async def _verify_server(self) -> None:
"""Verify server is responding to requests."""
try:
cmd = [
"curl",
"-s",
"-w",
"%{http_code}",
"-m",
"10",
f"http://{self.host}:{self.port}/lume/vms",
]
process = await asyncio.create_subprocess_exec(
*cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
stdout, stderr = await process.communicate()
if process.returncode != 0:
raise RuntimeError(f"Curl command failed: {stderr.decode()}")
response = stdout.decode()
status_code = int(response[-3:])
if status_code != 200:
raise RuntimeError(f"Server returned status code {status_code}")
self.logger.debug("PyLume server started successfully")
except Exception as e:
raise RuntimeError(f"Server not responding: {str(e)}")
async def _get_error_output(self) -> str:
"""Get error output from the server process."""
if not self.output_file:
return "No output available"
self.output_file.seek(0)
output = self.output_file.read()
return (
f"Server process terminated unexpectedly.\n"
f"Exit code: {self.server_process.returncode}\n" # type: ignore[attr-defined]
f"Output: {output}"
)
async def _cleanup(self) -> None:
"""Clean up all server resources."""
if self.server_process:
try:
self.server_process.terminate()
try:
self.server_process.wait(timeout=5)
except subprocess.TimeoutExpired:
self.server_process.kill()
except:
pass
self.server_process = None
# Clean up output file
if self.output_file:
try:
self.output_file.close()
os.unlink(self.output_file.name)
except Exception as e:
self.logger.debug(f"Error cleaning up output file: {e}")
self.output_file = None
async def ensure_running(self) -> None:
"""Ensure the server is running.
If use_existing_server is True, will only try to connect to an existing server.
Otherwise will:
1. Try to connect to an existing server on the specified port
2. If that fails and not in Docker, start a new server
3. If in Docker and no existing server is found, raise an error
"""
# First check if we're in Docker
in_docker = os.path.exists("/.dockerenv") or (
os.path.exists("/proc/1/cgroup") and "docker" in open("/proc/1/cgroup", "r").read()
)
# If using a non-localhost host like host.docker.internal, set up the connection details
if self.host not in ["localhost", "127.0.0.1"]:
if self.requested_port is None:
raise RuntimeError("Port must be specified when using a remote host")
self.port = self.requested_port
self.base_url = f"http://{self.host}:{self.port}/lume"
self.logger.debug(f"Using remote host server at {self.base_url}")
# Try to verify the server is accessible
try:
await self._verify_server()
self.logger.debug("Successfully connected to remote server")
return
except Exception as e:
if self.use_existing_server or in_docker:
# If explicitly requesting an existing server or in Docker, we can't start a new one
raise RuntimeError(
f"Failed to connect to remote server at {self.base_url}: {str(e)}"
)
else:
self.logger.debug(f"Remote server not available at {self.base_url}: {str(e)}")
# Fall back to localhost for starting a new server
self.host = "localhost"
# If explicitly using an existing server, verify it's running
if self.use_existing_server:
if self.requested_port is None:
raise RuntimeError("Port must be specified when using an existing server")
self.port = self.requested_port
self.base_url = f"http://{self.host}:{self.port}/lume"
try:
await self._verify_server()
self.logger.debug("Successfully connected to existing server")
except Exception as e:
raise RuntimeError(
f"Failed to connect to existing server at {self.base_url}: {str(e)}"
)
else:
# Try to connect to an existing server first
if self.requested_port is not None:
self.port = self.requested_port
self.base_url = f"http://{self.host}:{self.port}/lume"
try:
await self._verify_server()
self.logger.debug("Successfully connected to existing server")
return
except Exception:
self.logger.debug(f"No existing server found at {self.base_url}")
# If in Docker and can't connect to existing server, raise an error
if in_docker:
raise RuntimeError(
f"Failed to connect to server at {self.base_url} and cannot start a new server in Docker"
)
# Start a new server
self.logger.debug("Starting a new server instance")
await self._start_server()
async def stop(self) -> None:
"""Stop the server if we're managing it."""
if not self.use_existing_server:
self.logger.debug("Stopping lume server...")
await self._cleanup()

View File

@@ -1,51 +0,0 @@
[build-system]
build-backend = "pdm.backend"
requires = ["pdm-backend"]
[project]
authors = [{ name = "TryCua", email = "gh@trycua.com" }]
classifiers = [
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: MacOS :: MacOS X",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
dependencies = ["pydantic>=2.11.1"]
description = "Python SDK for lume - run macOS and Linux VMs on Apple Silicon"
dynamic = ["version"]
keywords = ["apple-silicon", "macos", "virtualization", "vm"]
license = { text = "MIT" }
name = "pylume"
readme = "README.md"
requires-python = ">=3.12"
[tool.pdm.version]
path = "pylume/__init__.py"
source = "file"
[project.urls]
homepage = "https://github.com/trycua/pylume"
repository = "https://github.com/trycua/pylume"
[tool.pdm]
distribution = true
[tool.pdm.dev-dependencies]
dev = [
"black>=23.0.0",
"isort>=5.12.0",
"pytest-asyncio>=0.23.0",
"pytest>=7.0.0",
]
[tool.pytest.ini_options]
asyncio_mode = "auto"
python_files = "test_*.py"
testpaths = ["tests"]
[tool.pdm.build]
includes = ["pylume/"]
source-includes = ["LICENSE", "README.md", "tests/"]