mirror of
https://github.com/trycua/computer.git
synced 2026-01-01 11:00:31 -06:00
957 lines
36 KiB
Python
957 lines
36 KiB
Python
import pyautogui
|
||
pyautogui.FAILSAFE = False
|
||
from pynput.mouse import Button, Controller as MouseController
|
||
from pynput.keyboard import Key, Controller as KeyboardController
|
||
import time
|
||
import base64
|
||
from io import BytesIO
|
||
from typing import Optional, Dict, Any, List, Tuple
|
||
from ctypes import byref, c_void_p, POINTER
|
||
from AppKit import NSWorkspace # type: ignore
|
||
import AppKit
|
||
from Quartz.CoreGraphics import * # type: ignore
|
||
from Quartz.CoreGraphics import CGPoint, CGSize # type: ignore
|
||
import Foundation
|
||
from ApplicationServices import (
|
||
AXUIElementCreateSystemWide, # type: ignore
|
||
AXUIElementCreateApplication, # type: ignore
|
||
AXUIElementCopyAttributeValue, # type: ignore
|
||
AXUIElementCopyAttributeValues, # type: ignore
|
||
kAXFocusedWindowAttribute, # type: ignore
|
||
kAXWindowsAttribute, # type: ignore
|
||
kAXMainWindowAttribute, # type: ignore
|
||
kAXChildrenAttribute, # type: ignore
|
||
kAXRoleAttribute, # type: ignore
|
||
kAXTitleAttribute, # type: ignore
|
||
kAXValueAttribute, # type: ignore
|
||
kAXDescriptionAttribute, # type: ignore
|
||
kAXEnabledAttribute, # type: ignore
|
||
kAXPositionAttribute, # type: ignore
|
||
kAXSizeAttribute, # type: ignore
|
||
kAXErrorSuccess, # type: ignore
|
||
AXValueGetType, # type: ignore
|
||
kAXValueCGSizeType, # type: ignore
|
||
kAXValueCGPointType, # type: ignore
|
||
kAXValueCFRangeType, # type: ignore
|
||
AXUIElementGetTypeID, # type: ignore
|
||
AXValueGetValue, # type: ignore
|
||
kAXVisibleChildrenAttribute, # type: ignore
|
||
kAXRoleDescriptionAttribute, # type: ignore
|
||
kAXFocusedApplicationAttribute, # type: ignore
|
||
kAXFocusedUIElementAttribute, # type: ignore
|
||
kAXSelectedTextAttribute, # type: ignore
|
||
kAXSelectedTextRangeAttribute, # type: ignore
|
||
)
|
||
import objc
|
||
import re
|
||
import json
|
||
import copy
|
||
import asyncio
|
||
from .base import BaseAccessibilityHandler, BaseAutomationHandler
|
||
import logging
|
||
|
||
logger = logging.getLogger(__name__)

# Constants for accessibility API.
# NOTE: several of these names (kAXErrorSuccess, kAXRoleAttribute, kAXTitleAttribute,
# kAXValueAttribute, kAXWindowsAttribute, kAXPositionAttribute, kAXSizeAttribute,
# kAXChildrenAttribute) are also imported from ApplicationServices at the top of the
# file; the assignments below rebind them to these literal values.
kAXErrorSuccess = 0
kAXRoleAttribute = "AXRole"
kAXTitleAttribute = "AXTitle"
kAXValueAttribute = "AXValue"
kAXWindowsAttribute = "AXWindows"
kAXFocusedAttribute = "AXFocused"
kAXPositionAttribute = "AXPosition"
kAXSizeAttribute = "AXSize"
kAXChildrenAttribute = "AXChildren"
kAXMenuBarAttribute = "AXMenuBar"
kAXMenuBarItemAttribute = "AXMenuBarItem"

# Constants for window properties (keys of the CGWindow info dictionaries)
kCGWindowLayer = "kCGWindowLayer"  # Z-order information (lower values are higher in the stack)
kCGWindowAlpha = "kCGWindowAlpha"  # Window opacity

