Merge pull request #284 from trycua/feat/computer-ext

[Computer] Add kb+m primitive actions, horizontal scrolling, and file-system commands
This commit is contained in:
ddupont
2025-06-10 15:12:59 -04:00
committed by GitHub
13 changed files with 891 additions and 94 deletions

View File

@@ -247,6 +247,9 @@ docker run -it --rm \
For complete examples, see [computer_examples.py](./examples/computer_examples.py) or [computer_nb.ipynb](./notebooks/computer_nb.ipynb)
```python
# Shell Actions
await computer.interface.run_command(cmd) # Run shell command
# Mouse Actions
await computer.interface.left_click(x, y) # Left click at coordinates
await computer.interface.right_click(x, y) # Right click at coordinates
@@ -254,11 +257,20 @@ await computer.interface.double_click(x, y) # Double click at coordinates
await computer.interface.move_cursor(x, y) # Move cursor to coordinates
await computer.interface.drag_to(x, y, duration) # Drag to coordinates
await computer.interface.get_cursor_position() # Get current cursor position
await computer.interface.mouse_down(x, y, button="left") # Press and hold a mouse button
await computer.interface.mouse_up(x, y, button="left") # Release a mouse button
# Keyboard Actions
await computer.interface.type_text("Hello") # Type text
await computer.interface.press_key("enter") # Press a single key
await computer.interface.hotkey("command", "c") # Press key combination
await computer.interface.key_down("command") # Press and hold a key
await computer.interface.key_up("command") # Release a key
# Scrolling Actions
await computer.interface.scroll(x, y) # Scroll the mouse wheel
await computer.interface.scroll_down(clicks) # Scroll down
await computer.interface.scroll_up(clicks) # Scroll up
# Screen Actions
await computer.interface.screenshot() # Take a screenshot
@@ -271,7 +283,14 @@ await computer.interface.copy_to_clipboard() # Get clipboard content
# File System Operations
await computer.interface.file_exists(path) # Check if file exists
await computer.interface.directory_exists(path) # Check if directory exists
await computer.interface.run_command(cmd) # Run shell command
await computer.interface.read_text(path) # Read file content
await computer.interface.write_text(path, content) # Write file content
await computer.interface.read_bytes(path) # Read file content as bytes
await computer.interface.write_bytes(path, content) # Write file content as bytes
await computer.interface.delete_file(path) # Delete file
await computer.interface.create_dir(path) # Create directory
await computer.interface.delete_dir(path) # Delete directory
await computer.interface.list_dir(path) # List directory contents
# Accessibility
await computer.interface.get_accessibility_tree() # Get accessibility tree

View File

@@ -16,6 +16,59 @@ class BaseAccessibilityHandler(ABC):
"""Find an element in the accessibility tree by criteria."""
pass
class BaseFileHandler(ABC):
    """Contract that every OS-specific file-system handler implements.

    Each operation is a coroutine taking a path string and returning a
    ``Dict[str, Any]`` that describes the outcome.
    """

    @abstractmethod
    async def file_exists(self, path: str) -> Dict[str, Any]:
        """Report whether a file exists at ``path``."""

    @abstractmethod
    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """Report whether a directory exists at ``path``."""

    @abstractmethod
    async def list_dir(self, path: str) -> Dict[str, Any]:
        """List the entries of the directory at ``path``."""

    @abstractmethod
    async def read_text(self, path: str) -> Dict[str, Any]:
        """Read the file at ``path`` as text."""

    @abstractmethod
    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """Write the text ``content`` to the file at ``path``."""

    @abstractmethod
    async def read_bytes(self, path: str) -> Dict[str, Any]:
        """Read the file at ``path`` as binary data.

        The data is sent over the websocket as a base64 string.
        """

    @abstractmethod
    async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]:
        """Write binary data to the file at ``path``.

        ``content_b64`` is the data as sent over the websocket: a base64 string.
        """

    @abstractmethod
    async def delete_file(self, path: str) -> Dict[str, Any]:
        """Delete the file at ``path``."""

    @abstractmethod
    async def create_dir(self, path: str) -> Dict[str, Any]:
        """Create a directory at ``path``."""

    @abstractmethod
    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """Delete the directory at ``path``."""
class BaseAutomationHandler(ABC):
"""Abstract base class for OS-specific automation handlers.
@@ -28,6 +81,16 @@ class BaseAutomationHandler(ABC):
"""
# Mouse Actions
@abstractmethod
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Press a mouse button down at the given position, or at the current one when none is given."""
@abstractmethod
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Release a mouse button at the given position, or at the current one when none is given."""
@abstractmethod
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
"""Perform a left click at the current or specified position."""
@@ -72,6 +135,16 @@ class BaseAutomationHandler(ABC):
pass
# Keyboard Actions
@abstractmethod
async def key_down(self, key: str) -> Dict[str, Any]:
    """Press ``key`` and keep it held down."""
@abstractmethod
async def key_up(self, key: str) -> Dict[str, Any]:
    """Release ``key``."""
@abstractmethod
async def type_text(self, text: str) -> Dict[str, Any]:
"""Type the specified text."""
@@ -88,6 +161,11 @@ class BaseAutomationHandler(ABC):
pass
# Scrolling Actions
@abstractmethod
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
    """Scroll by the amounts given in ``x`` and ``y``."""
@abstractmethod
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
"""Scroll down by the specified number of clicks."""

View File

@@ -1,7 +1,7 @@
import platform
import subprocess
from typing import Tuple, Type
from .base import BaseAccessibilityHandler, BaseAutomationHandler
from .base import BaseAccessibilityHandler, BaseAutomationHandler, BaseFileHandler
from computer_server.diorama.base import BaseDioramaHandler
# Conditionally import platform-specific handlers
@@ -12,6 +12,8 @@ if system == 'darwin':
elif system == 'linux':
from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
from .generic import GenericFileHandler
class HandlerFactory:
"""Factory for creating OS-specific handlers."""
@@ -40,12 +42,12 @@ class HandlerFactory:
raise RuntimeError(f"Failed to determine current OS: {str(e)}")
@staticmethod
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]:
def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler, BaseFileHandler]:
"""Create and return appropriate handlers for the current OS.
Returns:
Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler]: A tuple containing
the appropriate accessibility, automation, and diorama handlers for the current OS.
Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler, BaseFileHandler]: A tuple containing
the appropriate accessibility, automation, diorama, and file handlers for the current OS.
Raises:
NotImplementedError: If the current OS is not supported
@@ -54,8 +56,8 @@ class HandlerFactory:
os_type = HandlerFactory._get_current_os()
if os_type == 'darwin':
return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler()
return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler(), GenericFileHandler()
elif os_type == 'linux':
return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler()
return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler(), GenericFileHandler()
else:
raise NotImplementedError(f"OS '{os_type}' is not supported")

View File

@@ -0,0 +1,82 @@
"""
Generic handlers for all OSes.
Includes:
- FileHandler
"""
from pathlib import Path
from typing import Dict, Any
from .base import BaseFileHandler
import base64
def resolve_path(path: str) -> Path:
    """Return ``path`` as an absolute ``Path``, expanding a leading ``~`` to the home directory first."""
    expanded = Path(path).expanduser()
    return expanded.resolve()
class GenericFileHandler(BaseFileHandler):
    """File-system handler built on ``pathlib`` and shared by all supported OSes.

    Every method resolves ``path`` through ``resolve_path`` and reports its
    outcome as a dict: ``{"success": True, ...}`` when the operation worked,
    or ``{"success": False, "error": <message>}`` when any exception was raised.
    """

    async def file_exists(self, path: str) -> Dict[str, Any]:
        """Return ``exists=True`` when ``path`` is an existing regular file."""
        try:
            return {"success": True, "exists": resolve_path(path).is_file()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """Return ``exists=True`` when ``path`` is an existing directory."""
        try:
            return {"success": True, "exists": resolve_path(path).is_dir()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """Return the names (not full paths) of the files and directories inside ``path``."""
        try:
            # Entries that are neither a file nor a directory are left out.
            return {"success": True, "files": [p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()]}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_text(self, path: str) -> Dict[str, Any]:
        """Return the file's text under ``content``, decoded as UTF-8."""
        try:
            # Pin the encoding so the result does not depend on the host locale.
            return {"success": True, "content": resolve_path(path).read_text(encoding="utf-8")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """Write ``content`` to the file, encoded as UTF-8."""
        try:
            # Same encoding as read_text so text round-trips unchanged.
            resolve_path(path).write_text(content, encoding="utf-8")
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]:
        """Decode the base64 string ``content_b64`` and write the resulting bytes to the file."""
        try:
            resolve_path(path).write_bytes(base64.b64decode(content_b64))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_bytes(self, path: str) -> Dict[str, Any]:
        """Return the file's bytes under ``content_b64``, encoded as a base64 string."""
        try:
            return {"success": True, "content_b64": base64.b64encode(resolve_path(path).read_bytes()).decode('utf-8')}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """Remove the file at ``path``."""
        try:
            resolve_path(path).unlink()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """Create the directory and any missing parents; an existing directory is not an error."""
        try:
            resolve_path(path).mkdir(parents=True, exist_ok=True)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """Remove the directory at ``path`` using ``Path.rmdir`` (which requires it to be empty)."""
        try:
            resolve_path(path).rmdir()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

View File

@@ -84,6 +84,24 @@ class LinuxAutomationHandler(BaseAutomationHandler):
"""Linux implementation of automation handler using pyautogui."""
# Mouse Actions
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Press and hold ``button``, moving the pointer first when both coordinates are given."""
    has_target = x is not None and y is not None
    try:
        if has_target:
            pyautogui.moveTo(x, y)
        pyautogui.mouseDown(button=button)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Release ``button``, moving the pointer first when both coordinates are given."""
    has_target = x is not None and y is not None
    try:
        if has_target:
            pyautogui.moveTo(x, y)
        pyautogui.mouseUp(button=button)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
try:
pyautogui.moveTo(x, y)
@@ -154,6 +172,20 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
# Keyboard Actions
async def key_down(self, key: str) -> Dict[str, Any]:
    """Hold ``key`` down via ``pyautogui.keyDown``."""
    try:
        pyautogui.keyDown(key)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def key_up(self, key: str) -> Dict[str, Any]:
    """Release ``key`` via ``pyautogui.keyUp``."""
    try:
        pyautogui.keyUp(key)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def type_text(self, text: str) -> Dict[str, Any]:
try:
pyautogui.write(text)
@@ -176,6 +208,13 @@ class LinuxAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
# Scrolling Actions
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
    """Scroll horizontally by ``x`` clicks and vertically by ``y`` clicks.

    ``pyautogui.scroll(clicks, x, y)`` treats its second and third arguments
    as a screen position. Passing ``(x, y)`` straight through would scroll
    vertically by ``x`` and never scroll horizontally, so each axis is sent
    as its own call. A negative ``y`` scrolls down, as in ``scroll_down``.

    Returns:
        ``{"success": True}`` on success, otherwise
        ``{"success": False, "error": <message>}``.
    """
    try:
        if x:
            pyautogui.hscroll(x)
        if y:
            pyautogui.vscroll(y)
        return {"success": True}
    except Exception as e:
        return {"success": False, "error": str(e)}
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
try:
pyautogui.scroll(-clicks)

View File

@@ -50,6 +50,29 @@ import logging
logger = logging.getLogger(__name__)
# Constants for accessibility API
# Status value compared against the error code returned by AXUIElementCopyAttributeValue.
kAXErrorSuccess = 0
# Attribute names used when querying accessibility elements.
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"
# Constants for window properties (keys of the per-window info dictionaries)
kCGWindowLayer = "kCGWindowLayer" # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha" # Window opacity
# Constants for application activation options (bit flags, keyed by a readable name)
NSApplicationActivationOptions = {
    "regular": 0, # Default activation
    "bringing_all_windows_forward": 1 << 0, # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1 # NSApplicationActivateIgnoringOtherApps
}
def CFAttributeToPyObject(attrValue):
def list_helper(list_value):
@@ -210,15 +233,15 @@ class UIElement:
self.calculate_hashes()
def _set_bboxes(self, parents_visible_bbox):
if not self.position or not self.size:
if not self.absolute_position or not self.size:
self.bbox = None
self.visible_bbox = None
return
self.bbox = [
int(self.position.x),
int(self.position.y),
int(self.position.x + self.size.width),
int(self.position.y + self.size.height),
int(self.absolute_position.x),
int(self.absolute_position.y),
int(self.absolute_position.x + self.size.width),
int(self.absolute_position.y + self.size.height),
]
if parents_visible_bbox:
# check if not intersected
@@ -345,7 +368,221 @@ class UIElement:
}
import Quartz
from AppKit import NSWorkspace, NSRunningApplication
from pathlib import Path
def get_all_windows_zorder():
    """Describe every window that has bounds, sorted by ascending ``z_index``.

    A window's ``z_index`` is its position in the reversed on-screen window
    list; windows missing from that list get ``-1``. Each window is also
    tagged with a coarse ``role`` ("dock", "menubar", "desktop" or "app")
    derived from its name and owner.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``pid``, ``owner``,
        ``role``, ``is_on_screen``, ``bounds``, ``layer``, ``z_index`` and
        ``opacity``.
    """
    def classify(name, owner):
        """Map a window's name and owner to its role label."""
        if name == "Dock" and owner == "Dock":
            return "dock"
        if name == "Menubar" and owner == "Window Server":
            return "menubar"
        if owner in ["Window Server", "Dock"]:
            return "desktop"
        return "app"

    on_screen = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly,
        Quartz.kCGNullWindowID
    )
    # Number the on-screen windows starting from the end of the returned list.
    z_order = {}
    for depth, entry in enumerate(on_screen[::-1]):
        z_order[entry['kCGWindowNumber']] = depth

    every_window = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll,
        Quartz.kCGNullWindowID
    )
    described = []
    for entry in every_window:
        rect = entry.get('kCGWindowBounds', {})
        if not rect:
            # Windows without bounds are not reported.
            continue
        number = entry.get('kCGWindowNumber', 0)
        title = entry.get('kCGWindowName', '')
        owner = entry.get('kCGWindowOwnerName', '')
        described.append({
            "id": number,
            "name": title or "Unnamed Window",
            "pid": entry.get('kCGWindowOwnerPID', 0),
            "owner": owner,
            "role": classify(title, owner),
            "is_on_screen": entry.get('kCGWindowIsOnscreen', False),
            "bounds": {
                "x": rect.get('X', 0),
                "y": rect.get('Y', 0),
                "width": rect.get('Width', 0),
                "height": rect.get('Height', 0)
            },
            "layer": entry.get('kCGWindowLayer', 0),
            "z_index": z_order.get(number, -1),
            "opacity": entry.get('kCGWindowAlpha', 1.0)
        })
    described.sort(key=lambda record: record["z_index"])
    return described
def get_app_info(app):
    """Summarise a running application as a plain dict.

    ``app`` must provide ``localizedName``, ``bundleIdentifier``,
    ``processIdentifier``, ``isActive``, ``isHidden`` and ``isTerminated``.
    """
    name = app.localizedName()
    bundle_id = app.bundleIdentifier()
    pid = app.processIdentifier()
    active = app.isActive()
    hidden = app.isHidden()
    terminated = app.isTerminated()
    return dict(
        name=name,
        bundle_id=bundle_id,
        pid=pid,
        active=active,
        hidden=hidden,
        terminated=terminated,
    )
def get_menubar_items(active_app_pid=None):
    """List the menu-bar items of an application.

    Args:
        active_app_pid: PID of the application to inspect. When ``None``,
            the frontmost application reported by ``NSWorkspace`` is used.

    Returns:
        One dict per menu-bar child with ``title``, ``bounds``, ``index`` and
        ``app_pid``. The list is empty when there is no frontmost app, or
        when the app element, its menu bar or the menu bar's children cannot
        be obtained.
    """
    if active_app_pid is None:
        front = NSWorkspace.sharedWorkspace().frontmostApplication()
        if not front:
            return []
        active_app_pid = front.processIdentifier()

    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        return []
    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        return []
    entries = element_attribute(menubar, kAXChildrenAttribute)
    if entries is None:
        return []

    collected = []
    for index, entry in enumerate(entries):
        title = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        # Geometry stays at zero when the element exposes no position or size.
        left = top = width = height = 0
        raw_position = element_attribute(entry, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            left = getattr(point, 'x', 0)
            top = getattr(point, 'y', 0)
        raw_size = element_attribute(entry, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            width = getattr(extent, 'width', 0)
            height = getattr(extent, 'height', 0)
        collected.append({
            "title": title,
            "bounds": {"x": left, "y": top, "width": width, "height": height},
            "index": index,
            "app_pid": active_app_pid
        })
    return collected
def get_dock_items():
    """List the items shown in the Dock.

    Finds the running application named "Dock" with bundle id
    ``com.apple.dock``, takes the first of its children whose role is
    ``AXList``, and describes each child of that list.

    Returns:
        One dict per item with ``title``, ``description``, ``bounds``,
        ``index``, ``type``, ``role`` and ``subrole``. The list is empty when
        any step of the lookup yields nothing.
    """
    candidates = NSWorkspace.sharedWorkspace().runningApplications()
    dock_app = next(
        (
            candidate for candidate in candidates
            if candidate.localizedName() == "Dock" and candidate.bundleIdentifier() == "com.apple.dock"
        ),
        None,
    )
    if dock_app is None:
        return []
    dock_pid = dock_app.processIdentifier()
    if dock_pid is None:
        return []

    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return []
    top_level = element_attribute(dock_element, kAXChildrenAttribute)
    if top_level is None or len(top_level) == 0:
        return []
    app_list = next(
        (child for child in top_level if element_attribute(child, kAXRoleAttribute) == "AXList"),
        None,
    )
    if app_list is None:
        return []
    entries = element_attribute(app_list, kAXChildrenAttribute)
    if entries is None:
        return []

    collected = []
    for index, entry in enumerate(entries):
        title = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        # NOTE(review): kAXDescriptionAttribute is not defined in the visible part of this module; confirm it is imported.
        description = element_attribute(entry, kAXDescriptionAttribute) or ""
        role = element_attribute(entry, kAXRoleAttribute) or ""
        subrole = element_attribute(entry, "AXSubrole") or ""

        # Geometry stays at zero when the element exposes no position or size.
        left = top = width = height = 0
        raw_position = element_attribute(entry, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            left = getattr(point, 'x', 0)
            top = getattr(point, 'y', 0)
        raw_size = element_attribute(entry, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            width = getattr(extent, 'width', 0)
            height = getattr(extent, 'height', 0)

        # The subrole decides the type; the title is only a fallback for the trash.
        if subrole == "AXApplicationDockItem":
            item_type = "application"
        elif subrole == "AXFolderDockItem":
            item_type = "folder"
        elif subrole == "AXDocumentDockItem":
            item_type = "document"
        elif subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            item_type = "separator"
        elif "trash" in title.lower():
            item_type = "trash"
        else:
            item_type = "unknown"

        collected.append({
            "title": title,
            "description": description,
            "bounds": {"x": left, "y": top, "width": width, "height": height},
            "index": index,
            "type": item_type,
            "role": role,
            "subrole": subrole
        })
    return collected
class MacOSAccessibilityHandler(BaseAccessibilityHandler):
def get_desktop_state(self):
    """Collect a snapshot of applications, on-screen windows, menu-bar items and dock items.

    Each on-screen window receives a ``children`` list taken from the
    accessibility tree of the owning application's window at the same
    position: the n-th on-screen window of a PID is paired with that PID's
    n-th AX window. ``children`` is empty when no such tree exists or the
    tree has no ``children`` key.

    Returns:
        A dict with the keys ``applications``, ``windows``,
        ``menubar_items`` and ``dock_items``.
    """
    visible_windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")]
    running_apps = self.get_running_apps()

    # Accessibility trees of each application's windows, keyed by pid.
    trees_by_pid = {}
    for app in running_apps:
        pid = app.processIdentifier()
        try:
            app_elem = AXUIElementCreateApplication(pid)
            err, ax_windows = AXUIElementCopyAttributeValue(app_elem, kAXWindowsAttribute, None)
            trees = []
            if err == kAXErrorSuccess and ax_windows:
                for ax_win in ax_windows:
                    try:
                        tree = UIElement(ax_win).to_dict()
                    except Exception as e:
                        # Record the failure in place so later windows keep their index.
                        tree = {"error": str(e)}
                    trees.append(tree)
            trees_by_pid[pid] = trees
        except Exception as e:
            trees_by_pid[pid] = [{"error": str(e)}]

    # Pair windows with trees by pid and by order of appearance.
    seen_per_pid = {}
    window_ids_by_pid = {}
    for win in visible_windows:
        pid = win["pid"]
        position = seen_per_pid.get(pid, 0)
        trees = trees_by_pid.get(pid, [])
        if position < len(trees) and "children" in trees[position]:
            win["children"] = trees[position]["children"]
        else:
            win["children"] = []
        seen_per_pid[pid] = position + 1
        window_ids_by_pid.setdefault(pid, []).append(win["id"])

    applications = []
    for app in running_apps:
        info = get_app_info(app)
        applications.append({
            "info": info,
            "windows": window_ids_by_pid.get(info["pid"], [])
        })

    menubar_items = get_menubar_items()
    dock_items = get_dock_items()
    return {
        "applications": applications,
        "windows": visible_windows,
        "menubar_items": menubar_items,
        "dock_items": dock_items
    }
def get_application_windows(self, pid: int):
"""Get all windows for a specific application."""
try:
@@ -430,66 +667,13 @@ class MacOSAccessibilityHandler(BaseAccessibilityHandler):
return result
async def get_accessibility_tree(self) -> Dict[str, Any]:
async def get_accessibility_tree(self) -> Dict[str, Any]:
try:
# Get all visible windows first
windows = self.get_all_windows()
if not windows:
return {"success": False, "error": "No visible windows found in the system"}
# Get the frontmost window
frontmost_app = next((w for w in windows if w["frontmost"]), None)
if not frontmost_app:
frontmost_app = windows[0]
app_name = frontmost_app["app_name"]
# Process all applications and their windows
processed_windows = []
for app in windows:
app_windows = app.get("windows", [])
if app_windows:
window_trees = []
for window in app_windows:
try:
window_element = UIElement(window)
window_trees.append(window_element.to_dict())
except Exception as e:
logger.error(f"Failed to process window {window}: {e}")
window_trees.append({"error": str(e)})
continue
processed_windows.append(
{
"app_name": app["app_name"],
"pid": app["pid"],
"frontmost": app["frontmost"],
"has_windows": app["has_windows"],
"windows": window_trees,
}
)
if not any(app["windows"] for app in processed_windows):
return {
"success": False,
"error": f"No accessible windows found. Available applications:\n"
+ "\n".join(
[
f"- {w['app_name']} (PID: {w['pid']}, Active: {w['frontmost']}, Has Windows: {w['has_windows']})"
for w in windows
]
)
+ "\nPlease ensure:\n"
+ "1. The terminal has accessibility permissions\n"
+ "2. The applications have visible windows\n"
+ "3. Try clicking on a window you want to inspect",
}
desktop_state = self.get_desktop_state()
return {
"success": True,
"frontmost_application": app_name,
"windows": processed_windows,
}
**desktop_state
}
except Exception as e:
return {"success": False, "error": str(e)}
@@ -531,6 +715,24 @@ class MacOSAutomationHandler(BaseAutomationHandler):
# Mouse Actions
mouse = MouseController()
keyboard = KeyboardController()
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Press and hold a mouse button, moving the pointer first when both coordinates are given.

    ``button`` values other than "left" and "right" select the middle button.
    """
    try:
        if not (x is None or y is None):
            self.mouse.position = (x, y)
        if button == "left":
            target = Button.left
        elif button == "right":
            target = Button.right
        else:
            target = Button.middle
        self.mouse.press(target)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Release a mouse button, moving the pointer first when both coordinates are given.

    ``button`` values other than "left" and "right" select the middle button.
    """
    try:
        if not (x is None or y is None):
            self.mouse.position = (x, y)
        if button == "left":
            target = Button.left
        elif button == "right":
            target = Button.right
        else:
            target = Button.middle
        self.mouse.release(target)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
try:
@@ -572,7 +774,7 @@ class MacOSAutomationHandler(BaseAutomationHandler):
self, x: int, y: int, button: str = "left", duration: float = 0.5
) -> Dict[str, Any]:
try:
btn = Button.left if button == "left" else Button.right
btn = Button.left if button == "left" else Button.right if button == "right" else Button.middle
# Press
self.mouse.press(btn)
# Move with sleep to simulate drag duration
@@ -600,7 +802,7 @@ class MacOSAutomationHandler(BaseAutomationHandler):
try:
if not path or len(path) < 2:
return {"success": False, "error": "Path must contain at least 2 points"}
btn = Button.left if button == "left" else Button.right
btn = Button.left if button == "left" else Button.right if button == "right" else Button.middle
# Move to the first point
self.mouse.position = path[0]
self.mouse.press(btn)
@@ -618,8 +820,25 @@ class MacOSAutomationHandler(BaseAutomationHandler):
return {"success": False, "error": str(e)}
# Keyboard Actions
async def key_down(self, key: str) -> Dict[str, Any]:
    """Hold ``key`` down; pyautogui is used so callers can pass its key names."""
    try:
        pyautogui.keyDown(key)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def key_up(self, key: str) -> Dict[str, Any]:
    """Release ``key``; pyautogui is used so callers can pass its key names."""
    try:
        pyautogui.keyUp(key)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def type_text(self, text: str) -> Dict[str, Any]:
try:
# use pynput for Unicode support
self.keyboard.type(text)
return {"success": True}
except Exception as e:
@@ -627,6 +846,7 @@ class MacOSAutomationHandler(BaseAutomationHandler):
async def press_key(self, key: str) -> Dict[str, Any]:
try:
# use pyautogui for their key names
pyautogui.press(key)
return {"success": True}
except Exception as e:
@@ -634,12 +854,20 @@ class MacOSAutomationHandler(BaseAutomationHandler):
async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
    """Press the key combination in ``keys``; pyautogui is used so callers can pass its key names."""
    try:
        pyautogui.hotkey(*keys)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
# Scrolling Actions
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
    """Forward ``x`` and ``y`` to the mouse controller's ``scroll`` and report the outcome."""
    try:
        self.mouse.scroll(x, y)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    return {"success": True}
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
try:
self.mouse.scroll(0, -clicks)

View File

@@ -31,7 +31,7 @@ class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
# Create OS-specific handlers
self.accessibility_handler, self.automation_handler, self.diorama_handler = HandlerFactory.create_handlers()
self.accessibility_handler, self.automation_handler, self.diorama_handler, self.file_handler = HandlerFactory.create_handlers()
async def connect(self, websocket: WebSocket):
await websocket.accept()
@@ -157,28 +157,50 @@ async def websocket_endpoint(websocket: WebSocket):
# Map commands to appropriate handler methods
handlers = {
# App-Use commands
"diorama_cmd": manager.diorama_handler.diorama_cmd,
# Accessibility commands
"get_accessibility_tree": manager.accessibility_handler.get_accessibility_tree,
"find_element": manager.accessibility_handler.find_element,
# Automation commands
"screenshot": manager.automation_handler.screenshot,
# Shell commands
"run_command": manager.automation_handler.run_command,
# File system commands
"file_exists": manager.file_handler.file_exists,
"directory_exists": manager.file_handler.directory_exists,
"list_dir": manager.file_handler.list_dir,
"read_text": manager.file_handler.read_text,
"write_text": manager.file_handler.write_text,
"read_bytes": manager.file_handler.read_bytes,
"write_bytes": manager.file_handler.write_bytes,
"delete_file": manager.file_handler.delete_file,
"create_dir": manager.file_handler.create_dir,
"delete_dir": manager.file_handler.delete_dir,
# Mouse commands
"mouse_down": manager.automation_handler.mouse_down,
"mouse_up": manager.automation_handler.mouse_up,
"left_click": manager.automation_handler.left_click,
"right_click": manager.automation_handler.right_click,
"double_click": manager.automation_handler.double_click,
"scroll_down": manager.automation_handler.scroll_down,
"scroll_up": manager.automation_handler.scroll_up,
"move_cursor": manager.automation_handler.move_cursor,
"type_text": manager.automation_handler.type_text,
"press_key": manager.automation_handler.press_key,
"drag_to": manager.automation_handler.drag_to,
"drag": manager.automation_handler.drag,
# Keyboard commands
"key_down": manager.automation_handler.key_down,
"key_up": manager.automation_handler.key_up,
"type_text": manager.automation_handler.type_text,
"press_key": manager.automation_handler.press_key,
"hotkey": manager.automation_handler.hotkey,
# Scrolling actions
"scroll": manager.automation_handler.scroll,
"scroll_down": manager.automation_handler.scroll_down,
"scroll_up": manager.automation_handler.scroll_up,
# Screen actions
"screenshot": manager.automation_handler.screenshot,
"get_cursor_position": manager.automation_handler.get_cursor_position,
"get_screen_size": manager.automation_handler.get_screen_size,
# Clipboard actions
"copy_to_clipboard": manager.automation_handler.copy_to_clipboard,
"set_clipboard": manager.automation_handler.set_clipboard,
"run_command": manager.automation_handler.run_command,
"diorama_cmd": manager.diorama_handler.diorama_cmd,
}
try:

View File

@@ -3,6 +3,7 @@
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, Tuple, List
from ..logger import Logger, LogLevel
from .models import MouseButton
class BaseComputerInterface(ABC):
@@ -51,6 +52,16 @@ class BaseComputerInterface(ABC):
self.close()
# Mouse Actions
@abstractmethod
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: "MouseButton" = "left") -> None:
    """Press a mouse button and keep it held; ``x``, ``y`` and ``button`` are optional."""
@abstractmethod
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: "MouseButton" = "left") -> None:
    """Release a held mouse button; ``x``, ``y`` and ``button`` are optional."""
@abstractmethod
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
"""Perform a left click."""
@@ -95,6 +106,16 @@ class BaseComputerInterface(ABC):
pass
# Keyboard Actions
@abstractmethod
async def key_down(self, key: str) -> None:
    """Press ``key`` and keep it held."""
@abstractmethod
async def key_up(self, key: str) -> None:
    """Release ``key``."""
@abstractmethod
async def type_text(self, text: str) -> None:
"""Type the specified text."""
@@ -111,6 +132,11 @@ class BaseComputerInterface(ABC):
pass
# Scrolling Actions
@abstractmethod
async def scroll(self, x: int, y: int) -> None:
    """Scroll the mouse wheel by the amounts given in ``x`` and ``y``."""
@abstractmethod
async def scroll_down(self, clicks: int = 1) -> None:
"""Scroll down."""
@@ -166,7 +192,47 @@ class BaseComputerInterface(ABC):
async def directory_exists(self, path: str) -> bool:
"""Check if directory exists."""
pass
@abstractmethod
async def list_dir(self, path: str) -> List[str]:
    """Return the contents of the directory at ``path``."""
@abstractmethod
async def read_text(self, path: str) -> str:
    """Return the text contents of the file at ``path``."""
@abstractmethod
async def write_text(self, path: str, content: str) -> None:
    """Write the text ``content`` to the file at ``path``."""
@abstractmethod
async def read_bytes(self, path: str) -> bytes:
    """Return the binary contents of the file at ``path``."""
@abstractmethod
async def write_bytes(self, path: str, content: bytes) -> None:
    """Write the binary ``content`` to the file at ``path``."""
@abstractmethod
async def delete_file(self, path: str) -> None:
    """Delete the file at ``path``."""
@abstractmethod
async def create_dir(self, path: str) -> None:
    """Create a directory at ``path``."""
@abstractmethod
async def delete_dir(self, path: str) -> None:
    """Delete the directory at ``path``."""
@abstractmethod
async def run_command(self, command: str) -> Tuple[str, str]:
"""Run shell command."""

View File

@@ -8,8 +8,8 @@ import websockets
from ..logger import Logger, LogLevel
from .base import BaseComputerInterface
from ..utils import decode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType
from ..utils import decode_base64_image, encode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType, MouseButton
class LinuxComputerInterface(BaseComputerInterface):
@@ -22,7 +22,7 @@ class LinuxComputerInterface(BaseComputerInterface):
self._closed = False
self._last_ping = 0
self._ping_interval = 5 # Send ping every 5 seconds
self._ping_timeout = 10 # Wait 10 seconds for pong response
self._ping_timeout = 120 # Wait 120 seconds for pong response
self._reconnect_delay = 1 # Start with 1 second delay
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
@@ -87,7 +87,7 @@ class LinuxComputerInterface(BaseComputerInterface):
close_timeout=5,
compression=None, # Disable compression to reduce overhead
),
timeout=30,
timeout=120,
)
self.logger.info("WebSocket connection established")
@@ -349,6 +349,12 @@ class LinuxComputerInterface(BaseComputerInterface):
self._ws = None
# Mouse Actions
async def mouse_down(
    self,
    x: Optional[int] = None,
    y: Optional[int] = None,
    button: "MouseButton" = "left",
) -> None:
    """Send a ``mouse_down`` command (press and hold a mouse button).

    Args:
        x: X coordinate forwarded with the command; ``None`` is sent as-is.
        y: Y coordinate forwarded with the command; ``None`` is sent as-is.
        button: Mouse button name, annotated as ``MouseButton`` to match
            ``drag_to``/``drag`` in this class. Defaults to ``"left"``.
    """
    await self._send_command("mouse_down", {"x": x, "y": y, "button": button})
async def mouse_up(
    self,
    x: Optional[int] = None,
    y: Optional[int] = None,
    button: "MouseButton" = "left",
) -> None:
    """Send a ``mouse_up`` command (release a mouse button).

    Args:
        x: X coordinate forwarded with the command; ``None`` is sent as-is.
        y: Y coordinate forwarded with the command; ``None`` is sent as-is.
        button: Mouse button name, annotated as ``MouseButton`` to match
            ``drag_to``/``drag`` in this class. Defaults to ``"left"``.
    """
    await self._send_command("mouse_up", {"x": x, "y": y, "button": button})
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
    """Send a ``left_click`` command with the given coordinates.

    ``x`` and ``y`` are forwarded unchanged, including ``None``.
    """
    click_params = {"x": x, "y": y}
    await self._send_command("left_click", click_params)
@@ -361,17 +367,23 @@ class LinuxComputerInterface(BaseComputerInterface):
async def move_cursor(self, x: int, y: int) -> None:
    """Send a ``move_cursor`` command targeting coordinates ``(x, y)``."""
    target = {"x": x, "y": y}
    await self._send_command("move_cursor", target)
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> None:
async def drag_to(self, x: int, y: int, button: "MouseButton" = "left", duration: float = 0.5) -> None:
await self._send_command(
"drag_to", {"x": x, "y": y, "button": button, "duration": duration}
)
async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> None:
async def drag(self, path: List[Tuple[int, int]], button: "MouseButton" = "left", duration: float = 0.5) -> None:
await self._send_command(
"drag", {"path": path, "button": button, "duration": duration}
)
# Keyboard Actions
async def key_down(self, key: "KeyType") -> None:
    """Send a ``key_down`` command (press and hold) for ``key``.

    The key value is forwarded unchanged under the ``"key"`` parameter.
    """
    key_params = {"key": key}
    await self._send_command("key_down", key_params)
async def key_up(self, key: "KeyType") -> None:
    """Send a ``key_up`` command (release) for ``key``.

    The key value is forwarded unchanged under the ``"key"`` parameter.
    """
    key_params = {"key": key}
    await self._send_command("key_up", key_params)
async def type_text(self, text: str) -> None:
# Temporary fix for https://github.com/trycua/cua/issues/165
# Check if text contains Unicode characters
@@ -464,6 +476,9 @@ class LinuxComputerInterface(BaseComputerInterface):
await self._send_command("hotkey", {"keys": actual_keys})
# Scrolling Actions
async def scroll(self, x: int, y: int) -> None:
    """Send a ``scroll`` command with ``x`` and ``y`` values.

    NOTE(review): ``x``/``y`` are presumably horizontal and vertical scroll
    amounts; confirm against the server-side handler.
    """
    scroll_params = {"x": x, "y": y}
    await self._send_command("scroll", scroll_params)
async def scroll_down(self, clicks: int = 1) -> None:
    """Send a ``scroll_down`` command for ``clicks`` wheel clicks (default 1)."""
    scroll_params = {"clicks": clicks}
    await self._send_command("scroll_down", scroll_params)
@@ -557,6 +572,50 @@ class LinuxComputerInterface(BaseComputerInterface):
result = await self._send_command("directory_exists", {"path": path})
return result.get("exists", False)
async def list_dir(self, path: str) -> List[str]:
    """List the entries of a directory via the ``list_dir`` command.

    The return annotation uses ``typing.List`` so it matches the abstract
    declaration in ``BaseComputerInterface`` and the other annotations in
    this class.

    Args:
        path: Directory path sent to the server.

    Returns:
        The ``"files"`` entry of the response, or ``[]`` when it is absent.

    Raises:
        RuntimeError: If the response has no truthy ``"success"`` entry; the
            message is the response's ``"error"`` value when present.
    """
    result = await self._send_command("list_dir", {"path": path})
    if not result.get("success", False):
        raise RuntimeError(result.get("error", "Failed to list directory"))
    return result.get("files", [])
async def read_text(self, path: str) -> str:
    """Read a file's text via the ``read_text`` command.

    Returns the response's ``"content"`` entry (``""`` when absent). Raises
    ``RuntimeError`` with the response's ``"error"`` text when the response
    has no truthy ``"success"`` entry.
    """
    response = await self._send_command("read_text", {"path": path})
    if response.get("success", False):
        return response.get("content", "")
    raise RuntimeError(response.get("error", "Failed to read file"))
async def write_text(self, path: str, content: str) -> None:
    """Write text to a file via the ``write_text`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    payload = {"path": path, "content": content}
    response = await self._send_command("write_text", payload)
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to write file"))
async def read_bytes(self, path: str) -> bytes:
    """Read a file's binary contents via the ``read_bytes`` command.

    The response's ``"content_b64"`` entry (``""`` when absent) is passed
    through ``decode_base64_image`` and the result is returned. Raises
    ``RuntimeError`` with the response's ``"error"`` text when the response
    has no truthy ``"success"`` entry.
    """
    response = await self._send_command("read_bytes", {"path": path})
    if not response.get("success", False):
        raise RuntimeError(response.get("error", "Failed to read file"))
    encoded = response.get("content_b64", "")
    return decode_base64_image(encoded)
