mirror of
https://github.com/trycua/computer.git
synced 2026-05-06 23:21:32 -05:00
Reorganize lib folder w/typescript and python roots, initialize core library.
This commit is contained in:
@@ -0,0 +1,209 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Optional, Dict, Any, List, Tuple
|
||||
|
||||
class BaseAccessibilityHandler(ABC):
    """Interface that each OS-specific accessibility backend must implement."""

    @abstractmethod
    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return the accessibility tree of the current window."""

    @abstractmethod
    async def find_element(
        self,
        role: Optional[str] = None,
        title: Optional[str] = None,
        value: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Search the accessibility tree for an element.

        Args:
            role: Optional role criterion.
            title: Optional title criterion.
            value: Optional value criterion.
        """
|
||||
|
||||
class BaseFileHandler(ABC):
    """Interface that each OS-specific file backend must implement.

    Every operation is a coroutine taking a string path and returning a
    ``Dict[str, Any]`` result.
    """

    @abstractmethod
    async def file_exists(self, path: str) -> Dict[str, Any]:
        """Report whether a file exists at ``path``."""

    @abstractmethod
    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """Report whether a directory exists at ``path``."""

    @abstractmethod
    async def list_dir(self, path: str) -> Dict[str, Any]:
        """List the contents of the directory at ``path``."""

    @abstractmethod
    async def read_text(self, path: str) -> Dict[str, Any]:
        """Read the file at ``path`` as text."""

    @abstractmethod
    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """Write the text ``content`` to the file at ``path``."""

    @abstractmethod
    async def read_bytes(self, path: str) -> Dict[str, Any]:
        """Read the file at ``path`` as binary data.

        The data is sent over the websocket as a base64 string.
        """

    @abstractmethod
    async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]:
        """Write binary data to the file at ``path``.

        ``content_b64`` is the data as received over the websocket, i.e. a
        base64 string.
        """

    @abstractmethod
    async def delete_file(self, path: str) -> Dict[str, Any]:
        """Delete the file at ``path``."""

    @abstractmethod
    async def create_dir(self, path: str) -> Dict[str, Any]:
        """Create a directory at ``path``."""

    @abstractmethod
    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """Delete the directory at ``path``."""