# Constants for application activation options (bit flags)
NSApplicationActivationOptions = {
    "regular": 0,  # Default activation
    "bringing_all_windows_forward": 1 << 0,  # NSApplicationActivateAllWindows
    "ignoring_other_apps": 1 << 1,  # NSApplicationActivateIgnoringOtherApps
}
|
||
|
||
def CFAttributeToPyObject(attrValue):
    """Convert a CoreFoundation / Accessibility attribute value to a Python object.

    Conversion rules, tried in order:

    * CFString -> ``str``; CFBoolean -> ``bool``
    * CFArray -> ``list`` whose items are converted recursively
    * CFNumber -> ``int`` if it can be read as an int, else ``float``, else ``None``
    * AXUIElement -> returned unchanged
    * AXValue holding a CGSize, CGPoint or CFRange -> ``tuple`` parsed from the
      ``{...}`` part of the value's ``description()`` string

    Args:
        attrValue: The CoreFoundation / AX object to convert.

    Returns:
        The converted Python object, or ``None`` when the type is not supported
        or no ``{...}`` section is found in the description.
    """

    def list_helper(list_value):
        # Recursively convert every array item.
        return [CFAttributeToPyObject(item) for item in list_value]

    def number_helper(number_value):
        # Try an int read first; fall back to a double read.
        success, int_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberIntType, None  # type: ignore
        )
        if success:
            return int(int_value)

        success, float_value = Foundation.CFNumberGetValue(  # type: ignore
            number_value, Foundation.kCFNumberDoubleType, None  # type: ignore
        )
        if success:
            return float(float_value)
        return None

    def axuielement_helper(element_value):
        # AX elements are passed through untouched so callers can query them further.
        return element_value

    cf_attr_type = Foundation.CFGetTypeID(attrValue)  # type: ignore
    cf_type_mapping = {
        Foundation.CFStringGetTypeID(): str,  # type: ignore
        Foundation.CFBooleanGetTypeID(): bool,  # type: ignore
        Foundation.CFArrayGetTypeID(): list_helper,  # type: ignore
        Foundation.CFNumberGetTypeID(): number_helper,  # type: ignore
        AXUIElementGetTypeID(): axuielement_helper,  # type: ignore
    }
    # Explicit lookup instead of try/except KeyError so that a KeyError raised
    # *inside* a converter is not mistaken for "unsupported CF type".
    converter = cf_type_mapping.get(cf_attr_type)
    if converter is not None:
        return converter(attrValue)

    # Did not get a supported CF type. Move on to AX value types.
    ax_attr_type = AXValueGetType(attrValue)
    ax_type_map = {
        kAXValueCGSizeType: Foundation.NSSizeFromString,  # type: ignore
        kAXValueCGPointType: Foundation.NSPointFromString,  # type: ignore
        kAXValueCFRangeType: Foundation.NSRangeFromString,  # type: ignore
    }
    parser = ax_type_map.get(ax_attr_type)
    if parser is None:
        return None

    # The struct is recovered from the textual description, e.g. "... {x, y} ...".
    search_result = re.search(r"{.*}", attrValue.description())
    if not search_result:
        return None
    return tuple(parser(search_result.group()))
|
||
|
||
|
||
def element_attribute(element, attribute):
    """Read one accessibility attribute from an AX element.

    For ``kAXChildrenAttribute`` a ranged read (``AXUIElementCopyAttributeValues``,
    at most 999 items starting at index 0) is attempted first; if that call does
    not succeed, the generic single-value read is used as a fallback.

    Args:
        element: The AX element to query.
        attribute: The attribute name, e.g. ``kAXRoleAttribute``.

    Returns:
        The attribute value. ``NSArray`` results are converted with
        ``CFAttributeToPyObject``; any other value is returned as received.
        ``None`` when the attribute could not be read.
    """
    max_children = 999  # upper bound on the number of children fetched in one call

    def convert(raw_value):
        # Only arrays are converted; scalars / AX values are handed back untouched.
        if isinstance(raw_value, Foundation.NSArray):  # type: ignore
            return CFAttributeToPyObject(raw_value)
        return raw_value

    if attribute == kAXChildrenAttribute:
        err, value = AXUIElementCopyAttributeValues(element, attribute, 0, max_children, None)
        if err == kAXErrorSuccess:
            return convert(value)

    err, value = AXUIElementCopyAttributeValue(element, attribute, None)
    if err == kAXErrorSuccess:
        return convert(value)
    return None
|
||
|
||
|
||
def element_value(element, type):
    """Unpack an AXValue (e.g. a CGPoint or CGSize wrapper) into its struct.

    Args:
        element: The AXValue object to unpack. ``None`` is accepted and yields
            ``None``, so callers can pass a missing attribute straight through.
        type: The expected AX value type constant, e.g. ``kAXValueCGPointType``.
            (The name shadows the builtin but is kept for caller compatibility.)

    Returns:
        The unpacked value when ``AXValueGetValue`` reports success, else ``None``.
    """
    if element is None:
        # Nothing to unpack; avoid handing None to the native API.
        return None
    # AXValueGetValue returns (success_flag, value).
    success, value = AXValueGetValue(element, type, None)
    if success:
        return value
    return None