async def write_bytes(self, path: str, content: bytes) -> None:
    """Write binary contents to a file via the ``write_bytes`` command.

    ``content`` is converted with ``encode_base64_image`` and sent under the
    ``"content_b64"`` parameter. Raises ``RuntimeError`` with the response's
    ``"error"`` text when the response has no truthy ``"success"`` entry.
    """
    payload = {"path": path, "content_b64": encode_base64_image(content)}
    response = await self._send_command("write_bytes", payload)
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to write file"))
async def delete_file(self, path: str) -> None:
    """Delete a file via the ``delete_file`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    response = await self._send_command("delete_file", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to delete file"))
async def create_dir(self, path: str) -> None:
    """Create a directory via the ``create_dir`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    response = await self._send_command("create_dir", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to create directory"))
async def delete_dir(self, path: str) -> None:
    """Delete a directory via the ``delete_dir`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    response = await self._send_command("delete_dir", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to delete directory"))
async def run_command(self, command: str) -> Tuple[str, str]:
result = await self._send_command("run_command", {"command": command})
if not result.get("success", False):

View File

@@ -8,8 +8,8 @@ import websockets
from ..logger import Logger, LogLevel
from .base import BaseComputerInterface
from ..utils import decode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType
from ..utils import decode_base64_image, encode_base64_image, bytes_to_image, draw_box, resize_image
from .models import Key, KeyType, MouseButton
class MacOSComputerInterface(BaseComputerInterface):
@@ -22,7 +22,7 @@ class MacOSComputerInterface(BaseComputerInterface):
self._closed = False
self._last_ping = 0
self._ping_interval = 5 # Send ping every 5 seconds
self._ping_timeout = 10 # Wait 10 seconds for pong response
self._ping_timeout = 120 # Wait 120 seconds for pong response
self._reconnect_delay = 1 # Start with 1 second delay
self._max_reconnect_delay = 30 # Maximum delay between reconnection attempts
self._log_connection_attempts = True # Flag to control connection attempt logging
@@ -86,7 +86,7 @@ class MacOSComputerInterface(BaseComputerInterface):
close_timeout=5,
compression=None, # Disable compression to reduce overhead
),
timeout=30,
timeout=120,
)
self.logger.info("WebSocket connection established")
@@ -231,7 +231,7 @@ class MacOSComputerInterface(BaseComputerInterface):
message = {"command": command, "params": params or {}}
await self._ws.send(json.dumps(message))
response = await asyncio.wait_for(self._ws.recv(), timeout=30)
response = await asyncio.wait_for(self._ws.recv(), timeout=120)
self.logger.debug(f"Completed command: {command}")
return json.loads(response)
except Exception as e:
@@ -356,6 +356,12 @@ class MacOSComputerInterface(BaseComputerInterface):
return await self._send_command("diorama_cmd", {"action": action, "arguments": arguments or {}})
# Mouse Actions
async def mouse_down(
    self,
    x: Optional[int] = None,
    y: Optional[int] = None,
    button: "MouseButton" = "left",
) -> None:
    """Send a ``mouse_down`` command (press and hold a mouse button).

    ``x``, ``y`` and ``button`` are forwarded unchanged; ``button`` defaults
    to ``"left"``.
    """
    press_params = {"x": x, "y": y, "button": button}
    await self._send_command("mouse_down", press_params)
