Merge pull request #281 from trycua/feat/python-venvs

[Computer/Agent] Add Sandboxed Python Execution API, Stability Fixes
This commit is contained in:
ddupont
2025-06-05 11:57:46 -04:00
committed by GitHub
16 changed files with 1805 additions and 153 deletions

View File

@@ -163,9 +163,11 @@ async def main():
loop="uitars",
model=LLM(provider="mlxvlm", name="mlx-community/UI-TARS-1.5-7B-6bit")
)
await agent.run("Find the trycua/cua repository on GitHub and follow the quick start guide")
async for result in agent.run("Find the trycua/cua repository on GitHub and follow the quick start guide"):
print(result)
main()
if __name__ == "__main__":
asyncio.run(main())
```
For ready-to-use examples, check out our [Notebooks](./notebooks/) collection.
@@ -273,6 +275,25 @@ await computer.interface.run_command(cmd) # Run shell command
# Accessibility
await computer.interface.get_accessibility_tree() # Get accessibility tree
# Python Virtual Environment Operations
await computer.venv_install("demo_venv", ["requests", "macos-pyxa"]) # Install packages in a virtual environment
await computer.venv_cmd("demo_venv", "python -c 'import requests; print(requests.get(\"https://httpbin.org/ip\").json())'") # Run a shell command in a virtual environment
await computer.venv_exec("demo_venv", python_function_or_code, *args, **kwargs) # Run a Python function in a virtual environment and return the result / raise an exception
# Example: Use sandboxed functions to execute code in a C/ua Container
from computer.helpers import sandboxed
@sandboxed("demo_venv")
def greet_and_print(name, html_snippet_length=200):
# get .html of the current Safari tab
import PyXA
safari = PyXA.Application("Safari")
html = safari.current_document.source()
print(f"Hello from inside the container, {name}!")
print("Safari HTML length:", len(html))
return {"greeted": name, "safari_html_length": len(html), "safari_html_snippet": html[:html_snippet_length]}
result = await greet_and_print("C/ua", html_snippet_length=100) # Executes in the container
print("Result from sandboxed function:", result)
```
## ComputerAgent Reference

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,54 @@
from pathlib import Path
import os
import sys

# Load environment variables from the .env file that sits one directory
# above the folder containing this script.
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
from dotenv import load_dotenv

load_dotenv(env_file)

# Add every PYTHONPATH entry to sys.path if needed. This runs after
# load_dotenv() so a PYTHONPATH defined in .env is honoured, and before the
# `computer` imports below so those packages can be resolved from it.
# os.pathsep is the platform's search-path separator (":" on POSIX, ";" on
# Windows); a hard-coded ":" would split Windows drive letters apart.
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(os.pathsep):
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")

import asyncio
from computer.computer import Computer
from computer.helpers import sandboxed
async def main():
    """Run the sandboxed-function demo from start to finish.

    Starts a ``Computer``, installs ``requests`` and ``macos-pyxa`` into the
    ``demo_venv`` virtual environment, opens Safari, then awaits a
    ``@sandboxed`` function and prints the value it returns.
    """
    # Bring up the computer (a C/ua Container)
    machine = Computer()
    await machine.run()

    # Make sure the packages the sandboxed function imports are available
    packages = ["requests", "macos-pyxa"]
    await machine.venv_install("demo_venv", packages)

    # Launch Safari and give it a moment to open
    await machine.interface.run_command("open -a Safari")
    await asyncio.sleep(2)

    # The decorated function below is executed inside the container,
    # in the "demo_venv" virtual environment.
    @sandboxed("demo_venv")
    def greet_and_print(name):
        # Read the HTML source of the current Safari document
        import PyXA

        page_source = PyXA.Application("Safari").current_document.source()
        print(f"Hello from inside the container, {name}!")
        print("Safari HTML length:", len(page_source))
        return {
            "greeted": name,
            "safari_html_length": len(page_source),
            "safari_html_snippet": page_source[:200],
        }

    # Awaiting the wrapper performs the remote call
    outcome = await greet_and_print("C/ua")
    print("Result from sandboxed function:", outcome)


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -478,17 +478,11 @@ class ComputerTool(BaseComputerTool, BaseAnthropicTool):
if direction == "down":
# Scroll down (Page Down on macOS)
self.logger.info(f"Scrolling down, amount: {amount}")
# Use fn+down for page down on macOS
for _ in range(amount):
await self.computer.interface.hotkey("fn", "down")
await asyncio.sleep(0.1)
await self.computer.interface.scroll_down(amount)
else:
# Scroll up (Page Up on macOS)
self.logger.info(f"Scrolling up, amount: {amount}")
# Use fn+up for page up on macOS
for _ in range(amount):
await self.computer.interface.hotkey("fn", "up")
await asyncio.sleep(0.1)
await self.computer.interface.scroll_up(amount)
# Wait briefly for UI changes
await asyncio.sleep(0.5)

View File