|
||
|
||
|
||
class UIElement:
    """Snapshot of an accessibility (AX) element and, recursively, its children.

    The constructor queries the AX element for its role, title, enabled state,
    position, size, description, value and children and stores them as plain
    attributes. ``to_dict`` turns the snapshot into a dictionary.

    Attributes set by ``__init__``:
        ax_element: The wrapped AX element.
        role / name / enabled / description / role_description / value
        absolute_position: Copy of the position as reported by the AX API.
        position: Position with the (non-negative) window offset subtracted.
        size: Size struct, or ``None``.
        bbox / visible_bbox: ``[x1, y1, x2, y2]`` lists or ``None``.
        center: ``(x, y)`` tuple, or ``None`` when position or size is missing.
        children: List of child ``UIElement`` objects.
        identifier / content_identifier: MD5-based hashes (see ``calculate_hashes``).
    """

    def __init__(self, element, offset_x=0, offset_y=0, max_depth=None, parents_visible_bbox=None):
        """Build the snapshot for ``element``.

        Args:
            element: The AX element to inspect.
            offset_x: X offset subtracted from the element position (when positive).
            offset_y: Y offset subtracted from the element position (when positive).
            max_depth: Remaining levels of children to descend into; ``None``
                means unlimited, ``0`` (or less) means do not collect children.
            parents_visible_bbox: Visible bounding box of the parent, used to
                clip this element's ``visible_bbox``.
        """
        self.ax_element = element
        self.content_identifier = ""
        self.identifier = ""
        self.name = ""
        self.children = []
        self.description = ""
        self.role_description = ""
        self.value = None
        self.max_depth = max_depth
        # Always defined so that elements without geometry still expose the attribute.
        self.center = None

        # Set role
        self.role = element_attribute(element, kAXRoleAttribute)
        if self.role is None:
            self.role = "No role"

        # Set name
        self.name = element_attribute(element, kAXTitleAttribute)
        if self.name is not None:
            # Convert tuple to string if needed
            if isinstance(self.name, tuple):
                self.name = str(self.name[0]) if self.name else ""
            self.name = self.name.replace(" ", "_")

        # Set enabled
        self.enabled = element_attribute(element, kAXEnabledAttribute)
        if self.enabled is None:
            self.enabled = False

        # Set position and size
        position = element_attribute(element, kAXPositionAttribute)
        size = element_attribute(element, kAXSizeAttribute)
        start_position = element_value(position, kAXValueCGPointType)

        # A window defines the origin for itself and everything below it.
        if self.role == "AXWindow" and start_position is not None:
            offset_x = start_position.x
            offset_y = start_position.y

        # Keep an untouched copy: ``self.position`` aliases ``start_position``
        # and is shifted in place just below.
        self.absolute_position = copy.copy(start_position)
        self.position = start_position
        if self.position is not None:
            self.position.x -= max(0, offset_x)
            self.position.y -= max(0, offset_y)
        self.size = element_value(size, kAXValueCGSizeType)

        self._set_bboxes(parents_visible_bbox)

        # Without geometry the remaining attributes (description, value,
        # children, hashes) are intentionally left at their defaults.
        if start_position is None or self.size is None:
            logger.debug("Position or size is None for element with role %s", self.role)
            return

        # Set component center.
        # NOTE(review): ``start_position`` was already shifted by max(0, offset)
        # above, so adding the offset back restores the absolute coordinate only
        # for non-negative offsets - confirm the intent for negative offsets.
        self.center = (
            start_position.x + offset_x + self.size.width / 2,
            start_position.y + offset_y + self.size.height / 2,
        )

        self.description = element_attribute(element, kAXDescriptionAttribute)
        self.role_description = element_attribute(element, kAXRoleDescriptionAttribute)
        attribute_value = element_attribute(element, kAXValueAttribute)

        # Set value
        self.value = attribute_value
        if attribute_value is not None:
            if isinstance(attribute_value, Foundation.NSArray):  # type: ignore
                self.value = list(attribute_value)
            # Check if it's an accessibility element by checking its type ID
            elif Foundation.CFGetTypeID(attribute_value) == AXUIElementGetTypeID():  # type: ignore
                self.value = UIElement(attribute_value, offset_x, offset_y)

        # Set children
        if self.max_depth is None or self.max_depth > 0:
            self.children = self._get_children(element, start_position, offset_x, offset_y)
        else:
            self.children = []

        self.calculate_hashes()

    def _set_bboxes(self, parents_visible_bbox):
        """Compute ``bbox`` and ``visible_bbox`` from absolute position and size.

        ``bbox`` is ``[x1, y1, x2, y2]`` with integer coordinates. ``visible_bbox``
        is ``bbox`` clipped to ``parents_visible_bbox``; it is ``None`` when the
        two rectangles do not intersect, and the same list as ``bbox`` when no
        parent box is given. Both are ``None`` when position or size is missing.
        """
        if not self.absolute_position or not self.size:
            self.bbox = None
            self.visible_bbox = None
            return

        left = int(self.absolute_position.x)
        top = int(self.absolute_position.y)
        right = int(self.absolute_position.x + self.size.width)
        bottom = int(self.absolute_position.y + self.size.height)
        self.bbox = [left, top, right, bottom]

        if not parents_visible_bbox:
            self.visible_bbox = self.bbox
            return

        p_left, p_top, p_right, p_bottom = (
            parents_visible_bbox[0],
            parents_visible_bbox[1],
            parents_visible_bbox[2],
            parents_visible_bbox[3],
        )
        # check if not intersected
        if left > p_right or top > p_bottom or right < p_left or bottom < p_top:
            self.visible_bbox = None
            return

        self.visible_bbox = [
            int(max(left, p_left)),
            int(max(top, p_top)),
            int(min(right, p_right)),
            int(min(bottom, p_bottom)),
        ]

    def _get_children(self, element, start_position, offset_x, offset_y):
        """Wrap the children of ``element`` as ``UIElement`` objects.

        ``kAXChildrenAttribute`` is preferred; ``kAXVisibleChildrenAttribute`` is
        queried only when the former is unavailable. ``start_position`` is
        accepted for interface compatibility and is not used.

        Returns:
            A list of child ``UIElement`` objects (empty when the depth budget
            is exhausted or no children are reported).
        """
        if self.max_depth is not None and self.max_depth <= 0:
            return []

        found_children = element_attribute(element, kAXChildrenAttribute)
        if found_children is None:
            found_children = element_attribute(element, kAXVisibleChildrenAttribute)
        if found_children is None:
            return []

        child_depth = None if self.max_depth is None else self.max_depth - 1
        return [
            UIElement(child, offset_x, offset_y, child_depth, self.visible_bbox)
            for child in found_children
        ]

    def calculate_hashes(self):
        """Populate ``identifier`` and ``content_identifier`` for this element."""
        self.identifier = self.component_hash()
        self.content_identifier = self.children_content_hash(self.children)

    def component_hash(self):
        """Return an MD5 hex digest of position, size, enabled state and role.

        Returns ``""`` when position or size is missing.
        """
        if self.position is None or self.size is None:
            return ""
        position_string = f"{self.position.x:.0f};{self.position.y:.0f}"
        size_string = f"{self.size.width:.0f};{self.size.height:.0f}"
        enabled_string = str(self.enabled)
        # Ensure role is a string
        role_string = ""
        if self.role is not None:
            role_string = str(self.role[0]) if isinstance(self.role, tuple) else str(self.role)
        return self.hash_from_string(position_string + size_string + enabled_string + role_string)

    def hash_from_string(self, string):
        """Return the MD5 hex digest of ``string``; ``""`` for ``None`` or empty input."""
        if string is None or string == "":
            return ""
        from hashlib import md5

        return md5(string.encode()).hexdigest()

    def children_content_hash(self, children):
        """Return a hash summarising the given children.

        Combines the hash of the sorted child ``content_identifier`` values with
        the hash of the child ``identifier`` values (in child order). Returns
        ``""`` when there are no children.
        """
        if not children:
            return ""
        all_content_hashes = sorted(child.content_identifier for child in children)
        all_hashes = [child.identifier for child in children]
        content_hash = self.hash_from_string("".join(all_content_hashes))
        content_structure_hash = self.hash_from_string("".join(all_hashes))
        # NOTE(review): ``a.join(b)`` interleaves ``a`` between the characters of
        # ``b`` rather than concatenating; kept as-is so existing hashes stay stable.
        return self.hash_from_string(content_hash.join(content_structure_hash))

    def to_dict(self):
        """Return the element and its children as a dictionary.

        A ``UIElement`` value is embedded as a JSON string (``indent=4``); an
        ``NSDate`` value is converted with ``str``. Positions are formatted as
        ``"x;y"`` with two decimals, size as ``"w;h"`` with none; missing
        geometry becomes ``""``.
        """
        value = self.value
        if isinstance(value, UIElement):
            value = json.dumps(value.to_dict(), indent=4)
        elif isinstance(value, AppKit.NSDate):  # type: ignore
            value = str(value)

        absolute_position = ""
        if self.absolute_position is not None:
            absolute_position = f"{self.absolute_position.x:.2f};{self.absolute_position.y:.2f}"

        position = ""
        if self.position is not None:
            position = f"{self.position.x:.2f};{self.position.y:.2f}"

        size = ""
        if self.size is not None:
            size = f"{self.size.width:.0f};{self.size.height:.0f}"

        return {
            "id": self.identifier,
            "name": self.name,
            "role": self.role,
            "description": self.description,
            "role_description": self.role_description,
            "value": value,
            "absolute_position": absolute_position,
            "position": position,
            "size": size,
            "enabled": self.enabled,
            "bbox": self.bbox,
            "visible_bbox": self.visible_bbox,
            "children": [child.to_dict() for child in self.children],
        }
