import pyautogui pyautogui.FAILSAFE = False from pynput.mouse import Button, Controller as MouseController from pynput.keyboard import Key, Controller as KeyboardController import time import base64 from io import BytesIO from typing import Optional, Dict, Any, List, Tuple from ctypes import byref, c_void_p, POINTER from AppKit import NSWorkspace # type: ignore import AppKit from Quartz.CoreGraphics import * # type: ignore from Quartz.CoreGraphics import CGPoint, CGSize # type: ignore import Foundation from ApplicationServices import ( AXUIElementCreateSystemWide, # type: ignore AXUIElementCreateApplication, # type: ignore AXUIElementCopyAttributeValue, # type: ignore AXUIElementCopyAttributeValues, # type: ignore kAXFocusedWindowAttribute, # type: ignore kAXWindowsAttribute, # type: ignore kAXMainWindowAttribute, # type: ignore kAXChildrenAttribute, # type: ignore kAXRoleAttribute, # type: ignore kAXTitleAttribute, # type: ignore kAXValueAttribute, # type: ignore kAXDescriptionAttribute, # type: ignore kAXEnabledAttribute, # type: ignore kAXPositionAttribute, # type: ignore kAXSizeAttribute, # type: ignore kAXErrorSuccess, # type: ignore AXValueGetType, # type: ignore kAXValueCGSizeType, # type: ignore kAXValueCGPointType, # type: ignore kAXValueCFRangeType, # type: ignore AXUIElementGetTypeID, # type: ignore AXValueGetValue, # type: ignore kAXVisibleChildrenAttribute, # type: ignore kAXRoleDescriptionAttribute, # type: ignore kAXFocusedApplicationAttribute, # type: ignore kAXFocusedUIElementAttribute, # type: ignore kAXSelectedTextAttribute, # type: ignore kAXSelectedTextRangeAttribute, # type: ignore ) import objc import re import json import copy import asyncio from .base import BaseAccessibilityHandler, BaseAutomationHandler import logging logger = logging.getLogger(__name__) # Constants for accessibility API kAXErrorSuccess = 0 kAXRoleAttribute = "AXRole" kAXTitleAttribute = "AXTitle" kAXValueAttribute = "AXValue" kAXWindowsAttribute = "AXWindows" kAXFocusedAttribute = "AXFocused" kAXPositionAttribute = "AXPosition" kAXSizeAttribute = "AXSize" kAXChildrenAttribute = "AXChildren" kAXMenuBarAttribute = "AXMenuBar" kAXMenuBarItemAttribute = "AXMenuBarItem" # Constants for window properties kCGWindowLayer = "kCGWindowLayer" # Z-order information (lower values are higher in the stack) kCGWindowAlpha = "kCGWindowAlpha" # Window opacity # Constants for application activation options NSApplicationActivationOptions = { "regular": 0, # Default activation "bringing_all_windows_forward": 1 << 0, # NSApplicationActivateAllWindows "ignoring_other_apps": 1 << 1 # NSApplicationActivateIgnoringOtherApps } def CFAttributeToPyObject(attrValue): """Convert Core Foundation attribute values to Python objects. Args: attrValue: Core Foundation attribute value to convert Returns: Converted Python object or None if conversion fails """ def list_helper(list_value): """Helper function to convert CF arrays to Python lists. Args: list_value: Core Foundation array to convert Returns: Python list containing converted items """ list_builder = [] for item in list_value: list_builder.append(CFAttributeToPyObject(item)) return list_builder def number_helper(number_value): """Helper function to convert CF numbers to Python numbers. Args: number_value: Core Foundation number to convert Returns: Python int or float, or None if conversion fails """ success, int_value = Foundation.CFNumberGetValue( # type: ignore number_value, Foundation.kCFNumberIntType, None # type: ignore ) if success: return int(int_value) success, float_value = Foundation.CFNumberGetValue( # type: ignore number_value, Foundation.kCFNumberDoubleType, None # type: ignore ) if success: return float(float_value) return None def axuielement_helper(element_value): """Helper function to handle AX UI elements. Args: element_value: Accessibility UI element to process Returns: The element value unchanged """ return element_value cf_attr_type = Foundation.CFGetTypeID(attrValue) # type: ignore cf_type_mapping = { Foundation.CFStringGetTypeID(): str, # type: ignore Foundation.CFBooleanGetTypeID(): bool, # type: ignore Foundation.CFArrayGetTypeID(): list_helper, # type: ignore Foundation.CFNumberGetTypeID(): number_helper, # type: ignore AXUIElementGetTypeID(): axuielement_helper, # type: ignore } try: return cf_type_mapping[cf_attr_type](attrValue) except KeyError: # did not get a supported CF type. Move on to AX type pass ax_attr_type = AXValueGetType(attrValue) ax_type_map = { kAXValueCGSizeType: Foundation.NSSizeFromString, # type: ignore kAXValueCGPointType: Foundation.NSPointFromString, # type: ignore kAXValueCFRangeType: Foundation.NSRangeFromString, # type: ignore } try: search_result = re.search("{.*}", attrValue.description()) if search_result: extracted_str = search_result.group() return tuple(ax_type_map[ax_attr_type](extracted_str)) return None except KeyError: return None def element_attribute(element, attribute): """Get an attribute value from an accessibility element. Args: element: The accessibility element attribute: The attribute name to retrieve Returns: The attribute value or None if not found """ if attribute == kAXChildrenAttribute: err, value = AXUIElementCopyAttributeValues(element, attribute, 0, 999, None) if err == kAXErrorSuccess: if isinstance(value, Foundation.NSArray): # type: ignore return CFAttributeToPyObject(value) else: return value err, value = AXUIElementCopyAttributeValue(element, attribute, None) if err == kAXErrorSuccess: if isinstance(value, Foundation.NSArray): # type: ignore return CFAttributeToPyObject(value) else: return value return None def element_value(element, type): """Extract a typed value from an accessibility element. Args: element: The accessibility element containing the value type: The expected value type Returns: The extracted value or None if extraction fails """ err, value = AXValueGetValue(element, type, None) if err == True: return value return None class UIElement: """Represents a UI element in the accessibility tree with position, size, and hierarchy information.""" def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None): """Initialize a UIElement from an accessibility element. Args: element: The accessibility element to wrap offset_x: X offset for position calculations offset_y: Y offset for position calculations max_depth: Maximum depth to traverse for children parents_visible_bbox: Parent's visible bounding box for clipping """ self.ax_element = element self.content_identifier = "" self.identifier = "" self.name = "" self.children = [] self.description = "" self.role_description = "" self.value = None self.max_depth = max_depth # Set role self.role = element_attribute(element, kAXRoleAttribute) if self.role is None: self.role = "No role" # Set name self.name = element_attribute(element, kAXTitleAttribute) if self.name is not None: # Convert tuple to string if needed if isinstance(self.name, tuple): self.name = str(self.name[0]) if self.name else "" self.name = self.name.replace(" ", "_") # Set enabled self.enabled = element_attribute(element, kAXEnabledAttribute) if self.enabled is None: self.enabled = False # Set position and size position = element_attribute(element, kAXPositionAttribute) size = element_attribute(element, kAXSizeAttribute) start_position = element_value(position, kAXValueCGPointType) if self.role == "AXWindow" and start_position is not None: offset_x = start_position.x offset_y = start_position.y self.absolute_position = copy.copy(start_position) self.position = start_position if self.position is not None: self.position.x -= max(0, offset_x) self.position.y -= max(0, offset_y) self.size = element_value(size, kAXValueCGSizeType) self._set_bboxes(parents_visible_bbox) # Set component center if start_position is None or self.size is None: print("Position is None") return self.center = ( start_position.x + offset_x + self.size.width / 2, start_position.y + offset_y + self.size.height / 2, ) self.description = element_attribute(element, kAXDescriptionAttribute) self.role_description = element_attribute(element, kAXRoleDescriptionAttribute) attribute_value = element_attribute(element, kAXValueAttribute) # Set value self.value = attribute_value if attribute_value is not None: if isinstance(attribute_value, Foundation.NSArray): # type: ignore self.value = [] for value in attribute_value: self.value.append(value) # Check if it's an accessibility element by checking its type ID elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID(): # type: ignore self.value = UIElement(attribute_value, offset_x, offset_y) # Set children if self.max_depth is None or self.max_depth > 0: self.children = self._get_children(element, start_position, offset_x, offset_y) else: self.children = [] self.calculate_hashes() def _set_bboxes(self, parents_visible_bbox): """Set bounding box and visible bounding box for the element. Args: parents_visible_bbox: Parent's visible bounding box for intersection calculation """ if not self.absolute_position or not self.size: self.bbox = None self.visible_bbox = None return self.bbox = [ int(self.absolute_position.x), int(self.absolute_position.y), int(self.absolute_position.x + self.size.width), int(self.absolute_position.y + self.size.height), ] if parents_visible_bbox: # check if not intersected if ( self.bbox[0] > parents_visible_bbox[2] or self.bbox[1] > parents_visible_bbox[3] or self.bbox[2] < parents_visible_bbox[0] or self.bbox[3] < parents_visible_bbox[1] ): self.visible_bbox = None else: self.visible_bbox = [ int(max(self.bbox[0], parents_visible_bbox[0])), int(max(self.bbox[1], parents_visible_bbox[1])), int(min(self.bbox[2], parents_visible_bbox[2])), int(min(self.bbox[3], parents_visible_bbox[3])), ] else: self.visible_bbox = self.bbox def _get_children(self, element, start_position, offset_x, offset_y): """Get child elements from the accessibility element. Args: element: The parent accessibility element start_position: Starting position for offset calculations offset_x: X offset for child positioning offset_y: Y offset for child positioning Returns: List of UIElement children """ children = element_attribute(element, kAXChildrenAttribute) visible_children = element_attribute(element, kAXVisibleChildrenAttribute) found_children = [] if children is not None: found_children.extend(children) else: if visible_children is not None: found_children.extend(visible_children) result = [] if self.max_depth is None or self.max_depth > 0: for child in found_children: child = UIElement( child, offset_x, offset_y, self.max_depth - 1 if self.max_depth is not None else None, self.visible_bbox, ) result.append(child) return result def calculate_hashes(self): """Calculate unique identifiers for the element and its content.""" self.identifier = self.component_hash() self.content_identifier = self.children_content_hash(self.children) def component_hash(self): """Generate a hash identifier for this component based on its properties. Returns: MD5 hash string of component properties """ if self.position is None or self.size is None: return "" position_string = f"{self.position.x:.0f};{self.position.y:.0f}" size_string = f"{self.size.width:.0f};{self.size.height:.0f}" enabled_string = str(self.enabled) # Ensure role is a string role_string = "" if self.role is not None: role_string = str(self.role[0]) if isinstance(self.role, tuple) else str(self.role) return self.hash_from_string(position_string + size_string + enabled_string + role_string) def hash_from_string(self, string): """Generate MD5 hash from a string. Args: string: Input string to hash Returns: MD5 hash hexdigest or empty string if input is None/empty """ if string is None or string == "": return "" from hashlib import md5 return md5(string.encode()).hexdigest() def children_content_hash(self, children): """Generate a hash representing the content and structure of child elements. Args: children: List of child UIElement objects Returns: Combined hash of children content and structure """ if len(children) == 0: return "" all_content_hashes = [] all_hashes = [] for child in children: all_content_hashes.append(child.content_identifier) all_hashes.append(child.identifier) all_content_hashes.sort() if len(all_content_hashes) == 0: return "" content_hash = self.hash_from_string("".join(all_content_hashes)) content_structure_hash = self.hash_from_string("".join(all_hashes)) return self.hash_from_string(content_hash.join(content_structure_hash)) def to_dict(self): """Convert the UIElement to a dictionary representation. Returns: Dictionary containing all element properties and children """ def children_to_dict(children): """Convert list of children to dictionary format. Args: children: List of UIElement children to convert Returns: List of dictionaries representing the children """ result = [] for child in children: result.append(child.to_dict()) return result value = self.value if isinstance(value, UIElement): value = json.dumps(value.to_dict(), indent=4) elif isinstance(value, AppKit.NSDate): # type: ignore value = str(value) if self.absolute_position is not None: absolute_position = f"{self.absolute_position.x:.2f};{self.absolute_position.y:.2f}" else: absolute_position = "" if self.position is not None: position = f"{self.position.x:.2f};{self.position.y:.2f}" else: position = "" if self.size is not None: size = f"{self.size.width:.0f};{self.size.height:.0f}" else: size = "" return { "id": self.identifier, "name": self.name, "role": self.role, "description": self.description, "role_description": self.role_description, "value": value, "absolute_position": absolute_position, "position": position, "size": size, "enabled": self.enabled, "bbox": self.bbox, "visible_bbox": self.visible_bbox, "children": children_to_dict(self.children), } import Quartz from AppKit import NSWorkspace, NSRunningApplication from pathlib import Path def get_all_windows_zorder(): """Get all windows in the system with their z-order information. Returns: List of window dictionaries sorted by z-index, containing window properties like id, name, pid, owner, bounds, layer, and opacity """ window_list = Quartz.CGWindowListCopyWindowInfo( Quartz.kCGWindowListOptionOnScreenOnly, Quartz.kCGNullWindowID ) z_order = {window['kCGWindowNumber']: z_index for z_index, window in enumerate(window_list[::-1])} window_list_all = Quartz.CGWindowListCopyWindowInfo( Quartz.kCGWindowListOptionAll, Quartz.kCGNullWindowID ) windows = [] for window in window_list_all: window_id = window.get('kCGWindowNumber', 0) window_name = window.get('kCGWindowName', '') window_pid = window.get('kCGWindowOwnerPID', 0) window_bounds = window.get('kCGWindowBounds', {}) window_owner = window.get('kCGWindowOwnerName', '') window_is_on_screen = window.get('kCGWindowIsOnscreen', False) layer = window.get('kCGWindowLayer', 0) opacity = window.get('kCGWindowAlpha', 1.0) z_index = z_order.get(window_id, -1) if window_name == "Dock" and window_owner == "Dock": role = "dock" elif window_name == "Menubar" and window_owner == "Window Server": role = "menubar" elif window_owner in ["Window Server", "Dock"]: role = "desktop" else: role = "app" if window_bounds: windows.append({ "id": window_id, "name": window_name or "Unnamed Window", "pid": window_pid, "owner": window_owner, "role": role, "is_on_screen": window_is_on_screen, "bounds": { "x": window_bounds.get('X', 0), "y": window_bounds.get('Y', 0), "width": window_bounds.get('Width', 0), "height": window_bounds.get('Height', 0) }, "layer": layer, "z_index": z_index, "opacity": opacity }) windows = sorted(windows, key=lambda x: x["z_index"]) return windows def get_app_info(app): """Extract information from an NSRunningApplication object. Args: app: NSRunningApplication instance Returns: Dictionary containing app name, bundle ID, PID, and status flags """ return { "name": app.localizedName(), "bundle_id": app.bundleIdentifier(), "pid": app.processIdentifier(), "active": app.isActive(), "hidden": app.isHidden(), "terminated": app.isTerminated(), } def get_menubar_items(active_app_pid=None): """Get menubar items for the active application. Args: active_app_pid: Process ID of the active application, or None to use frontmost app Returns: List of menubar item dictionaries with title, bounds, index, and app_pid """ menubar_items = [] if active_app_pid is None: frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication() if frontmost_app: active_app_pid = frontmost_app.processIdentifier() else: return menubar_items app_element = AXUIElementCreateApplication(active_app_pid) if app_element is None: return menubar_items menubar = element_attribute(app_element, kAXMenuBarAttribute) if menubar is None: return menubar_items children = element_attribute(menubar, kAXChildrenAttribute) if children is None: return menubar_items for i, item in enumerate(children): title = element_attribute(item, kAXTitleAttribute) or "Untitled" bounds = {"x": 0, "y": 0, "width": 0, "height": 0} position_value = element_attribute(item, kAXPositionAttribute) if position_value: position_value = element_value(position_value, kAXValueCGPointType) bounds["x"] = getattr(position_value, 'x', 0) bounds["y"] = getattr(position_value, 'y', 0) size_value = element_attribute(item, kAXSizeAttribute) if size_value: size_value = element_value(size_value, kAXValueCGSizeType) bounds["width"] = getattr(size_value, 'width', 0) bounds["height"] = getattr(size_value, 'height', 0) menubar_items.append({ "title": title, "bounds": bounds, "index": i, "app_pid": active_app_pid }) return menubar_items def get_dock_items(): """Get all items in the macOS Dock. Returns: List of dock item dictionaries with title, description, bounds, index, type, role, and subrole information """ dock_items = [] dock_pid = None running_apps = NSWorkspace.sharedWorkspace().runningApplications() for app in running_apps: if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock": dock_pid = app.processIdentifier() break if dock_pid is None: return dock_items dock_element = AXUIElementCreateApplication(dock_pid) if dock_element is None: return dock_items dock_list = element_attribute(dock_element, kAXChildrenAttribute) if dock_list is None or len(dock_list) == 0: return dock_items dock_app_list = None for child in dock_list: role = element_attribute(child, kAXRoleAttribute) if role == "AXList": dock_app_list = child break if dock_app_list is None: return dock_items items = element_attribute(dock_app_list, kAXChildrenAttribute) if items is None: return dock_items for i, item in enumerate(items): title = element_attribute(item, kAXTitleAttribute) or "Untitled" description = element_attribute(item, kAXDescriptionAttribute) or "" role = element_attribute(item, kAXRoleAttribute) or "" subrole = element_attribute(item, "AXSubrole") or "" bounds = {"x": 0, "y": 0, "width": 0, "height": 0} position_value = element_attribute(item, kAXPositionAttribute) if position_value: position_value = element_value(position_value, kAXValueCGPointType) bounds["x"] = getattr(position_value, 'x', 0) bounds["y"] = getattr(position_value, 'y', 0) size_value = element_attribute(item, kAXSizeAttribute) if size_value: size_value = element_value(size_value, kAXValueCGSizeType) bounds["width"] = getattr(size_value, 'width', 0) bounds["height"] = getattr(size_value, 'height', 0) item_type = "unknown" if subrole == "AXApplicationDockItem": item_type = "application" elif subrole == "AXFolderDockItem": item_type = "folder" elif subrole == "AXDocumentDockItem": item_type = "document" elif subrole == "AXSeparatorDockItem" or role == "AXSeparator": item_type = "separator" elif "trash" in title.lower(): item_type = "trash" dock_items.append({ "title": title, "description": description, "bounds": bounds, "index": i, "type": item_type, "role": role, "subrole": subrole }) return dock_items class MacOSAccessibilityHandler(BaseAccessibilityHandler): """Handler for macOS accessibility features and UI element inspection.""" def get_desktop_state(self): """Get the current state of the desktop including windows, apps, menubar, and dock. Returns: Dictionary containing applications, windows, menubar_items, and dock_items """ windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")] running_apps = self.get_running_apps() applications = [] pid_to_window_ids = {} # Build a mapping: pid -> list of AX window trees pid_to_ax_trees = {} for app in running_apps: pid = app.processIdentifier() try: app_elem = AXUIElementCreateApplication(pid) err, app_windows = AXUIElementCopyAttributeValue(app_elem, kAXWindowsAttribute, None) trees = [] if err == kAXErrorSuccess and app_windows: for ax_win in app_windows: try: trees.append(UIElement(ax_win).to_dict()) except Exception as e: trees.append({"error": str(e)}) pid_to_ax_trees[pid] = trees except Exception as e: pid_to_ax_trees[pid] = [{"error": str(e)}] # Attach children by pid and index (order) pid_to_idx = {} for win in windows: pid = win["pid"] idx = pid_to_idx.get(pid, 0) ax_trees = pid_to_ax_trees.get(pid, []) win["children"] = ax_trees[idx]["children"] if idx < len(ax_trees) and "children" in ax_trees[idx] else [] pid_to_idx[pid] = idx + 1 pid_to_window_ids.setdefault(pid, []).append(win["id"]) for app in running_apps: info = get_app_info(app) app_pid = info["pid"] applications.append({ "info": info, "windows": pid_to_window_ids.get(app_pid, []) }) menubar_items = get_menubar_items() dock_items = get_dock_items() return { "applications": applications, "windows": windows, "menubar_items": menubar_items, "dock_items": dock_items } def get_application_windows(self, pid: int): """Get all windows for a specific application. Args: pid: Process ID of the application Returns: List of accessibility window elements or empty list if none found """ try: app = AXUIElementCreateApplication(pid) err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None) if err == kAXErrorSuccess and windows: if isinstance(windows, Foundation.NSArray): # type: ignore return windows return [] except: return [] def get_all_windows(self): """Get all visible windows in the system. Returns: List of window dictionaries with app information and window details """ try: windows = [] running_apps = self.get_running_apps() for app in running_apps: try: app_name = app.localizedName() pid = app.processIdentifier() # Skip system processes and background apps if not app.activationPolicy() == 0: # NSApplicationActivationPolicyRegular continue # Get application windows app_windows = self.get_application_windows(pid) windows.append( { "app_name": app_name, "pid": pid, "frontmost": app.isActive(), "has_windows": len(app_windows) > 0, "windows": app_windows, } ) except: continue return windows except: return [] def get_running_apps(self): """Get all currently running applications. Returns: List of NSRunningApplication objects """ # From NSWorkspace.runningApplications docs: https://developer.apple.com/documentation/appkit/nsworkspace/runningapplications # "Similar to the NSRunningApplication class's properties, this property will only change when the main run loop runs in a common mode" # So we need to run the main run loop to get the latest running applications Foundation.CFRunLoopRunInMode(Foundation.kCFRunLoopDefaultMode, 0.1, False) # type: ignore return NSWorkspace.sharedWorkspace().runningApplications() def get_ax_attribute(self, element, attribute): """Get an accessibility attribute from an element. Args: element: The accessibility element attribute: The attribute name to retrieve Returns: The attribute value or None if not found """ return element_attribute(element, attribute) def serialize_node(self, element): """Create a serializable dictionary representation of an accessibility element. Args: element: The accessibility element to serialize Returns: Dictionary containing element properties like role, title, value, position, and size """ # Create a serializable dictionary representation of an accessibility element result = {} # Get basic attributes result["role"] = self.get_ax_attribute(element, kAXRoleAttribute) result["title"] = self.get_ax_attribute(element, kAXTitleAttribute) result["value"] = self.get_ax_attribute(element, kAXValueAttribute) # Get position and size if available position = self.get_ax_attribute(element, kAXPositionAttribute) if position: try: position_dict = {"x": position[0], "y": position[1]} result["position"] = position_dict except (IndexError, TypeError): pass size = self.get_ax_attribute(element, kAXSizeAttribute) if size: try: size_dict = {"width": size[0], "height": size[1]} result["size"] = size_dict except (IndexError, TypeError): pass return result async def get_accessibility_tree(self) -> Dict[str, Any]: """Get the complete accessibility tree for the current desktop state. Returns: Dictionary containing success status and desktop state information """ try: desktop_state = self.get_desktop_state() return { "success": True, **desktop_state } except Exception as e: return {"success": False, "error": str(e)} async def find_element( self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None ) -> Dict[str, Any]: """Find an accessibility element matching the specified criteria. Args: role: The accessibility role to match (optional) title: The title to match (optional) value: The value to match (optional) Returns: Dictionary containing success status and the found element or error message """ try: system = AXUIElementCreateSystemWide() def match_element(element): """Check if an element matches the search criteria. Args: element: The accessibility element to check Returns: True if element matches all specified criteria, False otherwise """ if role and self.get_ax_attribute(element, kAXRoleAttribute) != role: return False if title and self.get_ax_attribute(element, kAXTitleAttribute) != title: return False if value and str(self.get_ax_attribute(element, kAXValueAttribute)) != value: return False return True def search_tree(element): """Recursively search the accessibility tree for matching elements. Args: element: The accessibility element to search from Returns: Serialized element dictionary if match found, None otherwise """ if match_element(element): return self.serialize_node(element) children = self.get_ax_attribute(element, kAXChildrenAttribute) if children: for child in children: result = search_tree(child) if result: return result return None element = search_tree(system) return {"success": True, "element": element} except Exception as e: return {"success": False, "error": str(e)} class MacOSAutomationHandler(BaseAutomationHandler): """Handler for macOS automation including mouse, keyboard, and screen operations.""" # Mouse Actions mouse = MouseController() keyboard = KeyboardController() async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Press and hold a mouse button at the specified coordinates. Args: x: X coordinate (optional, uses current position if None) y: Y coordinate (optional, uses current position if None) button: Mouse button to press ("left", "right", or "middle") Returns: Dictionary containing success status and error message if failed """ try: if x is not None and y is not None: self.mouse.position = (x, y) self.mouse.press(Button.left if button == "left" else Button.right if button == "right" else Button.middle) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]: """Release a mouse button at the specified coordinates. Args: x: X coordinate (optional, uses current position if None) y: Y coordinate (optional, uses current position if None) button: Mouse button to release ("left", "right", or "middle") Returns: Dictionary containing success status and error message if failed """ try: if x is not None and y is not None: self.mouse.position = (x, y) self.mouse.release(Button.left if button == "left" else Button.right if button == "right" else Button.middle) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a left mouse click at the specified coordinates. Args: x: X coordinate (optional, uses current position if None) y: Y coordinate (optional, uses current position if None) Returns: Dictionary containing success status and error message if failed """ try: if x is not None and y is not None: self.mouse.position = (x, y) self.mouse.click(Button.left, 1) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]: """Perform a right mouse click at the specified coordinates. Args: x: X coordinate (optional, uses current position if None) y: Y coordinate (optional, uses current position if None) Returns: Dictionary containing success status and error message if failed """ try: if x is not None and y is not None: self.mouse.position = (x, y) self.mouse.click(Button.right, 1) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def double_click( self, x: Optional[int] = None, y: Optional[int] = None ) -> Dict[str, Any]: """Perform a double left mouse click at the specified coordinates. Args: x: X coordinate (optional, uses current position if None) y: Y coordinate (optional, uses current position if None) Returns: Dictionary containing success status and error message if failed """ try: if x is not None and y is not None: self.mouse.position = (x, y) self.mouse.click(Button.left, 2) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def move_cursor(self, x: int, y: int) -> Dict[str, Any]: """Move the mouse cursor to the specified coordinates. Args: x: Target X coordinate y: Target Y coordinate Returns: Dictionary containing success status and error message if failed """ try: self.mouse.position = (x, y) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def drag_to( self, x: int, y: int, button: str = "left", duration: float = 0.5 ) -> Dict[str, Any]: """Drag from current position to target coordinates. Args: x: Target X coordinate y: Target Y coordinate button: Mouse button to use for dragging ("left", "right", or "middle") duration: Duration of the drag operation in seconds Returns: Dictionary containing success status and error message if failed """ try: btn = Button.left if button == "left" else Button.right if button == "right" else Button.middle # Press self.mouse.press(btn) # Move with sleep to simulate drag duration start = self.mouse.position steps = 20 start_x, start_y = start dx = (x - start_x) / steps dy = (y - start_y) / steps for i in range(steps): self.mouse.position = (int(start_x + dx * (i + 1)), int(start_y + dy * (i + 1))) time.sleep(duration / steps) # Release self.mouse.release(btn) return {"success": True} except Exception as e: try: self.mouse.release(btn) except: pass return {"success": False, "error": str(e)} async def drag( self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5 ) -> Dict[str, Any]: """Drag the mouse along a specified path of coordinates. Args: path: List of (x, y) coordinate tuples defining the drag path button: Mouse button to use for dragging ("left", "right", or "middle") duration: Total duration of the drag operation in seconds Returns: Dictionary containing success status and error message if failed """ try: if not path or len(path) < 2: return {"success": False, "error": "Path must contain at least 2 points"} btn = Button.left if button == "left" else Button.right if button == "right" else Button.middle # Move to the first point self.mouse.position = path[0] self.mouse.press(btn) step_duration = duration / (len(path) - 1) if len(path) > 1 else duration for x, y in path[1:]: self.mouse.position = (x, y) time.sleep(step_duration) self.mouse.release(btn) return {"success": True} except Exception as e: try: self.mouse.release(btn) except: pass return {"success": False, "error": str(e)} # Keyboard Actions async def key_down(self, key: str) -> Dict[str, Any]: """Press and hold a keyboard key. Args: key: Key name to press (using pyautogui key names) Returns: Dictionary containing success status and error message if failed """ try: # use pyautogui for their key names pyautogui.keyDown(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def key_up(self, key: str) -> Dict[str, Any]: """Release a keyboard key. Args: key: Key name to release (using pyautogui key names) Returns: Dictionary containing success status and error message if failed """ try: # use pyautogui for their key names pyautogui.keyUp(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def type_text(self, text: str) -> Dict[str, Any]: """Type text using the keyboard with Unicode support. Args: text: Text string to type Returns: Dictionary containing success status and error message if failed """ try: # use pynput for Unicode support self.keyboard.type(text) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def press_key(self, key: str) -> Dict[str, Any]: """Press and release a keyboard key. Args: key: Key name to press (using pyautogui key names) Returns: Dictionary containing success status and error message if failed """ try: # use pyautogui for their key names pyautogui.press(key) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def hotkey(self, keys: List[str]) -> Dict[str, Any]: """Press a combination of keys simultaneously. Args: keys: List of key names to press together (using pyautogui key names) Returns: Dictionary containing success status and error message if failed """ try: # use pyautogui for their key names pyautogui.hotkey(*keys) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Scrolling Actions async def scroll(self, x: int, y: int) -> Dict[str, Any]: """Scroll the mouse wheel in the specified direction. Args: x: Horizontal scroll amount y: Vertical scroll amount (positive for up, negative for down) Returns: Dictionary containing success status and error message if failed """ try: self.mouse.scroll(x, y) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]: """Scroll down by the specified number of clicks. Args: clicks: Number of scroll clicks to perform Returns: Dictionary containing success status and error message if failed """ try: self.mouse.scroll(0, -clicks) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]: """Scroll up by the specified number of clicks. Args: clicks: Number of scroll clicks to perform Returns: Dictionary containing success status and error message if failed """ try: self.mouse.scroll(0, clicks) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} # Screen Actions async def screenshot(self) -> Dict[str, Any]: """Capture a screenshot of the current screen. Returns: Dictionary containing success status and base64-encoded image data or error message """ try: from PIL import Image screenshot = pyautogui.screenshot() if not isinstance(screenshot, Image.Image): return {"success": False, "error": "Failed to capture screenshot"} buffered = BytesIO() screenshot.save(buffered, format="PNG", optimize=True) buffered.seek(0) image_data = base64.b64encode(buffered.getvalue()).decode() return {"success": True, "image_data": image_data} except Exception as e: return {"success": False, "error": f"Screenshot error: {str(e)}"} async def get_screen_size(self) -> Dict[str, Any]: """Get the dimensions of the current screen. Returns: Dictionary containing success status and screen size or error message """ try: size = pyautogui.size() return {"success": True, "size": {"width": size.width, "height": size.height}} except Exception as e: return {"success": False, "error": str(e)} async def get_cursor_position(self) -> Dict[str, Any]: """Get the current position of the mouse cursor. Returns: Dictionary containing success status and cursor position or error message """ try: x, y = self.mouse.position return {"success": True, "position": {"x": x, "y": y}} except Exception as e: return {"success": False, "error": str(e)} # Clipboard Actions async def copy_to_clipboard(self) -> Dict[str, Any]: """Get the current content of the system clipboard. Returns: Dictionary containing success status and clipboard content or error message """ try: import pyperclip content = pyperclip.paste() return {"success": True, "content": content} except Exception as e: return {"success": False, "error": str(e)} async def set_clipboard(self, text: str) -> Dict[str, Any]: """Set the content of the system clipboard. Args: text: Text to copy to the clipboard Returns: Dictionary containing success status and error message if failed """ try: import pyperclip pyperclip.copy(text) return {"success": True} except Exception as e: return {"success": False, "error": str(e)} async def run_command(self, command: str) -> Dict[str, Any]: """Run a shell command and return its output. Args: command: Shell command to execute Returns: Dictionary containing success status, stdout, stderr, and return code """ try: # Create subprocess process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) # Wait for the subprocess to finish stdout, stderr = await process.communicate() # Return decoded output return { "success": True, "stdout": stdout.decode() if stdout else "", "stderr": stderr.decode() if stderr else "", "return_code": process.returncode } except Exception as e: return {"success": False, "error": str(e)}