async def mouse_up(
    self,
    x: Optional[int] = None,
    y: Optional[int] = None,
    button: "MouseButton" = "left",
) -> None:
    """Send a ``mouse_up`` command (release a mouse button).

    ``x``, ``y`` and ``button`` are forwarded unchanged; ``button`` defaults
    to ``"left"``.
    """
    release_params = {"x": x, "y": y, "button": button}
    await self._send_command("mouse_up", release_params)
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> None:
    """Send a ``left_click`` command with the given coordinates.

    ``x`` and ``y`` are forwarded unchanged, including ``None``.
    """
    click_params = {"x": x, "y": y}
    await self._send_command("left_click", click_params)
@@ -379,6 +385,12 @@ class MacOSComputerInterface(BaseComputerInterface):
)
# Keyboard Actions
async def key_down(self, key: "KeyType") -> None:
    """Send a ``key_down`` command (press and hold) for ``key``.

    The key value is forwarded unchanged under the ``"key"`` parameter.
    """
    key_params = {"key": key}
    await self._send_command("key_down", key_params)
async def key_up(self, key: "KeyType") -> None:
    """Send a ``key_up`` command (release) for ``key``.

    The key value is forwarded unchanged under the ``"key"`` parameter.
    """
    key_params = {"key": key}
    await self._send_command("key_up", key_params)