|
||
|
||
|
||
import Quartz
|
||
from AppKit import NSWorkspace, NSRunningApplication
|
||
from pathlib import Path
|
||
|
||
def get_all_windows_zorder():
    """List the windows reported by ``CGWindowListCopyWindowInfo``, sorted by z-index.

    Two queries are made: the on-screen list supplies the z-index (a window's
    index in the *reversed* on-screen list), and the "all windows" list supplies
    the entries. Windows missing from the on-screen list get ``z_index == -1``.
    Entries with empty bounds are skipped.

    Each entry is classified with a ``role``:
    ``"dock"`` (name and owner are "Dock"), ``"menubar"`` (name "Menubar" owned
    by "Window Server"), ``"desktop"`` (any other window owned by
    "Window Server" or "Dock") or ``"app"``.

    Returns:
        A list of dicts with keys ``id``, ``name``, ``pid``, ``owner``, ``role``,
        ``is_on_screen``, ``bounds`` (``x``/``y``/``width``/``height``),
        ``layer``, ``z_index`` and ``opacity``, sorted ascending by ``z_index``.
    """
    window_list = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionOnScreenOnly,
        Quartz.kCGNullWindowID,
    )
    z_order = {window['kCGWindowNumber']: z_index for z_index, window in enumerate(window_list[::-1])}

    window_list_all = Quartz.CGWindowListCopyWindowInfo(
        Quartz.kCGWindowListOptionAll,
        Quartz.kCGNullWindowID,
    )

    windows = []
    for window in window_list_all:
        window_bounds = window.get('kCGWindowBounds', {})
        if not window_bounds:
            # Entries without bounds are not reported.
            continue

        window_id = window.get('kCGWindowNumber', 0)
        window_name = window.get('kCGWindowName', '')
        window_owner = window.get('kCGWindowOwnerName', '')

        if window_name == "Dock" and window_owner == "Dock":
            role = "dock"
        elif window_name == "Menubar" and window_owner == "Window Server":
            role = "menubar"
        elif window_owner in ["Window Server", "Dock"]:
            role = "desktop"
        else:
            role = "app"

        windows.append({
            "id": window_id,
            "name": window_name or "Unnamed Window",
            "pid": window.get('kCGWindowOwnerPID', 0),
            "owner": window_owner,
            "role": role,
            "is_on_screen": window.get('kCGWindowIsOnscreen', False),
            "bounds": {
                "x": window_bounds.get('X', 0),
                "y": window_bounds.get('Y', 0),
                "width": window_bounds.get('Width', 0),
                "height": window_bounds.get('Height', 0),
            },
            "layer": window.get('kCGWindowLayer', 0),
            "z_index": z_order.get(window_id, -1),
            "opacity": window.get('kCGWindowAlpha', 1.0),
        })

    return sorted(windows, key=lambda entry: entry["z_index"])
