Merge pull request #645 from trycua/feat/cua-bench-submodules

Introduce cua-bench-ui submodules (webview API, configurable ports, improved python RPC)
This commit is contained in:
ddupont
2025-12-09 13:28:06 -08:00
committed by GitHub
28 changed files with 4108 additions and 2924 deletions

View File

@@ -20,7 +20,11 @@ const geistMono = Geist_Mono({
export default function Layout({ children }: { children: ReactNode }) {
return (
<html lang="en" className={`${geist.variable} ${geistMono.variable} font-sans`} suppressHydrationWarning>
<html
lang="en"
className={`${geist.variable} ${geistMono.variable} font-sans`}
suppressHydrationWarning
>
<head>
<link rel="icon" href="/docs/favicon.ico" sizes="any" />
</head>

View File

@@ -3,4 +3,3 @@
from .browser_tool import BrowserTool
__all__ = ["BrowserTool"]

View File

@@ -0,0 +1,26 @@
# CUA Bench UI
A lightweight web UI window controller for CUA bench environments, built on pywebview.
## Usage
```python
from bench_ui import launch_window, get_element_rect, execute_javascript
# Launch a window with inline HTML content
pid = launch_window(html="<html><body><h1>Hello</h1></body></html>")
# Get element rect in screen space
rect = get_element_rect(pid, "h1", space="screen")
print(rect)
# Execute arbitrary JavaScript
text = execute_javascript(pid, "document.querySelector('h1')?.textContent")
print(text)
```
## Installation
```bash
pip install cua-bench-ui
```

View File

@@ -0,0 +1,3 @@
from .api import execute_javascript, get_element_rect, launch_window
__all__ = ["launch_window", "get_element_rect", "execute_javascript"]

View File

@@ -0,0 +1,181 @@
import json
import os
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, Optional
from urllib import request
from urllib.error import HTTPError, URLError
import psutil
# Map child PID -> listening port
_pid_to_port: Dict[int, int] = {}
def _post_json(url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
req = request.Request(
url, data=data, headers={"Content-Type": "application/json"}, method="POST"
)
try:
with request.urlopen(req, timeout=5) as resp:
text = resp.read().decode("utf-8")
return json.loads(text)
except HTTPError as e:
try:
body = (e.read() or b"").decode("utf-8", errors="ignore")
return json.loads(body)
except Exception:
return {"error": "http_error", "status": getattr(e, "code", None)}
except URLError as e:
return {"error": "url_error", "reason": str(e.reason)}
def _detect_port_for_pid(pid: int) -> int:
    """Return a local TCP port that process ``pid`` is listening on.

    Walks the system-wide TCP connection table from psutil and returns the
    port of the first entry owned by ``pid`` that is in the LISTEN state and
    bound to a loopback or wildcard address.

    Raises:
        RuntimeError: if psutil is unavailable or no matching socket exists.
    """
    if psutil is None:
        raise RuntimeError("psutil is required for PID->port detection. Please install psutil.")

    accepted_hosts = ("127.0.0.1", "::1", "0.0.0.0", "::")
    for conn in psutil.net_connections(kind="tcp"):
        if getattr(conn, "pid", None) != pid:
            continue
        local_addr = getattr(conn, "laddr", None)
        state = str(getattr(conn, "status", ""))
        # Entries without a usable (host, port) pair cannot be matched.
        well_formed = bool(local_addr) and isinstance(local_addr, tuple) and len(local_addr) >= 2
        if not well_formed:
            continue
        host, port = local_addr[0], int(local_addr[1])
        if state.upper() == "LISTEN" and host in accepted_hosts:
            return port
    raise RuntimeError(f"Could not detect listening port for pid {pid}")
def launch_window(
    url: Optional[str] = None,
    *,
    html: Optional[str] = None,
    folder: Optional[str] = None,
    title: str = "Window",
    x: Optional[int] = None,
    y: Optional[int] = None,
    width: int = 600,
    height: int = 400,
    icon: Optional[str] = None,
    use_inner_size: bool = False,
    title_bar_style: str = "default",
) -> int:
    """Create a pywebview window in a child process and return its PID.

    Preferred input is a URL via the positional ``url`` parameter.
    To load inline HTML instead, pass ``html=...``.
    To serve a static folder, pass ``folder=...`` (path to directory).

    Spawns ``python -m bench_ui.child`` with a JSON config passed via a temp
    file. The child announces itself with a JSON line of the form
    ``{"pid": <pid>, "port": <port>}``; the pid->port pair is cached for
    subsequent control calls such as ``get_element_rect``.

    Because the child's stderr is merged into its stdout, lines that are not
    the startup announcement (warnings printed during import, for example)
    are skipped rather than treated as a failure.

    Raises:
        ValueError: if none of ``url``, ``html`` or ``folder`` is given.
        RuntimeError: if the child exits, or prints too many other lines,
            before announcing its control port.
    """
    if not url and not html and not folder:
        raise ValueError("launch_window requires either a url, html, or folder")

    config = {
        "url": url,
        "html": html,
        "folder": folder,
        "title": title,
        "x": x,
        "y": y,
        "width": width,
        "height": height,
        "icon": icon,
        "use_inner_size": use_inner_size,
        "title_bar_style": title_bar_style,
    }

    with tempfile.NamedTemporaryFile("w", delete=False, suffix=".json") as f:
        json.dump(config, f)
        cfg_path = f.name

    # Upper bound on non-announcement lines tolerated before giving up, so a
    # chatty child that never announces itself cannot stall the caller forever.
    max_preamble_lines = 100

    try:
        # Launch child process
        proc = subprocess.Popen(
            [sys.executable, "-m", "bench_ui.child", cfg_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
        if proc.stdout is None:
            raise RuntimeError("bench_ui child was started without a stdout pipe")

        info: Optional[Dict[str, Any]] = None
        preamble = []
        while info is None and len(preamble) < max_preamble_lines:
            raw = proc.stdout.readline()
            if not raw:
                # EOF: the child exited without announcing a port.
                break
            try:
                candidate = json.loads(raw)
            except json.JSONDecodeError:
                candidate = None
            if isinstance(candidate, dict) and "port" in candidate:
                info = candidate
            else:
                preamble.append(raw.rstrip())

        if info is None:
            raise RuntimeError(
                "bench_ui child did not report a control port. Output was:\n"
                + "\n".join(preamble)
            )

        pid = int(info["pid"]) if "pid" in info else proc.pid
        port = int(info["port"])  # required
        _pid_to_port[pid] = port
        return pid
    finally:
        # The child has read the config by the time it announces itself (or
        # has already exited), so the temp file can go. Removal is best effort.
        try:
            os.unlink(cfg_path)
        except OSError:
            pass
def get_element_rect(pid: int, selector: str, *, space: str = "window"):
    """Ask the window owned by ``pid`` for the client rect of an element.

    The control port is taken from the pid->port cache, falling back to
    psutil-based detection when the pid is not cached. The ``/rect`` endpoint
    is then polled up to 30 times, 0.1 s apart.

    Returns:
        The first non-null ``rect`` value received, e.g.
        ``{"x": float, "y": float, "width": float, "height": float}``.

    Raises:
        RuntimeError: if no attempt yields a non-null rect. A reply carrying
            an error, or a null rect, is treated as "try again", so a
            selector that never matches ends here rather than returning None.
    """
    if pid not in _pid_to_port:
        _pid_to_port[pid] = _detect_port_for_pid(pid)
    endpoint = f"http://127.0.0.1:{_pid_to_port[pid]}/rect"

    last: Dict[str, Any] = {}
    for _attempt in range(30):
        last = _post_json(endpoint, {"selector": selector, "space": space}) or {}
        if isinstance(last, dict):
            found = last.get("rect")
            if found is not None:
                return found
        # Not ready, transient error, or no rect yet: pause and poll again.
        time.sleep(0.1)
    raise RuntimeError(f"Failed to get element rect: {last}")
def execute_javascript(pid: int, javascript: str):
    """Run ``javascript`` in the window owned by ``pid`` and return its result.

    The control port is taken from the pid->port cache, falling back to
    psutil-based detection when the pid is not cached. The ``/eval`` endpoint
    is polled up to 30 times, 0.1 s apart, and the first reply containing a
    ``result`` key wins (its value may be ``None``).

    Raises:
        RuntimeError: if no attempt produces a reply with a ``result`` key.
    """
    if pid not in _pid_to_port:
        _pid_to_port[pid] = _detect_port_for_pid(pid)
    endpoint = f"http://127.0.0.1:{_pid_to_port[pid]}/eval"

    last: Dict[str, Any] = {}
    for _attempt in range(30):
        last = _post_json(endpoint, {"javascript": javascript}) or {}
        if isinstance(last, dict) and "result" in last:
            return last["result"]
        # Window not ready or some other error reply: pause and poll again.
        time.sleep(0.1)
    raise RuntimeError(f"Failed to execute JavaScript: {last}")

View File

@@ -0,0 +1,221 @@
import asyncio
import json
import os
import random
import socket
import sys
import threading
from pathlib import Path
from typing import Optional
import webview
from aiohttp import web
def _get_free_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
def _start_http_server(
    window: webview.Window,
    port: int,
    ready_event: threading.Event,
    html_content: str | None = None,
    folder_path: str | None = None,
):
    """Start the control HTTP server for ``window`` on a daemon thread.

    The server listens on ``127.0.0.1:port`` and exposes:

    * ``POST /rect`` - bounding rect of the element matching ``selector``,
      in window coordinates or (``space == "screen"``) approximate screen
      coordinates.
    * ``POST /eval`` - result of evaluating the posted JavaScript.
    * ``GET /`` - ``html_content`` when given, otherwise a small status JSON;
      when ``folder_path`` is set, a static file route is mounted at ``/``
      instead.

    Both POST endpoints answer 409 ``window_not_ready`` until ``ready_event``
    is set. The function returns as soon as the server thread is started.
    """

    def _page_loaded() -> bool:
        # Give a page that is still loading up to two seconds to finish.
        if not ready_event.is_set():
            ready_event.wait(timeout=2.0)
        return ready_event.is_set()

    async def rect_handler(request: web.Request):
        try:
            body = await request.json()
        except Exception:
            return web.json_response({"error": "invalid_json"}, status=400)

        selector = body.get("selector")
        space = body.get("space", "window")
        if not isinstance(selector, str):
            return web.json_response({"error": "selector_required"}, status=400)
        if not _page_loaded():
            return web.json_response({"error": "window_not_ready"}, status=409)

        # json.dumps yields a correctly quoted JS string literal.
        selector_js = json.dumps(selector)
        lookup = (
            "(function(){"
            f"const s = {selector_js};"
            "const el = document.querySelector(s);"
            "if(!el){return null;}"
            "const r = el.getBoundingClientRect();"
        )
        if space == "screen":
            # Approximate screen coordinates from the window position plus
            # the height of the window frame above the viewport.
            answer = (
                "const sx = (window.screenX ?? window.screenLeft ?? 0);"
                "const syRaw = (window.screenY ?? window.screenTop ?? 0);"
                "const frameH = (window.outerHeight - window.innerHeight) || 0;"
                "const sy = syRaw + frameH;"
                "return {x:sx + r.left, y:sy + r.top, width:r.width, height:r.height};"
            )
        else:
            answer = "return {x:r.left,y:r.top,width:r.width,height:r.height};"
        js = lookup + answer + "})()"

        try:
            # Runs synchronously on the server thread.
            rect = window.evaluate_js(js)
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
        return web.json_response({"rect": rect})

    async def eval_handler(request: web.Request):
        try:
            body = await request.json()
        except Exception:
            return web.json_response({"error": "invalid_json"}, status=400)

        # "code" is accepted as an alternative key for the script.
        script = body.get("javascript") or body.get("code")
        if not isinstance(script, str):
            return web.json_response({"error": "javascript_required"}, status=400)
        if not _page_loaded():
            return web.json_response({"error": "window_not_ready"}, status=409)

        try:
            value = window.evaluate_js(script)
        except Exception as exc:
            return web.json_response({"error": str(exc)}, status=500)
        return web.json_response({"result": value})

    async def index_handler(request: web.Request):
        if html_content is not None:
            return web.Response(text=html_content, content_type="text/html")
        return web.json_response({"status": "ok", "message": "bench-ui control server"})

    app = web.Application()
    router = app.router
    if folder_path:
        # Serving a folder: static files take over the root path.
        router.add_static("/", folder_path, show_index=True)
    else:
        router.add_get("/", index_handler)
    router.add_post("/rect", rect_handler)
    router.add_post("/eval", eval_handler)

    loop = asyncio.new_event_loop()

    def run_loop():
        # The server owns a private event loop that lives on this thread.
        asyncio.set_event_loop(loop)
        runner = web.AppRunner(app)
        loop.run_until_complete(runner.setup())
        site = web.TCPSite(runner, "127.0.0.1", port)
        loop.run_until_complete(site.start())
        loop.run_forever()

    threading.Thread(target=run_loop, daemon=True).start()
def main():
    """Entry point of the window child process.

    Reads the JSON config named on the command line, creates the pywebview
    window, starts the control HTTP server, prints one JSON line with this
    process's pid and the control port, and then blocks in the GUI loop.
    Exits with status 2 when no config path is given.
    """
    if len(sys.argv) < 2:
        print("Usage: python -m bench_ui.child <config.json>", file=sys.stderr)
        sys.exit(2)

    cfg = json.loads(Path(sys.argv[1]).read_text(encoding="utf-8"))

    html: Optional[str] = cfg.get("html") or ""
    url: Optional[str] = cfg.get("url")
    folder: Optional[str] = cfg.get("folder")
    title: str = cfg.get("title", "Window")
    x: Optional[int] = cfg.get("x")
    y: Optional[int] = cfg.get("y")
    width: int = int(cfg.get("width", 600))
    height: int = int(cfg.get("height", 400))
    # The next three options are read from the config but are not applied to
    # the window anywhere in this function.
    icon: Optional[str] = cfg.get("icon")
    use_inner_size: bool = bool(cfg.get("use_inner_size", False))
    title_bar_style: str = cfg.get("title_bar_style", "default")

    # Pick the port first: inline HTML and folder content are served by the
    # control server itself, so the window URL depends on it.
    port = _get_free_port()

    if url:
        # External page: the control server serves no content of its own.
        target_url = url
        html_for_server = None
        folder_for_server = None
    elif folder:
        # Static folder mounted at the server root; open its index.html.
        target_url = f"http://127.0.0.1:{port}/index.html"
        html_for_server = None
        folder_for_server = folder
    else:
        # Inline HTML served at the server root.
        target_url = f"http://127.0.0.1:{port}/"
        html_for_server = html
        folder_for_server = None

    window = webview.create_window(
        title,
        url=target_url,
        width=width,
        height=height,
        x=x,
        y=y,
        confirm_close=False,
        text_select=True,
        background_color="#FFFFFF",
    )

    # Set once the page has loaded, so JS is only evaluated on a ready page.
    window_ready = threading.Event()

    def _on_loaded():
        window_ready.set()

    window.events.loaded += _on_loaded  # type: ignore[attr-defined]

    _start_http_server(
        window, port, window_ready, html_content=html_for_server, folder_path=folder_for_server
    )

    # Startup line consumed by the parent process.
    print(json.dumps({"pid": os.getpid(), "port": port}), flush=True)

    # Blocks until the GUI loop ends.
    debug_enabled = os.environ.get("CUA_BENCH_UI_DEBUG", "false").lower() in ("true", "1")
    webview.start(debug=debug_enabled)
if __name__ == "__main__":
main()

Binary file not shown.

After

Width:  |  Height:  |  Size: 743 KiB

View File

@@ -0,0 +1,80 @@
from __future__ import annotations
import os
import time
from pathlib import Path
from bench_ui import execute_javascript, get_element_rect, launch_window
HTML = """
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Bench UI Example</title>
<style>
body { font-family: system-ui, sans-serif; margin: 24px; }
#target { width: 220px; height: 120px; background: #4f46e5; color: white; display: flex; align-items: center; justify-content: center; border-radius: 8px; }
</style>
</head>
<body>
<h1>Bench UI Example</h1>
<div id="target">Hello from pywebview</div>
<h1>Click the button</h1>
<button id="submit" class="btn" data-instruction="the button">Submit</button>
<script>
window.__submitted = false;
document.getElementById('submit').addEventListener('click', function() {
window.__submitted = true;
this.textContent = 'Submitted!';
this.disabled = true;
});
</script>
</body>
</html>
"""
def main():
    """Demo: open a window, locate an element on screen, and run some JS.

    Launches the example page with debugging enabled, prints the screen-space
    rect of ``#target``, tries to save a full-screen screenshot with that rect
    outlined in red next to this script, and finally prints the value of
    ``window.__submitted``.
    """
    os.environ["CUA_BENCH_UI_DEBUG"] = "1"

    # Open the window from the inline HTML document.
    pid = launch_window(html=HTML, title="Bench UI Example", width=800, height=600)
    print(f"Launched window with PID: {pid}")

    # Let the window render before querying it.
    time.sleep(1.0)

    rect = get_element_rect(pid, "#target", space="screen")
    print("Element rect (screen space):", rect)

    # The overlay is optional: any failure here (Pillow missing, screen grab
    # unsupported, ...) is reported and the demo carries on.
    try:
        from PIL import ImageDraw, ImageGrab

        screenshot = ImageGrab.grab()  # full screen
        canvas = ImageDraw.Draw(screenshot)
        left, top, box_w, box_h = rect["x"], rect["y"], rect["width"], rect["height"]
        canvas.rectangle((left, top, left + box_w, top + box_h), outline=(255, 0, 0), width=3)
        out_path = Path(__file__).with_name("output_overlay.png")
        screenshot.save(out_path)
        print(f"Saved overlay screenshot to: {out_path}")
    except Exception as e:
        print(f"Failed to capture/annotate screenshot: {e}")

    # Read a flag that the page's own script maintains.
    text = execute_javascript(pid, "window.__submitted")
    print("text:", text)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,25 @@
[build-system]
requires = ["pdm-backend"]
build-backend = "pdm.backend"
[project]
name = "cua-bench-ui"
version = "0.7.0"
description = "Lightweight webUI window controller for CUA bench using pywebview"
readme = "README.md"
authors = [
{ name = "TryCua", email = "gh@trycua.com" }
]
dependencies = [
"pywebview>=5.3",
"aiohttp>=3.9.0",
"psutil>=5.9",
]
requires-python = ">=3.12"
[tool.pdm]
distribution = true
[tool.pdm.build]
includes = ["bench_ui/"]
source-includes = ["README.md"]

View File

@@ -0,0 +1,50 @@
import time
import psutil
import pytest
from bench_ui import execute_javascript, launch_window
from bench_ui.api import _pid_to_port
HTML = """
<!doctype html>
<html>
<head>
<meta charset="utf-8" />
<title>Bench UI Test</title>
</head>
<body>
<div id="t">hello-world</div>
</body>
</html>
"""
def test_execute_js_after_clearing_port_mapping():
    """JS execution still works after the cached pid->port entry is removed.

    With the cache entry gone, ``execute_javascript`` has to rediscover the
    child's control port before it can talk to the window.
    """
    # Skip when no pywebview backend is importable on this machine.
    pytest.importorskip("webview")

    pid = launch_window(html=HTML, title="Bench UI Test", width=400, height=300)
    try:
        # Allow the window to render and the control server to come up.
        time.sleep(1.0)

        # The launch itself must have populated the cache.
        assert pid in _pid_to_port

        # Forget the port to force a fresh lookup on the next call.
        _pid_to_port.pop(pid)

        content = execute_javascript(pid, "document.querySelector('#t')?.textContent")
        assert content == "hello-world"
    finally:
        # Best-effort teardown of the child: terminate, then kill if it lingers.
        try:
            child = psutil.Process(pid)
            child.terminate()
            try:
                child.wait(timeout=3)
            except psutil.TimeoutExpired:
                child.kill()
        except Exception:
            pass

View File

@@ -24,8 +24,8 @@ from fastapi import (
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from .handlers.factory import HandlerFactory
from .browser import get_browser_manager
from .handlers.factory import HandlerFactory
# Authentication session TTL (in seconds). Override via env var CUA_AUTH_TTL_SECONDS. Default: 60s
AUTH_SESSION_TTL_SECONDS: int = int(os.environ.get("CUA_AUTH_TTL_SECONDS", "60"))
@@ -805,7 +805,7 @@ async def playwright_exec_endpoint(
try:
browser_manager = get_browser_manager()
result = await browser_manager.execute_command(command, params)
if result.get("success"):
return JSONResponse(content=result)
else:

View File

@@ -7,7 +7,28 @@ import platform
import re
import time
import traceback
from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, cast
from functools import wraps
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
List,
Literal,
Optional,
TypeVar,
Union,
cast,
)
try:
from typing import ParamSpec
except Exception: # pragma: no cover
from typing_extensions import ParamSpec # type: ignore
P = ParamSpec("P")
R = TypeVar("R")
from core.telemetry import is_telemetry_enabled, record_event
from PIL import Image
@@ -66,8 +87,9 @@ class Computer:
verbosity: Union[int, LogLevel] = logging.INFO,
telemetry_enabled: bool = True,
provider_type: Union[str, VMProviderType] = VMProviderType.LUME,
port: Optional[int] = 7777,
provider_port: Optional[int] = 7777,
noVNC_port: Optional[int] = 8006,
api_port: Optional[int] = None,
host: str = os.environ.get("PYLUME_HOST", "localhost"),
storage: Optional[str] = None,
ephemeral: bool = False,
@@ -118,14 +140,19 @@ class Computer:
# Store original parameters
self.image = image
self.port = port
self.provider_port = provider_port
self.noVNC_port = noVNC_port
self.api_port = api_port
self.host = host
self.os_type = os_type
self.provider_type = provider_type
self.ephemeral = ephemeral
self.api_key = api_key if self.provider_type == VMProviderType.CLOUD else None
# Set default API port if not specified
if self.api_port is None:
self.api_port = 8443 if self.api_key else 8000
self.api_key = api_key
self.experiments = experiments or []
if "app-use" in self.experiments:
@@ -273,7 +300,7 @@ class Computer:
interface = cast(
BaseComputerInterface,
InterfaceFactory.create_interface_for_os(
os=self.os_type, ip_address=ip_address # type: ignore[arg-type]
os=self.os_type, ip_address=ip_address, api_port=self.api_port # type: ignore[arg-type]
),
)
self._interface = interface
@@ -300,7 +327,7 @@ class Computer:
storage = "ephemeral" if self.ephemeral else self.storage
verbose = self.verbosity >= LogLevel.DEBUG
ephemeral = self.ephemeral
port = self.port if self.port is not None else 7777
port = self.provider_port if self.provider_port is not None else 7777
host = self.host if self.host else "localhost"
image = self.image
shared_path = self.shared_path
@@ -365,6 +392,7 @@ class Computer:
verbose=verbose,
ephemeral=ephemeral,
noVNC_port=noVNC_port,
api_port=self.api_port,
)
else:
raise ValueError(f"Unsupported provider type: {self.provider_type}")
@@ -513,13 +541,14 @@ class Computer:
ip_address=ip_address,
api_key=self.api_key,
vm_name=self.config.name,
api_port=self.api_port,
),
)
else:
interface = cast(
BaseComputerInterface,
InterfaceFactory.create_interface_for_os(
os=self.os_type, ip_address=ip_address
os=self.os_type, ip_address=ip_address, api_port=self.api_port
),
)
@@ -533,15 +562,13 @@ class Computer:
# Use a single timeout for the entire connection process
# The VM should already be ready at this point, so we're just establishing the connection
await self._interface.wait_for_ready(timeout=30)
self.logger.info("WebSocket interface connected successfully")
self.logger.info("Sandbox interface connected successfully")
except TimeoutError as e:
self.logger.error(f"Failed to connect to WebSocket interface at {ip_address}")
port = getattr(self._interface, "_api_port", 8000) # Default to 8000 if not set
self.logger.error(f"Failed to connect to sandbox interface at {ip_address}:{port}")
raise TimeoutError(
f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}"
f"Could not connect to sandbox interface at {ip_address}:{port}: {str(e)}"
)
# self.logger.warning(
# f"Could not connect to WebSocket interface at {ip_address}:8000/ws: {str(e)}, expect missing functionality"
# )
# Create an event to keep the VM running in background if needed
if not self.use_host_computer_server:
@@ -688,6 +715,7 @@ class Computer:
ip_address=ip_address,
api_key=self.api_key,
vm_name=self.config.name,
api_port=self.api_port,
),
)
else:
@@ -696,6 +724,7 @@ class Computer:
InterfaceFactory.create_interface_for_os(
os=self.os_type,
ip_address=ip_address,
api_port=self.api_port,
),
)
@@ -1013,7 +1042,7 @@ class Computer:
else:
# POSIX (macOS/Linux)
venv_path = f"$HOME/.venvs/{venv_name}"
create_cmd = f'mkdir -p "$HOME/.venvs" && python3 -m venv "{venv_path}"'
create_cmd = f'mkdir -p "$HOME/.venvs" && python -m venv "{venv_path}"'
# Check if venv exists, if not create it
check_cmd = f'test -d "{venv_path}" || ({create_cmd})'
_ = await self.interface.run_command(check_cmd)
@@ -1024,7 +1053,25 @@ class Computer:
if requirements_str
else "echo No requirements to install"
)
return await self.interface.run_command(install_cmd)
return await self.interface.run_command(install_cmd)
async def pip_install(self, requirements: list[str]):
    """Install packages using the system Python/pip (no venv).

    Each requirement is passed to the remote shell as a single argument.
    Requirements containing shell metacharacters are quoted, so a version
    specifier such as ``numpy>=1.20`` is not interpreted as an output
    redirection and a requirement string cannot inject further commands.

    Args:
        requirements: List of package requirements to install globally/user site.

    Returns:
        Whatever ``self.interface.run_command`` returns for the install
        command (or for a no-op ``echo`` when there is nothing to install).
    """
    import shlex

    requirements = requirements or []
    if not requirements:
        return await self.interface.run_command("echo No requirements to install")

    def _as_shell_arg(req: str) -> str:
        posix_quoted = shlex.quote(req)
        if posix_quoted == req:
            # Only shell-safe characters: pass through unchanged.
            return req
        if self.os_type == "windows":
            # cmd.exe does not understand single quotes; use double quotes.
            return '"' + req.replace('"', '\\"') + '"'
        return posix_quoted

    # Use python -m pip for cross-platform consistency
    reqs = " ".join(_as_shell_arg(req) for req in requirements)
    install_cmd = f"python -m pip install {reqs}"
    return await self.interface.run_command(install_cmd)
