mirror of
https://github.com/trycua/computer.git
synced 2026-01-05 21:09:58 -06:00
Add Cua Preview
This commit is contained in:
20
libs/computer-server/computer_server/__init__.py
Normal file
20
libs/computer-server/computer_server/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""
|
||||
Computer API package.
|
||||
Provides a server interface for the Computer API.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
__version__: str = "0.1.0"
|
||||
|
||||
# Explicitly export Server for static type checkers
|
||||
from .server import Server as Server # noqa: F401
|
||||
|
||||
__all__ = ["Server", "run_cli"]
|
||||
|
||||
|
||||
def run_cli() -> None:
    """Run the command-line interface.

    The ``cli`` submodule is imported when this function is called rather
    than when the package itself is imported.
    """
    from . import cli as _cli

    _cli.main()
|
||||
59
libs/computer-server/computer_server/cli.py
Normal file
59
libs/computer-server/computer_server/cli.py
Normal file
@@ -0,0 +1,59 @@
|
||||
"""
|
||||
Command-line interface for the Computer API server.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import sys
|
||||
from typing import List, Optional
|
||||
|
||||
from .server import Server
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace:
    """Build the server's argument parser and parse ``args`` with it.

    Args:
        args: Argument strings to parse. ``None`` is handed to ``argparse``
            unchanged, which then falls back to the process arguments.

    Returns:
        Namespace exposing ``host``, ``port`` and ``log_level``.
    """
    cli_parser = argparse.ArgumentParser(description="Start the Computer API server")

    # (flags, keyword options) for every supported command-line option.
    option_specs = (
        (
            ("--host",),
            {"default": "0.0.0.0", "help": "Host to bind the server to (default: 0.0.0.0)"},
        ),
        (
            ("--port",),
            {"type": int, "default": 8000, "help": "Port to bind the server to (default: 8000)"},
        ),
        (
            ("--log-level",),
            {
                "choices": ["debug", "info", "warning", "error", "critical"],
                "default": "info",
                "help": "Logging level (default: info)",
            },
        ),
    )
    for flags, options in option_specs:
        cli_parser.add_argument(*flags, **options)

    return cli_parser.parse_args(args)
|
||||
|
||||
|
||||
def main() -> None:
    """Main entry point for the CLI.

    Parses the command line, configures root logging at the requested level,
    then creates a ``Server`` and starts it. The process exits with status 0
    when the server is interrupted from the keyboard and with status 1 when
    ``server.start()`` raises any other exception.
    """
    args = parse_args()

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level.upper()),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Create and start the server
    logger.info("Starting CUA Computer API server on %s:%s...", args.host, args.port)
    server = Server(host=args.host, port=args.port, log_level=args.log_level)

    try:
        server.start()
    except KeyboardInterrupt:
        logger.info("Server stopped by user")
        sys.exit(0)
    except Exception as e:
        # logger.exception keeps the original message and appends the
        # traceback, so a failed start can actually be diagnosed from the log.
        logger.exception("Error starting server: %s", e)
        sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
120
libs/computer-server/computer_server/handlers/base.py
Normal file
120
libs/computer-server/computer_server/handlers/base.py
Normal file
@@ -0,0 +1,120 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional, Dict, Any
|
||||
|
||||
class BaseAccessibilityHandler(ABC):
    """Interface that each OS-specific accessibility backend implements.

    Both methods are abstract coroutines that return a ``Dict[str, Any]``.
    """

    @abstractmethod
    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return the accessibility tree of the current window."""

    @abstractmethod
    async def find_element(
        self,
        role: Optional[str] = None,
        title: Optional[str] = None,
        value: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Look up an element of the accessibility tree.

        Args:
            role: Role criterion, or ``None``.
            title: Title criterion, or ``None``.
            value: Value criterion, or ``None``.
        """
|
||||
|
||||
class BaseAutomationHandler(ABC):
    """Interface that each OS-specific automation backend implements.

    Every method is an abstract coroutine returning a ``Dict[str, Any]``.
    The methods fall into these groups:

    - Mouse Actions: mouse control
    - Keyboard Actions: keyboard input
    - Scrolling Actions: scrolling
    - Screen Actions: screen interaction
    - Clipboard Actions: clipboard operations
    - Command Execution: running a command
    """

    # ---- Mouse Actions ----

    @abstractmethod
    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click at the given position, or at the current one."""

    @abstractmethod
    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click at the given position, or at the current one."""

    @abstractmethod
    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Double-click at the given position, or at the current one."""

    @abstractmethod
    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to ``(x, y)``."""

    @abstractmethod
    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from the current cursor position to the given coordinates.

        Args:
            x: Destination x coordinate.
            y: Destination y coordinate.
            button: Mouse button to hold ('left', 'middle', 'right').
            duration: Length of the drag, in seconds.
        """

    # ---- Keyboard Actions ----

    @abstractmethod
    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type ``text``."""

    @abstractmethod
    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press a single key."""

    @abstractmethod
    async def hotkey(self, *keys: str) -> Dict[str, Any]:
        """Press several keys together as one combination."""

    # ---- Scrolling Actions ----

    @abstractmethod
    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by ``clicks`` clicks."""

    @abstractmethod
    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by ``clicks`` clicks."""

    # ---- Screen Actions ----

    @abstractmethod
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return the image as base64 encoded data."""

    @abstractmethod
    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size of the VM."""

    @abstractmethod
    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return where the cursor currently is."""

    # ---- Clipboard Actions ----

    @abstractmethod
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard content."""

    @abstractmethod
    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with ``text``."""

    # ---- Command Execution ----

    @abstractmethod
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run ``command`` and return its output."""
|
||||
49
libs/computer-server/computer_server/handlers/factory.py
Normal file
49
libs/computer-server/computer_server/handlers/factory.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import platform
|
||||
import subprocess
|
||||
from typing import Tuple, Type
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||||
from .macos import MacOSAccessibilityHandler, MacOSAutomationHandler
|
||||
# from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
|
||||
|
||||
class HandlerFactory:
    """Factory for creating OS-specific handlers."""

    @staticmethod
    def _get_current_os() -> str:
        """Determine the current OS.

        ``uname -s`` is tried first. When the command cannot be launched,
        exits with a non-zero status, or prints nothing, the name reported by
        ``platform.system()`` is used instead, so hosts without a ``uname``
        binary still get an OS name.

        Returns:
            str: The lower-cased OS name ('darwin' for macOS or 'linux' for Linux)

        Raises:
            RuntimeError: If unable to determine the current OS
        """
        os_name = ""
        try:
            result = subprocess.run(["uname", "-s"], capture_output=True, text=True)
        except OSError:
            # uname could not be executed at all (for example, not on PATH).
            result = None

        if result is not None and result.returncode == 0:
            os_name = result.stdout.strip().lower()

        if not os_name:
            os_name = platform.system().strip().lower()

        if not os_name:
            raise RuntimeError(
                "Failed to determine current OS: "
                "neither uname nor platform.system() reported an OS name"
            )
        return os_name

    @staticmethod
    def create_handlers() -> "Tuple[BaseAccessibilityHandler, BaseAutomationHandler]":
        """Create and return appropriate handlers for the current OS.

        Returns:
            Tuple[BaseAccessibilityHandler, BaseAutomationHandler]: A tuple containing
            the appropriate accessibility and automation handlers for the current OS.

        Raises:
            NotImplementedError: If the current OS is not supported
            RuntimeError: If unable to determine the current OS
        """
        os_type = HandlerFactory._get_current_os()

        # Only macOS has an implementation wired up here.
        if os_type == "darwin":
            return MacOSAccessibilityHandler(), MacOSAutomationHandler()
        # elif os_type == 'linux':
        #     return LinuxAccessibilityHandler(), LinuxAutomationHandler()
        raise NotImplementedError(f"OS '{os_type}' is not supported")
|
||||
654
libs/computer-server/computer_server/handlers/macos.py
Normal file
654
libs/computer-server/computer_server/handlers/macos.py
Normal file
@@ -0,0 +1,654 @@
|
||||
import pyautogui
|
||||
import base64
|
||||
from io import BytesIO
|
||||
from typing import Optional, Dict, Any, List
|
||||
from ctypes import byref, c_void_p, POINTER
|
||||
from AppKit import NSWorkspace # type: ignore
|
||||
import AppKit
|
||||
from Quartz.CoreGraphics import * # type: ignore
|
||||
from Quartz.CoreGraphics import CGPoint, CGSize # type: ignore
|
||||
import Foundation
|
||||
from ApplicationServices import (
|
||||
AXUIElementCreateSystemWide, # type: ignore
|
||||
AXUIElementCreateApplication, # type: ignore
|
||||
AXUIElementCopyAttributeValue, # type: ignore
|
||||
AXUIElementCopyAttributeValues, # type: ignore
|
||||
kAXFocusedWindowAttribute, # type: ignore
|
||||
kAXWindowsAttribute, # type: ignore
|
||||
kAXMainWindowAttribute, # type: ignore
|
||||
kAXChildrenAttribute, # type: ignore
|
||||
kAXRoleAttribute, # type: ignore
|
||||
kAXTitleAttribute, # type: ignore
|
||||
kAXValueAttribute, # type: ignore
|
||||
kAXDescriptionAttribute, # type: ignore
|
||||
kAXEnabledAttribute, # type: ignore
|
||||
kAXPositionAttribute, # type: ignore
|
||||
kAXSizeAttribute, # type: ignore
|
||||
kAXErrorSuccess, # type: ignore
|
||||
AXValueGetType, # type: ignore
|
||||
kAXValueCGSizeType, # type: ignore
|
||||
kAXValueCGPointType, # type: ignore
|
||||
kAXValueCFRangeType, # type: ignore
|
||||
AXUIElementGetTypeID, # type: ignore
|
||||
AXValueGetValue, # type: ignore
|
||||
kAXVisibleChildrenAttribute, # type: ignore
|
||||
kAXRoleDescriptionAttribute, # type: ignore
|
||||
)
|
||||
import objc
|
||||
import re
|
||||
import json
|
||||
import copy
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||||
|
||||
|
||||
def CFAttributeToPyObject(attrValue):
    """Convert an attribute value into a plain Python object.

    The value's CoreFoundation type ID is looked up first: strings go through
    ``str``, booleans through ``bool``, arrays are converted item by item,
    numbers become ``int`` or ``float``, and AXUIElement references are
    returned unchanged. Any other value is treated as an AX value: the
    ``{...}`` part of its ``description()`` is parsed as a size, point or
    range and returned as a tuple.

    Returns:
        The converted object, or ``None`` when the value is of no supported
        type or its description contains no ``{...}`` part.
    """

    def _convert_array(items):
        # Each item is converted recursively.
        return [CFAttributeToPyObject(entry) for entry in items]

    def _convert_number(number):
        # Integer extraction is attempted before double extraction.
        ok, as_int = Foundation.CFNumberGetValue(  # type: ignore
            number, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if ok:
            return int(as_int)

        ok, as_float = Foundation.CFNumberGetValue(  # type: ignore
            number, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        return float(as_float) if ok else None

    def _keep_element(ax_element):
        return ax_element

    type_id = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_converters = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): _convert_array,  # type: ignore
        Foundation.CFNumberGetTypeID(): _convert_number,  # type: ignore
        AXUIElementGetTypeID(): _keep_element,  # type: ignore
    }
    try:
        return cf_converters[type_id](attrValue)
    except KeyError:
        # Not one of the supported CF types; fall through to the AX types.
        pass

    ax_kind = AXValueGetType(attrValue)
    ax_parsers = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        braces = re.search("{.*}", attrValue.description())
        if not braces:
            return None
        return tuple(ax_parsers[ax_kind](braces.group()))
    except KeyError:
        # The AX value type has no parser registered above.
        return None
|
||||
|
||||
|
||||
def element_attribute(element, attribute):
    """Read one accessibility attribute from ``element``.

    For the children attribute a ranged copy (index 0, at most 999 values) is
    attempted first; if that does not succeed, the same single-value copy used
    for every other attribute is tried.

    Returns:
        The attribute value, with ``NSArray`` results converted through
        ``CFAttributeToPyObject``; ``None`` when the attribute cannot be read.
    """

    def _unwrap(raw):
        # Only NSArray results are converted; everything else is returned raw.
        if isinstance(raw, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw)
        return raw

    if attribute == kAXChildrenAttribute:
        status, raw_values = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return _unwrap(raw_values)

    status, raw_value = AXUIElementCopyAttributeValue(element, attribute, None)
    if status != kAXErrorSuccess:
        return None
    return _unwrap(raw_value)
|
||||
|
||||
|
||||
def element_value(element, type):
    """Extract the value wrapped by an AX value object.

    Args:
        element: The AX value to unpack, or ``None`` when the attribute it
            came from could not be read.
        type: The AX value type constant to extract (for example
            ``kAXValueCGPointType``).

    Returns:
        The extracted value, or ``None`` when ``element`` is ``None`` or the
        extraction is reported as unsuccessful.
    """
    # A missing attribute arrives here as None; there is nothing to unpack.
    if element is None:
        return None

    ok, value = AXValueGetValue(element, type, None)
    return value if ok else None
|
||||
|
||||
|
||||
class UIElement:
    """Snapshot of an accessibility element and, recursively, its children.

    The constructor reads role, title, enabled state, position, size,
    description and value from the element, derives bounding boxes, builds
    child ``UIElement`` objects and finally computes two hashes:
    ``identifier`` (from this element's own geometry, enabled state and role)
    and ``content_identifier`` (from the children's hashes).
    """

    def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None):
        """Read the element's attributes and build its subtree.

        Args:
            element: The accessibility element to snapshot.
            offset_x: Amount subtracted from the x position (negative offsets
                are treated as 0). Replaced by the element's own x position
                when its role is "AXWindow".
            offset_y: Same as ``offset_x`` for the y axis.
            max_depth: Remaining depth to descend; ``None`` means unlimited.
                Children are built only when this is ``None`` or positive and
                receive ``max_depth - 1``.
            parents_visible_bbox: ``[x1, y1, x2, y2]`` box used to clip this
                element's visible box, or ``None`` for no clipping.
        """
        self.ax_element = element
        self.content_identifier = ""
        self.identifier = ""
        self.name = ""
        self.children = []
        self.description = ""
        self.role_description = ""
        self.value = None
        self.max_depth = max_depth
        # Defined up-front so the attribute exists even when the element has
        # no usable geometry and construction stops early below.
        self.center = None

        # Set role
        self.role = element_attribute(element, kAXRoleAttribute)
        if self.role is None:
            self.role = "No role"

        # Set name
        self.name = element_attribute(element, kAXTitleAttribute)
        if self.name is not None:
            # Convert tuple to string if needed
            if isinstance(self.name, tuple):
                self.name = str(self.name[0]) if self.name else ""
            self.name = self.name.replace(" ", "_")

        # Set enabled
        self.enabled = element_attribute(element, kAXEnabledAttribute)
        if self.enabled is None:
            self.enabled = False

        # Set position and size
        position = element_attribute(element, kAXPositionAttribute)
        size = element_attribute(element, kAXSizeAttribute)
        start_position = element_value(position, kAXValueCGPointType)

        # A window becomes the origin for itself and everything below it.
        if self.role == "AXWindow" and start_position is not None:
            offset_x = start_position.x
            offset_y = start_position.y

        # Copy first: self.position is adjusted in place right below.
        self.absolute_position = copy.copy(start_position)
        self.position = start_position
        if self.position is not None:
            self.position.x -= max(0, offset_x)
            self.position.y -= max(0, offset_y)
        self.size = element_value(size, kAXValueCGSizeType)

        self._set_bboxes(parents_visible_bbox)

        # Without a position or a size the element is kept as a bare leaf:
        # no center, description, value, children or hashes are computed.
        if start_position is None or self.size is None:
            return

        # Set component center
        self.center = (
            start_position.x + offset_x + self.size.width / 2,
            start_position.y + offset_y + self.size.height / 2,
        )

        self.description = element_attribute(element, kAXDescriptionAttribute)
        self.role_description = element_attribute(element, kAXRoleDescriptionAttribute)
        attribute_value = element_attribute(element, kAXValueAttribute)

        # Set value
        self.value = attribute_value
        if attribute_value is not None:
            if isinstance(attribute_value, Foundation.NSArray):  # type: ignore
                self.value = list(attribute_value)
            # Check if it's an accessibility element by checking its type ID
            elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID():  # type: ignore
                self.value = UIElement(attribute_value, offset_x, offset_y)

        # Set children
        if self.max_depth is None or self.max_depth > 0:
            self.children = self._get_children(element, start_position, offset_x, offset_y)
        else:
            self.children = []

        self.calculate_hashes()

    def _set_bboxes(self, parents_visible_bbox):
        """Compute ``bbox`` and ``visible_bbox`` as ``[x1, y1, x2, y2]`` lists.

        Both are ``None`` when position or size is missing. ``visible_bbox``
        is ``bbox`` clipped to ``parents_visible_bbox``; it is ``None`` when
        the two boxes do not intersect and equals ``bbox`` when no parent box
        is given.
        """
        if not self.position or not self.size:
            self.bbox = None
            self.visible_bbox = None
            return

        self.bbox = [
            int(self.position.x),
            int(self.position.y),
            int(self.position.x + self.size.width),
            int(self.position.y + self.size.height),
        ]

        if not parents_visible_bbox:
            self.visible_bbox = self.bbox
            return

        # check if not intersected
        if (
            self.bbox[0] > parents_visible_bbox[2]
            or self.bbox[1] > parents_visible_bbox[3]
            or self.bbox[2] < parents_visible_bbox[0]
            or self.bbox[3] < parents_visible_bbox[1]
        ):
            self.visible_bbox = None
        else:
            self.visible_bbox = [
                int(max(self.bbox[0], parents_visible_bbox[0])),
                int(max(self.bbox[1], parents_visible_bbox[1])),
                int(min(self.bbox[2], parents_visible_bbox[2])),
                int(min(self.bbox[3], parents_visible_bbox[3])),
            ]

    def _get_children(self, element, start_position, offset_x, offset_y):
        """Build ``UIElement`` objects for the element's children.

        The children attribute is used when it can be read; the visible
        children attribute is only a fallback. Returns an empty list when the
        depth budget is exhausted.
        """
        children = element_attribute(element, kAXChildrenAttribute)
        visible_children = element_attribute(element, kAXVisibleChildrenAttribute)

        found_children = []
        if children is not None:
            found_children.extend(children)
        elif visible_children is not None:
            found_children.extend(visible_children)

        if self.max_depth is not None and self.max_depth <= 0:
            return []

        child_depth = self.max_depth - 1 if self.max_depth is not None else None
        return [
            UIElement(child, offset_x, offset_y, child_depth, self.visible_bbox)
            for child in found_children
        ]

    def calculate_hashes(self):
        """Refresh ``identifier`` and ``content_identifier``."""
        self.identifier = self.component_hash()
        self.content_identifier = self.children_content_hash(self.children)

    def component_hash(self):
        """Hash this element's position, size, enabled flag and role.

        Returns an empty string when position or size is missing.
        """
        if self.position is None or self.size is None:
            return ""
        position_string = f"{self.position.x:.0f};{self.position.y:.0f}"
        size_string = f"{self.size.width:.0f};{self.size.height:.0f}"
        enabled_string = str(self.enabled)
        # Ensure role is a string
        role_string = ""
        if self.role is not None:
            role_string = str(self.role[0]) if isinstance(self.role, tuple) else str(self.role)
        return self.hash_from_string(position_string + size_string + enabled_string + role_string)

    def hash_from_string(self, string):
        """Return the MD5 hex digest of ``string``; '' for ``None`` or ''."""
        if string is None or string == "":
            return ""
        from hashlib import md5

        return md5(string.encode()).hexdigest()

    def children_content_hash(self, children):
        """Hash the children's content and structure hashes into one digest.

        The children's ``content_identifier`` values are sorted before
        hashing; their ``identifier`` values are hashed in child order. The
        two digests are concatenated and hashed again. Returns '' when there
        are no children.
        """
        if not children:
            return ""

        content_hashes = sorted(child.content_identifier for child in children)
        structure_hashes = [child.identifier for child in children]

        content_hash = self.hash_from_string("".join(content_hashes))
        content_structure_hash = self.hash_from_string("".join(structure_hashes))
        # Plain concatenation: the earlier `content_hash.join(...)` spliced
        # one digest between every character of the other.
        return self.hash_from_string(content_hash + content_structure_hash)

    def to_dict(self):
        """Return this element and its subtree as a dictionary.

        Positions are formatted as ``"x;y"`` with two decimals and the size as
        ``"width;height"`` with none; missing ones become ''. A ``UIElement``
        value is embedded as an indented JSON string and an ``NSDate`` value
        as its ``str()``.
        """
        value = self.value
        if isinstance(value, UIElement):
            value = json.dumps(value.to_dict(), indent=4)
        elif isinstance(value, AppKit.NSDate):  # type: ignore
            value = str(value)

        if self.absolute_position is not None:
            absolute_position = f"{self.absolute_position.x:.2f};{self.absolute_position.y:.2f}"
        else:
            absolute_position = ""

        if self.position is not None:
            position = f"{self.position.x:.2f};{self.position.y:.2f}"
        else:
            position = ""

        if self.size is not None:
            size = f"{self.size.width:.0f};{self.size.height:.0f}"
        else:
            size = ""

        return {
            "id": self.identifier,
            "name": self.name,
            "role": self.role,
            "description": self.description,
            "role_description": self.role_description,
            "value": value,
            "absolute_position": absolute_position,
            "position": position,
            "size": size,
            "enabled": self.enabled,
            "bbox": self.bbox,
            "visible_bbox": self.visible_bbox,
            "children": [child.to_dict() for child in self.children],
        }
