Merge branch 'main' into feature/computer/typescript

This commit is contained in:
Morgan Dean
2025-06-18 10:25:36 -07:00
16 changed files with 1887 additions and 52 deletions

View File

@@ -0,0 +1,51 @@
"""Example of using the Windows Sandbox computer provider.
Learn more at: https://learn.microsoft.com/en-us/windows/security/application-security/application-isolation/windows-sandbox/
"""
import asyncio
from computer import Computer
async def main():
"""Test the Windows Sandbox provider."""
# Create a computer instance using Windows Sandbox
computer = Computer(
provider_type="winsandbox",
os_type="windows",
memory="4GB",
# ephemeral=True, # Always true for Windows Sandbox
)
try:
print("Starting Windows Sandbox...")
await computer.run()
print("Windows Sandbox is ready!")
print(f"IP Address: {await computer.get_ip()}")
# Test basic functionality
print("Testing basic functionality...")
screenshot = await computer.interface.screenshot()
print(f"Screenshot taken: {len(screenshot)} bytes")
# Test running a command
print("Testing command execution...")
stdout, stderr = await computer.interface.run_command("echo Hello from Windows Sandbox!")
print(f"Command output: {stdout}")
print("Press any key to continue...")
input()
except Exception as e:
print(f"Error: {e}")
import traceback
traceback.print_exc()
finally:
print("Stopping Windows Sandbox...")
await computer.stop()
print("Windows Sandbox stopped.")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,15 @@
"""
Main entry point for agent.ui module.
This allows running the agent UI with:
python -m agent.ui
Instead of:
python -m agent.ui.gradio.app
"""
from .gradio.app import create_gradio_ui
if __name__ == "__main__":
app = create_gradio_ui()
app.launch(share=False, inbrowser=True)

View File