async def venv_cmd(self, venv_name: str, command: str):
"""Execute a shell command in a virtual environment.
@@ -1101,19 +1148,23 @@ class Computer:
raise Exception(f"Failed to reconstruct function source: {e}")
# Create Python code that will define and execute the function
args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")
python_code = f'''
import json
import traceback
import base64
try:
# Define the function from source
{textwrap.indent(func_source, " ")}
# Deserialize args and kwargs from JSON
args_json = """{args_json}"""
kwargs_json = """{kwargs_json}"""
args = json.loads(args_json)
kwargs = json.loads(kwargs_json)
# Deserialize args and kwargs from base64 JSON
_args_b64 = """{args_b64}"""
_kwargs_b64 = """{kwargs_b64}"""
args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))
# Execute the function
result = {func_name}(*args, **kwargs)
@@ -1177,10 +1228,21 @@ print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
if output_payload["success"]:
return output_payload["result"]
else:
import builtins
# Recreate and raise the original exception
error_info = output_payload["error"]
error_class = eval(error_info["type"])
raise error_class(error_info["message"])
error_info = output_payload.get("error", {}) or {}
err_type = error_info.get("type") or "Exception"
err_msg = error_info.get("message") or ""
err_tb = error_info.get("traceback") or ""
exc_cls = getattr(builtins, err_type, None)
if isinstance(exc_cls, type) and issubclass(exc_cls, BaseException):
# Built-in exception: rethrow with remote traceback appended
raise exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
else:
# Non built-in: raise a safe local error carrying full remote context
raise RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
else:
raise Exception("Invalid output format: markers found but no content between them")
else:
@@ -1188,3 +1250,357 @@ print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
raise Exception(
f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
)
async def venv_exec_background(
    self, venv_name: str, python_func, *args, requirements: Optional[List[str]] = None, **kwargs
) -> int:
    """Run the Python function in the venv in the background and return the PID.

    The function's source is extracted locally and wrapped in a payload
    script. A short launcher, run inside the activated venv, spawns that
    payload as a detached child, prints the child's PID and exits immediately.

    Args:
        venv_name: Name of the virtual environment under the user's
            ``.venvs`` directory; it is activated before the launcher runs.
        python_func: Function to run. Its source must be retrievable with
            ``inspect.getsource``; leading decorator lines are stripped.
        *args: Positional arguments, JSON-serialized with ``default=str``
            (values JSON cannot represent arrive as strings).
        requirements: Packages the payload installs one at a time with
            ``python -m pip install`` before calling the function. Failed
            installs are ignored.
        **kwargs: Keyword arguments, serialized like ``args``.

    Returns:
        The PID printed by the launcher (last line of its stdout).

    Raises:
        Exception: if the function source cannot be retrieved or the
            arguments cannot be serialized.
        RuntimeError: if the launcher printed nothing.
        ValueError: if the launcher's last output line is not an integer.
    """
    import base64
    import inspect
    import json
    import textwrap
    import time as _time

    try:
        source = inspect.getsource(python_func)
        func_source = textwrap.dedent(source).strip()
        # Ship only the bare definition: drop leading decorator lines.
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        func_name = python_func.__name__
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}")

    reqs_list = requirements or []
    reqs_json = json.dumps(reqs_list)

    # Create Python code that will define and execute the function
    args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
    kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

    # One single f-string: every placeholder, including the final call to
    # {func_name}, must be interpolated here. The requirements list is
    # embedded as the repr of its JSON text.
    payload_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))

    # Ensure requirements inside the active venv
    for pkg in json.loads({reqs_json!r}):
        if pkg:
            import subprocess, sys
            subprocess.run([sys.executable, '-m', 'pip', 'install', pkg], check=False)
    _ = {func_name}(*args, **kwargs)
except Exception:
    import sys
    sys.stderr.write(traceback.format_exc())
'''
    # Base64 lets the payload sit inside the launcher as a plain string literal.
    payload_b64 = base64.b64encode(payload_code.encode("utf-8")).decode("ascii")

    if self.os_type == "windows":
        # Launcher spawns detached child and prints its PID; the child's
        # output is discarded on Windows.
        launcher_code = f"""