async def type_text(self, text: str) -> None:
# Temporary fix for https://github.com/trycua/cua/issues/165
# Check if text contains Unicode characters
@@ -471,6 +483,9 @@ class MacOSComputerInterface(BaseComputerInterface):
await self._send_command("hotkey", {"keys": actual_keys})
# Scrolling Actions
async def scroll(self, x: int, y: int) -> None:
    """Send a ``scroll`` command with ``x`` and ``y`` values.

    NOTE(review): ``x``/``y`` are presumably horizontal and vertical scroll
    amounts; confirm against the server-side handler.
    """
    scroll_params = {"x": x, "y": y}
    await self._send_command("scroll", scroll_params)
async def scroll_down(self, clicks: int = 1) -> None:
    """Send a ``scroll_down`` command for ``clicks`` wheel clicks (default 1)."""
    scroll_params = {"clicks": clicks}
    await self._send_command("scroll_down", scroll_params)
@@ -564,6 +579,50 @@ class MacOSComputerInterface(BaseComputerInterface):
result = await self._send_command("directory_exists", {"path": path})
return result.get("exists", False)
async def list_dir(self, path: str) -> list[str]:
    """List the entries of a directory via the ``list_dir`` command.

    Returns the response's ``"files"`` entry (``[]`` when absent). Raises
    ``RuntimeError`` with the response's ``"error"`` text when the response
    has no truthy ``"success"`` entry.
    """
    response = await self._send_command("list_dir", {"path": path})
    if response.get("success", False):
        return response.get("files", [])
    raise RuntimeError(response.get("error", "Failed to list directory"))