@@ -137,6 +137,7 @@ MODEL_MAPPINGS = {
"openai": {
# Default to operator CUA model
"default": "computer-use-preview",
"OpenAI: Computer-Use Preview": "computer-use-preview",
# Map standard OpenAI model names to CUA-specific model names
"gpt-4-turbo": "computer-use-preview",
"gpt-4o": "computer-use-preview",
@@ -147,9 +148,17 @@ MODEL_MAPPINGS = {
"anthropic": {
# Default to newest model
"default": "claude-3-7-sonnet-20250219",
# New Claude 4 models
"Anthropic: Claude 4 Opus (20250514)": "claude-opus-4-20250514",
"Anthropic: Claude 4 Sonnet (20250514)": "claude-sonnet-4-20250514",
"claude-opus-4-20250514": "claude-opus-4-20250514",
"claude-sonnet-4-20250514": "claude-sonnet-4-20250514",
# Specific Claude models for CUA
"claude-3-5-sonnet-20240620": "claude-3-5-sonnet-20240620",
"Anthropic: Claude 3.7 Sonnet (20250219)": "claude-3-5-sonnet-20240620",
"Anthropic: Claude 3.5 Sonnet (20240620)": "claude-3-7-sonnet-20250219",
"claude-3-7-sonnet-20250219": "claude-3-7-sonnet-20250219",
"claude-3-5-sonnet-20240620": "claude-3-5-sonnet-20240620",
# Map standard model names to CUA-specific model names
"claude-3-opus": "claude-3-7-sonnet-20250219",
"claude-3-sonnet": "claude-3-5-sonnet-20240620",
@@ -209,12 +218,12 @@ def get_provider_and_model(model_name: str, loop_provider: str) -> tuple:
if agent_loop == AgentLoop.OPENAI:
provider = LLMProvider.OPENAI
model_name_to_use = MODEL_MAPPINGS["openai"].get(
model_name.lower(), MODEL_MAPPINGS["openai"]["default"]
model_name, MODEL_MAPPINGS["openai"]["default"]
)
elif agent_loop == AgentLoop.ANTHROPIC:
provider = LLMProvider.ANTHROPIC
model_name_to_use = MODEL_MAPPINGS["anthropic"].get(
model_name.lower(), MODEL_MAPPINGS["anthropic"]["default"]
model_name, MODEL_MAPPINGS["anthropic"]["default"]
)
elif agent_loop == AgentLoop.OMNI:
# Determine provider and clean model name based on the full string from UI
@@ -234,33 +243,11 @@ def get_provider_and_model(model_name: str, loop_provider: str) -> tuple:
cleaned_model_name = model_name.split("OMNI: Ollama ", 1)[1]
elif model_name.startswith("OMNI: Claude "):
provider = LLMProvider.ANTHROPIC
# Extract the canonical model name based on the UI string
# e.g., "OMNI: Claude 3.7 Sonnet (20250219)" -> "3.7 Sonnet" and "20250219"
parts = model_name.split(" (")
model_key_part = parts[0].replace("OMNI: Claude ", "")
date_part = parts[1].replace(")", "") if len(parts) > 1 else ""
# Normalize the extracted key part for comparison
# "3.7 Sonnet" -> "37sonnet"
model_key_part_norm = model_key_part.lower().replace(".", "").replace(" ", "")
cleaned_model_name = MODEL_MAPPINGS["omni"]["default"] # Default if not found
# Find the canonical name in the main Anthropic map
for key_anthropic, val_anthropic in MODEL_MAPPINGS["anthropic"].items():
# Normalize the canonical key for comparison
# "claude-3-7-sonnet-20250219" -> "claude37sonnet20250219"
key_anthropic_norm = key_anthropic.lower().replace("-", "")
# Check if the normalized canonical key starts with "claude" + normalized extracted part
# AND contains the date part.
if (
key_anthropic_norm.startswith("claude" + model_key_part_norm)
and date_part in key_anthropic_norm
):
cleaned_model_name = (
val_anthropic # Use the canonical name like "claude-3-7-sonnet-20250219"
)
break
model_name = model_name.replace("OMNI: ", "Anthropic: ")
cleaned_model_name = MODEL_MAPPINGS["anthropic"].get(
model_name, MODEL_MAPPINGS["anthropic"]["default"]
)
elif model_name.startswith("OMNI: OpenAI "):
provider = LLMProvider.OPENAI
# Extract the model part, e.g., "GPT-4o mini"
@@ -309,6 +296,8 @@ def get_provider_and_model(model_name: str, loop_provider: str) -> tuple:
model_name_to_use = MODEL_MAPPINGS["openai"]["default"]
agent_loop = AgentLoop.OPENAI
print(f"Mapping {model_name} and {loop_provider} to {provider}, {model_name_to_use}, {agent_loop}")
return provider, model_name_to_use, agent_loop
@@ -453,6 +442,9 @@ def create_gradio_ui(
# Always show models regardless of API key availability
openai_models = ["OpenAI: Computer-Use Preview"]
anthropic_models = [
"Anthropic: Claude 4 Opus (20250514)",
"Anthropic: Claude 4 Sonnet (20250514)",
"Anthropic: Claude 3.7 Sonnet (20250219)",
"Anthropic: Claude 3.5 Sonnet (20240620)",
]
@@ -460,6 +452,8 @@ def create_gradio_ui(
"OMNI: OpenAI GPT-4o",
"OMNI: OpenAI GPT-4o mini",
"OMNI: OpenAI GPT-4.5-preview",
"OMNI: Claude 4 Opus (20250514)",
"OMNI: Claude 4 Sonnet (20250514)",
"OMNI: Claude 3.7 Sonnet (20250219)",
"OMNI: Claude 3.5 Sonnet (20240620)"
]
@@ -729,20 +723,25 @@ if __name__ == "__main__":
with gr.Accordion("Computer Configuration", open=True):
# Computer configuration options
computer_os = gr.Radio(
choices=["macos", "linux"],
choices=["macos", "linux", "windows"],
label="Operating System",
value="macos",
info="Select the operating system for the computer",
)
# Detect if current device is MacOS
is_windows = platform.system().lower() == "windows"
is_mac = platform.system().lower() == "darwin"
providers = ["cloud"]
if is_mac:
providers += ["lume"]
elif is_windows:
providers += ["winsandbox"]
computer_provider = gr.Radio(
choices=["cloud", "lume"],
choices=providers,
label="Provider",
value="lume" if is_mac else "cloud",
visible=is_mac,
info="Select the computer provider",
)

View File

@@ -11,6 +11,8 @@ if system == 'darwin':
from computer_server.diorama.macos import MacOSDioramaHandler
elif system == 'linux':
from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
elif system == 'windows':
from .windows import WindowsAccessibilityHandler, WindowsAutomationHandler
from .generic import GenericFileHandler
@@ -22,7 +24,7 @@ class HandlerFactory:
"""Determine the current OS.
Returns:
str: The OS type ('darwin' for macOS or 'linux' for Linux)
str: The OS type ('darwin' for macOS, 'linux' for Linux, or 'windows' for Windows)
Raises:
RuntimeError: If unable to determine the current OS
@@ -31,13 +33,14 @@ class HandlerFactory:
# Use platform.system() as primary method
system = platform.system().lower()
if system in ['darwin', 'linux', 'windows']:
return 'darwin' if system == 'darwin' else 'linux' if system == 'linux' else 'windows'
return system
# Fallback to uname if platform.system() doesn't return expected values
# Fallback to uname if platform.system() doesn't return expected values (Unix-like systems only)
result = subprocess.run(['uname', '-s'], capture_output=True, text=True)
if result.returncode != 0:
raise RuntimeError(f"uname command failed: {result.stderr}")
return result.stdout.strip().lower()
if result.returncode == 0:
return result.stdout.strip().lower()
raise RuntimeError(f"Unsupported OS: {system}")
except Exception as e:
raise RuntimeError(f"Failed to determine current OS: {str(e)}")
@@ -59,5 +62,7 @@ class HandlerFactory:
return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler(), GenericFileHandler()
elif os_type == 'linux':
return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler(), GenericFileHandler()
elif os_type == 'windows':
return WindowsAccessibilityHandler(), WindowsAutomationHandler(), BaseDioramaHandler(), GenericFileHandler()
else:
raise NotImplementedError(f"OS '{os_type}' is not supported")
raise NotImplementedError(f"OS '{os_type}' is not supported")

View File

@@ -0,0 +1,405 @@
"""
Windows implementation of automation and accessibility handlers.
This implementation uses pyautogui for GUI automation and Windows-specific APIs
for accessibility and system operations.
"""
from typing import Dict, Any, List, Tuple, Optional
import logging
import subprocess
import base64
import os
from io import BytesIO
# Configure logger
logger = logging.getLogger(__name__)
# Try to import pyautogui
try:
import pyautogui
logger.info("pyautogui successfully imported, GUI automation available")
except Exception as e:
logger.error(f"pyautogui import failed: {str(e)}. GUI operations will not work.")
pyautogui = None
# Try to import Windows-specific modules
try:
import win32gui
import win32con
import win32api
logger.info("Windows API modules successfully imported")
WINDOWS_API_AVAILABLE = True
except Exception as e:
logger.error(f"Windows API modules import failed: {str(e)}. Some Windows-specific features will be unavailable.")
WINDOWS_API_AVAILABLE = False
from .base import BaseAccessibilityHandler, BaseAutomationHandler
class WindowsAccessibilityHandler(BaseAccessibilityHandler):
"""Windows implementation of accessibility handler."""
async def get_accessibility_tree(self) -> Dict[str, Any]:
"""Get the accessibility tree of the current window."""
if not WINDOWS_API_AVAILABLE:
return {"success": False, "error": "Windows API not available"}
try:
# Get the foreground window
hwnd = win32gui.GetForegroundWindow()
if not hwnd:
return {"success": False, "error": "No foreground window found"}
# Get window information
window_text = win32gui.GetWindowText(hwnd)
rect = win32gui.GetWindowRect(hwnd)
tree = {
"role": "Window",
"title": window_text,
"position": {"x": rect[0], "y": rect[1]},
"size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]},
"children": []
}
# Enumerate child windows
def enum_child_proc(hwnd_child, children_list):
try:
child_text = win32gui.GetWindowText(hwnd_child)
child_rect = win32gui.GetWindowRect(hwnd_child)
child_class = win32gui.GetClassName(hwnd_child)
child_info = {
"role": child_class,
"title": child_text,
"position": {"x": child_rect[0], "y": child_rect[1]},
"size": {"width": child_rect[2] - child_rect[0], "height": child_rect[3] - child_rect[1]},
"children": []
}
children_list.append(child_info)
except Exception as e:
logger.debug(f"Error getting child window info: {e}")
return True
win32gui.EnumChildWindows(hwnd, enum_child_proc, tree["children"])
return {"success": True, "tree": tree}
except Exception as e:
logger.error(f"Error getting accessibility tree: {e}")
return {"success": False, "error": str(e)}
async def find_element(self, role: Optional[str] = None,
title: Optional[str] = None,
value: Optional[str] = None) -> Dict[str, Any]:
"""Find an element in the accessibility tree by criteria."""
if not WINDOWS_API_AVAILABLE:
return {"success": False, "error": "Windows API not available"}
try:
# Find window by title if specified
if title:
hwnd = win32gui.FindWindow(None, title)
if hwnd:
rect = win32gui.GetWindowRect(hwnd)
return {
"success": True,
"element": {
"role": "Window",
"title": title,
"position": {"x": rect[0], "y": rect[1]},
"size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}
}
}
# Find window by class name if role is specified
if role:
hwnd = win32gui.FindWindow(role, None)
if hwnd:
window_text = win32gui.GetWindowText(hwnd)
rect = win32gui.GetWindowRect(hwnd)
return {
"success": True,
"element": {
"role": role,
"title": window_text,
"position": {"x": rect[0], "y": rect[1]},
"size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}
}
}
return {"success": False, "error": "Element not found"}
except Exception as e:
logger.error(f"Error finding element: {e}")
return {"success": False, "error": str(e)}
class WindowsAutomationHandler(BaseAutomationHandler):
"""Windows implementation of automation handler using pyautogui and Windows APIs."""
# Mouse Actions
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.mouseDown(button=button)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.mouseUp(button=button)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.moveTo(x, y)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.click()
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.rightClick()
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.doubleClick(interval=0.1)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.dragTo(x, y, duration=duration, button=button)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
if not path:
return {"success": False, "error": "Path is empty"}
# Move to first position
pyautogui.moveTo(*path[0])
# Drag through all positions
for x, y in path[1:]:
pyautogui.dragTo(x, y, duration=duration/len(path), button=button)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
# Keyboard Actions
async def key_down(self, key: str) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.keyDown(key)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def key_up(self, key: str) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.keyUp(key)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def type_text(self, text: str) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.write(text)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def press_key(self, key: str) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.press(key)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def hotkey(self, keys: str) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.hotkey(*keys)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
# Scrolling Actions
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
# pyautogui.scroll() only takes one parameter (vertical scroll)
pyautogui.scroll(y)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.scroll(-clicks)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
pyautogui.scroll(clicks)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
# Screen Actions
async def screenshot(self) -> Dict[str, Any]:
if not pyautogui:
return {"success": False, "error": "pyautogui not available"}
try:
from PIL import Image
screenshot = pyautogui.screenshot()
if not isinstance(screenshot, Image.Image):
return {"success": False, "error": "Failed to capture screenshot"}
buffered = BytesIO()
screenshot.save(buffered, format="PNG", optimize=True)
buffered.seek(0)
image_data = base64.b64encode(buffered.getvalue()).decode()
return {"success": True, "image_data": image_data}
except Exception as e:
return {"success": False, "error": f"Screenshot error: {str(e)}"}
async def get_screen_size(self) -> Dict[str, Any]:
try:
if pyautogui:
size = pyautogui.size()
return {"success": True, "size": {"width": size.width, "height": size.height}}
elif WINDOWS_API_AVAILABLE:
# Fallback to Windows API
width = win32api.GetSystemMetrics(win32con.SM_CXSCREEN)
height = win32api.GetSystemMetrics(win32con.SM_CYSCREEN)
return {"success": True, "size": {"width": width, "height": height}}
else:
return {"success": False, "error": "No screen size detection method available"}
except Exception as e:
return {"success": False, "error": str(e)}
async def get_cursor_position(self) -> Dict[str, Any]:
try:
if pyautogui:
pos = pyautogui.position()
return {"success": True, "position": {"x": pos.x, "y": pos.y}}
elif WINDOWS_API_AVAILABLE:
# Fallback to Windows API
pos = win32gui.GetCursorPos()
return {"success": True, "position": {"x": pos[0], "y": pos[1]}}
else:
return {"success": False, "error": "No cursor position detection method available"}
except Exception as e:
return {"success": False, "error": str(e)}
# Clipboard Actions
async def copy_to_clipboard(self) -> Dict[str, Any]:
try:
import pyperclip
content = pyperclip.paste()
return {"success": True, "content": content}
except Exception as e:
return {"success": False, "error": str(e)}
async def set_clipboard(self, text: str) -> Dict[str, Any]:
try:
import pyperclip
pyperclip.copy(text)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
# Command Execution
async def run_command(self, command: str) -> Dict[str, Any]:
try:
# Use cmd.exe for Windows commands
process = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
)
return {
"success": True,
"stdout": process.stdout,
"stderr": process.stderr,
"return_code": process.returncode
}
except Exception as e:
return {"success": False, "error": str(e)}

View File

@@ -19,7 +19,8 @@ dependencies = [
"pyautogui>=0.9.54",
"pynput>=1.8.1",
"pillow>=10.2.0",
"aiohttp>=3.9.1"
"aiohttp>=3.9.1",
"pyperclip>=1.9.0"
]
[project.optional-dependencies]
@@ -31,6 +32,9 @@ macos = [
linux = [
"python-xlib>=0.33"
]
windows = [
"pywin32>=310"
]
[project.urls]
homepage = "https://github.com/trycua/cua"
@@ -80,4 +84,4 @@ disallow_untyped_defs = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
warn_unused_ignores = false

View File

@@ -0,0 +1,11 @@
"""Windows Sandbox provider for CUA Computer."""
try:
import winsandbox
HAS_WINSANDBOX = True
except ImportError:
HAS_WINSANDBOX = False
from .provider import WinSandboxProvider
__all__ = ["WinSandboxProvider", "HAS_WINSANDBOX"]

View File

@@ -0,0 +1,468 @@
"""Windows Sandbox VM provider implementation using pywinsandbox."""
import os
import asyncio
import logging
import time
from typing import Dict, Any, Optional, List
from ..base import BaseVMProvider, VMProviderType
# Setup logging
logger = logging.getLogger(__name__)
try:
import winsandbox
HAS_WINSANDBOX = True
except ImportError:
HAS_WINSANDBOX = False
class WinSandboxProvider(BaseVMProvider):
"""Windows Sandbox VM provider implementation using pywinsandbox.
This provider uses Windows Sandbox to create isolated Windows environments.
Storage is always ephemeral with Windows Sandbox.
"""
def __init__(
self,
port: int = 7777,
host: str = "localhost",
storage: Optional[str] = None,
verbose: bool = False,
ephemeral: bool = True, # Windows Sandbox is always ephemeral
memory_mb: int = 4096,
networking: bool = True,
**kwargs
):
"""Initialize the Windows Sandbox provider.
Args:
port: Port for the computer server (default: 7777)
host: Host to use for connections (default: localhost)
storage: Storage path (ignored - Windows Sandbox is always ephemeral)
verbose: Enable verbose logging
ephemeral: Always True for Windows Sandbox
memory_mb: Memory allocation in MB (default: 4096)
networking: Enable networking in sandbox (default: True)
"""
if not HAS_WINSANDBOX:
raise ImportError(
"pywinsandbox is required for WinSandboxProvider. "
"Please install it with 'pip install pywinsandbox'"
)
self.host = host
self.port = port
self.verbose = verbose
self.memory_mb = memory_mb
self.networking = networking
# Windows Sandbox is always ephemeral
if not ephemeral:
logger.warning("Windows Sandbox storage is always ephemeral. Ignoring ephemeral=False.")
self.ephemeral = True
# Storage is always ephemeral for Windows Sandbox
if storage and storage != "ephemeral":
logger.warning("Windows Sandbox does not support persistent storage. Using ephemeral storage.")
self.storage = "ephemeral"
self.logger = logging.getLogger(__name__)
# Track active sandboxes
self._active_sandboxes: Dict[str, Any] = {}
@property
def provider_type(self) -> VMProviderType:
"""Get the provider type."""
return VMProviderType.WINSANDBOX
async def __aenter__(self):
"""Enter async context manager."""
# Verify Windows Sandbox is available
if not HAS_WINSANDBOX:
raise ImportError("pywinsandbox is not available")
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
"""Exit async context manager."""
# Clean up any active sandboxes
for name, sandbox in self._active_sandboxes.items():
try:
sandbox.shutdown()
self.logger.info(f"Terminated sandbox: {name}")
except Exception as e:
self.logger.error(f"Error terminating sandbox {name}: {e}")
self._active_sandboxes.clear()
async def get_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Get VM information by name.
Args:
name: Name of the VM to get information for
storage: Ignored for Windows Sandbox (always ephemeral)
Returns:
Dictionary with VM information including status, IP address, etc.
"""
if name not in self._active_sandboxes:
return {
"name": name,
"status": "stopped",
"ip_address": None,
"storage": "ephemeral"
}
sandbox = self._active_sandboxes[name]
# Check if sandbox is still running
try:
# Try to ping the sandbox to see if it's responsive
try:
sandbox.rpyc.modules.os.getcwd()
sandbox_responsive = True
except Exception:
sandbox_responsive = False
if not sandbox_responsive:
return {
"name": name,
"status": "starting",
"ip_address": None,
"storage": "ephemeral",
"memory_mb": self.memory_mb,
"networking": self.networking
}
# Check for computer server address file
server_address_file = r"C:\Users\WDAGUtilityAccount\Desktop\shared_windows_sandbox_dir\server_address"
try:
# Check if the server address file exists
file_exists = sandbox.rpyc.modules.os.path.exists(server_address_file)
if file_exists:
# Read the server address file
with sandbox.rpyc.builtin.open(server_address_file, 'r') as f:
server_address = f.read().strip()
if server_address and ':' in server_address:
# Parse IP:port from the file
ip_address, port = server_address.split(':', 1)
# Verify the server is actually responding
try:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(3)
result = sock.connect_ex((ip_address, int(port)))
sock.close()
if result == 0:
# Server is responding
status = "running"
self.logger.debug(f"Computer server found at {ip_address}:{port}")
else:
# Server file exists but not responding
status = "starting"
ip_address = None
except Exception as e:
self.logger.debug(f"Error checking server connectivity: {e}")
status = "starting"
ip_address = None
else:
# File exists but doesn't contain valid address
status = "starting"
ip_address = None
else:
# Server address file doesn't exist yet
status = "starting"
ip_address = None
except Exception as e:
self.logger.debug(f"Error checking server address file: {e}")
status = "starting"
ip_address = None
except Exception as e:
self.logger.error(f"Error checking sandbox status: {e}")
status = "error"
ip_address = None
return {
"name": name,
"status": status,
"ip_address": ip_address,
"storage": "ephemeral",
"memory_mb": self.memory_mb,
"networking": self.networking
}
async def list_vms(self) -> List[Dict[str, Any]]:
"""List all available VMs."""
vms = []
for name in self._active_sandboxes.keys():
vm_info = await self.get_vm(name)
vms.append(vm_info)
return vms
async def run_vm(self, image: str, name: str, run_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Run a VM with the given options.
Args:
image: Image name (ignored for Windows Sandbox - always uses host Windows)
name: Name of the VM to run
run_opts: Dictionary of run options (memory, cpu, etc.)
storage: Ignored for Windows Sandbox (always ephemeral)
Returns:
Dictionary with VM run status and information
"""
if name in self._active_sandboxes:
return {
"success": False,
"error": f"Sandbox {name} is already running"
}
try:
# Extract options from run_opts
memory_mb = run_opts.get("memory_mb", self.memory_mb)
if isinstance(memory_mb, str):
# Convert memory string like "4GB" to MB
if memory_mb.upper().endswith("GB"):
memory_mb = int(float(memory_mb[:-2]) * 1024)
elif memory_mb.upper().endswith("MB"):
memory_mb = int(memory_mb[:-2])
else:
memory_mb = self.memory_mb
networking = run_opts.get("networking", self.networking)
# Create folder mappers if shared directories are specified
folder_mappers = []
shared_directories = run_opts.get("shared_directories", [])
for shared_dir in shared_directories:
if isinstance(shared_dir, dict):
host_path = shared_dir.get("hostPath", "")
elif isinstance(shared_dir, str):
host_path = shared_dir
else:
continue
if host_path and os.path.exists(host_path):
folder_mappers.append(winsandbox.FolderMapper(host_path))
self.logger.info(f"Creating Windows Sandbox: {name}")
self.logger.info(f"Memory: {memory_mb}MB, Networking: {networking}")
if folder_mappers:
self.logger.info(f"Shared directories: {len(folder_mappers)}")
# Create the sandbox without logon script
sandbox = winsandbox.new_sandbox(
memory_mb=str(memory_mb),
networking=networking,
folder_mappers=folder_mappers
)
# Store the sandbox
self._active_sandboxes[name] = sandbox
self.logger.info(f"Windows Sandbox {name} created successfully")
# Setup the computer server in the sandbox
await self._setup_computer_server(sandbox, name)
return {
"success": True,
"name": name,
"status": "starting",
"memory_mb": memory_mb,
"networking": networking,
"storage": "ephemeral"
}
except Exception as e:
self.logger.error(f"Failed to create Windows Sandbox {name}: {e}")
# stack trace
import traceback
self.logger.error(f"Stack trace: {traceback.format_exc()}")
return {
"success": False,
"error": f"Failed to create sandbox: {str(e)}"
}
async def stop_vm(self, name: str, storage: Optional[str] = None) -> Dict[str, Any]:
"""Stop a running VM.
Args:
name: Name of the VM to stop
storage: Ignored for Windows Sandbox
Returns:
Dictionary with stop status and information
"""
if name not in self._active_sandboxes:
return {
"success": False,
"error": f"Sandbox {name} is not running"
}
try:
sandbox = self._active_sandboxes[name]
# Terminate the sandbox
sandbox.shutdown()
# Remove from active sandboxes
del self._active_sandboxes[name]
self.logger.info(f"Windows Sandbox {name} stopped successfully")
return {
"success": True,
"name": name,
"status": "stopped"
}
except Exception as e:
self.logger.error(f"Failed to stop Windows Sandbox {name}: {e}")
return {
"success": False,
"error": f"Failed to stop sandbox: {str(e)}"
}
async def update_vm(self, name: str, update_opts: Dict[str, Any], storage: Optional[str] = None) -> Dict[str, Any]:
"""Update VM configuration.
Note: Windows Sandbox does not support runtime configuration updates.
The sandbox must be stopped and restarted with new configuration.
Args:
name: Name of the VM to update
update_opts: Dictionary of update options
storage: Ignored for Windows Sandbox
Returns:
Dictionary with update status and information
"""
return {
"success": False,
"error": "Windows Sandbox does not support runtime configuration updates. "
"Please stop and restart the sandbox with new configuration."
}
async def get_ip(self, name: str, storage: Optional[str] = None, retry_delay: int = 2) -> str:
"""Get the IP address of a VM, waiting indefinitely until it's available.
Args:
name: Name of the VM to get the IP for
storage: Ignored for Windows Sandbox
retry_delay: Delay between retries in seconds (default: 2)
Returns:
IP address of the VM when it becomes available
"""
total_attempts = 0
# Loop indefinitely until we get a valid IP
while True:
total_attempts += 1
# Log retry message but not on first attempt
if total_attempts > 1:
self.logger.info(f"Waiting for Windows Sandbox {name} IP address (attempt {total_attempts})...")
try:
# Get VM information
vm_info = await self.get_vm(name, storage=storage)
# Check if we got a valid IP
ip = vm_info.get("ip_address", None)
if ip and ip != "unknown" and not ip.startswith("0.0.0.0"):
self.logger.info(f"Got valid Windows Sandbox IP address: {ip}")
return ip
# Check the VM status
status = vm_info.get("status", "unknown")
# If VM is not running yet, log and wait
if status != "running":
self.logger.info(f"Windows Sandbox is not running yet (status: {status}). Waiting...")
# If VM is running but no IP yet, wait and retry
else:
self.logger.info("Windows Sandbox is running but no valid IP address yet. Waiting...")
except Exception as e:
self.logger.warning(f"Error getting Windows Sandbox {name} IP: {e}, continuing to wait...")
# Wait before next retry
await asyncio.sleep(retry_delay)
# Add progress log every 10 attempts
if total_attempts % 10 == 0:
self.logger.info(f"Still waiting for Windows Sandbox {name} IP after {total_attempts} attempts...")
async def _setup_computer_server(self, sandbox, name: str, visible: bool = False):
"""Setup the computer server in the Windows Sandbox using RPyC.
Args:
sandbox: The Windows Sandbox instance
name: Name of the sandbox
visible: Whether the opened process should be visible (default: False)
"""
try:
self.logger.info(f"Setting up computer server in sandbox {name}...")
# Read the PowerShell setup script
script_path = os.path.join(os.path.dirname(__file__), "setup_script.ps1")
with open(script_path, 'r', encoding='utf-8') as f:
setup_script_content = f.read()
# Write the setup script to the sandbox using RPyC
script_dest_path = r"C:\Users\WDAGUtilityAccount\setup_cua.ps1"
self.logger.info(f"Writing setup script to {script_dest_path}")
with sandbox.rpyc.builtin.open(script_dest_path, 'w') as f:
f.write(setup_script_content)
# Execute the PowerShell script in the background
self.logger.info("Executing setup script in sandbox...")
# Use subprocess to run PowerShell script
import subprocess
powershell_cmd = [
"powershell.exe",
"-ExecutionPolicy", "Bypass",
"-NoExit", # Keep window open after script completes
"-File", script_dest_path
]
# Set creation flags based on visibility preference
if visible:
# CREATE_NEW_CONSOLE - creates a new console window (visible)
creation_flags = 0x00000010
else:
creation_flags = 0x08000000 # CREATE_NO_WINDOW
# Start the process using RPyC
process = sandbox.rpyc.modules.subprocess.Popen(
powershell_cmd,
creationflags=creation_flags,
shell=False
)
# # Sleep for 30 seconds
# await asyncio.sleep(30)
ip = await self.get_ip(name)
self.logger.info(f"Sandbox IP: {ip}")
self.logger.info(f"Setup script started in background in sandbox {name} with PID: {process.pid}")
except Exception as e:
self.logger.error(f"Failed to setup computer server in sandbox {name}: {e}")
import traceback
self.logger.error(f"Stack trace: {traceback.format_exc()}")

View File

@@ -0,0 +1,124 @@
# Setup script for Windows Sandbox CUA Computer provider
# This script runs when the sandbox starts
Write-Host "Starting CUA Computer setup in Windows Sandbox..."
# Function to find the mapped Python installation from pywinsandbox
function Find-MappedPython {
Write-Host "Looking for mapped Python installation from pywinsandbox..."
# pywinsandbox maps the host Python installation to the sandbox
# Look for mapped shared folders on the desktop (common pywinsandbox pattern)
$desktopPath = "C:\Users\WDAGUtilityAccount\Desktop"
$sharedFolders = Get-ChildItem -Path $desktopPath -Directory -ErrorAction SilentlyContinue
foreach ($folder in $sharedFolders) {
# Look for Python executables in shared folders
$pythonPaths = @(
"$($folder.FullName)\python.exe",
"$($folder.FullName)\Scripts\python.exe",
"$($folder.FullName)\bin\python.exe"
)
foreach ($pythonPath in $pythonPaths) {
if (Test-Path $pythonPath) {
try {
$version = & $pythonPath --version 2>&1
if ($version -match "Python") {
Write-Host "Found mapped Python: $pythonPath - $version"
return $pythonPath
}
} catch {
continue
}
}
}
# Also check subdirectories that might contain Python
$subDirs = Get-ChildItem -Path $folder.FullName -Directory -ErrorAction SilentlyContinue
foreach ($subDir in $subDirs) {
$pythonPath = "$($subDir.FullName)\python.exe"
if (Test-Path $pythonPath) {
try {
$version = & $pythonPath --version 2>&1
if ($version -match "Python") {
Write-Host "Found mapped Python in subdirectory: $pythonPath - $version"
return $pythonPath
}
} catch {
continue
}
}
}
}
# Fallback: try common Python commands that might be available
$pythonCommands = @("python", "py", "python3")
foreach ($cmd in $pythonCommands) {
try {
$version = & $cmd --version 2>&1
if ($version -match "Python") {
Write-Host "Found Python via command '$cmd': $version"
return $cmd
}
} catch {
continue
}
}
throw "Could not find any Python installation (mapped or otherwise)"
}
try {
# Step 1: Find the mapped Python installation
Write-Host "Step 1: Finding mapped Python installation..."
$pythonExe = Find-MappedPython
Write-Host "Using Python: $pythonExe"
# Verify Python works and show version
$pythonVersion = & $pythonExe --version 2>&1
Write-Host "Python version: $pythonVersion"
# Step 2: Install cua-computer-server directly
Write-Host "Step 2: Installing cua-computer-server..."
Write-Host "Upgrading pip..."
& $pythonExe -m pip install --upgrade pip --quiet
Write-Host "Installing cua-computer-server..."
& $pythonExe -m pip install cua-computer-server --quiet
Write-Host "cua-computer-server installation completed."
# Step 3: Start computer server in background
Write-Host "Step 3: Starting computer server in background..."
Write-Host "Starting computer server with: $pythonExe"
# Start the computer server in the background
$serverProcess = Start-Process -FilePath $pythonExe -ArgumentList "-m", "computer_server.main" -WindowStyle Hidden -PassThru
Write-Host "Computer server started in background with PID: $($serverProcess.Id)"
# Give it a moment to start
Start-Sleep -Seconds 3
# Check if the process is still running
if (Get-Process -Id $serverProcess.Id -ErrorAction SilentlyContinue) {
Write-Host "Computer server is running successfully in background"
} else {
throw "Computer server failed to start or exited immediately"
}
} catch {
Write-Error "Setup failed: $_"
Write-Host "Error details: $($_.Exception.Message)"
Write-Host "Stack trace: $($_.ScriptStackTrace)"
Write-Host ""
Write-Host "Press any key to close this window..."
$null = $Host.UI.RawUI.ReadKey("NoEcho,IncludeKeyDown")
exit 1
}
Write-Host ""
Write-Host "Setup completed successfully!"
Write-Host "Press any key to close this window..."
$null = $Host.UI.RawUI.ReadKey("NoEcho,IncludeKeyDown")

View File

@@ -106,7 +106,15 @@ class Computer:
# The default is currently to use non-ephemeral storage
if storage and ephemeral and storage != "ephemeral":
raise ValueError("Storage path and ephemeral flag cannot be used together")
self.storage = "ephemeral" if ephemeral else storage
# Windows Sandbox always uses ephemeral storage
if self.provider_type == VMProviderType.WINSANDBOX:
if not ephemeral and storage != None and storage != "ephemeral":
self.logger.warning("Windows Sandbox storage is always ephemeral. Setting ephemeral=True.")
self.ephemeral = True
self.storage = "ephemeral"
else:
self.storage = "ephemeral" if ephemeral else storage
# For Lumier provider, store the first shared directory path to use
# for VM file sharing
@@ -285,6 +293,15 @@ class Computer:
api_key=self.api_key,
verbose=verbose,
)
elif self.provider_type == VMProviderType.WINSANDBOX:
self.config.vm_provider = VMProviderFactory.create_provider(
self.provider_type,
port=port,
host=host,
storage=storage,
verbose=verbose,
ephemeral=ephemeral,
)
else:
raise ValueError(f"Unsupported provider type: {self.provider_type}")
self._provider_context = await self.config.vm_provider.__aenter__()
@@ -383,7 +400,6 @@ class Computer:
# Wait for VM to be ready with a valid IP address
self.logger.info("Waiting for VM to be ready with a valid IP address...")
try:
# Increased values for Lumier provider which needs more time for initial setup
if self.provider_type == VMProviderType.LUMIER:
max_retries = 60 # Increased for Lumier VM startup which takes longer
retry_delay = 3 # 3 seconds between retries for Lumier
@@ -513,7 +529,7 @@ class Computer:
return
# @property
async def get_ip(self, max_retries: int = 15, retry_delay: int = 2) -> str:
async def get_ip(self, max_retries: int = 15, retry_delay: int = 3) -> str:
"""Get the IP address of the VM or localhost if using host computer server.
This method delegates to the provider's get_ip method, which waits indefinitely

View File

@@ -8,7 +8,7 @@ class InterfaceFactory:
@staticmethod
def create_interface_for_os(
os: Literal['macos', 'linux'],
os: Literal['macos', 'linux', 'windows'],
ip_address: str,
api_key: Optional[str] = None,
vm_name: Optional[str] = None
@@ -16,7 +16,7 @@ class InterfaceFactory:
"""Create an interface for the specified OS.
Args:
os: Operating system type ('macos' or 'linux')
os: Operating system type ('macos', 'linux', or 'windows')
ip_address: IP address of the computer to control
api_key: Optional API key for cloud authentication
vm_name: Optional VM name for cloud authentication
@@ -30,10 +30,13 @@ class InterfaceFactory:
# Import implementations here to avoid circular imports
from .macos import MacOSComputerInterface
from .linux import LinuxComputerInterface
from .windows import WindowsComputerInterface
if os == 'macos':
return MacOSComputerInterface(ip_address, api_key=api_key, vm_name=vm_name)
elif os == 'linux':
return LinuxComputerInterface(ip_address, api_key=api_key, vm_name=vm_name)
elif os == 'windows':
return WindowsComputerInterface(ip_address, api_key=api_key, vm_name=vm_name)
else:
raise ValueError(f"Unsupported OS type: {os}")
raise ValueError(f"Unsupported OS type: {os}")

View File

@@ -0,0 +1,687 @@
import asyncio
import json
import time
from typing import Any, Dict, List, Optional, Tuple
from PIL import Image
import websockets
from ..logger import Logger, LogLevel
from .base import BaseComputerInterface
from ..utils import decode_base64_image, encode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType, MouseButton
class WindowsComputerInterface(BaseComputerInterface):
"""Interface for Windows."""
def __init__(self, ip_address: str, username: str = "lume", password: str = "lume", api_key: Optional[str] = None, vm_name: Optional[str] = None):
super().__init__(ip_address, username, password, api_key, vm_name)
self._ws = None
self._reconnect_task = None
self._closed = False
self._last_ping = 0
self._ping_interval = 5 # Send ping every 5 seconds
self._ping_timeout = 120 # Wait 120 seconds for pong response
self._reconnect_delay = 1 # Start with 1 second delay
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
self._authenticated = False # Track authentication status
self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time
# Set logger name for Windows interface
self.logger = Logger("cua.interface.windows", LogLevel.NORMAL)
@property
def ws_uri(self) -> str:
"""Get the WebSocket URI using the current IP address.
Returns:
WebSocket URI for the Computer API Server
"""
protocol = "wss" if self.api_key else "ws"
port = "8443" if self.api_key else "8000"
return f"{protocol}://{self.ip_address}:{port}/ws"
async def _keep_alive(self):
"""Keep the WebSocket connection alive with automatic reconnection."""
retry_count = 0
max_log_attempts = 1 # Only log the first attempt at INFO level
log_interval = 500 # Then log every 500th attempt (significantly increased from 30)
last_warning_time = 0
min_warning_interval = 30 # Minimum seconds between connection lost warnings
min_retry_delay = 0.5 # Minimum delay between connection attempts (500ms)
while not self._closed:
try:
if self._ws is None or (
self._ws and self._ws.state == websockets.protocol.State.CLOSED
):
try:
retry_count += 1
# Add a minimum delay between connection attempts to avoid flooding
if retry_count > 1:
await asyncio.sleep(min_retry_delay)
# Only log the first attempt at INFO level, then every Nth attempt
if retry_count == 1:
self.logger.info(f"Attempting WebSocket connection to {self.ws_uri}")
elif retry_count % log_interval == 0:
self.logger.info(
f"Still attempting WebSocket connection (attempt {retry_count})..."
)
else:
# All other attempts are logged at DEBUG level
self.logger.debug(
f"Attempting WebSocket connection to {self.ws_uri} (attempt {retry_count})"
)
self._ws = await asyncio.wait_for(
websockets.connect(
self.ws_uri,
max_size=1024 * 1024 * 10, # 10MB limit
max_queue=32,
ping_interval=self._ping_interval,
ping_timeout=self._ping_timeout,
close_timeout=5,
compression=None, # Disable compression to reduce overhead
),
timeout=120,
)
self.logger.info("WebSocket connection established")
# Authentication will be handled by the first command that needs it
# Don't do authentication here to avoid recv conflicts
self._reconnect_delay = 1 # Reset reconnect delay on successful connection
self._last_ping = time.time()
retry_count = 0 # Reset retry count on successful connection
self._authenticated = False # Reset auth status on new connection
except (asyncio.TimeoutError, websockets.exceptions.WebSocketException) as e:
next_retry = self._reconnect_delay
# Only log the first error at WARNING level, then every Nth attempt
if retry_count == 1:
self.logger.warning(
f"Computer API Server not ready yet. Will retry automatically."
)
elif retry_count % log_interval == 0:
self.logger.warning(
f"Still waiting for Computer API Server (attempt {retry_count})..."
)
else:
# All other errors are logged at DEBUG level
self.logger.debug(f"Connection attempt {retry_count} failed: {e}")
if self._ws:
try:
await self._ws.close()
except:
pass
self._ws = None
# Regular ping to check connection
if self._ws and self._ws.state == websockets.protocol.State.OPEN:
try:
if time.time() - self._last_ping >= self._ping_interval:
pong_waiter = await self._ws.ping()
await asyncio.wait_for(pong_waiter, timeout=self._ping_timeout)
self._last_ping = time.time()
except Exception as e:
self.logger.debug(f"Ping failed: {e}")
if self._ws:
try:
await self._ws.close()
except:
pass
self._ws = None
continue
await asyncio.sleep(1)
except Exception as e:
current_time = time.time()
# Only log connection lost warnings at most once every min_warning_interval seconds
if current_time - last_warning_time >= min_warning_interval:
self.logger.warning(
f"Computer API Server connection lost. Will retry automatically."
)
last_warning_time = current_time
else:
# Log at debug level instead
self.logger.debug(f"Connection lost: {e}")
if self._ws:
try:
await self._ws.close()
except:
pass
self._ws = None
async def _ensure_connection(self):
"""Ensure WebSocket connection is established."""
if self._reconnect_task is None or self._reconnect_task.done():
self._reconnect_task = asyncio.create_task(self._keep_alive())
retry_count = 0
max_retries = 5
while retry_count < max_retries:
try:
if self._ws and self._ws.state == websockets.protocol.State.OPEN:
return
retry_count += 1
await asyncio.sleep(1)
except Exception as e:
# Only log at ERROR level for the last retry attempt
if retry_count == max_retries - 1:
self.logger.error(
f"Persistent connection check error after {retry_count} attempts: {e}"
)
else:
self.logger.debug(f"Connection check error (attempt {retry_count}): {e}")
retry_count += 1
await asyncio.sleep(1)
continue
raise ConnectionError("Failed to establish WebSocket connection after multiple retries")
async def _send_command(self, command: str, params: Optional[Dict] = None) -> Dict[str, Any]:
"""Send command through WebSocket."""
max_retries = 3
retry_count = 0
last_error = None
# Acquire lock to ensure only one command is processed at a time
async with self._command_lock:
self.logger.debug(f"Acquired lock for command: {command}")
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Handle authentication if needed
if self.api_key and self.vm_name and not self._authenticated:
self.logger.info("Performing authentication handshake...")
auth_message = {
"command": "authenticate",
"params": {
"api_key": self.api_key,
"container_name": self.vm_name
}
}
await self._ws.send(json.dumps(auth_message))
# Wait for authentication response
auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
auth_result = json.loads(auth_response)
if not auth_result.get("success"):
error_msg = auth_result.get("error", "Authentication failed")
self.logger.error(f"Authentication failed: {error_msg}")
self._authenticated = False
raise ConnectionError(f"Authentication failed: {error_msg}")
self.logger.info("Authentication successful")
self._authenticated = True
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
self.logger.debug(f"Completed command: {command}")
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise last_error if last_error else RuntimeError("Failed to send command")
async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
"""Wait for WebSocket connection to become available."""
start_time = time.time()
last_error = None
attempt_count = 0
progress_interval = 10 # Log progress every 10 seconds
last_progress_time = start_time
# Disable detailed logging for connection attempts
self._log_connection_attempts = False
try:
self.logger.info(
f"Waiting for Computer API Server to be ready (timeout: {timeout}s)..."
)
# Start the keep-alive task if it's not already running
if self._reconnect_task is None or self._reconnect_task.done():
self._reconnect_task = asyncio.create_task(self._keep_alive())
# Wait for the connection to be established
while time.time() - start_time < timeout:
try:
attempt_count += 1
current_time = time.time()
# Log progress periodically without flooding logs
if current_time - last_progress_time >= progress_interval:
elapsed = current_time - start_time
self.logger.info(
f"Still waiting for Computer API Server... (elapsed: {elapsed:.1f}s, attempts: {attempt_count})"
)
last_progress_time = current_time
# Check if we have a connection
if self._ws and self._ws.state == websockets.protocol.State.OPEN:
# Test the connection with a simple command
try:
await self._send_command("get_screen_size")
elapsed = time.time() - start_time
self.logger.info(
f"Computer API Server is ready (after {elapsed:.1f}s, {attempt_count} attempts)"
)
return # Connection is fully working
except Exception as e:
last_error = e
self.logger.debug(f"Connection test failed: {e}")
# Wait before trying again
await asyncio.sleep(interval)
except Exception as e:
last_error = e
self.logger.debug(f"Connection attempt {attempt_count} failed: {e}")
await asyncio.sleep(interval)
# If we get here, we've timed out
error_msg = f"Could not connect to {self.ip_address} after {timeout} seconds"
if last_error:
error_msg += f": {str(last_error)}"
self.logger.error(error_msg)
raise TimeoutError(error_msg)
finally:
# Reset to default logging behavior
self._log_connection_attempts = False
def close(self):
"""Close WebSocket connection.
Note: In host computer server mode, we leave the connection open
to allow other clients to connect to the same server. The server
will handle cleaning up idle connections.
"""
# Only cancel the reconnect task
if self._reconnect_task:
self._reconnect_task.cancel()
# Don't set closed flag or close websocket by default
# This allows the server to stay connected for other clients
# self._closed = True
# if self._ws:
# asyncio.create_task(self._ws.close())
# self._ws = None
def force_close(self):
"""Force close the WebSocket connection.
This method should be called when you want to completely
shut down the connection, not just for regular cleanup.
"""
self._closed = True
if self._reconnect_task:
self._reconnect_task.cancel()
if self._ws:
asyncio.create_task(self._ws.close())
self._ws = None
# Mouse Actions
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> None:
await self._send_command("mouse_down", {"x": x, "y": y, "button": button})
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> None:
await self._send_command("mouse_up", {"x": x, "y": y, "button": button})
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
await self._send_command("left_click", {"x": x, "y": y})
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
await self._send_command("right_click", {"x": x, "y": y})
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
await self._send_command("double_click", {"x": x, "y": y})
async def move_cursor(self, x: int, y: int) -> None:
await self._send_command("move_cursor", {"x": x, "y": y})
async def drag_to(self, x: int, y: int, button: "MouseButton" = "left", duration: float = 0.5) -> None:
await self._send_command(
"drag_to", {"x": x, "y": y, "button": button, "duration": duration}
)
async def drag(self, path: List[Tuple[int, int]], button: "MouseButton" = "left", duration: float = 0.5) -> None:
await self._send_command(
"drag", {"path": path, "button": button, "duration": duration}
)
# Keyboard Actions
async def key_down(self, key: "KeyType") -> None:
await self._send_command("key_down", {"key": key})
async def key_up(self, key: "KeyType") -> None:
await self._send_command("key_up", {"key": key})
async def type_text(self, text: str) -> None:
# For Windows, use clipboard for Unicode text like Linux
if any(ord(char) > 127 for char in text):
# For Unicode text, use clipboard and paste
await self.set_clipboard(text)
await self.hotkey(Key.CTRL, 'v') # Windows uses Ctrl+V instead of Cmd+V
else:
# For ASCII text, use the regular typing method
await self._send_command("type_text", {"text": text})
async def press(self, key: "KeyType") -> None:
"""Press a single key.
Args:
key: The key to press. Can be any of:
- A Key enum value (recommended), e.g. Key.PAGE_DOWN
- A direct key value string, e.g. 'pagedown'
- A single character string, e.g. 'a'
Examples:
```python
# Using enum (recommended)
await interface.press(Key.PAGE_DOWN)
await interface.press(Key.ENTER)
# Using direct values
await interface.press('pagedown')
await interface.press('enter')
# Using single characters
await interface.press('a')
```
Raises:
ValueError: If the key type is invalid or the key is not recognized
"""
if isinstance(key, Key):
actual_key = key.value
elif isinstance(key, str):
# Try to convert to enum if it matches a known key
key_or_enum = Key.from_string(key)
actual_key = key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum
else:
raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")
await self._send_command("press_key", {"key": actual_key})
async def press_key(self, key: "KeyType") -> None:
"""DEPRECATED: Use press() instead.
This method is kept for backward compatibility but will be removed in a future version.
Please use the press() method instead.
"""
await self.press(key)
async def hotkey(self, *keys: "KeyType") -> None:
"""Press multiple keys simultaneously.
Args:
*keys: Multiple keys to press simultaneously. Each key can be any of:
- A Key enum value (recommended), e.g. Key.CTRL
- A direct key value string, e.g. 'ctrl'
- A single character string, e.g. 'a'
Examples:
```python
# Using enums (recommended)
await interface.hotkey(Key.CTRL, Key.C) # Copy
await interface.hotkey(Key.CTRL, Key.V) # Paste
# Using mixed formats
await interface.hotkey(Key.CTRL, 'a') # Select all
```
Raises:
ValueError: If any key type is invalid or not recognized
"""
actual_keys = []
for key in keys:
if isinstance(key, Key):
actual_keys.append(key.value)
elif isinstance(key, str):
# Try to convert to enum if it matches a known key
key_or_enum = Key.from_string(key)
actual_keys.append(key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum)
else:
raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")
await self._send_command("hotkey", {"keys": actual_keys})
# Scrolling Actions
async def scroll(self, x: int, y: int) -> None:
await self._send_command("scroll", {"x": x, "y": y})
async def scroll_down(self, clicks: int = 1) -> None:
await self._send_command("scroll_down", {"clicks": clicks})
async def scroll_up(self, clicks: int = 1) -> None:
await self._send_command("scroll_up", {"clicks": clicks})
# Screen Actions
async def screenshot(
self,
boxes: Optional[List[Tuple[int, int, int, int]]] = None,
box_color: str = "#FF0000",
box_thickness: int = 2,
scale_factor: float = 1.0,
) -> bytes:
"""Take a screenshot with optional box drawing and scaling.
Args:
boxes: Optional list of (x, y, width, height) tuples defining boxes to draw in screen coordinates
box_color: Color of the boxes in hex format (default: "#FF0000" red)
box_thickness: Thickness of the box borders in pixels (default: 2)
scale_factor: Factor to scale the final image by (default: 1.0)
Use > 1.0 to enlarge, < 1.0 to shrink (e.g., 0.5 for half size, 2.0 for double)
Returns:
bytes: The screenshot image data, optionally with boxes drawn on it and scaled
"""
result = await self._send_command("screenshot")
if not result.get("image_data"):
raise RuntimeError("Failed to take screenshot")
screenshot = decode_base64_image(result["image_data"])
if boxes:
# Get the natural scaling between screen and screenshot
screen_size = await self.get_screen_size()
screenshot_width, screenshot_height = bytes_to_image(screenshot).size
width_scale = screenshot_width / screen_size["width"]
height_scale = screenshot_height / screen_size["height"]
# Scale box coordinates from screen space to screenshot space
for box in boxes:
scaled_box = (
int(box[0] * width_scale), # x
int(box[1] * height_scale), # y
int(box[2] * width_scale), # width
int(box[3] * height_scale), # height
)
screenshot = draw_box(
screenshot,
x=scaled_box[0],
y=scaled_box[1],
width=scaled_box[2],
height=scaled_box[3],
color=box_color,
thickness=box_thickness,
)
if scale_factor != 1.0:
screenshot = resize_image(screenshot, scale_factor)
return screenshot
async def get_screen_size(self) -> Dict[str, int]:
result = await self._send_command("get_screen_size")
if result["success"] and result["size"]:
return result["size"]
raise RuntimeError("Failed to get screen size")
async def get_cursor_position(self) -> Dict[str, int]:
result = await self._send_command("get_cursor_position")
if result["success"] and result["position"]:
return result["position"]
raise RuntimeError("Failed to get cursor position")
# Clipboard Actions
async def copy_to_clipboard(self) -> str:
result = await self._send_command("copy_to_clipboard")
if result["success"] and result["content"]:
return result["content"]
raise RuntimeError("Failed to get clipboard content")
async def set_clipboard(self, text: str) -> None:
await self._send_command("set_clipboard", {"text": text})
# File System Actions
async def file_exists(self, path: str) -> bool:
result = await self._send_command("file_exists", {"path": path})
return result.get("exists", False)
async def directory_exists(self, path: str) -> bool:
result = await self._send_command("directory_exists", {"path": path})
return result.get("exists", False)
async def list_dir(self, path: str) -> list[str]:
result = await self._send_command("list_dir", {"path": path})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to list directory"))
return result.get("files", [])
async def read_text(self, path: str) -> str:
result = await self._send_command("read_text", {"path": path})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to read file"))
return result.get("content", "")
async def write_text(self, path: str, content: str) -> None:
result = await self._send_command("write_text", {"path": path, "content": content})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to write file"))
async def read_bytes(self, path: str) -> bytes:
result = await self._send_command("read_bytes", {"path": path})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to read file"))
content_b64 = result.get("content_b64", "")
return decode_base64_image(content_b64)
async def write_bytes(self, path: str, content: bytes) -> None:
result = await self._send_command("write_bytes", {"path": path, "content_b64": encode_base64_image(content)})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to write file"))
async def delete_file(self, path: str) -> None:
result = await self._send_command("delete_file", {"path": path})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to delete file"))
async def create_dir(self, path: str) -> None:
result = await self._send_command("create_dir", {"path": path})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to create directory"))
async def delete_dir(self, path: str) -> None:
result = await self._send_command("delete_dir", {"path": path})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to delete directory"))
async def run_command(self, command: str) -> Tuple[str, str]:
result = await self._send_command("run_command", {"command": command})
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to run command"))
return result.get("stdout", ""), result.get("stderr", "")
# Accessibility Actions
async def get_accessibility_tree(self) -> Dict[str, Any]:
"""Get the accessibility tree of the current screen."""
result = await self._send_command("get_accessibility_tree")
if not result.get("success", False):
raise RuntimeError(result.get("error", "Failed to get accessibility tree"))
return result
async def get_active_window_bounds(self) -> Dict[str, int]:
"""Get the bounds of the currently active window."""
result = await self._send_command("get_active_window_bounds")
if result["success"] and result["bounds"]:
return result["bounds"]
raise RuntimeError("Failed to get active window bounds")
async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
"""Convert screenshot coordinates to screen coordinates.
Args:
x: X coordinate in screenshot space
y: Y coordinate in screenshot space
Returns:
tuple[float, float]: (x, y) coordinates in screen space
"""
screen_size = await self.get_screen_size()
screenshot = await self.screenshot()
screenshot_img = bytes_to_image(screenshot)
screenshot_width, screenshot_height = screenshot_img.size
# Calculate scaling factors
width_scale = screen_size["width"] / screenshot_width
height_scale = screen_size["height"] / screenshot_height
# Convert coordinates
screen_x = x * width_scale
screen_y = y * height_scale
return screen_x, screen_y
async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
"""Convert screen coordinates to screenshot coordinates.
Args:
x: X coordinate in screen space
y: Y coordinate in screen space
Returns:
tuple[float, float]: (x, y) coordinates in screenshot space
"""
screen_size = await self.get_screen_size()
screenshot = await self.screenshot()
screenshot_img = bytes_to_image(screenshot)
screenshot_width, screenshot_height = screenshot_img.size
# Calculate scaling factors
width_scale = screenshot_width / screen_size["width"]
height_scale = screenshot_height / screen_size["height"]
# Convert coordinates
screenshot_x = x * width_scale
screenshot_y = y * height_scale
return screenshot_x, screenshot_y

View File

@@ -10,6 +10,7 @@ class VMProviderType(StrEnum):
LUME = "lume"
LUMIER = "lumier"
CLOUD = "cloud"
WINSANDBOX = "winsandbox"
UNKNOWN = "unknown"

View File

@@ -112,5 +112,27 @@ class VMProviderFactory:
"The CloudProvider is not fully implemented yet. "
"Please use LUME or LUMIER provider instead."
) from e
elif provider_type == VMProviderType.WINSANDBOX:
try:
from .winsandbox import WinSandboxProvider, HAS_WINSANDBOX
if not HAS_WINSANDBOX:
raise ImportError(
"pywinsandbox is required for WinSandboxProvider. "
"Please install it with 'pip install -U git+https://github.com/karkason/pywinsandbox.git'"
)
return WinSandboxProvider(
port=port,
host=host,
storage=storage,
verbose=verbose,
ephemeral=ephemeral,
**kwargs
)
except ImportError as e:
logger.error(f"Failed to import WinSandboxProvider: {e}")
raise ImportError(
"pywinsandbox is required for WinSandboxProvider. "
"Please install it with 'pip install -U git+https://github.com/karkason/pywinsandbox.git'"
) from e
else:
raise ValueError(f"Unsupported provider type: {provider_type}")

View File

@@ -0,0 +1,15 @@
"""
Main entry point for computer.ui module.
This allows running the computer UI with:
python -m computer.ui
Instead of:
python -m computer.ui.gradio.app
"""
from .gradio.app import create_gradio_ui
if __name__ == "__main__":
app = create_gradio_ui()
app.launch()

View File

@@ -529,10 +529,10 @@ async def execute(name, action, arguments):
return results
async def handle_init_computer(os_choice: str, app_list=None, provider="lume", container_name=None, api_key=None):
"""Initialize the computer instance and tools for macOS or Ubuntu
"""Initialize the computer instance and tools for macOS or Ubuntu or Windows
Args:
os_choice: The OS to use ("macOS" or "Ubuntu")
os_choice: The OS to use ("macOS" or "Ubuntu" or "Windows")
app_list: Optional list of apps to focus on using the app-use experiment
provider: The provider to use ("lume" or "self" or "cloud")
container_name: The container name to use for cloud provider
@@ -550,6 +550,9 @@ async def handle_init_computer(os_choice: str, app_list=None, provider="lume", c
if os_choice == "Ubuntu":
os_type_str = "linux"
image_str = "ubuntu-noble-vanilla:latest"
elif os_choice == "Windows":
os_type_str = "windows"
image_str = "windows-11-vanilla:latest"
else:
os_type_str = "macos"
image_str = "macos-sequoia-cua:latest"
@@ -571,6 +574,12 @@ async def handle_init_computer(os_choice: str, app_list=None, provider="lume", c
api_key=cloud_api_key,
experiments=experiments
)
elif provider == "winsandbox":
computer = Computer(
os_type="windows",
provider_type=VMProviderType.WINSANDBOX,
experiments=experiments
)
else:
computer = Computer(
image=image_str,
@@ -1081,14 +1090,14 @@ def create_gradio_ui():
with gr.Row():
os_choice = gr.Radio(
label="OS",
choices=["macOS", "Ubuntu"],
choices=["macOS", "Ubuntu", "Windows"],
value="macOS",
)
# Provider selection radio
provider_choice = gr.Radio(
label="Provider",
choices=["lume", "self", "cloud"],
choices=["lume", "self", "cloud", "winsandbox"],
value="lume",
info="'lume' uses a VM, 'self' uses the host computer server, 'cloud' uses a cloud container"
)