import base64, subprocess, os, sys
DETACHED_PROCESS = 0x00000008
CREATE_NEW_PROCESS_GROUP = 0x00000200
creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
code = base64.b64decode("{payload_b64}").decode("utf-8")
p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, creationflags=creationflags)
print(p.pid)
"""
        launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
        venv_path = f"%USERPROFILE%\\.venvs\\{venv_name}"
        command = (
            'cmd /c "'
            f'call "{venv_path}\\Scripts\\activate.bat" && '
            f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
            '"'
        )
    else:
        # The child's stdout/stderr are appended to a log file named after
        # the launch time in whole seconds.
        log = f"/tmp/cua_bg_{int(_time.time())}.log"
        launcher_code = f"""
import base64, subprocess, os, sys
code = base64.b64decode("{payload_b64}").decode("utf-8")
with open("{log}", "ab", buffering=0) as f:
    p = subprocess.Popen(["python", "-c", code], stdout=f, stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))
print(p.pid)
"""
        launcher_b64 = base64.b64encode(launcher_code.encode("utf-8")).decode("ascii")
        venv_path = f"$HOME/.venvs/{venv_name}"
        command = (
            f'. "{venv_path}/bin/activate" && '
            f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
        )

    result = await self.interface.run_command(command)
    output_lines = (result.stdout or "").strip().splitlines()
    if not output_lines:
        raise RuntimeError(
            f"Background launcher printed no PID. stderr: {getattr(result, 'stderr', '')}"
        )
    # The PID is the last thing the launcher prints.
    return int(output_lines[-1].strip())
async def python_exec(self, python_func, *args, **kwargs):
    """Execute a Python function using the system Python (no venv).

    Uses source extraction and base64 transport, mirroring venv_exec but
    without virtual environment activation.

    Args:
        python_func: Function to run remotely. Its source must be retrievable
            with ``inspect.getsource``; leading decorator lines are stripped.
        *args: Positional arguments, JSON-serialized with ``default=str``.
        **kwargs: Keyword arguments, serialized the same way.

    Returns:
        The function's result after a JSON round trip.

    Raises:
        Exception: if the source cannot be retrieved, the arguments cannot be
            serialized, or the command output carries no valid result payload.
        BaseException subclass: when the remote function raised a built-in
            exception, the same type is raised with the remote traceback
            appended to the message.
        RuntimeError: when the remote exception type is not a built-in (or
            cannot be rebuilt from a message alone).
    """
    import base64
    import inspect
    import json
    import textwrap

    try:
        source = inspect.getsource(python_func)
        func_source = textwrap.dedent(source).strip()
        # Ship only the bare definition: drop leading decorator lines.
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        func_name = python_func.__name__
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}")
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}")

    # Create Python code that will define and execute the function
    args_b64 = base64.b64encode(args_json.encode("utf-8")).decode("ascii")
    kwargs_b64 = base64.b64encode(kwargs_json.encode("utf-8")).decode("ascii")

    python_code = f'''
import json
import traceback
import base64

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from base64 JSON
    _args_b64 = """{args_b64}"""
    _kwargs_b64 = """{kwargs_b64}"""
    args = json.loads(base64.b64decode(_args_b64).decode('utf-8'))
    kwargs = json.loads(base64.b64decode(_kwargs_b64).decode('utf-8'))

    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}

except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
import json
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''

    # The script travels base64-encoded so it needs no shell escaping.
    encoded_code = base64.b64encode(python_code.encode("utf-8")).decode("ascii")
    python_command = (
        f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
    )
    result = await self.interface.run_command(python_command)

    start_marker = "<<<VENV_EXEC_START>>>"
    end_marker = "<<<VENV_EXEC_END>>>"
    stdout = result.stdout or ""

    marker_pos = stdout.find(start_marker)
    if marker_pos == -1 or end_marker not in stdout:
        raise Exception(
            f"No output payload found. stdout: {result.stdout}, stderr: {result.stderr}"
        )

    # Relay whatever the remote function printed before the result payload.
    print(stdout[:marker_pos])

    start_idx = marker_pos + len(start_marker)
    end_idx = stdout.find(end_marker)
    if start_idx >= end_idx:
        raise Exception("Invalid output format: markers found but no content between them")

    try:
        output_payload = json.loads(stdout[start_idx:end_idx])
    except Exception as e:
        raise Exception(f"Failed to decode output payload: {e}")

    if output_payload["success"]:
        return output_payload["result"]

    import builtins

    error_info = output_payload.get("error", {}) or {}
    err_type = error_info.get("type") or "Exception"
    err_msg = error_info.get("message") or ""
    err_tb = error_info.get("traceback") or ""

    fallback = RuntimeError(f"{err_type}: {err_msg}\n\nRemote traceback:\n{err_tb}")
    # Only names of built-in exception classes are resolved; nothing from the
    # remote payload is evaluated.
    exc_cls = getattr(builtins, err_type, None)
    if not (isinstance(exc_cls, type) and issubclass(exc_cls, BaseException)):
        raise fallback
    try:
        rebuilt = exc_cls(f"{err_msg}\n\nRemote traceback:\n{err_tb}")
    except TypeError:
        # Some built-ins (e.g. UnicodeDecodeError) cannot be constructed from
        # a single message; keep the remote context rather than masking it.
        raise fallback
    raise rebuilt
async def python_exec_background(
    self, python_func, *args, requirements: Optional[List[str]] = None, **kwargs
) -> int:
    """Run a Python function with the system interpreter in the background and return PID.

    The function's source is extracted with ``inspect``, embedded in a payload
    script, and handed to a short launcher that spawns a detached child
    (``python -c <payload>``), prints the child's PID and exits immediately.

    Args:
        python_func: Function whose source is shipped to the remote machine.
            Leading decorator lines are stripped before it is embedded.
        *args: Positional arguments, serialized as JSON (non-JSON values are
            stringified via ``default=str``).
        requirements: Packages to ``pip install`` inside the detached child
            before the function runs, so the caller is never blocked on installs.
        **kwargs: Keyword arguments, serialized the same way as ``args``.

    Returns:
        The PID printed by the launcher for the detached child process.

    Raises:
        Exception: If the function source cannot be retrieved or serialized.
        RuntimeError: If the launcher output does not end with a PID.

    Note:
        On Windows the child's stdout/stderr are discarded; elsewhere they are
        appended to ``/tmp/cua_bg_<unix-seconds>.log``.
    """
    import base64
    import inspect
    import json
    import textwrap
    import time as _time

    def _b64(text: str) -> str:
        # Base64 keeps arbitrary text safe inside quoted shell/Python literals.
        return base64.b64encode(text.encode("utf-8")).decode("ascii")

    try:
        source = inspect.getsource(python_func)
        func_source = textwrap.dedent(source).strip()
        # Drop decorator lines so the remote side defines the bare function.
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()
        func_name = python_func.__name__
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)
        reqs_json = json.dumps(list(requirements or []), default=str)
    except OSError as e:
        raise Exception(
            f"Cannot retrieve source code for function {python_func.__name__}: {e}"
        ) from e
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}") from e

    # The generated scripts are assembled line by line (instead of a
    # triple-quoted template) so their indentation never depends on how this
    # method itself is indented in the file.
    payload_code = "\n".join(
        [
            "import base64",
            "import json",
            "import traceback",
            "try:",
            "    # Install requested packages inside the child (non-blocking for the caller)",
            f'    _reqs = json.loads(base64.b64decode("{_b64(reqs_json)}").decode("utf-8"))',
            "    if _reqs:",
            "        import subprocess",
            "        import sys",
            '        subprocess.check_call([sys.executable, "-m", "pip", "install", *_reqs])',
            "    # Define the function from source",
            textwrap.indent(func_source, "    "),
            "    # Deserialize args and kwargs from base64 JSON",
            f'    args = json.loads(base64.b64decode("{_b64(args_json)}").decode("utf-8"))',
            f'    kwargs = json.loads(base64.b64decode("{_b64(kwargs_json)}").decode("utf-8"))',
            f"    _ = {func_name}(*args, **kwargs)",
            "except Exception:",
            "    import sys",
            "    sys.stderr.write(traceback.format_exc())",
            "",
        ]
    )
    payload_b64 = _b64(payload_code)

    if self.os_type == "windows":
        launcher_lines = [
            "import base64, subprocess, os, sys",
            "DETACHED_PROCESS = 0x00000008",
            "CREATE_NEW_PROCESS_GROUP = 0x00000200",
            "creationflags = DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP",
            f'code = base64.b64decode("{payload_b64}").decode("utf-8")',
            'p = subprocess.Popen(["python", "-c", code], stdout=subprocess.DEVNULL, '
            "stderr=subprocess.DEVNULL, creationflags=creationflags)",
            "print(p.pid)",
            "",
        ]
    else:
        log = f"/tmp/cua_bg_{int(_time.time())}.log"
        launcher_lines = [
            "import base64, subprocess, os, sys",
            f'code = base64.b64decode("{payload_b64}").decode("utf-8")',
            f'with open("{log}", "ab", buffering=0) as f:',
            '    p = subprocess.Popen(["python", "-c", code], stdout=f, '
            'stderr=subprocess.STDOUT, preexec_fn=getattr(os, "setsid", None))',
            "print(p.pid)",
            "",
        ]

    launcher_b64 = _b64("\n".join(launcher_lines))
    cmd = f"python -c \"import base64; exec(base64.b64decode('{launcher_b64}').decode('utf-8'))\""
    result = await self.interface.run_command(cmd)

    # The PID is the last line the launcher prints; anything before it is noise.
    stdout_lines = (result.stdout or "").strip().splitlines()
    pid_str = stdout_lines[-1].strip() if stdout_lines else ""
    try:
        return int(pid_str)
    except ValueError as e:
        raise RuntimeError(
            "Failed to start background process: launcher did not report a PID. "
            f"stdout: {result.stdout!r}, stderr: {getattr(result, 'stderr', None)!r}"
        ) from e
def python_command(
    self,
    requirements: Optional[List[str]] = None,
    *,
    venv_name: str = "default",
    use_system_python: bool = False,
    background: bool = False,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
    """Build a decorator that runs the decorated function remotely on this Computer.

    Counterpart of `computer.helpers.sandboxed()` bound to this instance, with
    optional package installation ahead of (or, for background runs, as part
    of) the remote execution.

    Args:
        requirements: Packages to install in the virtual environment.
        venv_name: Name of the virtual environment to use.
        use_system_python: If True, use the system Python/pip instead of a venv.
        background: If True, run the function detached and return the child PID immediately.

    Returns:
        A decorator producing an async callable. Foreground calls resolve to the
        remote function's result; background calls resolve to whatever the
        ``*_exec_background`` method returns.
    """
    packages = list(requirements or [])

    def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
            # Background runs never install up front: the package list is
            # forwarded to the background executor instead.
            if background:
                if use_system_python:
                    return await self.python_exec_background(func, *args, requirements=packages, **kwargs)  # type: ignore[return-value]
                return await self.venv_exec_background(venv_name, func, *args, requirements=packages, **kwargs)  # type: ignore[return-value]

            # Foreground in a venv: venv_install is always awaited, even with
            # an empty package list, before executing.
            if not use_system_python:
                await self.venv_install(venv_name, packages)
                return await self.venv_exec(venv_name, func, *args, **kwargs)

            # Foreground on the system interpreter: install only when needed.
            if packages:
                await self.pip_install(packages)
            return await self.python_exec(func, *args, **kwargs)

        return wrapper

    return decorator

View File

@@ -5,7 +5,17 @@ Helper functions and decorators for the Computer module.
import asyncio
import logging
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, cast
from typing import Any, Awaitable, Callable, Optional, TypeVar
try:
# Python 3.10+ has ParamSpec in typing
from typing import ParamSpec
except ImportError: # pragma: no cover
# Fallback for environments without ParamSpec in typing
from typing_extensions import ParamSpec # type: ignore
P = ParamSpec("P")
R = TypeVar("R")
# Global reference to the default computer instance
_default_computer = None
@@ -13,7 +23,7 @@ _default_computer = None
logger = logging.getLogger(__name__)
def set_default_computer(computer):
def set_default_computer(computer: Any) -> None:
"""
Set the default computer instance to be used by the remote decorator.
@@ -24,7 +34,11 @@ def set_default_computer(computer):
_default_computer = computer
def sandboxed(venv_name: str = "default", computer: str = "default", max_retries: int = 3):
def sandboxed(
venv_name: str = "default",
computer: str = "default",
max_retries: int = 3,
) -> Callable[[Callable[P, R]], Callable[P, Awaitable[R]]]:
"""
Decorator that wraps a function to be executed remotely via computer.venv_exec
@@ -34,9 +48,9 @@ def sandboxed(venv_name: str = "default", computer: str = "default", max_retries
max_retries: Maximum number of retries for the remote execution
"""
def decorator(func):
def decorator(func: Callable[P, R]) -> Callable[P, Awaitable[R]]:
@wraps(func)
async def wrapper(*args, **kwargs):
async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
# Determine which computer instance to use
comp = computer if computer != "default" else _default_computer
@@ -54,6 +68,9 @@ def sandboxed(venv_name: str = "default", computer: str = "default", max_retries
if i == max_retries - 1:
raise e
# Should be unreachable because we either returned or raised
raise RuntimeError("sandboxed wrapper reached unreachable code path")
return wrapper
return decorator

View File

@@ -12,6 +12,7 @@ class InterfaceFactory:
def create_interface_for_os(
os: Literal["macos", "linux", "windows"],
ip_address: str,
api_port: Optional[int] = None,
api_key: Optional[str] = None,
vm_name: Optional[str] = None,
) -> BaseComputerInterface:
@@ -20,6 +21,7 @@ class InterfaceFactory:
Args:
os: Operating system type ('macos', 'linux', or 'windows')
ip_address: IP address of the computer to control
api_port: Optional API port of the computer to control
api_key: Optional API key for cloud authentication
vm_name: Optional VM name for cloud authentication
@@ -35,10 +37,16 @@ class InterfaceFactory:
from .windows import WindowsComputerInterface
if os == "macos":
return MacOSComputerInterface(ip_address, api_key=api_key, vm_name=vm_name)
return MacOSComputerInterface(
ip_address, api_key=api_key, vm_name=vm_name, api_port=api_port
)
elif os == "linux":
return LinuxComputerInterface(ip_address, api_key=api_key, vm_name=vm_name)
return LinuxComputerInterface(
ip_address, api_key=api_key, vm_name=vm_name, api_port=api_port
)
elif os == "windows":
return WindowsComputerInterface(ip_address, api_key=api_key, vm_name=vm_name)
return WindowsComputerInterface(
ip_address, api_key=api_key, vm_name=vm_name, api_port=api_port
)
else:
raise ValueError(f"Unsupported OS type: {os}")

View File

@@ -30,6 +30,7 @@ class GenericComputerInterface(BaseComputerInterface):
api_key: Optional[str] = None,
vm_name: Optional[str] = None,
logger_name: str = "computer.interface.generic",
api_port: Optional[int] = None,
):
super().__init__(ip_address, username, password, api_key, vm_name)
self._ws = None
@@ -47,6 +48,9 @@ class GenericComputerInterface(BaseComputerInterface):
# Set logger name for the interface
self.logger = Logger(logger_name, LogLevel.NORMAL)
# Store custom ports
self._api_port = api_port
# Optional default delay time between commands (in seconds)
self.delay = 0.0
@@ -70,7 +74,12 @@ class GenericComputerInterface(BaseComputerInterface):
WebSocket URI for the Computer API Server
"""
protocol = "wss" if self.api_key else "ws"
port = "8443" if self.api_key else "8000"
# Use custom API port if provided, otherwise use defaults based on API key
port = (
str(self._api_port)
if self._api_port is not None
else ("8443" if self.api_key else "8000")
)
return f"{protocol}://{self.ip_address}:{port}/ws"
@property
@@ -81,7 +90,12 @@ class GenericComputerInterface(BaseComputerInterface):
REST URI for the Computer API Server
"""
protocol = "https" if self.api_key else "http"
port = "8443" if self.api_key else "8000"
# Use custom API port if provided, otherwise use defaults based on API key
port = (
str(self._api_port)
if self._api_port is not None
else ("8443" if self.api_key else "8000")
)
return f"{protocol}://{self.ip_address}:{port}/cmd"
# Mouse actions