|
||
|
||
def get_app_info(app):
    """Summarise a running application as a plain dictionary.

    Args:
        app: An object exposing ``localizedName()``, ``bundleIdentifier()``,
            ``processIdentifier()``, ``isActive()``, ``isHidden()`` and
            ``isTerminated()`` (e.g. the entries returned by
            ``NSWorkspace.runningApplications()``).

    Returns:
        A dict with keys ``name``, ``bundle_id``, ``pid``, ``active``,
        ``hidden`` and ``terminated``.
    """
    return {
        "name": app.localizedName(),
        "bundle_id": app.bundleIdentifier(),
        "pid": app.processIdentifier(),
        "active": app.isActive(),
        "hidden": app.isHidden(),
        "terminated": app.isTerminated(),
    }
|
||
|
||
def get_menubar_items(active_app_pid=None):
    """Return the menu bar items of an application.

    Args:
        active_app_pid: PID of the application to inspect. When ``None`` the
            frontmost application reported by ``NSWorkspace`` is used.

    Returns:
        A list of dicts with keys ``title`` ("Untitled" when the item has no
        title), ``bounds`` (``x``/``y``/``width``/``height``, zeros when
        unavailable), ``index`` and ``app_pid``. The list is empty when there
        is no frontmost application, or the menu bar or its children cannot
        be read.
    """
    menubar_items = []

    if active_app_pid is None:
        frontmost_app = NSWorkspace.sharedWorkspace().frontmostApplication()
        if not frontmost_app:
            return menubar_items
        active_app_pid = frontmost_app.processIdentifier()

    app_element = AXUIElementCreateApplication(active_app_pid)
    if app_element is None:
        return menubar_items

    menubar = element_attribute(app_element, kAXMenuBarAttribute)
    if menubar is None:
        return menubar_items

    children = element_attribute(menubar, kAXChildrenAttribute)
    if children is None:
        return menubar_items

    for index, item in enumerate(children):
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            point = element_value(position_value, kAXValueCGPointType)
            # getattr with a default keeps the zero bounds if unpacking failed.
            bounds["x"] = getattr(point, 'x', 0)
            bounds["y"] = getattr(point, 'y', 0)

        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            extent = element_value(size_value, kAXValueCGSizeType)
            bounds["width"] = getattr(extent, 'width', 0)
            bounds["height"] = getattr(extent, 'height', 0)

        menubar_items.append({
            "title": title,
            "bounds": bounds,
            "index": index,
            "app_pid": active_app_pid,
        })

    return menubar_items
|
||
|
||
def get_dock_items():
    """Return the items shown in the Dock.

    The Dock process is located among the running applications (localized name
    "Dock", bundle id "com.apple.dock"); its first child with role ``AXList``
    is taken as the item list.

    Returns:
        A list of dicts with keys ``title`` ("Untitled" when missing),
        ``description``, ``bounds`` (``x``/``y``/``width``/``height``, zeros
        when unavailable), ``index``, ``type``, ``role`` and ``subrole``.
        ``type`` is one of ``"application"``, ``"folder"``, ``"document"``,
        ``"separator"``, ``"trash"`` or ``"unknown"``. The list is empty when
        the Dock or its item list cannot be found.
    """
    dock_items = []

    # Locate the Dock process.
    dock_pid = None
    for app in NSWorkspace.sharedWorkspace().runningApplications():
        if app.localizedName() == "Dock" and app.bundleIdentifier() == "com.apple.dock":
            dock_pid = app.processIdentifier()
            break
    if dock_pid is None:
        return dock_items

    dock_element = AXUIElementCreateApplication(dock_pid)
    if dock_element is None:
        return dock_items

    dock_list = element_attribute(dock_element, kAXChildrenAttribute)
    if dock_list is None or len(dock_list) == 0:
        return dock_items

    # The first AXList child holds the dock entries.
    dock_app_list = None
    for child in dock_list:
        if element_attribute(child, kAXRoleAttribute) == "AXList":
            dock_app_list = child
            break
    if dock_app_list is None:
        return dock_items

    items = element_attribute(dock_app_list, kAXChildrenAttribute)
    if items is None:
        return dock_items

    # Subrole -> reported item type.
    subrole_types = {
        "AXApplicationDockItem": "application",
        "AXFolderDockItem": "folder",
        "AXDocumentDockItem": "document",
        "AXSeparatorDockItem": "separator",
    }

    for index, item in enumerate(items):
        title = element_attribute(item, kAXTitleAttribute) or "Untitled"
        description = element_attribute(item, kAXDescriptionAttribute) or ""
        role = element_attribute(item, kAXRoleAttribute) or ""
        subrole = element_attribute(item, "AXSubrole") or ""
        bounds = {"x": 0, "y": 0, "width": 0, "height": 0}

        position_value = element_attribute(item, kAXPositionAttribute)
        if position_value:
            point = element_value(position_value, kAXValueCGPointType)
            # getattr with a default keeps the zero bounds if unpacking failed.
            bounds["x"] = getattr(point, 'x', 0)
            bounds["y"] = getattr(point, 'y', 0)

        size_value = element_attribute(item, kAXSizeAttribute)
        if size_value:
            extent = element_value(size_value, kAXValueCGSizeType)
            bounds["width"] = getattr(extent, 'width', 0)
            bounds["height"] = getattr(extent, 'height', 0)

        # Subrole wins; then the separator role; then a title containing "trash".
        item_type = subrole_types.get(subrole)
        if item_type is None:
            if role == "AXSeparator":
                item_type = "separator"
            elif "trash" in title.lower():
                item_type = "trash"
            else:
                item_type = "unknown"

        dock_items.append({
            "title": title,
            "description": description,
            "bounds": bounds,
            "index": index,
            "type": item_type,
            "role": role,
            "subrole": subrole,
        })

    return dock_items