|
||||
|
||||
|
||||
class MacOSAccessibilityHandler(BaseAccessibilityHandler):
    """Accessibility handler for macOS built on the AX accessibility API.

    The two coroutine entry points never raise: failures are reported as
    ``{"success": False, "error": <message>}``.
    """

    def get_application_windows(self, pid: int):
        """Get all windows for a specific application.

        Returns the application's windows attribute when it can be read as a
        non-empty ``NSArray``; otherwise (including on any error) an empty
        list.
        """
        try:
            app = AXUIElementCreateApplication(pid)
            err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None)
            if err == kAXErrorSuccess and windows:
                if isinstance(windows, Foundation.NSArray):  # type: ignore
                    return windows
            return []
        except Exception:
            # Best effort: an application that cannot be queried has no windows.
            return []

    def get_all_windows(self):
        """Get all visible windows in the system.

        Returns:
            One dict per running application whose activation policy is 0,
            with keys ``app_name``, ``pid``, ``frontmost``, ``has_windows``
            and ``windows``. Applications that fail to be inspected are
            skipped; an empty list is returned if the enumeration itself
            fails.
        """
        try:
            windows = []
            running_apps = NSWorkspace.sharedWorkspace().runningApplications()

            for app in running_apps:
                try:
                    app_name = app.localizedName()
                    pid = app.processIdentifier()

                    # Skip system processes and background apps
                    if app.activationPolicy() != 0:  # NSApplicationActivationPolicyRegular
                        continue

                    # Get application windows
                    app_windows = self.get_application_windows(pid)

                    windows.append(
                        {
                            "app_name": app_name,
                            "pid": pid,
                            "frontmost": app.isActive(),
                            "has_windows": len(app_windows) > 0,
                            "windows": app_windows,
                        }
                    )
                except Exception:
                    # One misbehaving application must not hide the others.
                    continue

            return windows
        except Exception:
            return []

    def get_ax_attribute(self, element, attribute):
        """Read ``attribute`` from ``element`` via ``element_attribute``."""
        return element_attribute(element, attribute)

    def serialize_node(self, element):
        """Create a serializable dictionary representation of an accessibility element.

        The result always has ``role``, ``title`` and ``value``; ``position``
        and ``size`` are added only when the attribute is present and can be
        indexed as a two-item sequence.
        """
        result = {}

        # Get basic attributes
        result["role"] = self.get_ax_attribute(element, kAXRoleAttribute)
        result["title"] = self.get_ax_attribute(element, kAXTitleAttribute)
        result["value"] = self.get_ax_attribute(element, kAXValueAttribute)

        # Get position and size if available
        position = self.get_ax_attribute(element, kAXPositionAttribute)
        if position:
            try:
                result["position"] = {"x": position[0], "y": position[1]}
            except (IndexError, TypeError):
                pass

        size = self.get_ax_attribute(element, kAXSizeAttribute)
        if size:
            try:
                result["size"] = {"width": size[0], "height": size[1]}
            except (IndexError, TypeError):
                pass

        return result

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return the accessibility trees of all applications that have windows.

        Returns:
            On success ``{"success": True, "frontmost_application": <name>,
            "windows": [...]}`` where each entry describes one application and
            holds the ``UIElement.to_dict()`` tree of each of its windows.
            Otherwise ``{"success": False, "error": <message>}``.
        """
        try:
            # Get all visible windows first
            windows = self.get_all_windows()
            if not windows:
                return {"success": False, "error": "No visible windows found in the system"}

            # Get the frontmost window; fall back to the first application
            frontmost_app = next((w for w in windows if w["frontmost"]), None)
            if not frontmost_app:
                frontmost_app = windows[0]

            app_name = frontmost_app["app_name"]

            # Process all applications and their windows
            processed_windows = []
            for app in windows:
                app_windows = app.get("windows", [])
                if not app_windows:
                    continue

                window_trees = []
                for window in app_windows:
                    try:
                        window_trees.append(UIElement(window).to_dict())
                    except Exception:
                        # A window that cannot be read is left out of the result.
                        continue

                processed_windows.append(
                    {
                        "app_name": app["app_name"],
                        "pid": app["pid"],
                        "frontmost": app["frontmost"],
                        "has_windows": app["has_windows"],
                        "windows": window_trees,
                    }
                )

            if not any(app["windows"] for app in processed_windows):
                app_lines = "\n".join(
                    f"- {w['app_name']} (PID: {w['pid']}, Active: {w['frontmost']}, Has Windows: {w['has_windows']})"
                    for w in windows
                )
                return {
                    "success": False,
                    "error": "No accessible windows found. Available applications:\n"
                    + app_lines
                    + "\nPlease ensure:\n"
                    + "1. The terminal has accessibility permissions\n"
                    + "2. The applications have visible windows\n"
                    + "3. Try clicking on a window you want to inspect",
                }

            return {
                "success": True,
                "frontmost_application": app_name,
                "windows": processed_windows,
            }

        except Exception as e:
            return {"success": False, "error": str(e)}

    async def find_element(
        self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
    ) -> Dict[str, Any]:
        """Find the first element matching every criterion that is given.

        The search walks depth-first from the system-wide accessibility
        element. Criteria left as ``None`` (or empty) are not checked; the
        element's value is compared as ``str(value)``.

        Returns:
            ``{"success": True, "element": <serialized node or None>}``, or
            ``{"success": False, "error": <message>}`` on failure.
        """
        try:
            system = AXUIElementCreateSystemWide()

            def match_element(element):
                if role and self.get_ax_attribute(element, kAXRoleAttribute) != role:
                    return False
                if title and self.get_ax_attribute(element, kAXTitleAttribute) != title:
                    return False
                if value and str(self.get_ax_attribute(element, kAXValueAttribute)) != value:
                    return False
                return True

            def search_tree(element):
                if match_element(element):
                    return self.serialize_node(element)

                children = self.get_ax_attribute(element, kAXChildrenAttribute)
                if children:
                    for child in children:
                        result = search_tree(child)
                        if result:
                            return result
                return None

            element = search_tree(system)
            return {"success": True, "element": element}

        except Exception as e:
            return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
class MacOSAutomationHandler(BaseAutomationHandler):
    """Automation handler for macOS, implemented with ``pyautogui``.

    Every method returns a dict with a boolean ``"success"`` key. Exceptions
    are not propagated; they are reported as
    ``{"success": False, "error": <message>}``.
    """

    # Mouse Actions
    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click; the cursor is moved first only when both x and y are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click; the cursor is moved first only when both x and y are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Double-click; the cursor is moved first only when both x and y are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.doubleClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to ``(x, y)``."""
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag to ``(x, y)`` holding ``button`` over ``duration`` seconds."""
        try:
            pyautogui.dragTo(x, y, button=button, duration=duration)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type ``text``."""
        try:
            pyautogui.write(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press a single key."""
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the keys in ``keys`` together as one combination.

        Takes the keys as a single list, which is unpacked for ``pyautogui``.
        """
        try:
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by ``clicks`` (passed to ``pyautogui.scroll`` negated)."""
        try:
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by ``clicks``."""
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen as a PNG.

        Returns:
            ``{"success": True, "image_data": <base64 string>}`` on success.
        """
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            # getvalue() returns the whole buffer regardless of the position.
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size as ``{"width": ..., "height": ...}`` under ``"size"``."""
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the cursor position as ``{"x": ..., "y": ...}`` under ``"position"``."""
        try:
            pos = pyautogui.position()
            return {"success": True, "position": {"x": pos.x, "y": pos.y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard content under ``"content"``."""
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with ``text``."""
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output.

        Returns:
            ``{"success": True, "stdout": ..., "stderr": ..., "return_code": ...}``
            whenever the command could be launched. ``"success"`` does not
            reflect the command's outcome; check ``"return_code"`` for that.
        """
        try:
            import subprocess

            # NOTE(review): the string is executed by the shell exactly as
            # received, so callers must only pass trusted input.
            process = subprocess.run(command, shell=True, capture_output=True, text=True)
            return {
                "success": True,
                "stdout": process.stdout,
                "stderr": process.stderr,
                "return_code": process.returncode,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
|
||||
123
libs/computer-server/computer_server/main.py
Normal file
123
libs/computer-server/computer_server/main.py
Normal file
@@ -0,0 +1,123 @@
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
|
||||
from typing import List, Dict, Any
|
||||
import uvicorn
|
||||
import logging
|
||||
import asyncio
|
||||
import json
|
||||
import traceback
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
from io import StringIO
|
||||
from .handlers.factory import HandlerFactory
|
||||
|
||||
# Set up logging with more detail
# NOTE: this executes at import time, so importing this module configures the
# root logger at DEBUG level for the whole process.
logging.basicConfig(
    level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

# Configure WebSocket with larger message size
WEBSOCKET_MAX_SIZE = 1024 * 1024 * 10  # 10MB limit

# Configure application with WebSocket settings
# NOTE(review): `websocket_max_size` does not appear to be a documented FastAPI
# constructor option, so this value is probably not enforced here. Message
# size limits are normally set on the ASGI server (uvicorn's `ws_max_size`).
# TODO confirm.
app = FastAPI(
    title="Computer API",
    description="API for the Computer project",
    version="0.1.0",
    websocket_max_size=WEBSOCKET_MAX_SIZE,
)
|
||||
|
||||
|
||||
class ConnectionManager:
    """Track accepted WebSocket connections and hold the OS-specific handlers."""

    def __init__(self):
        # Sockets that have been accepted and not yet disconnected.
        self.active_connections: List[WebSocket] = []
        # Create OS-specific handlers
        self.accessibility_handler, self.automation_handler = HandlerFactory.create_handlers()

    async def connect(self, websocket: WebSocket):
        """Accept the WebSocket handshake and start tracking the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        """Stop tracking ``websocket``.

        Safe to call more than once, or for a socket that was never
        registered: an untracked socket is ignored instead of raising
        ``ValueError`` from ``list.remove``.
        """
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (or never added); nothing left to clean up.
            pass
|
||||
|
||||
|
||||
# Module-level instance used by the WebSocket endpoint below. Constructing it
# calls HandlerFactory.create_handlers(), so the handlers are created when
# this module is imported.
manager = ConnectionManager()
|
||||
|
||||
|
||||
@app.websocket("/ws", name="websocket_endpoint")
async def websocket_endpoint(websocket: WebSocket):
    """Serve one client: read JSON commands and answer each with a JSON result.

    Every incoming message is expected to be a JSON object of the form
    ``{"command": <name>, "params": {<keyword arguments>}}``. The command
    name selects a handler coroutine, which is awaited with ``params`` as
    keyword arguments. The handler's result dict is sent back merged over
    ``{"success": True}``, so a handler-supplied ``"success"`` key wins.

    Malformed messages, unknown commands and handler failures are answered
    with ``{"success": False, "error": <message>}`` and the connection stays
    open. The loop ends when the client disconnects or an unrecoverable
    error occurs; the connection is always unregistered on exit.
    """
    # WebSocket message size is configured at the app or endpoint level, not on the instance
    await manager.connect(websocket)

    accessibility = manager.accessibility_handler
    automation = manager.automation_handler

    # Map commands to appropriate handler methods
    handlers = {
        # Accessibility commands
        "get_accessibility_tree": accessibility.get_accessibility_tree,
        "find_element": accessibility.find_element,
        # Automation commands
        "screenshot": automation.screenshot,
        "left_click": automation.left_click,
        "right_click": automation.right_click,
        "double_click": automation.double_click,
        "scroll_down": automation.scroll_down,
        "scroll_up": automation.scroll_up,
        "move_cursor": automation.move_cursor,
        "type_text": automation.type_text,
        "press_key": automation.press_key,
        "drag_to": automation.drag_to,
        "hotkey": automation.hotkey,
        "get_cursor_position": automation.get_cursor_position,
        "get_screen_size": automation.get_screen_size,
        "copy_to_clipboard": automation.copy_to_clipboard,
        "set_clipboard": automation.set_clipboard,
        "run_command": automation.run_command,
    }

    try:
        while True:
            try:
                data = await websocket.receive_json()

                # Valid JSON that is not an object (e.g. a list or a number)
                # has no "command"/"params" fields to read.
                if not isinstance(data, dict):
                    await websocket.send_json(
                        {"success": False, "error": "Message must be a JSON object"}
                    )
                    continue

                command = data.get("command")
                params = data.get("params", {})

                if command not in handlers:
                    await websocket.send_json(
                        {"success": False, "error": f"Unknown command: {command}"}
                    )
                    continue

                try:
                    result = await handlers[command](**params)
                    await websocket.send_json({"success": True, **result})
                except WebSocketDisconnect:
                    # The client went away while we were replying; this is
                    # not a command failure, so let the outer handler deal
                    # with it instead of trying to send an error.
                    raise
                except Exception as cmd_error:
                    logger.exception(f"Error executing command {command}: {str(cmd_error)}")
                    await websocket.send_json({"success": False, "error": str(cmd_error)})

            except WebSocketDisconnect:
                raise
            except json.JSONDecodeError as json_err:
                logger.error(f"JSON decode error: {str(json_err)}")
                await websocket.send_json(
                    {"success": False, "error": f"Invalid JSON: {str(json_err)}"}
                )
            except Exception as loop_error:
                logger.exception(f"Error in message loop: {str(loop_error)}")
                # If this send fails too, the error escapes the loop and is
                # treated as fatal below.
                await websocket.send_json({"success": False, "error": str(loop_error)})

    except WebSocketDisconnect:
        logger.info("Client disconnected")
    except Exception as e:
        logger.exception(f"Fatal error in websocket connection: {str(e)}")
        try:
            await websocket.close()
        except Exception as close_error:
            # Best effort: the socket may already be closed or broken.
            logger.debug(f"Ignoring error while closing websocket: {str(close_error)}")
    finally:
        # Runs on every exit path, including task cancellation, so the
        # connection never stays registered after the endpoint returns.
        manager.disconnect(websocket)
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # The incoming WebSocket message size limit is enforced by the ASGI server,
    # so pass the configured limit to uvicorn explicitly.
    uvicorn.run(app, host="0.0.0.0", port=8000, ws_max_size=WEBSOCKET_MAX_SIZE)
|
||||
93
libs/computer-server/computer_server/server.py
Normal file
93
libs/computer-server/computer_server/server.py
Normal file
@@ -0,0 +1,93 @@
|
||||
"""
|
||||
Server interface for Computer API.
|
||||
Provides a clean API for starting and stopping the server.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import uvicorn
|
||||
from typing import Optional
|
||||
from fastapi import FastAPI
|
||||
|
||||
from .main import app as fastapi_app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Server:
    """
    Server interface for Computer API.

    Usage:
        from computer_server import Server

        # Synchronous usage
        server = Server()
        server.start()  # Blocks until server is stopped

        # Asynchronous usage
        server = Server()
        await server.start_async()  # Starts server in background
        # Do other things
        await server.stop()  # Stop the server
    """

    # Seconds to wait for uvicorn to shut down on its own before the
    # background task is cancelled.
    _SHUTDOWN_TIMEOUT: float = 5.0

    def __init__(self, host: str = "0.0.0.0", port: int = 8000, log_level: str = "info"):
        """
        Initialize the server.

        Args:
            host: Host to bind the server to
            port: Port to bind the server to
            log_level: Logging level (debug, info, warning, error, critical)
        """
        self.host = host
        self.port = port
        self.log_level = log_level
        self.app = fastapi_app
        # Background task running uvicorn's serve() (async mode only).
        self._server_task: Optional[asyncio.Task] = None
        # Set by stop(), cleared by start_async(); uvicorn does not read it.
        self._should_exit = asyncio.Event()
        # The uvicorn server behind _server_task, kept so stop() can ask it
        # to shut down cleanly.
        self._server: Optional[uvicorn.Server] = None

    def start(self) -> None:
        """
        Start the server synchronously. This will block until the server is stopped.
        """
        uvicorn.run(self.app, host=self.host, port=self.port, log_level=self.log_level)

    async def start_async(self) -> None:
        """
        Start the server asynchronously. This will return immediately and the server
        will run in the background.

        Calling this while a previously started server is still running is a
        no-op, so a second call cannot orphan the first server task.
        """
        if self._server_task is not None and not self._server_task.done():
            logger.warning(f"Server already running at http://{self.host}:{self.port}")
            return

        server_config = uvicorn.Config(
            self.app, host=self.host, port=self.port, log_level=self.log_level
        )

        self._should_exit.clear()
        self._server = uvicorn.Server(server_config)

        # Create a task to run the server
        self._server_task = asyncio.create_task(self._server.serve())

        # Wait a short time to ensure the server starts
        await asyncio.sleep(0.5)

        logger.info(f"Server started at http://{self.host}:{self.port}")

    async def stop(self) -> None:
        """
        Stop the server if it's running asynchronously.

        The uvicorn server is first asked to exit through its ``should_exit``
        flag so that it can run its normal shutdown (closing listening
        sockets and connections). If it has not finished within
        ``_SHUTDOWN_TIMEOUT`` seconds the background task is cancelled.
        Does nothing when no background server is running.
        """
        task = self._server_task
        if task is None or task.done():
            return

        # Signal the server to exit
        self._should_exit.set()
        if self._server is not None:
            self._server.should_exit = True

        try:
            # On timeout, wait_for cancels the task and waits for the
            # cancellation to complete before raising TimeoutError.
            await asyncio.wait_for(task, timeout=self._SHUTDOWN_TIMEOUT)
        except asyncio.TimeoutError:
            logger.warning("Server did not shut down in time; task was cancelled")

        logger.info("Server stopped")

        self._server_task = None
        self._server = None
|
||||
Reference in New Issue
Block a user