@@ -36,11 +36,21 @@ class Diorama:
cls._ensure_scheduler()
return cls(args).computer
# Dictionary to store cursor positions for each unique app_list hash
_cursor_positions = {}
def __init__(self, app_list):
    """Create a diorama for ``app_list`` and reserve its cursor slot.

    Args:
        app_list: Collection of sortable, hashable app identifiers this
            diorama is scoped to.
    """
    self.app_list = app_list
    self.interface = self.Interface(self)
    self.computer = DioramaComputer(self)
    self.focus_context = None

    # Sorting first makes the key independent of the order in which the
    # apps were listed.
    self.app_list_hash = hash(tuple(sorted(app_list)))

    # The class-level table is shared between instances; only seed an entry
    # when this app set has not been seen before.
    Diorama._cursor_positions.setdefault(self.app_list_hash, (0, 0))
@classmethod
def _ensure_scheduler(cls):
@@ -70,7 +80,6 @@ class Diorama:
with focus_context:
try:
if action == "screenshot":
app_whitelist = list(args["app_list"])
logger.info(f"Taking screenshot for apps: {app_whitelist}")
result, img = capture_all_apps(
app_whitelist=app_whitelist,
@@ -84,6 +93,7 @@ class Diorama:
elif action in ["left_click", "right_click", "double_click", "move_cursor", "drag_to"]:
x = args.get("x")
y = args.get("y")
duration = args.get("duration", 0.5)
if action == "left_click":
await automation_handler.left_click(x, y)
@@ -98,6 +108,11 @@ class Diorama:
if future:
future.set_result(None)
elif action in ["scroll_up", "scroll_down"]:
x = args.get("x")
y = args.get("y")
if x is not None and y is not None:
await automation_handler.move_cursor(x, y)
clicks = args.get("clicks", 1)
if action == "scroll_up":
await automation_handler.scroll_up(clicks)
@@ -175,22 +190,57 @@ class Diorama:
return img
async def left_click(self, x, y):
    """Left-click at ``(x, y)``; a ``None`` coordinate reuses the last position."""
    x, y = self._resolve_cursor_position(x, y)
    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("left_click", {"x": sx, "y": sy})

async def right_click(self, x, y):
    """Right-click at ``(x, y)``; a ``None`` coordinate reuses the last position."""
    x, y = self._resolve_cursor_position(x, y)
    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("right_click", {"x": sx, "y": sy})

async def double_click(self, x, y):
    """Double-click at ``(x, y)``; a ``None`` coordinate reuses the last position."""
    x, y = self._resolve_cursor_position(x, y)
    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("double_click", {"x": sx, "y": sy})

async def move_cursor(self, x, y):
    """Move the cursor to ``(x, y)``; a ``None`` coordinate reuses the last position."""
    x, y = self._resolve_cursor_position(x, y)
    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("move_cursor", {"x": sx, "y": sy})

async def drag_to(self, x, y, duration=0.5):
    """Drag to ``(x, y)`` over ``duration`` seconds; ``None`` reuses the last position."""
    x, y = self._resolve_cursor_position(x, y)
    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})

def _resolve_cursor_position(self, x, y):
    """Fill in missing coordinates from, and then update, the stored cursor position.

    The position is stored per app list in ``Diorama._cursor_positions``,
    keyed by the hash of the sorted app list.

    Args:
        x: Requested x coordinate, or ``None`` to reuse the stored one.
        y: Requested y coordinate, or ``None`` to reuse the stored one.

    Returns:
        The ``(x, y)`` pair to act on, which is also the newly stored position.
    """
    app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
    last_x, last_y = Diorama._cursor_positions.get(app_list_hash, (0, 0))
    # Compare against None explicitly: 0 is a valid coordinate, and a
    # truthiness test would silently replace it with the previous position.
    x = last_x if x is None else x
    y = last_y if y is None else y
    Diorama._cursor_positions[app_list_hash] = (x, y)
    return x, y
@@ -207,10 +257,20 @@ class Diorama:
await self._send_cmd("hotkey", {"keys": list(keys)})
async def scroll_up(self, clicks: int = 1):
await self._send_cmd("scroll_up", {"clicks": clicks})
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
x, y = last_pos[0], last_pos[1]
await self._send_cmd("scroll_up", {"clicks": clicks, "x": x, "y": y})
async def scroll_down(self, clicks: int = 1):
await self._send_cmd("scroll_down", {"clicks": clicks})
# Get last cursor position for this app_list hash
app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
last_pos = Diorama._cursor_positions.get(app_list_hash, (0, 0))
x, y = last_pos[0], last_pos[1]
await self._send_cmd("scroll_down", {"clicks": clicks, "x": x, "y": y})
async def get_screen_size(self) -> dict[str, int]:
if not self._scene_size:

View File