|
||||
|
||||
class BaseAutomationHandler(ABC):
    """Interface that each OS-specific automation backend must implement.

    Categories:
    - Mouse Actions: Methods for mouse control
    - Keyboard Actions: Methods for keyboard input
    - Scrolling Actions: Methods for scrolling
    - Screen Actions: Methods for screen interaction
    - Clipboard Actions: Methods for clipboard operations
    - Command Execution: Method for running a command
    """

    # Mouse Actions
    @abstractmethod
    async def mouse_down(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Press a mouse button down at the current or specified position."""

    @abstractmethod
    async def mouse_up(
        self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left"
    ) -> Dict[str, Any]:
        """Release a mouse button at the current or specified position."""

    @abstractmethod
    async def left_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Left-click at the current or specified position."""

    @abstractmethod
    async def right_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Right-click at the current or specified position."""

    @abstractmethod
    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Double-click at the current or specified position."""

    @abstractmethod
    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to the specified position."""

    @abstractmethod
    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag the cursor from its current position to the given coordinates.

        Args:
            x: The x coordinate to drag to
            y: The y coordinate to drag to
            button: The mouse button to use ('left', 'middle', 'right')
            duration: How long the drag should take in seconds
        """

    @abstractmethod
    async def drag(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag the cursor through a list of coordinates.

        Args:
            path: A list of (x, y) tuples to drag to
            button: The mouse button to use ('left', 'middle', 'right')
            duration: How long the drag should take in seconds
        """

    # Keyboard Actions
    @abstractmethod
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold the specified key."""

    @abstractmethod
    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release the specified key."""

    @abstractmethod
    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type the specified text."""

    @abstractmethod
    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press the specified key."""

    @abstractmethod
    async def hotkey(self, *keys: str) -> Dict[str, Any]:
        """Press a combination of keys together."""

    # Scrolling Actions
    @abstractmethod
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll the specified amount."""

    @abstractmethod
    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by the specified number of clicks."""

    @abstractmethod
    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by the specified number of clicks."""

    # Screen Actions
    @abstractmethod
    async def screenshot(self) -> Dict[str, Any]:
        """Take a screenshot and return base64 encoded image data."""

    @abstractmethod
    async def get_screen_size(self) -> Dict[str, Any]:
        """Get the screen size of the VM."""

    @abstractmethod
    async def get_cursor_position(self) -> Dict[str, Any]:
        """Get the current cursor position."""

    # Clipboard Actions
    @abstractmethod
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Get the current clipboard content."""

    @abstractmethod
    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Set the clipboard content."""

    # Command Execution
    @abstractmethod
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a command and return the output."""
|
||||
@@ -0,0 +1,68 @@
|
||||
import platform
|
||||
import subprocess
|
||||
from typing import Tuple, Type
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler, BaseFileHandler
|
||||
from computer_server.diorama.base import BaseDioramaHandler
|
||||
|
||||
# Conditionally import platform-specific handlers
|
||||
system = platform.system().lower()
|
||||
if system == 'darwin':
|
||||
from .macos import MacOSAccessibilityHandler, MacOSAutomationHandler
|
||||
from computer_server.diorama.macos import MacOSDioramaHandler
|
||||
elif system == 'linux':
|
||||
from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler
|
||||
elif system == 'windows':
|
||||
from .windows import WindowsAccessibilityHandler, WindowsAutomationHandler
|
||||
|
||||
from .generic import GenericFileHandler
|
||||
|
||||
class HandlerFactory:
    """Factory for creating OS-specific handlers."""

    @staticmethod
    def _get_current_os() -> str:
        """Determine the current OS.

        ``platform.system()`` is consulted first. When it reports something
        other than Darwin, Linux or Windows, the lower-cased output of
        ``uname -s`` is returned instead (Unix-like systems only).

        Returns:
            str: The OS type ('darwin' for macOS, 'linux' for Linux, or 'windows' for Windows),
            or whatever ``uname -s`` printed when the fallback was used.

        Raises:
            RuntimeError: If unable to determine the current OS
        """
        try:
            # Use platform.system() as primary method
            system = platform.system().lower()
            if system in ('darwin', 'linux', 'windows'):
                return system

            # Fallback to uname if platform.system() doesn't return expected values (Unix-like systems only)
            result = subprocess.run(['uname', '-s'], capture_output=True, text=True)
        except Exception as e:
            # Chain the cause so the original failure (e.g. a missing `uname`
            # binary) stays visible in the traceback.
            raise RuntimeError(f"Failed to determine current OS: {str(e)}") from e

        if result.returncode == 0:
            return result.stdout.strip().lower()

        # Raised outside the try block so the handler above does not catch and
        # re-wrap this function's own error; the message text is unchanged.
        raise RuntimeError(f"Failed to determine current OS: Unsupported OS: {system}")

    @staticmethod
    def create_handlers() -> Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler, BaseFileHandler]:
        """Create and return appropriate handlers for the current OS.

        Returns:
            Tuple[BaseAccessibilityHandler, BaseAutomationHandler, BaseDioramaHandler, BaseFileHandler]: A tuple containing
            the appropriate accessibility, automation, diorama, and file handlers for the current OS.

        Raises:
            NotImplementedError: If the current OS is not supported
            RuntimeError: If unable to determine the current OS
        """
        os_type = HandlerFactory._get_current_os()

        # The platform-specific classes used below are imported conditionally
        # at module import time, keyed on platform.system().
        if os_type == 'darwin':
            return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler(), GenericFileHandler()
        elif os_type == 'linux':
            return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler(), GenericFileHandler()
        elif os_type == 'windows':
            return WindowsAccessibilityHandler(), WindowsAutomationHandler(), BaseDioramaHandler(), GenericFileHandler()
        else:
            raise NotImplementedError(f"OS '{os_type}' is not supported")
|
||||
@@ -0,0 +1,82 @@
|
||||
"""
|
||||
Generic handlers for all OSes.
|
||||
|
||||
Includes:
|
||||
- FileHandler
|
||||
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Dict, Any
|
||||
from .base import BaseFileHandler
|
||||
import base64
|
||||
|
||||
def resolve_path(path: str) -> Path:
    """Return ``path`` as an absolute, resolved ``Path``, expanding a leading ``~`` first."""
    expanded = Path(path).expanduser()
    return expanded.resolve()
|
||||
|
||||
class GenericFileHandler(BaseFileHandler):
    """File handler built on ``pathlib`` that is shared by all OSes.

    Every method resolves ``path`` with ``resolve_path`` and never raises:
    failures are reported as ``{"success": False, "error": <message>}``.
    """

    async def file_exists(self, path: str) -> Dict[str, Any]:
        """Return ``{"success": True, "exists": bool}`` for a regular file at ``path``."""
        try:
            return {"success": True, "exists": resolve_path(path).is_file()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def directory_exists(self, path: str) -> Dict[str, Any]:
        """Return ``{"success": True, "exists": bool}`` for a directory at ``path``."""
        try:
            return {"success": True, "exists": resolve_path(path).is_dir()}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def list_dir(self, path: str) -> Dict[str, Any]:
        """Return the names of the files and directories directly inside ``path`` under ``"files"``."""
        try:
            return {"success": True, "files": [p.name for p in resolve_path(path).iterdir() if p.is_file() or p.is_dir()]}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_text(self, path: str) -> Dict[str, Any]:
        """Return the file's text under ``"content"``, decoded as UTF-8."""
        try:
            # Pin the encoding: the locale default differs between platforms,
            # while the text is exchanged with a remote client.
            return {"success": True, "content": resolve_path(path).read_text(encoding="utf-8")}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_text(self, path: str, content: str) -> Dict[str, Any]:
        """Write ``content`` to the file at ``path``, encoded as UTF-8."""
        try:
            # Same reasoning as read_text: do not depend on the locale encoding.
            resolve_path(path).write_text(content, encoding="utf-8")
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def write_bytes(self, path: str, content_b64: str) -> Dict[str, Any]:
        """Base64-decode ``content_b64`` and write the resulting bytes to ``path``."""
        try:
            resolve_path(path).write_bytes(base64.b64decode(content_b64))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def read_bytes(self, path: str) -> Dict[str, Any]:
        """Return the file's bytes as a base64 string under ``"content_b64"``."""
        try:
            return {"success": True, "content_b64": base64.b64encode(resolve_path(path).read_bytes()).decode('utf-8')}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_file(self, path: str) -> Dict[str, Any]:
        """Remove the file at ``path`` with ``Path.unlink()``."""
        try:
            resolve_path(path).unlink()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def create_dir(self, path: str) -> Dict[str, Any]:
        """Create the directory at ``path``, including missing parents; an existing directory is not an error."""
        try:
            resolve_path(path).mkdir(parents=True, exist_ok=True)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def delete_dir(self, path: str) -> Dict[str, Any]:
        """Remove the directory at ``path`` with ``Path.rmdir()`` (which only removes empty directories)."""
        try:
            resolve_path(path).rmdir()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}
|
||||
@@ -0,0 +1,284 @@
|
||||
"""
|
||||
Linux implementation of automation and accessibility handlers.
|
||||
|
||||
This implementation attempts to use pyautogui for GUI automation when available.
|
||||
If running in a headless environment without X11, it will fall back to simulated responses.
|
||||
To use GUI automation in a headless environment:
|
||||
1. Install Xvfb: sudo apt-get install xvfb
|
||||
2. Run with virtual display: xvfb-run python -m computer_server
|
||||
"""
|
||||
from typing import Dict, Any, List, Tuple, Optional
|
||||
import logging
|
||||
import subprocess
|
||||
import base64
|
||||
import os
|
||||
import json
|
||||
from io import BytesIO
|
||||
|
||||
# Configure logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Try to import pyautogui, but don't fail if it's not available
|
||||
# This allows the server to run in headless environments
|
||||
try:
|
||||
import pyautogui
|
||||
|
||||
logger.info("pyautogui successfully imported, GUI automation available")
|
||||
except Exception as e:
|
||||
logger.warning(f"pyautogui import failed: {str(e)}. GUI operations will be simulated.")
|
||||
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||||
|
||||
class LinuxAccessibilityHandler(BaseAccessibilityHandler):
    """Accessibility handler for Linux; the tree and element search are placeholders."""

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return a fixed, minimal single-window tree (no real accessibility query is made)."""
        logger.info("Getting accessibility tree (simulated, no accessibility API available on Linux)")
        # Hard-coded stand-in window with no children.
        placeholder_window = {
            "role": "Window",
            "title": "Linux Window",
            "position": {"x": 0, "y": 0},
            "size": {"width": 1920, "height": 1080},
            "children": [],
        }
        return {"success": True, "tree": placeholder_window}

    async def find_element(
        self,
        role: Optional[str] = None,
        title: Optional[str] = None,
        value: Optional[str] = None,
    ) -> Dict[str, Any]:
        """Log the request and report that element search is unsupported."""
        logger.info(f"Finding element with role={role}, title={title}, value={value} (not supported on Linux)")
        return {"success": False, "message": "Element search not supported on Linux"}

    def get_cursor_position(self) -> Tuple[int, int]:
        """Return the cursor position from pyautogui, or ``(0, 0)`` when that fails."""
        try:
            point = pyautogui.position()
            return point.x, point.y
        except Exception as exc:
            logger.warning(f"Failed to get cursor position with pyautogui: {exc}")
            logger.info("Getting cursor position (simulated)")
            return 0, 0

    def get_screen_size(self) -> Tuple[int, int]:
        """Return the screen size from pyautogui, or ``(1920, 1080)`` when that fails."""
        try:
            dimensions = pyautogui.size()
            return dimensions.width, dimensions.height
        except Exception as exc:
            logger.warning(f"Failed to get screen size with pyautogui: {exc}")
            logger.info("Getting screen size (simulated)")
            return 1920, 1080
|
||||
|
||||
class LinuxAutomationHandler(BaseAutomationHandler):
    """Linux implementation of automation handler using pyautogui.

    Every method returns ``{"success": True, ...}`` on success and
    ``{"success": False, "error": <message>}`` when the underlying call raises.
    """

    # Mouse Actions
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press ``button`` down, first moving to ``(x, y)`` when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseDown(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release ``button``, first moving to ``(x, y)`` when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.mouseUp(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to ``(x, y)``."""
        try:
            pyautogui.moveTo(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click, first moving to ``(x, y)`` when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click, first moving to ``(x, y)`` when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.rightClick()
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Double-click (0.1 s between clicks), first moving to ``(x, y)`` when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.doubleClick(interval=0.1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def click(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Click ``button``, first moving to ``(x, y)`` when both are given."""
        try:
            if x is not None and y is not None:
                pyautogui.moveTo(x, y)
            pyautogui.click(button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Drag from the current cursor position to ``(x, y)`` over ``duration`` seconds."""
        try:
            pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # NOTE(review): BaseAutomationHandler declares drag(path, button, duration);
    # this override takes start/end coordinates and the path variant lives in
    # drag_path below. Left unchanged so existing callers keep working -
    # confirm which signature clients actually use.
    async def drag(self, start_x: int, start_y: int, end_x: int, end_y: int, button: str = "left") -> Dict[str, Any]:
        """Move to ``(start_x, start_y)`` and drag to ``(end_x, end_y)`` over 0.5 seconds."""
        try:
            pyautogui.moveTo(start_x, start_y)
            pyautogui.dragTo(end_x, end_y, duration=0.5, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_path(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
        """Move to the first point of ``path`` and drag to each following point in turn.

        ``duration`` is applied to every segment, not to the path as a whole.
        An empty ``path`` is reported as an error.
        """
        try:
            if not path:
                return {"success": False, "error": "Path is empty"}
            pyautogui.moveTo(*path[0])
            for x, y in path[1:]:
                pyautogui.dragTo(x, y, duration=duration, button=button)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press and hold ``key``."""
        try:
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release ``key``."""
        try:
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type ``text`` via ``pyautogui.write``."""
        try:
            pyautogui.write(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release ``key``."""
        try:
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # NOTE(review): BaseAutomationHandler declares hotkey(*keys) while this
    # override takes a single list. Left unchanged because callers may pass
    # ``keys=`` by keyword - confirm and align the base class if so.
    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the keys in ``keys`` together as one combination."""
        try:
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll horizontally by ``x`` clicks and vertically by ``y`` clicks.

        NOTE(review): ``x``/``y`` are treated as scroll amounts, following the
        base-class contract ("Scroll the specified amount") - confirm the sign
        convention clients expect.
        """
        try:
            # pyautogui.scroll(clicks, x, y) interprets its 2nd and 3rd arguments
            # as a screen position, so forwarding (x, y) directly would scroll
            # by ``x`` at screen column ``y`` instead of scrolling on both axes.
            if x:
                pyautogui.hscroll(x)
            if y:
                pyautogui.scroll(y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by ``clicks`` (a negative amount for ``pyautogui.scroll``)."""
        try:
            pyautogui.scroll(-clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by ``clicks``."""
        try:
            pyautogui.scroll(clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return it as a base64-encoded PNG under ``"image_data"``."""
        try:
            from PIL import Image
            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}
            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size as ``{"size": {"width": ..., "height": ...}}``."""
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the cursor position as ``{"position": {"x": ..., "y": ...}}``."""
        try:
            pos = pyautogui.position()
            return {"success": True, "position": {"x": pos.x, "y": pos.y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard text under ``"content"`` (read with ``pyperclip.paste``)."""
        try:
            import pyperclip
            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with ``text``."""
        try:
            import pyperclip
            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Command Execution
    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run ``command`` through the shell and return its stdout, stderr and return code.

        ``"success"`` is True whenever the process could be started, regardless
        of its exit status; check ``"return_code"`` for the command's result.
        """
        try:
            # SECURITY NOTE: shell=True executes the string verbatim in a shell.
            # Callers must ensure ``command`` comes from a trusted source.
            process = subprocess.run(command, shell=True, capture_output=True, text=True)
            return {"success": True, "stdout": process.stdout, "stderr": process.stderr, "return_code": process.returncode}
        except Exception as e:
            return {"success": False, "error": str(e)}
|
||||
@@ -0,0 +1,943 @@
|
||||
import pyautogui
|
||||
from pynput.mouse import Button, Controller as MouseController
|
||||
from pynput.keyboard import Key, Controller as KeyboardController
|
||||
import time
|
||||
import base64
|
||||
from io import BytesIO
|
||||
from typing import Optional, Dict, Any, List, Tuple
|
||||
from ctypes import byref, c_void_p, POINTER
|
||||
from AppKit import NSWorkspace # type: ignore
|
||||
import AppKit
|
||||
from Quartz.CoreGraphics import * # type: ignore
|
||||
from Quartz.CoreGraphics import CGPoint, CGSize # type: ignore
|
||||
import Foundation
|
||||
from ApplicationServices import (
|
||||
AXUIElementCreateSystemWide, # type: ignore
|
||||
AXUIElementCreateApplication, # type: ignore
|
||||
AXUIElementCopyAttributeValue, # type: ignore
|
||||
AXUIElementCopyAttributeValues, # type: ignore
|
||||
kAXFocusedWindowAttribute, # type: ignore
|
||||
kAXWindowsAttribute, # type: ignore
|
||||
kAXMainWindowAttribute, # type: ignore
|
||||
kAXChildrenAttribute, # type: ignore
|
||||
kAXRoleAttribute, # type: ignore
|
||||
kAXTitleAttribute, # type: ignore
|
||||
kAXValueAttribute, # type: ignore
|
||||
kAXDescriptionAttribute, # type: ignore
|
||||
kAXEnabledAttribute, # type: ignore
|
||||
kAXPositionAttribute, # type: ignore
|
||||
kAXSizeAttribute, # type: ignore
|
||||
kAXErrorSuccess, # type: ignore
|
||||
AXValueGetType, # type: ignore
|
||||
kAXValueCGSizeType, # type: ignore
|
||||
kAXValueCGPointType, # type: ignore
|
||||
kAXValueCFRangeType, # type: ignore
|
||||
AXUIElementGetTypeID, # type: ignore
|
||||
AXValueGetValue, # type: ignore
|
||||
kAXVisibleChildrenAttribute, # type: ignore
|
||||
kAXRoleDescriptionAttribute, # type: ignore
|
||||
kAXFocusedApplicationAttribute, # type: ignore
|
||||
kAXFocusedUIElementAttribute, # type: ignore
|
||||
kAXSelectedTextAttribute, # type: ignore
|
||||
kAXSelectedTextRangeAttribute, # type: ignore
|
||||
)
|
||||
import objc
|
||||
import re
|
||||
import json
|
||||
import copy
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Constants for accessibility API
# NOTE: kAXErrorSuccess, kAXRoleAttribute, kAXTitleAttribute, kAXValueAttribute,
# kAXWindowsAttribute, kAXPositionAttribute, kAXSizeAttribute and
# kAXChildrenAttribute are also imported from ApplicationServices at the top of
# this module; the assignments below rebind those names to the literals shown.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"

# Constants for window properties
kCGWindowLayer = "kCGWindowLayer"  # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha"  # Window opacity

# Constants for application activation options
# Maps a readable option name to its bit-flag value.
NSApplicationActivationOptions = {
    "regular": 0,  # Default activation
    "bringing_all_windows_forward": 1 << 0,  # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1  # NSApplicationActivateIgnoringOtherApps
}
|
||||
|
||||
def CFAttributeToPyObject(attrValue):
    """Convert a Core Foundation / AX attribute value into a Python object.

    Strings, booleans, arrays and numbers are converted by CF type ID, and
    AXUIElement values are returned unchanged. Any other value is treated as
    an AXValue: for size, point and range types the ``{...}`` portion of its
    description is parsed and returned as a tuple. ``None`` is returned when
    no conversion applies.
    """

    def list_helper(list_value):
        # Convert every entry of the array recursively.
        return [CFAttributeToPyObject(entry) for entry in list_value]

    def number_helper(number_value):
        # Try an integer read first, then fall back to a double.
        for number_type, caster in (
            (Foundation.kCFNumberIntType, int),  # type: ignore
            (Foundation.kCFNumberDoubleType, float),  # type: ignore
        ):
            ok, raw = Foundation.CFNumberGetValue(number_value, number_type, None)  # type: ignore
            if ok:
                return caster(raw)
        return None

    def axuielement_helper(element_value):
        # Accessibility elements are passed through untouched.
        return element_value

    type_id = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_converters = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    try:
        return cf_converters[type_id](attrValue)
    except KeyError:
        # Not a supported CF type; continue with the AX value types below.
        pass

    ax_kind = AXValueGetType(attrValue)
    ax_parsers = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    try:
        braces = re.search("{.*}", attrValue.description())
        if not braces:
            return None
        return tuple(ax_parsers[ax_kind](braces.group()))
    except KeyError:
        # Unsupported AX value type.
        return None
|
||||
|
||||
|
||||
def element_attribute(element, attribute):
    """Read ``attribute`` from the accessibility ``element``.

    The children attribute is first requested through
    ``AXUIElementCopyAttributeValues`` (index 0, at most 999 values); if that
    call does not succeed, or for any other attribute, the single-value
    ``AXUIElementCopyAttributeValue`` is used. ``NSArray`` results are
    converted with ``CFAttributeToPyObject``; other results are returned
    as-is. Returns ``None`` when the attribute cannot be read.
    """

    def _unwrap(raw):
        # Only arrays are converted; everything else is handed back untouched.
        if isinstance(raw, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw)
        return raw

    if attribute == kAXChildrenAttribute:
        status, raw = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None)
        if status == kAXErrorSuccess:
            return _unwrap(raw)

    status, raw = AXUIElementCopyAttributeValue(element, attribute, None)
    if status != kAXErrorSuccess:
        return None
    return _unwrap(raw)
|
||||
|
||||
|
||||
def element_value(element, type):
    """Extract the value of the given AX value ``type`` from ``element``.

    Args:
        element: The AX value to unpack, or ``None``.
        type: The AX value type constant to extract (e.g. ``kAXValueCGPointType``).

    Returns:
        The extracted value, or ``None`` when ``element`` is ``None`` or
        ``AXValueGetValue`` reports failure.
    """
    # element_attribute() returns None for attributes it cannot read; do not
    # hand that to the AX API.
    if element is None:
        return None
    # The first item returned is a success flag, not an error code.
    ok, value = AXValueGetValue(element, type, None)
    return value if ok else None
|
||||
|
||||
|
||||
class UIElement:
|
||||
def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None):
|
||||
self.ax_element = element
|
||||
self.content_identifier = ""
|
||||
self.identifier = ""
|
||||
self.name = ""
|
||||
self.children = []
|
||||
self.description = ""
|
||||
self.role_description = ""
|
||||
self.value = None
|
||||
self.max_depth = max_depth
|
||||
|
||||
# Set role
|
||||
self.role = element_attribute(element, kAXRoleAttribute)
|
||||
if self.role is None:
|
||||
self.role = "No role"
|
||||
|
||||
# Set name
|
||||
self.name = element_attribute(element, kAXTitleAttribute)
|
||||
if self.name is not None:
|
||||
# Convert tuple to string if needed
|
||||
if isinstance(self.name, tuple):
|
||||
self.name = str(self.name[0]) if self.name else ""
|
||||
self.name = self.name.replace(" ", "_")
|
||||
|
||||
# Set enabled
|
||||
self.enabled = element_attribute(element, kAXEnabledAttribute)
|
||||
if self.enabled is None:
|
||||
self.enabled = False
|
||||
|
||||
# Set position and size
|
||||
position = element_attribute(element, kAXPositionAttribute)
|
||||
size = element_attribute(element, kAXSizeAttribute)
|
||||
start_position = element_value(position, kAXValueCGPointType)
|
||||
|
||||
if self.role == "AXWindow" and start_position is not None:
|
||||
offset_x = start_position.x
|
||||
offset_y = start_position.y
|
||||
|
||||
self.absolute_position = copy.copy(start_position)
|
||||
self.position = start_position
|
||||
if self.position is not None:
|
||||
self.position.x -= max(0, offset_x)
|
||||
self.position.y -= max(0, offset_y)
|
||||
self.size = element_value(size, kAXValueCGSizeType)
|
||||
|
||||
self._set_bboxes(parents_visible_bbox)
|
||||
|
||||
# Set component center
|
||||
if start_position is None or self.size is None:
|
||||
print("Position is None")
|
||||
return
|
||||
self.center = (
|
||||
start_position.x + offset_x + self.size.width / 2,
|
||||
start_position.y + offset_y + self.size.height / 2,
|
||||
)
|
||||
|
||||
self.description = element_attribute(element, kAXDescriptionAttribute)
|
||||
self.role_description = element_attribute(element, kAXRoleDescriptionAttribute)
|
||||
attribute_value = element_attribute(element, kAXValueAttribute)
|
||||
|
||||
# Set value
|
||||
self.value = attribute_value
|
||||
if attribute_value is not None:
|
||||
if isinstance(attribute_value, Foundation.NSArray): # type: ignore
|
||||
self.value = []
|
||||
for value in attribute_value:
|
||||
self.value.append(value)
|
||||
# Check if it's an accessibility element by checking its type ID
|
||||
elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID(): # type: ignore
|
||||
self.value = UIElement(attribute_value, offset_x, offset_y)
|
||||
|
||||
# Set children
|
||||
if self.max_depth is None or self.max_depth > 0:
|
||||
self.children = self._get_children(element, start_position, offset_x, offset_y)
|
||||
else:
|
||||
self.children = []
|
||||
|
||||
self.calculate_hashes()
|
||||
|
||||
def _set_bboxes(self, parents_visible_bbox):
    """Compute ``self.bbox`` and ``self.visible_bbox`` for this element.

    ``bbox`` is ``[left, top, right, bottom]`` built from
    ``self.absolute_position`` and ``self.size``, truncated to ints.
    ``visible_bbox`` is that box clipped to ``parents_visible_bbox``; it is
    ``None`` when the two boxes do not intersect, and equal to ``bbox`` when
    no parent box is given. Both are ``None`` when the element has no
    position or size.

    Args:
        parents_visible_bbox: The parent's visible box in the same
            ``[left, top, right, bottom]`` layout, or a falsy value.
    """
    if not (self.absolute_position and self.size):
        self.bbox = None
        self.visible_bbox = None
        return

    left = self.absolute_position.x
    top = self.absolute_position.y
    self.bbox = [
        int(left),
        int(top),
        int(left + self.size.width),
        int(top + self.size.height),
    ]

    if not parents_visible_bbox:
        # Nothing to clip against: the whole box counts as visible.
        self.visible_bbox = self.bbox
        return

    clip = parents_visible_bbox
    overlaps = (
        self.bbox[0] <= clip[2]
        and self.bbox[1] <= clip[3]
        and self.bbox[2] >= clip[0]
        and self.bbox[3] >= clip[1]
    )
    if not overlaps:
        self.visible_bbox = None
        return

    # Intersection: the larger of the two top-left corners and the smaller
    # of the two bottom-right corners.
    self.visible_bbox = [
        int(max(self.bbox[0], clip[0])),
        int(max(self.bbox[1], clip[1])),
        int(min(self.bbox[2], clip[2])),
        int(min(self.bbox[3], clip[3])),
    ]
|
||||
|
||||
def _get_children(self, element, start_position, offset_x, offset_y):
    """Wrap the accessibility children of ``element`` in ``UIElement`` nodes.

    The full children attribute is preferred; the visible-children attribute
    is used only when the full list is unavailable. Each child is built with
    the same offsets, a depth budget one lower than this element's (or
    unlimited when ``self.max_depth`` is ``None``) and this element's
    ``visible_bbox`` as its clipping box.

    Args:
        element: The AX element whose children are read.
        start_position: Accepted for signature compatibility; not used here.
        offset_x: Horizontal offset forwarded to each child.
        offset_y: Vertical offset forwarded to each child.

    Returns:
        A list of ``UIElement`` children; empty when the depth budget is
        exhausted or the element reports no children.
    """
    all_children = element_attribute(element, kAXChildrenAttribute)
    shown_children = element_attribute(element, kAXVisibleChildrenAttribute)

    if all_children is not None:
        raw_children = list(all_children)
    elif shown_children is not None:
        raw_children = list(shown_children)
    else:
        raw_children = []

    if self.max_depth is not None and not self.max_depth > 0:
        return []

    wrapped = []
    for raw_child in raw_children:
        remaining_depth = None if self.max_depth is None else self.max_depth - 1
        wrapped.append(
            UIElement(raw_child, offset_x, offset_y, remaining_depth, self.visible_bbox)
        )
    return wrapped
|
||||
|
||||
def calculate_hashes(self):
    """Refresh ``identifier`` and ``content_identifier`` for this element.

    ``identifier`` comes from ``component_hash()`` and ``content_identifier``
    from ``children_content_hash(self.children)``, in that order.
    """
    own_hash = self.component_hash()
    self.identifier = own_hash

    subtree_hash = self.children_content_hash(self.children)
    self.content_identifier = subtree_hash
|
||||
|
||||
def component_hash(self):
    """Return a hash identifying this element by geometry, state and role.

    The hashed text is the rounded position (``x;y``), the rounded size
    (``width;height``), ``str(self.enabled)`` and the role, concatenated in
    that order. When the role is a tuple only its first item is used.

    Returns:
        The value of ``hash_from_string`` for that text, or ``""`` when the
        element has no position or no size.
    """
    if self.position is None or self.size is None:
        return ""

    # Ensure the role contributes a plain string.
    if self.role is None:
        role_part = ""
    elif isinstance(self.role, tuple):
        role_part = str(self.role[0])
    else:
        role_part = str(self.role)

    fingerprint = "".join(
        (
            f"{self.position.x:.0f};{self.position.y:.0f}",
            f"{self.size.width:.0f};{self.size.height:.0f}",
            str(self.enabled),
            role_part,
        )
    )
    return self.hash_from_string(fingerprint)
|
||||
|
||||
def hash_from_string(self, string):
    """Return the MD5 hex digest of ``string``.

    Args:
        string: Text to hash; encoded with ``str.encode()`` defaults.

    Returns:
        The 32-character hex digest, or ``""`` when ``string`` is ``None``
        or empty.
    """
    has_content = string is not None and string != ""
    if not has_content:
        return ""

    import hashlib

    return hashlib.md5(string.encode()).hexdigest()
|
||||
|
||||
def children_content_hash(self, children):
    """Return a hash summarising the content and structure of ``children``.

    Two intermediate hashes are built: one over the children's
    ``content_identifier`` values (sorted, so sibling order does not affect
    it) and one over their ``identifier`` values in the given order. The
    result is the hash of the two concatenated.

    The previous implementation combined them with
    ``content_hash.join(content_structure_hash)``, which interleaves one
    digest between the characters of the other and yields ``""`` whenever
    the structure hash is empty, discarding the content hash entirely.

    Args:
        children: Child elements exposing ``identifier`` and
            ``content_identifier``.

    Returns:
        The combined hash, or ``""`` when there are no children or neither
        intermediate hash has any content.
    """
    if not children:
        return ""

    content_hashes = sorted(child.content_identifier for child in children)
    structure_hashes = [child.identifier for child in children]

    content_hash = self.hash_from_string("".join(content_hashes))
    content_structure_hash = self.hash_from_string("".join(structure_hashes))
    return self.hash_from_string(content_hash + content_structure_hash)
|
||||
|
||||
def to_dict(self):
    """Convert this element and its descendants into a plain dictionary.

    A nested ``UIElement`` value is rendered as an indented JSON string and
    an ``NSDate`` value is stringified; other values pass through as-is.
    Positions are formatted ``"x;y"`` with two decimals, sizes
    ``"width;height"`` with none, and missing geometry becomes ``""``.

    Returns:
        A dict with the keys ``id``, ``name``, ``role``, ``description``,
        ``role_description``, ``value``, ``absolute_position``, ``position``,
        ``size``, ``enabled``, ``bbox``, ``visible_bbox`` and ``children``
        (the children converted recursively).
    """

    def format_point(point):
        if point is None:
            return ""
        return f"{point.x:.2f};{point.y:.2f}"

    serialised_value = self.value
    if isinstance(serialised_value, UIElement):
        serialised_value = json.dumps(serialised_value.to_dict(), indent=4)
    elif isinstance(serialised_value, AppKit.NSDate):  # type: ignore
        serialised_value = str(serialised_value)

    absolute_text = format_point(self.absolute_position)
    relative_text = format_point(self.position)
    size_text = (
        "" if self.size is None else f"{self.size.width:.0f};{self.size.height:.0f}"
    )

    return {
        "id": self.identifier,
        "name": self.name,
        "role": self.role,
        "description": self.description,
        "role_description": self.role_description,
        "value": serialised_value,
        "absolute_position": absolute_text,
        "position": relative_text,
        "size": size_text,
        "enabled": self.enabled,
        "bbox": self.bbox,
        "visible_bbox": self.visible_bbox,
        "children": [child.to_dict() for child in self.children],
    }
|
||||
|
||||
|
||||
import Quartz
|
||||
from AppKit import NSWorkspace, NSRunningApplication
|
||||
from pathlib import Path
|
||||
|
||||
def get_all_windows_zorder():
    """Describe every window Quartz reports, ordered by z-index.

    The on-screen window list is reversed and enumerated to give each window
    number a ``z_index``; windows that appear only in the full list get
    ``-1``. Entries whose bounds are missing or empty are dropped.

    Returns:
        A list of dicts with the keys ``id``, ``name``, ``pid``, ``owner``,
        ``role`` (``"dock"``, ``"menubar"``, ``"desktop"`` or ``"app"``),
        ``is_on_screen``, ``bounds``, ``layer``, ``z_index`` and ``opacity``,
        sorted by ascending ``z_index``.
    """

    def classify(name, owner):
        # Role is derived purely from the window and owner names.
        if name == "Dock" and owner == "Dock":
            return "dock"
        if name == "Menubar" and owner == "Window Server":
            return "menubar"
        if owner in ["Window Server", "Dock"]:
            return "desktop"
        return "app"

    on_screen = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly,
        Quartz.kCGNullWindowID,
    )
    z_order = {}
    for depth, entry in enumerate(on_screen[::-1]):
        z_order[entry['kCGWindowNumber']] = depth

    every_window = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll,
        Quartz.kCGNullWindowID,
    )

    described = []
    for entry in every_window:
        bounds = entry.get('kCGWindowBounds', {})
        if not bounds:
            continue
        number = entry.get('kCGWindowNumber', 0)
        name = entry.get('kCGWindowName', '')
        owner = entry.get('kCGWindowOwnerName', '')
        described.append({
            "id": number,
            "name": name or "Unnamed Window",
            "pid": entry.get('kCGWindowOwnerPID', 0),
            "owner": owner,
            "role": classify(name, owner),
            "is_on_screen": entry.get('kCGWindowIsOnscreen', False),
            "bounds": {
                "x": bounds.get('X', 0),
                "y": bounds.get('Y', 0),
                "width": bounds.get('Width', 0),
                "height": bounds.get('Height', 0)
            },
            "layer": entry.get('kCGWindowLayer', 0),
            "z_index": z_order.get(number, -1),
            "opacity": entry.get('kCGWindowAlpha', 1.0)
        })

    described.sort(key=lambda item: item["z_index"])
    return described
|
||||
|
||||
def get_app_info(app):
    """Summarise a running-application object as a plain dict.

    Args:
        app: An object exposing ``localizedName()``, ``bundleIdentifier()``,
            ``processIdentifier()``, ``isActive()``, ``isHidden()`` and
            ``isTerminated()``.

    Returns:
        A dict with the keys ``name``, ``bundle_id``, ``pid``, ``active``,
        ``hidden`` and ``terminated`` holding the results of those calls.
    """
    accessors = (
        ("name", "localizedName"),
        ("bundle_id", "bundleIdentifier"),
        ("pid", "processIdentifier"),
        ("active", "isActive"),
        ("hidden", "isHidden"),
        ("terminated", "isTerminated"),
    )
    return {key: getattr(app, method)() for key, method in accessors}
|
||||
|
||||
def get_menubar_items(active_app_pid=None):
    """List the top-level menu bar items of an application.

    Args:
        active_app_pid: Process id of the application to inspect. When
            ``None`` the frontmost application reported by ``NSWorkspace``
            is used.

    Returns:
        A list of dicts with the keys ``title`` (``"Untitled"`` when the
        item has none), ``bounds`` (``x``/``y``/``width``/``height``, each 0
        when unavailable), ``index`` and ``app_pid``. The list is empty when
        there is no frontmost application, or the application element, its
        menu bar or the menu bar's children cannot be obtained.
    """
    if active_app_pid is None:
        frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if not frontmost_app:
            return []
        active_app_pid = frontmost_app.processIdentifier()

    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        return []

    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        return []

    entries = element_attribute(menubar, kAXChildrenAttribute)
    if entries is None:
        return []

    collected = []
    for slot, entry in enumerate(entries):
        title = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        x = y = width = height = 0

        raw_position = element_attribute(entry, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            x = getattr(point, 'x', 0)
            y = getattr(point, 'y', 0)

        raw_size = element_attribute(entry, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            width = getattr(extent, 'width', 0)
            height = getattr(extent, 'height', 0)

        collected.append({
            "title": title,
            "bounds": {"x": x, "y": y, "width": width, "height": height},
            "index": slot,
            "app_pid": active_app_pid
        })
    return collected
|
||||
|
||||
def get_dock_items():
    """List the items shown in the Dock.

    The Dock process is located among the running applications by name
    (``"Dock"``) and bundle id (``"com.apple.dock"``); the first ``AXList``
    child of its accessibility element supplies the items.

    Returns:
        A list of dicts with the keys ``title``, ``description``,
        ``bounds``, ``index``, ``type`` (``"application"``, ``"folder"``,
        ``"document"``, ``"separator"``, ``"trash"`` or ``"unknown"``),
        ``role`` and ``subrole``. The list is empty when the Dock process,
        its element, its list or the list's children cannot be found.
    """
    running_apps = NSWorkspace.sharedWorkspace().runningApplications()
    dock_pid = next(
        (
            app.processIdentifier()
            for app in running_apps
            if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock"
        ),
        None,
    )
    if dock_pid is None:
        return []

    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return []

    dock_children = element_attribute(dock_element, kAXChildrenAttribute)
    if dock_children is None or len(dock_children) == 0:
        return []

    dock_app_list = next(
        (
            child
            for child in dock_children
            if element_attribute(child, kAXRoleAttribute) == "AXList"
        ),
        None,
    )
    if dock_app_list is None:
        return []

    entries = element_attribute(dock_app_list, kAXChildrenAttribute)
    if entries is None:
        return []

    subrole_types = {
        "AXApplicationDockItem": "application",
        "AXFolderDockItem": "folder",
        "AXDocumentDockItem": "document",
    }

    collected = []
    for slot, entry in enumerate(entries):
        title = element_attribute(entry, kAXTitleAttribute) or "Untitled"
        description = element_attribute(entry, kAXDescriptionAttribute) or ""
        role = element_attribute(entry, kAXRoleAttribute) or ""
        subrole = element_attribute(entry, "AXSubrole") or ""

        x = y = width = height = 0
        raw_position = element_attribute(entry, kAXPositionAttribute)
        if raw_position:
            point = element_value(raw_position, kAXValueCGPointType)
            x = getattr(point, 'x', 0)
            y = getattr(point, 'y', 0)
        raw_size = element_attribute(entry, kAXSizeAttribute)
        if raw_size:
            extent = element_value(raw_size, kAXValueCGSizeType)
            width = getattr(extent, 'width', 0)
            height = getattr(extent, 'height', 0)

        # Subrole decides the type first; separators and the trash are
        # recognised by role and title respectively.
        if subrole in subrole_types:
            item_type = subrole_types[subrole]
        elif subrole == "AXSeparatorDockItem" or role == "AXSeparator":
            item_type = "separator"
        elif "trash" in title.lower():
            item_type = "trash"
        else:
            item_type = "unknown"

        collected.append({
            "title": title,
            "description": description,
            "bounds": {"x": x, "y": y, "width": width, "height": height},
            "index": slot,
            "type": item_type,
            "role": role,
            "subrole": subrole
        })
    return collected
|
||||
|
||||
class MacOSAccessibilityHandler(BaseAccessibilityHandler):
    """Accessibility handler for macOS built on the AX and Quartz window APIs."""

    def get_desktop_state(self):
        """Collect a snapshot of windows, applications, menu bar and Dock.

        Returns:
            A dict with the keys ``applications`` (app info plus the ids of
            its on-screen windows), ``windows`` (on-screen windows, each
            given a ``children`` list from the AX tree), ``menubar_items``
            and ``dock_items``.
        """
        windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")]
        running_apps = self.get_running_apps()
        applications = []
        pid_to_window_ids = {}

        # Build a mapping: pid -> list of AX window trees
        pid_to_ax_trees = {}
        for app in running_apps:
            pid = app.processIdentifier()
            try:
                app_elem = AXUIElementCreateApplication(pid)
                err, app_windows = AXUIElementCopyAttributeValue(app_elem, kAXWindowsAttribute, None)
                trees = []
                if err == kAXErrorSuccess and app_windows:
                    for ax_win in app_windows:
                        try:
                            trees.append(UIElement(ax_win).to_dict())
                        except Exception as e:
                            # Keep a placeholder so later windows of the
                            # same app stay at their original index.
                            trees.append({"error": str(e)})
                pid_to_ax_trees[pid] = trees
            except Exception as e:
                pid_to_ax_trees[pid] = [{"error": str(e)}]

        # Attach children by pid and index (order).
        # NOTE(review): this pairs the n-th on-screen Quartz window of a pid
        # with the n-th AX window of that pid; it assumes both APIs list
        # windows in the same order -- confirm.
        pid_to_idx = {}
        for win in windows:
            pid = win["pid"]
            idx = pid_to_idx.get(pid, 0)
            ax_trees = pid_to_ax_trees.get(pid, [])
            win["children"] = ax_trees[idx]["children"] if idx < len(ax_trees) and "children" in ax_trees[idx] else []
            pid_to_idx[pid] = idx + 1
            pid_to_window_ids.setdefault(pid, []).append(win["id"])

        for app in running_apps:
            info = get_app_info(app)
            app_pid = info["pid"]
            applications.append({
                "info": info,
                "windows": pid_to_window_ids.get(app_pid, [])
            })

        menubar_items = get_menubar_items()
        dock_items = get_dock_items()
        return {
            "applications": applications,
            "windows": windows,
            "menubar_items": menubar_items,
            "dock_items": dock_items
        }

    def get_application_windows(self, pid: int):
        """Get all windows for a specific application.

        Args:
            pid: Process id of the application.

        Returns:
            The AX windows array when the query succeeds and yields a
            non-empty ``NSArray``; otherwise an empty list. Errors are
            swallowed deliberately (best effort), but only ``Exception``
            subclasses, so interrupts and interpreter exit still propagate.
        """
        try:
            app = AXUIElementCreateApplication(pid)
            err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None)
            if err == kAXErrorSuccess and windows:
                if isinstance(windows, Foundation.NSArray):  # type: ignore
                    return windows
            return []
        except Exception:
            return []

    def get_all_windows(self):
        """Get all visible windows in the system.

        Returns:
            One dict per regular (activation policy 0) application with the
            keys ``app_name``, ``pid``, ``frontmost``, ``has_windows`` and
            ``windows``. Applications that fail to be inspected are skipped;
            an empty list is returned if enumeration itself fails.
        """
        try:
            windows = []
            running_apps = self.get_running_apps()

            for app in running_apps:
                try:
                    app_name = app.localizedName()
                    pid = app.processIdentifier()

                    # Skip system processes and background apps
                    if not app.activationPolicy() == 0:  # NSApplicationActivationPolicyRegular
                        continue

                    # Get application windows
                    app_windows = self.get_application_windows(pid)

                    windows.append(
                        {
                            "app_name": app_name,
                            "pid": pid,
                            "frontmost": app.isActive(),
                            "has_windows": len(app_windows) > 0,
                            "windows": app_windows,
                        }
                    )
                except Exception:
                    # Best effort: one misbehaving app must not hide the rest.
                    continue

            return windows
        except Exception:
            return []

    def get_running_apps(self):
        """Return the running applications after pumping the run loop once."""
        # From NSWorkspace.runningApplications docs: https://developer.apple.com/documentation/appkit/nsworkspace/runningapplications
        # "Similar to the NSRunningApplication class’s properties, this property will only change when the main run loop runs in a common mode"
        # So we need to run the main run loop to get the latest running applications
        Foundation.CFRunLoopRunInMode(Foundation.kCFRunLoopDefaultMode, 0.1, False)  # type: ignore
        return NSWorkspace.sharedWorkspace().runningApplications()

    def get_ax_attribute(self, element, attribute):
        """Return ``attribute`` of ``element`` via ``element_attribute``."""
        return element_attribute(element, attribute)

    def serialize_node(self, element):
        """Create a serializable dictionary representation of an AX element.

        Args:
            element: The accessibility element to describe.

        Returns:
            A dict with ``role``, ``title`` and ``value``, plus ``position``
            (``x``/``y``) and ``size`` (``width``/``height``) when those
            attributes can be unwrapped.
        """
        result = {}

        # Get basic attributes
        result["role"] = self.get_ax_attribute(element, kAXRoleAttribute)
        result["title"] = self.get_ax_attribute(element, kAXTitleAttribute)
        result["value"] = self.get_ax_attribute(element, kAXValueAttribute)

        # Get position and size if available. The raw attributes are
        # unwrapped with element_value, as everywhere else in this file,
        # instead of being indexed directly.
        position = self.get_ax_attribute(element, kAXPositionAttribute)
        if position:
            try:
                point = element_value(position, kAXValueCGPointType)
                result["position"] = {"x": point.x, "y": point.y}
            except (AttributeError, IndexError, TypeError):
                # Unwrapping failed or returned None: omit the key.
                pass

        size = self.get_ax_attribute(element, kAXSizeAttribute)
        if size:
            try:
                extent = element_value(size, kAXValueCGSizeType)
                result["size"] = {"width": extent.width, "height": extent.height}
            except (AttributeError, IndexError, TypeError):
                pass

        return result

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return the desktop state wrapped in a success/error envelope.

        Returns:
            ``{"success": True, **desktop_state}`` on success, otherwise
            ``{"success": False, "error": <message>}``.
        """
        try:
            desktop_state = self.get_desktop_state()
            return {
                "success": True,
                **desktop_state
            }

        except Exception as e:
            return {"success": False, "error": str(e)}

    async def find_element(
        self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
    ) -> Dict[str, Any]:
        """Find the first element matching every given criterion.

        The search is depth-first from the system-wide AX element. Criteria
        left as ``None`` (or empty) are not checked; ``value`` is compared
        against ``str()`` of the element's value.

        Args:
            role: Required AX role, if any.
            title: Required AX title, if any.
            value: Required value (as a string), if any.

        Returns:
            ``{"success": True, "element": <serialized node or None>}``, or
            ``{"success": False, "error": <message>}`` when the search raises.
        """
        try:
            system = AXUIElementCreateSystemWide()

            def match_element(element):
                if role and self.get_ax_attribute(element, kAXRoleAttribute) != role:
                    return False
                if title and self.get_ax_attribute(element, kAXTitleAttribute) != title:
                    return False
                if value and str(self.get_ax_attribute(element, kAXValueAttribute)) != value:
                    return False
                return True

            def search_tree(element):
                if match_element(element):
                    return self.serialize_node(element)

                children = self.get_ax_attribute(element, kAXChildrenAttribute)
                if children:
                    for child in children:
                        result = search_tree(child)
                        if result:
                            return result
                return None

            element = search_tree(system)
            return {"success": True, "element": element}

        except Exception as e:
            return {"success": False, "error": str(e)}
|
||||
|
||||
class MacOSAutomationHandler(BaseAutomationHandler):
    """Automation handler for macOS.

    Mouse actions go through ``MouseController``, key presses through
    pyautogui (for its key names) and text typing through
    ``KeyboardController``. Every action is a coroutine that returns
    ``{"success": True, ...}`` or ``{"success": False, "error": <message>}``
    instead of raising.
    """

    # Mouse Actions
    mouse = MouseController()
    keyboard = KeyboardController()

    @staticmethod
    def _to_button(button: str):
        """Map a button name to a ``Button``; unknown names mean middle."""
        if button == "left":
            return Button.left
        if button == "right":
            return Button.right
        return Button.middle

    def _release_quietly(self, btn) -> None:
        """Release ``btn`` after a failed drag, ignoring any further error."""
        if btn is None:
            # The failure happened before a button was resolved or pressed.
            return
        try:
            self.mouse.release(btn)
        except Exception:
            # Best effort only: the original error is what gets reported.
            pass

    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press and hold ``button``, moving to (x, y) first when both are given."""
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.press(self._to_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release ``button``, moving to (x, y) first when both are given."""
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.release(self._to_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click once, moving to (x, y) first when both are given."""
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.left, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click once, moving to (x, y) first when both are given."""
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.right, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Left-click twice, moving to (x, y) first when both are given."""
        try:
            if x is not None and y is not None:
                self.mouse.position = (x, y)
            self.mouse.click(Button.left, 2)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to (x, y)."""
        try:
            self.mouse.position = (x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from the current cursor position to (x, y).

        The button is pressed, the cursor is moved in 20 equal steps spread
        over ``duration`` seconds, and the button is released. On failure
        the button is released on a best-effort basis before the error is
        reported.
        """
        btn = None
        try:
            btn = self._to_button(button)
            # Press
            self.mouse.press(btn)
            # Move with sleep to simulate drag duration
            start_x, start_y = self.mouse.position
            steps = 20
            dx = (x - start_x) / steps
            dy = (y - start_y) / steps
            for i in range(steps):
                self.mouse.position = (int(start_x + dx * (i + 1)), int(start_y + dy * (i + 1)))
                # NOTE(review): time.sleep blocks the event loop for the
                # whole drag; kept so no other action can interleave.
                time.sleep(duration / steps)
            # Release
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            self._release_quietly(btn)
            return {"success": False, "error": str(e)}

    async def drag(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag along ``path``, pressing at the first point and releasing at the last.

        ``duration`` is divided evenly between the segments. A path with
        fewer than two points is rejected with an error result.
        """
        btn = None
        try:
            if not path or len(path) < 2:
                return {"success": False, "error": "Path must contain at least 2 points"}
            btn = self._to_button(button)
            # Move to the first point
            self.mouse.position = path[0]
            self.mouse.press(btn)
            step_duration = duration / (len(path) - 1) if len(path) > 1 else duration
            for x, y in path[1:]:
                self.mouse.position = (x, y)
                # NOTE(review): blocking sleep inside a coroutine, see drag_to.
                time.sleep(step_duration)
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            self._release_quietly(btn)
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Hold ``key`` down."""
        try:
            # use pyautogui for their key names
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release ``key``."""
        try:
            # use pyautogui for their key names
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type ``text`` through the keyboard controller."""
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release ``key``."""
        try:
            # use pyautogui for their key names
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the key combination given by ``keys``."""
        try:
            # use pyautogui for their key names
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll by ``x`` horizontally and ``y`` vertically."""
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by ``clicks`` (a negative vertical scroll)."""
        try:
            self.mouse.scroll(0, -clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by ``clicks`` (a positive vertical scroll)."""
        try:
            self.mouse.scroll(0, clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return it as a base64-encoded PNG.

        Returns:
            ``{"success": True, "image_data": <base64 str>}`` or an error
            result when capture or encoding fails.
        """
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size reported by pyautogui."""
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the current cursor position."""
        try:
            x, y = self.mouse.position
            return {"success": True, "position": {"x": x, "y": y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard text.

        Despite the name, this reads the clipboard (``pyperclip.paste``)
        rather than writing to it.
        """
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard contents with ``text``."""
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output.

        The command string is handed to the shell as-is, so callers must
        only pass trusted input. ``success`` reports that the command could
        be run; the exit status is not included in the result.
        """
        try:
            import subprocess

            process = subprocess.run(command, shell=True, capture_output=True, text=True)
            return {"success": True, "stdout": process.stdout, "stderr": process.stderr}
        except Exception as e:
            return {"success": False, "error": str(e)}
|
||||
@@ -0,0 +1,405 @@
|
||||
"""
|
||||
Windows implementation of automation and accessibility handlers.
|
||||
|
||||
This implementation uses pyautogui for GUI automation and Windows-specific APIs
|
||||
for accessibility and system operations.
|
||||
"""
|
||||
from typing import Dict, Any, List, Tuple, Optional
|
||||
import logging
|
||||
import subprocess
|
||||
import base64
|
||||
import os
|
||||
from io import BytesIO
|
||||
|
||||
# Configure logger
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Try to import pyautogui
|
||||
try:
|
||||
import pyautogui
|
||||
logger.info("pyautogui successfully imported, GUI automation available")
|
||||
except Exception as e:
|
||||
logger.error(f"pyautogui import failed: {str(e)}. GUI operations will not work.")
|
||||
pyautogui = None
|
||||
|
||||
# Try to import Windows-specific modules
|
||||
try:
|
||||
import win32gui
|
||||
import win32con
|
||||
import win32api
|
||||
logger.info("Windows API modules successfully imported")
|
||||
WINDOWS_API_AVAILABLE = True
|
||||
except Exception as e:
|
||||
logger.error(f"Windows API modules import failed: {str(e)}. Some Windows-specific features will be unavailable.")
|
||||
WINDOWS_API_AVAILABLE = False
|
||||
|
||||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||||
|
||||
class WindowsAccessibilityHandler(BaseAccessibilityHandler):
    """Windows implementation of accessibility handler."""

    @staticmethod
    def _describe(role, title, rect):
        """Build the role/title/position/size dict for a window rectangle."""
        return {
            "role": role,
            "title": title,
            "position": {"x": rect[0], "y": rect[1]},
            "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]},
        }

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Get the accessibility tree of the current window.

        The foreground window becomes the root node (role ``"Window"``) and
        every child window enumerated by ``EnumChildWindows`` is appended to
        its ``children`` with the window class name as its role.

        Returns:
            ``{"success": True, "tree": <root node>}``, or an error result
            when the Windows API is unavailable, there is no foreground
            window, or a call fails.
        """
        if not WINDOWS_API_AVAILABLE:
            return {"success": False, "error": "Windows API not available"}

        def collect_child(child_handle, sink):
            # Failures on a single child are logged and skipped; returning
            # True keeps the enumeration going.
            try:
                caption = win32gui.GetWindowText(child_handle)
                bounds = win32gui.GetWindowRect(child_handle)
                class_name = win32gui.GetClassName(child_handle)
                node = self._describe(class_name, caption, bounds)
                node["children"] = []
                sink.append(node)
            except Exception as e:
                logger.debug(f"Error getting child window info: {e}")
            return True

        try:
            # Get the foreground window
            foreground = win32gui.GetForegroundWindow()
            if not foreground:
                return {"success": False, "error": "No foreground window found"}

            # Get window information
            caption = win32gui.GetWindowText(foreground)
            bounds = win32gui.GetWindowRect(foreground)
            root = self._describe("Window", caption, bounds)
            root["children"] = []

            # Enumerate child windows
            win32gui.EnumChildWindows(foreground, collect_child, root["children"])
        except Exception as e:
            logger.error(f"Error getting accessibility tree: {e}")
            return {"success": False, "error": str(e)}
        else:
            return {"success": True, "tree": root}

    async def find_element(self, role: Optional[str] = None,
                           title: Optional[str] = None,
                           value: Optional[str] = None) -> Dict[str, Any]:
        """Find an element in the accessibility tree by criteria.

        A top-level window is looked up by ``title`` first; if that finds
        nothing (or no title is given) a lookup by window class ``role`` is
        tried. ``value`` is accepted but not used by this implementation.

        Returns:
            ``{"success": True, "element": <node>}`` for the first hit, or
            an error result when nothing matches, the Windows API is
            unavailable, or a call fails.
        """
        if not WINDOWS_API_AVAILABLE:
            return {"success": False, "error": "Windows API not available"}

        try:
            # Find window by title if specified
            if title:
                by_title = win32gui.FindWindow(None, title)
                if by_title:
                    bounds = win32gui.GetWindowRect(by_title)
                    return {
                        "success": True,
                        "element": self._describe("Window", title, bounds),
                    }

            # Find window by class name if role is specified
            if role:
                by_class = win32gui.FindWindow(role, None)
                if by_class:
                    caption = win32gui.GetWindowText(by_class)
                    bounds = win32gui.GetWindowRect(by_class)
                    return {
                        "success": True,
                        "element": self._describe(role, caption, bounds),
                    }

            return {"success": False, "error": "Element not found"}
        except Exception as e:
            logger.error(f"Error finding element: {e}")
            return {"success": False, "error": str(e)}
|
||||
|
||||
class WindowsAutomationHandler(BaseAutomationHandler):
|
||||
"""Windows implementation of automation handler using pyautogui and Windows APIs."""
|
||||
|
||||
# Mouse Actions
|
||||
async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Press and hold a mouse button.

    Args:
        x: Target column; the cursor is moved only when both x and y are given.
        y: Target row.
        button: Button name passed to ``pyautogui.mouseDown``.

    Returns:
        ``{"success": True}``, or an error result when pyautogui is
        unavailable or the call raises.
    """
    if not pyautogui:
        return {"success": False, "error": "pyautogui not available"}

    has_target = x is not None and y is not None
    try:
        if has_target:
            pyautogui.moveTo(x, y)
        pyautogui.mouseDown(button=button)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    else:
        return {"success": True}
|
||||
|
||||
async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
    """Release a mouse button.

    Args:
        x: Target column; the cursor is moved only when both x and y are given.
        y: Target row.
        button: Button name passed to ``pyautogui.mouseUp``.

    Returns:
        ``{"success": True}``, or an error result when pyautogui is
        unavailable or the call raises.
    """
    if not pyautogui:
        return {"success": False, "error": "pyautogui not available"}

    has_target = x is not None and y is not None
    try:
        if has_target:
            pyautogui.moveTo(x, y)
        pyautogui.mouseUp(button=button)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    else:
        return {"success": True}
|
||||
|
||||
async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
    """Move the cursor to (x, y).

    Returns:
        ``{"success": True}``, or an error result when pyautogui is
        unavailable or the move raises.
    """
    if not pyautogui:
        return {"success": False, "error": "pyautogui not available"}

    try:
        pyautogui.moveTo(x, y)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
    else:
        return {"success": True}
|
||||
|
||||
async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
if x is not None and y is not None:
|
||||
pyautogui.moveTo(x, y)
|
||||
pyautogui.click()
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
if x is not None and y is not None:
|
||||
pyautogui.moveTo(x, y)
|
||||
pyautogui.rightClick()
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
if x is not None and y is not None:
|
||||
pyautogui.moveTo(x, y)
|
||||
pyautogui.doubleClick(interval=0.1)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.dragTo(x, y, duration=duration, button=button)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
if not path:
|
||||
return {"success": False, "error": "Path is empty"}
|
||||
|
||||
# Move to first position
|
||||
pyautogui.moveTo(*path[0])
|
||||
|
||||
# Drag through all positions
|
||||
for x, y in path[1:]:
|
||||
pyautogui.dragTo(x, y, duration=duration/len(path), button=button)
|
||||
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Keyboard Actions
|
||||
async def key_down(self, key: str) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.keyDown(key)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def key_up(self, key: str) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.keyUp(key)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def type_text(self, text: str) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.write(text)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def press_key(self, key: str) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.press(key)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def hotkey(self, keys: str) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.hotkey(*keys)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Scrolling Actions
|
||||
async def scroll(self, x: int, y: int) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
# pyautogui.scroll() only takes one parameter (vertical scroll)
|
||||
pyautogui.scroll(y)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.scroll(-clicks)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
pyautogui.scroll(clicks)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Screen Actions
|
||||
async def screenshot(self) -> Dict[str, Any]:
|
||||
if not pyautogui:
|
||||
return {"success": False, "error": "pyautogui not available"}
|
||||
|
||||
try:
|
||||
from PIL import Image
|
||||
screenshot = pyautogui.screenshot()
|
||||
if not isinstance(screenshot, Image.Image):
|
||||
return {"success": False, "error": "Failed to capture screenshot"}
|
||||
|
||||
buffered = BytesIO()
|
||||
screenshot.save(buffered, format="PNG", optimize=True)
|
||||
buffered.seek(0)
|
||||
image_data = base64.b64encode(buffered.getvalue()).decode()
|
||||
return {"success": True, "image_data": image_data}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": f"Screenshot error: {str(e)}"}
|
||||
|
||||
async def get_screen_size(self) -> Dict[str, Any]:
|
||||
try:
|
||||
if pyautogui:
|
||||
size = pyautogui.size()
|
||||
return {"success": True, "size": {"width": size.width, "height": size.height}}
|
||||
elif WINDOWS_API_AVAILABLE:
|
||||
# Fallback to Windows API
|
||||
width = win32api.GetSystemMetrics(win32con.SM_CXSCREEN)
|
||||
height = win32api.GetSystemMetrics(win32con.SM_CYSCREEN)
|
||||
return {"success": True, "size": {"width": width, "height": height}}
|
||||
else:
|
||||
return {"success": False, "error": "No screen size detection method available"}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def get_cursor_position(self) -> Dict[str, Any]:
|
||||
try:
|
||||
if pyautogui:
|
||||
pos = pyautogui.position()
|
||||
return {"success": True, "position": {"x": pos.x, "y": pos.y}}
|
||||
elif WINDOWS_API_AVAILABLE:
|
||||
# Fallback to Windows API
|
||||
pos = win32gui.GetCursorPos()
|
||||
return {"success": True, "position": {"x": pos[0], "y": pos[1]}}
|
||||
else:
|
||||
return {"success": False, "error": "No cursor position detection method available"}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Clipboard Actions
|
||||
async def copy_to_clipboard(self) -> Dict[str, Any]:
|
||||
try:
|
||||
import pyperclip
|
||||
content = pyperclip.paste()
|
||||
return {"success": True, "content": content}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
async def set_clipboard(self, text: str) -> Dict[str, Any]:
|
||||
try:
|
||||
import pyperclip
|
||||
pyperclip.copy(text)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Command Execution
|
||||
async def run_command(self, command: str) -> Dict[str, Any]:
|
||||
try:
|
||||
# Use cmd.exe for Windows commands
|
||||
process = subprocess.run(
|
||||
command,
|
||||
shell=True,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0
|
||||
)
|
||||
return {
|
||||
"success": True,
|
||||
"stdout": process.stdout,
|
||||
"stderr": process.stderr,
|
||||
"return_code": process.returncode
|
||||
}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
Reference in New Issue
Block a user