View File

@@ -13,7 +13,8 @@ class LinuxComputerInterface(GenericComputerInterface):
password: str = "lume",
api_key: Optional[str] = None,
vm_name: Optional[str] = None,
api_port: Optional[int] = None,
):
super().__init__(
ip_address, username, password, api_key, vm_name, "computer.interface.linux"
ip_address, username, password, api_key, vm_name, "computer.interface.linux", api_port
)

View File

@@ -13,9 +13,10 @@ class MacOSComputerInterface(GenericComputerInterface):
password: str = "lume",
api_key: Optional[str] = None,
vm_name: Optional[str] = None,
api_port: Optional[int] = None,
):
super().__init__(
ip_address, username, password, api_key, vm_name, "computer.interface.macos"
ip_address, username, password, api_key, vm_name, "computer.interface.macos", api_port
)
async def diorama_cmd(self, action: str, arguments: Optional[dict] = None) -> dict:

View File

@@ -13,7 +13,8 @@ class WindowsComputerInterface(GenericComputerInterface):
password: str = "lume",
api_key: Optional[str] = None,
vm_name: Optional[str] = None,
api_port: Optional[int] = None,
):
super().__init__(
ip_address, username, password, api_key, vm_name, "computer.interface.windows"
ip_address, username, password, api_key, vm_name, "computer.interface.windows", api_port
)