@@ -377,7 +377,7 @@ def draw_desktop_screenshot(app_whitelist: List[str] = None, all_windows: List[D
dock_orientation = "side" if dock_bounds["width"] < dock_bounds["height"] else "bottom"
menubar_length = max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items)
menubar_length = max(item["bounds"]["x"] + item["bounds"]["width"] for item in menubar_items) if menubar_items else 0
# Calculate bounds of app windows
app_bounds = {

View File

@@ -1,4 +1,7 @@
import pyautogui
from pynput.mouse import Button, Controller as MouseController
from pynput.keyboard import Key, Controller as KeyboardController
import time
import base64
from io import BytesIO
from typing import Optional, Dict, Any, List, Tuple
@@ -336,7 +339,6 @@ class UIElement:
"position": position,
"size": size,
"enabled": self.enabled,
"focused": self.focused,
"bbox": self.bbox,
"visible_bbox": self.visible_bbox,
"children": children_to_dict(self.children),
@@ -527,11 +529,14 @@ class MacOSAccessibilityHandler(BaseAccessibilityHandler):
class MacOSAutomationHandler(BaseAutomationHandler):
# Mouse Actions
mouse = MouseController()
keyboard = KeyboardController()
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.click()
self.mouse.position = (x, y)
self.mouse.click(Button.left, 1)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -539,8 +544,8 @@ class MacOSAutomationHandler(BaseAutomationHandler):
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.rightClick()
self.mouse.position = (x, y)
self.mouse.click(Button.right, 1)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -550,15 +555,15 @@ class MacOSAutomationHandler(BaseAutomationHandler):
) -> Dict[str, Any]:
try:
if x is not None and y is not None:
pyautogui.moveTo(x, y)
pyautogui.doubleClick(interval=0.1)
self.mouse.position = (x, y)
self.mouse.click(Button.left, 2)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
try:
pyautogui.moveTo(x, y)
self.mouse.position = (x, y)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -567,9 +572,26 @@ class MacOSAutomationHandler(BaseAutomationHandler):
self, x: int, y: int, button: str = "left", duration: float = 0.5
) -> Dict[str, Any]:
try:
pyautogui.dragTo(x, y, button=button, duration=duration)
btn = Button.left if button == "left" else Button.right
# Press
self.mouse.press(btn)
# Move with sleep to simulate drag duration
start = self.mouse.position
steps = 20
start_x, start_y = start
dx = (x - start_x) / steps
dy = (y - start_y) / steps
for i in range(steps):
self.mouse.position = (int(start_x + dx * (i + 1)), int(start_y + dy * (i + 1)))
time.sleep(duration / steps)
# Release
self.mouse.release(btn)
return {"success": True}
except Exception as e:
try:
self.mouse.release(btn)
except:
pass
return {"success": False, "error": str(e)}
async def drag(
@@ -578,29 +600,19 @@ class MacOSAutomationHandler(BaseAutomationHandler):
try:
if not path or len(path) < 2:
return {"success": False, "error": "Path must contain at least 2 points"}
btn = Button.left if button == "left" else Button.right
# Move to the first point
start_x, start_y = path[0]
pyautogui.moveTo(start_x, start_y)
# Press the mouse button
pyautogui.mouseDown(button=button)
# Calculate time between points to distribute duration evenly
self.mouse.position = path[0]
self.mouse.press(btn)
step_duration = duration / (len(path) - 1) if len(path) > 1 else duration
# Move through each subsequent point
for x, y in path[1:]:
pyautogui.moveTo(x, y, duration=step_duration)
# Release the mouse button
pyautogui.mouseUp(button=button)
self.mouse.position = (x, y)
time.sleep(step_duration)
self.mouse.release(btn)
return {"success": True}
except Exception as e:
# Make sure to release the mouse button if an error occurs
try:
pyautogui.mouseUp(button=button)
self.mouse.release(btn)
except:
pass
return {"success": False, "error": str(e)}
@@ -608,7 +620,7 @@ class MacOSAutomationHandler(BaseAutomationHandler):
# Keyboard Actions
async def type_text(self, text: str) -> Dict[str, Any]:
try:
pyautogui.write(text)
self.keyboard.type(text)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -630,14 +642,14 @@ class MacOSAutomationHandler(BaseAutomationHandler):
# Scrolling Actions
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
try:
pyautogui.scroll(-clicks)
self.mouse.scroll(0, -clicks)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
try:
pyautogui.scroll(clicks)
self.mouse.scroll(0, clicks)
return {"success": True}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -668,8 +680,8 @@ class MacOSAutomationHandler(BaseAutomationHandler):
async def get_cursor_position(self) -> Dict[str, Any]:
try:
pos = pyautogui.position()
return {"success": True, "position": {"x": pos.x, "y": pos.y}}
x, y = self.mouse.position
return {"success": True, "position": {"x": x, "y": y}}
except Exception as e:
return {"success": False, "error": str(e)}

View File

@@ -17,6 +17,7 @@ dependencies = [
"uvicorn[standard]>=0.27.0",
"pydantic>=2.0.0",
"pyautogui>=0.9.54",
"pynput>=1.8.1",
"pillow>=10.2.0",
"aiohttp>=3.9.1"
]

View File

@@ -11,6 +11,7 @@ import json
import logging
from .telemetry import record_computer_initialization
import os
from . import helpers
# Import provider related modules
from .providers.base import VMProviderType
@@ -460,6 +461,10 @@ class Computer:
# Set the initialization flag and clear the initializing flag
self._initialized = True
# Set this instance as the default computer for remote decorators
helpers.set_default_computer(self)
self.logger.info("Computer successfully initialized")
except Exception as e:
raise
@@ -722,3 +727,177 @@ class Computer:
tuple[float, float]: (x, y) coordinates in screenshot space
"""
return await self.interface.to_screenshot_coordinates(x, y)
# Add virtual environment management functions to computer interface
async def venv_install(self, venv_name: str, requirements: list[str]) -> tuple[str, str]:
    """Install packages in a virtual environment, creating it if needed.

    The environment lives at ``~/.venvs/<venv_name>`` on the target machine.

    Args:
        venv_name: Name of the virtual environment
        requirements: List of package requirements to install. A single
            string is treated as one requirement. May be empty or ``None``,
            in which case only the environment is created.

    Returns:
        Tuple of (stdout, stderr) from the installation command, or from the
        create-if-missing command when there is nothing to install.
    """
    import shlex

    if isinstance(requirements, str):
        # " ".join("requests") would otherwise yield "r e q u e s t s".
        requirements = [requirements]
    requirements = requirements or []

    # Quote only the name so the leading "~" is still expanded by the shell.
    venv_path = f"~/.venvs/{shlex.quote(venv_name)}"
    create_cmd = f"mkdir -p ~/.venvs && python3 -m venv {venv_path}"

    # Check if venv exists, if not create it
    check_cmd = f"test -d {venv_path} || ({create_cmd})"
    create_result = await self.interface.run_command(check_cmd)

    if not requirements:
        # "pip install" with no arguments only fails; there is nothing to do.
        return create_result

    # Quote each specifier: an unquoted "pkg>=1.0" is parsed by the shell as
    # "pkg" with output redirected to a file named "=1.0".
    requirements_str = " ".join(shlex.quote(req) for req in requirements)
    install_cmd = f". {venv_path}/bin/activate && pip install {requirements_str}"
    return await self.interface.run_command(install_cmd)
async def venv_cmd(self, venv_name: str, command: str) -> tuple[str, str]:
    """Execute a shell command in a virtual environment.

    Args:
        venv_name: Name of the virtual environment
        command: Shell command to execute in the virtual environment

    Returns:
        Tuple of (stdout, stderr) from the command execution. If the
        environment does not exist, stdout is empty and stderr carries an
        explanatory message; the command is not run.
    """
    import shlex

    # Quote only the name so the leading "~" is still expanded by the shell.
    venv_path = f"~/.venvs/{shlex.quote(venv_name)}"

    # `test -d` reports its result through the exit status only and prints
    # nothing, so inspecting stdout/stderr of a bare `test` can never detect
    # a missing directory. Echo an explicit marker on success instead; the
    # `if ... fi` form exits 0 either way.
    exists_marker = "__VENV_EXISTS__"
    check_cmd = f"if test -d {venv_path}; then echo {exists_marker}; fi"
    stdout, _ = await self.interface.run_command(check_cmd)

    if exists_marker not in (stdout or ""):  # venv doesn't exist
        return "", f"Virtual environment '{venv_name}' does not exist. Create it first using venv_install."

    # Activate virtual environment and run command
    full_command = f". {venv_path}/bin/activate && {command}"
    return await self.interface.run_command(full_command)
async def venv_exec(self, venv_name: str, python_func, *args, **kwargs):
    """Execute a Python function in a virtual environment using source code extraction.

    The function's source is read with ``inspect.getsource``, its decorator
    lines are stripped, and the source is re-defined and called inside a
    generated script that runs through ``venv_cmd``. Arguments and the result
    cross the boundary as JSON (values that are not JSON-serializable are
    converted with ``str``). Only the function's own source is shipped, so
    anything it uses must be imported or defined inside its body.

    Args:
        venv_name: Name of the virtual environment
        python_func: A callable function to execute
        *args: Positional arguments to pass to the function
        **kwargs: Keyword arguments to pass to the function

    Returns:
        The result of the function execution, as decoded from JSON.

    Raises:
        Exception: If the source cannot be retrieved, the output payload is
            missing or malformed, or the function raised an exception whose
            type is not a built-in exception class. When the function raised
            a built-in exception, an instance of that same class is raised
            with the original message.
    """
    import base64
    import builtins
    import inspect
    import json
    import textwrap

    try:
        # Get function source code using inspect.getsource
        source = inspect.getsource(python_func)
        # Remove common leading whitespace (dedent)
        func_source = textwrap.dedent(source).strip()

        # Remove decorators (one line per decorator)
        while func_source.lstrip().startswith("@"):
            func_source = func_source.split("\n", 1)[1].strip()

        # Get function name for execution
        func_name = python_func.__name__

        # Serialize args and kwargs as JSON (safer than dill for cross-version compatibility)
        args_json = json.dumps(args, default=str)
        kwargs_json = json.dumps(kwargs, default=str)

    except OSError as e:
        raise Exception(f"Cannot retrieve source code for function {python_func.__name__}: {e}") from e
    except Exception as e:
        raise Exception(f"Failed to reconstruct function source: {e}") from e

    # Create Python code that will define and execute the function.
    # The JSON documents are embedded with repr() so they arrive as exact
    # Python string literals; pasting them between triple quotes would let
    # Python re-interpret every backslash escape and corrupt any argument
    # that contains a quote, a backslash or a newline.
    python_code = f'''
import json
import traceback

try:
    # Define the function from source
{textwrap.indent(func_source, "    ")}

    # Deserialize args and kwargs from JSON
    args = json.loads({args_json!r})
    kwargs = json.loads({kwargs_json!r})

    # Execute the function
    result = {func_name}(*args, **kwargs)

    # Create success output payload
    output_payload = {{
        "success": True,
        "result": result,
        "error": None
    }}

except Exception as e:
    # Create error output payload
    output_payload = {{
        "success": False,
        "result": None,
        "error": {{
            "type": type(e).__name__,
            "message": str(e),
            "traceback": traceback.format_exc()
        }}
    }}

# Serialize the output payload as JSON
output_json = json.dumps(output_payload, default=str)

# Print the JSON output with markers
print(f"<<<VENV_EXEC_START>>>{{output_json}}<<<VENV_EXEC_END>>>")
'''

    # Encode the Python code in base64 to avoid shell escaping issues
    encoded_code = base64.b64encode(python_code.encode('utf-8')).decode('ascii')

    # Execute the Python code in the virtual environment
    python_command = f"python -c \"import base64; exec(base64.b64decode('{encoded_code}').decode('utf-8'))\""
    stdout, stderr = await self.venv_cmd(venv_name, python_command)

    # Parse the output to extract the payload
    start_marker = "<<<VENV_EXEC_START>>>"
    end_marker = "<<<VENV_EXEC_END>>>"
    start_pos = stdout.find(start_marker)

    if start_pos != -1:
        # Forward whatever the function printed before the payload.
        print(stdout[:start_pos])

    if start_pos == -1 or end_marker not in stdout:
        # No payload markers found: surface the raw output instead
        raise Exception(f"No output payload found. stdout: {stdout}, stderr: {stderr}")

    payload_start = start_pos + len(start_marker)
    payload_end = stdout.find(end_marker, payload_start)
    if payload_end <= payload_start:
        raise Exception("Invalid output format: markers found but no content between them")

    try:
        # Decode and deserialize the output payload from JSON
        output_payload = json.loads(stdout[payload_start:payload_end])
    except Exception as e:
        raise Exception(f"Failed to decode output payload: {e}") from e

    if output_payload["success"]:
        return output_payload["result"]

    # Recreate and raise the original exception. The type name comes from the
    # remote process, so it is looked up among the built-in exception classes
    # rather than passed to eval(): eval would execute arbitrary text and
    # fails with NameError for any exception class not defined here.
    error_info = output_payload["error"]
    error_type = error_info["type"]
    error_message = error_info["message"]

    remote_error = None
    error_class = getattr(builtins, str(error_type), None)
    if isinstance(error_class, type) and issubclass(error_class, Exception):
        try:
            remote_error = error_class(error_message)
        except Exception:
            # Some built-ins (e.g. UnicodeDecodeError) need extra arguments.
            remote_error = None
    if remote_error is None:
        remote_error = Exception(f"{error_type}: {error_message}")
    raise remote_error

View File

@@ -1,4 +1,5 @@
import asyncio
from .interface.models import KeyType, Key
class DioramaComputer:
"""
@@ -37,7 +38,7 @@ class DioramaComputerInterface:
raise RuntimeError("Computer interface not initialized. Call run() first.")
result = await iface.diorama_cmd(action, arguments)
if not result.get("success"):
raise RuntimeError(f"Diorama command failed: {result.get('error')}")
raise RuntimeError(f"Diorama command failed: {result.get('error')}\n{result.get('trace')}")
return result.get("result")
async def screenshot(self, as_bytes=True):
@@ -87,7 +88,17 @@ class DioramaComputerInterface:
await self._send_cmd("press_key", {"key": key})
async def hotkey(self, *keys):
await self._send_cmd("hotkey", {"keys": list(keys)})
actual_keys = []
for key in keys:
if isinstance(key, Key):
actual_keys.append(key.value)
elif isinstance(key, str):
# Try to convert to enum if it matches a known key
key_or_enum = Key.from_string(key)
actual_keys.append(key_or_enum.value if isinstance(key_or_enum, Key) else key_or_enum)
else:
raise ValueError(f"Invalid key type: {type(key)}. Must be Key enum or string.")
await self._send_cmd("hotkey", {"keys": actual_keys})
async def to_screen_coordinates(self, x, y):
return await self._send_cmd("to_screen_coordinates", {"x": x, "y": y})

View File

@@ -0,0 +1,49 @@
"""
Helper functions and decorators for the Computer module.
"""
import asyncio
from functools import wraps
from typing import Any, Callable, Optional, TypeVar, cast
# Global reference to the default computer instance
_default_computer = None
def set_default_computer(computer):
    """
    Register ``computer`` as the module-wide default instance.

    The value is stored in the module global ``_default_computer`` and
    replaces whatever was stored there before.

    Args:
        computer: The computer instance to use as default
    """
    global _default_computer
    _default_computer = computer
def sandboxed(venv_name: str = "default", computer: str = "default", max_retries: int = 3):
    """
    Decorator that wraps a function to be executed remotely via computer.venv_exec

    The decorated function becomes a coroutine function: awaiting it forwards
    the original function and its arguments to ``venv_exec`` and returns the
    result.

    Args:
        venv_name: Name of the virtual environment to execute in
        computer: The computer instance to use, or "default" to use the globally set default
        max_retries: Maximum number of attempts for the remote execution.
            Values below 1 are treated as 1 so the function always runs at
            least once.

    Raises:
        RuntimeError: When the wrapper is called and no computer instance is available.
        Exception: Whatever the final failed attempt raised.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Determine which computer instance to use
            comp = computer if computer != "default" else _default_computer
            if comp is None:
                raise RuntimeError("No computer instance available. Either specify a computer instance or call set_default_computer() first.")

            attempts = max(1, max_retries)
            for attempt in range(1, attempts + 1):
                try:
                    return await comp.venv_exec(venv_name, func, *args, **kwargs)
                except Exception as e:
                    print(f"Attempt {attempt} failed: {e}")
                    if attempt == attempts:
                        # Out of attempts: re-raise immediately with the
                        # original traceback instead of sleeping first.
                        raise
                    await asyncio.sleep(1)
        return wrapper
    return decorator

View File

@@ -27,6 +27,7 @@ class LinuxComputerInterface(BaseComputerInterface):
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
self._authenticated = False # Track authentication status
self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time
# Set logger name for Linux interface
self.logger = Logger("cua.interface.linux", LogLevel.NORMAL)
@@ -193,58 +194,62 @@ class LinuxComputerInterface(BaseComputerInterface):
retry_count = 0
last_error = None
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Acquire lock to ensure only one command is processed at a time
async with self._command_lock:
self.logger.debug(f"Acquired lock for command: {command}")
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Handle authentication if needed
if self.api_key and self.vm_name and not self._authenticated:
self.logger.info("Performing authentication handshake...")
auth_message = {
"command": "authenticate",
"params": {
"api_key": self.api_key,
"container_name": self.vm_name
# Handle authentication if needed
if self.api_key and self.vm_name and not self._authenticated:
self.logger.info("Performing authentication handshake...")
auth_message = {
"command": "authenticate",
"params": {
"api_key": self.api_key,
"container_name": self.vm_name
}
}
}
await self._ws.send(json.dumps(auth_message))
# Wait for authentication response
auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
auth_result = json.loads(auth_response)
if not auth_result.get("success"):
error_msg = auth_result.get("error", "Authentication failed")
self.logger.error(f"Authentication failed: {error_msg}")
self._authenticated = False
raise ConnectionError(f"Authentication failed: {error_msg}")
self.logger.info("Authentication successful")
self._authenticated = True
await self._ws.send(json.dumps(auth_message))
# Wait for authentication response
auth_response = await asyncio.wait_for(self._ws.recv(), timeout=10)
auth_result = json.loads(auth_response)
if not auth_result.get("success"):
error_msg = auth_result.get("error", "Authentication failed")
self.logger.error(f"Authentication failed: {error_msg}")
self._authenticated = False
raise ConnectionError(f"Authentication failed: {error_msg}")
self.logger.info("Authentication successful")
self._authenticated = True
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise last_error if last_error else RuntimeError("Failed to send command")
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
self.logger.debug(f"Completed command: {command}")
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise last_error if last_error else RuntimeError("Failed to send command")
async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
"""Wait for WebSocket connection to become available."""

View File

@@ -26,6 +26,7 @@ class MacOSComputerInterface(BaseComputerInterface):
self._reconnect_delay = 1 # Start with 1 second delay
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
self._command_lock = asyncio.Lock() # Lock to ensure only one command at a time
# Set logger name for macOS interface
self.logger = Logger("cua.interface.macos", LogLevel.NORMAL)
@@ -219,35 +220,39 @@ class MacOSComputerInterface(BaseComputerInterface):
retry_count = 0
last_error = None
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
# Acquire lock to ensure only one command is processed at a time
async with self._command_lock:
self.logger.debug(f"Acquired lock for command: {command}")
while retry_count < max_retries:
try:
await self._ensure_connection()
if not self._ws:
raise ConnectionError("WebSocket connection is not established")
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
self.logger.debug(f"Completed command: {command}")
return json.loads(response)
except Exception as e:
last_error = e
retry_count += 1
if retry_count < max_retries:
# Only log at debug level for intermediate retries
self.logger.debug(
f"Command '{command}' failed (attempt {retry_count}/{max_retries}): {e}"
)
await asyncio.sleep(1)
continue
else:
# Only log at error level for the final failure
self.logger.error(
f"Failed to send command '{command}' after {max_retries} retries"
)
self.logger.debug(f"Command failure details: {e}")
raise
raise last_error if last_error else RuntimeError("Failed to send command")
raise last_error if last_error else RuntimeError("Failed to send command")
async def wait_for_ready(self, timeout: int = 60, interval: float = 1.0):
"""Wait for WebSocket connection to become available."""

View File

@@ -463,7 +463,7 @@ async def execute(name, action, arguments):
elif action == "left_click":
if "x" in arguments and "y" in arguments:
await computer.interface.move_cursor(arguments["x"], arguments["y"])
await computer.interface.left_click()
await computer.interface.left_click(arguments["x"], arguments["y"])
await asyncio.sleep(0.5)
elif action == "right_click":
if "x" in arguments and "y" in arguments:
@@ -528,43 +528,75 @@ async def execute(name, action, arguments):
return results
async def handle_init_computer(os_choice: str):
"""Initialize the computer instance and tools for macOS or Ubuntu"""
async def handle_init_computer(os_choice: str, app_list=None, provider="lume"):
"""Initialize the computer instance and tools for macOS or Ubuntu
Args:
os_choice: The OS to use ("macOS" or "Ubuntu")
app_list: Optional list of apps to focus on using the app-use experiment
provider: The provider to use ("lume" or "self")
"""
global computer, tool_call_logs, tools
# Check if we should enable app-use experiment
use_app_experiment = app_list and len(app_list) > 0
experiments = ["app-use"] if use_app_experiment else None
# Determine if we should use host computer server
use_host_computer_server = provider == "self"
if os_choice == "Ubuntu":
computer = Computer(
image="ubuntu-noble-vanilla:latest",
os_type="linux",
provider_type=VMProviderType.LUME,
display="1024x768",
memory="8GB",
cpu="4"
)
os_type_str = "linux"
image_str = "ubuntu-noble-vanilla:latest"
else:
os_type_str = "macos"
image_str = "macos-sequoia-cua:latest"
# Create computer instance with appropriate configuration
if use_host_computer_server:
computer = Computer(
image="macos-sequoia-cua:latest",
os_type="macos",
os_type=os_type_str,
use_host_computer_server=True,
experiments=experiments
)
else:
computer = Computer(
image=image_str,
os_type=os_type_str,
provider_type=VMProviderType.LUME,
display="1024x768",
memory="8GB",
cpu="4"
cpu="4",
experiments=experiments
)
os_type_str = "macos"
image_str = "macos-sequoia-cua:latest"
await computer.run()
# If app list is provided, create desktop from apps
if use_app_experiment:
computer = computer.create_desktop_from_apps(app_list)
# Log computer initialization as a tool call
result = await execute("computer", "initialize", {
init_params = {
"os": os_type_str,
"image": image_str,
"display": "1024x768",
"memory": "8GB",
"cpu": "4"
})
"provider": provider
}
# Add VM-specific parameters if not using host computer server
if not use_host_computer_server:
init_params.update({
"image": image_str,
"display": "1024x768",
"memory": "8GB",
"cpu": "4"
})
# Add app list to the log if provided
if use_app_experiment:
init_params["apps"] = app_list
init_params["experiments"] = ["app-use"]
result = await execute("computer", "initialize", init_params)
return result["screenshot"], json.dumps(tool_call_logs, indent=2)
@@ -1029,12 +1061,31 @@ def create_gradio_ui():
setup_status = gr.Textbox(label="Setup Status", value="")
with gr.Group():
os_choice = gr.Radio(
label="OS",
choices=["macOS", "Ubuntu"],
value="macOS",
interactive=False # disable until the ubuntu image is ready
)
with gr.Accordion("Computer Configuration", open=False):
with gr.Row():
os_choice = gr.Radio(
label="OS",
choices=["macOS", "Ubuntu"],
value="macOS",
interactive=False # disable until the ubuntu image is ready
)
# Provider selection radio
provider_choice = gr.Radio(
label="Provider",
choices=["lume", "self"],
value="lume",
info="'lume' uses a VM, 'self' uses the host computer server"
)
# App filtering dropdown for app-use experiment
app_filter = gr.Dropdown(
label="Filter by apps (App-Use)",
multiselect=True,
allow_custom_value=True,
info="When apps are selected, the computer will focus on those apps using the app-use experiment"
)
start_btn = gr.Button("Initialize Computer")
with gr.Group():
@@ -1199,7 +1250,7 @@ def create_gradio_ui():
)
img.select(handle_click, inputs=[img, click_type], outputs=[img, action_log])
start_btn.click(handle_init_computer, inputs=[os_choice], outputs=[img, action_log])
start_btn.click(handle_init_computer, inputs=[os_choice, app_filter, provider_choice], outputs=[img, action_log])
wait_btn.click(handle_wait, outputs=[img, action_log])
# DONE and FAIL buttons just do a placeholder action

4
tests/pytest.ini Normal file
View File

@@ -0,0 +1,4 @@
[pytest]
asyncio_mode = auto
markers =
asyncio: asyncio mark

206
tests/venv.py Normal file
View File

@@ -0,0 +1,206 @@
"""
Virtual Environment Testing Module
This module tests the ability to execute python code in a virtual environment within C/ua Containers.
Required environment variables:
- CUA_API_KEY: API key for C/ua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""
import os
import asyncio
import pytest
from pathlib import Path
import sys
import traceback
# Load environment variables from .env file
# project_root is the parent of the directory that contains this file; the
# .env file is looked up directly inside it.
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
# NOTE(review): dotenv is imported here rather than with the imports at the
# top of the file; the load happens before the `computer` package imports
# further down — presumably so their configuration can come from .env. Confirm.
from dotenv import load_dotenv
load_dotenv(env_file)
# Add paths to sys.path if needed.
# PYTHONPATH entries are separated by os.pathsep (":" on POSIX, ";" on
# Windows). Splitting on a hard-coded ":" would cut Windows drive letters
# ("C:\...") in half, so use the platform separator instead.
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(os.pathsep):
    # Skip empty entries (unset variable, doubled separators) and duplicates.
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")
from computer.computer import Computer
from computer.providers.base import VMProviderType
from computer.helpers import sandboxed, set_default_computer
@pytest.fixture(scope="session")
async def computer():
    """Yield one started Computer instance shared by the whole test session.

    The instance is built with default arguments and started with ``run()``
    before it is handed to the tests. Nothing is done on teardown: the
    ``stop()`` call in the ``finally`` clause is commented out.
    """
    # Alternative setup: a remote Linux computer with C/ua
    # shared_computer = Computer(
    #     os_type="linux",
    #     api_key=os.getenv("CUA_API_KEY"),
    #     name=str(os.getenv("CUA_CONTAINER_NAME")),
    #     provider_type=VMProviderType.CLOUD,
    # )

    # Create a local macOS computer with C/ua
    shared_computer = Computer()

    try:
        await shared_computer.run()
        yield shared_computer
    finally:
        # await shared_computer.stop()
        pass
# Sample test cases
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_install(computer):
    """Install ``requests`` into the ``test_env`` virtual environment."""
    install_output, _ = await computer.venv_install("test_env", ["requests"])

    # Either a fresh install or an already-present package counts as success.
    success_markers = ("Successfully installed", "Requirement already satisfied")
    assert any(marker in install_output for marker in success_markers)
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_cmd(computer):
    """Run a shell command inside ``test_env`` and check its stdout."""
    # Asking the interpreter for its version is the simplest smoke test.
    version_output, _ = await computer.venv_cmd("test_env", "python --version")

    assert "Python" in version_output
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_exec(computer):
    """Execute a local function through ``venv_exec`` and check its return value."""

    # Kept self-contained (imports inside the body) because the function is
    # handed to venv_exec rather than called directly here.
    def test_function(message="Hello World"):
        import sys
        return f"Python {sys.version_info.major}.{sys.version_info.minor}: {message}"

    output = await computer.venv_exec("test_env", test_function, message="Test successful!")

    for expected_fragment in ("Python", "Test successful!"):
        assert expected_fragment in output
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_exec_with_package(computer):
    """Execute a function that imports a package installed in ``test_env``."""

    # `requests` is the package test_venv_install installs into test_env.
    def test_requests():
        import requests
        return f"requests version: {requests.__version__}"

    version_line = await computer.venv_exec("test_env", test_requests)

    assert "requests version:" in version_line
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_exec_error_handling(computer):
    """Check that an exception raised by the executed function reaches the caller."""

    def test_error():
        raise ValueError("This is a test error")

    # Both the exception type and its message are expected to come back
    # unchanged from venv_exec.
    with pytest.raises(ValueError, match="This is a test error"):
        await computer.venv_exec("test_env", test_error)
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_exec_with_args_kwargs(computer):
    """Pass positional and keyword arguments through ``venv_exec`` and get a dict back."""

    def create_data_object(name, age, *hobbies, **metadata):
        return {
            "name": name,
            "age": age,
            "hobbies": list(hobbies),
            "metadata": metadata,
            "status": "active"
        }

    # Two named positionals, two extra positionals (*hobbies) and two
    # keyword arguments (**metadata).
    positional = ["Alice", 25, "reading", "coding"]
    keyword = {"location": "New York", "department": "Engineering"}

    result = await computer.venv_exec("test_env", create_data_object, *positional, **keyword)

    expected_top_level = (
        ("name", "Alice"),
        ("age", 25),
        ("hobbies", ["reading", "coding"]),
    )
    for field, expected_value in expected_top_level:
        assert result[field] == expected_value
    assert result["metadata"]["location"] == "New York"
    assert result["status"] == "active"
@pytest.mark.asyncio(loop_scope="session")
async def test_venv_exec_stdout_capture(computer, capfd):
    """Check that text printed by the executed function shows up on local stdout."""

    def hello_world_function():
        print("Hello World!")
        return "Function completed"

    # Run the function in the virtual environment, then read what was
    # written to the file-descriptor-level stdout/stderr during the call.
    returned = await computer.venv_exec("test_env", hello_world_function)
    captured = capfd.readouterr()

    # NOTE(review): the expected text ends with two newlines; presumably one
    # comes from print() and one is added when the output is relayed — confirm.
    assert captured.out == "Hello World!\n\n"
    assert returned == "Function completed"
@pytest.mark.asyncio(loop_scope="session")
async def test_remote_decorator(computer):
    """Test the ``sandboxed`` decorator from computer.helpers with the default computer."""
    # No computer is passed to the decorator below, so register the shared
    # fixture instance as the default first.
    set_default_computer(computer)

    @sandboxed("test_env")
    def get_package_version():
        import sys
        import platform
        return {
            "python_version": sys.version,
            "platform": platform.platform(),
            "success": True
        }

    # The decorated function is awaited rather than called synchronously.
    result = await get_package_version()

    # Verify the function executed and returned every expected field.
    for expected_key in ("python_version", "platform"):
        assert expected_key in result
    assert result["success"] == True
@pytest.mark.asyncio(loop_scope="session")
async def test_remote_decorator_with_custom_computer(computer):
    """Test the ``sandboxed`` decorator when the computer is passed explicitly."""

    # The target computer is given to the decorator directly instead of
    # relying on a previously registered default.
    @sandboxed("test_env", computer=computer)
    def get_system_info():
        import os
        import sys
        return {
            "python_version": sys.version,
            "environment_vars": dict(os.environ),
            "working_directory": os.getcwd()
        }

    result = await get_system_info()

    # Verify the function executed and returned every expected field.
    for expected_key in ("python_version", "environment_vars", "working_directory"):
        assert expected_key in result

    # The virtual environment should have a different working directory
    # than the current test process.
    local_cwd = os.getcwd()
    assert result["working_directory"] != local_cwd
if __name__ == "__main__":
    # Allow running this file directly: hand it to pytest in verbose mode.
    pytest.main(args=[__file__, "-v"])