From 84de38bac9224480fda6f2eaeb964a68dbeff0bb Mon Sep 17 00:00:00 2001 From: Dillon DuPont Date: Fri, 13 Jun 2025 11:36:46 -0400 Subject: [PATCH] Add windows computer-server --- .../computer_server/handlers/factory.py | 22 +- .../computer_server/handlers/windows.py | 405 ++++++++++++++++++ libs/computer-server/pyproject.toml | 8 +- 3 files changed, 425 insertions(+), 10 deletions(-) create mode 100644 libs/computer-server/computer_server/handlers/windows.py diff --git a/libs/computer-server/computer_server/handlers/factory.py b/libs/computer-server/computer_server/handlers/factory.py index 5a9dc414..31d02fee 100644 --- a/libs/computer-server/computer_server/handlers/factory.py +++ b/libs/computer-server/computer_server/handlers/factory.py @@ -11,6 +11,8 @@ if system == 'darwin': from computer_server.diorama.macos import MacOSDioramaHandler elif system == 'linux': from .linux import LinuxAccessibilityHandler, LinuxAutomationHandler +elif system == 'windows': + from .windows import WindowsAccessibilityHandler, WindowsAutomationHandler from .generic import GenericFileHandler @@ -22,7 +24,7 @@ class HandlerFactory: """Determine the current OS. Returns: - str: The OS type ('darwin' for macOS or 'linux' for Linux) + str: The OS type ('darwin' for macOS, 'linux' for Linux, or 'windows' for Windows) Raises: RuntimeError: If unable to determine the current OS @@ -31,13 +33,15 @@ class HandlerFactory: # Use platform.system() as primary method system = platform.system().lower() if system in ['darwin', 'linux', 'windows']: - return 'darwin' if system == 'darwin' else 'linux' if system == 'linux' else 'windows' + return system - # Fallback to uname if platform.system() doesn't return expected values - result = subprocess.run(['uname', '-s'], capture_output=True, text=True) - if result.returncode != 0: - raise RuntimeError(f"uname command failed: {result.stderr}") - return result.stdout.strip().lower() + # Fallback to uname if platform.system() doesn't return expected values (Unix-like systems only) + if system != 'windows': + result = subprocess.run(['uname', '-s'], capture_output=True, text=True) + if result.returncode == 0: + return result.stdout.strip().lower() + + raise RuntimeError(f"Unsupported OS: {system}") except Exception as e: raise RuntimeError(f"Failed to determine current OS: {str(e)}") @@ -59,5 +63,7 @@ class HandlerFactory: return MacOSAccessibilityHandler(), MacOSAutomationHandler(), MacOSDioramaHandler(), GenericFileHandler() elif os_type == 'linux': return LinuxAccessibilityHandler(), LinuxAutomationHandler(), BaseDioramaHandler(), GenericFileHandler() + elif os_type == 'windows': + return WindowsAccessibilityHandler(), WindowsAutomationHandler(), BaseDioramaHandler(), GenericFileHandler() else: - raise NotImplementedError(f"OS '{os_type}' is not supported") \ No newline at end of file + raise NotImplementedError(f"OS '{os_type}' is not supported") diff --git a/libs/computer-server/computer_server/handlers/windows.py b/libs/computer-server/computer_server/handlers/windows.py new file mode 100644 index 00000000..269b97b6 --- /dev/null +++ b/libs/computer-server/computer_server/handlers/windows.py @@ -0,0 +1,405 @@ +""" +Windows implementation of automation and accessibility handlers. + +This implementation uses pyautogui for GUI automation and Windows-specific APIs +for accessibility and system operations. +""" +from typing import Dict, Any, List, Tuple, Optional +import logging +import subprocess +import base64 +import os +from io import BytesIO + +# Configure logger +logger = logging.getLogger(__name__) + +# Try to import pyautogui +try: + import pyautogui + logger.info("pyautogui successfully imported, GUI automation available") +except Exception as e: + logger.error(f"pyautogui import failed: {str(e)}. GUI operations will not work.") + pyautogui = None + +# Try to import Windows-specific modules +try: + import win32gui + import win32con + import win32api + logger.info("Windows API modules successfully imported") + WINDOWS_API_AVAILABLE = True +except Exception as e: + logger.error(f"Windows API modules import failed: {str(e)}. Some Windows-specific features will be unavailable.") + WINDOWS_API_AVAILABLE = False + +from .base import BaseAccessibilityHandler, BaseAutomationHandler + +class WindowsAccessibilityHandler(BaseAccessibilityHandler): + """Windows implementation of accessibility handler.""" + + async def get_accessibility_tree(self) -> Dict[str, Any]: + """Get the accessibility tree of the current window.""" + if not WINDOWS_API_AVAILABLE: + return {"success": False, "error": "Windows API not available"} + + try: + # Get the foreground window + hwnd = win32gui.GetForegroundWindow() + if not hwnd: + return {"success": False, "error": "No foreground window found"} + + # Get window information + window_text = win32gui.GetWindowText(hwnd) + rect = win32gui.GetWindowRect(hwnd) + + tree = { + "role": "Window", + "title": window_text, + "position": {"x": rect[0], "y": rect[1]}, + "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]}, + "children": [] + } + + # Enumerate child windows + def enum_child_proc(hwnd_child, children_list): + try: + child_text = win32gui.GetWindowText(hwnd_child) + child_rect = win32gui.GetWindowRect(hwnd_child) + child_class = win32gui.GetClassName(hwnd_child) + + child_info = { + "role": child_class, + "title": child_text, + "position": {"x": child_rect[0], "y": child_rect[1]}, + "size": {"width": child_rect[2] - child_rect[0], "height": child_rect[3] - child_rect[1]}, + "children": [] + } + children_list.append(child_info) + except Exception as e: + logger.debug(f"Error getting child window info: {e}") + return True + + win32gui.EnumChildWindows(hwnd, enum_child_proc, tree["children"]) + + return {"success": True, "tree": tree} + + except Exception as e: + logger.error(f"Error getting accessibility tree: {e}") + return {"success": False, "error": str(e)} + + async def find_element(self, role: Optional[str] = None, + title: Optional[str] = None, + value: Optional[str] = None) -> Dict[str, Any]: + """Find an element in the accessibility tree by criteria.""" + if not WINDOWS_API_AVAILABLE: + return {"success": False, "error": "Windows API not available"} + + try: + # Find window by title if specified + if title: + hwnd = win32gui.FindWindow(None, title) + if hwnd: + rect = win32gui.GetWindowRect(hwnd) + return { + "success": True, + "element": { + "role": "Window", + "title": title, + "position": {"x": rect[0], "y": rect[1]}, + "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]} + } + } + + # Find window by class name if role is specified + if role: + hwnd = win32gui.FindWindow(role, None) + if hwnd: + window_text = win32gui.GetWindowText(hwnd) + rect = win32gui.GetWindowRect(hwnd) + return { + "success": True, + "element": { + "role": role, + "title": window_text, + "position": {"x": rect[0], "y": rect[1]}, + "size": {"width": rect[2] - rect[0], "height": rect[3] - rect[1]} + } + } + + return {"success": False, "error": "Element not found"} + + except Exception as e: + logger.error(f"Error finding element: {e}") + return {"success": False, "error": str(e)} + +class WindowsAutomationHandler(BaseAutomationHandler): + """Windows implementation of automation handler using pyautogui and Windows APIs.""" + + # Mouse Actions + async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + if x is not None and y is not None: + pyautogui.moveTo(x, y) + pyautogui.mouseDown(button=button) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + if x is not None and y is not None: + pyautogui.moveTo(x, y) + pyautogui.mouseUp(button=button) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def move_cursor(self, x: int, y: int) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.moveTo(x, y) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + if x is not None and y is not None: + pyautogui.moveTo(x, y) + pyautogui.click() + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + if x is not None and y is not None: + pyautogui.moveTo(x, y) + pyautogui.rightClick() + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def double_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + if x is not None and y is not None: + pyautogui.moveTo(x, y) + pyautogui.doubleClick(interval=0.1) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def drag_to(self, x: int, y: int, button: str = "left", duration: float = 0.5) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.dragTo(x, y, duration=duration, button=button) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def drag(self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + if not path: + return {"success": False, "error": "Path is empty"} + + # Move to first position + pyautogui.moveTo(*path[0]) + + # Drag through all positions + for x, y in path[1:]: + pyautogui.dragTo(x, y, duration=duration/len(path), button=button) + + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + # Keyboard Actions + async def key_down(self, key: str) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.keyDown(key) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def key_up(self, key: str) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.keyUp(key) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def type_text(self, text: str) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.write(text) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def press_key(self, key: str) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.press(key) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def hotkey(self, *keys: str) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.hotkey(*keys) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + # Scrolling Actions + async def scroll(self, x: int, y: int) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + # pyautogui.scroll() only takes one parameter (vertical scroll) + pyautogui.scroll(y) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.scroll(-clicks) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + pyautogui.scroll(clicks) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + # Screen Actions + async def screenshot(self) -> Dict[str, Any]: + if not pyautogui: + return {"success": False, "error": "pyautogui not available"} + + try: + from PIL import Image + screenshot = pyautogui.screenshot() + if not isinstance(screenshot, Image.Image): + return {"success": False, "error": "Failed to capture screenshot"} + + buffered = BytesIO() + screenshot.save(buffered, format="PNG", optimize=True) + buffered.seek(0) + image_data = base64.b64encode(buffered.getvalue()).decode() + return {"success": True, "image_data": image_data} + except Exception as e: + return {"success": False, "error": f"Screenshot error: {str(e)}"} + + async def get_screen_size(self) -> Dict[str, Any]: + try: + if pyautogui: + size = pyautogui.size() + return {"success": True, "size": {"width": size.width, "height": size.height}} + elif WINDOWS_API_AVAILABLE: + # Fallback to Windows API + width = win32api.GetSystemMetrics(win32con.SM_CXSCREEN) + height = win32api.GetSystemMetrics(win32con.SM_CYSCREEN) + return {"success": True, "size": {"width": width, "height": height}} + else: + return {"success": False, "error": "No screen size detection method available"} + except Exception as e: + return {"success": False, "error": str(e)} + + async def get_cursor_position(self) -> Dict[str, Any]: + try: + if pyautogui: + pos = pyautogui.position() + return {"success": True, "position": {"x": pos.x, "y": pos.y}} + elif WINDOWS_API_AVAILABLE: + # Fallback to Windows API + pos = win32gui.GetCursorPos() + return {"success": True, "position": {"x": pos[0], "y": pos[1]}} + else: + return {"success": False, "error": "No cursor position detection method available"} + except Exception as e: + return {"success": False, "error": str(e)} + + # Clipboard Actions + async def copy_to_clipboard(self) -> Dict[str, Any]: + try: + import pyperclip + content = pyperclip.paste() + return {"success": True, "content": content} + except Exception as e: + return {"success": False, "error": str(e)} + + async def set_clipboard(self, text: str) -> Dict[str, Any]: + try: + import pyperclip + pyperclip.copy(text) + return {"success": True} + except Exception as e: + return {"success": False, "error": str(e)} + + # Command Execution + async def run_command(self, command: str) -> Dict[str, Any]: + try: + # Use cmd.exe for Windows commands + process = subprocess.run( + command, + shell=True, + capture_output=True, + text=True, + creationflags=subprocess.CREATE_NO_WINDOW if os.name == 'nt' else 0 + ) + return { + "success": True, + "stdout": process.stdout, + "stderr": process.stderr, + "return_code": process.returncode + } + except Exception as e: + return {"success": False, "error": str(e)} diff --git a/libs/computer-server/pyproject.toml b/libs/computer-server/pyproject.toml index cbf9821a..090da016 100644 --- a/libs/computer-server/pyproject.toml +++ b/libs/computer-server/pyproject.toml @@ -19,7 +19,8 @@ dependencies = [ "pyautogui>=0.9.54", "pynput>=1.8.1", "pillow>=10.2.0", - "aiohttp>=3.9.1" + "aiohttp>=3.9.1", + "pyperclip>=1.9.0" ] [project.optional-dependencies] @@ -31,6 +32,9 @@ macos = [ linux = [ "python-xlib>=0.33" ] +windows = [ + "pywin32>=310" +] [project.urls] homepage = "https://github.com/trycua/cua" @@ -80,4 +84,4 @@ disallow_untyped_defs = true check_untyped_defs = true warn_return_any = true show_error_codes = true -warn_unused_ignores = false \ No newline at end of file +warn_unused_ignores = false