View File

@@ -37,7 +37,6 @@ class DockerProvider(BaseVMProvider):
def __init__(
self,
port: Optional[int] = 8000,
host: str = "localhost",
storage: Optional[str] = None,
shared_path: Optional[str] = None,
@@ -45,11 +44,11 @@ class DockerProvider(BaseVMProvider):
verbose: bool = False,
ephemeral: bool = False,
vnc_port: Optional[int] = 6901,
api_port: Optional[int] = None,
):
"""Initialize the Docker VM Provider.
Args:
port: Currently unused (VM provider port)
host: Hostname for the API server (default: localhost)
storage: Path for persistent VM storage
shared_path: Path for shared folder between host and container
@@ -60,9 +59,10 @@ class DockerProvider(BaseVMProvider):
verbose: Enable verbose logging
ephemeral: Use ephemeral (temporary) storage
vnc_port: Port for VNC interface (default: 6901)
api_port: Port for API server (default: 8000)
"""
self.host = host
self.api_port = 8000
self.api_port = api_port if api_port is not None else 8000
self.vnc_port = vnc_port
self.ephemeral = ephemeral
@@ -296,6 +296,7 @@ class DockerProvider(BaseVMProvider):
if vnc_port:
cmd.extend(["-p", f"{vnc_port}:6901"]) # VNC port
if api_port:
# Map the API port to container port 8000 (computer-server default)
cmd.extend(["-p", f"{api_port}:8000"]) # computer-server API port
# Add volume mounts if storage is specified

