Merge branch 'main' into fix/largefiles

This commit is contained in:
Dillon DuPont
2025-07-01 09:59:29 -04:00
262 changed files with 6750 additions and 1059 deletions

View File

@@ -0,0 +1,20 @@
"""
Computer API package.
Provides a server interface for the Computer API.
"""
from __future__ import annotations
__version__: str = "0.1.0"
# Explicitly export Server for static type checkers
from .server import Server as Server # noqa: F401
__all__ = ["Server", "run_cli"]
def run_cli() -> None:
    """Launch the command-line interface.

    The CLI module is imported here, at call time, rather than when the
    package itself is imported.
    """
    from .cli import main as cli_main

    cli_main()

View File

@@ -0,0 +1,10 @@
"""
Main entry point for running the Computer Server as a module.
This allows the server to be started with `python -m computer_server`.
"""
import sys
from .cli import main
if __name__ == "__main__":
    # Hand whatever the CLI entry point returns to the interpreter as the
    # process exit status.
    exit_status = main()
    sys.exit(exit_status)

View File

@@ -0,0 +1,83 @@
"""
Command-line interface for the Computer API server.
"""
import argparse
import logging
import sys
from typing import List, Optional
from .server import Server
logger = logging.getLogger(__name__)
def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
    """Build the server's argument parser and parse ``args`` with it.

    Args:
        args: Argument strings to parse. ``None`` is handed to ``argparse``
            unchanged, which then falls back to the process arguments.

    Returns:
        Namespace carrying ``host``, ``port``, ``log_level``, ``ssl_keyfile``
        and ``ssl_certfile``.
    """
    cli = argparse.ArgumentParser(description="Start the Computer API server")

    # Where to listen.
    cli.add_argument(
        "--host",
        default="0.0.0.0",
        help="Host to bind the server to (default: 0.0.0.0)",
    )
    cli.add_argument(
        "--port",
        type=int,
        default=8000,
        help="Port to bind the server to (default: 8000)",
    )

    # How much to log.
    level_names = ["debug", "info", "warning", "error", "critical"]
    cli.add_argument(
        "--log-level",
        choices=level_names,
        default="info",
        help="Logging level (default: info)",
    )

    # TLS material; both options share the same shape.
    tls_options = (
        ("--ssl-keyfile", "Path to SSL private key file (enables HTTPS)"),
        ("--ssl-certfile", "Path to SSL certificate file (enables HTTPS)"),
    )
    for flag, description in tls_options:
        cli.add_argument(flag, type=str, help=description)

    return cli.parse_args(args)
def main() -> None:
    """Run the Computer API server from the command line.

    Parses the command-line arguments, configures logging, then builds a
    ``Server`` and starts it. The process exits with status 0 when the server
    is interrupted with Ctrl+C and with status 1 when ``server.start()``
    raises any other exception.
    """
    args = parse_args()

    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    log_level = getattr(logging, args.log_level.upper())
    logging.basicConfig(level=log_level, format=log_format)

    logger.info(f"Starting CUA Computer API server on {args.host}:{args.port}...")

    # HTTPS needs both the key and the certificate; with only one of them the
    # server is started in plain HTTP mode and a warning is logged.
    ssl_args = {}
    if args.ssl_keyfile and args.ssl_certfile:
        ssl_args["ssl_keyfile"] = args.ssl_keyfile
        ssl_args["ssl_certfile"] = args.ssl_certfile
        logger.info("HTTPS mode enabled with SSL certificates")
    elif args.ssl_keyfile or args.ssl_certfile:
        logger.warning("Both --ssl-keyfile and --ssl-certfile are required for HTTPS. Running in HTTP mode.")
    else:
        logger.info("HTTP mode (no SSL certificates provided)")

    server = Server(
        host=args.host,
        port=args.port,
        log_level=args.log_level,
        **ssl_args,
    )

    try:
        server.start()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
        sys.exit(0)
    except Exception as e:
        logger.error(f"Error starting server: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,4 @@
class BaseDioramaHandler:
    """Fallback Diorama handler for operating systems without Diorama support."""

    async def diorama_cmd(self, action: str, arguments: dict = None) -> dict:
        """Reject the command with a not-supported error.

        Args:
            action: Name of the requested Diorama action (not used).
            arguments: Arguments for the action (not used).

        Returns:
            A dict with ``success`` set to ``False`` and an ``error`` message.
        """
        failure = {"success": False}
        failure["error"] = "Diorama is not supported on this OS yet."
        return failure

View File

@@ -0,0 +1,426 @@
#!/usr/bin/env python3
"""Diorama: A virtual desktop manager for macOS"""
import os
import asyncio
import logging
import sys
import io
from typing import Union
from PIL import Image, ImageDraw
from computer_server.diorama.draw import capture_all_apps, AppActivationContext, get_frontmost_and_active_app, get_all_windows, get_running_apps
from computer_server.diorama.diorama_computer import DioramaComputer
from computer_server.handlers.macos import *
# simple, nicely formatted logging
logger = logging.getLogger(__name__)
automation_handler = MacOSAutomationHandler()
class Diorama:
_scheduler_queue = None
_scheduler_task = None
_loop = None
_scheduler_started = False
@classmethod
def create_from_apps(cls, *args) -> DioramaComputer:
    """Create a Diorama for the given apps and return its computer object.

    App names may be passed as separate positional arguments
    (``create_from_apps("Safari", "Notes")``) or as a single list or tuple
    (``create_from_apps(["Safari", "Notes"])``).

    Args:
        *args: App names, or one list/tuple of app names.

    Returns:
        The ``computer`` attribute of the newly created Diorama.
    """
    cls._ensure_scheduler()
    # Unpack a lone list/tuple argument. Left wrapped as (["A", "B"],) the
    # constructor would fail, because it hashes a tuple built from the app
    # list's items and a list item is unhashable.
    if len(args) == 1 and isinstance(args[0], (list, tuple)):
        args = tuple(args[0])
    return cls(args).computer
# Dictionary to store cursor positions for each unique app_list hash
_cursor_positions = {}
def __init__(self, app_list):
    """Set up a Diorama for ``app_list``.

    Creates the ``Interface`` and ``DioramaComputer`` objects for this
    instance and makes sure the class-wide cursor-position table has an entry
    for this app list, starting at ``(0, 0)``.

    Args:
        app_list: Collection of apps; its items are sorted and hashed to
            form the cursor-table key.
    """
    self.app_list = app_list
    self.interface = self.Interface(self)
    self.computer = DioramaComputer(self)
    self.focus_context = None

    # Sorting first makes the key independent of the order of the apps.
    self.app_list_hash = hash(tuple(sorted(app_list)))

    # Keep an existing position if another instance already registered one.
    Diorama._cursor_positions.setdefault(self.app_list_hash, (0, 0))
@classmethod
def _ensure_scheduler(cls):
    """Start the class-wide scheduler task the first time it is requested.

    Creates the command queue, records the event loop returned by
    ``asyncio.get_event_loop()`` and schedules ``_scheduler_loop`` on it.
    Later calls return immediately.
    """
    if cls._scheduler_started:
        return

    logger.info("Starting Diorama scheduler loop…")
    cls._scheduler_queue = asyncio.Queue()
    event_loop = asyncio.get_event_loop()
    cls._loop = event_loop
    cls._scheduler_task = event_loop.create_task(cls._scheduler_loop())
    cls._scheduler_started = True
@classmethod
async def _scheduler_loop(cls):
    """Consume queued commands forever, executing them one at a time.

    Each queue item is a dict with an ``action`` name, an ``arguments`` dict
    (which carries ``app_list``) and an optional ``future``. The command runs
    inside an ``AppActivationContext`` and its outcome is delivered through
    the future: the result on success, the exception on failure.

    A failure anywhere in a command, including the window lookup and the
    focus-context setup, is reported on that command's future and logged;
    the loop then carries on with the next command.
    """

    def settle(future, *, result=None, error=None):
        # A caller that stopped waiting leaves a finished (cancelled) future
        # behind; setting it again would raise InvalidStateError.
        if future is None or future.done():
            return
        if error is not None:
            future.set_exception(error)
        else:
            future.set_result(result)

    mouse_actions = ["left_click", "right_click", "double_click", "move_cursor", "drag_to"]
    scroll_actions = ["scroll_up", "scroll_down"]

    while True:
        cmd = await cls._scheduler_queue.get()
        action = cmd.get("action")
        args = cmd.get("arguments", {})
        future = cmd.get("future")
        logger.info(f"Processing command: {action} | args={args}")

        try:
            app_whitelist = args.get("app_list", [])
            all_windows = get_all_windows()
            running_apps = get_running_apps()
            _frontmost_app, active_app_to_use, active_app_pid = get_frontmost_and_active_app(
                all_windows, running_apps, app_whitelist
            )
            focus_context = AppActivationContext(active_app_pid, active_app_to_use, logger)

            with focus_context:
                # Action errors are handled inside the context, so the
                # context manager itself exits without seeing them.
                try:
                    result = None

                    if action == "screenshot":
                        logger.info(f"Taking screenshot for apps: {app_whitelist}")
                        capture_info, img = capture_all_apps(
                            app_whitelist=app_whitelist,
                            save_to_disk=False,
                            take_focus=False,
                        )
                        logger.info("Screenshot complete.")
                        result = (capture_info, img)

                    # Mouse actions
                    elif action in mouse_actions:
                        x = args.get("x")
                        y = args.get("y")
                        if action == "drag_to":
                            duration = args.get("duration", 0.5)
                            await automation_handler.drag_to(x, y, duration=duration)
                        else:
                            await getattr(automation_handler, action)(x, y)

                    elif action in scroll_actions:
                        x = args.get("x")
                        y = args.get("y")
                        # Scrolling applies at the cursor, so place it first
                        # when a position was supplied.
                        if x is not None and y is not None:
                            await automation_handler.move_cursor(x, y)
                        clicks = args.get("clicks", 1)
                        await getattr(automation_handler, action)(clicks)

                    # Keyboard actions
                    elif action == "type_text":
                        await automation_handler.type_text(args.get("text"))

                    elif action == "press_key":
                        await automation_handler.press_key(args.get("key"))

                    elif action == "hotkey":
                        # NOTE(review): the key list is passed as ONE argument;
                        # confirm the handler's hotkey() expects a list rather
                        # than unpacked keys.
                        await automation_handler.hotkey(args.get("keys", []))

                    elif action == "get_cursor_position":
                        result = await automation_handler.get_cursor_position()

                    else:
                        logger.warning(f"Unknown action: {action}")
                        settle(future, error=ValueError(f"Unknown action: {action}"))
                        continue

                    settle(future, result=result)
                except Exception as e:
                    logger.error(f"Exception during {action}: {e}", exc_info=True)
                    settle(future, error=e)
        except Exception as e:
            # Window lookup or focus switching failed. Report it on the
            # future so the waiting caller is released, and keep the
            # scheduler alive for later commands.
            logger.error(f"Exception while preparing {action}: {e}", exc_info=True)
            settle(future, error=e)
class Interface():
def __init__(self, diorama):
self._diorama = diorama
self._scene_hitboxes = []
self._scene_size = None
async def _send_cmd(self, action, arguments=None):
    """Queue a command for the shared scheduler and wait for its outcome.

    Args:
        action: Name of the scheduler action to run.
        arguments: Optional extra arguments; they are merged over the
            ``app_list`` entry that is always included.

    Returns:
        Whatever the scheduler sets as the command's result.

    Raises:
        Exception: Any exception the scheduler sets on the command's future.
        asyncio.CancelledError: If the wait is cancelled.
    """
    Diorama._ensure_scheduler()
    future = asyncio.get_running_loop().create_future()
    logger.info(f"Enqueuing {action} command for apps: {self._diorama.app_list}")
    await Diorama._scheduler_queue.put({
        "action": action,
        "arguments": {"app_list": self._diorama.app_list, **(arguments or {})},
        "future": future,
    })
    try:
        return await future
    except asyncio.CancelledError:
        logger.warning(f"Command was cancelled: {action}")
        # Re-raise so cancellation reaches the caller's task. Returning None
        # here would hide it and break callers that unpack the result
        # (screenshot() expects a 2-tuple).
        raise
async def screenshot(self, as_bytes: bool = True) -> Union[str, Image.Image]:
    """Capture the scene and remember its hitboxes and size.

    Args:
        as_bytes: When true, return the capture as a base64-encoded PNG
            string; otherwise return the image object itself.

    Returns:
        The base64 string or the image, depending on ``as_bytes``.
    """
    import base64

    result, img = await self._send_cmd("screenshot")

    # Cached for the coordinate conversions and get_screen_size().
    self._scene_hitboxes = result.get("hitboxes", [])
    self._scene_size = img.size

    if not as_bytes:
        return img

    # Image -> PNG bytes -> base64 text, so the capture is JSON-friendly.
    import io

    png_buffer = io.BytesIO()
    img.save(png_buffer, format="PNG")
    return base64.b64encode(png_buffer.getvalue()).decode("ascii")
async def left_click(self, x, y):
    """Left-click at a point given in screenshot coordinates.

    The resolved point is stored as the cursor position for this app list,
    converted to screen coordinates and sent to the scheduler.

    Args:
        x: X coordinate, or ``None`` to reuse the stored cursor X.
        y: Y coordinate, or ``None`` to reuse the stored cursor Y.
    """
    app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
    last_x, last_y = Diorama._cursor_positions.get(app_list_hash, (0, 0))
    # Test for None explicitly: 0 is a valid coordinate, and `x or last_x`
    # would silently replace it with the previous position.
    x = last_x if x is None else x
    y = last_y if y is None else y
    Diorama._cursor_positions[app_list_hash] = (x, y)

    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("left_click", {"x": sx, "y": sy})
async def right_click(self, x, y):
    """Right-click at a point given in screenshot coordinates.

    The resolved point is stored as the cursor position for this app list,
    converted to screen coordinates and sent to the scheduler.

    Args:
        x: X coordinate, or ``None`` to reuse the stored cursor X.
        y: Y coordinate, or ``None`` to reuse the stored cursor Y.
    """
    app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
    last_x, last_y = Diorama._cursor_positions.get(app_list_hash, (0, 0))
    # Test for None explicitly: 0 is a valid coordinate, and `x or last_x`
    # would silently replace it with the previous position.
    x = last_x if x is None else x
    y = last_y if y is None else y
    Diorama._cursor_positions[app_list_hash] = (x, y)

    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("right_click", {"x": sx, "y": sy})
async def double_click(self, x, y):
    """Double-click at a point given in screenshot coordinates.

    The resolved point is stored as the cursor position for this app list,
    converted to screen coordinates and sent to the scheduler.

    Args:
        x: X coordinate, or ``None`` to reuse the stored cursor X.
        y: Y coordinate, or ``None`` to reuse the stored cursor Y.
    """
    app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
    last_x, last_y = Diorama._cursor_positions.get(app_list_hash, (0, 0))
    # Test for None explicitly: 0 is a valid coordinate, and `x or last_x`
    # would silently replace it with the previous position.
    x = last_x if x is None else x
    y = last_y if y is None else y
    Diorama._cursor_positions[app_list_hash] = (x, y)

    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("double_click", {"x": sx, "y": sy})
async def move_cursor(self, x, y):
    """Move the cursor to a point given in screenshot coordinates.

    The resolved point is stored as the cursor position for this app list,
    converted to screen coordinates and sent to the scheduler.

    Args:
        x: X coordinate, or ``None`` to reuse the stored cursor X.
        y: Y coordinate, or ``None`` to reuse the stored cursor Y.
    """
    app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
    last_x, last_y = Diorama._cursor_positions.get(app_list_hash, (0, 0))
    # Test for None explicitly: 0 is a valid coordinate, and `x or last_x`
    # would silently replace it with the previous position.
    x = last_x if x is None else x
    y = last_y if y is None else y
    Diorama._cursor_positions[app_list_hash] = (x, y)

    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("move_cursor", {"x": sx, "y": sy})
async def drag_to(self, x, y, duration=0.5):
    """Drag to a point given in screenshot coordinates.

    The resolved point is stored as the cursor position for this app list,
    converted to screen coordinates and sent to the scheduler.

    Args:
        x: X coordinate, or ``None`` to reuse the stored cursor X.
        y: Y coordinate, or ``None`` to reuse the stored cursor Y.
        duration: Passed through to the scheduler's ``drag_to`` command.
    """
    app_list_hash = hash(tuple(sorted(self._diorama.app_list)))
    last_x, last_y = Diorama._cursor_positions.get(app_list_hash, (0, 0))
    # Test for None explicitly: 0 is a valid coordinate, and `x or last_x`
    # would silently replace it with the previous position.
    x = last_x if x is None else x
    y = last_y if y is None else y
    Diorama._cursor_positions[app_list_hash] = (x, y)

    sx, sy = await self.to_screen_coordinates(x, y)
    await self._send_cmd("drag_to", {"x": sx, "y": sy, "duration": duration})
async def get_cursor_position(self):
    """Ask the scheduler for the cursor position and return its answer."""
    position = await self._send_cmd("get_cursor_position")
    return position
async def type_text(self, text):
    """Send a ``type_text`` command carrying ``text`` to the scheduler."""
    payload = {"text": text}
    await self._send_cmd("type_text", payload)
async def press_key(self, key):
    """Send a ``press_key`` command carrying ``key`` to the scheduler."""
    payload = {"key": key}
    await self._send_cmd("press_key", payload)
async def hotkey(self, keys):
    """Send a ``hotkey`` command; ``keys`` is copied into a list first."""
    key_list = list(keys)
    await self._send_cmd("hotkey", {"keys": key_list})
async def scroll_up(self, clicks: int = 1):
    """Scroll up at the cursor position stored for this app list.

    Args:
        clicks: Number of scroll clicks to send.
    """
    positions_key = hash(tuple(sorted(self._diorama.app_list)))
    cursor = Diorama._cursor_positions.get(positions_key, (0, 0))
    payload = {"clicks": clicks, "x": cursor[0], "y": cursor[1]}
    await self._send_cmd("scroll_up", payload)
async def scroll_down(self, clicks: int = 1):
    """Scroll down at the cursor position stored for this app list.

    Args:
        clicks: Number of scroll clicks to send.
    """
    positions_key = hash(tuple(sorted(self._diorama.app_list)))
    cursor = Diorama._cursor_positions.get(positions_key, (0, 0))
    payload = {"clicks": clicks, "x": cursor[0], "y": cursor[1]}
    await self._send_cmd("scroll_down", payload)
async def get_screen_size(self) -> dict[str, int]:
    """Return the size of the captured scene as ``width`` and ``height``.

    A screenshot is taken first when no scene size has been recorded yet.
    """
    if not self._scene_size:
        await self.screenshot()
    width = self._scene_size[0]
    height = self._scene_size[1]
    return {"width": width, "height": height}
async def to_screen_coordinates(self, x: float, y: float) -> tuple[float, float]:
    """Convert screenshot coordinates to screen coordinates.

    Hitboxes are tried from the end of the stored list to the start; the
    first one whose ``hitbox`` rectangle contains the point wins. The point
    keeps its offset from the hitbox's top-left corner and is moved to the
    ``target`` rectangle's top-left corner (translation only, no scaling).

    Args:
        x: X absolute coordinate in screenshot space
        y: Y absolute coordinate in screenshot space

    Returns:
        tuple[float, float]: (x, y) absolute coordinates in screen space, or
        the input unchanged when no usable hitbox contains the point.
    """
    if not self._scene_hitboxes:
        await self.screenshot()  # get hitboxes

    for h in self._scene_hitboxes[::-1]:
        rect_from = h.get("hitbox")
        rect_to = h.get("target")
        if not rect_from or len(rect_from) != 4:
            continue
        # The destination needs the same shape check; unpacking a missing or
        # malformed target would raise instead of trying the next hitbox.
        if not rect_to or len(rect_to) != 4:
            continue

        x0, y0, x1, y1 = rect_from
        if x0 <= x <= x1 and y0 <= y <= y1:
            logger.info(f"Found hitbox: {h}")
            tx0, ty0 = rect_to[0], rect_to[1]
            return tx0 + (x - x0), ty0 + (y - y0)

    return x, y
async def to_screenshot_coordinates(self, x: float, y: float) -> tuple[float, float]:
    """Convert screen coordinates to screenshot coordinates.

    Hitboxes are tried from the end of the stored list to the start; the
    first one whose ``target`` rectangle contains the point wins. The point
    keeps its offset from the target's top-left corner and is moved to the
    ``hitbox`` rectangle's top-left corner (translation only, no scaling).

    Args:
        x: X absolute coordinate in screen space
        y: Y absolute coordinate in screen space

    Returns:
        tuple[float, float]: (x, y) absolute coordinates in screenshot space,
        or the input unchanged when no usable hitbox contains the point.
    """
    if not self._scene_hitboxes:
        await self.screenshot()  # get hitboxes

    for h in self._scene_hitboxes[::-1]:
        rect_from = h.get("target")
        rect_to = h.get("hitbox")
        if not rect_from or len(rect_from) != 4:
            continue
        # The destination needs the same shape check; unpacking a missing or
        # malformed hitbox would raise instead of trying the next entry.
        if not rect_to or len(rect_to) != 4:
            continue

        x0, y0, x1, y1 = rect_from
        if x0 <= x <= x1 and y0 <= y <= y1:
            tx0, ty0 = rect_to[0], rect_to[1]
            return tx0 + (x - x0), ty0 + (y - y0)

    return x, y
import pyautogui
import time
async def main():
    """Manual demo of the Diorama interface.

    Steps:
      1. Capture three virtual desktops and save them under
         ``app_screenshots/``.
      2. Outline the third desktop's hitboxes, and save a numbered copy as
         ``desktop3_hitboxes.png``.
      3. Follow the real mouse, redrawing its mapped position on the
         screenshot, until Ctrl+C.
      4. Move the Diorama cursor along a sine-shaped path, saving a frame to
         ``current.png`` after every step.
    """
    import math
    import os

    # Every save below targets this directory, so make sure it exists.
    os.makedirs("app_screenshots", exist_ok=True)

    # App names are passed as separate arguments, which is the calling form
    # create_from_apps(*args) collects directly.
    desktop1 = Diorama.create_from_apps("Discord", "Notes")
    desktop2 = Diorama.create_from_apps("Terminal")

    img1 = await desktop1.interface.screenshot(as_bytes=False)
    img2 = await desktop2.interface.screenshot(as_bytes=False)
    img1.save("app_screenshots/desktop1.png")
    img2.save("app_screenshots/desktop2.png")

    # Initialize Diorama desktop
    desktop3 = Diorama.create_from_apps("Safari")
    screen_size = await desktop3.interface.get_screen_size()
    print(screen_size)

    # Take initial screenshot
    img = await desktop3.interface.screenshot(as_bytes=False)
    img.save("app_screenshots/desktop3.png")

    # Outline the hitboxes on the tracking base image, and build a second
    # copy that also carries each hitbox's index.
    hitboxes = desktop3.interface._scene_hitboxes[::-1]
    base_img = img.copy()
    draw = ImageDraw.Draw(base_img)
    canvas = img.copy()
    canvas_draw = ImageDraw.Draw(canvas)
    for idx, h in enumerate(hitboxes):
        rect = h.get("hitbox")
        if not rect or len(rect) != 4:
            continue
        draw.rectangle(rect, outline="red", width=2)
        canvas_draw.rectangle(rect, outline="red", width=2)
        canvas_draw.text((rect[0], rect[1]), str(idx), fill="red")
    canvas.save("app_screenshots/desktop3_hitboxes.png")

    # Track and draw mouse position in real time (single screenshot size)
    last_mouse_pos = None
    print("Tracking mouse... Press Ctrl+C to stop.")
    try:
        while True:
            mouse_x, mouse_y = pyautogui.position()
            if last_mouse_pos != (mouse_x, mouse_y):
                last_mouse_pos = (mouse_x, mouse_y)
                # Map to screenshot coordinates
                sx, sy = await desktop3.interface.to_screenshot_coordinates(mouse_x, mouse_y)
                # Draw on a copy of the screenshot
                frame = base_img.copy()
                frame_draw = ImageDraw.Draw(frame)
                frame_draw.ellipse((sx - 5, sy - 5, sx + 5, sy + 5), fill="blue", outline="blue")
                # Save the frame
                frame.save("app_screenshots/desktop3_mouse.png")
                print(f"Mouse at screen ({mouse_x}, {mouse_y}) -> screenshot ({sx:.1f}, {sy:.1f})")
            # Throttle updates to ~20 FPS. An awaitable sleep is used so the
            # scheduler task sharing this event loop can still run.
            await asyncio.sleep(0.05)
    except KeyboardInterrupt:
        print("Stopped tracking.")

    # Move the cursor across the screen along a sine-shaped path.
    step = 20  # pixels per move
    dot_radius = 10
    width = screen_size["width"]
    height = screen_size["height"]
    x, y = 0, 10
    while x < width and y < height:
        await desktop3.interface.move_cursor(x, y)
        img = await desktop3.interface.screenshot(as_bytes=False)
        draw = ImageDraw.Draw(img)
        draw.ellipse((x - dot_radius, y - dot_radius, x + dot_radius, y + dot_radius), fill="red")
        img.save("current.png")
        await asyncio.sleep(0.03)
        x += step
        y = math.sin(x / width * math.pi * 2) * 50 + 25
if __name__ == "__main__":
    # Run the demo coroutine to completion on a fresh event loop.
    demo = main()
    asyncio.run(demo)

View File

@@ -0,0 +1,26 @@
import asyncio
class DioramaComputer:
    """
    A minimal Computer-like interface for Diorama, compatible with ComputerAgent.
    Implements _initialized, run(), __aenter__ and __aexit__ so the object can
    be used directly or in an ``async with`` block.
    """

    def __init__(self, diorama):
        """Wrap ``diorama`` and expose its ``interface`` attribute.

        Args:
            diorama: Object providing an ``interface`` attribute.
        """
        self.diorama = diorama
        self.interface = self.diorama.interface
        self._initialized = False

    async def __aenter__(self):
        """Mark the computer as initialized and return it."""
        # Ensure the event loop is running (for compatibility)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
        self._initialized = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Leave an ``async with`` block.

        There is nothing to release. Returning ``None`` lets any exception
        raised inside the block propagate. Without this method ``async with``
        cannot be used on the object at all.
        """
        return None

    async def run(self):
        """Initialize on first use and return this object."""
        # This is a stub for compatibility
        if not self._initialized:
            await self.__aenter__()
        return self

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,33 @@
import platform
import sys
import platform
import inspect
from computer_server.diorama.diorama import Diorama
from computer_server.diorama.base import BaseDioramaHandler
from typing import Optional
class MacOSDioramaHandler(BaseDioramaHandler):
    """Handler for Diorama commands on macOS, using local diorama module."""

    async def diorama_cmd(self, action: str, arguments: Optional[dict] = None) -> dict:
        """Run one Diorama interface method by name.

        Args:
            action: Name of a public method on the Diorama interface.
            arguments: Keyword arguments for that method. Must contain a
                non-empty ``app_list``, which selects the apps and is removed
                before the method is called.

        Returns:
            ``{"success": True, "result": ...}`` on success, otherwise
            ``{"success": False, "error": ...}``; failures caused by an
            exception also carry the formatted traceback under ``trace``.
        """
        if platform.system().lower() != "darwin":
            return {"success": False, "error": "Diorama is only supported on macOS."}
        try:
            app_list = arguments.get("app_list") if arguments else None
            if not app_list:
                return {"success": False, "error": "Missing 'app_list' in arguments"}

            diorama = Diorama(app_list)
            interface = diorama.interface

            # The action name comes from the caller, so only public, callable
            # attributes count as commands. Underscore-prefixed names would
            # otherwise expose internals such as _send_cmd or __init__.
            method = None if action.startswith("_") else getattr(interface, action, None)
            if not callable(method):
                return {"success": False, "error": f"Unknown diorama action: {action}"}

            # Remove app_list from arguments before calling the method
            filtered_arguments = dict(arguments)
            filtered_arguments.pop("app_list", None)

            if inspect.iscoroutinefunction(method):
                result = await method(**filtered_arguments)
            else:
                result = method(**filtered_arguments)
            return {"success": True, "result": result}
        except Exception as e:
            import traceback
            return {"success": False, "error": str(e), "trace": traceback.format_exc()}

View File

@@ -0,0 +1,199 @@
#!/usr/bin/env python3
"""
UI Safezone Helper - A utility to get accurate bounds for macOS UI elements
This module provides helper functions to get accurate bounds for macOS UI elements
like the menubar and dock, which are needed for proper screenshot composition.
"""
import sys
import time
from typing import Dict, Any, Optional, Tuple
# Import Objective-C bridge libraries
try:
import AppKit
from ApplicationServices import (
AXUIElementCreateSystemWide,
AXUIElementCreateApplication,
AXUIElementCopyAttributeValue,
AXUIElementCopyAttributeValues,
kAXChildrenAttribute,
kAXRoleAttribute,
kAXTitleAttribute,
kAXPositionAttribute,
kAXSizeAttribute,
kAXErrorSuccess,
AXValueGetType,
kAXValueCGSizeType,
kAXValueCGPointType,
AXUIElementGetTypeID,
AXValueGetValue,
kAXMenuBarAttribute,
)
from AppKit import NSWorkspace, NSRunningApplication
import Foundation
except ImportError:
print("Error: This script requires PyObjC to be installed.")
print("Please install it with: pip install pyobjc")
sys.exit(1)
# Constants for accessibility API
# NOTE: all of these except kAXSubroleAttribute are also imported from
# ApplicationServices above; the assignments below rebind those names to
# plain literals for the rest of the module.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXSubroleAttribute = "AXSubrole"
kAXTitleAttribute = "AXTitle"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
def element_attribute(element, attribute):
    """Read one attribute from an accessibility element.

    The children attribute is first requested through the ranged call
    (start index 0, at most 999 values); an ``NSArray`` result is converted
    to a Python list. If that call does not succeed, or for any other
    attribute, the single-value call is used.

    Returns:
        The attribute value, or ``None`` when the lookup does not succeed.
    """
    if attribute == kAXChildrenAttribute:
        status, children = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return list(children) if isinstance(children, Foundation.NSArray) else children

    status, result = AXUIElementCopyAttributeValue(element, attribute, None)
    return result if status == kAXErrorSuccess else None
def element_value(element, type):
    """Unwrap an accessibility value of the given type.

    Returns:
        The unwrapped value when ``AXValueGetValue`` reports success,
        otherwise ``None``.
    """
    succeeded, unwrapped = AXValueGetValue(element, type, None)
    if not (succeeded == True):  # noqa: E712 - keep the equality comparison
        return None
    return unwrapped
def get_element_bounds(element):
    """Return the position and size of an accessibility element.

    Returns:
        Dict with ``x``, ``y``, ``width`` and ``height``. Any part that
        cannot be read stays at 0.
    """
    x = y = width = height = 0

    # Position: read the attribute, then unwrap it as a point.
    position_ref = element_attribute(element, kAXPositionAttribute)
    if position_ref:
        point = element_value(position_ref, kAXValueCGPointType)
        if point:
            x = point.x
            y = point.y

    # Size: read the attribute, then unwrap it as a size.
    size_ref = element_attribute(element, kAXSizeAttribute)
    if size_ref:
        extent = element_value(size_ref, kAXValueCGSizeType)
        if extent:
            width = extent.width
            height = extent.height

    return {"x": x, "y": y, "width": width, "height": height}
def find_dock_process():
    """Return the process identifier of the Dock, or ``None`` if not running.

    A running application counts as the Dock when its localized name is
    "Dock" and its bundle identifier is "com.apple.dock".
    """
    for candidate in NSWorkspace.sharedWorkspace().runningApplications():
        is_dock = (
            candidate.localizedName() == "Dock"
            and candidate.bundleIdentifier() == "com.apple.dock"
        )
        if is_dock:
            return candidate.processIdentifier()
    return None
def get_menubar_bounds():
    """Get the bounds of the macOS menubar.

    The menubar is looked up on the system-wide accessibility element first
    and, failing that, on the frontmost application.

    Returns:
        Dictionary with x, y, width, height of the menubar. When the menubar
        cannot be found, a hard-coded fallback of 1800 x 24 at (0, 0) is
        returned.
    """
    system_wide = AXUIElementCreateSystemWide()
    menubar = element_attribute(system_wide, kAXMenuBarAttribute)

    if menubar is None:
        # Second attempt: ask the frontmost application for its menubar.
        front_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if front_app:
            front_pid = front_app.processIdentifier()
            front_element = AXUIElementCreateApplication(front_pid)
            menubar = element_attribute(front_element, kAXMenuBarAttribute)

    if menubar is not None:
        return get_element_bounds(menubar)

    print("Error: Could not get menubar")
    return {"x": 0, "y": 0, "width": 1800, "height": 24}
def get_dock_bounds():
    """Get the bounds of the macOS Dock.

    The Dock's bounds are taken from the first child of the Dock's
    accessibility element whose role is "AXList".

    Returns:
        Dictionary with x, y, width, height of the Dock; all zeros when any
        step of the lookup fails (an error line is printed in that case).
    """

    def no_bounds():
        # Fresh dict per call so callers never share one fallback object.
        return {"x": 0, "y": 0, "width": 0, "height": 0}

    dock_pid = find_dock_process()
    if dock_pid is None:
        print("Error: Could not find Dock process")
        return no_bounds()

    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        print(f"Error: Could not create accessibility element for Dock (PID {dock_pid})")
        return no_bounds()

    children = element_attribute(dock_element, kAXChildrenAttribute)
    if not children or len(children) == 0:
        print("Error: Could not get Dock children")
        return no_bounds()

    # Stops at the first AXList child, like a loop with an early break.
    dock_list = next(
        (child for child in children if element_attribute(child, kAXRoleAttribute) == "AXList"),
        None,
    )
    if dock_list is None:
        print("Error: Could not find Dock list")
        return no_bounds()

    return get_element_bounds(dock_list)
def get_ui_element_bounds():
    """Collect the bounds of the menubar and the Dock.

    Returns:
        Dictionary with a ``menubar`` entry and a ``dock`` entry, each
        holding that element's bounds.
    """
    bounds = {}
    bounds["menubar"] = get_menubar_bounds()
    bounds["dock"] = get_dock_bounds()
    return bounds
if __name__ == "__main__":
    # Example usage: print both sets of bounds.
    all_bounds = get_ui_element_bounds()
    menubar_bounds = all_bounds["menubar"]
    dock_bounds = all_bounds["dock"]
    print("Menubar bounds:", menubar_bounds)
    print("Dock bounds:", dock_bounds)

View File

@@ -0,0 +1,220 @@
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List, Tuple
class BaseAccessibilityHandler(ABC):
    """Interface that each OS-specific accessibility handler implements."""

    @abstractmethod
    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return the accessibility tree of the current window."""

    @abstractmethod
    async def find_element(
        self,
        role: Optional[str] = None,
        title: Optional[str] = None,
        value: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Look up an accessibility-tree element by role, title and/or value."""
class BaseFileHandler(ABC):
    """Interface that each file handler implements."""

    @abstractmethod
    async def file_exists(self, path: str) -> Dict[str, Any]:
        """Report whether ``path`` is an existing file."""

    @abstractmethod
    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """Report whether ``path`` is an existing directory."""

    @abstractmethod
    async def list_dir(self, path: str) -> Dict[str, Any]:
        """List the entries of the directory at ``path``."""

    @abstractmethod
    async def read_text(self, path: str) -> Dict[str, Any]:
        """Read the file at ``path`` as text."""

    @abstractmethod
    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """Write ``content`` to the file at ``path`` as text."""

    @abstractmethod
    async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]:
        """Write binary content to a file.

        The content travels over the websocket as a base64 string.
        """

    @abstractmethod
    async def delete_file(self, path: str) -> Dict[str, Any]:
        """Delete the file at ``path``."""

    @abstractmethod
    async def create_dir(self, path: str) -> Dict[str, Any]:
        """Create a directory at ``path``."""

    @abstractmethod
    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """Delete the directory at ``path``."""

    @abstractmethod
    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]:
        """Read binary content from a file.

        The content travels over the websocket as a base64 string.

        Args:
            path: Path to the file
            offset: Byte offset to start reading from (default: 0)
            length: Number of bytes to read (default: None for entire file)
        """

    @abstractmethod
    async def get_file_size(self, path: str) -> Dict[str, Any]:
        """Return the size of the file at ``path`` in bytes."""
class BaseAutomationHandler(ABC):
    """Interface that each OS-specific automation handler implements.

    Method groups:
    - Mouse: button presses, clicks, cursor movement and drags
    - Keyboard: key presses, text entry and key combinations
    - Scrolling: scrolling by amount or by clicks
    - Screen: screenshots, screen size and cursor position
    - Clipboard: reading and setting clipboard content
    """

    # --- Mouse ---

    @abstractmethod
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press a mouse button at the current or specified position."""

    @abstractmethod
    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release a mouse button at the current or specified position."""

    @abstractmethod
    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click at the current or specified position."""

    @abstractmethod
    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click at the current or specified position."""

    @abstractmethod
    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Double-click at the current or specified position."""

    @abstractmethod
    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to the specified position."""

    @abstractmethod
    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag the cursor from its current position to the given coordinates.

        Args:
            x: The x coordinate to drag to
            y: The y coordinate to drag to
            button: The mouse button to use ('left', 'middle', 'right')
            duration: How long the drag should take in seconds
        """

    @abstractmethod
    async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag the cursor from its current position through the given points.

        Args:
            path: A list of tuples of x and y coordinates to drag to
            button: The mouse button to use ('left', 'middle', 'right')
            duration: How long the drag should take in seconds
        """

    # --- Keyboard ---

    @abstractmethod
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press the specified key and keep it held."""

    @abstractmethod
    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release the specified key."""

    @abstractmethod
    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type the specified text."""

    @abstractmethod
    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press the specified key."""

    @abstractmethod
    async def hotkey(self, *keys: str) -> Dict[str, Any]:
        """Press several keys together as one combination."""

    # --- Scrolling ---

    @abstractmethod
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll by the specified amount."""

    @abstractmethod
    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by the specified number of clicks."""

    @abstractmethod
    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by the specified number of clicks."""

    # --- Screen ---

    @abstractmethod
    async def screenshot(self) -> Dict[str, Any]:
        """Take a screenshot and return it as base64 encoded image data."""

    @abstractmethod
    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size of the VM."""

    @abstractmethod
    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the current cursor position."""

    # --- Clipboard ---

    @abstractmethod
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard content."""

    @abstractmethod
    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the clipboard content."""

    @abstractmethod
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a command and return its output."""

View File

@@ -0,0 +1,68 @@
import platform
import subprocess
from typing import Tuple, Type
from .base import BaseAccessibilityHandler, BaseAutomationHandler, BaseFileHandler
from computer_server.diorama.base import BaseDioramaHandler
# Conditionally import platform-specific handlers
system = platform.system().lower()
if system == 'darwin':
from .macos import MacOSAccessibilityHandler, MacOSAutomationHandler
from computer_server.diorama.macos import MacOSDioramaHandler
elif system == 'linux':
from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
elif system == 'windows':
from .windows import WindowsAccessibilityHandler, WindowsAutomationHandler
from .generic import GenericFileHandler
class HandlerFactory:
    """Builds the handler set that matches the operating system in use."""

    @staticmethod
    def _get_current_os() -> str:
        """Work out which operating system the process is running on.

        ``platform.system()`` is consulted first; when it yields none of the
        three known names, the output of ``uname -s`` is used instead.

        Returns:
            str: The OS type ('darwin' for macOS, 'linux' for Linux, or
            'windows' for Windows), or the lower-cased ``uname -s`` output
            when the fallback was needed.

        Raises:
            RuntimeError: If unable to determine the current OS
        """
        known_systems = ['darwin', 'linux', 'windows']
        try:
            detected = platform.system().lower()
            if detected in known_systems:
                return detected

            # Fallback for Unix-like systems that report an unexpected name.
            uname = subprocess.run(['uname', '-s'], capture_output=True, text=True)
            if uname.returncode == 0:
                return uname.stdout.strip().lower()

            raise RuntimeError(f"Unsupported OS: {detected}")
        except Exception as e:
            raise RuntimeError(f"Failed to determine current OS: {str(e)}")

    @staticmethod
    def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler, BaseFileHandler]:
        """Instantiate the handlers for the current OS.

        Returns:
            Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler, BaseFileHandler]:
            the accessibility, automation, diorama and file handlers, in that
            order. Only macOS gets a real Diorama handler; Linux and Windows
            get the base one.

        Raises:
            NotImplementedError: If the current OS is not supported
            RuntimeError: If unable to determine the current OS
        """
        os_type = HandlerFactory._get_current_os()

        if os_type == 'darwin':
            accessibility = MacOSAccessibilityHandler()
            automation = MacOSAutomationHandler()
            diorama = MacOSDioramaHandler()
        elif os_type == 'linux':
            accessibility = LinuxAccessibilityHandler()
            automation = LinuxAutomationHandler()
            diorama = BaseDioramaHandler()
        elif os_type == 'windows':
            accessibility = WindowsAccessibilityHandler()
            automation = WindowsAutomationHandler()
            diorama = BaseDioramaHandler()
        else:
            raise NotImplementedError(f"OS '{os_type}' is not supported")

        # The file handler is the same on every supported OS.
        return accessibility, automation, diorama, GenericFileHandler()

View File

@@ -0,0 +1,100 @@
"""
Generic handlers for all OSes.
Includes:
- FileHandler
"""
from pathlib import Path
from typing import Dict, Any, Optional
from .base import BaseFileHandler
import base64
def resolve_path(path: str) -> Path:
    """Return ``path`` as an absolute ``Path``, expanding a leading ``~`` to the home directory first."""
    expanded = Path(path).expanduser()
    return expanded.resolve()
class GenericFileHandler(BaseFileHandler):
    """File handler built on ``pathlib``.

    Every coroutine resolves its ``path`` argument with ``resolve_path`` and
    returns a dict carrying a boolean ``"success"`` key. Exceptions are not
    propagated; they are reported as ``{"success": False, "error": <message>}``.
    """

    async def file_exists(self, path: str) -> Dict[str, Any]:
        """Report under ``"exists"`` whether ``path`` is a regular file."""
        try:
            target = resolve_path(path)
            return {"success": True, "exists": target.is_file()}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """Report under ``"exists"`` whether ``path`` is a directory."""
        try:
            target = resolve_path(path)
            return {"success": True, "exists": target.is_dir()}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """Return under ``"files"`` the names of the files and directories directly inside ``path``."""
        try:
            names = []
            for entry in resolve_path(path).iterdir():
                # Entries that are neither a file nor a directory are left out.
                if entry.is_file() or entry.is_dir():
                    names.append(entry.name)
            return {"success": True, "files": names}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    async def read_text(self, path: str) -> Dict[str, Any]:
        """Return the text of ``path`` under ``"content"``."""
        try:
            text = resolve_path(path).read_text()
            return {"success": True, "content": text}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """Write ``content`` to ``path`` as text."""
        try:
            target = resolve_path(path)
            target.write_text(content)
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return {"success": True}

    async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]:
        """Decode the base64 string ``content_b64`` and write the bytes to ``path``."""
        try:
            target = resolve_path(path)
            payload = base64.b64decode(content_b64)
            target.write_bytes(payload)
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return {"success": True}

    async def read_bytes(self, path: str, offset: int = 0, length: Optional[int] = None) -> Dict[str, Any]:
        """Read bytes from ``path`` and return them base64-encoded under ``"content_b64"``.

        Args:
            path: File to read.
            offset: Byte position to start from; values that are not positive
                leave the read position at the start of the file.
            length: Number of bytes to read, or ``None`` to read to the end.
        """
        try:
            with open(resolve_path(path), "rb") as stream:
                if offset > 0:
                    stream.seek(offset)
                raw = stream.read() if length is None else stream.read(length)
            encoded = base64.b64encode(raw).decode("utf-8")
            return {"success": True, "content_b64": encoded}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    async def get_file_size(self, path: str) -> Dict[str, Any]:
        """Return the ``st_size`` of ``path`` under ``"size"``."""
        try:
            info = resolve_path(path).stat()
            return {"success": True, "size": info.st_size}
        except Exception as exc:
            return {"success": False, "error": str(exc)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """Remove the file at ``path`` with ``Path.unlink``."""
        try:
            target = resolve_path(path)
            target.unlink()
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return {"success": True}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """Create ``path`` and any missing parents; an existing directory is not an error."""
        try:
            target = resolve_path(path)
            target.mkdir(parents=True, exist_ok=True)
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return {"success": True}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """Remove the directory at ``path`` with ``Path.rmdir``."""
        try:
            target = resolve_path(path)
            target.rmdir()
        except Exception as exc:
            return {"success": False, "error": str(exc)}
        return {"success": True}

View File

@@ -0,0 +1,284 @@
"""
Linux implementation of automation and accessibility handlers.
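This implementation uses pyautogui for GUI automation when it can be imported.
If pyautogui is unavailable (for example in a headless environment without X11),
the accessibility handler's cursor and screen-size helpers return simulated values,
while automation calls report the failure in their result instead of raising.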
To use GUI automation in a headless environment:
1. Install Xvfb: sudo apt-get install xvfb
2. Run with virtual display: xvfb-run python -m computer_server
"""
from typing import Dict, Any, List, Tuple, Optional
import logging
import subprocess
import base64
import os
import json
from io import BytesIO
# Configure logger
logger = logging.getLogger(__name__)
# Try to import pyautogui, but don't fail if it's not available
# This allows the server to run in headless environments
try:
import pyautogui
logger.info("pyautogui successfully imported, GUI automation available")
except Exception as e:
logger.warning(f"pyautogui import failed: {str(e)}. GUI operations will be simulated.")
from .base import BaseAccessibilityHandler, BaseAutomationHandler
class LinuxAccessibilityHandler(BaseAccessibilityHandler):
    """Linux implementation of accessibility handler.

    The tree and element-search methods return fixed placeholder results; the
    cursor and screen-size helpers try pyautogui and fall back to constants.
    """

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return a fixed placeholder tree describing a single 1920x1080 window."""
        # No accessibility API is queried here; the tree below is a constant.
        logger.info("Getting accessibility tree (simulated, no accessibility API available on Linux)")
        placeholder_window = {
            "role": "Window",
            "title": "Linux Window",
            "position": {"x": 0, "y": 0},
            "size": {"width": 1920, "height": 1080},
            "children": [],
        }
        return {"success": True, "tree": placeholder_window}

    async def find_element(self, role: Optional[str] = None,
                           title: Optional[str] = None,
                           value: Optional[str] = None) -> Dict[str, Any]:
        """Log the requested criteria and report that element search is unsupported."""
        logger.info(f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)")
        outcome = {
            "success": False,
            "message": "Element search not supported on Linux",
        }
        return outcome

    def get_cursor_position(self) -> Tuple[int, int]:
        """Return the cursor position from pyautogui, or ``(0, 0)`` if that fails."""
        try:
            point = pyautogui.position()
            return point.x, point.y
        except Exception as exc:
            logger.warning(f"Failed to get cursor position with pyautogui: {exc}")

        # Reached only when the pyautogui lookup above raised.
        logger.info("Getting cursor position (simulated)")
        return 0, 0

    def get_screen_size(self) -> Tuple[int, int]:
        """Return the screen size from pyautogui, or ``(1920, 1080)`` if that fails."""
        try:
            dimensions = pyautogui.size()
            return dimensions.width, dimensions.height
        except Exception as exc:
            logger.warning(f"Failed to get screen size with pyautogui: {exc}")

        # Reached only when the pyautogui lookup above raised.
        logger.info("Getting screen size (simulated)")
        return 1920, 1080
class LinuxAutomationHandler(BaseAutomationHandler):
    """Linux implementation of automation handler using pyautogui.

    Every coroutine returns a dict with a boolean ``"success"`` key. Failures
    are not raised; they are reported as
    ``{"success": False, "error": <message>}``.
    """

    # Mouse Actions
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press ``button`` without releasing it, moving to ``(x, y)`` first when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseDown(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release ``button``, moving to ``(x, y)`` first when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the pointer to ``(x, y)``."""
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Click with pyautogui's default button, moving to ``(x, y)`` first when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click, moving to ``(x, y)`` first when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Double-click with a 0.1 s interval, moving to ``(x, y)`` first when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.doubleClick(interval=0.1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def click(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Click ``button``, moving to ``(x, y)`` first when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag from the current pointer position to ``(x, y)`` over ``duration`` seconds."""
        try:
            pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag(self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> Dict[str, Any]:
        """Move to the start point, then drag to the end point over 0.5 seconds."""
        try:
            pyautogui.moveTo(start_x, start_y)
            pyautogui.dragTo(end_x, end_y, duration=0.5, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_path(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Move to the first point of ``path`` and issue one ``dragTo`` per remaining point.

        An empty ``path`` is reported as a failure.
        """
        try:
            if not path:
                return {"success": False, "error": "Path is empty"}
            pyautogui.moveTo(*path[0])
            # Each segment is a separate dragTo call that takes `duration` seconds.
            for x, y in path[1:]:
                pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Hold ``key`` down."""
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release ``key``."""
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type ``text`` with ``pyautogui.write``."""
        try:
            pyautogui.write(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release ``key``."""
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the keys in ``keys`` together as one hotkey combination."""
        try:
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll horizontally by ``x`` clicks and vertically by ``y`` clicks.

        A zero amount on an axis issues no scroll call for that axis.
        """
        try:
            # pyautogui.scroll(clicks, x, y) interprets its second and third
            # arguments as a screen position, so forwarding (x, y) directly
            # would scroll by `x` clicks at column `y`. Scroll each axis
            # explicitly instead.
            if x:
                pyautogui.hscroll(x)
            if y:
                pyautogui.vscroll(y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll by ``-clicks`` (pyautogui treats negative amounts as downward)."""
        try:
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll by ``clicks`` (pyautogui treats positive amounts as upward)."""
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return it as a base64-encoded PNG under ``"image_data"``."""
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen width and height under ``"size"``."""
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the pointer coordinates under ``"position"``."""
        try:
            pos = pyautogui.position()
            return {"success": True, "position": {"x": pos.x, "y": pos.y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard text (read with ``pyperclip.paste``) under ``"content"``."""
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard contents with ``text``."""
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run ``command`` through the shell and return its stdout, stderr and return code.

        A command that runs but exits non-zero still yields ``"success": True``;
        inspect ``"return_code"`` for the command's own status.
        """
        try:
            # NOTE(review): the string is handed to the shell unmodified
            # (shell=True), so callers must only pass trusted input.
            process = subprocess.run(command, shell=True, capture_output=True, text=True)
            return {"success": True, "stdout": process.stdout, "stderr": process.stderr, "return_code": process.returncode}
        except Exception as e:
            return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,943 @@
import pyautogui
from pynput.mouse import Button, Controller as MouseController
from pynput.keyboard import Key, Controller as KeyboardController
import time
import base64
from io import BytesIO
from typing import Optional, Dict, Any, List, Tuple
from ctypes import byref, c_void_p, POINTER
from AppKit import NSWorkspace # type: ignore
import AppKit
from Quartz.CoreGraphics import * # type: ignore
from Quartz.CoreGraphics import CGPoint, CGSize # type: ignore
import Foundation
from ApplicationServices import (
AXUIElementCreateSystemWide, # type: ignore
AXUIElementCreateApplication, # type: ignore
AXUIElementCopyAttributeValue, # type: ignore
AXUIElementCopyAttributeValues, # type: ignore
kAXFocusedWindowAttribute, # type: ignore
kAXWindowsAttribute, # type: ignore
kAXMainWindowAttribute, # type: ignore
kAXChildrenAttribute, # type: ignore
kAXRoleAttribute, # type: ignore
kAXTitleAttribute, # type: ignore
kAXValueAttribute, # type: ignore
kAXDescriptionAttribute, # type: ignore
kAXEnabledAttribute, # type: ignore
kAXPositionAttribute, # type: ignore
kAXSizeAttribute, # type: ignore
kAXErrorSuccess, # type: ignore
AXValueGetType, # type: ignore
kAXValueCGSizeType, # type: ignore
kAXValueCGPointType, # type: ignore
kAXValueCFRangeType, # type: ignore
AXUIElementGetTypeID, # type: ignore
AXValueGetValue, # type: ignore
kAXVisibleChildrenAttribute, # type: ignore
kAXRoleDescriptionAttribute, # type: ignore
kAXFocusedApplicationAttribute, # type: ignore
kAXFocusedUIElementAttribute, # type: ignore
kAXSelectedTextAttribute, # type: ignore
kAXSelectedTextRangeAttribute, # type: ignore
)
import objc
import re
import json
import copy
from .base import BaseAccessibilityHandler, BaseAutomationHandler
import logging
logger = logging.getLogger(__name__)
# Constants for accessibility API
# NOTE: kAXErrorSuccess, kAXRoleAttribute, kAXTitleAttribute, kAXValueAttribute,
# kAXWindowsAttribute, kAXPositionAttribute, kAXSizeAttribute and
# kAXChildrenAttribute are also imported from ApplicationServices at the top of
# this module; the assignments below rebind those names to plain Python values
# for the rest of the module.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"
# Constants for window properties
# NOTE(review): these may shadow names pulled in by the Quartz.CoreGraphics star
# import above — confirm.
kCGWindowLayer = "kCGWindowLayer" # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha" # Window opacity
# Constants for application activation options
# Maps a readable option name to its integer bit value.
NSApplicationActivationOptions = {
"regular": 0, # Default activation
"bringing_all_windows_forward": 1 << 0, # NSApplicationActivateAllWindows
"ignoring_other_apps": 1 << 1 # NSApplicationActivateIgnoringOtherApps
}
def CFAttributeToPyObject(attrValue):
    """Convert an accessibility attribute value into a plain Python object.

    The value's CoreFoundation type ID selects a converter first (string,
    boolean, array, number, or AX element, which is passed through as-is).
    If no converter matches, the value is treated as an AXValue: the
    ``{...}`` part of its ``description()`` is parsed as a size, point or
    range and returned as a tuple. ``None`` is returned when nothing applies.
    """

    def list_helper(list_value):
        # Convert every entry of the array recursively.
        return [CFAttributeToPyObject(entry) for entry in list_value]

    def number_helper(number_value):
        # Try an integer read first, then a double; give up with None.
        ok, as_int = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if ok:
            return int(as_int)

        ok, as_float = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if ok:
            return float(as_float)
        return None

    def axuielement_helper(element_value):
        # AX elements are handed back unchanged.
        return element_value

    type_id = Foundation.CFGetTypeID(attrValue)  # type: ignore
    converters = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    try:
        return converters[type_id](attrValue)
    except KeyError:
        # Not one of the supported CF types; fall through to the AX value types.
        pass

    ax_kind = AXValueGetType(attrValue)
    parsers = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        braces = re.search("{.*}", attrValue.description())
        if not braces:
            return None
        text = braces.group()
        return tuple(parsers[ax_kind](text))
    except KeyError:
        # AX value type without a parser.
        return None
def element_attribute(element, attribute):
    """Read ``attribute`` from the accessibility ``element``.

    For the children attribute a ranged read (indices 0-999) is tried first;
    if it fails, or for any other attribute, a plain single-value read is used.
    NSArray results are converted with ``CFAttributeToPyObject``; other values
    are returned unchanged. Returns ``None`` when the read does not succeed.
    """

    def _to_python(raw):
        # Only arrays are converted; everything else is passed through.
        if isinstance(raw, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw)
        return raw

    if attribute == kAXChildrenAttribute:
        status, raw = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return _to_python(raw)

    status, raw = AXUIElementCopyAttributeValue(element, attribute, None)
    if status == kAXErrorSuccess:
        return _to_python(raw)
    return None
def element_value(element, type):
    """Decode ``element`` with ``AXValueGetValue`` as the given AX value ``type``.

    Returns the decoded value when the call reports success, otherwise ``None``.
    """
    ok, decoded = AXValueGetValue(element, type, None)
    # The equality comparison with True is kept deliberately (not a truthiness test).
    return decoded if ok == True else None  # noqa: E712
class UIElement:
    """Snapshot of one accessibility element and, recursively, its children.

    The constructor reads role, title, enabled state, position and size from
    the AX element, derives bounding boxes and hashes, and builds child
    ``UIElement`` objects until ``max_depth`` is exhausted.
    """

    def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None):
        """Build the snapshot.

        Args:
            element: The AX element to read.
            offset_x: Horizontal offset subtracted from the absolute position
                to obtain ``self.position`` (negative offsets are ignored).
            offset_y: Vertical counterpart of ``offset_x``.
            max_depth: Remaining depth of children to expand; ``None`` means
                no limit, ``0`` means no children.
            parents_visible_bbox: Visible bounding box of the parent, used to
                clip this element's ``visible_bbox``.
        """
        self.ax_element = element
        self.content_identifier = ""
        self.identifier = ""
        self.name = ""
        self.children = []
        self.description = ""
        self.role_description = ""
        self.value = None
        self.max_depth = max_depth
        # Defined up front so the attribute exists even when the element has
        # no geometry and construction stops early below.
        self.center = None

        # Set role
        self.role = element_attribute(element, kAXRoleAttribute)
        if self.role is None:
            self.role = "No role"

        # Set name
        self.name = element_attribute(element, kAXTitleAttribute)
        if self.name is not None:
            # Convert tuple to string if needed
            if isinstance(self.name, tuple):
                self.name = str(self.name[0]) if self.name else ""
            self.name = self.name.replace(" ", "_")

        # Set enabled
        self.enabled = element_attribute(element, kAXEnabledAttribute)
        if self.enabled is None:
            self.enabled = False

        # Set position and size
        position = element_attribute(element, kAXPositionAttribute)
        size = element_attribute(element, kAXSizeAttribute)
        start_position = element_value(position, kAXValueCGPointType)

        # A window becomes the origin for itself and for its descendants.
        if self.role == "AXWindow" and start_position is not None:
            offset_x = start_position.x
            offset_y = start_position.y

        # Copy first: `self.position` aliases `start_position` and is mutated
        # in place below, so the absolute value must be captured beforehand.
        self.absolute_position = copy.copy(start_position)
        self.position = start_position
        if self.position is not None:
            self.position.x -= max(0, offset_x)
            self.position.y -= max(0, offset_y)
        self.size = element_value(size, kAXValueCGSizeType)

        self._set_bboxes(parents_visible_bbox)

        # Set component center
        if start_position is None or self.size is None:
            # Without geometry the description, value, children and hashes are
            # left at their defaults.
            logger.debug("Element has no position or size; skipping center, value and children")
            return
        # `start_position` is now relative (see aliasing above); adding the
        # offset back yields the center in absolute coordinates.
        self.center = (
            start_position.x + offset_x + self.size.width / 2,
            start_position.y + offset_y + self.size.height / 2,
        )

        self.description = element_attribute(element, kAXDescriptionAttribute)
        self.role_description = element_attribute(element, kAXRoleDescriptionAttribute)
        attribute_value = element_attribute(element, kAXValueAttribute)

        # Set value
        self.value = attribute_value
        if attribute_value is not None:
            if isinstance(attribute_value, Foundation.NSArray):  # type: ignore
                self.value = []
                for value in attribute_value:
                    self.value.append(value)
            # Check if it's an accessibility element by checking its type ID
            elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID():  # type: ignore
                self.value = UIElement(attribute_value, offset_x, offset_y)

        # Set children
        if self.max_depth is None or self.max_depth > 0:
            self.children = self._get_children(element, start_position, offset_x, offset_y)
        else:
            self.children = []

        self.calculate_hashes()

    def _set_bboxes(self, parents_visible_bbox):
        """Compute ``self.bbox`` and ``self.visible_bbox`` as ``[x1, y1, x2, y2]`` integer lists.

        Both are ``None`` when position or size is missing. ``visible_bbox``
        is the intersection with ``parents_visible_bbox`` when one is given
        (``None`` if the two do not intersect), otherwise it equals ``bbox``.
        """
        if not self.absolute_position or not self.size:
            self.bbox = None
            self.visible_bbox = None
            return
        self.bbox = [
            int(self.absolute_position.x),
            int(self.absolute_position.y),
            int(self.absolute_position.x + self.size.width),
            int(self.absolute_position.y + self.size.height),
        ]
        if parents_visible_bbox:
            # check if not intersected
            if (
                self.bbox[0] > parents_visible_bbox[2]
                or self.bbox[1] > parents_visible_bbox[3]
                or self.bbox[2] < parents_visible_bbox[0]
                or self.bbox[3] < parents_visible_bbox[1]
            ):
                self.visible_bbox = None
            else:
                self.visible_bbox = [
                    int(max(self.bbox[0], parents_visible_bbox[0])),
                    int(max(self.bbox[1], parents_visible_bbox[1])),
                    int(min(self.bbox[2], parents_visible_bbox[2])),
                    int(min(self.bbox[3], parents_visible_bbox[3])),
                ]
        else:
            self.visible_bbox = self.bbox

    def _get_children(self, element, start_position, offset_x, offset_y):
        """Return child ``UIElement`` objects for ``element``.

        The children attribute is preferred; the visible-children attribute is
        used only when the former is unavailable. ``start_position`` is
        accepted but not used.
        """
        children = element_attribute(element, kAXChildrenAttribute)
        visible_children = element_attribute(element, kAXVisibleChildrenAttribute)
        found_children = []
        if children is not None:
            found_children.extend(children)
        else:
            if visible_children is not None:
                found_children.extend(visible_children)

        result = []
        if self.max_depth is None or self.max_depth > 0:
            for child in found_children:
                child = UIElement(
                    child,
                    offset_x,
                    offset_y,
                    self.max_depth - 1 if self.max_depth is not None else None,
                    self.visible_bbox,
                )
                result.append(child)
        return result

    def calculate_hashes(self):
        """Populate ``identifier`` and ``content_identifier`` from the current state."""
        self.identifier = self.component_hash()
        self.content_identifier = self.children_content_hash(self.children)

    def component_hash(self):
        """Return an MD5 hex digest of position, size, enabled flag and role, or ``""`` without geometry."""
        if self.position is None or self.size is None:
            return ""
        position_string = f"{self.position.x:.0f};{self.position.y:.0f}"
        size_string = f"{self.size.width:.0f};{self.size.height:.0f}"
        enabled_string = str(self.enabled)
        # Ensure role is a string
        role_string = ""
        if self.role is not None:
            role_string = str(self.role[0]) if isinstance(self.role, tuple) else str(self.role)
        return self.hash_from_string(position_string + size_string + enabled_string + role_string)

    def hash_from_string(self, string):
        """Return the MD5 hex digest of ``string``, or ``""`` for ``None`` or an empty string."""
        if string is None or string == "":
            return ""
        from hashlib import md5

        return md5(string.encode()).hexdigest()

    def children_content_hash(self, children):
        """Return a hash derived from the children's identifiers, or ``""`` when there are none."""
        if len(children) == 0:
            return ""
        all_content_hashes = []
        all_hashes = []
        for child in children:
            all_content_hashes.append(child.content_identifier)
            all_hashes.append(child.identifier)
        # Only the content hashes are sorted; the identifiers keep child order.
        all_content_hashes.sort()
        if len(all_content_hashes) == 0:
            return ""
        content_hash = self.hash_from_string("".join(all_content_hashes))
        content_structure_hash = self.hash_from_string("".join(all_hashes))
        # NOTE(review): str.join interleaves content_hash between the characters
        # of content_structure_hash; plain concatenation may have been
        # intended. Left unchanged so existing hash values stay stable.
        return self.hash_from_string(content_hash.join(content_structure_hash))

    def to_dict(self):
        """Return a dict describing this element and, recursively, its children.

        Positions and size are rendered as ``"a;b"`` strings (empty when
        missing). A ``UIElement`` value is embedded as a JSON string and an
        ``NSDate`` value as its ``str()``.
        """
        def children_to_dict(children):
            result = []
            for child in children:
                result.append(child.to_dict())
            return result

        value = self.value
        if isinstance(value, UIElement):
            value = json.dumps(value.to_dict(), indent=4)
        elif isinstance(value, AppKit.NSDate):  # type: ignore
            value = str(value)

        if self.absolute_position is not None:
            absolute_position = f"{self.absolute_position.x:.2f};{self.absolute_position.y:.2f}"
        else:
            absolute_position = ""

        if self.position is not None:
            position = f"{self.position.x:.2f};{self.position.y:.2f}"
        else:
            position = ""

        if self.size is not None:
            size = f"{self.size.width:.0f};{self.size.height:.0f}"
        else:
            size = ""

        return {
            "id": self.identifier,
            "name": self.name,
            "role": self.role,
            "description": self.description,
            "role_description": self.role_description,
            "value": value,
            "absolute_position": absolute_position,
            "position": position,
            "size": size,
            "enabled": self.enabled,
            "bbox": self.bbox,
            "visible_bbox": self.visible_bbox,
            "children": children_to_dict(self.children),
        }
import Quartz
from AppKit import NSWorkspace, NSRunningApplication
from pathlib import Path
def get_all_windows_zorder():
    """Return a list of dicts describing every window that has bounds, sorted by ``z_index``.

    ``z_index`` is the window's index in the reversed on-screen window list;
    windows absent from that list get ``-1``. Each entry also carries a
    coarse ``role`` ("dock", "menubar", "desktop" or "app") derived from the
    window and owner names.
    """

    def classify(name, owner):
        # Order matters: the specific Dock/Menubar windows are checked before
        # the generic system-owner case.
        if name == "Dock" and owner == "Dock":
            return "dock"
        if name == "Menubar" and owner == "Window Server":
            return "menubar"
        if owner in ["Window Server", "Dock"]:
            return "desktop"
        return "app"

    on_screen = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly,
        Quartz.kCGNullWindowID
    )
    z_order = {}
    for rank, entry in enumerate(on_screen[::-1]):
        z_order[entry['kCGWindowNumber']] = rank

    every_window = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll,
        Quartz.kCGNullWindowID
    )

    described = []
    for entry in every_window:
        number = entry.get('kCGWindowNumber', 0)
        title = entry.get('kCGWindowName', '')
        owner_pid = entry.get('kCGWindowOwnerPID', 0)
        rect = entry.get('kCGWindowBounds', {})
        owner_name = entry.get('kCGWindowOwnerName', '')
        on_screen_flag = entry.get('kCGWindowIsOnscreen', False)
        layer = entry.get('kCGWindowLayer', 0)
        alpha = entry.get('kCGWindowAlpha', 1.0)
        rank = z_order.get(number, -1)
        role = classify(title, owner_name)

        # Windows without bounds are left out of the result.
        if not rect:
            continue

        described.append({
            "id": number,
            "name": title or "Unnamed Window",
            "pid": owner_pid,
            "owner": owner_name,
            "role": role,
            "is_on_screen": on_screen_flag,
            "bounds": {
                "x": rect.get('X', 0),
                "y": rect.get('Y', 0),
                "width": rect.get('Width', 0),
                "height": rect.get('Height', 0)
            },
            "layer": layer,
            "z_index": rank,
            "opacity": alpha
        })

    described.sort(key=lambda item: item["z_index"])
    return described
def get_app_info(app):
    """Return name, bundle id, pid and the active/hidden/terminated flags of ``app`` as a dict."""
    info = {}
    info["name"] = app.localizedName()
    info["bundle_id"] = app.bundleIdentifier()
    info["pid"] = app.processIdentifier()
    info["active"] = app.isActive()
    info["hidden"] = app.isHidden()
    info["terminated"] = app.isTerminated()
    return info
def get_menubar_items(active_app_pid=None):
    """Return title, bounds and index for each child of an application's menu bar.

    Args:
        active_app_pid: Process id of the application to inspect; when
            ``None`` the frontmost application is used.

    Returns:
        A list of dicts, empty when the application or its menu bar (or the
        menu bar's children) cannot be obtained.
    """

    def read_bounds(item):
        # Position and size default to 0 when the attribute is missing.
        box = {"x": 0, "y": 0, "width": 0, "height": 0}
        raw_position = element_attribute(item, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            box["x"] = getattr(point, 'x', 0)
            box["y"] = getattr(point, 'y', 0)
        raw_size = element_attribute(item, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            box["width"] = getattr(extent, 'width', 0)
            box["height"] = getattr(extent, 'height', 0)
        return box

    collected = []

    if active_app_pid is None:
        frontmost = NSWorkspace.sharedWorkspace().frontmostApplication()
        if not frontmost:
            return collected
        active_app_pid = frontmost.processIdentifier()

    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        return collected

    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        return collected

    entries = element_attribute(menubar, kAXChildrenAttribute)
    if entries is None:
        return collected

    for index, entry in enumerate(entries):
        label = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        box = read_bounds(entry)
        collected.append({
            "title": label,
            "bounds": box,
            "index": index,
            "app_pid": active_app_pid
        })
    return collected
def get_dock_items():
    """Return a description of each item in the Dock's first ``AXList`` child.

    The Dock process is located among the running applications by name and
    bundle identifier. Each returned dict holds the item's title,
    description, bounds, index, role, subrole and a ``type`` derived from
    them. An empty list is returned when any lookup step fails.
    """

    def read_bounds(item):
        # Position and size default to 0 when the attribute is missing.
        box = {"x": 0, "y": 0, "width": 0, "height": 0}
        raw_position = element_attribute(item, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            box["x"] = getattr(point, 'x', 0)
            box["y"] = getattr(point, 'y', 0)
        raw_size = element_attribute(item, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            box["width"] = getattr(extent, 'width', 0)
            box["height"] = getattr(extent, 'height', 0)
        return box

    def classify(label, role, subrole):
        # Subrole wins; the title is only consulted as a last resort.
        if subrole == "AXApplicationDockItem":
            return "application"
        if subrole == "AXFolderDockItem":
            return "folder"
        if subrole == "AXDocumentDockItem":
            return "document"
        if subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            return "separator"
        if "trash" in label.lower():
            return "trash"
        return "unknown"

    collected = []

    dock_pid = None
    for candidate in NSWorkspace.sharedWorkspace().runningApplications():
        if candidate.localizedName() == "Dock" and candidate.bundleIdentifier() == "com.apple.dock":
            dock_pid = candidate.processIdentifier()
            break
    if dock_pid is None:
        return collected

    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return collected

    top_level = element_attribute(dock_element, kAXChildrenAttribute)
    if top_level is None or len(top_level) == 0:
        return collected

    # The first child whose role is AXList holds the Dock entries.
    app_list = None
    for node in top_level:
        if element_attribute(node, kAXRoleAttribute) == "AXList":
            app_list = node
            break
    if app_list is None:
        return collected

    entries = element_attribute(app_list, kAXChildrenAttribute)
    if entries is None:
        return collected

    for index, entry in enumerate(entries):
        label = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        details = element_attribute(entry, kAXDescriptionAttribute) or ""
        role = element_attribute(entry, kAXRoleAttribute) or ""
        subrole = element_attribute(entry, "AXSubrole") or ""
        box = read_bounds(entry)
        kind = classify(label, role, subrole)
        collected.append({
            "title": label,
            "description": details,
            "bounds": box,
            "index": index,
            "type": kind,
            "role": role,
            "subrole": subrole
        })
    return collected
class MacOSAccessibilityHandler(BaseAccessibilityHandler):
def get_desktop_state(self):
windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")]
running_apps = self.get_running_apps()
applications = []
pid_to_window_ids = {}
# Build a mapping: pid -> list of AX window trees
pid_to_ax_trees = {}
for app in running_apps:
pid = app.processIdentifier()
try:
app_elem = AXUIElementCreateApplication(pid)
err, app_windows = AXUIElementCopyAttributeValue(app_elem, kAXWindowsAttribute, None)
trees = []
if err == kAXErrorSuccess and app_windows:
for ax_win in app_windows:
try:
trees.append(UIElement(ax_win).to_dict())
except Exception as e:
trees.append({"error": str(e)})
pid_to_ax_trees[pid] = trees
except Exception as e:
pid_to_ax_trees[pid] = [{"error": str(e)}]
# Attach children by pid and index (order)
pid_to_idx = {}
for win in windows:
pid = win["pid"]
idx = pid_to_idx.get(pid, 0)
ax_trees = pid_to_ax_trees.get(pid, [])
win["children"] = ax_trees[idx]["children"] if idx < len(ax_trees) and "children" in ax_trees[idx] else []
pid_to_idx[pid] = idx + 1
pid_to_window_ids.setdefault(pid, []).append(win["id"])
for app in running_apps:
info = get_app_info(app)
app_pid = info["pid"]
applications.append({
"info": info,
"windows": pid_to_window_ids.get(app_pid, [])
})
menubar_items = get_menubar_items()
dock_items = get_dock_items()
return {
"applications": applications,
"windows": windows,
"menubar_items": menubar_items,
"dock_items": dock_items
}
def get_application_windows(self, pid: int):
"""Get all windows for a specific application."""
try:
app = AXUIElementCreateApplication(pid)
err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None)
if err == kAXErrorSuccess and windows:
if isinstance(windows, Foundation.NSArray): # type: ignore
return windows
return []
except:
return []
def get_all_windows(self):
"""Get all visible windows in the system."""
try:
windows = []
running_apps = self.get_running_apps()
for app in running_apps:
try:
app_name = app.localizedName()
pid = app.processIdentifier()
# Skip system processes and background apps
if not app.activationPolicy() == 0: # NSApplicationActivationPolicyRegular
continue
# Get application windows
app_windows = self.get_application_windows(pid)
windows.append(
{
"app_name": app_name,
"pid": pid,
"frontmost": app.isActive(),
"has_windows": len(app_windows) > 0,
"windows": app_windows,
}
)
except:
continue
return windows
except:
return []
def get_running_apps(self):
# From NSWorkspace.runningApplications docs: https://developer.apple.com/documentation/appkit/nsworkspace/runningapplications
# "Similar to the NSRunningApplication classs properties, this property will only change when the main run loop runs in a common mode"
# So we need to run the main run loop to get the latest running applications
Foundation.CFRunLoopRunInMode(Foundation.kCFRunLoopDefaultMode, 0.1, False) # type: ignore
return NSWorkspace.sharedWorkspace().runningApplications()
def get_ax_attribute(self, element, attribute):
return element_attribute(element, attribute)
def serialize_node(self, element):
# Create a serializable dictionary representation of an accessibility element
result = {}
# Get basic attributes
result["role"] = self.get_ax_attribute(element, kAXRoleAttribute)
result["title"] = self.get_ax_attribute(element, kAXTitleAttribute)
result["value"] = self.get_ax_attribute(element, kAXValueAttribute)
# Get position and size if available
position = self.get_ax_attribute(element, kAXPositionAttribute)
if position:
try:
position_dict = {"x": position[0], "y": position[1]}
result["position"] = position_dict
except (IndexError, TypeError):
pass
size = self.get_ax_attribute(element, kAXSizeAttribute)
if size:
try:
size_dict = {"width": size[0], "height": size[1]}
result["size"] = size_dict
except (IndexError, TypeError):
pass
return result
async def get_accessibility_tree(self) -> Dict[str, Any]:
try:
desktop_state = self.get_desktop_state()
return {
"success": True,
**desktop_state
}
except Exception as e:
return {"success": False, "error": str(e)}
async def find_element(
    self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
) -> Dict[str, Any]:
    """Depth-first search for the first element matching the given criteria.

    The search starts at the system-wide accessibility element. Each
    criterion that is truthy must equal the element's attribute (the value
    attribute is compared after ``str()``); falsy criteria are ignored.

    Returns:
        ``{"success": True, "element": <serialized node or None>}``, or
        ``{"success": False, "error": <message>}`` if anything raises.
    """
    try:
        root = AXUIElementCreateSystemWide()

        def is_match(node):
            # A node matches when none of the supplied criteria disagrees.
            return not (
                (role and self.get_ax_attribute(node, kAXRoleAttribute) != role)
                or (title and self.get_ax_attribute(node, kAXTitleAttribute) != title)
                or (value and str(self.get_ax_attribute(node, kAXValueAttribute)) != value)
            )

        def first_match(node):
            if is_match(node):
                return self.serialize_node(node)
            # Pre-order traversal: visit children left to right.
            for child in self.get_ax_attribute(node, kAXChildrenAttribute) or ():
                found = first_match(child)
                if found:
                    return found
            return None

        return {"success": True, "element": first_match(root)}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
class MacOSAutomationHandler(BaseAutomationHandler):
    """macOS automation handler.

    Mouse actions and text typing go through the shared ``mouse`` /
    ``keyboard`` controller objects; named-key actions and screen capture go
    through pyautogui. Every public coroutine returns a dict with a
    ``"success"`` flag and, on failure, an ``"error"`` message instead of
    raising.
    """

    # Controller instances are class attributes, shared by all handler instances.
    mouse = MouseController()
    keyboard = KeyboardController()

    # Number of intermediate cursor positions drag_to() steps through.
    _DRAG_STEPS = 20

    # Internal helpers

    @staticmethod
    def _resolve_button(button: str):
        """Map a button name to a ``Button`` member; unknown names mean middle."""
        if button == "left":
            return Button.left
        if button == "right":
            return Button.right
        return Button.middle

    def _move_if_given(self, x: Optional[int], y: Optional[int]) -> None:
        """Move the cursor to (x, y) only when both coordinates are provided."""
        if x is not None and y is not None:
            self.mouse.position = (x, y)

    def _release_quietly(self, btn) -> None:
        """Best-effort button release used while reporting a failed drag."""
        try:
            self.mouse.release(btn)
        except Exception:
            # The original failure is what gets reported; a failed release
            # must not mask it.
            pass

    # Mouse Actions

    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press *button*, optionally moving to (x, y) first."""
        try:
            self._move_if_given(x, y)
            self.mouse.press(self._resolve_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release *button*, optionally moving to (x, y) first."""
        try:
            self._move_if_given(x, y)
            self.mouse.release(self._resolve_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Single left click, optionally at (x, y)."""
        try:
            self._move_if_given(x, y)
            self.mouse.click(Button.left, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Single right click, optionally at (x, y)."""
        try:
            self._move_if_given(x, y)
            self.mouse.click(Button.right, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Double left click, optionally at (x, y)."""
        try:
            self._move_if_given(x, y)
            self.mouse.click(Button.left, 2)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to (x, y)."""
        try:
            self.mouse.position = (x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from the current cursor position to (x, y).

        The button is pressed, the cursor is stepped linearly through
        ``_DRAG_STEPS`` positions spread over *duration* seconds, and the
        button is released. On failure a best-effort release is attempted
        before the error is returned.
        """
        btn = None
        try:
            btn = self._resolve_button(button)
            self.mouse.press(btn)
            start_x, start_y = self.mouse.position
            steps = self._DRAG_STEPS
            dx = (x - start_x) / steps
            dy = (y - start_y) / steps
            for step in range(1, steps + 1):
                if step == steps:
                    # Snap the last step to the requested target: truncating
                    # start + dx * steps can land one pixel short because of
                    # floating-point rounding.
                    self.mouse.position = (int(x), int(y))
                else:
                    self.mouse.position = (int(start_x + dx * step), int(start_y + dy * step))
                # NOTE(review): time.sleep blocks the event loop for the whole
                # drag; kept as-is to preserve the existing timing behaviour.
                time.sleep(duration / steps)
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            if btn is not None:
                self._release_quietly(btn)
            return {"success": False, "error": str(e)}

    async def drag(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag along *path*, holding *button* from the first to the last point.

        *path* must contain at least two points. *duration* is divided evenly
        between the segments.
        """
        btn = None
        try:
            if not path or len(path) < 2:
                return {"success": False, "error": "Path must contain at least 2 points"}
            btn = self._resolve_button(button)
            # Move to the first point before pressing.
            self.mouse.position = path[0]
            self.mouse.press(btn)
            # len(path) >= 2 here, so there is always at least one segment.
            step_duration = duration / (len(path) - 1)
            for x, y in path[1:]:
                self.mouse.position = (x, y)
                time.sleep(step_duration)
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            if btn is not None:
                self._release_quietly(btn)
            return {"success": False, "error": str(e)}

    # Keyboard Actions

    async def key_down(self, key: str) -> Dict[str, Any]:
        """Hold *key* down (pyautogui key names)."""
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release *key* (pyautogui key names)."""
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type *text* through the keyboard controller (used for Unicode support)."""
        try:
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release *key* (pyautogui key names)."""
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the key combination *keys* (pyautogui key names)."""
        try:
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions

    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll by (x, y) via the mouse controller."""
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by *clicks* (negative vertical scroll)."""
        try:
            self.mouse.scroll(0, -clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by *clicks* (positive vertical scroll)."""
        try:
            self.mouse.scroll(0, clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions

    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return it as a base64-encoded PNG string."""
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}
            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size reported by pyautogui."""
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the current cursor position from the mouse controller."""
        try:
            x, y = self.mouse.position
            return {"success": True, "position": {"x": x, "y": y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions

    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard content (read via ``pyperclip.paste``)."""
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with *text*."""
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output.

        The command string is passed to the shell unchanged (``shell=True``),
        so callers must only forward commands they are allowed to execute.
        The result carries ``stdout``, ``stderr`` and ``return_code``.
        """
        try:
            import subprocess

            process = subprocess.run(command, shell=True, capture_output=True, text=True)
            return {
                "success": True,
                "stdout": process.stdout,
                "stderr": process.stderr,
                # Same key the Windows handler reports.
                "return_code": process.returncode,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,405 @@
"""
Windows implementation of automation and accessibility handlers.
This implementation uses pyautogui for GUI automation and Windows-specific APIs
for accessibility and system operations.
"""
from typing import Dict, Any, List, Tuple, Optional
import logging
import subprocess
import base64
import os
from io import BytesIO
# Module-level logger, used by the import probes below and by the handlers.
logger = logging.getLogger(__name__)

# pyautogui is optional: when it cannot be imported the name is bound to None
# so the handlers can test for it before every GUI call.
try:
    import pyautogui
except Exception as e:
    logger.error(f"pyautogui import failed: {str(e)}. GUI operations will not work.")
    pyautogui = None
else:
    logger.info("pyautogui successfully imported, GUI automation available")

# The win32 modules are optional too; WINDOWS_API_AVAILABLE records whether
# all three imported successfully.
try:
    import win32gui
    import win32con
    import win32api
except Exception as e:
    logger.error(f"Windows API modules import failed: {str(e)}. Some Windows-specific features will be unavailable.")
    WINDOWS_API_AVAILABLE = False
else:
    logger.info("Windows API modules successfully imported")
    WINDOWS_API_AVAILABLE = True
from .base import BaseAccessibilityHandler, BaseAutomationHandler
class WindowsAccessibilityHandler(BaseAccessibilityHandler):
    """Windows accessibility handler built on ``win32gui`` window queries."""

    @staticmethod
    def _geometry(rect):
        """Turn a window rect into ``(position, size)`` dicts.

        ``rect[0]``/``rect[1]`` become x/y; width and height are the
        differences ``rect[2] - rect[0]`` and ``rect[3] - rect[1]``.
        """
        position = {"x": rect[0], "y": rect[1]}
        size = {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}
        return position, size

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Describe the foreground window and its child windows.

        Returns ``{"success": True, "tree": {...}}`` where the tree holds the
        window's title, position, size and a flat list of child-window
        entries (role = window class name), or a failure dict.
        """
        if not WINDOWS_API_AVAILABLE:
            return {"success": False, "error": "Windows API not available"}

        try:
            foreground = win32gui.GetForegroundWindow()
            if not foreground:
                return {"success": False, "error": "No foreground window found"}

            caption = win32gui.GetWindowText(foreground)
            position, size = self._geometry(win32gui.GetWindowRect(foreground))
            children = []
            tree = {
                "role": "Window",
                "title": caption,
                "position": position,
                "size": size,
                "children": children,
            }

            def collect_child(child_hwnd, bucket):
                # Callback for EnumChildWindows; always returns True so the
                # enumeration continues even if one child cannot be read.
                try:
                    child_caption = win32gui.GetWindowText(child_hwnd)
                    child_rect = win32gui.GetWindowRect(child_hwnd)
                    child_class = win32gui.GetClassName(child_hwnd)
                    child_position, child_size = self._geometry(child_rect)
                    bucket.append(
                        {
                            "role": child_class,
                            "title": child_caption,
                            "position": child_position,
                            "size": child_size,
                            "children": [],
                        }
                    )
                except Exception as e:
                    logger.debug(f"Error getting child window info: {e}")
                return True

            win32gui.EnumChildWindows(foreground, collect_child, children)
            return {"success": True, "tree": tree}
        except Exception as e:
            logger.error(f"Error getting accessibility tree: {e}")
            return {"success": False, "error": str(e)}

    async def find_element(self, role: Optional[str] = None,
                           title: Optional[str] = None,
                           value: Optional[str] = None) -> Dict[str, Any]:
        """Find a top-level window by title, falling back to class name.

        A window whose title equals *title* is tried first; if none is found
        (or no title was given) a window whose class name equals *role* is
        tried. *value* is accepted but not used by this implementation.
        """
        if not WINDOWS_API_AVAILABLE:
            return {"success": False, "error": "Windows API not available"}

        try:
            if title:
                by_title = win32gui.FindWindow(None, title)
                if by_title:
                    position, size = self._geometry(win32gui.GetWindowRect(by_title))
                    element = {
                        "role": "Window",
                        "title": title,
                        "position": position,
                        "size": size,
                    }
                    return {"success": True, "element": element}

            if role:
                by_class = win32gui.FindWindow(role, None)
                if by_class:
                    caption = win32gui.GetWindowText(by_class)
                    position, size = self._geometry(win32gui.GetWindowRect(by_class))
                    element = {
                        "role": role,
                        "title": caption,
                        "position": position,
                        "size": size,
                    }
                    return {"success": True, "element": element}

            return {"success": False, "error": "Element not found"}
        except Exception as e:
            logger.error(f"Error finding element: {e}")
            return {"success": False, "error": str(e)}
class WindowsAutomationHandler(BaseAutomationHandler):
    """Windows implementation of automation handler using pyautogui and Windows APIs.

    Every public coroutine returns a dict with a ``"success"`` flag and, on
    failure, an ``"error"`` message instead of raising. GUI actions require
    pyautogui; screen size and cursor position fall back to the win32 API
    when pyautogui is unavailable.
    """

    @staticmethod
    def _no_pyautogui() -> Dict[str, Any]:
        """Failure payload returned when pyautogui could not be imported."""
        return {"success": False, "error": "pyautogui not available"}

    @staticmethod
    def _move_if_given(x: Optional[int], y: Optional[int]) -> None:
        """Move the cursor to (x, y) only when both coordinates are provided."""
        if x is not None and y is not None:
            pyautogui.moveTo(x, y)

    # Mouse Actions

    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press *button*, optionally moving to (x, y) first."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            self._move_if_given(x, y)
            pyautogui.mouseDown(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release *button*, optionally moving to (x, y) first."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            self._move_if_given(x, y)
            pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to (x, y)."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Single left click, optionally at (x, y)."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            self._move_if_given(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Single right click, optionally at (x, y)."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            self._move_if_given(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Double left click (0.1 s between clicks), optionally at (x, y)."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            self._move_if_given(x, y)
            pyautogui.doubleClick(interval=0.1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag from the current cursor position to (x, y) over *duration* seconds."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag along *path* while holding *button* for the whole gesture.

        The cursor moves to the first point, the button is pressed once, the
        remaining points are visited in order with *duration* split evenly
        between the segments, and the button is released (also on failure).
        A single-point path only moves the cursor, as before.
        """
        if not pyautogui:
            return self._no_pyautogui()
        try:
            if not path:
                return {"success": False, "error": "Path is empty"}

            # Move to first position
            pyautogui.moveTo(*path[0])

            waypoints = path[1:]
            if not waypoints:
                # Nothing to drag through; keep the move-only behaviour.
                return {"success": True}

            # One segment per remaining waypoint, so the whole gesture takes
            # `duration` seconds in total.
            step_duration = duration / len(waypoints)

            # Press once and keep the button held across all segments;
            # mouseDownUp=False stops dragTo from pressing/releasing itself.
            pyautogui.mouseDown(button=button)
            try:
                for x, y in waypoints:
                    pyautogui.dragTo(x, y, duration=step_duration, button=button, mouseDownUp=False)
            finally:
                pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions

    async def key_down(self, key: str) -> Dict[str, Any]:
        """Hold *key* down."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release *key*."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type *text* with ``pyautogui.write``."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.write(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release *key*."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the key combination given as a list of key names."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            # Each list item is passed to pyautogui as a separate key.
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions

    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll vertically by *y*; the horizontal amount *x* is ignored."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            # pyautogui.scroll() only takes one parameter (vertical scroll)
            pyautogui.scroll(y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by *clicks*."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by *clicks*."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions

    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return it as a base64-encoded PNG string."""
        if not pyautogui:
            return self._no_pyautogui()
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}
            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size from pyautogui, else from the win32 API."""
        try:
            if pyautogui:
                size = pyautogui.size()
                return {"success": True, "size": {"width": size.width, "height": size.height}}
            elif WINDOWS_API_AVAILABLE:
                # Fallback to Windows API
                width = win32api.GetSystemMetrics(win32con.SM_CXSCREEN)
                height = win32api.GetSystemMetrics(win32con.SM_CYSCREEN)
                return {"success": True, "size": {"width": width, "height": height}}
            else:
                return {"success": False, "error": "No screen size detection method available"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the cursor position from pyautogui, else from the win32 API."""
        try:
            if pyautogui:
                pos = pyautogui.position()
                return {"success": True, "position": {"x": pos.x, "y": pos.y}}
            elif WINDOWS_API_AVAILABLE:
                # Fallback to Windows API
                pos = win32gui.GetCursorPos()
                return {"success": True, "position": {"x": pos[0], "y": pos[1]}}
            else:
                return {"success": False, "error": "No cursor position detection method available"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions

    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard content (read via ``pyperclip.paste``)."""
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with *text*."""
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run *command* through the shell and return stdout, stderr and exit code.

        The command string is passed to the shell unchanged (``shell=True``),
        so callers must only forward commands they are allowed to execute.
        """
        try:
            # CREATE_NO_WINDOW is only passed on Windows (os.name == 'nt').
            process = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
            )
            return {
                "success": True,
                "stdout": process.stdout,
                "stderr": process.stderr,
                "return_code": process.returncode
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

View File

@@ -0,0 +1,254 @@
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict, Any
import uvicorn
import logging
import asyncio
import json
import traceback
from contextlib import redirect_stdout, redirect_stderr
from io import StringIO
from .handlers.factory import HandlerFactory
import os
import aiohttp
# Module logger; its level is set to INFO here regardless of CLI options.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Intended upper bound for a single WebSocket message, in bytes.
WEBSOCKET_MAX_SIZE = 1024 * 1024 * 10 # 10MB limit
# FastAPI application object; the WebSocket route below is registered on it.
# NOTE(review): `websocket_max_size` does not look like a FastAPI constructor
# option (unknown keyword arguments appear to be kept only as extra metadata),
# so this value probably has no effect on the accepted message size. The
# effective limit is presumably the ASGI server's own WebSocket setting
# (uvicorn `ws_max_size`) - confirm before relying on the 10MB figure.
app = FastAPI(
title="Computer API",
description="API for the Computer project",
version="0.1.0",
websocket_max_size=WEBSOCKET_MAX_SIZE,
)
class ConnectionManager:
    """Tracks accepted WebSocket connections and owns the OS-specific handlers."""

    def __init__(self):
        # Sockets that have been accepted and not yet unregistered.
        self.active_connections: List[WebSocket] = []
        # Create OS-specific handlers
        (
            self.accessibility_handler,
            self.automation_handler,
            self.diorama_handler,
            self.file_handler,
        ) = HandlerFactory.create_handlers()

    async def connect(self, websocket: WebSocket):
        """Accept *websocket* and register it as active."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Unregister *websocket*; a socket that is not registered is ignored.

        Cleanup paths can run more than once for the same socket (for
        example after a failed close), so this must not raise ValueError
        when the socket has already been removed.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
manager = ConnectionManager()


def _unregister(websocket: WebSocket) -> None:
    """Remove *websocket* from the manager if it is still registered."""
    if websocket in manager.active_connections:
        manager.disconnect(websocket)


async def _reject(websocket: WebSocket, error: str) -> None:
    """Report *error* to the client, close the socket and unregister it.

    Sending and closing are best-effort because the peer may already be
    gone; the socket is unregistered in every case so it cannot linger in
    ``manager.active_connections``.
    """
    try:
        await websocket.send_json({"success": False, "error": error})
        await websocket.close()
    except Exception as send_error:
        logger.debug(f"Could not deliver rejection to client: {send_error}")
    finally:
        _unregister(websocket)


async def _authenticate(websocket: WebSocket, container_name: str) -> bool:
    """Run the cloud authentication handshake on a freshly accepted socket.

    The first client message must be an ``authenticate`` command whose params
    carry ``container_name`` (must equal this VM's CONTAINER_NAME) and
    ``api_key`` (validated against the TryCUA API).

    Returns:
        True when the client is authenticated. False when it was rejected;
        in that case the socket has already been closed and unregistered.
    """
    try:
        logger.info(f"Cloud provider detected. CONTAINER_NAME: {container_name}. Waiting for authentication...")

        # Wait for authentication message
        auth_data = await websocket.receive_json()

        # Validate auth message format
        if auth_data.get("command") != "authenticate":
            await _reject(websocket, "First message must be authentication")
            return False

        # Extract credentials
        client_api_key = auth_data.get("params", {}).get("api_key")
        client_container_name = auth_data.get("params", {}).get("container_name")

        # Layer 1: VM Identity Verification
        if client_container_name != container_name:
            logger.warning(f"VM name mismatch. Expected: {container_name}, Got: {client_container_name}")
            await _reject(websocket, "VM name mismatch")
            return False

        # Layer 2: API Key Validation with TryCUA API
        if not client_api_key:
            await _reject(websocket, "API key required")
            return False

        try:
            async with aiohttp.ClientSession() as session:
                headers = {
                    "Authorization": f"Bearer {client_api_key}"
                }
                async with session.get(
                    f"https://www.trycua.com/api/vm/auth?container_name={container_name}",
                    headers=headers,
                ) as resp:
                    if resp.status != 200:
                        error_msg = await resp.text()
                        logger.warning(f"API validation failed: {error_msg}")
                        await _reject(websocket, "Authentication failed")
                        return False

                    # If we get a 200 response with VNC URL, the VM exists and user has access
                    vnc_url = (await resp.text()).strip()
                    if not vnc_url:
                        logger.warning(f"No VNC URL returned for VM: {container_name}")
                        await _reject(websocket, "VM not found")
                        return False

            logger.info(f"Authentication successful for VM: {container_name}")
            await websocket.send_json({
                "success": True,
                "message": "Authenticated"
            })
            return True
        except Exception as e:
            logger.error(f"Error validating with TryCUA API: {e}")
            await _reject(websocket, "Authentication service unavailable")
            return False
    except Exception as e:
        # Includes the client disconnecting or sending a malformed first message.
        logger.error(f"Authentication error: {e}")
        await _reject(websocket, "Authentication failed")
        return False


def _command_table() -> Dict[str, Any]:
    """Map command names received over the socket to handler coroutines."""
    accessibility = manager.accessibility_handler
    automation = manager.automation_handler
    files = manager.file_handler
    return {
        # App-Use commands
        "diorama_cmd": manager.diorama_handler.diorama_cmd,
        # Accessibility commands
        "get_accessibility_tree": accessibility.get_accessibility_tree,
        "find_element": accessibility.find_element,
        # Shell commands
        "run_command": automation.run_command,
        # File system commands
        "file_exists": files.file_exists,
        "directory_exists": files.directory_exists,
        "list_dir": files.list_dir,
        "read_text": files.read_text,
        "write_text": files.write_text,
        "read_bytes": files.read_bytes,
        "write_bytes": files.write_bytes,
        "get_file_size": files.get_file_size,
        "delete_file": files.delete_file,
        "create_dir": files.create_dir,
        "delete_dir": files.delete_dir,
        # Mouse commands
        "mouse_down": automation.mouse_down,
        "mouse_up": automation.mouse_up,
        "left_click": automation.left_click,
        "right_click": automation.right_click,
        "double_click": automation.double_click,
        "move_cursor": automation.move_cursor,
        "drag_to": automation.drag_to,
        "drag": automation.drag,
        # Keyboard commands
        "key_down": automation.key_down,
        "key_up": automation.key_up,
        "type_text": automation.type_text,
        "press_key": automation.press_key,
        "hotkey": automation.hotkey,
        # Scrolling actions
        "scroll": automation.scroll,
        "scroll_down": automation.scroll_down,
        "scroll_up": automation.scroll_up,
        # Screen actions
        "screenshot": automation.screenshot,
        "get_cursor_position": automation.get_cursor_position,
        "get_screen_size": automation.get_screen_size,
        # Clipboard actions
        "copy_to_clipboard": automation.copy_to_clipboard,
        "set_clipboard": automation.set_clipboard,
    }


@app.websocket("/ws", name="websocket_endpoint")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one client connection: optional auth handshake, then a command loop.

    Each message is a JSON object ``{"command": <name>, "params": {...}}``.
    The matching handler is awaited with the params as keyword arguments and
    its result dict is sent back merged over ``{"success": True}``. Errors
    are reported to the client as ``{"success": False, "error": ...}``.
    """
    await manager.connect(websocket)

    # CONTAINER_NAME being set indicates a cloud provider, which requires the
    # authentication handshake before any command is accepted.
    container_name = os.environ.get("CONTAINER_NAME")
    if container_name and not await _authenticate(websocket, container_name):
        return

    handlers = _command_table()

    try:
        while True:
            try:
                data = await websocket.receive_json()
                command = data.get("command")
                params = data.get("params", {})

                if command not in handlers:
                    await websocket.send_json(
                        {"success": False, "error": f"Unknown command: {command}"}
                    )
                    continue

                try:
                    result = await handlers[command](**params)
                    # A "success" key in the handler result overrides the default.
                    await websocket.send_json({"success": True, **result})
                except Exception as cmd_error:
                    logger.error(f"Error executing command {command}: {str(cmd_error)}")
                    logger.error(traceback.format_exc())
                    await websocket.send_json({"success": False, "error": str(cmd_error)})

            except WebSocketDisconnect:
                raise
            except json.JSONDecodeError as json_err:
                logger.error(f"JSON decode error: {str(json_err)}")
                await websocket.send_json(
                    {"success": False, "error": f"Invalid JSON: {str(json_err)}"}
                )
            except Exception as loop_error:
                logger.error(f"Error in message loop: {str(loop_error)}")
                logger.error(traceback.format_exc())
                await websocket.send_json({"success": False, "error": str(loop_error)})

    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.error(f"Fatal error in websocket connection: {str(e)}")
        logger.error(traceback.format_exc())
        try:
            await websocket.close()
        except Exception:
            # The socket may already be closed; nothing more to do.
            pass
    finally:
        # Always drop the socket from the registry, whichever way the loop ended.
        _unregister(websocket)
if __name__ == "__main__":
    # Direct execution of this module: serve the app on all interfaces, port 8000.
    uvicorn.run(app, port=8000, host="0.0.0.0")

View File

@@ -0,0 +1,111 @@
"""
Server interface for Computer API.
Provides a clean API for starting and stopping the server.
"""
import asyncio
import logging
import uvicorn
from typing import Optional
from fastapi import FastAPI
from .main import app as fastapi_app
logger = logging.getLogger(__name__)
class Server:
    """
    Server interface for Computer API.

    Usage:
        from computer_server import Server

        # Synchronous usage
        server = Server()
        server.start()  # Blocks until server is stopped

        # Asynchronous usage
        server = Server()
        await server.start_async()  # Starts server in background
        # Do other things
        await server.stop()  # Stop the server
    """

    # Seconds stop() waits for uvicorn to shut down by itself before the
    # serve task is cancelled.
    _GRACEFUL_STOP_TIMEOUT = 5.0

    def __init__(self, host: str = "0.0.0.0", port: int = 8000, log_level: str = "info",
                 ssl_keyfile: Optional[str] = None, ssl_certfile: Optional[str] = None):
        """
        Initialize the server.

        Args:
            host: Host to bind the server to
            port: Port to bind the server to
            log_level: Logging level (debug, info, warning, error, critical)
            ssl_keyfile: Path to SSL private key file (for HTTPS)
            ssl_certfile: Path to SSL certificate file (for HTTPS)
        """
        self.host = host
        self.port = port
        self.log_level = log_level
        self.ssl_keyfile = ssl_keyfile
        self.ssl_certfile = ssl_certfile
        self.app = fastapi_app
        # uvicorn server object and the task running it; set by start_async().
        self._server: Optional[uvicorn.Server] = None
        self._server_task: Optional[asyncio.Task] = None
        # Kept for compatibility; uvicorn itself does not observe this event.
        self._should_exit = asyncio.Event()

    def start(self) -> None:
        """
        Start the server synchronously. This will block until the server is stopped.
        """
        uvicorn.run(
            self.app,
            host=self.host,
            port=self.port,
            log_level=self.log_level,
            ssl_keyfile=self.ssl_keyfile,
            ssl_certfile=self.ssl_certfile
        )

    async def start_async(self) -> None:
        """
        Start the server asynchronously. This will return immediately and the server
        will run in the background.
        """
        server_config = uvicorn.Config(
            self.app,
            host=self.host,
            port=self.port,
            log_level=self.log_level,
            ssl_keyfile=self.ssl_keyfile,
            ssl_certfile=self.ssl_certfile
        )

        self._should_exit.clear()
        # Keep the server object so stop() can request a graceful shutdown.
        self._server = uvicorn.Server(server_config)

        # Create a task to run the server
        self._server_task = asyncio.create_task(self._server.serve())

        # Wait a short time to ensure the server starts
        await asyncio.sleep(0.5)

        protocol = "https" if self.ssl_certfile else "http"
        logger.info(f"Server started at {protocol}://{self.host}:{self.port}")

    async def stop(self) -> None:
        """
        Stop the server if it's running asynchronously.

        uvicorn is first asked to exit on its own (``should_exit``) so that
        it can run its normal shutdown. If the serve task has not finished
        within ``_GRACEFUL_STOP_TIMEOUT`` seconds it is cancelled, which is
        what this method previously did unconditionally.
        """
        task = self._server_task
        if task is None or task.done():
            return

        # Signal the server to exit
        self._should_exit.set()
        if self._server is not None:
            self._server.should_exit = True

        try:
            # wait_for cancels the task itself when the timeout expires.
            await asyncio.wait_for(task, timeout=self._GRACEFUL_STOP_TIMEOUT)
            logger.info("Server stopped")
        except asyncio.TimeoutError:
            logger.warning("Server did not shut down in time; serve task was cancelled")
        except asyncio.CancelledError:
            logger.info("Server stopped")
        finally:
            self._server = None
            self._server_task = None