async def read_text(self, path: str) -> str:
    """Read a file's text via the ``read_text`` command.

    Returns the response's ``"content"`` entry (``""`` when absent). Raises
    ``RuntimeError`` with the response's ``"error"`` text when the response
    has no truthy ``"success"`` entry.
    """
    response = await self._send_command("read_text", {"path": path})
    if response.get("success", False):
        return response.get("content", "")
    raise RuntimeError(response.get("error", "Failed to read file"))
async def write_text(self, path: str, content: str) -> None:
    """Write text to a file via the ``write_text`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    payload = {"path": path, "content": content}
    response = await self._send_command("write_text", payload)
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to write file"))
async def read_bytes(self, path: str) -> bytes:
    """Read a file's binary contents via the ``read_bytes`` command.

    The response's ``"content_b64"`` entry (``""`` when absent) is passed
    through ``decode_base64_image`` and the result is returned. Raises
    ``RuntimeError`` with the response's ``"error"`` text when the response
    has no truthy ``"success"`` entry.
    """
    response = await self._send_command("read_bytes", {"path": path})
    if not response.get("success", False):
        raise RuntimeError(response.get("error", "Failed to read file"))
    encoded = response.get("content_b64", "")
    return decode_base64_image(encoded)
async def write_bytes(self, path: str, content: bytes) -> None:
    """Write binary contents to a file via the ``write_bytes`` command.

    ``content`` is converted with ``encode_base64_image`` and sent under the
    ``"content_b64"`` parameter. Raises ``RuntimeError`` with the response's
    ``"error"`` text when the response has no truthy ``"success"`` entry.
    """
    payload = {"path": path, "content_b64": encode_base64_image(content)}
    response = await self._send_command("write_bytes", payload)
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to write file"))
async def delete_file(self, path: str) -> None:
    """Delete a file via the ``delete_file`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    response = await self._send_command("delete_file", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to delete file"))
async def create_dir(self, path: str) -> None:
    """Create a directory via the ``create_dir`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    response = await self._send_command("create_dir", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to create directory"))
async def delete_dir(self, path: str) -> None:
    """Delete a directory via the ``delete_dir`` command.

    Raises ``RuntimeError`` with the response's ``"error"`` text when the
    response has no truthy ``"success"`` entry.
    """
    response = await self._send_command("delete_dir", {"path": path})
    if response.get("success", False):
        return
    raise RuntimeError(response.get("error", "Failed to delete directory"))
async def run_command(self, command: str) -> Tuple[str, str]:
result = await self._send_command("run_command", {"command": command})
if not result.get("success", False):

View File

@@ -106,6 +106,9 @@ class Key(Enum):
# Combined key type
KeyType = Union[Key, NavigationKey, SpecialKey, ModifierKey, FunctionKey, str]
# Button type for mouse actions
MouseButton = Literal['left', 'right', 'middle']
class AccessibilityWindow(TypedDict):
"""Information about a window in the accessibility tree."""
app_name: str

141
tests/files.py Normal file
View File

@@ -0,0 +1,141 @@
"""
File System Interface Tests
Tests for the file system methods of the Computer interface (macOS).
Environment variables (only needed when the commented-out cloud setup in the `computer` fixture is enabled):
- CUA_API_KEY: API key for C/ua cloud provider
- CUA_CONTAINER_NAME: Name of the container to use
"""
import os
import asyncio
import pytest
from pathlib import Path
import sys
import traceback
# Load environment variables from .env file
# This file lives in tests/, so two .parent hops give the directory above tests/.
project_root = Path(__file__).parent.parent
env_file = project_root / ".env"
print(f"Loading environment from: {env_file}")
# dotenv is imported here, mid-module, and the .env file is loaded at import
# time, before the sys.path setup and the Computer import that follow.
from dotenv import load_dotenv
load_dotenv(env_file)
# Add paths to sys.path if needed
# PYTHONPATH entries are separated by os.pathsep (":" on POSIX, ";" on
# Windows), so split on that instead of a hard-coded ":" which would break
# Windows paths such as "C:\\repo\\libs".
pythonpath = os.environ.get("PYTHONPATH", "")
for path in pythonpath.split(os.pathsep):
    # Skip empty segments and entries already present.
    if path and path not in sys.path:
        sys.path.insert(0, path)  # Insert at beginning to prioritize
        print(f"Added to sys.path: {path}")
from computer.computer import Computer
@pytest.fixture(scope="session")
async def computer():
    """Yield one connected ``Computer`` shared by the whole test session.

    The instance is created with ``use_host_computer_server=True`` and
    started with ``run()``. ``disconnect()`` is awaited afterwards in a
    ``finally`` block, so it also runs when ``run()`` or a test raises.
    """
    # Alternative setups, kept for reference:
    #
    # Remote Linux computer with C/ua:
    #     Computer(
    #         os_type="linux",
    #         api_key=os.getenv("CUA_API_KEY"),
    #         name=str(os.getenv("CUA_CONTAINER_NAME")),
    #         provider_type=VMProviderType.CLOUD,
    #     )
    #
    # Local macOS computer with C/ua:
    #     Computer()

    # Connect to host computer
    host_computer = Computer(use_host_computer_server=True)
    try:
        await host_computer.run()
        yield host_computer
    finally:
        await host_computer.disconnect()
@pytest.mark.asyncio(loop_scope="session")
async def test_file_exists(computer):
    """file_exists is False for a missing file and True once it is written."""
    tmp_path = "test_file_exists.txt"

    # Start from a clean slate: remove a leftover file from an earlier run.
    already_there = await computer.interface.file_exists(tmp_path)
    if already_there:
        await computer.interface.delete_file(tmp_path)
    found = await computer.interface.file_exists(tmp_path)
    assert found is False, f"File {tmp_path} should not exist"

    # Write the file, then it must be reported as present.
    await computer.interface.write_text(tmp_path, "hello")
    found = await computer.interface.file_exists(tmp_path)
    assert found is True, f"File {tmp_path} should exist"

    await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_directory_exists(computer):
    """directory_exists is False before create_dir and True after it."""
    tmp_dir = "test_directory_exists"

    async def remove_dir_with_files():
        # Delete every listed entry as a file, then the directory itself.
        entries = await computer.interface.list_dir(tmp_dir)
        for entry in entries:
            await computer.interface.delete_file(f"{tmp_dir}/{entry}")
        await computer.interface.delete_dir(tmp_dir)

    # Remove a leftover directory from an earlier run.
    if await computer.interface.directory_exists(tmp_dir):
        await remove_dir_with_files()
    present = await computer.interface.directory_exists(tmp_dir)
    assert present is False, f"Directory {tmp_dir} should not exist"

    await computer.interface.create_dir(tmp_dir)
    present = await computer.interface.directory_exists(tmp_dir)
    assert present is True, f"Directory {tmp_dir} should exist"

    # Cleanup: remove files and directory
    await remove_dir_with_files()
@pytest.mark.asyncio(loop_scope="session")
async def test_list_dir(computer):
    """list_dir reports the files that were written into a directory.

    Cleanup runs in a ``finally`` block so a failed listing or assertion does
    not leave the ``test_list_dir`` directory and its files behind.
    """
    tmp_dir = "test_list_dir"
    if not await computer.interface.directory_exists(tmp_dir):
        await computer.interface.create_dir(tmp_dir)
    files = ["foo.txt", "bar.txt"]
    for fname in files:
        await computer.interface.write_text(f"{tmp_dir}/{fname}", "hi")
    try:
        result = await computer.interface.list_dir(tmp_dir)
        # Superset check: a pre-existing directory may hold other entries too.
        assert set(result) >= set(files), f"Directory {tmp_dir} should contain files {files}"
    finally:
        for fname in files:
            await computer.interface.delete_file(f"{tmp_dir}/{fname}")
        await computer.interface.delete_dir(tmp_dir)
@pytest.mark.asyncio(loop_scope="session")
async def test_read_write_text(computer):
    """Text written with write_text is returned unchanged by read_text.

    The file is deleted in a ``finally`` block so a failed read or assertion
    does not leave ``test_rw_text.txt`` behind.
    """
    tmp_path = "test_rw_text.txt"
    content = "sample text"
    # The write stays outside the try: if it fails there is nothing to delete.
    await computer.interface.write_text(tmp_path, content)
    try:
        read = await computer.interface.read_text(tmp_path)
        assert read == content, "File content should match"
    finally:
        await computer.interface.delete_file(tmp_path)
@pytest.mark.asyncio(loop_scope="session")
async def test_delete_file(computer):
    """A written file exists, and no longer exists after delete_file."""
    tmp_path = "test_delete_file.txt"

    await computer.interface.write_text(tmp_path, "bye")
    before = await computer.interface.file_exists(tmp_path)
    assert before is True, "File should exist"

    await computer.interface.delete_file(tmp_path)
    after = await computer.interface.file_exists(tmp_path)
    assert after is False, "File should not exist"
@pytest.mark.asyncio(loop_scope="session")
async def test_create_dir(computer):
    """A directory made with create_dir is reported by directory_exists."""
    tmp_dir = "test_create_dir"

    # Remove a leftover directory from an earlier run.
    leftover = await computer.interface.directory_exists(tmp_dir)
    if leftover:
        await computer.interface.delete_dir(tmp_dir)

    await computer.interface.create_dir(tmp_dir)
    created = await computer.interface.directory_exists(tmp_dir)
    assert created is True, "Directory should exist"

    await computer.interface.delete_dir(tmp_dir)
if __name__ == "__main__":
    # Run tests directly and propagate pytest's exit code, so a failing run
    # does not finish with exit status 0.
    sys.exit(pytest.main([__file__, "-v"]))

View File

@@ -52,8 +52,7 @@ async def computer():
await computer.run()
yield computer
finally:
# await computer.stop()
pass
await computer.disconnect()
# Sample test cases