View File

@@ -14,7 +14,7 @@ class VMProviderFactory:
@staticmethod
def create_provider(
provider_type: Union[str, VMProviderType],
port: int = 7777,
provider_port: int = 7777,
host: str = "localhost",
bin_path: Optional[str] = None,
storage: Optional[str] = None,
@@ -23,13 +23,14 @@ class VMProviderFactory:
verbose: bool = False,
ephemeral: bool = False,
noVNC_port: Optional[int] = None,
api_port: Optional[int] = None,
**kwargs,
) -> BaseVMProvider:
"""Create a VM provider of the specified type.
Args:
provider_type: Type of VM provider to create
port: Port for the API server
provider_port: Port for the provider's API server
host: Hostname for the API server
bin_path: Path to provider binary if needed
storage: Path for persistent VM storage
@@ -37,7 +38,8 @@ class VMProviderFactory:
image: VM image to use (for Lumier provider)
verbose: Enable verbose logging
ephemeral: Use ephemeral (temporary) storage
noVNC_port: Specific port for noVNC interface (for Lumier provider)
noVNC_port: Specific port for noVNC interface (for Lumier and Docker provider)
api_port: Specific port for Computer API server (for Docker provider)
Returns:
An instance of the requested VM provider
@@ -63,7 +65,11 @@ class VMProviderFactory:
"Please install it with 'pip install cua-computer[lume]'"
)
return LumeProvider(
port=port, host=host, storage=storage, verbose=verbose, ephemeral=ephemeral
provider_port=provider_port,
host=host,
storage=storage,
verbose=verbose,
ephemeral=ephemeral,
)
except ImportError as e:
logger.error(f"Failed to import LumeProvider: {e}")
@@ -81,7 +87,7 @@ class VMProviderFactory:
"Please install Docker for Apple Silicon and Lume CLI before using this provider."
)
return LumierProvider(
port=port,
provider_port=provider_port,
host=host,
storage=storage,
shared_path=shared_path,
@@ -121,7 +127,6 @@ class VMProviderFactory:
"Please install it with 'pip install -U git+https://github.com/karkason/pywinsandbox.git'"
)
return WinSandboxProvider(
port=port,
host=host,
storage=storage,
verbose=verbose,
@@ -144,7 +149,6 @@ class VMProviderFactory:
"Please install Docker and ensure it is running."
)
return DockerProvider(
port=port,
host=host,
storage=storage,
shared_path=shared_path,
@@ -152,6 +156,7 @@ class VMProviderFactory:
verbose=verbose,
ephemeral=ephemeral,
vnc_port=noVNC_port,
api_port=api_port,
)
except ImportError as e:
logger.error(f"Failed to import DockerProvider: {e}")

View File

@@ -38,7 +38,7 @@ class LumeProvider(BaseVMProvider):
def __init__(
self,
port: int = 7777,
provider_port: int = 7777,
host: str = "localhost",
storage: Optional[str] = None,
verbose: bool = False,
@@ -47,7 +47,7 @@ class LumeProvider(BaseVMProvider):
"""Initialize the Lume provider.
Args:
port: Port for the Lume API server (default: 7777)
provider_port: Port for the Lume API server (default: 7777)
host: Host to use for API connections (default: localhost)
storage: Path to store VM data
verbose: Enable verbose logging
@@ -59,7 +59,7 @@ class LumeProvider(BaseVMProvider):
)
self.host = host
self.port = port # Default port for Lume API
self.port = provider_port # Default port for Lume API
self.storage = storage
self.verbose = verbose
self.ephemeral = ephemeral # If True, VMs will be deleted after stopping