|
||
|
||
class MacOSAccessibilityHandler(BaseAccessibilityHandler):
    """Accessibility handler that inspects the macOS desktop through the AX API."""

    def _ax_window_trees(self, pid):
        """Return the serialised AX window trees of the application ``pid``.

        Each window is converted with ``UIElement(...).to_dict()``. A failure on
        one window, or on the whole application, is reported in-band as an
        ``{"error": message}`` entry rather than raised.
        """
        try:
            app_elem = AXUIElementCreateApplication(pid)
            err, app_windows = AXUIElementCopyAttributeValue(app_elem, kAXWindowsAttribute, None)
            trees = []
            if err == kAXErrorSuccess and app_windows:
                for ax_win in app_windows:
                    try:
                        trees.append(UIElement(ax_win).to_dict())
                    except Exception as e:
                        trees.append({"error": str(e)})
            return trees
        except Exception as e:
            return [{"error": str(e)}]

    def get_desktop_state(self):
        """Collect applications, on-screen windows, menu bar items and dock items.

        Returns:
            A dict with keys ``applications`` (list of ``{"info", "windows"}``
            where ``windows`` holds window ids), ``windows`` (on-screen entries
            from ``get_all_windows_zorder`` with an added ``children`` list),
            ``menubar_items`` and ``dock_items``.
        """
        windows = [w for w in get_all_windows_zorder() if w.get("is_on_screen")]
        running_apps = self.get_running_apps()

        # Build a mapping: pid -> list of AX window trees
        pid_to_ax_trees = {}
        for app in running_apps:
            pid = app.processIdentifier()
            pid_to_ax_trees[pid] = self._ax_window_trees(pid)

        # Attach children by pid and index (order).
        # NOTE(review): this pairs the n-th on-screen window of a pid with the
        # n-th AX window of that pid; it assumes both lists share an order.
        pid_to_idx = {}
        pid_to_window_ids = {}
        for win in windows:
            pid = win["pid"]
            idx = pid_to_idx.get(pid, 0)
            ax_trees = pid_to_ax_trees.get(pid, [])
            win["children"] = (
                ax_trees[idx]["children"]
                if idx < len(ax_trees) and "children" in ax_trees[idx]
                else []
            )
            pid_to_idx[pid] = idx + 1
            pid_to_window_ids.setdefault(pid, []).append(win["id"])

        applications = []
        for app in running_apps:
            info = get_app_info(app)
            applications.append({
                "info": info,
                "windows": pid_to_window_ids.get(info["pid"], []),
            })

        return {
            "applications": applications,
            "windows": windows,
            "menubar_items": get_menubar_items(),
            "dock_items": get_dock_items(),
        }

    def get_application_windows(self, pid: int):
        """Get all windows for a specific application.

        Returns the AX windows array of the application, or ``[]`` when it
        cannot be read or is empty.
        """
        try:
            app = AXUIElementCreateApplication(pid)
            err, windows = AXUIElementCopyAttributeValue(app, kAXWindowsAttribute, None)
            if err == kAXErrorSuccess and windows:
                if isinstance(windows, Foundation.NSArray):  # type: ignore
                    return windows
            return []
        except Exception:
            # Best effort: an unreadable application simply has no windows.
            return []

    def get_all_windows(self):
        """Get all visible windows in the system.

        Only applications whose activation policy is 0
        (NSApplicationActivationPolicyRegular) are included; applications that
        raise while being inspected are skipped.

        Returns:
            A list of dicts with keys ``app_name``, ``pid``, ``frontmost``,
            ``has_windows`` and ``windows``; ``[]`` on unexpected failure.
        """
        try:
            windows = []
            for app in self.get_running_apps():
                try:
                    app_name = app.localizedName()
                    pid = app.processIdentifier()

                    # Skip system processes and background apps
                    if app.activationPolicy() != 0:  # NSApplicationActivationPolicyRegular
                        continue

                    # Get application windows
                    app_windows = self.get_application_windows(pid)

                    windows.append(
                        {
                            "app_name": app_name,
                            "pid": pid,
                            "frontmost": app.isActive(),
                            "has_windows": len(app_windows) > 0,
                            "windows": app_windows,
                        }
                    )
                except Exception:
                    # Best effort: skip applications that cannot be inspected.
                    continue

            return windows
        except Exception:
            return []

    def get_running_apps(self):
        """Return ``NSWorkspace``'s running applications after pumping the run loop."""
        # From NSWorkspace.runningApplications docs: https://developer.apple.com/documentation/appkit/nsworkspace/runningapplications
        # "Similar to the NSRunningApplication class’s properties, this property will only change when the main run loop runs in a common mode"
        # So we need to run the main run loop to get the latest running applications
        Foundation.CFRunLoopRunInMode(Foundation.kCFRunLoopDefaultMode, 0.1, False)  # type: ignore
        return NSWorkspace.sharedWorkspace().runningApplications()

    def get_ax_attribute(self, element, attribute):
        """Read ``attribute`` from ``element`` (thin wrapper over ``element_attribute``)."""
        return element_attribute(element, attribute)

    def serialize_node(self, element):
        """Create a dictionary representation of an accessibility element.

        Returns:
            A dict with ``role``, ``title`` and ``value``, plus ``position``
            (``x``/``y``) and ``size`` (``width``/``height``) when available.
        """
        result = {}

        # Get basic attributes
        result["role"] = self.get_ax_attribute(element, kAXRoleAttribute)
        result["title"] = self.get_ax_attribute(element, kAXTitleAttribute)
        result["value"] = self.get_ax_attribute(element, kAXValueAttribute)

        # Get position and size if available
        position = self.get_ax_attribute(element, kAXPositionAttribute)
        if position:
            try:
                result["position"] = {"x": position[0], "y": position[1]}
            except (IndexError, TypeError):
                # The attribute is normally an AXValue wrapper, which cannot be
                # indexed; unpack it into a point struct instead of dropping it.
                point = element_value(position, kAXValueCGPointType)
                if point is not None:
                    result["position"] = {"x": point.x, "y": point.y}

        size = self.get_ax_attribute(element, kAXSizeAttribute)
        if size:
            try:
                result["size"] = {"width": size[0], "height": size[1]}
            except (IndexError, TypeError):
                # Same fallback as for the position above.
                extent = element_value(size, kAXValueCGSizeType)
                if extent is not None:
                    result["size"] = {"width": extent.width, "height": extent.height}

        return result

    async def get_accessibility_tree(self) -> Dict[str, Any]:
        """Return the desktop state merged with ``"success": True``.

        On failure returns ``{"success": False, "error": message}``.
        """
        try:
            desktop_state = self.get_desktop_state()
            return {
                "success": True,
                **desktop_state,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def find_element(
        self, role: Optional[str] = None, title: Optional[str] = None, value: Optional[str] = None
    ) -> Dict[str, Any]:
        """Find the first element matching every given criterion.

        The search is depth-first, starting at the system-wide AX element.
        Criteria that are ``None`` (or empty) are ignored; ``value`` is compared
        against ``str()`` of the element's value.

        Returns:
            ``{"success": True, "element": serialized_or_None}``, or
            ``{"success": False, "error": message}`` when an exception occurs.
        """
        try:
            system = AXUIElementCreateSystemWide()

            def match_element(element):
                if role and self.get_ax_attribute(element, kAXRoleAttribute) != role:
                    return False
                if title and self.get_ax_attribute(element, kAXTitleAttribute) != title:
                    return False
                if value and str(self.get_ax_attribute(element, kAXValueAttribute)) != value:
                    return False
                return True

            def search_tree(element):
                if match_element(element):
                    return self.serialize_node(element)

                children = self.get_ax_attribute(element, kAXChildrenAttribute)
                if children:
                    for child in children:
                        result = search_tree(child)
                        if result:
                            return result
                return None

            element = search_tree(system)
            return {"success": True, "element": element}

        except Exception as e:
            return {"success": False, "error": str(e)}
|
||
|
||
class MacOSAutomationHandler(BaseAutomationHandler):
    """Automation handler driving mouse, keyboard, screen and clipboard on macOS.

    Every public coroutine returns a dict with ``"success": True`` (plus any
    result fields) or ``{"success": False, "error": message}``; exceptions are
    reported in-band rather than raised.
    """

    # Controllers are class attributes, created once when the class is defined.
    mouse = MouseController()
    keyboard = KeyboardController()

    @staticmethod
    def _resolve_button(button: str):
        """Map a button name to a pynput ``Button`` ("left", "right", else middle)."""
        if button == "left":
            return Button.left
        if button == "right":
            return Button.right
        return Button.middle

    def _move_if_given(self, x: Optional[int], y: Optional[int]) -> None:
        """Move the cursor to ``(x, y)`` only when both coordinates are provided."""
        if x is not None and y is not None:
            self.mouse.position = (x, y)

    # Mouse Actions
    async def mouse_down(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Press (and hold) a mouse button, optionally moving to ``(x, y)`` first."""
        try:
            self._move_if_given(x, y)
            self.mouse.press(self._resolve_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def mouse_up(self, x: Optional[int] = None, y: Optional[int] = None, button: str = "left") -> Dict[str, Any]:
        """Release a mouse button, optionally moving to ``(x, y)`` first."""
        try:
            self._move_if_given(x, y)
            self.mouse.release(self._resolve_button(button))
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def left_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Left-click once, optionally moving to ``(x, y)`` first."""
        try:
            self._move_if_given(x, y)
            self.mouse.click(Button.left, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def right_click(self, x: Optional[int] = None, y: Optional[int] = None) -> Dict[str, Any]:
        """Right-click once, optionally moving to ``(x, y)`` first."""
        try:
            self._move_if_given(x, y)
            self.mouse.click(Button.right, 1)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def double_click(
        self, x: Optional[int] = None, y: Optional[int] = None
    ) -> Dict[str, Any]:
        """Left-click twice, optionally moving to ``(x, y)`` first."""
        try:
            self._move_if_given(x, y)
            self.mouse.click(Button.left, 2)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def move_cursor(self, x: int, y: int) -> Dict[str, Any]:
        """Move the cursor to ``(x, y)``."""
        try:
            self.mouse.position = (x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def drag_to(
        self, x: int, y: int, button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag from the current cursor position to ``(x, y)``.

        The button is pressed, the cursor is moved in 20 equal steps spread
        over ``duration`` seconds, and the button is released. On error a
        best-effort release is attempted before the error is reported.
        """
        btn = self._resolve_button(button)
        try:
            # Press
            self.mouse.press(btn)
            # Move in small steps to simulate the drag duration
            steps = 20
            start_x, start_y = self.mouse.position
            dx = (x - start_x) / steps
            dy = (y - start_y) / steps
            for i in range(steps):
                self.mouse.position = (int(start_x + dx * (i + 1)), int(start_y + dy * (i + 1)))
                # Non-blocking sleep: time.sleep would stall the event loop.
                await asyncio.sleep(duration / steps)
            # Release
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            try:
                self.mouse.release(btn)
            except Exception:
                # Best effort only; the original error is what gets reported.
                pass
            return {"success": False, "error": str(e)}

    async def drag(
        self, path: List[Tuple[int, int]], button: str = "left", duration: float = 0.5
    ) -> Dict[str, Any]:
        """Drag along ``path``, a sequence of at least two ``(x, y)`` points.

        The cursor jumps to the first point, the button is pressed, each
        following point is visited with ``duration`` split evenly between the
        segments, and the button is released. On error a best-effort release
        is attempted before the error is reported.
        """
        if not path or len(path) < 2:
            return {"success": False, "error": "Path must contain at least 2 points"}

        btn = self._resolve_button(button)
        try:
            # Move to the first point
            self.mouse.position = path[0]
            self.mouse.press(btn)
            step_duration = duration / (len(path) - 1)
            for x, y in path[1:]:
                self.mouse.position = (x, y)
                # Non-blocking sleep: time.sleep would stall the event loop.
                await asyncio.sleep(step_duration)
            self.mouse.release(btn)
            return {"success": True}
        except Exception as e:
            try:
                self.mouse.release(btn)
            except Exception:
                # Best effort only; the original error is what gets reported.
                pass
            return {"success": False, "error": str(e)}

    # Keyboard Actions
    async def key_down(self, key: str) -> Dict[str, Any]:
        """Press (and hold) ``key``, named as pyautogui expects."""
        try:
            # use pyautogui for their key names
            pyautogui.keyDown(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def key_up(self, key: str) -> Dict[str, Any]:
        """Release ``key``, named as pyautogui expects."""
        try:
            # use pyautogui for their key names
            pyautogui.keyUp(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def type_text(self, text: str) -> Dict[str, Any]:
        """Type ``text`` through the pynput keyboard controller."""
        try:
            # use pynput for Unicode support
            self.keyboard.type(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def press_key(self, key: str) -> Dict[str, Any]:
        """Press and release ``key``, named as pyautogui expects."""
        try:
            # use pyautogui for their key names
            pyautogui.press(key)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def hotkey(self, keys: List[str]) -> Dict[str, Any]:
        """Press the key combination ``keys`` via ``pyautogui.hotkey``."""
        try:
            # use pyautogui for their key names
            pyautogui.hotkey(*keys)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Scrolling Actions
    async def scroll(self, x: int, y: int) -> Dict[str, Any]:
        """Scroll by ``x`` horizontally and ``y`` vertically (pynput scroll units)."""
        try:
            self.mouse.scroll(x, y)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_down(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll down by ``clicks`` (a negative vertical pynput scroll)."""
        try:
            self.mouse.scroll(0, -clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def scroll_up(self, clicks: int = 1) -> Dict[str, Any]:
        """Scroll up by ``clicks`` (a positive vertical pynput scroll)."""
        try:
            self.mouse.scroll(0, clicks)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Screen Actions
    async def screenshot(self) -> Dict[str, Any]:
        """Capture the screen and return it as a base64-encoded PNG in ``image_data``."""
        try:
            from PIL import Image

            screenshot = pyautogui.screenshot()
            if not isinstance(screenshot, Image.Image):
                return {"success": False, "error": "Failed to capture screenshot"}

            buffered = BytesIO()
            screenshot.save(buffered, format="PNG", optimize=True)
            buffered.seek(0)
            image_data = base64.b64encode(buffered.getvalue()).decode()
            return {"success": True, "image_data": image_data}
        except Exception as e:
            return {"success": False, "error": f"Screenshot error: {str(e)}"}

    async def get_screen_size(self) -> Dict[str, Any]:
        """Return the screen size as ``{"width": ..., "height": ...}`` under ``size``."""
        try:
            size = pyautogui.size()
            return {"success": True, "size": {"width": size.width, "height": size.height}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def get_cursor_position(self) -> Dict[str, Any]:
        """Return the cursor position as ``{"x": ..., "y": ...}`` under ``position``."""
        try:
            x, y = self.mouse.position
            return {"success": True, "position": {"x": x, "y": y}}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # Clipboard Actions
    async def copy_to_clipboard(self) -> Dict[str, Any]:
        """Return the current clipboard text under ``content``.

        Despite its name this method only *reads* the clipboard
        (``pyperclip.paste()``); it does not copy anything into it.
        """
        try:
            import pyperclip

            content = pyperclip.paste()
            return {"success": True, "content": content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def set_clipboard(self, text: str) -> Dict[str, Any]:
        """Replace the clipboard content with ``text``."""
        try:
            import pyperclip

            pyperclip.copy(text)
            return {"success": True}
        except Exception as e:
            return {"success": False, "error": str(e)}

    async def run_command(self, command: str) -> Dict[str, Any]:
        """Run a shell command and return its output.

        SECURITY: ``command`` is handed to the shell verbatim; callers must
        only pass commands from a trusted source.

        Returns:
            ``{"success": True, "stdout", "stderr", "return_code"}`` once the
            process has finished (whatever its exit status), or
            ``{"success": False, "error": message}`` if it could not be run.
        """
        try:
            # Create subprocess
            process = await asyncio.create_subprocess_shell(
                command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Wait for the subprocess to finish
            stdout, stderr = await process.communicate()
            # Return decoded output; undecodable bytes are replaced rather than
            # turning a successful command into an error.
            return {
                "success": True,
                "stdout": stdout.decode(errors="replace") if stdout else "",
                "stderr": stderr.decode(errors="replace") if stderr else "",
                "return_code": process.returncode,
            }
        except Exception as e:
            return {"success": False, "error": str(e)}
|