View File

@@ -39,7 +39,7 @@ class LumierProvider(BaseVMProvider):
def __init__(
self,
port: Optional[int] = 7777,
provider_port: Optional[int] = 7777,
host: str = "localhost",
storage: Optional[str] = None, # Can be a path or 'ephemeral'
shared_path: Optional[str] = None,
@@ -51,7 +51,7 @@ class LumierProvider(BaseVMProvider):
"""Initialize the Lumier VM Provider.
Args:
port: Port for the API server (default: 7777)
provider_port: Port for the API server (default: 7777)
host: Hostname for the API server (default: localhost)
storage: Path for persistent VM storage
shared_path: Path for shared folder between host and VM
@@ -61,8 +61,8 @@ class LumierProvider(BaseVMProvider):
noVNC_port: Specific port for noVNC interface (default: 8006)
"""
self.host = host
# Always ensure api_port has a valid value (7777 is the default)
self.api_port = 7777 if port is None else port
# Always ensure lume_port has a valid value (7777 is the default)
self.lume_port = 7777 if provider_port is None else provider_port
self.vnc_port = noVNC_port # User-specified noVNC port, will be set in run_vm if provided
self.ephemeral = ephemeral
@@ -198,7 +198,7 @@ class LumierProvider(BaseVMProvider):
vm_info = lume_api_get(
vm_name=name,
host=self.host,
port=self.api_port,
port=self.lume_port,
storage=storage if storage is not None else self.storage,
debug=self.verbose,
verbose=self.verbose,
@@ -320,7 +320,7 @@ class LumierProvider(BaseVMProvider):
logger.debug(f"Using specified noVNC_port: {self.vnc_port}")
# Set API URL using the API port
self._api_url = f"http://{self.host}:{self.api_port}"
self._api_url = f"http://{self.host}:{self.lume_port}"
# Parse memory setting
memory_mb = self._parse_memory(run_opts.get("memory", "8GB"))
@@ -671,7 +671,7 @@ class LumierProvider(BaseVMProvider):
# Container is running, check if API is responsive
try:
# First check the health endpoint
api_url = f"http://{self.host}:{self.api_port}/health"
api_url = f"http://{self.host}:{self.lume_port}/health"
logger.info(f"Checking API health at: {api_url}")
# Use longer timeout for API health check since it may still be initializing
@@ -685,7 +685,7 @@ class LumierProvider(BaseVMProvider):
else:
# API health check failed, now let's check if the VM status endpoint is responsive
# This covers cases where the health endpoint isn't implemented but the VM API is working
vm_api_url = f"http://{self.host}:{self.api_port}/lume/vms/{container_name}"
vm_api_url = f"http://{self.host}:{self.lume_port}/lume/vms/{container_name}"
if self.storage:
import urllib.parse
@@ -1026,7 +1026,7 @@ class LumierProvider(BaseVMProvider):
# Initialize the API URL with the default value if not already set
# This ensures get_vm can work before run_vm is called
if not hasattr(self, "_api_url") or not self._api_url:
self._api_url = f"http://{self.host}:{self.api_port}"
self._api_url = f"http://{self.host}:{self.lume_port}"
logger.info(f"Initialized default Lumier API URL: {self._api_url}")
return self

View File

@@ -29,7 +29,6 @@ class WinSandboxProvider(BaseVMProvider):
def __init__(
self,
port: int = 7777,
host: str = "localhost",
storage: Optional[str] = None,
verbose: bool = False,
@@ -41,7 +40,6 @@ class WinSandboxProvider(BaseVMProvider):
"""Initialize the Windows Sandbox provider.
Args:
port: Port for the computer server (default: 7777)
host: Host to use for connections (default: localhost)
storage: Storage path (ignored - Windows Sandbox is always ephemeral)
verbose: Enable verbose logging
@@ -56,7 +54,6 @@ class WinSandboxProvider(BaseVMProvider):
)
self.host = host
self.port = port
self.verbose = verbose
self.memory_mb = memory_mb
self.networking = networking

View File

@@ -22,6 +22,20 @@ RUN apt-get update && apt-get install -y \
unzip \
zip \
xdg-utils \
gcc \
# Qt/XCB runtime deps for PyQt5 (libqxcb.so)
libxcb-icccm4 \
libxcb-image0 \
libxcb-keysyms1 \
libxcb-render-util0 \
libxcb-xinerama0 \
libxcb-shape0 \
libxcb-randr0 \
libxcb-xfixes0 \
libxcb-sync1 \
libxcb-util1 \
libxcb-cursor0 \
libxkbcommon-x11-0 \
# Desktop environment
xfce4 \
xfce4-terminal \
@@ -51,6 +65,7 @@ RUN apt-get update && apt-get install -y \
libssl-dev \
libsqlite3-dev \
tk-dev \
libgl1-mesa-dev \
libgdbm-dev \
libc6-dev \
libbz2-dev \
@@ -61,7 +76,8 @@ RUN apt-get update && apt-get install -y \
# Install Python 3.12 from deadsnakes (keep system python3 for apt)
RUN add-apt-repository -y ppa:deadsnakes/ppa && \
apt-get update && apt-get install -y \
python3.12 python3.12-venv python3.12-dev python3.12-tk && \
python3.12 python3.12-venv python3.12-dev python3.12-tk \
&& \
python3.12 -m ensurepip --upgrade && \
python3.12 -m pip install --upgrade pip setuptools wheel && \
rm -rf /var/lib/apt/lists/*
@@ -107,6 +123,10 @@ RUN mkdir -p /home/cua/.cache && \
# Install computer-server using Python 3.12 pip
RUN python3.12 -m pip install cua-computer-server
# Install PyQt6 and pywebview, used by cua-bench for web UIs
RUN python3.12 -m pip install "pywebview[qt]"
# The requirement must be quoted: in shell-form RUN an unquoted ">=" is parsed as
# a redirect to a file named "=0.6.0", which silently drops the version constraint.
RUN python3.12 -m pip install "cua-bench-ui>=0.6.0" --no-cache-dir
# Install playwright and Firefox dependencies
RUN python3.12 -m pip install playwright && \
python3.12 -m playwright install --with-deps firefox

View File

@@ -22,6 +22,20 @@ RUN apt-get update && apt-get install -y \
unzip \
zip \
xdg-utils \
gcc \
# Qt/XCB runtime deps for PyQt5 (libqxcb.so)
libxcb-icccm4 \
libxcb-image0 \
libxcb-keysyms1 \
libxcb-render-util0 \
libxcb-xinerama0 \
libxcb-shape0 \
libxcb-randr0 \
libxcb-xfixes0 \
libxcb-sync1 \
libxcb-util1 \
libxcb-cursor0 \
libxkbcommon-x11-0 \
# Desktop environment
xfce4 \
xfce4-terminal \
@@ -51,6 +65,7 @@ RUN apt-get update && apt-get install -y \
libssl-dev \
libsqlite3-dev \
tk-dev \
libgl1-mesa-dev \
libgdbm-dev \
libc6-dev \
libbz2-dev \
@@ -109,6 +124,10 @@ COPY python/computer-server /tmp/computer-server
RUN python3.12 -m pip install /tmp/computer-server && \
rm -rf /tmp/computer-server
# Install PyQt6 and pywebview, used by cua-bench for web UIs
RUN python3.12 -m pip install "pywebview[qt]"
# The requirement must be quoted: in shell-form RUN an unquoted ">=" is parsed as
# a redirect to a file named "=0.6.0", which silently drops the version constraint.
RUN python3.12 -m pip install "cua-bench-ui>=0.6.0" --no-cache-dir
# Install playwright and Firefox dependencies
RUN python3.12 -m pip install playwright && \
python3.12 -m playwright install --with-deps firefox

View File

@@ -10,4 +10,4 @@ echo "X server is ready"
# Start computer-server
export DISPLAY=:1
python -m computer_server --port ${API_PORT:-8000}
python3.12 -m computer_server --port ${API_PORT:-8000}

View File

@@ -58,6 +58,7 @@ members = [
"libs/python/computer-server",
"libs/python/som",
"libs/python/mcp-server",
"libs/python/bench-ui",
]
[tool.uv.sources]
@@ -67,6 +68,7 @@ cua-computer = { workspace = true }
cua-computer-server = { workspace = true }
cua-som = { workspace = true }
cua-mcp-server = { workspace = true }
cua-bench-ui = { workspace = true }
[tool.black]
line-length = 100
@@ -105,4 +107,4 @@ py_version = 312
[tool.pytest.ini_options]
asyncio_mode = "auto"
python_files = "test_*.py"
testpaths = ["libs/*/tests"]
testpaths = ["libs/*/tests"]

5805
uv.lock generated

File diff suppressed because it is too large Load Diff